libceph: drop snapid in ceph_calc_raw_layout()
[deliverable/linux.git] / net / ceph / osd_client.c
index ca59e66c9787303805519f2bf325cc5d5817ff55..b5a4b2875e8ac175f3a8981c1f2152370499df66 100644 (file)
@@ -32,49 +32,44 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-       switch (op) {
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_NOTIFY:
-               return 1;
-       default:
-               return 0;
-       }
-}
-
 static int op_has_extent(int op)
 {
        return (op == CEPH_OSD_OP_READ ||
                op == CEPH_OSD_OP_WRITE);
 }
 
-void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                        struct ceph_file_layout *layout,
-                       u64 snapid,
                        u64 off, u64 *plen, u64 *bno,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op)
 {
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
-
-       reqhead->snapid = cpu_to_le64(snapid);
+       int r;
 
        /* object extent? */
-       ceph_calc_file_object_mapping(layout, off, plen, bno,
-                                     &objoff, &objlen);
-       if (*plen < orig_len)
+       r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
+                                         &objoff, &objlen);
+       if (r < 0)
+               return r;
+       if (objlen < orig_len) {
+               *plen = objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
+       }
 
        if (op_has_extent(op->op)) {
+               u32 osize = le32_to_cpu(layout->fl_object_size);
                op->extent.offset = objoff;
                op->extent.length = objlen;
+               if (op->extent.truncate_size <= off - objoff) {
+                       op->extent.truncate_size = 0;
+               } else {
+                       op->extent.truncate_size -= off - objoff;
+                       if (op->extent.truncate_size > osize)
+                               op->extent.truncate_size = osize;
+               }
        }
        req->r_num_pages = calc_pages_for(off, *plen);
        req->r_page_alignment = off & ~PAGE_MASK;
@@ -83,7 +78,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 
        dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
             *bno, objoff, objlen, req->r_num_pages);
-
+       return 0;
 }
 EXPORT_SYMBOL(ceph_calc_raw_layout);
 
@@ -112,20 +107,24 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static void calc_layout(struct ceph_osd_client *osdc,
-                       struct ceph_vino vino,
-                       struct ceph_file_layout *layout,
-                       u64 off, u64 *plen,
-                       struct ceph_osd_request *req,
-                       struct ceph_osd_req_op *op)
+static int calc_layout(struct ceph_osd_client *osdc,
+                      struct ceph_vino vino,
+                      struct ceph_file_layout *layout,
+                      u64 off, u64 *plen,
+                      struct ceph_osd_request *req,
+                      struct ceph_osd_req_op *op)
 {
        u64 bno;
+       int r;
 
-       ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                            plen, &bno, req, op);
+       r = ceph_calc_raw_layout(osdc, layout, off, plen, &bno, req, op);
+       if (r < 0)
+               return r;
 
        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
        req->r_oid_len = strlen(req->r_oid);
+
+       return r;
 }
 
 /*
@@ -140,10 +139,9 @@ void ceph_osdc_release_request(struct kref *kref)
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_con_filling_msg) {
-               dout("release_request revoking pages %p from con %p\n",
+               dout("%s revoking pages %p from con %p\n", __func__,
                     req->r_pages, req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg,
-                                     req->r_reply);
+               ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
        }
        if (req->r_reply)
@@ -156,10 +154,7 @@ void ceph_osdc_release_request(struct kref *kref)
                bio_put(req->r_bio);
 #endif
        ceph_put_snap_context(req->r_snapc);
-       if (req->r_trail) {
-               ceph_pagelist_release(req->r_trail);
-               kfree(req->r_trail);
-       }
+       ceph_pagelist_release(&req->r_trail);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
@@ -167,17 +162,12 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+static int get_num_ops(struct ceph_osd_req_op *ops)
 {
        int i = 0;
 
-       if (needs_trail)
-               *needs_trail = 0;
-       while (ops[i].op) {
-               if (needs_trail && op_needs_trail(ops[i].op))
-                       *needs_trail = 1;
+       while (ops[i].op)
                i++;
-       }
 
        return i;
 }
@@ -193,8 +183,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       int needs_trail;
-       int num_op = get_num_ops(ops, &needs_trail);
+       int num_op = get_num_ops(ops);
        size_t msg_size = sizeof(struct ceph_osd_request_head);
 
        msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -214,10 +203,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
+       RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
        INIT_LIST_HEAD(&req->r_req_lru_item);
+       INIT_LIST_HEAD(&req->r_osd_item);
+
        req->r_flags = flags;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -234,15 +226,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       /* allocate space for the trailing data */
-       if (needs_trail) {
-               req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-               if (!req->r_trail) {
-                       ceph_osdc_put_request(req);
-                       return NULL;
-               }
-               ceph_pagelist_init(req->r_trail);
-       }
+       ceph_pagelist_init(&req->r_trail);
+
        /* create request message; allow space for oid */
        msg_size += MAX_OBJ_NAME_SIZE;
        if (snapc)
