rbd: update email address in Documentation
[deliverable/linux.git] / net / ceph / osd_client.c
CommitLineData
3d14c5d2 1#include <linux/ceph/ceph_debug.h>
f24e9980 2
3d14c5d2 3#include <linux/module.h>
f24e9980
SW
4#include <linux/err.h>
5#include <linux/highmem.h>
6#include <linux/mm.h>
7#include <linux/pagemap.h>
8#include <linux/slab.h>
9#include <linux/uaccess.h>
68b4476b
YS
10#ifdef CONFIG_BLOCK
11#include <linux/bio.h>
12#endif
f24e9980 13
3d14c5d2
YS
14#include <linux/ceph/libceph.h>
15#include <linux/ceph/osd_client.h>
16#include <linux/ceph/messenger.h>
17#include <linux/ceph/decode.h>
18#include <linux/ceph/auth.h>
19#include <linux/ceph/pagelist.h>
f24e9980 20
c16e7869
SW
21#define OSD_OP_FRONT_LEN 4096
22#define OSD_OPREPLY_FRONT_LEN 512
0d59ab81 23
9e32789f 24static const struct ceph_connection_operations osd_con_ops;
f24e9980 25
6f6c7006
SW
26static void send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
f24e9980 28
68b4476b
YS
29static int op_needs_trail(int op)
30{
31 switch (op) {
32 case CEPH_OSD_OP_GETXATTR:
33 case CEPH_OSD_OP_SETXATTR:
34 case CEPH_OSD_OP_CMPXATTR:
35 case CEPH_OSD_OP_CALL:
36 return 1;
37 default:
38 return 0;
39 }
40}
41
42static int op_has_extent(int op)
43{
44 return (op == CEPH_OSD_OP_READ ||
45 op == CEPH_OSD_OP_WRITE);
46}
47
/*
 * Map a raw file extent onto an object: compute the object number (*bno)
 * and the extent within that object, recording the result in @req and @op.
 *
 * On return *plen may have been shortened so the extent does not cross
 * an object boundary.
 */
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			  struct ceph_file_layout *layout,
			  u64 snapid,
			  u64 off, u64 *plen, u64 *bno,
			  struct ceph_osd_request *req,
			  struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;	/* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	/* read/write ops carry the object extent in the op itself */
	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	/* natural alignment of the data within the first page */
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);

}
EXPORT_SYMBOL(ceph_calc_raw_layout);
3499e8a5 82
f24e9980
SW
83/*
84 * Implement client access to distributed object storage cluster.
85 *
86 * All data objects are stored within a cluster/cloud of OSDs, or
87 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
88 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
89 * remote daemons serving up and coordinating consistent and safe
90 * access to storage.
91 *
92 * Cluster membership and the mapping of data objects onto storage devices
93 * are described by the osd map.
94 *
95 * We keep track of pending OSD requests (read, write), resubmit
96 * requests to different OSDs when the cluster topology/data layout
97 * change, or retry the affected requests when the communications
98 * channel with an OSD is reset.
99 */
100
/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	u64 bno;

	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
			     plen, &bno, req, op);

	/* object name is "<inode hex>.<object number, zero padded>" */
	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}
123
f24e9980
SW
/*
 * requests
 */

