libceph: switch to calc_target(), part 1
[deliverable/linux.git] / net / ceph / osd_client.c
index d66dacc9d0d4046f8580eb7d5d74e837abc4ad01..013101598c4109ef1bed5daccbf738f832d9b8f8 100644 (file)
@@ -19,7 +19,6 @@
 #include <linux/ceph/auth.h>
 #include <linux/ceph/pagelist.h>
 
-#define OSD_OP_FRONT_LEN       4096
 #define OSD_OPREPLY_FRONT_LEN  512
 
 static struct kmem_cache       *ceph_osd_request_cache;
@@ -144,14 +143,6 @@ osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data);
 
-struct ceph_osd_data *
-osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
-                       unsigned int which)
-{
-       return osd_req_op_data(osd_req, which, cls, response_data);
-}
-EXPORT_SYMBOL(osd_req_op_cls_response_data);   /* ??? */
-
 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages,
                        u64 length, u32 alignment,
@@ -307,6 +298,30 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        }
 }
 
+/*
+ * Assumes @t is zero-initialized.
+ */
+static void target_init(struct ceph_osd_request_target *t)
+{
+       ceph_oid_init(&t->base_oid);
+       ceph_oloc_init(&t->base_oloc);
+       ceph_oid_init(&t->target_oid);
+       ceph_oloc_init(&t->target_oloc);
+
+       ceph_osds_init(&t->acting);
+       ceph_osds_init(&t->up);
+       t->size = -1;
+       t->min_size = -1;
+
+       t->osd = CEPH_HOMELESS_OSD;
+}
+
+static void target_destroy(struct ceph_osd_request_target *t)
+{
+       ceph_oid_destroy(&t->base_oid);
+       ceph_oid_destroy(&t->target_oid);
+}
+
 /*
  * requests
  */
@@ -335,7 +350,9 @@ static void ceph_osdc_release_request(struct kref *kref)
        for (which = 0; which < req->r_num_ops; which++)
                osd_req_op_data_release(req, which);
 
+       target_destroy(&req->r_t);
        ceph_put_snap_context(req->r_snapc);
+
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
@@ -402,8 +419,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
-       req->r_base_oloc.pool = -1;
-       req->r_target_oloc.pool = -1;
+       target_init(&req->r_t);
 
        dout("%s req %p\n", __func__, req);
        return req;
@@ -416,6 +432,8 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        struct ceph_msg *msg;
        int msg_size;
 
+       WARN_ON(ceph_oid_empty(&req->r_base_oid));
+
        /* create request message */
        msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
        msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
@@ -440,11 +458,8 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 
        /* create reply message */
        msg_size = OSD_OPREPLY_FRONT_LEN;
-       if (req->r_num_ops > CEPH_OSD_SLAB_OPS) {
-               /* ceph_osd_op and rval */
-               msg_size += (req->r_num_ops - CEPH_OSD_SLAB_OPS) *
-                           (sizeof(struct ceph_osd_op) + 4);
-       }
+       msg_size += req->r_base_oid.name_len;
+       msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
 
        if (req->r_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -863,10 +878,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        }
 
        req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
-
-       snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
-                "%llx.%08llx", vino.ino, objnum);
-       req->r_base_oid.name_len = strlen(req->r_base_oid.name);
+       ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
        r = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (r)
@@ -883,45 +895,7 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
 /*
  * We keep osd requests in an rbtree, sorted by ->r_tid.
  */
-static void __insert_request(struct ceph_osd_client *osdc,
-                            struct ceph_osd_request *new)
-{
-       struct rb_node **p = &osdc->requests.rb_node;
-       struct rb_node *parent = NULL;
-       struct ceph_osd_request *req = NULL;
-
-       while (*p) {
-               parent = *p;
-               req = rb_entry(parent, struct ceph_osd_request, r_node);
-               if (new->r_tid < req->r_tid)
-                       p = &(*p)->rb_left;
-               else if (new->r_tid > req->r_tid)
-                       p = &(*p)->rb_right;
-               else
-                       BUG();
-       }
-
-       rb_link_node(&new->r_node, parent, p);
-       rb_insert_color(&new->r_node, &osdc->requests);
-}
-
-static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
-                                                u64 tid)
-{
-       struct ceph_osd_request *req;
-       struct rb_node *n = osdc->requests.rb_node;
-
-       while (n) {
-               req = rb_entry(n, struct ceph_osd_request, r_node);
-               if (tid < req->r_tid)
-                       n = n->rb_left;
-               else if (tid > req->r_tid)
-                       n = n->rb_right;
-               else
-                       return req;
-       }
-       return NULL;
-}
+DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
 
 static struct ceph_osd_request *
 __lookup_request_ge(struct ceph_osd_client *osdc,
@@ -1109,6 +1083,8 @@ static void put_osd(struct ceph_osd *osd)
        }
 }
 
+DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
+
 /*
  * remove an osd from our map
  */
@@ -1119,8 +1095,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        WARN_ON(!list_empty(&osd->o_linger_requests));
 
        list_del_init(&osd->o_osd_lru);
-       rb_erase(&osd->o_node, &osdc->osds);
-       RB_CLEAR_NODE(&osd->o_node);
+       erase_osd(&osdc->osds, osd);
 }
 
 static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
@@ -1134,18 +1109,6 @@ static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        }
 }
 
-static void remove_all_osds(struct ceph_osd_client *osdc)
-{
-       dout("%s %p\n", __func__, osdc);
-       mutex_lock(&osdc->request_mutex);
-       while (!RB_EMPTY_ROOT(&osdc->osds)) {
-               struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
-                                               struct ceph_osd, o_node);
-               remove_osd(osdc, osd);
-       }
-       mutex_unlock(&osdc->request_mutex);
-}
-
 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
                              struct ceph_osd *osd)
 {
@@ -1173,20 +1136,6 @@ static void __remove_osd_from_lru(struct ceph_osd *osd)
                list_del_init(&osd->o_osd_lru);
 }
 
-static void remove_old_osds(struct ceph_osd_client *osdc)
-{
-       struct ceph_osd *osd, *nosd;
-
-       dout("__remove_old_osds %p\n", osdc);
-       mutex_lock(&osdc->request_mutex);
-       list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
-               if (time_before(jiffies, osd->lru_ttl))
-                       break;
-               remove_osd(osdc, osd);
-       }
-       mutex_unlock(&osdc->request_mutex);
-}
-
 /*
  * reset osd connect
  */
@@ -1222,45 +1171,6 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        return 0;
 }
 