@@ -256,7 +241,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                return NULL;
        }
 
-       msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
        memset(msg->front.iov_base, 0, msg->front.iov_len);
 
        req->r_request = msg;
@@ -294,29 +278,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
-               BUG_ON(!req->r_trail);
-
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
-               ceph_pagelist_append(req->r_trail, src->xattr.name,
+               ceph_pagelist_append(&req->r_trail, src->xattr.name,
                                     src->xattr.name_len);
-               ceph_pagelist_append(req->r_trail, src->xattr.val,
+               ceph_pagelist_append(&req->r_trail, src->xattr.val,
                                     src->xattr.value_len);
                break;
        case CEPH_OSD_OP_CALL:
-               BUG_ON(!req->r_trail);
-
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-               ceph_pagelist_append(req->r_trail, src->cls.class_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
-               ceph_pagelist_append(req->r_trail, src->cls.method_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
-               ceph_pagelist_append(req->r_trail, src->cls.indata,
+               ceph_pagelist_append(&req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
        case CEPH_OSD_OP_ROLLBACK:
@@ -329,11 +309,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                        __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
                        __le32 timeout = cpu_to_le32(src->watch.timeout);
 
-                       BUG_ON(!req->r_trail);
-
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &prot_ver, sizeof(prot_ver));
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &timeout, sizeof(timeout));
                }
        case CEPH_OSD_OP_NOTIFY_ACK:
@@ -355,25 +333,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 *plen,
+                            u64 off, u64 len,
                             struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc,
-                            struct timespec *mtime,
-                            const char *oid,
-                            int oid_len)
+                            struct ceph_snap_context *snapc, u64 snap_id,
+                            struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
        struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
        struct ceph_osd_op *op;
        void *p;
-       int num_op = get_num_ops(src_ops, NULL);
+       int num_op = get_num_ops(src_ops);
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
        int flags = req->r_flags;
        u64 data_len = 0;
        int i;
 
        head = msg->front.iov_base;
+       head->snapid = cpu_to_le64(snap_id);
        op = (void *)(head + 1);
        p = (void *)(op + num_op);
 
@@ -387,9 +364,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
 
        /* fill in oid */
-       head->object_len = cpu_to_le32(oid_len);
-       memcpy(p, oid, oid_len);
-       p += oid_len;
+       head->object_len = cpu_to_le32(req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
 
        src_op = src_ops;
        while (src_op->op) {
@@ -398,8 +375,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
                op++;
        }
 
-       if (req->r_trail)
-               data_len += req->r_trail->length;
+       data_len += req->r_trail.length;
 
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
@@ -412,7 +388,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
-               req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+               req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
        } else if (data_len) {
                req->r_request->hdr.data_off = 0;
                req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -454,6 +430,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_req_op ops[3];
        struct ceph_osd_request *req;
+       int r;
 
        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
@@ -472,10 +449,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                         use_mempool,
                                         GFP_NOFS, NULL, NULL);
        if (!req)
-               return NULL;
+               return ERR_PTR(-ENOMEM);
 
        /* calculate max write size */
-       calc_layout(osdc, vino, layout, off, plen, req, ops);
+       r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+       if (r < 0)
+               return ERR_PTR(r);
        req->r_file_layout = *layout;  /* keep a copy */
 
        /* in case it differs from natural (file) alignment that
@@ -483,10 +462,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;
 
-       ceph_osdc_build_request(req, off, plen, ops,
-                               snapc,
-                               mtime,
-                               req->r_oid, req->r_oid_len);
+       ceph_osdc_build_request(req, off, *plen, ops, snapc, vino.snap, mtime);
 
        return req;
 }
@@ -568,7 +544,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 
        dout("__kick_osd_requests osd%d\n", osd->o_osd);
        err = __reset_osd(osdc, osd);
-       if (err == -EAGAIN)
+       if (err)
                return;
 
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -595,14 +571,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
        }
 }
 
-static void kick_osd_requests(struct ceph_osd_client *osdc,
-                             struct ceph_osd *kickosd)
-{
-       mutex_lock(&osdc->request_mutex);
-       __kick_osd_requests(osdc, kickosd);
-       mutex_unlock(&osdc->request_mutex);
-}
-
 /*
  * If the osd connection drops, we need to resubmit all requests.
  */
@@ -616,7 +584,9 @@ static void osd_reset(struct ceph_connection *con)
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
-       kick_osd_requests(osdc, osd);
+       mutex_lock(&osdc->request_mutex);
+       __kick_osd_requests(osdc, osd);
+       mutex_unlock(&osdc->request_mutex);
        send_queued(osdc);
        up_read(&osdc->map_sem);
 }
@@ -624,7 +594,7 @@ static void osd_reset(struct ceph_connection *con)
 /*
  * Track open sessions with osds.
  */
-static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 {
        struct ceph_osd *osd;
 
@@ -634,15 +604,14 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
+       osd->o_osd = onum;
+       RB_CLEAR_NODE(&osd->o_node);
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
-       ceph_con_init(osdc->client->msgr, &osd->o_con);
-       osd->o_con.private = osd;
-       osd->o_con.ops = &osd_con_ops;
-       osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+       ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
 
        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
@@ -688,7 +657,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 
 static void remove_all_osds(struct ceph_osd_client *osdc)
 {
-       dout("__remove_old_osds %p\n", osdc);
+       dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
@@ -733,29 +702,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
-       int ret = 0;
+       struct ceph_entity_addr *peer_addr;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
-       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-                         &osd->o_con.peer_addr,
-                         sizeof(osd->o_con.peer_addr)) == 0 &&
-                  !ceph_con_opened(&osd->o_con)) {
+
+               return -ENODEV;
+       }
+
+       peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+       if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+                       !ceph_con_opened(&osd->o_con)) {
+               struct ceph_osd_request *req;
+
                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry");
                /* touch each r_stamp for handle_timeout()'s benfit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
-               ret = -EAGAIN;
-       } else {
-               ceph_con_close(&osd->o_con);
-               ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
-               osd->o_incarnation++;
+
+               return -EAGAIN;
        }
-       return ret;
+
+       ceph_con_close(&osd->o_con);
+       ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+       osd->o_incarnation++;
+
+       return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -853,7 +828,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
-               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               ceph_msg_revoke(req->r_request);
 
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests) &&
@@ -865,9 +840,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                        req->r_osd = NULL;
        }
 
+       list_del_init(&req->r_req_lru_item);
        ceph_osdc_put_request(req);
 
-       list_del_init(&req->r_req_lru_item);
        if (osdc->num_requests == 0) {
                dout(" no requests, canceling timeout\n");
                __cancel_osd_timeout(osdc);
@@ -880,7 +855,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 static void __cancel_request(struct ceph_osd_request *req)
 {
        if (req->r_sent && req->r_osd) {
-               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               ceph_msg_revoke(req->r_request);
                req->r_sent = 0;
        }
 }
@@ -890,15 +865,17 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
 {
        dout("__register_linger_request %p\n", req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
-       list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+       if (req->r_osd)
+               list_add_tail(&req->r_linger_osd,
+                             &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
 {
        dout("__unregister_linger_request %p\n", req);
+       list_del_init(&req->r_linger_item);
        if (req->r_osd) {
-               list_del_init(&req->r_linger_item);
                list_del_init(&req->r_linger_osd);
 
                if (list_empty(&req->r_osd->o_requests) &&
@@ -998,18 +975,18 @@ static int __map_request(struct ceph_osd_client *osdc,
        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
-               req->r_osd = create_osd(osdc);
+               req->r_osd = create_osd(osdc, o);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }
 
                dout("map_request osd %p is osd%d\n", req->r_osd, o);
-               req->r_osd->o_osd = o;
-               req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);
 
-               ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
+               ceph_con_open(&req->r_osd->o_con,
+                             CEPH_ENTITY_TYPE_OSD, o,
+                             &osdc->osdmap->osd_addr[o]);
        }
 
        if (req->r_osd) {
@@ -1077,12 +1054,10 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req, *last_req = NULL;
+       struct ceph_osd_request *req;
        struct ceph_osd *osd;
-       unsigned long timeout = osdc->client->options->osd_timeout * HZ;
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
-       unsigned long last_stamp = 0;
        struct list_head slow_osds;
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -1091,37 +1066,6 @@ static void handle_timeout(struct work_struct *work)
 
        mutex_lock(&osdc->request_mutex);
 
-       /*
-        * reset osds that appear to be _really_ unresponsive.  this
-        * is a failsafe measure.. we really shouldn't be getting to
-        * this point if the system is working properly.  the monitors
-        * should mark the osd as failed and we should find out about
-        * it from an updated osd map.
-        */
-       while (timeout && !list_empty(&osdc->req_lru)) {
-               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
-                                r_req_lru_item);
-
-               /* hasn't been long enough since we sent it? */
-               if (time_before(jiffies, req->r_stamp + timeout))
-                       break;
-
-               /* hasn't been long enough since it was acked? */
-               if (req->r_request->ack_stamp == 0 ||
-                   time_before(jiffies, req->r_request->ack_stamp + timeout))
-                       break;
-
-               BUG_ON(req == last_req && req->r_stamp == last_stamp);
-               last_req = req;
-               last_stamp = req->r_stamp;
-
-               osd = req->r_osd;
-               BUG_ON(!osd);
-               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
-                          req->r_tid, osd->o_osd);
-               __kick_osd_requests(osdc, osd);
-       }
-
        /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