/*
 * Final put of an osd request: release everything the request may own
 * (request/reply messages, in-flight message-fill state, page vector,
 * bio, snap context, trailing pagelist) and free the request itself.
 *
 * Invoked via kref_put() when the last reference is dropped.
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	/* a connection may still be filling our reply msg; revoke it first */
	if (req->r_con_filling_msg) {
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
				      req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	/* only free the page vector if we took ownership of it */
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	/* free via whichever allocator produced the request */
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
68b4476b
YS
162
163static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
164{
165 int i = 0;
166
167 if (needs_trail)
168 *needs_trail = 0;
169 while (ops[i].op) {
170 if (needs_trail && op_needs_trail(ops[i].op))
171 *needs_trail = 1;
172 i++;
173 }
174
175 return i;
176}
177
3499e8a5
YS
178struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
179 int flags,
f24e9980 180 struct ceph_snap_context *snapc,
68b4476b 181 struct ceph_osd_req_op *ops,
3499e8a5
YS
182 bool use_mempool,
183 gfp_t gfp_flags,
68b4476b
YS
184 struct page **pages,
185 struct bio *bio)
f24e9980
SW
186{
187 struct ceph_osd_request *req;
188 struct ceph_msg *msg;
68b4476b
YS
189 int needs_trail;
190 int num_op = get_num_ops(ops, &needs_trail);
191 size_t msg_size = sizeof(struct ceph_osd_request_head);
3499e8a5 192
68b4476b 193 msg_size += num_op*sizeof(struct ceph_osd_op);
f24e9980
SW
194
195 if (use_mempool) {
3499e8a5 196 req = mempool_alloc(osdc->req_mempool, gfp_flags);
f24e9980
SW
197 memset(req, 0, sizeof(*req));
198 } else {
3499e8a5 199 req = kzalloc(sizeof(*req), gfp_flags);
f24e9980
SW
200 }
201 if (req == NULL)
a79832f2 202 return NULL;
f24e9980 203
f24e9980
SW
204 req->r_osdc = osdc;
205 req->r_mempool = use_mempool;
68b4476b 206
415e49a9 207 kref_init(&req->r_kref);
f24e9980
SW
208 init_completion(&req->r_completion);
209 init_completion(&req->r_safe_completion);
210 INIT_LIST_HEAD(&req->r_unsafe_item);
211 req->r_flags = flags;
212
213 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
214
c16e7869
SW
215 /* create reply message */
216 if (use_mempool)
217 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
218 else
219 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
3499e8a5 220 OSD_OPREPLY_FRONT_LEN, gfp_flags);
a79832f2 221 if (!msg) {
c16e7869 222 ceph_osdc_put_request(req);
a79832f2 223 return NULL;
c16e7869
SW
224 }
225 req->r_reply = msg;
226
68b4476b
YS
227 /* allocate space for the trailing data */
228 if (needs_trail) {
229 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
230 if (!req->r_trail) {
231 ceph_osdc_put_request(req);
232 return NULL;
233 }
234 ceph_pagelist_init(req->r_trail);
235 }
c16e7869 236 /* create request message; allow space for oid */
f24e9980
SW
237 msg_size += 40;
238 if (snapc)
239 msg_size += sizeof(u64) * snapc->num_snaps;
240 if (use_mempool)
8f3bc053 241 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
f24e9980 242 else
3499e8a5 243 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
a79832f2 244 if (!msg) {
f24e9980 245 ceph_osdc_put_request(req);
a79832f2 246 return NULL;
f24e9980 247 }
68b4476b 248
f24e9980
SW
249 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
250 memset(msg->front.iov_base, 0, msg->front.iov_len);
3499e8a5
YS
251
252 req->r_request = msg;
253 req->r_pages = pages;
68b4476b
YS
254#ifdef CONFIG_BLOCK
255 if (bio) {
256 req->r_bio = bio;
257 bio_get(req->r_bio);
258 }
259#endif
3499e8a5
YS
260
261 return req;
262}
3d14c5d2 263EXPORT_SYMBOL(ceph_osdc_alloc_request);
3499e8a5 264
68b4476b
YS
265static void osd_req_encode_op(struct ceph_osd_request *req,
266 struct ceph_osd_op *dst,
267 struct ceph_osd_req_op *src)
268{
269 dst->op = cpu_to_le16(src->op);
270
271 switch (dst->op) {
272 case CEPH_OSD_OP_READ:
273 case CEPH_OSD_OP_WRITE:
274 dst->extent.offset =
275 cpu_to_le64(src->extent.offset);
276 dst->extent.length =
277 cpu_to_le64(src->extent.length);
278 dst->extent.truncate_size =
279 cpu_to_le64(src->extent.truncate_size);
280 dst->extent.truncate_seq =
281 cpu_to_le32(src->extent.truncate_seq);
282 break;
283
284 case CEPH_OSD_OP_GETXATTR:
285 case CEPH_OSD_OP_SETXATTR:
286 case CEPH_OSD_OP_CMPXATTR:
287 BUG_ON(!req->r_trail);
288
289 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
290 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
291 dst->xattr.cmp_op = src->xattr.cmp_op;
292 dst->xattr.cmp_mode = src->xattr.cmp_mode;
293 ceph_pagelist_append(req->r_trail, src->xattr.name,
294 src->xattr.name_len);
295 ceph_pagelist_append(req->r_trail, src->xattr.val,
296 src->xattr.value_len);
297 break;
ae1533b6
YS
298 case CEPH_OSD_OP_CALL:
299 BUG_ON(!req->r_trail);
300
301 dst->cls.class_len = src->cls.class_len;
302 dst->cls.method_len = src->cls.method_len;
303 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
304
305 ceph_pagelist_append(req->r_trail, src->cls.class_name,
306 src->cls.class_len);
307 ceph_pagelist_append(req->r_trail, src->cls.method_name,
308 src->cls.method_len);
309 ceph_pagelist_append(req->r_trail, src->cls.indata,
310 src->cls.indata_len);
311 break;
312 case CEPH_OSD_OP_ROLLBACK:
313 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
314 break;
68b4476b
YS
315 case CEPH_OSD_OP_STARTSYNC:
316 break;
317 default:
318 pr_err("unrecognized osd opcode %d\n", dst->op);
319 WARN_ON(1);
320 break;
321 }
322 dst->payload_len = cpu_to_le32(src->payload_len);
323}
324
3499e8a5
YS
/*
 * build new request AND message
 *
 * Encodes the request head, oid, ops, and snap context into the
 * already-allocated request message front, then trims the message to
 * its final size.  @src_ops is the same zero-terminated op vector the
 * request was allocated with.
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	/* layout: head, then num_op encoded ops, then variable data at p */
	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);


	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	/* trailing pagelist (xattr/class data) counts toward data_len */
	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	/* trim the message front down to what was actually encoded */
	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);
