Merge tag 'fbdev-updates-for-3.6' of git://github.com/schandinat/linux-2.6
[deliverable/linux.git] / net / ceph / osd_client.c
index ca59e66c9787303805519f2bf325cc5d5817ff55..42119c05e82c023b777298e9f41b6a6ebbade0cb 100644 (file)
@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_con_filling_msg) {
-               dout("release_request revoking pages %p from con %p\n",
+               dout("%s revoking pages %p from con %p\n", __func__,
                     req->r_pages, req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg,
-                                     req->r_reply);
+               ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
        }
        if (req->r_reply)
@@ -214,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
+       rb_init_node(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
        INIT_LIST_HEAD(&req->r_req_lru_item);
+       INIT_LIST_HEAD(&req->r_osd_item);
+
        req->r_flags = flags;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -243,6 +245,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                }
                ceph_pagelist_init(req->r_trail);
        }
+
        /* create request message; allow space for oid */
        msg_size += MAX_OBJ_NAME_SIZE;
        if (snapc)
@@ -256,7 +259,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                return NULL;
        }
 
-       msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
        memset(msg->front.iov_base, 0, msg->front.iov_len);
 
        req->r_request = msg;
@@ -624,7 +626,7 @@ static void osd_reset(struct ceph_connection *con)
 /*
  * Track open sessions with osds.
  */
-static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 {
        struct ceph_osd *osd;
 
@@ -634,15 +636,13 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
+       osd->o_osd = onum;
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
-       ceph_con_init(osdc->client->msgr, &osd->o_con);
-       osd->o_con.private = osd;
-       osd->o_con.ops = &osd_con_ops;
-       osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+       ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
 
        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
@@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 
 static void remove_all_osds(struct ceph_osd_client *osdc)
 {
-       dout("__remove_old_osds %p\n", osdc);
+       dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
@@ -752,7 +752,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
                ret = -EAGAIN;
        } else {
                ceph_con_close(&osd->o_con);
-               ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
+               ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+                             &osdc->osdmap->osd_addr[osd->o_osd]);
                osd->o_incarnation++;
        }
        return ret;
@@ -853,7 +854,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
-               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               ceph_msg_revoke(req->r_request);
 
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests) &&
@@ -880,7 +881,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 static void __cancel_request(struct ceph_osd_request *req)
 {
        if (req->r_sent && req->r_osd) {
-               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               ceph_msg_revoke(req->r_request);
                req->r_sent = 0;
        }
 }
@@ -890,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
 {
        dout("__register_linger_request %p\n", req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
-       list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+       if (req->r_osd)
+               list_add_tail(&req->r_linger_osd,
+                             &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -998,18 +1001,18 @@ static int __map_request(struct ceph_osd_client *osdc,
        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
-               req->r_osd = create_osd(osdc);
+               req->r_osd = create_osd(osdc, o);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }
 
                dout("map_request osd %p is osd%d\n", req->r_osd, o);
-               req->r_osd->o_osd = o;
-               req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);
 
-               ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
+               ceph_con_open(&req->r_osd->o_con,
+                             CEPH_ENTITY_TYPE_OSD, o,
+                             &osdc->osdmap->osd_addr[o]);
        }
 
        if (req->r_osd) {
@@ -1304,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 
        dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
        mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+       for (p = rb_first(&osdc->requests); p; ) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
+               p = rb_next(p);
                err = __map_request(osdc, req, force_resend);
                if (err < 0)
                        continue;  /* error */
@@ -1313,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                        dout("%p tid %llu maps to no osd\n", req, req->r_tid);
                        needmap++;  /* request a newer map */
                } else if (err > 0) {
-                       dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
-                            req->r_osd ? req->r_osd->o_osd : -1);
-                       if (!req->r_linger)
+                       if (!req->r_linger) {
+                               dout("%p tid %llu requeued on osd%d\n", req,
+                                    req->r_tid,
+                                    req->r_osd ? req->r_osd->o_osd : -1);
                                req->r_flags |= CEPH_OSD_FLAG_RETRY;
+                       }
+               }
+               if (req->r_linger && list_empty(&req->r_linger_item)) {
+                       /*
+                        * register as a linger so that we will
+                        * re-submit below and get a new tid
+                        */
+                       dout("%p tid %llu restart on osd%d\n",
+                            req, req->r_tid,
+                            req->r_osd ? req->r_osd->o_osd : -1);
+                       __register_linger_request(osdc, req);
+                       __unregister_request(osdc, req);
                }
        }
 
@@ -1391,7 +1408,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                             epoch, maplen);
                        newmap = osdmap_apply_incremental(&p, next,
                                                          osdc->osdmap,
-                                                         osdc->client->msgr);
+                                                         &osdc->client->msgr);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
@@ -1839,11 +1856,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (!osdc->req_mempool)
                goto out;
 
-       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
+       err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
+                               OSD_OP_FRONT_LEN, 10, true,
                                "osd_op");
        if (err < 0)
                goto out_mempool;
-       err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+       err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
                                OSD_OPREPLY_FRONT_LEN, 10, true,
                                "osd_op_reply");
        if (err < 0)
@@ -2019,15 +2037,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (!req) {
                *skip = 1;
                m = NULL;
-               pr_info("get_reply unknown tid %llu from osd%d\n", tid,
-                       osd->o_osd);
+               dout("get_reply unknown tid %llu from osd%d\n", tid,
+                    osd->o_osd);
                goto out;
        }
 
        if (req->r_con_filling_msg) {
-               dout("get_reply revoking msg %p from old con %p\n",
+               dout("%s revoking msg %p from old con %p\n", __func__,
                     req->r_reply, req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+               ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
                req->r_con_filling_msg = NULL;
        }
@@ -2080,6 +2098,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
 
+       *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
This page took 0.029878 seconds and 5 git commands to generate.