@@ -1293,7 +1237,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  * Requeue requests whose mapping to an OSD has changed.  If requests map to
  * no osd, request a new map.
  *
- * Caller should hold map_sem for read and request_mutex.
+ * Caller should hold map_sem for read.
  */
 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 {
@@ -1304,8 +1248,27 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 
        dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
        mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+       for (p = rb_first(&osdc->requests); p; ) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
+               p = rb_next(p);
+
+               /*
+                * For linger requests that have not yet been
+                * registered, move them to the linger list; they'll
+                * be sent to the osd in the loop below.  Unregister
+                * the request before re-registering it as a linger
+                * request to ensure the __map_request() below
+                * will decide it needs to be sent.
+                */
+               if (req->r_linger && list_empty(&req->r_linger_item)) {
+                       dout("%p tid %llu restart on osd%d\n",
+                            req, req->r_tid,
+                            req->r_osd ? req->r_osd->o_osd : -1);
+                       __unregister_request(osdc, req);
+                       __register_linger_request(osdc, req);
+                       continue;
+               }
+
                err = __map_request(osdc, req, force_resend);
                if (err < 0)
                        continue;  /* error */
@@ -1313,10 +1276,12 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                        dout("%p tid %llu maps to no osd\n", req, req->r_tid);
                        needmap++;  /* request a newer map */
                } else if (err > 0) {
-                       dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
-                            req->r_osd ? req->r_osd->o_osd : -1);
-                       if (!req->r_linger)
+                       if (!req->r_linger) {
+                               dout("%p tid %llu requeued on osd%d\n", req,
+                                    req->r_tid,
+                                    req->r_osd ? req->r_osd->o_osd : -1);
                                req->r_flags |= CEPH_OSD_FLAG_RETRY;
+                       }
                }
        }
 