3499e8a5
YS
402
403/*
404 * build new request AND message, calculate layout, and adjust file
405 * extent as needed.
406 *
407 * if the file was recently truncated, we include information about its
408 * old and new size so that the object can be updated appropriately. (we
409 * avoid synchronously deleting truncated objects because it's slow.)
410 *
411 * if @do_sync, include a 'startsync' command so that the osd will flush
412 * data quickly.
413 */
414struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
415 struct ceph_file_layout *layout,
416 struct ceph_vino vino,
417 u64 off, u64 *plen,
418 int opcode, int flags,
419 struct ceph_snap_context *snapc,
420 int do_sync,
421 u32 truncate_seq,
422 u64 truncate_size,
423 struct timespec *mtime,
b7495fc2
SW
424 bool use_mempool, int num_reply,
425 int page_align)
3499e8a5 426{
68b4476b
YS
427 struct ceph_osd_req_op ops[3];
428 struct ceph_osd_request *req;
429
430 ops[0].op = opcode;
431 ops[0].extent.truncate_seq = truncate_seq;
432 ops[0].extent.truncate_size = truncate_size;
433 ops[0].payload_len = 0;
434
435 if (do_sync) {
436 ops[1].op = CEPH_OSD_OP_STARTSYNC;
437 ops[1].payload_len = 0;
438 ops[2].op = 0;
439 } else
440 ops[1].op = 0;
441
442 req = ceph_osdc_alloc_request(osdc, flags,
443 snapc, ops,
3499e8a5 444 use_mempool,
68b4476b 445 GFP_NOFS, NULL, NULL);
3499e8a5
YS
446 if (IS_ERR(req))
447 return req;
448
449 /* calculate max write size */
68b4476b 450 calc_layout(osdc, vino, layout, off, plen, req, ops);
3499e8a5
YS
451 req->r_file_layout = *layout; /* keep a copy */
452
b7495fc2
SW
453 /* in case it differs from natural alignment that calc_layout
454 filled in for us */
455 req->r_page_alignment = page_align;
456
68b4476b
YS
457 ceph_osdc_build_request(req, off, plen, ops,
458 snapc,
3499e8a5
YS
459 mtime,
460 req->r_oid, req->r_oid_len);
461
f24e9980
SW
462 return req;
463}
3d14c5d2 464EXPORT_SYMBOL(ceph_osdc_new_request);
f24e9980
SW
465
466/*
467 * We keep osd requests in an rbtree, sorted by ->r_tid.
468 */
469static void __insert_request(struct ceph_osd_client *osdc,
470 struct ceph_osd_request *new)
471{
472 struct rb_node **p = &osdc->requests.rb_node;
473 struct rb_node *parent = NULL;
474 struct ceph_osd_request *req = NULL;
475
476 while (*p) {
477 parent = *p;
478 req = rb_entry(parent, struct ceph_osd_request, r_node);
479 if (new->r_tid < req->r_tid)
480 p = &(*p)->rb_left;
481 else if (new->r_tid > req->r_tid)
482 p = &(*p)->rb_right;
483 else
484 BUG();
485 }
486
487 rb_link_node(&new->r_node, parent, p);
488 rb_insert_color(&new->r_node, &osdc->requests);
489}
490
491static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
492 u64 tid)
493{
494 struct ceph_osd_request *req;
495 struct rb_node *n = osdc->requests.rb_node;
496
497 while (n) {
498 req = rb_entry(n, struct ceph_osd_request, r_node);
499 if (tid < req->r_tid)
500 n = n->rb_left;
501 else if (tid > req->r_tid)
502 n = n->rb_right;
503 else
504 return req;
505 }
506 return NULL;
507}
508
/*
 * Find the request with the smallest tid that is >= @tid, or NULL if
 * every registered request has a smaller tid.
 */
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			/* no smaller candidate to the left: this is it */
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}
530
6f6c7006
SW
/*
 * Resubmit requests pending on the given osd.
 *
 * Caller must hold request_mutex (all callers in this file take it).
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	/* -EAGAIN: address unchanged and msgr will retry; don't requeue */
	if (err == -EAGAIN)
		return;

	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}
}
552
/* Locked wrapper: requeue @kickosd's requests under request_mutex. */
static void kick_osd_requests(struct ceph_osd_client *osdc,
			      struct ceph_osd *kickosd)
{
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);
}
f24e9980
SW
560
/*
 * If the osd connection drops, we need to resubmit all requests.
 *
 * Messenger fault callback: requeue everything pending on this osd,
 * then push the unsent queue back out.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_osd_requests(osdc, osd);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}
578
/*
 * Track open sessions with osds.
 */

/*
 * Allocate and initialize a ceph_osd session.  The caller owns the
 * initial reference (o_ref starts at 1).  Returns NULL on allocation
 * failure.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	/* set up the connection to this osd */
	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}
604
605static struct ceph_osd *get_osd(struct ceph_osd *osd)
606{
607 if (atomic_inc_not_zero(&osd->o_ref)) {
608 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
609 atomic_read(&osd->o_ref));
610 return osd;
611 } else {
612 dout("get_osd %p FAIL\n", osd);
613 return NULL;
614 }
615}
616
/*
 * Drop a reference on @osd; on the last put, destroy its authorizer
 * (if any) and free it.
 */
static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (osd->o_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
		kfree(osd);
	}
}
629
/*
 * remove an osd from our map
 *
 * The osd must have no pending requests.  Drops it from the rbtree and
 * lru, closes its connection, and releases the tree's reference.
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}
642
f5a2041b
YS
/*
 * Put an idle osd (no pending requests) on the lru, stamping the time
 * at which it becomes eligible for teardown.
 */
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}
651
/* Take an osd off the idle lru (no-op if it isn't on it). */
static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}
658
659static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
660{
661 struct ceph_osd *osd, *nosd;
662
663 dout("__remove_old_osds %p\n", osdc);
664 mutex_lock(&osdc->request_mutex);
665 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
666 if (!remove_all && time_before(jiffies, osd->lru_ttl))
667 break;
668 __remove_osd(osdc, osd);
669 }
670 mutex_unlock(&osdc->request_mutex);
671}
672
f24e9980
SW
/*
 * reset osd connect
 *
 * Returns 0 if the osd was removed or its connection re-opened, or
 * -EAGAIN if the peer address is unchanged and the messenger never
 * actually opened the connection (so msgr's own retry suffices).
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		/* nothing pending: just drop the session entirely */
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benfit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		/* reconnect at the (possibly new) address from the osdmap */
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}
701
702static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
703{
704 struct rb_node **p = &osdc->osds.rb_node;
705 struct rb_node *parent = NULL;
706 struct ceph_osd *osd = NULL;
707
708 while (*p) {
709 parent = *p;
710 osd = rb_entry(parent, struct ceph_osd, o_node);
711 if (new->o_osd < osd->o_osd)
712 p = &(*p)->rb_left;
713 else if (new->o_osd > osd->o_osd)
714 p = &(*p)->rb_right;
715 else
716 BUG();
717 }
718
719 rb_link_node(&new->o_node, parent, p);
720 rb_insert_color(&new->o_node, &osdc->osds);
721}
722
723static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
724{
725 struct ceph_osd *osd;
726 struct rb_node *n = osdc->osds.rb_node;
727
728 while (n) {
729 osd = rb_entry(n, struct ceph_osd, o_node);
730 if (o < osd->o_osd)
731 n = n->rb_left;
732 else if (o > osd->o_osd)
733 n = n->rb_right;
734 else
735 return osd;
736 }
737 return NULL;
738}
739
422d2cb8
YS
/* Arm the request-timeout work to fire after the keepalive interval. */
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}
745
/* Disarm the request-timeout work (no-op if not pending). */
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}
f24e9980
SW
750
/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 *
 * Takes a reference on @req on behalf of the request tree.
 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	INIT_LIST_HEAD(&req->r_req_lru_item);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;

	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
	mutex_unlock(&osdc->request_mutex);
}
774
/*
 * called under osdc->request_mutex
 *
 * Remove @req from the request tree and its osd's list, dropping the
 * tree's reference.  Cancels the timeout work if this was the last
 * pending request.
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		/* osd became idle; make it a teardown candidate */
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}
803
/*
 * Cancel a previously queued request message
 *
 * Revokes the request message from the osd's connection (if it was
 * sent) and clears r_sent so it will be re-sent later.
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
}
814
/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropiate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	/* who is acting for this pg?  primary is acting[0] */
	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	/* same osd, same incarnation, same acting set -> nothing to do */
	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	/* detach from the old osd */
	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		/* no session yet for this osd; create one */
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		/* no up osd: request is homeless until the map changes */
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}
900
/*
 * caller should hold map_sem (for read) and request_mutex
 *
 * Stamp the request head with the current map epoch and flags, move it
 * to the in-flight lru, and hand the message to the messenger.
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	/* timestamp for handle_timeout()'s lru scan */
	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}