-static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
-{
-       struct rb_node **p = &osdc->osds.rb_node;
-       struct rb_node *parent = NULL;
-       struct ceph_osd *osd = NULL;
-
-       dout("__insert_osd %p osd%d\n", new, new->o_osd);
-       while (*p) {
-               parent = *p;
-               osd = rb_entry(parent, struct ceph_osd, o_node);
-               if (new->o_osd < osd->o_osd)
-                       p = &(*p)->rb_left;
-               else if (new->o_osd > osd->o_osd)
-                       p = &(*p)->rb_right;
-               else
-                       BUG();
-       }
-
-       rb_link_node(&new->o_node, parent, p);
-       rb_insert_color(&new->o_node, &osdc->osds);
-}
-
-static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
-{
-       struct ceph_osd *osd;
-       struct rb_node *n = osdc->osds.rb_node;
-
-       while (n) {
-               osd = rb_entry(n, struct ceph_osd, o_node);
-               if (o < osd->o_osd)
-                       n = n->rb_left;
-               else if (o > osd->o_osd)
-                       n = n->rb_right;
-               else
-                       return osd;
-       }
-       return NULL;
-}
-
 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
 {
        schedule_delayed_work(&osdc->timeout_work,
@@ -1282,7 +1192,7 @@ static void __register_request(struct ceph_osd_client *osdc,
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
-       __insert_request(osdc, req);
+       insert_request(&osdc->requests, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
        if (osdc->num_requests == 1) {
@@ -1304,8 +1214,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
        }
 
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
-       rb_erase(&req->r_node, &osdc->requests);
-       RB_CLEAR_NODE(&req->r_node);
+       erase_request(&osdc->requests, req);
        osdc->num_requests--;
 
        if (req->r_osd) {
@@ -1384,59 +1293,139 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
 
+static bool __pool_full(struct ceph_pg_pool_info *pi)
+{
+       return pi->flags & CEPH_POOL_FLAG_FULL;
+}
+
 /*
  * Returns whether a request should be blocked from being sent
  * based on the current osdmap and osd_client settings.
  *
  * Caller should hold map_sem for read.
  */
-static bool __req_should_be_paused(struct ceph_osd_client *osdc,
-                                  struct ceph_osd_request *req)
+static bool target_should_be_paused(struct ceph_osd_client *osdc,
+                                   const struct ceph_osd_request_target *t,
+                                   struct ceph_pg_pool_info *pi)
 {
        bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
        bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
-               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
-       return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
-               (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
+                      ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+                      __pool_full(pi);
+
+       WARN_ON(pi->id != t->base_oloc.pool);
+       return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
+              (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
 }
 
-/*
- * Calculate mapping of a request to a PG.  Takes tiering into account.
- */
-static int __calc_request_pg(struct ceph_osdmap *osdmap,
-                            struct ceph_osd_request *req,
-                            struct ceph_pg *pg_out)
-{
-       bool need_check_tiering;
+enum calc_target_result {
+       CALC_TARGET_NO_ACTION = 0,
+       CALC_TARGET_NEED_RESEND,
+       CALC_TARGET_POOL_DNE,
+};
 
-       need_check_tiering = false;
-       if (req->r_target_oloc.pool == -1) {
-               req->r_target_oloc = req->r_base_oloc; /* struct */
+static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
+                                          struct ceph_osd_request_target *t,
+                                          u32 *last_force_resend,
+                                          bool any_change)
+{
+       struct ceph_pg_pool_info *pi;
+       struct ceph_pg pgid, last_pgid;
+       struct ceph_osds up, acting;
+       bool force_resend = false;
+       bool need_check_tiering = false;
+       bool need_resend = false;
+       bool sort_bitwise = ceph_osdmap_flag(osdc->osdmap,
+                                            CEPH_OSDMAP_SORTBITWISE);
+       enum calc_target_result ct_res;
+       int ret;
+
+       pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
+       if (!pi) {
+               t->osd = CEPH_HOMELESS_OSD;
+               ct_res = CALC_TARGET_POOL_DNE;
+               goto out;
+       }
+
+       if (osdc->osdmap->epoch == pi->last_force_request_resend) {
+               if (last_force_resend &&
+                   *last_force_resend < pi->last_force_request_resend) {
+                       *last_force_resend = pi->last_force_request_resend;
+                       force_resend = true;
+               } else if (!last_force_resend) {
+                       force_resend = true;
+               }
+       }
+       if (ceph_oid_empty(&t->target_oid) || force_resend) {
+               ceph_oid_copy(&t->target_oid, &t->base_oid);
                need_check_tiering = true;
        }
-       if (req->r_target_oid.name_len == 0) {
-               ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+       if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
+               ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
                need_check_tiering = true;
        }
 
        if (need_check_tiering &&
-           (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
-               struct ceph_pg_pool_info *pi;
-
-               pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
-               if (pi) {
-                       if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
-                           pi->read_tier >= 0)
-                               req->r_target_oloc.pool = pi->read_tier;
-                       if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
-                           pi->write_tier >= 0)
-                               req->r_target_oloc.pool = pi->write_tier;
-               }
-               /* !pi is caught in ceph_oloc_oid_to_pg() */
+           (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+               if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
+                       t->target_oloc.pool = pi->read_tier;
+               if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
+                       t->target_oloc.pool = pi->write_tier;
+       }
+
+       ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
+                                       &t->target_oloc, &pgid);
+       if (ret) {
+               WARN_ON(ret != -ENOENT);
+               t->osd = CEPH_HOMELESS_OSD;
+               ct_res = CALC_TARGET_POOL_DNE;
+               goto out;
        }
-
-       return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
-                                  &req->r_target_oid, pg_out);
+       last_pgid.pool = pgid.pool;
+       last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
+
+       ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
+       if (any_change &&
+           ceph_is_new_interval(&t->acting,
+                                &acting,
+                                &t->up,
+                                &up,
+                                t->size,
+                                pi->size,
+                                t->min_size,
+                                pi->min_size,
+                                t->pg_num,
+                                pi->pg_num,
+                                t->sort_bitwise,
+                                sort_bitwise,
+                                &last_pgid))
+               force_resend = true;
+
+       if (t->paused && !target_should_be_paused(osdc, t, pi)) {
+               t->paused = false;
+               need_resend = true;
+       }
+
+       if (ceph_pg_compare(&t->pgid, &pgid) ||
+           ceph_osds_changed(&t->acting, &acting, any_change) ||
+           force_resend) {
+               t->pgid = pgid; /* struct */
+               ceph_osds_copy(&t->acting, &acting);
+               ceph_osds_copy(&t->up, &up);
+               t->size = pi->size;
+               t->min_size = pi->min_size;
+               t->pg_num = pi->pg_num;
+               t->pg_num_mask = pi->pg_num_mask;
+               t->sort_bitwise = sort_bitwise;
+
+               t->osd = acting.primary;
+               need_resend = true;
+       }
+
+       ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION;
+out:
+       dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
+       return ct_res;
 }
 
 static void __enqueue_request(struct ceph_osd_request *req)
@@ -1468,47 +1457,26 @@ static void __enqueue_request(struct ceph_osd_request *req)
 static int __map_request(struct ceph_osd_client *osdc,
                         struct ceph_osd_request *req, int force_resend)
 {
-       struct ceph_pg pgid;
-       int acting[CEPH_PG_MAX_SIZE];
-       int num, o;
+       enum calc_target_result ct_res;
        int err;
-       bool was_paused;
 
        dout("map_request %p tid %lld\n", req, req->r_tid);
 
-       err = __calc_request_pg(osdc->osdmap, req, &pgid);
-       if (err) {
+       ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
+       switch (ct_res) {
+       case CALC_TARGET_POOL_DNE:
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
-               return err;
-       }
-       req->r_pgid = pgid;
-
-       num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
-       if (num < 0)
-               num = 0;
-
-       was_paused = req->r_paused;
-       req->r_paused = __req_should_be_paused(osdc, req);
-       if (was_paused && !req->r_paused)
-               force_resend = 1;
-
-       if ((!force_resend &&
-            req->r_osd && req->r_osd->o_osd == o &&
-            req->r_sent >= req->r_osd->o_incarnation &&
-            req->r_num_pg_osds == num &&
-            memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
-           (req->r_osd == NULL && o == -1) ||
-           req->r_paused)
+               return -EIO;
+       case CALC_TARGET_NO_ACTION:
                return 0;  /* no change */
+       default:
+               BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
+       }
 
        dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
-            req->r_tid, pgid.pool, pgid.seed, o,
+            req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
             req->r_osd ? req->r_osd->o_osd : -1);
 
-       /* record full pg acting set */
-       memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
-       req->r_num_pg_osds = num;
-
        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
@@ -1516,21 +1484,22 @@ static int __map_request(struct ceph_osd_client *osdc,
                req->r_osd = NULL;
        }
 
-       req->r_osd = __lookup_osd(osdc, o);
-       if (!req->r_osd && o >= 0) {
+       req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
+       if (!req->r_osd && req->r_t.osd >= 0) {
                err = -ENOMEM;
-               req->r_osd = create_osd(osdc, o);
+               req->r_osd = create_osd(osdc, req->r_t.osd);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }
 
-               dout("map_request osd %p is osd%d\n", req->r_osd, o);
-               __insert_osd(osdc, req->r_osd);
+               dout("map_request osd %p is osd%d\n", req->r_osd,
+                    req->r_osd->o_osd);
+               insert_osd(&osdc->osds, req->r_osd);
 
                ceph_con_open(&req->r_osd->o_con,
-                             CEPH_ENTITY_TYPE_OSD, o,
-                             &osdc->osdmap->osd_addr[o]);
+                             CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
+                             &osdc->osdmap->osd_addr[req->r_osd->o_osd]);
        }
 
        __enqueue_request(req);
@@ -1550,15 +1519,15 @@ static void __send_request(struct ceph_osd_client *osdc,
 
        dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags,
-            (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+            req->r_t.pgid.pool, req->r_t.pgid.seed);
 
        /* fill in message content that changes each time we send it */
        put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
        put_unaligned_le32(req->r_flags, req->r_request_flags);
-       put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
+       put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool);
        p = req->r_request_pgid;
-       ceph_encode_64(&p, req->r_pgid.pool);
-       ceph_encode_32(&p, req->r_pgid.seed);
+       ceph_encode_64(&p, req->r_t.pgid.pool);
+       ceph_encode_32(&p, req->r_t.pgid.seed);
        put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
        memcpy(req->r_request_reassert_version, &req->r_reassert_version,
               sizeof(req->r_reassert_version));
@@ -1679,12 +1648,21 @@ static void handle_osds_timeout(struct work_struct *work)
                container_of(work, struct ceph_osd_client,
                             osds_timeout_work.work);
        unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
+       struct ceph_osd *osd, *nosd;
 
-       dout("osds timeout\n");
+       dout("%s osdc %p\n", __func__, osdc);
        down_read(&osdc->map_sem);
-       remove_old_osds(osdc);
-       up_read(&osdc->map_sem);
+       mutex_lock(&osdc->request_mutex);
+
+       list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
+               if (time_before(jiffies, osd->lru_ttl))
+                       break;
 
+               remove_osd(osdc, osd);
+       }
+
+       mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
        schedule_delayed_work(&osdc->osds_timeout_work,
                              round_jiffies_relative(delay));
 }
@@ -1847,7 +1825,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        /* lookup */
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
-       req = __lookup_request(osdc, tid);
+       req = lookup_request(&osdc->requests, tid);
        if (req == NULL) {
                dout("handle_reply tid %llu dne\n", tid);
                goto bad_mutex;
@@ -1907,12 +1885,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                redir.oloc.pool = -1;
        }
 
-       if (redir.oloc.pool != -1) {
+       if (!ceph_oloc_empty(&redir.oloc)) {
                dout("redirect pool %lld\n", redir.oloc.pool);
 
                __unregister_request(osdc, req);
 
-               req->r_target_oloc = redir.oloc; /* struct */
+               ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc);
 
                /*
                 * Start redirect requests with nofail=true.  If
@@ -2166,8 +2144,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                        dout("applying incremental map %u len %d\n",
                             epoch, maplen);
                        newmap = osdmap_apply_incremental(&p, next,
-                                                         osdc->osdmap,
-                                                         &osdc->client->msgr);
+                                                         osdc->osdmap);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
@@ -2505,7 +2482,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
        /* oid */
        ceph_encode_32(&p, req->r_base_oid.name_len);
        memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
-       dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
+       dout("oid %*pE len %d\n", req->r_base_oid.name_len,
             req->r_base_oid.name, req->r_base_oid.name_len);
        p += req->r_base_oid.name_len;
 
@@ -2674,8 +2651,6 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        osdc->client = client;
        osdc->osdmap = NULL;
        init_rwsem(&osdc->map_sem);
-       init_completion(&osdc->map_waiters);
-       osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
@@ -2702,13 +2677,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                goto out;
 
        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
-                               OSD_OP_FRONT_LEN, 10, true,
-                               "osd_op");
+                               PAGE_SIZE, 10, true, "osd_op");
        if (err < 0)
                goto out_mempool;
        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
-                               OSD_OPREPLY_FRONT_LEN, 10, true,
-                               "osd_op_reply");
+                               PAGE_SIZE, 10, true, "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
 
@@ -2735,11 +2708,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        destroy_workqueue(osdc->notify_wq);
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
+
+       mutex_lock(&osdc->request_mutex);
+       while (!RB_EMPTY_ROOT(&osdc->osds)) {
+               struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
+                                               struct ceph_osd, o_node);
+               remove_osd(osdc, osd);
+       }
+       mutex_unlock(&osdc->request_mutex);
+
        if (osdc->osdmap) {
                ceph_osdmap_destroy(osdc->osdmap);
                osdc->osdmap = NULL;
        }
-       remove_all_osds(osdc);
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -2902,7 +2883,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 
        tid = le64_to_cpu(hdr->tid);
        mutex_lock(&osdc->request_mutex);
-       req = __lookup_request(osdc, tid);
+       req = lookup_request(&osdc->requests, tid);
        if (!req) {
                dout("%s osd%d tid %llu unknown, skipping\n", __func__,
                     osd->o_osd, tid);
This page took 0.045466 seconds and 5 git commands to generate.