@@ -1325,6 +1290,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
                err = __map_request(osdc, req, force_resend);
+               dout("__map_request returned %d\n", err);
                if (err == 0)
                        continue;  /* no change and no osd was specified */
                if (err < 0)
@@ -1337,8 +1303,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 
                dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
                     req->r_osd ? req->r_osd->o_osd : -1);
-               __unregister_linger_request(osdc, req);
                __register_request(osdc, req);
+               __unregister_linger_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
 
@@ -1346,6 +1312,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
+       reset_changed_osds(osdc);
 }
 
 
@@ -1391,7 +1358,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                             epoch, maplen);
                        newmap = osdmap_apply_incremental(&p, next,
                                                          osdc->osdmap,
-                                                         osdc->client->msgr);
+                                                         &osdc->client->msgr);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
@@ -1402,7 +1369,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                osdc->osdmap = newmap;
                        }
                        kick_requests(osdc, 0);
-                       reset_changed_osds(osdc);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
@@ -1572,6 +1538,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
        event->data = data;
        event->osdc = osdc;
        INIT_LIST_HEAD(&event->osd_node);
+       RB_CLEAR_NODE(&event->node);
        kref_init(&event->kref);   /* one ref for us */
        kref_get(&event->kref);    /* one ref for the caller */
        init_completion(&event->completion);
@@ -1706,7 +1673,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
        req->r_request->bio = req->r_bio;
 #endif
-       req->r_request->trail = req->r_trail;
+       req->r_request->trail = &req->r_trail;
 
        register_request(osdc, req);
 
@@ -1839,11 +1806,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (!osdc->req_mempool)
                goto out;
 
-       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
+       err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
+                               OSD_OP_FRONT_LEN, 10, true,
                                "osd_op");
        if (err < 0)
                goto out_mempool;
-       err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+       err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
                                OSD_OPREPLY_FRONT_LEN, 10, true,
                                "osd_op_reply");
        if (err < 0)
@@ -1902,8 +1870,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
                                    false, 1, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
@@ -1945,8 +1913,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
                                    nofail, 1, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* it may be a short write due to an object boundary */
        req->r_pages = pages;
@@ -2019,15 +1987,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (!req) {
                *skip = 1;
                m = NULL;
-               pr_info("get_reply unknown tid %llu from osd%d\n", tid,
-                       osd->o_osd);
+               dout("get_reply unknown tid %llu from osd%d\n", tid,
+                    osd->o_osd);
                goto out;
        }
 
        if (req->r_con_filling_msg) {
-               dout("get_reply revoking msg %p from old con %p\n",
+               dout("%s revoking msg %p from old con %p\n", __func__,
                     req->r_reply, req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+               ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
                req->r_con_filling_msg = NULL;
        }
@@ -2080,6 +2048,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
 
+       *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
This page took 0.035649 seconds and 5 git commands to generate.