925
6f6c7006
SW
/*
 * Send any requests in the queue (req_unsent).
 *
 * Takes request_mutex; __send_request() moves each request off
 * req_unsent onto req_lru, hence the _safe iteration.
 */
static void send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("send_queued\n");
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
		__send_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}
940
f24e9980
SW
/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests has been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.. we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		/* req_lru is ordered by r_stamp: oldest is at the head */
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		/* guard against spinning on the same unmoved request */
		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_osd_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);
	/* push anything the kicks above requeued */
	send_queued(osdc);
	up_read(&osdc->map_sem);
}
1022
f5a2041b
YS
/*
 * Periodic work: tear down osd sessions that have been idle longer
 * than their ttl, then reschedule itself.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	/* poll at a quarter of the idle ttl */
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}
1039
f24e9980
SW
1040/*
1041 * handle osd op reply. either call the callback if it is specified,
1042 * or do the completion to wake up the waiting thread.
1043 */
350b1c32
SW
1044static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1045 struct ceph_connection *con)
f24e9980
SW
1046{
1047 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1048 struct ceph_osd_request *req;
1049 u64 tid;
1050 int numops, object_len, flags;
0ceed5db 1051 s32 result;
f24e9980 1052
6df058c0 1053 tid = le64_to_cpu(msg->hdr.tid);
f24e9980
SW
1054 if (msg->front.iov_len < sizeof(*rhead))
1055 goto bad;
f24e9980
SW
1056 numops = le32_to_cpu(rhead->num_ops);
1057 object_len = le32_to_cpu(rhead->object_len);
0ceed5db 1058 result = le32_to_cpu(rhead->result);
f24e9980
SW
1059 if (msg->front.iov_len != sizeof(*rhead) + object_len +
1060 numops * sizeof(struct ceph_osd_op))
1061 goto bad;
0ceed5db 1062 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
f24e9980
SW
1063
1064 /* lookup */
1065 mutex_lock(&osdc->request_mutex);
1066 req = __lookup_request(osdc, tid);
1067 if (req == NULL) {
1068 dout("handle_reply tid %llu dne\n", tid);
1069 mutex_unlock(&osdc->request_mutex);
1070 return;
1071 }
1072 ceph_osdc_get_request(req);
1073 flags = le32_to_cpu(rhead->flags);
1074
350b1c32 1075 /*
0d59ab81 1076 * if this connection filled our message, drop our reference now, to
350b1c32
SW
1077 * avoid a (safe but slower) revoke later.
1078 */
0d59ab81 1079 if (req->r_con_filling_msg == con && req->r_reply == msg) {
c16e7869 1080 dout(" dropping con_filling_msg ref %p\n", con);
0d59ab81 1081 req->r_con_filling_msg = NULL;
350b1c32
SW
1082 ceph_con_put(con);
1083 }
1084
f24e9980
SW
1085 if (!req->r_got_reply) {
1086 unsigned bytes;
1087
1088 req->r_result = le32_to_cpu(rhead->result);
1089 bytes = le32_to_cpu(msg->hdr.data_len);
1090 dout("handle_reply result %d bytes %d\n", req->r_result,
1091 bytes);
1092 if (req->r_result == 0)
1093 req->r_result = bytes;
1094
1095 /* in case this is a write and we need to replay, */
1096 req->r_reassert_version = rhead->reassert_version;
1097
1098 req->r_got_reply = 1;
1099 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1100 dout("handle_reply tid %llu dup ack\n", tid);
34b43a56 1101 mutex_unlock(&osdc->request_mutex);
f24e9980
SW
1102 goto done;
1103 }
1104
1105 dout("handle_reply tid %llu flags %d\n", tid, flags);
1106
1107 /* either this is a read, or we got the safe response */
0ceed5db
SW
1108 if (result < 0 ||
1109 (flags & CEPH_OSD_FLAG_ONDISK) ||
f24e9980
SW
1110 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1111 __unregister_request(osdc, req);
1112
1113 mutex_unlock(&osdc->request_mutex);
1114
1115 if (req->r_callback)
1116 req->r_callback(req, msg);
1117 else
03066f23 1118 complete_all(&req->r_completion);
f24e9980
SW
1119
1120 if (flags & CEPH_OSD_FLAG_ONDISK) {
1121 if (req->r_safe_callback)
1122 req->r_safe_callback(req, msg);
03066f23 1123 complete_all(&req->r_safe_completion); /* fsync waiter */
f24e9980
SW
1124 }
1125
1126done:
1127 ceph_osdc_put_request(req);
1128 return;
1129
1130bad:
1131 pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1132 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1133 (int)sizeof(*rhead));
9ec7cab1 1134 ceph_msg_dump(msg);
f24e9980
SW
1135}
1136
6f6c7006 1137static void reset_changed_osds(struct ceph_osd_client *osdc)
f24e9980 1138{
f24e9980 1139 struct rb_node *p, *n;
f24e9980 1140
6f6c7006
SW
1141 for (p = rb_first(&osdc->osds); p; p = n) {
1142 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
f24e9980 1143
6f6c7006
SW
1144 n = rb_next(p);
1145 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1146 memcmp(&osd->o_con.peer_addr,
1147 ceph_osd_addr(osdc->osdmap,
1148 osd->o_osd),
1149 sizeof(struct ceph_entity_addr)) != 0)
1150 __reset_osd(osdc, osd);
f24e9980 1151 }
422d2cb8
YS
1152}
1153
1154/*
6f6c7006
SW
1155 * Requeue requests whose mapping to an OSD has changed. If requests map to
1156 * no osd, request a new map.
422d2cb8
YS
1157 *
1158 * Caller should hold map_sem for read and request_mutex.
1159 */
6f6c7006 1160static void kick_requests(struct ceph_osd_client *osdc)
422d2cb8 1161{
6f6c7006
SW
1162 struct ceph_osd_request *req;
1163 struct rb_node *p;
1164 int needmap = 0;
1165 int err;
422d2cb8 1166
6f6c7006 1167 dout("kick_requests\n");
422d2cb8 1168 mutex_lock(&osdc->request_mutex);
6f6c7006
SW
1169 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1170 req = rb_entry(p, struct ceph_osd_request, r_node);
1171 err = __map_request(osdc, req);
1172 if (err < 0)
1173 continue; /* error */
1174 if (req->r_osd == NULL) {
1175 dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1176 needmap++; /* request a newer map */
1177 } else if (err > 0) {
1178 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1179 req->r_osd ? req->r_osd->o_osd : -1);
1180 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1181 }
1182 }
f24e9980
SW
1183 mutex_unlock(&osdc->request_mutex);
1184
1185 if (needmap) {
1186 dout("%d requests for down osds, need new map\n", needmap);
1187 ceph_monc_request_next_osdmap(&osdc->client->monc);
1188 }
422d2cb8 1189}
6f6c7006
SW
1190
1191
f24e9980
SW
1192/*
1193 * Process updated osd map.
1194 *
1195 * The message contains any number of incremental and full maps, normally
1196 * indicating some sort of topology change in the cluster. Kick requests
1197 * off to different OSDs as needed.
1198 */
1199void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1200{
1201 void *p, *end, *next;
1202 u32 nr_maps, maplen;
1203 u32 epoch;
1204 struct ceph_osdmap *newmap = NULL, *oldmap;
1205 int err;
1206 struct ceph_fsid fsid;
1207
1208 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1209 p = msg->front.iov_base;
1210 end = p + msg->front.iov_len;
1211
1212 /* verify fsid */
1213 ceph_decode_need(&p, end, sizeof(fsid), bad);
1214 ceph_decode_copy(&p, &fsid, sizeof(fsid));
0743304d
SW
1215 if (ceph_check_fsid(osdc->client, &fsid) < 0)
1216 return;
f24e9980
SW
1217
1218 down_write(&osdc->map_sem);
1219
1220 /* incremental maps */
1221 ceph_decode_32_safe(&p, end, nr_maps, bad);
1222 dout(" %d inc maps\n", nr_maps);
1223 while (nr_maps > 0) {
1224 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
1225 epoch = ceph_decode_32(&p);
1226 maplen = ceph_decode_32(&p);
f24e9980
SW
1227 ceph_decode_need(&p, end, maplen, bad);
1228 next = p + maplen;
1229 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1230 dout("applying incremental map %u len %d\n",
1231 epoch, maplen);
1232 newmap = osdmap_apply_incremental(&p, next,
1233 osdc->osdmap,
1234 osdc->client->msgr);
1235 if (IS_ERR(newmap)) {
1236 err = PTR_ERR(newmap);
1237 goto bad;
1238 }
30dc6381 1239 BUG_ON(!newmap);
f24e9980
SW
1240 if (newmap != osdc->osdmap) {
1241 ceph_osdmap_destroy(osdc->osdmap);
1242 osdc->osdmap = newmap;
1243 }
6f6c7006
SW
1244 kick_requests(osdc);
1245 reset_changed_osds(osdc);
f24e9980
SW
1246 } else {
1247 dout("ignoring incremental map %u len %d\n",
1248 epoch, maplen);
1249 }
1250 p = next;
1251 nr_maps--;
1252 }
1253 if (newmap)
1254 goto done;
1255
1256 /* full maps */
1257 ceph_decode_32_safe(&p, end, nr_maps, bad);
1258 dout(" %d full maps\n", nr_maps);
1259 while (nr_maps) {
1260 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
1261 epoch = ceph_decode_32(&p);
1262 maplen = ceph_decode_32(&p);
f24e9980
SW
1263 ceph_decode_need(&p, end, maplen, bad);
1264 if (nr_maps > 1) {
1265 dout("skipping non-latest full map %u len %d\n",
1266 epoch, maplen);
1267 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1268 dout("skipping full map %u len %d, "
1269 "older than our %u\n", epoch, maplen,
1270 osdc->osdmap->epoch);
1271 } else {
1272 dout("taking full map %u len %d\n", epoch, maplen);
1273 newmap = osdmap_decode(&p, p+maplen);
1274 if (IS_ERR(newmap)) {
1275 err = PTR_ERR(newmap);
1276 goto bad;
1277 }
30dc6381 1278 BUG_ON(!newmap);
f24e9980
SW
1279 oldmap = osdc->osdmap;
1280 osdc->osdmap = newmap;
1281 if (oldmap)
1282 ceph_osdmap_destroy(oldmap);
6f6c7006 1283 kick_requests(osdc);
f24e9980
SW
1284 }
1285 p += maplen;
1286 nr_maps--;
1287 }
1288
1289done:
1290 downgrade_write(&osdc->map_sem);
1291 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
6f6c7006 1292 send_queued(osdc);
f24e9980 1293 up_read(&osdc->map_sem);
03066f23 1294 wake_up_all(&osdc->client->auth_wq);
f24e9980
SW
1295 return;
1296
1297bad:
1298 pr_err("osdc handle_map corrupt msg\n");
9ec7cab1 1299 ceph_msg_dump(msg);
f24e9980
SW
1300 up_write(&osdc->map_sem);
1301 return;
1302}
1303
f24e9980
SW
1304/*
1305 * Register request, send initial attempt.
1306 */
1307int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1308 struct ceph_osd_request *req,
1309 bool nofail)
1310{
c1ea8823 1311 int rc = 0;
f24e9980
SW
1312
1313 req->r_request->pages = req->r_pages;
1314 req->r_request->nr_pages = req->r_num_pages;
68b4476b
YS
1315#ifdef CONFIG_BLOCK
1316 req->r_request->bio = req->r_bio;
1317#endif
1318 req->r_request->trail = req->r_trail;
f24e9980
SW
1319
1320 register_request(osdc, req);
1321
1322 down_read(&osdc->map_sem);
1323 mutex_lock(&osdc->request_mutex);
c1ea8823
SW
1324 /*
1325 * a racing kick_requests() may have sent the message for us
1326 * while we dropped request_mutex above, so only send now if
1327 * the request still han't been touched yet.
1328 */
1329 if (req->r_sent == 0) {
6f6c7006
SW
1330 rc = __map_request(osdc, req);
1331 if (rc < 0)
1332 return rc;
1333 if (req->r_osd == NULL) {
1334 dout("send_request %p no up osds in pg\n", req);
1335 ceph_monc_request_next_osdmap(&osdc->client->monc);
1336 } else {
1337 rc = __send_request(osdc, req);
1338 if (rc) {
1339 if (nofail) {
1340 dout("osdc_start_request failed send, "
1341 " will retry %lld\n", req->r_tid);
1342 rc = 0;
1343 } else {
1344 __unregister_request(osdc, req);
1345 }
c1ea8823 1346 }
f24e9980
SW
1347 }
1348 }
1349 mutex_unlock(&osdc->request_mutex);
1350 up_read(&osdc->map_sem);
1351 return rc;
1352}
3d14c5d2 1353EXPORT_SYMBOL(ceph_osdc_start_request);
f24e9980
SW
1354
1355/*
1356 * wait for a request to complete
1357 */
1358int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1359 struct ceph_osd_request *req)
1360{
1361 int rc;
1362
1363 rc = wait_for_completion_interruptible(&req->r_completion);
1364 if (rc < 0) {
1365 mutex_lock(&osdc->request_mutex);
1366 __cancel_request(req);
529cfcc4 1367 __unregister_request(osdc, req);
f24e9980 1368 mutex_unlock(&osdc->request_mutex);
529cfcc4 1369 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
f24e9980
SW
1370 return rc;
1371 }
1372
1373 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1374 return req->r_result;
1375}
3d14c5d2 1376EXPORT_SYMBOL(ceph_osdc_wait_request);
f24e9980
SW
1377
1378/*
1379 * sync - wait for all in-flight requests to flush. avoid starvation.
1380 */
1381void ceph_osdc_sync(struct ceph_osd_client *osdc)
1382{
1383 struct ceph_osd_request *req;
1384 u64 last_tid, next_tid = 0;
1385
1386 mutex_lock(&osdc->request_mutex);
1387 last_tid = osdc->last_tid;
1388 while (1) {
1389 req = __lookup_request_ge(osdc, next_tid);
1390 if (!req)
1391 break;
1392 if (req->r_tid > last_tid)
1393 break;
1394
1395 next_tid = req->r_tid + 1;
1396 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1397 continue;
1398
1399 ceph_osdc_get_request(req);
1400 mutex_unlock(&osdc->request_mutex);
1401 dout("sync waiting on tid %llu (last is %llu)\n",
1402 req->r_tid, last_tid);
1403 wait_for_completion(&req->r_safe_completion);
1404 mutex_lock(&osdc->request_mutex);
1405 ceph_osdc_put_request(req);
1406 }
1407 mutex_unlock(&osdc->request_mutex);
1408 dout("sync done (thru tid %llu)\n", last_tid);
1409}
3d14c5d2 1410EXPORT_SYMBOL(ceph_osdc_sync);
f24e9980
SW
1411
1412/*
1413 * init, shutdown
1414 */
1415int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1416{
1417 int err;
1418
1419 dout("init\n");
1420 osdc->client = client;
1421 osdc->osdmap = NULL;
1422 init_rwsem(&osdc->map_sem);
1423 init_completion(&osdc->map_waiters);
1424 osdc->last_requested_map = 0;
1425 mutex_init(&osdc->request_mutex);
f24e9980
SW
1426 osdc->last_tid = 0;
1427 osdc->osds = RB_ROOT;
f5a2041b 1428 INIT_LIST_HEAD(&osdc->osd_lru);
f24e9980 1429 osdc->requests = RB_ROOT;
422d2cb8 1430 INIT_LIST_HEAD(&osdc->req_lru);
6f6c7006
SW
1431 INIT_LIST_HEAD(&osdc->req_unsent);
1432 INIT_LIST_HEAD(&osdc->req_notarget);
f24e9980
SW
1433 osdc->num_requests = 0;
1434 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
f5a2041b
YS
1435 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1436
1437 schedule_delayed_work(&osdc->osds_timeout_work,
3d14c5d2 1438 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
f24e9980 1439
5f44f142 1440 err = -ENOMEM;
f24e9980
SW
1441 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1442 sizeof(struct ceph_osd_request));
1443 if (!osdc->req_mempool)
5f44f142 1444 goto out;
f24e9980 1445
4f48280e
SW
1446 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1447 "osd_op");
f24e9980 1448 if (err < 0)
5f44f142 1449 goto out_mempool;
c16e7869 1450 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
4f48280e
SW
1451 OSD_OPREPLY_FRONT_LEN, 10, true,
1452 "osd_op_reply");
c16e7869
SW
1453 if (err < 0)
1454 goto out_msgpool;
f24e9980 1455 return 0;
5f44f142 1456
c16e7869
SW
1457out_msgpool:
1458 ceph_msgpool_destroy(&osdc->msgpool_op);
5f44f142
SW
1459out_mempool:
1460 mempool_destroy(osdc->req_mempool);
1461out:
1462 return err;
f24e9980 1463}
3d14c5d2 1464EXPORT_SYMBOL(ceph_osdc_init);
f24e9980
SW
1465
1466void ceph_osdc_stop(struct ceph_osd_client *osdc)
1467{
1468 cancel_delayed_work_sync(&osdc->timeout_work);
f5a2041b 1469 cancel_delayed_work_sync(&osdc->osds_timeout_work);
f24e9980
SW
1470 if (osdc->osdmap) {
1471 ceph_osdmap_destroy(osdc->osdmap);
1472 osdc->osdmap = NULL;
1473 }
f5a2041b 1474 remove_old_osds(osdc, 1);
f24e9980
SW
1475 mempool_destroy(osdc->req_mempool);
1476 ceph_msgpool_destroy(&osdc->msgpool_op);
c16e7869 1477 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
f24e9980 1478}
3d14c5d2 1479EXPORT_SYMBOL(ceph_osdc_stop);
f24e9980
SW
1480
1481/*
1482 * Read some contiguous pages. If we cross a stripe boundary, shorten
1483 * *plen. Return number of bytes read, or error.
1484 */
1485int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1486 struct ceph_vino vino, struct ceph_file_layout *layout,
1487 u64 off, u64 *plen,
1488 u32 truncate_seq, u64 truncate_size,
b7495fc2 1489 struct page **pages, int num_pages, int page_align)
f24e9980
SW
1490{
1491 struct ceph_osd_request *req;
1492 int rc = 0;
1493
1494 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1495 vino.snap, off, *plen);
1496 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1497 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1498 NULL, 0, truncate_seq, truncate_size, NULL,
b7495fc2 1499 false, 1, page_align);
a79832f2
SW
1500 if (!req)
1501 return -ENOMEM;
f24e9980
SW
1502
1503 /* it may be a short read due to an object boundary */
1504 req->r_pages = pages;
f24e9980 1505
b7495fc2
SW
1506 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1507 off, *plen, req->r_num_pages, page_align);
f24e9980
SW
1508
1509 rc = ceph_osdc_start_request(osdc, req, false);
1510 if (!rc)
1511 rc = ceph_osdc_wait_request(osdc, req);
1512
1513 ceph_osdc_put_request(req);
1514 dout("readpages result %d\n", rc);
1515 return rc;
1516}
3d14c5d2 1517EXPORT_SYMBOL(ceph_osdc_readpages);
f24e9980
SW
1518
1519/*
1520 * do a synchronous write on N pages
1521 */
1522int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1523 struct ceph_file_layout *layout,
1524 struct ceph_snap_context *snapc,
1525 u64 off, u64 len,
1526 u32 truncate_seq, u64 truncate_size,
1527 struct timespec *mtime,
1528 struct page **pages, int num_pages,
1529 int flags, int do_sync, bool nofail)
1530{
1531 struct ceph_osd_request *req;
1532 int rc = 0;
b7495fc2 1533 int page_align = off & ~PAGE_MASK;
f24e9980
SW
1534
1535 BUG_ON(vino.snap != CEPH_NOSNAP);
1536 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1537 CEPH_OSD_OP_WRITE,
1538 flags | CEPH_OSD_FLAG_ONDISK |
1539 CEPH_OSD_FLAG_WRITE,
1540 snapc, do_sync,
1541 truncate_seq, truncate_size, mtime,
b7495fc2 1542 nofail, 1, page_align);
a79832f2
SW
1543 if (!req)
1544 return -ENOMEM;
f24e9980
SW
1545
1546 /* it may be a short write due to an object boundary */
1547 req->r_pages = pages;
f24e9980
SW
1548 dout("writepages %llu~%llu (%d pages)\n", off, len,
1549 req->r_num_pages);
1550
1551 rc = ceph_osdc_start_request(osdc, req, nofail);
1552 if (!rc)
1553 rc = ceph_osdc_wait_request(osdc, req);
1554
1555 ceph_osdc_put_request(req);
1556 if (rc == 0)
1557 rc = len;
1558 dout("writepages result %d\n", rc);
1559 return rc;
1560}
3d14c5d2 1561EXPORT_SYMBOL(ceph_osdc_writepages);
f24e9980
SW
1562
1563/*
1564 * handle incoming message
1565 */
1566static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1567{
1568 struct ceph_osd *osd = con->private;
32c895e7 1569 struct ceph_osd_client *osdc;
f24e9980
SW
1570 int type = le16_to_cpu(msg->hdr.type);
1571
1572 if (!osd)
4a32f93d 1573 goto out;
32c895e7 1574 osdc = osd->o_osdc;
f24e9980
SW
1575
1576 switch (type) {
1577 case CEPH_MSG_OSD_MAP:
1578 ceph_osdc_handle_map(osdc, msg);
1579 break;
1580 case CEPH_MSG_OSD_OPREPLY:
350b1c32 1581 handle_reply(osdc, msg, con);
f24e9980
SW
1582 break;
1583
1584 default:
1585 pr_err("received unknown message type %d %s\n", type,
1586 ceph_msg_type_name(type));
1587 }
4a32f93d 1588out:
f24e9980
SW
1589 ceph_msg_put(msg);
1590}
1591
5b3a4db3 1592/*
21b667f6
SW
1593 * lookup and return message for incoming reply. set up reply message
1594 * pages.
5b3a4db3
SW
1595 */
1596static struct ceph_msg *get_reply(struct ceph_connection *con,
2450418c
YS
1597 struct ceph_msg_header *hdr,
1598 int *skip)
f24e9980
SW
1599{
1600 struct ceph_osd *osd = con->private;
1601 struct ceph_osd_client *osdc = osd->o_osdc;
2450418c 1602 struct ceph_msg *m;
0547a9b3 1603 struct ceph_osd_request *req;
5b3a4db3
SW
1604 int front = le32_to_cpu(hdr->front_len);
1605 int data_len = le32_to_cpu(hdr->data_len);
0547a9b3 1606 u64 tid;
f24e9980 1607
0547a9b3
YS
1608 tid = le64_to_cpu(hdr->tid);
1609 mutex_lock(&osdc->request_mutex);
1610 req = __lookup_request(osdc, tid);
1611 if (!req) {
1612 *skip = 1;
1613 m = NULL;
c16e7869 1614 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
5b3a4db3 1615 osd->o_osd);
0547a9b3
YS
1616 goto out;
1617 }
c16e7869
SW
1618
1619 if (req->r_con_filling_msg) {
1620 dout("get_reply revoking msg %p from old con %p\n",
1621 req->r_reply, req->r_con_filling_msg);
1622 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1623 ceph_con_put(req->r_con_filling_msg);
6f46cb29 1624 req->r_con_filling_msg = NULL;
0547a9b3
YS
1625 }
1626
c16e7869
SW
1627 if (front > req->r_reply->front.iov_len) {
1628 pr_warning("get_reply front %d > preallocated %d\n",
1629 front, (int)req->r_reply->front.iov_len);
34d23762 1630 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
a79832f2 1631 if (!m)
c16e7869
SW
1632 goto out;
1633 ceph_msg_put(req->r_reply);
1634 req->r_reply = m;
1635 }
1636 m = ceph_msg_get(req->r_reply);
1637
0547a9b3 1638 if (data_len > 0) {
b7495fc2 1639 int want = calc_pages_for(req->r_page_alignment, data_len);
21b667f6
SW
1640
1641 if (unlikely(req->r_num_pages < want)) {
1642 pr_warning("tid %lld reply %d > expected %d pages\n",
1643 tid, want, m->nr_pages);
0547a9b3
YS
1644 *skip = 1;
1645 ceph_msg_put(m);
a79832f2 1646 m = NULL;
21b667f6 1647 goto out;
0547a9b3 1648 }
21b667f6
SW
1649 m->pages = req->r_pages;
1650 m->nr_pages = req->r_num_pages;
c5c6b19d 1651 m->page_alignment = req->r_page_alignment;
68b4476b
YS
1652#ifdef CONFIG_BLOCK
1653 m->bio = req->r_bio;
1654#endif
0547a9b3 1655 }
5b3a4db3 1656 *skip = 0;
c16e7869
SW
1657 req->r_con_filling_msg = ceph_con_get(con);
1658 dout("get_reply tid %lld %p\n", tid, m);
0547a9b3
YS
1659
1660out:
1661 mutex_unlock(&osdc->request_mutex);
2450418c 1662 return m;
5b3a4db3
SW
1663
1664}
1665
1666static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1667 struct ceph_msg_header *hdr,
1668 int *skip)
1669{
1670 struct ceph_osd *osd = con->private;
1671 int type = le16_to_cpu(hdr->type);
1672 int front = le32_to_cpu(hdr->front_len);
1673
1674 switch (type) {
1675 case CEPH_MSG_OSD_MAP:
34d23762 1676 return ceph_msg_new(type, front, GFP_NOFS);
5b3a4db3
SW
1677 case CEPH_MSG_OSD_OPREPLY:
1678 return get_reply(con, hdr, skip);
1679 default:
1680 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1681 osd->o_osd);
1682 *skip = 1;
1683 return NULL;
1684 }
f24e9980
SW
1685}
1686
1687/*
1688 * Wrappers to refcount containing ceph_osd struct
1689 */
1690static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1691{
1692 struct ceph_osd *osd = con->private;
1693 if (get_osd(osd))
1694 return con;
1695 return NULL;
1696}
1697
1698static void put_osd_con(struct ceph_connection *con)
1699{
1700 struct ceph_osd *osd = con->private;
1701 put_osd(osd);
1702}
1703
4e7a5dcd
SW
1704/*
1705 * authentication
1706 */
1707static int get_authorizer(struct ceph_connection *con,
213c99ee
SW
1708 void **buf, int *len, int *proto,
1709 void **reply_buf, int *reply_len, int force_new)
4e7a5dcd
SW
1710{
1711 struct ceph_osd *o = con->private;
1712 struct ceph_osd_client *osdc = o->o_osdc;
1713 struct ceph_auth_client *ac = osdc->client->monc.auth;
1714 int ret = 0;
1715
1716 if (force_new && o->o_authorizer) {
1717 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1718 o->o_authorizer = NULL;
1719 }
1720 if (o->o_authorizer == NULL) {
1721 ret = ac->ops->create_authorizer(
1722 ac, CEPH_ENTITY_TYPE_OSD,
1723 &o->o_authorizer,
1724 &o->o_authorizer_buf,
1725 &o->o_authorizer_buf_len,
1726 &o->o_authorizer_reply_buf,
1727 &o->o_authorizer_reply_buf_len);
1728 if (ret)
213c99ee 1729 return ret;
4e7a5dcd
SW
1730 }
1731
1732 *proto = ac->protocol;
1733 *buf = o->o_authorizer_buf;
1734 *len = o->o_authorizer_buf_len;
1735 *reply_buf = o->o_authorizer_reply_buf;
1736 *reply_len = o->o_authorizer_reply_buf_len;
1737 return 0;
1738}
1739
1740
1741static int verify_authorizer_reply(struct ceph_connection *con, int len)
1742{
1743 struct ceph_osd *o = con->private;
1744 struct ceph_osd_client *osdc = o->o_osdc;
1745 struct ceph_auth_client *ac = osdc->client->monc.auth;
1746
1747 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1748}
1749
9bd2e6f8
SW
1750static int invalidate_authorizer(struct ceph_connection *con)
1751{
1752 struct ceph_osd *o = con->private;
1753 struct ceph_osd_client *osdc = o->o_osdc;
1754 struct ceph_auth_client *ac = osdc->client->monc.auth;
1755
1756 if (ac->ops->invalidate_authorizer)
1757 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1758
1759 return ceph_monc_validate_auth(&osdc->client->monc);
1760}
4e7a5dcd 1761
9e32789f 1762static const struct ceph_connection_operations osd_con_ops = {
f24e9980
SW
1763 .get = get_osd_con,
1764 .put = put_osd_con,
1765 .dispatch = dispatch,
4e7a5dcd
SW
1766 .get_authorizer = get_authorizer,
1767 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 1768 .invalidate_authorizer = invalidate_authorizer,
f24e9980 1769 .alloc_msg = alloc_msg,
81b024e7 1770 .fault = osd_reset,
f24e9980 1771};
This page took 0.171977 seconds and 5 git commands to generate.