libceph: drop snapid in ceph_calc_raw_layout()
[deliverable/linux.git] / net / ceph / osd_client.c
index 0174c04edac023ca3a0695a8d0ca189e13c0d5d3..b5a4b2875e8ac175f3a8981c1f2152370499df66 100644 (file)
@@ -32,20 +32,6 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-       switch (op) {
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_NOTIFY:
-               return 1;
-       default:
-               return 0;
-       }
-}
-
 static int op_has_extent(int op)
 {
        return (op == CEPH_OSD_OP_READ ||
@@ -54,30 +40,36 @@ static int op_has_extent(int op)
 
 int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                        struct ceph_file_layout *layout,
-                       u64 snapid,
                        u64 off, u64 *plen, u64 *bno,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op)
 {
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
        int r;
 
-       reqhead->snapid = cpu_to_le64(snapid);
-
        /* object extent? */
-       r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+       r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
                                          &objoff, &objlen);
        if (r < 0)
                return r;
-       if (*plen < orig_len)
+       if (objlen < orig_len) {
+               *plen = objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
+       }
 
        if (op_has_extent(op->op)) {
+               u32 osize = le32_to_cpu(layout->fl_object_size);
                op->extent.offset = objoff;
                op->extent.length = objlen;
+               if (op->extent.truncate_size <= off - objoff) {
+                       op->extent.truncate_size = 0;
+               } else {
+                       op->extent.truncate_size -= off - objoff;
+                       if (op->extent.truncate_size > osize)
+                               op->extent.truncate_size = osize;
+               }
        }
        req->r_num_pages = calc_pages_for(off, *plen);
        req->r_page_alignment = off & ~PAGE_MASK;
@@ -125,8 +117,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
        u64 bno;
        int r;
 
-       r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                                plen, &bno, req, op);
+       r = ceph_calc_raw_layout(osdc, layout, off, plen, &bno, req, op);
        if (r < 0)
                return r;
 
@@ -163,10 +154,7 @@ void ceph_osdc_release_request(struct kref *kref)
                bio_put(req->r_bio);
 #endif
        ceph_put_snap_context(req->r_snapc);
-       if (req->r_trail) {
-               ceph_pagelist_release(req->r_trail);
-               kfree(req->r_trail);
-       }
+       ceph_pagelist_release(&req->r_trail);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
@@ -174,17 +162,12 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+static int get_num_ops(struct ceph_osd_req_op *ops)
 {
        int i = 0;
 
-       if (needs_trail)
-               *needs_trail = 0;
-       while (ops[i].op) {
-               if (needs_trail && op_needs_trail(ops[i].op))
-                       *needs_trail = 1;
+       while (ops[i].op)
                i++;
-       }
 
        return i;
 }
@@ -200,8 +183,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       int needs_trail;
-       int num_op = get_num_ops(ops, &needs_trail);
+       int num_op = get_num_ops(ops);
        size_t msg_size = sizeof(struct ceph_osd_request_head);
 
        msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -244,15 +226,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       /* allocate space for the trailing data */
-       if (needs_trail) {
-               req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-               if (!req->r_trail) {
-                       ceph_osdc_put_request(req);
-                       return NULL;
-               }
-               ceph_pagelist_init(req->r_trail);
-       }
+       ceph_pagelist_init(&req->r_trail);
 
        /* create request message; allow space for oid */
        msg_size += MAX_OBJ_NAME_SIZE;
