drivers/block/rbd.c
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
60
61 #define RBD_SNAP_HEAD_NAME "-"
62
63 /*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
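/*
 * Editor's note (worked example, not in the original source): the
 * 5/2 factor over-approximates log10(256) ~= 2.41 decimal digits
 * per byte, so with 4-byte ints this gives (5 * 4) / 2 + 1 = 11
 * characters -- enough for the 11 characters of "-2147483648".
 */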
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75 * block device image metadata (in-memory version)
76 */
77 struct rbd_image_header {
78 u64 image_size;
79 char *object_prefix;
80 __u8 obj_order;
81 __u8 crypt_type;
82 __u8 comp_type;
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
85 u64 snap_seq;
86 u32 total_snaps;
87
88 char *snap_names;
89 u64 *snap_sizes;
90
91 u64 obj_version;
92 };
93
94 struct rbd_options {
95 int notify_timeout;
96 };
97
98 /*
99 * an instance of the client. multiple devices may share an rbd client.
100 */
101 struct rbd_client {
102 struct ceph_client *client;
103 struct rbd_options *rbd_opts;
104 struct kref kref;
105 struct list_head node;
106 };
107
108 /*
109 * a request completion status
110 */
111 struct rbd_req_status {
112 int done;
113 int rc;
114 u64 bytes;
115 };
116
117 /*
118 * a collection of requests
119 */
120 struct rbd_req_coll {
121 int total;
122 int num_done;
123 struct kref kref;
124 struct rbd_req_status status[0];
125 };
126
127 /*
128 * a single io request
129 */
130 struct rbd_request {
131 struct request *rq; /* blk layer request */
132 struct bio *bio; /* cloned bio */
133 struct page **pages; /* list of used pages */
134 u64 len;
135 int coll_index;
136 struct rbd_req_coll *coll;
137 };
138
139 struct rbd_snap {
140 struct device dev;
141 const char *name;
142 u64 size;
143 struct list_head node;
144 u64 id;
145 };
146
147 /*
148 * a single device
149 */
150 struct rbd_device {
151 int id; /* blkdev unique id */
152
153 int major; /* blkdev assigned major */
154 struct gendisk *disk; /* blkdev's gendisk and rq */
155 struct request_queue *q;
156
157 struct rbd_client *rbd_client;
158
159 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161 spinlock_t lock; /* queue lock */
162
163 struct rbd_image_header header;
164 char *obj; /* rbd image name */
165 size_t obj_len;
166 char *obj_md_name; /* hdr nm. */
167 char *pool_name;
168 int pool_id;
169
170 struct ceph_osd_event *watch_event;
171 struct ceph_osd_request *watch_request;
172
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem;
175 char *snap_name;
176 u64 snap_id; /* current snapshot id */
177 int read_only;
178
179 struct list_head node;
180
181 /* list of snapshots */
182 struct list_head snaps;
183
184 /* sysfs related */
185 struct device dev;
186 };
187
188 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
189
190 static LIST_HEAD(rbd_dev_list); /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock);
192
193 static LIST_HEAD(rbd_client_list); /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock);
195
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199 struct device_attribute *attr,
200 const char *buf,
201 size_t count);
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203 struct rbd_snap *snap);
204
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
206 size_t count);
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
208 size_t count);
209
210 static struct bus_attribute rbd_bus_attrs[] = {
211 __ATTR(add, S_IWUSR, NULL, rbd_add),
212 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
213 __ATTR_NULL
214 };
215
216 static struct bus_type rbd_bus_type = {
217 .name = "rbd",
218 .bus_attrs = rbd_bus_attrs,
219 };
220
221 static void rbd_root_dev_release(struct device *dev)
222 {
223 }
224
225 static struct device rbd_root_dev = {
226 .init_name = "rbd",
227 .release = rbd_root_dev_release,
228 };
229
230
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
232 {
233 return get_device(&rbd_dev->dev);
234 }
235
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
237 {
238 put_device(&rbd_dev->dev);
239 }
240
241 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
242
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
244 {
245 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
246
247 rbd_get_dev(rbd_dev);
248
249 set_device_ro(bdev, rbd_dev->read_only);
250
251 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252 return -EROFS;
253
254 return 0;
255 }
256
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
258 {
259 struct rbd_device *rbd_dev = disk->private_data;
260
261 rbd_put_dev(rbd_dev);
262
263 return 0;
264 }
265
266 static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
268 .open = rbd_open,
269 .release = rbd_release,
270 };
271
272 /*
273 * Initialize an rbd client instance.
274 * We own *opt.
275 */
276 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
277 struct rbd_options *rbd_opts)
278 {
279 struct rbd_client *rbdc;
280 int ret = -ENOMEM;
281
282 dout("rbd_client_create\n");
283 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
284 if (!rbdc)
285 goto out_opt;
286
287 kref_init(&rbdc->kref);
288 INIT_LIST_HEAD(&rbdc->node);
289
290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291
292 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
293 if (IS_ERR(rbdc->client))
294 goto out_mutex;
295 opt = NULL; /* Now rbdc->client is responsible for opt */
296
297 ret = ceph_open_session(rbdc->client);
298 if (ret < 0)
299 goto out_err;
300
301 rbdc->rbd_opts = rbd_opts;
302
303 spin_lock(&rbd_client_list_lock);
304 list_add_tail(&rbdc->node, &rbd_client_list);
305 spin_unlock(&rbd_client_list_lock);
306
307 mutex_unlock(&ctl_mutex);
308
309 dout("rbd_client_create created %p\n", rbdc);
310 return rbdc;
311
312 out_err:
313 ceph_destroy_client(rbdc->client);
314 out_mutex:
315 mutex_unlock(&ctl_mutex);
316 kfree(rbdc);
317 out_opt:
318 if (opt)
319 ceph_destroy_options(opt);
320 return ERR_PTR(ret);
321 }
322
323 /*
324 * Find a ceph client with specific addr and configuration.
325 */
326 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
327 {
328 struct rbd_client *client_node;
329
330 if (opt->flags & CEPH_OPT_NOSHARE)
331 return NULL;
332
333 list_for_each_entry(client_node, &rbd_client_list, node)
334 if (ceph_compare_options(opt, client_node->client) == 0)
335 return client_node;
336 return NULL;
337 }
338
339 /*
340 * mount options
341 */
342 enum {
343 Opt_notify_timeout,
344 Opt_last_int,
345 /* int args above */
346 Opt_last_string,
347 /* string args above */
348 };
349
350 static match_table_t rbdopt_tokens = {
351 {Opt_notify_timeout, "notify_timeout=%d"},
352 /* int args above */
353 /* string args above */
354 {-1, NULL}
355 };
356
357 static int parse_rbd_opts_token(char *c, void *private)
358 {
359 struct rbd_options *rbdopt = private;
360 substring_t argstr[MAX_OPT_ARGS];
361 int token, intval, ret;
362
363 token = match_token(c, rbdopt_tokens, argstr);
364 if (token < 0)
365 return -EINVAL;
366
367 if (token < Opt_last_int) {
368 ret = match_int(&argstr[0], &intval);
369 if (ret < 0) {
370 pr_err("bad mount option arg (not int) "
371 "at '%s'\n", c);
372 return ret;
373 }
374 dout("got int token %d val %d\n", token, intval);
375 } else if (token > Opt_last_int && token < Opt_last_string) {
376 dout("got string token %d val %s\n", token,
377 argstr[0].from);
378 } else {
379 dout("got token %d\n", token);
380 }
381
382 switch (token) {
383 case Opt_notify_timeout:
384 rbdopt->notify_timeout = intval;
385 break;
386 default:
387 BUG_ON(token);
388 }
389 return 0;
390 }
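/*
 * Editor's note (illustrative example, not in the original source):
 * an options string containing "notify_timeout=30" matches the
 * Opt_notify_timeout pattern, match_int() extracts intval = 30, and
 * the switch stores 30 in rbdopt->notify_timeout.
 */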
391
392 /*
393 * Get a ceph client with specific addr and configuration, if one does
394 * not exist create it.
395 */
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
397 size_t mon_addr_len,
398 char *options)
399 {
400 struct rbd_client *rbdc;
401 struct ceph_options *opt;
402 struct rbd_options *rbd_opts;
403
404 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
405 if (!rbd_opts)
406 return ERR_PTR(-ENOMEM);
407
408 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
409
410 opt = ceph_parse_options(options, mon_addr,
411 mon_addr + mon_addr_len,
412 parse_rbd_opts_token, rbd_opts);
413 if (IS_ERR(opt)) {
414 kfree(rbd_opts);
415 return ERR_CAST(opt);
416 }
417
418 spin_lock(&rbd_client_list_lock);
419 rbdc = __rbd_client_find(opt);
420 if (rbdc) {
421 /* using an existing client */
422 kref_get(&rbdc->kref);
423 spin_unlock(&rbd_client_list_lock);
424
425 ceph_destroy_options(opt);
426 kfree(rbd_opts);
427
428 return rbdc;
429 }
430 spin_unlock(&rbd_client_list_lock);
431
432 rbdc = rbd_client_create(opt, rbd_opts);
433
434 if (IS_ERR(rbdc))
435 kfree(rbd_opts);
436
437 return rbdc;
438 }
439
440 /*
441 * Destroy ceph client
442 *
443 * rbd_client_list_lock is taken internally; the caller must not hold it.
444 */
445 static void rbd_client_release(struct kref *kref)
446 {
447 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
448
449 dout("rbd_release_client %p\n", rbdc);
450 spin_lock(&rbd_client_list_lock);
451 list_del(&rbdc->node);
452 spin_unlock(&rbd_client_list_lock);
453
454 ceph_destroy_client(rbdc->client);
455 kfree(rbdc->rbd_opts);
456 kfree(rbdc);
457 }
458
459 /*
460 * Drop reference to ceph client node. If it's not referenced anymore, release
461 * it.
462 */
463 static void rbd_put_client(struct rbd_device *rbd_dev)
464 {
465 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466 rbd_dev->rbd_client = NULL;
467 }
468
469 /*
470 * Destroy requests collection
471 */
472 static void rbd_coll_release(struct kref *kref)
473 {
474 struct rbd_req_coll *coll =
475 container_of(kref, struct rbd_req_coll, kref);
476
477 dout("rbd_coll_release %p\n", coll);
478 kfree(coll);
479 }
480
481 /*
482 * Create a new header structure, translate header format from the on-disk
483 * header.
484 */
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486 struct rbd_image_header_ondisk *ondisk,
487 u32 allocated_snaps,
488 gfp_t gfp_flags)
489 {
490 u32 i, snap_count;
491
492 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
493 return -ENXIO;
494
495 snap_count = le32_to_cpu(ondisk->snap_count);
496 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
497 / sizeof (*ondisk))
498 return -EINVAL;
499 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500 snap_count * sizeof(u64),
501 gfp_flags);
502 if (!header->snapc)
503 return -ENOMEM;
504
505 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
506 if (snap_count) {
507 header->snap_names = kmalloc(header->snap_names_len,
508 gfp_flags);
509 if (!header->snap_names)
510 goto err_snapc;
511 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
512 gfp_flags);
513 if (!header->snap_sizes)
514 goto err_names;
515 } else {
516 header->snap_names = NULL;
517 header->snap_sizes = NULL;
518 }
519
520 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
521 gfp_flags);
522 if (!header->object_prefix)
523 goto err_sizes;
524
525 memcpy(header->object_prefix, ondisk->block_name,
526 sizeof(ondisk->block_name));
527 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
528
529 header->image_size = le64_to_cpu(ondisk->image_size);
530 header->obj_order = ondisk->options.order;
531 header->crypt_type = ondisk->options.crypt_type;
532 header->comp_type = ondisk->options.comp_type;
533
534 atomic_set(&header->snapc->nref, 1);
535 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536 header->snapc->num_snaps = snap_count;
537 header->total_snaps = snap_count;
538
539 if (snap_count && allocated_snaps == snap_count) {
540 for (i = 0; i < snap_count; i++) {
541 header->snapc->snaps[i] =
542 le64_to_cpu(ondisk->snaps[i].id);
543 header->snap_sizes[i] =
544 le64_to_cpu(ondisk->snaps[i].image_size);
545 }
546
547 /* copy snapshot names */
548 memcpy(header->snap_names, &ondisk->snaps[i],
549 header->snap_names_len);
550 }
551
552 return 0;
553
554 err_sizes:
555 kfree(header->snap_sizes);
556 err_names:
557 kfree(header->snap_names);
558 err_snapc:
559 kfree(header->snapc);
560 return -ENOMEM;
561 }
562
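/*
 * Editor's note (illustrative layout, not in the original source):
 * with two snapshots named "a" and "bb", header->snap_names holds
 * the packed bytes "a\0bb\0" (snap_names_len == 5), and
 * snapc->snaps[] / snap_sizes[] are indexed in the same order as
 * the names, which is how snap_by_name() below walks them.
 */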
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
564 u64 *seq, u64 *size)
565 {
566 int i;
567 char *p = header->snap_names;
568
569 for (i = 0; i < header->total_snaps; i++) {
570 if (!strcmp(snap_name, p)) {
571
572 /* Found it. Pass back its id and/or size */
573
574 if (seq)
575 *seq = header->snapc->snaps[i];
576 if (size)
577 *size = header->snap_sizes[i];
578 return i;
579 }
580 p += strlen(p) + 1; /* Skip ahead to the next name */
581 }
582 return -ENOENT;
583 }
584
585 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
586 {
587 struct rbd_image_header *header = &rbd_dev->header;
588 struct ceph_snap_context *snapc = header->snapc;
589 int ret = -ENOENT;
590
591 down_write(&rbd_dev->header_rwsem);
592
593 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
594 sizeof (RBD_SNAP_HEAD_NAME))) {
595 if (header->total_snaps)
596 snapc->seq = header->snap_seq;
597 else
598 snapc->seq = 0;
599 rbd_dev->snap_id = CEPH_NOSNAP;
600 rbd_dev->read_only = 0;
601 if (size)
602 *size = header->image_size;
603 } else {
604 ret = snap_by_name(header, rbd_dev->snap_name,
605 &snapc->seq, size);
606 if (ret < 0)
607 goto done;
608 rbd_dev->snap_id = snapc->seq;
609 rbd_dev->read_only = 1;
610 }
611
612 ret = 0;
613 done:
614 up_write(&rbd_dev->header_rwsem);
615 return ret;
616 }
617
618 static void rbd_header_free(struct rbd_image_header *header)
619 {
620 kfree(header->object_prefix);
621 kfree(header->snap_sizes);
622 kfree(header->snap_names);
623 kfree(header->snapc);
624 }
625
626 /*
627 * get the actual striped segment name, offset and length
628 */
629 static u64 rbd_get_segment(struct rbd_image_header *header,
630 const char *object_prefix,
631 u64 ofs, u64 len,
632 char *seg_name, u64 *segofs)
633 {
634 u64 seg = ofs >> header->obj_order;
635
636 if (seg_name)
637 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
638 "%s.%012llx", object_prefix, seg);
639
640 ofs = ofs & ((1 << header->obj_order) - 1);
641 len = min_t(u64, len, (1 << header->obj_order) - ofs);
642
643 if (segofs)
644 *segofs = ofs;
645
646 return len;
647 }
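/*
 * Editor's note (worked example, not in the original source): with
 * obj_order = 22 (4 MiB objects), ofs = 0x500000 and len = 0x300000
 * fall in segment 0x500000 >> 22 = 1, named "<prefix>.000000000001";
 * the offset within that object is 0x100000 and the returned length
 * is min(0x300000, 0x400000 - 0x100000) = 0x300000.
 */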
648
649 static int rbd_get_num_segments(struct rbd_image_header *header,
650 u64 ofs, u64 len)
651 {
652 u64 start_seg = ofs >> header->obj_order;
653 u64 end_seg = (ofs + len - 1) >> header->obj_order;
654 return end_seg - start_seg + 1;
655 }
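/*
 * Editor's note (example, not in the original source): with
 * obj_order = 22, ofs = 0x300000 and len = 0x200000 span segments
 * 0 through 1, so this returns 2.
 */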
656
657 /*
658 * returns the size of an object in the image
659 */
660 static u64 rbd_obj_bytes(struct rbd_image_header *header)
661 {
662 return 1 << header->obj_order;
663 }
664
665 /*
666 * bio helpers
667 */
668
669 static void bio_chain_put(struct bio *chain)
670 {
671 struct bio *tmp;
672
673 while (chain) {
674 tmp = chain;
675 chain = chain->bi_next;
676 bio_put(tmp);
677 }
678 }
679
680 /*
681 * zeros a bio chain, starting at specific offset
682 */
683 static void zero_bio_chain(struct bio *chain, int start_ofs)
684 {
685 struct bio_vec *bv;
686 unsigned long flags;
687 void *buf;
688 int i;
689 int pos = 0;
690
691 while (chain) {
692 bio_for_each_segment(bv, chain, i) {
693 if (pos + bv->bv_len > start_ofs) {
694 int remainder = max(start_ofs - pos, 0);
695 buf = bvec_kmap_irq(bv, &flags);
696 memset(buf + remainder, 0,
697 bv->bv_len - remainder);
698 bvec_kunmap_irq(buf, &flags);
699 }
700 pos += bv->bv_len;
701 }
702
703 chain = chain->bi_next;
704 }
705 }
706
707 /*
708 * bio_chain_clone - clone a chain of bios up to a certain length.
709 * might return a bio_pair that will need to be released.
710 */
711 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
712 struct bio_pair **bp,
713 int len, gfp_t gfpmask)
714 {
715 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
716 int total = 0;
717
718 if (*bp) {
719 bio_pair_release(*bp);
720 *bp = NULL;
721 }
722
723 while (old_chain && (total < len)) {
724 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
725 if (!tmp)
726 goto err_out;
727
728 if (total + old_chain->bi_size > len) {
729 struct bio_pair *bp;
730
731 /*
732 * this split can only happen with a single paged bio,
733 * split_bio will BUG_ON if this is not the case
734 */
735 dout("bio_chain_clone split! total=%d remaining=%d"
736 "bi_size=%d\n",
737 (int)total, (int)len-total,
738 (int)old_chain->bi_size);
739
740 /* split the bio. We'll release it either in the next
741 call, or it will have to be released outside */
742 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
743 if (!bp)
744 goto err_out;
745
746 __bio_clone(tmp, &bp->bio1);
747
748 *next = &bp->bio2;
749 } else {
750 __bio_clone(tmp, old_chain);
751 *next = old_chain->bi_next;
752 }
753
754 tmp->bi_bdev = NULL;
755 gfpmask &= ~__GFP_WAIT;
756 tmp->bi_next = NULL;
757
758 if (!new_chain) {
759 new_chain = tail = tmp;
760 } else {
761 tail->bi_next = tmp;
762 tail = tmp;
763 }
764 old_chain = old_chain->bi_next;
765
766 total += tmp->bi_size;
767 }
768
769 BUG_ON(total < len);
770
771 if (tail)
772 tail->bi_next = NULL;
773
774 *old = old_chain;
775
776 return new_chain;
777
778 err_out:
779 dout("bio_chain_clone with err\n");
780 bio_chain_put(new_chain);
781 return NULL;
782 }
783
784 /*
785 * helpers for osd request op vectors.
786 */
787 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
788 int num_ops,
789 int opcode,
790 u32 payload_len)
791 {
792 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
793 GFP_NOIO);
794 if (!*ops)
795 return -ENOMEM;
796 (*ops)[0].op = opcode;
797 /*
798 * op extent offset and length will be set later on
799 * in calc_raw_layout()
800 */
801 (*ops)[0].payload_len = payload_len;
802 return 0;
803 }
804
805 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
806 {
807 kfree(ops);
808 }
809
810 static void rbd_coll_end_req_index(struct request *rq,
811 struct rbd_req_coll *coll,
812 int index,
813 int ret, u64 len)
814 {
815 struct request_queue *q;
816 int min, max, i;
817
818 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
819 coll, index, ret, len);
820
821 if (!rq)
822 return;
823
824 if (!coll) {
825 blk_end_request(rq, ret, len);
826 return;
827 }
828
829 q = rq->q;
830
831 spin_lock_irq(q->queue_lock);
832 coll->status[index].done = 1;
833 coll->status[index].rc = ret;
834 coll->status[index].bytes = len;
835 max = min = coll->num_done;
836 while (max < coll->total && coll->status[max].done)
837 max++;
838
839 for (i = min; i < max; i++) {
840 __blk_end_request(rq, coll->status[i].rc,
841 coll->status[i].bytes);
842 coll->num_done++;
843 kref_put(&coll->kref, rbd_coll_release);
844 }
845 spin_unlock_irq(q->queue_lock);
846 }
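/*
 * Editor's note (example, not in the original source): completions
 * are reported to the block layer strictly in index order. With
 * total = 4 and done flags {1, 1, 0, 1}, only indexes 0 and 1 are
 * ended above; index 3 stays pending until index 2 completes.
 */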
847
848 static void rbd_coll_end_req(struct rbd_request *req,
849 int ret, u64 len)
850 {
851 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
852 }
853
854 /*
855 * Send ceph osd request
856 */
857 static int rbd_do_request(struct request *rq,
858 struct rbd_device *rbd_dev,
859 struct ceph_snap_context *snapc,
860 u64 snapid,
861 const char *obj, u64 ofs, u64 len,
862 struct bio *bio,
863 struct page **pages,
864 int num_pages,
865 int flags,
866 struct ceph_osd_req_op *ops,
867 int num_reply,
868 struct rbd_req_coll *coll,
869 int coll_index,
870 void (*rbd_cb)(struct ceph_osd_request *req,
871 struct ceph_msg *msg),
872 struct ceph_osd_request **linger_req,
873 u64 *ver)
874 {
875 struct ceph_osd_request *req;
876 struct ceph_file_layout *layout;
877 int ret;
878 u64 bno;
879 struct timespec mtime = CURRENT_TIME;
880 struct rbd_request *req_data;
881 struct ceph_osd_request_head *reqhead;
882 struct ceph_osd_client *osdc;
883
884 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
885 if (!req_data) {
886 if (coll)
887 rbd_coll_end_req_index(rq, coll, coll_index,
888 -ENOMEM, len);
889 return -ENOMEM;
890 }
891
892 if (coll) {
893 req_data->coll = coll;
894 req_data->coll_index = coll_index;
895 }
896
897 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
898
899 down_read(&rbd_dev->header_rwsem);
900
901 osdc = &rbd_dev->rbd_client->client->osdc;
902 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
903 false, GFP_NOIO, pages, bio);
904 if (!req) {
905 up_read(&rbd_dev->header_rwsem);
906 ret = -ENOMEM;
907 goto done_pages;
908 }
909
910 req->r_callback = rbd_cb;
911
912 req_data->rq = rq;
913 req_data->bio = bio;
914 req_data->pages = pages;
915 req_data->len = len;
916
917 req->r_priv = req_data;
918
919 reqhead = req->r_request->front.iov_base;
920 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
921
922 strncpy(req->r_oid, obj, sizeof(req->r_oid));
923 req->r_oid_len = strlen(req->r_oid);
924
925 layout = &req->r_file_layout;
926 memset(layout, 0, sizeof(*layout));
927 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
928 layout->fl_stripe_count = cpu_to_le32(1);
929 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
930 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
931 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
932 req, ops);
933
934 ceph_osdc_build_request(req, ofs, &len,
935 ops,
936 snapc,
937 &mtime,
938 req->r_oid, req->r_oid_len);
939 up_read(&rbd_dev->header_rwsem);
940
941 if (linger_req) {
942 ceph_osdc_set_request_linger(osdc, req);
943 *linger_req = req;
944 }
945
946 ret = ceph_osdc_start_request(osdc, req, false);
947 if (ret < 0)
948 goto done_err;
949
950 if (!rbd_cb) {
951 ret = ceph_osdc_wait_request(osdc, req);
952 if (ver)
953 *ver = le64_to_cpu(req->r_reassert_version.version);
954 dout("reassert_ver=%lld\n",
955 le64_to_cpu(req->r_reassert_version.version));
956 ceph_osdc_put_request(req);
957 }
958 return ret;
959
960 done_err:
961 bio_chain_put(req_data->bio);
962 ceph_osdc_put_request(req);
963 done_pages:
964 rbd_coll_end_req(req_data, ret, len);
965 kfree(req_data);
966 return ret;
967 }
968
969 /*
970 * Ceph osd op callback
971 */
972 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
973 {
974 struct rbd_request *req_data = req->r_priv;
975 struct ceph_osd_reply_head *replyhead;
976 struct ceph_osd_op *op;
977 __s32 rc;
978 u64 bytes;
979 int read_op;
980
981 /* parse reply */
982 replyhead = msg->front.iov_base;
983 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
984 op = (void *)(replyhead + 1);
985 rc = le32_to_cpu(replyhead->result);
986 bytes = le64_to_cpu(op->extent.length);
987 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
988
989 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
990
991 if (rc == -ENOENT && read_op) {
992 zero_bio_chain(req_data->bio, 0);
993 rc = 0;
994 } else if (rc == 0 && read_op && bytes < req_data->len) {
995 zero_bio_chain(req_data->bio, bytes);
996 bytes = req_data->len;
997 }
998
999 rbd_coll_end_req(req_data, rc, bytes);
1000
1001 if (req_data->bio)
1002 bio_chain_put(req_data->bio);
1003
1004 ceph_osdc_put_request(req);
1005 kfree(req_data);
1006 }
1007
1008 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1009 {
1010 ceph_osdc_put_request(req);
1011 }
1012
1013 /*
1014 * Do a synchronous ceph osd operation
1015 */
1016 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1017 struct ceph_snap_context *snapc,
1018 u64 snapid,
1019 int opcode,
1020 int flags,
1021 struct ceph_osd_req_op *orig_ops,
1022 int num_reply,
1023 const char *obj,
1024 u64 ofs, u64 len,
1025 char *buf,
1026 struct ceph_osd_request **linger_req,
1027 u64 *ver)
1028 {
1029 int ret;
1030 struct page **pages;
1031 int num_pages;
1032 struct ceph_osd_req_op *ops = orig_ops;
1033 u32 payload_len;
1034
1035 num_pages = calc_pages_for(ofs, len);
1036 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1037 if (IS_ERR(pages))
1038 return PTR_ERR(pages);
1039
1040 if (!orig_ops) {
1041 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1042 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1043 if (ret < 0)
1044 goto done;
1045
1046 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1047 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1048 if (ret < 0)
1049 goto done_ops;
1050 }
1051 }
1052
1053 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1054 obj, ofs, len, NULL,
1055 pages, num_pages,
1056 flags,
1057 ops,
1058 2,
1059 NULL, 0,
1060 NULL,
1061 linger_req, ver);
1062 if (ret < 0)
1063 goto done_ops;
1064
1065 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1066 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1067
1068 done_ops:
1069 if (!orig_ops)
1070 rbd_destroy_ops(ops);
1071 done:
1072 ceph_release_page_vector(pages, num_pages);
1073 return ret;
1074 }
1075
1076 /*
1077 * Do an asynchronous ceph osd operation
1078 */
1079 static int rbd_do_op(struct request *rq,
1080 struct rbd_device *rbd_dev,
1081 struct ceph_snap_context *snapc,
1082 u64 snapid,
1083 int opcode, int flags, int num_reply,
1084 u64 ofs, u64 len,
1085 struct bio *bio,
1086 struct rbd_req_coll *coll,
1087 int coll_index)
1088 {
1089 char *seg_name;
1090 u64 seg_ofs;
1091 u64 seg_len;
1092 int ret;
1093 struct ceph_osd_req_op *ops;
1094 u32 payload_len;
1095
1096 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1097 if (!seg_name)
1098 return -ENOMEM;
1099
1100 seg_len = rbd_get_segment(&rbd_dev->header,
1101 rbd_dev->header.object_prefix,
1102 ofs, len,
1103 seg_name, &seg_ofs);
1104
1105 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1106
1107 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1108 if (ret < 0)
1109 goto done;
1110
1111 /* we've taken care of segment sizes earlier when we
1112 cloned the bios. We should never have a segment
1113 truncated at this point */
1114 BUG_ON(seg_len < len);
1115
1116 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1117 seg_name, seg_ofs, seg_len,
1118 bio,
1119 NULL, 0,
1120 flags,
1121 ops,
1122 num_reply,
1123 coll, coll_index,
1124 rbd_req_cb, 0, NULL);
1125
1126 rbd_destroy_ops(ops);
1127 done:
1128 kfree(seg_name);
1129 return ret;
1130 }
1131
1132 /*
1133 * Request async osd write
1134 */
1135 static int rbd_req_write(struct request *rq,
1136 struct rbd_device *rbd_dev,
1137 struct ceph_snap_context *snapc,
1138 u64 ofs, u64 len,
1139 struct bio *bio,
1140 struct rbd_req_coll *coll,
1141 int coll_index)
1142 {
1143 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1144 CEPH_OSD_OP_WRITE,
1145 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1146 2,
1147 ofs, len, bio, coll, coll_index);
1148 }
1149
1150 /*
1151 * Request async osd read
1152 */
1153 static int rbd_req_read(struct request *rq,
1154 struct rbd_device *rbd_dev,
1155 u64 snapid,
1156 u64 ofs, u64 len,
1157 struct bio *bio,
1158 struct rbd_req_coll *coll,
1159 int coll_index)
1160 {
1161 return rbd_do_op(rq, rbd_dev, NULL,
1162 snapid,
1163 CEPH_OSD_OP_READ,
1164 CEPH_OSD_FLAG_READ,
1165 2,
1166 ofs, len, bio, coll, coll_index);
1167 }
1168
1169 /*
1170 * Request sync osd read
1171 */
1172 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1173 struct ceph_snap_context *snapc,
1174 u64 snapid,
1175 const char *obj,
1176 u64 ofs, u64 len,
1177 char *buf,
1178 u64 *ver)
1179 {
1180 return rbd_req_sync_op(rbd_dev, NULL,
1181 snapid,
1182 CEPH_OSD_OP_READ,
1183 CEPH_OSD_FLAG_READ,
1184 NULL,
1185 1, obj, ofs, len, buf, NULL, ver);
1186 }
1187
1188 /*
1189 * Request sync osd notify ack
1190 */
1191 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1192 u64 ver,
1193 u64 notify_id,
1194 const char *obj)
1195 {
1196 struct ceph_osd_req_op *ops;
1197 int ret;
1198
1199 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1200 if (ret < 0)
1201 return ret;
1202
1203 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1204 ops[0].watch.cookie = notify_id;
1205 ops[0].watch.flag = 0;
1206
1207 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1208 obj, 0, 0, NULL,
1209 NULL, 0,
1210 CEPH_OSD_FLAG_READ,
1211 ops,
1212 1,
1213 NULL, 0,
1214 rbd_simple_req_cb, 0, NULL);
1215
1216 rbd_destroy_ops(ops);
1217 return ret;
1218 }
1219
1220 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1221 {
1222 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1223 int rc;
1224
1225 if (!rbd_dev)
1226 return;
1227
1228 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", rbd_dev->obj_md_name,
1229 notify_id, (int)opcode);
1230 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1231 rc = __rbd_refresh_header(rbd_dev);
1232 mutex_unlock(&ctl_mutex);
1233 if (rc)
1234 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1235 "update snaps: %d\n", rbd_dev->major, rc);
1236
1237 rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->obj_md_name);
1238 }
1239
1240 /*
1241 * Request sync osd watch
1242 */
1243 static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
1244 const char *obj,
1245 u64 ver)
1246 {
1247 struct ceph_osd_req_op *ops;
1248 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1249
1250 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1251 if (ret < 0)
1252 return ret;
1253
1254 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1255 (void *)rbd_dev, &rbd_dev->watch_event);
1256 if (ret < 0)
1257 goto fail;
1258
1259 ops[0].watch.ver = cpu_to_le64(ver);
1260 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1261 ops[0].watch.flag = 1;
1262
1263 ret = rbd_req_sync_op(rbd_dev, NULL,
1264 CEPH_NOSNAP,
1265 0,
1266 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1267 ops,
1268 1, obj, 0, 0, NULL,
1269 &rbd_dev->watch_request, NULL);
1270
1271 if (ret < 0)
1272 goto fail_event;
1273
1274 rbd_destroy_ops(ops);
1275 return 0;
1276
1277 fail_event:
1278 ceph_osdc_cancel_event(rbd_dev->watch_event);
1279 rbd_dev->watch_event = NULL;
1280 fail:
1281 rbd_destroy_ops(ops);
1282 return ret;
1283 }
1284
1285 /*
1286 * Request sync osd unwatch
1287 */
1288 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
1289 const char *obj)
1290 {
1291 struct ceph_osd_req_op *ops;
1292
1293 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1294 if (ret < 0)
1295 return ret;
1296
1297 ops[0].watch.ver = 0;
1298 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1299 ops[0].watch.flag = 0;
1300
1301 ret = rbd_req_sync_op(rbd_dev, NULL,
1302 CEPH_NOSNAP,
1303 0,
1304 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1305 ops,
1306 1, obj, 0, 0, NULL, NULL, NULL);
1307
1308 rbd_destroy_ops(ops);
1309 ceph_osdc_cancel_event(rbd_dev->watch_event);
1310 rbd_dev->watch_event = NULL;
1311 return ret;
1312 }
1313
1314 struct rbd_notify_info {
1315 struct rbd_device *rbd_dev;
1316 };
1317
1318 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1319 {
1320 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1321 if (!rbd_dev)
1322 return;
1323
1324 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1325 rbd_dev->obj_md_name,
1326 notify_id, (int)opcode);
1327 }
1328
1329 /*
1330 * Request sync osd notify
1331 */
1332 static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1333 const char *obj)
1334 {
1335 struct ceph_osd_req_op *ops;
1336 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1337 struct ceph_osd_event *event;
1338 struct rbd_notify_info info;
1339 int payload_len = sizeof(u32) + sizeof(u32);
1340 int ret;
1341
1342 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1343 if (ret < 0)
1344 return ret;
1345
1346 info.rbd_dev = rbd_dev;
1347
1348 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1349 (void *)&info, &event);
1350 if (ret < 0)
1351 goto fail;
1352
1353 ops[0].watch.ver = 1;
1354 ops[0].watch.flag = 1;
1355 ops[0].watch.cookie = event->cookie;
1356 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1357 ops[0].watch.timeout = 12;
1358
1359 ret = rbd_req_sync_op(rbd_dev, NULL,
1360 CEPH_NOSNAP,
1361 0,
1362 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363 ops,
1364 1, obj, 0, 0, NULL, NULL, NULL);
1365 if (ret < 0)
1366 goto fail_event;
1367
1368 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1369 dout("ceph_osdc_wait_event returned %d\n", ret);
1370 rbd_destroy_ops(ops);
1371 return 0;
1372
1373 fail_event:
1374 ceph_osdc_cancel_event(event);
1375 fail:
1376 rbd_destroy_ops(ops);
1377 return ret;
1378 }
1379
1380 /*
1381 * Request sync osd class method call
1382 */
1383 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1384 const char *obj,
1385 const char *cls,
1386 const char *method,
1387 const char *data,
1388 int len,
1389 u64 *ver)
1390 {
1391 struct ceph_osd_req_op *ops;
1392 int cls_len = strlen(cls);
1393 int method_len = strlen(method);
1394 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1395 cls_len + method_len + len);
1396 if (ret < 0)
1397 return ret;
1398
1399 ops[0].cls.class_name = cls;
1400 ops[0].cls.class_len = (__u8)cls_len;
1401 ops[0].cls.method_name = method;
1402 ops[0].cls.method_len = (__u8)method_len;
1403 ops[0].cls.argc = 0;
1404 ops[0].cls.indata = data;
1405 ops[0].cls.indata_len = len;
1406
1407 ret = rbd_req_sync_op(rbd_dev, NULL,
1408 CEPH_NOSNAP,
1409 0,
1410 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1411 ops,
1412 1, obj, 0, 0, NULL, NULL, ver);
1413
1414 rbd_destroy_ops(ops);
1415
1416 dout("cls_exec returned %d\n", ret);
1417 return ret;
1418 }
1419
1420 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1421 {
1422 struct rbd_req_coll *coll =
1423 kzalloc(sizeof(struct rbd_req_coll) +
1424 sizeof(struct rbd_req_status) * num_reqs,
1425 GFP_ATOMIC);
1426
1427 if (!coll)
1428 return NULL;
1429 coll->total = num_reqs;
1430 kref_init(&coll->kref);
1431 return coll;
1432 }
1433
1434 /*
1435 * block device queue callback
1436 */
1437 static void rbd_rq_fn(struct request_queue *q)
1438 {
1439 struct rbd_device *rbd_dev = q->queuedata;
1440 struct request *rq;
1441 struct bio_pair *bp = NULL;
1442
1443 while ((rq = blk_fetch_request(q))) {
1444 struct bio *bio;
1445 struct bio *rq_bio, *next_bio = NULL;
1446 bool do_write;
1447 int size, op_size = 0;
1448 u64 ofs;
1449 int num_segs, cur_seg = 0;
1450 struct rbd_req_coll *coll;
1451
1452 /* peek at request from block layer */
1453 if (!rq)
1454 break;
1455
1456 dout("fetched request\n");
1457
1458 /* filter out block requests we don't understand */
1459 if ((rq->cmd_type != REQ_TYPE_FS)) {
1460 __blk_end_request_all(rq, 0);
1461 continue;
1462 }
1463
1464 /* deduce our operation (read, write) */
1465 do_write = (rq_data_dir(rq) == WRITE);
1466
1467 size = blk_rq_bytes(rq);
1468 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1469 rq_bio = rq->bio;
1470 if (do_write && rbd_dev->read_only) {
1471 __blk_end_request_all(rq, -EROFS);
1472 continue;
1473 }
1474
1475 spin_unlock_irq(q->queue_lock);
1476
1477 dout("%s 0x%x bytes at 0x%llx\n",
1478 do_write ? "write" : "read",
1479 size, blk_rq_pos(rq) * SECTOR_SIZE);
1480
1481 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1482 coll = rbd_alloc_coll(num_segs);
1483 if (!coll) {
1484 spin_lock_irq(q->queue_lock);
1485 __blk_end_request_all(rq, -ENOMEM);
1486 continue;
1487 }
1488
1489 do {
1490 /* a bio clone to be passed down to OSD req */
1491 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1492 op_size = rbd_get_segment(&rbd_dev->header,
1493 rbd_dev->header.object_prefix,
1494 ofs, size,
1495 NULL, NULL);
1496 kref_get(&coll->kref);
1497 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1498 op_size, GFP_ATOMIC);
1499 if (!bio) {
1500 rbd_coll_end_req_index(rq, coll, cur_seg,
1501 -ENOMEM, op_size);
1502 goto next_seg;
1503 }
1504
1505
1506 /* init OSD command: write or read */
1507 if (do_write)
1508 rbd_req_write(rq, rbd_dev,
1509 rbd_dev->header.snapc,
1510 ofs,
1511 op_size, bio,
1512 coll, cur_seg);
1513 else
1514 rbd_req_read(rq, rbd_dev,
1515 rbd_dev->snap_id,
1516 ofs,
1517 op_size, bio,
1518 coll, cur_seg);
1519
1520 next_seg:
1521 size -= op_size;
1522 ofs += op_size;
1523
1524 cur_seg++;
1525 rq_bio = next_bio;
1526 } while (size > 0);
1527 kref_put(&coll->kref, rbd_coll_release);
1528
1529 if (bp)
1530 bio_pair_release(bp);
1531 spin_lock_irq(q->queue_lock);
1532 }
1533 }
1534
1535 /*
1536 * a queue callback. Makes sure that we don't create a bio that spans across
1537 * multiple osd objects. One exception would be with single-page bios,
1538 * which we handle later at bio_chain_clone
1539 */
1540 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1541 struct bio_vec *bvec)
1542 {
1543 struct rbd_device *rbd_dev = q->queuedata;
1544 unsigned int chunk_sectors;
1545 sector_t sector;
1546 unsigned int bio_sectors;
1547 int max;
1548
1549 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1550 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1551 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1552
1553 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1554 + bio_sectors)) << SECTOR_SHIFT;
1555 if (max < 0)
1556 max = 0; /* bio_add cannot handle a negative return */
1557 if (max <= bvec->bv_len && bio_sectors == 0)
1558 return bvec->bv_len;
1559 return max;
1560 }
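/*
 * Editor's note (worked example, not in the original source): with
 * obj_order = 22, chunk_sectors = 1 << (22 - 9) = 8192. For a bio
 * that starts 8000 sectors into an object with bio_sectors = 0,
 * max = (8192 - 8000) << 9 = 98304 bytes may be added before the
 * bio would cross into the next object.
 */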
1561
1562 static void rbd_free_disk(struct rbd_device *rbd_dev)
1563 {
1564 struct gendisk *disk = rbd_dev->disk;
1565
1566 if (!disk)
1567 return;
1568
1569 rbd_header_free(&rbd_dev->header);
1570
1571 if (disk->flags & GENHD_FL_UP)
1572 del_gendisk(disk);
1573 if (disk->queue)
1574 blk_cleanup_queue(disk->queue);
1575 put_disk(disk);
1576 }
1577
1578 /*
1579 * reload the on-disk header
1580 */
1581 static int rbd_read_header(struct rbd_device *rbd_dev,
1582 struct rbd_image_header *header)
1583 {
1584 ssize_t rc;
1585 struct rbd_image_header_ondisk *dh;
1586 u32 snap_count = 0;
1587 u64 ver;
1588 size_t len;
1589
1590 /*
1591 * First reads the fixed-size header to determine the number
1592 * of snapshots, then re-reads it, along with all snapshot
1593 * records as well as their stored names.
1594 */
1595 len = sizeof (*dh);
1596 while (1) {
1597 dh = kmalloc(len, GFP_KERNEL);
1598 if (!dh)
1599 return -ENOMEM;
1600
1601 rc = rbd_req_sync_read(rbd_dev,
1602 NULL, CEPH_NOSNAP,
1603 rbd_dev->obj_md_name,
1604 0, len,
1605 (char *)dh, &ver);
1606 if (rc < 0)
1607 goto out_dh;
1608
1609 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1610 if (rc < 0) {
1611 if (rc == -ENXIO)
1612 pr_warning("unrecognized header format"
1613 " for image %s", rbd_dev->obj);
1614 goto out_dh;
1615 }
1616
1617 if (snap_count == header->total_snaps)
1618 break;
1619
1620 snap_count = header->total_snaps;
1621 len = sizeof (*dh) +
1622 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1623 header->snap_names_len;
1624
1625 rbd_header_free(header);
1626 kfree(dh);
1627 }
1628 header->obj_version = ver;
1629
1630 out_dh:
1631 kfree(dh);
1632 return rc;
1633 }
1634
1635 /*
1636 * create a snapshot
1637 */
1638 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1639 const char *snap_name,
1640 gfp_t gfp_flags)
1641 {
1642 int name_len = strlen(snap_name);
1643 u64 new_snapid;
1644 int ret;
1645 void *data, *p, *e;
1646 u64 ver;
1647 struct ceph_mon_client *monc;
1648
1649 /* we should create a snapshot only if we're pointing at the head */
1650 if (rbd_dev->snap_id != CEPH_NOSNAP)
1651 return -EINVAL;
1652
1653 monc = &rbd_dev->rbd_client->client->monc;
1654 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1655 dout("created snapid=%lld\n", new_snapid);
1656 if (ret < 0)
1657 return ret;
1658
1659 data = kmalloc(name_len + 16, gfp_flags);
1660 if (!data)
1661 return -ENOMEM;
1662
1663 p = data;
1664 e = data + name_len + 16;
1665
1666 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1667 ceph_encode_64_safe(&p, e, new_snapid, bad);
1668
1669 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->obj_md_name,
1670 "rbd", "snap_add",
1671 data, p - data, &ver);
1672
1673 kfree(data);
1674
1675 if (ret < 0)
1676 return ret;
1677
1678 down_write(&rbd_dev->header_rwsem);
1679 rbd_dev->header.snapc->seq = new_snapid;
1680 up_write(&rbd_dev->header_rwsem);
1681
1682 return 0;
1683 bad:
1684 return -ERANGE;
1685 }
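/*
 * Editor's note (wire-format sketch, not in the original source):
 * the snap_add payload encoded above is a length-prefixed string
 * followed by a u64 -- 4 + name_len + 8 bytes in total -- which is
 * why the name_len + 16 allocation is always sufficient.
 */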
1686
1687 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1688 {
1689 struct rbd_snap *snap;
1690
1691 while (!list_empty(&rbd_dev->snaps)) {
1692 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1693 __rbd_remove_snap_dev(rbd_dev, snap);
1694 }
1695 }
1696
1697 /*
1698 * re-read the on-disk header and bring the in-memory copy up to date
1699 */
1700 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1701 {
1702 int ret;
1703 struct rbd_image_header h;
1704 u64 snap_seq;
1705 int follow_seq = 0;
1706
1707 ret = rbd_read_header(rbd_dev, &h);
1708 if (ret < 0)
1709 return ret;
1710
1711 /* resized? */
1712 set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1713
1714 down_write(&rbd_dev->header_rwsem);
1715
1716 snap_seq = rbd_dev->header.snapc->seq;
1717 if (rbd_dev->header.total_snaps &&
1718 rbd_dev->header.snapc->snaps[0] == snap_seq)
1719 /* pointing at the head, will need to follow that
1720 if head moves */
1721 follow_seq = 1;
1722
1723 /* rbd_dev->header.object_prefix shouldn't change */
1724 kfree(rbd_dev->header.snap_sizes);
1725 kfree(rbd_dev->header.snap_names);
1726 kfree(rbd_dev->header.snapc);
1727
1728 rbd_dev->header.total_snaps = h.total_snaps;
1729 rbd_dev->header.snapc = h.snapc;
1730 rbd_dev->header.snap_names = h.snap_names;
1731 rbd_dev->header.snap_names_len = h.snap_names_len;
1732 rbd_dev->header.snap_sizes = h.snap_sizes;
1733 /* Free the extra copy of the object prefix */
1734 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1735 kfree(h.object_prefix);
1736
1737 if (follow_seq)
1738 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1739 else
1740 rbd_dev->header.snapc->seq = snap_seq;
1741
1742 ret = __rbd_init_snaps_header(rbd_dev);
1743
1744 up_write(&rbd_dev->header_rwsem);
1745
1746 return ret;
1747 }
1748
1749 static int rbd_init_disk(struct rbd_device *rbd_dev)
1750 {
1751 struct gendisk *disk;
1752 struct request_queue *q;
1753 int rc;
1754 u64 segment_size;
1755 u64 total_size = 0;
1756
1757 /* contact OSD, request size info about the object being mapped */
1758 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1759 if (rc)
1760 return rc;
1761
1762 /* no need to lock here, as rbd_dev is not registered yet */
1763 rc = __rbd_init_snaps_header(rbd_dev);
1764 if (rc)
1765 return rc;
1766
1767 rc = rbd_header_set_snap(rbd_dev, &total_size);
1768 if (rc)
1769 return rc;
1770
1771 /* create gendisk info */
1772 rc = -ENOMEM;
1773 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1774 if (!disk)
1775 goto out;
1776
1777 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1778 rbd_dev->id);
1779 disk->major = rbd_dev->major;
1780 disk->first_minor = 0;
1781 disk->fops = &rbd_bd_ops;
1782 disk->private_data = rbd_dev;
1783
1784 /* init rq */
1785 rc = -ENOMEM;
1786 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1787 if (!q)
1788 goto out_disk;
1789
1790 /* We use the default size, but let's be explicit about it. */
1791 blk_queue_physical_block_size(q, SECTOR_SIZE);
1792
1793 /* set io sizes to object size */
1794 segment_size = rbd_obj_bytes(&rbd_dev->header);
1795 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1796 blk_queue_max_segment_size(q, segment_size);
1797 blk_queue_io_min(q, segment_size);
1798 blk_queue_io_opt(q, segment_size);
1799
1800 blk_queue_merge_bvec(q, rbd_merge_bvec);
1801 disk->queue = q;
1802
1803 q->queuedata = rbd_dev;
1804
1805 rbd_dev->disk = disk;
1806 rbd_dev->q = q;
1807
1808 /* finally, announce the disk to the world */
1809 set_capacity(disk, total_size / SECTOR_SIZE);
1810 add_disk(disk);
1811
1812 pr_info("%s: added with size 0x%llx\n",
1813 disk->disk_name, (unsigned long long)total_size);
1814 return 0;
1815
1816 out_disk:
1817 put_disk(disk);
1818 out:
1819 return rc;
1820 }
1821
1822 /*
1823 sysfs
1824 */
1825
1826 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1827 {
1828 return container_of(dev, struct rbd_device, dev);
1829 }
1830
1831 static ssize_t rbd_size_show(struct device *dev,
1832 struct device_attribute *attr, char *buf)
1833 {
1834 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1835
1836 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1837 }
1838
1839 static ssize_t rbd_major_show(struct device *dev,
1840 struct device_attribute *attr, char *buf)
1841 {
1842 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1843
1844 return sprintf(buf, "%d\n", rbd_dev->major);
1845 }
1846
1847 static ssize_t rbd_client_id_show(struct device *dev,
1848 struct device_attribute *attr, char *buf)
1849 {
1850 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1851
1852 return sprintf(buf, "client%lld\n",
1853 ceph_client_id(rbd_dev->rbd_client->client));
1854 }
1855
1856 static ssize_t rbd_pool_show(struct device *dev,
1857 struct device_attribute *attr, char *buf)
1858 {
1859 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1860
1861 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1862 }
1863
1864 static ssize_t rbd_pool_id_show(struct device *dev,
1865 struct device_attribute *attr, char *buf)
1866 {
1867 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1868
1869 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1870 }
1871
1872 static ssize_t rbd_name_show(struct device *dev,
1873 struct device_attribute *attr, char *buf)
1874 {
1875 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1876
1877 return sprintf(buf, "%s\n", rbd_dev->obj);
1878 }
1879
1880 static ssize_t rbd_snap_show(struct device *dev,
1881 struct device_attribute *attr,
1882 char *buf)
1883 {
1884 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1885
1886 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1887 }
1888
1889 static ssize_t rbd_image_refresh(struct device *dev,
1890 struct device_attribute *attr,
1891 const char *buf,
1892 size_t size)
1893 {
1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 int rc;
1896 int ret = size;
1897
1898 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1899
1900 rc = __rbd_refresh_header(rbd_dev);
1901 if (rc < 0)
1902 ret = rc;
1903
1904 mutex_unlock(&ctl_mutex);
1905 return ret;
1906 }
1907
1908 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1909 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1910 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1911 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1912 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1913 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1914 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1915 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1916 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1917
1918 static struct attribute *rbd_attrs[] = {
1919 &dev_attr_size.attr,
1920 &dev_attr_major.attr,
1921 &dev_attr_client_id.attr,
1922 &dev_attr_pool.attr,
1923 &dev_attr_pool_id.attr,
1924 &dev_attr_name.attr,
1925 &dev_attr_current_snap.attr,
1926 &dev_attr_refresh.attr,
1927 &dev_attr_create_snap.attr,
1928 NULL
1929 };
1930
1931 static struct attribute_group rbd_attr_group = {
1932 .attrs = rbd_attrs,
1933 };
1934
1935 static const struct attribute_group *rbd_attr_groups[] = {
1936 &rbd_attr_group,
1937 NULL
1938 };
1939
1940 static void rbd_sysfs_dev_release(struct device *dev)
1941 {
1942 }
1943
1944 static struct device_type rbd_device_type = {
1945 .name = "rbd",
1946 .groups = rbd_attr_groups,
1947 .release = rbd_sysfs_dev_release,
1948 };
1949
1950
1951 /*
1952 sysfs - snapshots
1953 */
1954
1955 static ssize_t rbd_snap_size_show(struct device *dev,
1956 struct device_attribute *attr,
1957 char *buf)
1958 {
1959 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1960
1961 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1962 }
1963
1964 static ssize_t rbd_snap_id_show(struct device *dev,
1965 struct device_attribute *attr,
1966 char *buf)
1967 {
1968 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1969
1970 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1971 }
1972
1973 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1974 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1975
1976 static struct attribute *rbd_snap_attrs[] = {
1977 &dev_attr_snap_size.attr,
1978 &dev_attr_snap_id.attr,
1979 NULL,
1980 };
1981
1982 static struct attribute_group rbd_snap_attr_group = {
1983 .attrs = rbd_snap_attrs,
1984 };
1985
1986 static void rbd_snap_dev_release(struct device *dev)
1987 {
1988 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1989 kfree(snap->name);
1990 kfree(snap);
1991 }
1992
1993 static const struct attribute_group *rbd_snap_attr_groups[] = {
1994 &rbd_snap_attr_group,
1995 NULL
1996 };
1997
1998 static struct device_type rbd_snap_device_type = {
1999 .groups = rbd_snap_attr_groups,
2000 .release = rbd_snap_dev_release,
2001 };
2002
2003 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2004 struct rbd_snap *snap)
2005 {
2006 list_del(&snap->node);
2007 device_unregister(&snap->dev);
2008 }
2009
2010 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2011 struct rbd_snap *snap,
2012 struct device *parent)
2013 {
2014 struct device *dev = &snap->dev;
2015 int ret;
2016
2017 dev->type = &rbd_snap_device_type;
2018 dev->parent = parent;
2019 dev->release = rbd_snap_dev_release;
2020 dev_set_name(dev, "snap_%s", snap->name);
2021 ret = device_register(dev);
2022
2023 return ret;
2024 }
2025
2026 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2027 int i, const char *name,
2028 struct rbd_snap **snapp)
2029 {
2030 int ret;
2031 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2032 if (!snap)
2033 return -ENOMEM;
2034 snap->name = kstrdup(name, GFP_KERNEL);
if (!snap->name) {
ret = -ENOMEM;
goto err;
}
2035 snap->size = rbd_dev->header.snap_sizes[i];
2036 snap->id = rbd_dev->header.snapc->snaps[i];
2037 if (device_is_registered(&rbd_dev->dev)) {
2038 ret = rbd_register_snap_dev(rbd_dev, snap,
2039 &rbd_dev->dev);
2040 if (ret < 0)
2041 goto err;
2042 }
2043 *snapp = snap;
2044 return 0;
2045 err:
2046 kfree(snap->name);
2047 kfree(snap);
2048 return ret;
2049 }
2050
2051 /*
2052 * search for the previous snap in a null-delimited string list
2053 */
2054 const char *rbd_prev_snap_name(const char *name, const char *start)
2055 {
2056 if (name < start + 2)
2057 return NULL;
2058
2059 name -= 2;
2060 while (*name) {
2061 if (name == start)
2062 return start;
2063 name--;
2064 }
2065 return name + 1;
2066 }
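/*
 * Editor's note (worked example, not in the original source): for
 * the packed list "a\0bb\0" with name pointing one past the final
 * '\0', name -= 2 lands on the last 'b' of "bb"; the loop walks
 * back until it hits the '\0' after "a", and name + 1 -- the start
 * of "bb" -- is returned.
 */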
2067
2068 /*
2069 * compare the old list of snapshots that we have to what's in the header
2070 * and update it accordingly. Note that the header holds the snapshots
2071 * in reverse order (from newest to oldest), and we need to go from
2072 * oldest to newest so that we don't get a duplicate snap name when
2073 * doing the update (e.g., a snapshot was removed and a new one was
2074 * recreated with the same name).
2075 */
2076 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2077 {
2078 const char *name, *first_name;
2079 int i = rbd_dev->header.total_snaps;
2080 struct rbd_snap *snap, *old_snap = NULL;
2081 int ret;
2082 struct list_head *p, *n;
2083
2084 first_name = rbd_dev->header.snap_names;
2085 name = first_name + rbd_dev->header.snap_names_len;
2086
2087 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2088 u64 cur_id;
2089
2090 old_snap = list_entry(p, struct rbd_snap, node);
2091
2092 if (i)
2093 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2094
2095 if (!i || old_snap->id < cur_id) {
2096 /* old_snap->id was skipped, thus was removed */
2097 __rbd_remove_snap_dev(rbd_dev, old_snap);
2098 continue;
2099 }
2100 if (old_snap->id == cur_id) {
2101 /* we have this snapshot already */
2102 i--;
2103 name = rbd_prev_snap_name(name, first_name);
2104 continue;
2105 }
2106 for (; i > 0;
2107 i--, name = rbd_prev_snap_name(name, first_name)) {
2108 if (!name) {
2109 WARN_ON(1);
2110 return -EINVAL;
2111 }
2112 cur_id = rbd_dev->header.snapc->snaps[i];
2113 /* snapshot removal? handle it above */
2114 if (cur_id >= old_snap->id)
2115 break;
2116 /* a new snapshot */
2117 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2118 if (ret < 0)
2119 return ret;
2120
2121 /* note that we add it backward so using n and not p */
2122 list_add(&snap->node, n);
2123 p = &snap->node;
2124 }
2125 }
2126 /* we're done going over the old snap list, just add what's left */
2127 for (; i > 0; i--) {
2128 name = rbd_prev_snap_name(name, first_name);
2129 if (!name) {
2130 WARN_ON(1);
2131 return -EINVAL;
2132 }
2133 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2134 if (ret < 0)
2135 return ret;
2136 list_add(&snap->node, &rbd_dev->snaps);
2137 }
2138
2139 return 0;
2140 }
2141
2142 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2143 {
2144 int ret;
2145 struct device *dev;
2146 struct rbd_snap *snap;
2147
2148 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2149 dev = &rbd_dev->dev;
2150
2151 dev->bus = &rbd_bus_type;
2152 dev->type = &rbd_device_type;
2153 dev->parent = &rbd_root_dev;
2154 dev->release = rbd_dev_release;
2155 dev_set_name(dev, "%d", rbd_dev->id);
2156 ret = device_register(dev);
2157 if (ret < 0)
2158 goto out;
2159
2160 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2161 ret = rbd_register_snap_dev(rbd_dev, snap,
2162 &rbd_dev->dev);
2163 if (ret < 0)
2164 break;
2165 }
2166 out:
2167 mutex_unlock(&ctl_mutex);
2168 return ret;
2169 }
2170
2171 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2172 {
2173 device_unregister(&rbd_dev->dev);
2174 }
2175
2176 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2177 {
2178 int ret, rc;
2179
2180 do {
2181 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2182 rbd_dev->header.obj_version);
2183 if (ret == -ERANGE) {
2184 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2185 rc = __rbd_refresh_header(rbd_dev);
2186 mutex_unlock(&ctl_mutex);
2187 if (rc < 0)
2188 return rc;
2189 }
2190 } while (ret == -ERANGE);
2191
2192 return ret;
2193 }
2194
2195 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2196
2197 /*
2198 * Get a unique rbd identifier for the given new rbd_dev, and add
2199 * the rbd_dev to the global list. The minimum rbd id is 1.
2200 */
2201 static void rbd_id_get(struct rbd_device *rbd_dev)
2202 {
2203 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2204
2205 spin_lock(&rbd_dev_list_lock);
2206 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2207 spin_unlock(&rbd_dev_list_lock);
2208 }
2209
2210 /*
2211 * Remove an rbd_dev from the global list, and record that its
2212 * identifier is no longer in use.
2213 */
2214 static void rbd_id_put(struct rbd_device *rbd_dev)
2215 {
2216 struct list_head *tmp;
2217 int rbd_id = rbd_dev->id;
2218 int max_id;
2219
2220 BUG_ON(rbd_id < 1);
2221
2222 spin_lock(&rbd_dev_list_lock);
2223 list_del_init(&rbd_dev->node);
2224
2225 /*
2226 * If the id being "put" is not the current maximum, there
2227 * is nothing special we need to do.
2228 */
2229 if (rbd_id != atomic64_read(&rbd_id_max)) {
2230 spin_unlock(&rbd_dev_list_lock);
2231 return;
2232 }
2233
2234 /*
2235 * We need to update the current maximum id. Search the
2236 * list to find out what it is. We're more likely to find
2237 * the maximum at the end, so search the list backward.
2238 */
2239 max_id = 0;
2240 list_for_each_prev(tmp, &rbd_dev_list) {
2241 struct rbd_device *rbd_dev;
2242
2243 rbd_dev = list_entry(tmp, struct rbd_device, node);
2244 		if (rbd_dev->id > max_id)
2245 			max_id = rbd_dev->id;
2246 }
2247 spin_unlock(&rbd_dev_list_lock);
2248
2249 /*
2250 * The max id could have been updated by rbd_id_get(), in
2251 * which case it now accurately reflects the new maximum.
2252 * Be careful not to overwrite the maximum value in that
2253 * case.
2254 */
2255 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2256 }
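/*
 * Worked example of the race the cmpxchg above tolerates: devices 1,
 * 2 and 3 exist, so rbd_id_max is 3.  rbd_id_put(3) computes
 * max_id = 2 and drops the list lock; if rbd_id_get() runs now,
 * rbd_id_max becomes 4, and the cmpxchg(&rbd_id_max, 3, 2) finds 4
 * rather than 3 and correctly leaves the new maximum in place.
 */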
2257
2258 /*
2259 * Skips over white space at *buf, and updates *buf to point to the
2260 * first found non-space character (if any). Returns the length of
2261 * the token (string of non-white space characters) found. Note
2262 * that *buf must be terminated with '\0'.
2263 */
2264 static inline size_t next_token(const char **buf)
2265 {
2266 /*
2267 * These are the characters that produce nonzero for
2268 * isspace() in the "C" and "POSIX" locales.
2269 */
2270 const char *spaces = " \f\n\r\t\v";
2271
2272 *buf += strspn(*buf, spaces); /* Find start of token */
2273
2274 return strcspn(*buf, spaces); /* Return token length */
2275 }
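/*
 * Usage sketch (illustrative only, not part of the driver):
 *
 *	const char *p = "  rbd foo";
 *	size_t len = next_token(&p);	/* len == 3, p now at "rbd foo" */
 */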
2276
2277 /*
2278 * Finds the next token in *buf, and if the provided token buffer is
2279 * big enough, copies the found token into it. The result, if
2280 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2281 * must be terminated with '\0' on entry.
2282 *
2283 * Returns the length of the token found (not including the '\0').
2284 * Return value will be 0 if no token is found, and it will be >=
2285 * token_size if the token would not fit.
2286 *
2287 * The *buf pointer will be updated to point beyond the end of the
2288 * found token. Note that this occurs even if the token buffer is
2289 * too small to hold it.
2290 */
2291 static inline size_t copy_token(const char **buf,
2292 char *token,
2293 size_t token_size)
2294 {
2295 size_t len;
2296
2297 len = next_token(buf);
2298 if (len < token_size) {
2299 memcpy(token, *buf, len);
2300 *(token + len) = '\0';
2301 }
2302 *buf += len;
2303
2304 return len;
2305 }
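/*
 * Usage sketch (illustrative only): with a 16-byte token buffer,
 *
 *	const char *p = "pool image";
 *	char tok[16];
 *	size_t len = copy_token(&p, tok, sizeof (tok));
 *	/* len == 4, tok == "pool", p now at " image" */
 */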
2306
2307 /*
2308 * Finds the next token in *buf, dynamically allocates a buffer big
2309 * enough to hold a copy of it, and copies the token into the new
2310 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2311 * that a duplicate buffer is created even for a zero-length token.
2312 *
2313 * Returns a pointer to the newly-allocated duplicate, or a null
2314 * pointer if memory for the duplicate was not available. If
2315 * the lenp argument is a non-null pointer, the length of the token
2316 * (not including the '\0') is returned in *lenp.
2317 *
2318 * If successful, the *buf pointer will be updated to point beyond
2319 * the end of the found token.
2320 *
2321 * Note: uses GFP_KERNEL for allocation.
2322 */
2323 static inline char *dup_token(const char **buf, size_t *lenp)
2324 {
2325 char *dup;
2326 size_t len;
2327
2328 len = next_token(buf);
2329 dup = kmalloc(len + 1, GFP_KERNEL);
2330 if (!dup)
2331 return NULL;
2332
2333 memcpy(dup, *buf, len);
2334 *(dup + len) = '\0';
2335 *buf += len;
2336
2337 if (lenp)
2338 *lenp = len;
2339
2340 return dup;
2341 }
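/*
 * Usage sketch (illustrative only); the caller owns the returned
 * buffer and must kfree() it:
 *
 *	const char *p = "rbd foo";
 *	char *name = dup_token(&p, NULL);	/* name == "rbd" */
 *	...
 *	kfree(name);
 */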
2342
2343 /*
2344  * This fills in the pool_name, obj, obj_len, obj_md_name, and
2345  * snap_name fields of the given rbd_dev, based
2346 * on the list of monitor addresses and other options provided via
2347 * /sys/bus/rbd/add.
2348 *
2349 * Note: rbd_dev is assumed to have been initially zero-filled.
2350 */
2351 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2352 const char *buf,
2353 const char **mon_addrs,
2354 size_t *mon_addrs_size,
2355 char *options,
2356 size_t options_size)
2357 {
2358 size_t len;
2359 int ret;
2360
2361 /* The first four tokens are required */
2362
2363 len = next_token(&buf);
2364 if (!len)
2365 return -EINVAL;
2366 *mon_addrs_size = len + 1;
2367 *mon_addrs = buf;
2368
2369 buf += len;
2370
2371 len = copy_token(&buf, options, options_size);
2372 if (!len || len >= options_size)
2373 return -EINVAL;
2374
2375 ret = -ENOMEM;
2376 rbd_dev->pool_name = dup_token(&buf, NULL);
2377 if (!rbd_dev->pool_name)
2378 goto out_err;
2379
2380 rbd_dev->obj = dup_token(&buf, &rbd_dev->obj_len);
2381 if (!rbd_dev->obj)
2382 goto out_err;
2383
2384 /* Create the name of the header object */
2385
2386 rbd_dev->obj_md_name = kmalloc(rbd_dev->obj_len
2387 + sizeof (RBD_SUFFIX),
2388 GFP_KERNEL);
2389 if (!rbd_dev->obj_md_name)
2390 goto out_err;
2391 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2392
2393 /*
2394 	 * The snapshot name is optional.  If none is supplied,
2395 * we use the default value.
2396 */
2397 rbd_dev->snap_name = dup_token(&buf, &len);
2398 if (!rbd_dev->snap_name)
2399 goto out_err;
2400 if (!len) {
2401 /* Replace the empty name with the default */
2402 kfree(rbd_dev->snap_name);
2403 rbd_dev->snap_name
2404 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2405 if (!rbd_dev->snap_name)
2406 goto out_err;
2407
2408 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2409 sizeof (RBD_SNAP_HEAD_NAME));
2410 }
2411
2412 return 0;
2413
2414 out_err:
2415 kfree(rbd_dev->obj_md_name);
2416 kfree(rbd_dev->obj);
2417 kfree(rbd_dev->pool_name);
2418 rbd_dev->pool_name = NULL;
2419
2420 return ret;
2421 }
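/*
 * For example (all values hypothetical), writing
 *
 *	"1.2.3.4:6789 name=admin rbd foo snap1"
 *
 * to /sys/bus/rbd/add parses as mon_addrs "1.2.3.4:6789", options
 * "name=admin", pool_name "rbd", and obj "foo" (with header object
 * "foo.rbd"), and snap_name "snap1"; with the snapshot omitted,
 * snap_name becomes RBD_SNAP_HEAD_NAME, i.e. the live image.
 */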
2422
2423 static ssize_t rbd_add(struct bus_type *bus,
2424 const char *buf,
2425 size_t count)
2426 {
2427 char *options;
2428 struct rbd_device *rbd_dev = NULL;
2429 const char *mon_addrs = NULL;
2430 size_t mon_addrs_size = 0;
2431 struct ceph_osd_client *osdc;
2432 int rc = -ENOMEM;
2433
2434 if (!try_module_get(THIS_MODULE))
2435 return -ENODEV;
2436
2437 options = kmalloc(count, GFP_KERNEL);
2438 if (!options)
2439 goto err_nomem;
2440 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2441 if (!rbd_dev)
2442 goto err_nomem;
2443
2444 /* static rbd_device initialization */
2445 spin_lock_init(&rbd_dev->lock);
2446 INIT_LIST_HEAD(&rbd_dev->node);
2447 INIT_LIST_HEAD(&rbd_dev->snaps);
2448 init_rwsem(&rbd_dev->header_rwsem);
2449
2452 /* generate unique id: find highest unique id, add one */
2453 rbd_id_get(rbd_dev);
2454
2455 /* Fill in the device name, now that we have its id. */
2456 BUILD_BUG_ON(DEV_NAME_LEN
2457 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2458 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2459
2460 /* parse add command */
2461 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2462 options, count);
2463 if (rc)
2464 goto err_put_id;
2465
2466 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2467 options);
2468 if (IS_ERR(rbd_dev->rbd_client)) {
2469 rc = PTR_ERR(rbd_dev->rbd_client);
2470 goto err_put_id;
2471 }
2472
2473 /* pick the pool */
2474 osdc = &rbd_dev->rbd_client->client->osdc;
2475 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2476 if (rc < 0)
2477 goto err_out_client;
2478 rbd_dev->pool_id = rc;
2479
2480 /* register our block device */
2481 rc = register_blkdev(0, rbd_dev->name);
2482 if (rc < 0)
2483 goto err_out_client;
2484 rbd_dev->major = rc;
2485
2486 rc = rbd_bus_add_dev(rbd_dev);
2487 if (rc)
2488 goto err_out_blkdev;
2489
2490 /*
2491 * At this point cleanup in the event of an error is the job
2492 * of the sysfs code (initiated by rbd_bus_del_dev()).
2493 *
2494 * Set up and announce blkdev mapping.
2495 */
2496 rc = rbd_init_disk(rbd_dev);
2497 if (rc)
2498 goto err_out_bus;
2499
2500 rc = rbd_init_watch_dev(rbd_dev);
2501 if (rc)
2502 goto err_out_bus;
2503
2504 return count;
2505
2506 err_out_bus:
2507 	/* this will also clean up the rest of the rbd_dev state */
2508
2509 rbd_bus_del_dev(rbd_dev);
2510 kfree(options);
2511 return rc;
2512
2513 err_out_blkdev:
2514 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2515 err_out_client:
2516 rbd_put_client(rbd_dev);
2517 err_put_id:
2518 if (rbd_dev->pool_name) {
2519 kfree(rbd_dev->snap_name);
2520 kfree(rbd_dev->obj_md_name);
2521 kfree(rbd_dev->obj);
2522 kfree(rbd_dev->pool_name);
2523 }
2524 rbd_id_put(rbd_dev);
2525 err_nomem:
2526 kfree(rbd_dev);
2527 kfree(options);
2528
2529 dout("Error adding device %s\n", buf);
2530 module_put(THIS_MODULE);
2531
2532 return (ssize_t) rc;
2533 }
2534
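/*
 * Look up an rbd device by id.  The list lock is dropped before
 * returning, so the caller (rbd_remove()) relies on holding ctl_mutex
 * to keep the device from disappearing underneath it.
 */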
2535 static struct rbd_device *__rbd_get_dev(unsigned long id)
2536 {
2537 struct list_head *tmp;
2538 struct rbd_device *rbd_dev;
2539
2540 spin_lock(&rbd_dev_list_lock);
2541 list_for_each(tmp, &rbd_dev_list) {
2542 rbd_dev = list_entry(tmp, struct rbd_device, node);
2543 if (rbd_dev->id == id) {
2544 spin_unlock(&rbd_dev_list_lock);
2545 return rbd_dev;
2546 }
2547 }
2548 spin_unlock(&rbd_dev_list_lock);
2549 return NULL;
2550 }
2551
2552 static void rbd_dev_release(struct device *dev)
2553 {
2554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2555
2556 if (rbd_dev->watch_request) {
2557 struct ceph_client *client = rbd_dev->rbd_client->client;
2558
2559 ceph_osdc_unregister_linger_request(&client->osdc,
2560 rbd_dev->watch_request);
2561 }
2562 if (rbd_dev->watch_event)
2563 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2564
2565 rbd_put_client(rbd_dev);
2566
2567 /* clean up and free blkdev */
2568 rbd_free_disk(rbd_dev);
2569 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2570
2571 /* done with the id, and with the rbd_dev */
2572 kfree(rbd_dev->snap_name);
2573 kfree(rbd_dev->obj_md_name);
2574 kfree(rbd_dev->pool_name);
2575 kfree(rbd_dev->obj);
2576 rbd_id_put(rbd_dev);
2577 kfree(rbd_dev);
2578
2579 /* release module ref */
2580 module_put(THIS_MODULE);
2581 }
2582
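/*
 * Tear down a mapping by id, e.g. (for the device added above):
 *
 *	echo 0 > /sys/bus/rbd/remove
 */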
2583 static ssize_t rbd_remove(struct bus_type *bus,
2584 const char *buf,
2585 size_t count)
2586 {
2587 struct rbd_device *rbd_dev = NULL;
2588 int target_id, rc;
2589 unsigned long ul;
2590 int ret = count;
2591
2592 rc = strict_strtoul(buf, 10, &ul);
2593 if (rc)
2594 return rc;
2595
2596 /* convert to int; abort if we lost anything in the conversion */
2597 target_id = (int) ul;
2598 if (target_id != ul)
2599 return -EINVAL;
2600
2601 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2602
2603 rbd_dev = __rbd_get_dev(target_id);
2604 if (!rbd_dev) {
2605 ret = -ENOENT;
2606 goto done;
2607 }
2608
2609 __rbd_remove_all_snaps(rbd_dev);
2610 rbd_bus_del_dev(rbd_dev);
2611
2612 done:
2613 mutex_unlock(&ctl_mutex);
2614 return ret;
2615 }
2616
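/*
 * Create a snapshot by writing its name to the device's snap_create
 * attribute, e.g. (hypothetical device id and snapshot name):
 *
 *	echo snap1 > /sys/bus/rbd/devices/0/snap_create
 */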
2617 static ssize_t rbd_snap_add(struct device *dev,
2618 struct device_attribute *attr,
2619 const char *buf,
2620 size_t count)
2621 {
2622 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2623 int ret;
2624 char *name = kmalloc(count + 1, GFP_KERNEL);
2625 if (!name)
2626 return -ENOMEM;
2627
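	/*
	 * Note: a size of count (not count + 1) makes snprintf() copy
	 * at most count - 1 bytes, dropping buf's final byte
	 * (normally the trailing newline included in sysfs writes).
	 */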
2628 snprintf(name, count, "%s", buf);
2629
2630 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2631
2632 ret = rbd_header_add_snap(rbd_dev,
2633 name, GFP_KERNEL);
2634 if (ret < 0)
2635 goto err_unlock;
2636
2637 ret = __rbd_refresh_header(rbd_dev);
2638 if (ret < 0)
2639 goto err_unlock;
2640
2641 	/* Don't hold ctl_mutex while notifying: the notify might
2642 	   trigger a watch callback that would need to take that mutex */
2643 mutex_unlock(&ctl_mutex);
2644
2645 /* make a best effort, don't error if failed */
2646 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2647
2648 ret = count;
2649 kfree(name);
2650 return ret;
2651
2652 err_unlock:
2653 mutex_unlock(&ctl_mutex);
2654 kfree(name);
2655 return ret;
2656 }
2657
2658 /*
2659 * create control files in sysfs
2660 * /sys/bus/rbd/...
2661 */
2662 static int rbd_sysfs_init(void)
2663 {
2664 int ret;
2665
2666 ret = device_register(&rbd_root_dev);
2667 if (ret < 0)
2668 return ret;
2669
2670 ret = bus_register(&rbd_bus_type);
2671 if (ret < 0)
2672 device_unregister(&rbd_root_dev);
2673
2674 return ret;
2675 }
2676
2677 static void rbd_sysfs_cleanup(void)
2678 {
2679 bus_unregister(&rbd_bus_type);
2680 device_unregister(&rbd_root_dev);
2681 }
2682
2683 int __init rbd_init(void)
2684 {
2685 int rc;
2686
2687 rc = rbd_sysfs_init();
2688 if (rc)
2689 return rc;
2690 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2691 return 0;
2692 }
2693
2694 void __exit rbd_exit(void)
2695 {
2696 rbd_sysfs_cleanup();
2697 }
2698
2699 module_init(rbd_init);
2700 module_exit(rbd_exit);
2701
2702 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2703 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2704 MODULE_DESCRIPTION("rados block device");
2705
2706 /* following authorship retained from original osdblk.c */
2707 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2708
2709 MODULE_LICENSE("GPL");