@@ -304,29 +278,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
-               BUG_ON(!req->r_trail);
-
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
-               ceph_pagelist_append(req->r_trail, src->xattr.name,
+               ceph_pagelist_append(&req->r_trail, src->xattr.name,
                                     src->xattr.name_len);
-               ceph_pagelist_append(req->r_trail, src->xattr.val,
+               ceph_pagelist_append(&req->r_trail, src->xattr.val,
                                     src->xattr.value_len);
                break;
        case CEPH_OSD_OP_CALL:
-               BUG_ON(!req->r_trail);
-
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-               ceph_pagelist_append(req->r_trail, src->cls.class_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
-               ceph_pagelist_append(req->r_trail, src->cls.method_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
-               ceph_pagelist_append(req->r_trail, src->cls.indata,
+               ceph_pagelist_append(&req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
        case CEPH_OSD_OP_ROLLBACK:
@@ -339,11 +309,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                        __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
                        __le32 timeout = cpu_to_le32(src->watch.timeout);
 
-                       BUG_ON(!req->r_trail);
-
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &prot_ver, sizeof(prot_ver));
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &timeout, sizeof(timeout));
                }
        case CEPH_OSD_OP_NOTIFY_ACK:
@@ -365,25 +333,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 *plen,
+                            u64 off, u64 len,
                             struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc,
-                            struct timespec *mtime,
-                            const char *oid,
-                            int oid_len)
+                            struct ceph_snap_context *snapc, u64 snap_id,
+                            struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
        struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
        struct ceph_osd_op *op;
        void *p;
-       int num_op = get_num_ops(src_ops, NULL);
+       int num_op = get_num_ops(src_ops);
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
        int flags = req->r_flags;
        u64 data_len = 0;
        int i;
 
        head = msg->front.iov_base;
+       head->snapid = cpu_to_le64(snap_id);
        op = (void *)(head + 1);
        p = (void *)(op + num_op);
 
@@ -397,9 +364,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
 
        /* fill in oid */
-       head->object_len = cpu_to_le32(oid_len);
-       memcpy(p, oid, oid_len);
-       p += oid_len;
+       head->object_len = cpu_to_le32(req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
 
        src_op = src_ops;
        while (src_op->op) {
@@ -408,8 +375,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
                op++;
        }
 
-       if (req->r_trail)
-               data_len += req->r_trail->length;
+       data_len += req->r_trail.length;
 
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
@@ -422,7 +388,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
-               req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+               req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
        } else if (data_len) {
                req->r_request->hdr.data_off = 0;
                req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -496,10 +462,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;
 
-       ceph_osdc_build_request(req, off, plen, ops,
-                               snapc,
-                               mtime,
-                               req->r_oid, req->r_oid_len);
+       ceph_osdc_build_request(req, off, *plen, ops, snapc, vino.snap, mtime);
 
        return req;
 }
@@ -739,31 +702,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
-       int ret = 0;
+       struct ceph_entity_addr *peer_addr;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
-               ret = -ENODEV;
-       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-                         &osd->o_con.peer_addr,
-                         sizeof(osd->o_con.peer_addr)) == 0 &&
-                  !ceph_con_opened(&osd->o_con)) {
+
+               return -ENODEV;
+       }
+
+       peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+       if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+                       !ceph_con_opened(&osd->o_con)) {
+               struct ceph_osd_request *req;
+
                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry");
                /* touch each r_stamp for handle_timeout()'s benfit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
-               ret = -EAGAIN;
-       } else {
-               ceph_con_close(&osd->o_con);
-               ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
-                             &osdc->osdmap->osd_addr[osd->o_osd]);
-               osd->o_incarnation++;
+
+               return -EAGAIN;
        }
-       return ret;
+
+       ceph_con_close(&osd->o_con);
+       ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+       osd->o_incarnation++;
+
+       return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1270,7 +1237,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  * Requeue requests whose mapping to an OSD has changed.  If requests map to
  * no osd, request a new map.
  *
- * Caller should hold map_sem for read and request_mutex.
+ * Caller should hold map_sem for read.
  */
 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 {
@@ -1345,6 +1312,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
+       reset_changed_osds(osdc);
 }
 
 
@@ -1401,7 +1369,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                osdc->osdmap = newmap;
                        }
                        kick_requests(osdc, 0);
-                       reset_changed_osds(osdc);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
@@ -1706,7 +1673,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
        req->r_request->bio = req->r_bio;
 #endif
-       req->r_request->trail = req->r_trail;
+       req->r_request->trail = &req->r_trail;
 
        register_request(osdc, req);
 
This page took 0.030306 seconds and 5 git commands to generate.