1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
45
46 /*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_OPT_LEN 1024
66
67 #define RBD_SNAP_HEAD_NAME "-"
68
69 #define RBD_IMAGE_ID_LEN_MAX 64
70
71 /*
72 * An RBD device name will be "rbd#", where the "rbd" comes from
73 * RBD_DRV_NAME above, and # is a unique integer identifier.
74 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
75 * enough to hold all possible device names.
76 */
77 #define DEV_NAME_LEN 32
78 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
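
/*
 * Worked example of the width formula above: each byte of an int
 * contributes at most ~2.5 decimal digits, since 256^n < 10^(2.5n).
 * For a 4-byte int this gives (5 * 4) / 2 + 1 = 11 characters -- the
 * 10 digits of INT_MAX plus room for a sign -- so "rbd" plus any id
 * always fits comfortably in DEV_NAME_LEN (32).
 */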
79
80 #define RBD_READ_ONLY_DEFAULT false
81
82 /*
83 * block device image metadata (in-memory version)
84 */
85 struct rbd_image_header {
86 /* These four fields never change for a given rbd image */
87 char *object_prefix;
88 __u8 obj_order;
89 __u8 crypt_type;
90 __u8 comp_type;
91
92 /* The remaining fields need to be updated occasionally */
93 u64 image_size;
94 struct ceph_snap_context *snapc;
95 char *snap_names;
96 u64 *snap_sizes;
97
98 u64 obj_version;
99 };
100
101 struct rbd_options {
102 bool read_only;
103 };
104
105 /*
106 * an instance of the client. multiple devices may share an rbd client.
107 */
108 struct rbd_client {
109 struct ceph_client *client;
110 struct kref kref;
111 struct list_head node;
112 };
113
114 /*
115 * a request completion status
116 */
117 struct rbd_req_status {
118 int done;
119 int rc;
120 u64 bytes;
121 };
122
123 /*
124 * a collection of requests
125 */
126 struct rbd_req_coll {
127 int total;
128 int num_done;
129 struct kref kref;
130 struct rbd_req_status status[0];
131 };
132
133 /*
134 * a single io request
135 */
136 struct rbd_request {
137 struct request *rq; /* blk layer request */
138 struct bio *bio; /* cloned bio */
139 struct page **pages; /* list of used pages */
140 u64 len;
141 int coll_index;
142 struct rbd_req_coll *coll;
143 };
144
145 struct rbd_snap {
146 struct device dev;
147 const char *name;
148 u64 size;
149 struct list_head node;
150 u64 id;
151 };
152
153 struct rbd_mapping {
154 char *snap_name;
155 u64 snap_id;
156 u64 size;
157 bool snap_exists;
158 bool read_only;
159 };
160
161 /*
162 * a single device
163 */
164 struct rbd_device {
165 int dev_id; /* blkdev unique id */
166
167 int major; /* blkdev assigned major */
168 struct gendisk *disk; /* blkdev's gendisk and rq */
169
170 struct rbd_options rbd_opts;
171 struct rbd_client *rbd_client;
172
173 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
174
175 spinlock_t lock; /* queue lock */
176
177 struct rbd_image_header header;
178 char *image_id;
179 size_t image_id_len;
180 char *image_name;
181 size_t image_name_len;
182 char *header_name;
183 char *pool_name;
184 int pool_id;
185
186 struct ceph_osd_event *watch_event;
187 struct ceph_osd_request *watch_request;
188
189 /* protects updating the header */
190 struct rw_semaphore header_rwsem;
191
192 struct rbd_mapping mapping;
193
194 struct list_head node;
195
196 /* list of snapshots */
197 struct list_head snaps;
198
199 /* sysfs related */
200 struct device dev;
201 };
202
203 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
204
205 static LIST_HEAD(rbd_dev_list); /* devices */
206 static DEFINE_SPINLOCK(rbd_dev_list_lock);
207
208 static LIST_HEAD(rbd_client_list); /* clients */
209 static DEFINE_SPINLOCK(rbd_client_list_lock);
210
211 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
212 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
213
214 static void rbd_dev_release(struct device *dev);
215 static ssize_t rbd_snap_add(struct device *dev,
216 struct device_attribute *attr,
217 const char *buf,
218 size_t count);
219 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
220
221 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
222 size_t count);
223 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
224 size_t count);
225
226 static struct bus_attribute rbd_bus_attrs[] = {
227 __ATTR(add, S_IWUSR, NULL, rbd_add),
228 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
229 __ATTR_NULL
230 };
231
232 static struct bus_type rbd_bus_type = {
233 .name = "rbd",
234 .bus_attrs = rbd_bus_attrs,
235 };
236
237 static void rbd_root_dev_release(struct device *dev)
238 {
239 }
240
241 static struct device rbd_root_dev = {
242 .init_name = "rbd",
243 .release = rbd_root_dev_release,
244 };
245
246 #ifdef RBD_DEBUG
247 #define rbd_assert(expr) \
248 if (unlikely(!(expr))) { \
249 printk(KERN_ERR "\nAssertion failure in %s() " \
250 "at line %d:\n\n" \
251 "\trbd_assert(%s);\n\n", \
252 __func__, __LINE__, #expr); \
253 BUG(); \
254 }
255 #else /* !RBD_DEBUG */
256 # define rbd_assert(expr) ((void) 0)
257 #endif /* !RBD_DEBUG */
258
259 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
260 {
261 return get_device(&rbd_dev->dev);
262 }
263
264 static void rbd_put_dev(struct rbd_device *rbd_dev)
265 {
266 put_device(&rbd_dev->dev);
267 }
268
269 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
270
271 static int rbd_open(struct block_device *bdev, fmode_t mode)
272 {
273 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
274
275 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
276 return -EROFS;
277
278 rbd_get_dev(rbd_dev);
279 set_device_ro(bdev, rbd_dev->mapping.read_only);
280
281 return 0;
282 }
283
284 static int rbd_release(struct gendisk *disk, fmode_t mode)
285 {
286 struct rbd_device *rbd_dev = disk->private_data;
287
288 rbd_put_dev(rbd_dev);
289
290 return 0;
291 }
292
293 static const struct block_device_operations rbd_bd_ops = {
294 .owner = THIS_MODULE,
295 .open = rbd_open,
296 .release = rbd_release,
297 };
298
299 /*
300 * Initialize an rbd client instance.
301 * We own *ceph_opts.
302 */
303 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
304 {
305 struct rbd_client *rbdc;
306 int ret = -ENOMEM;
307
308 dout("rbd_client_create\n");
309 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
310 if (!rbdc)
311 goto out_opt;
312
313 kref_init(&rbdc->kref);
314 INIT_LIST_HEAD(&rbdc->node);
315
316 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
317
318 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
319 if (IS_ERR(rbdc->client))
320 goto out_mutex;
321 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
322
323 ret = ceph_open_session(rbdc->client);
324 if (ret < 0)
325 goto out_err;
326
327 spin_lock(&rbd_client_list_lock);
328 list_add_tail(&rbdc->node, &rbd_client_list);
329 spin_unlock(&rbd_client_list_lock);
330
331 mutex_unlock(&ctl_mutex);
332
333 dout("rbd_client_create created %p\n", rbdc);
334 return rbdc;
335
336 out_err:
337 ceph_destroy_client(rbdc->client);
338 out_mutex:
339 mutex_unlock(&ctl_mutex);
340 kfree(rbdc);
341 out_opt:
342 if (ceph_opts)
343 ceph_destroy_options(ceph_opts);
344 return ERR_PTR(ret);
345 }
346
347 /*
348 * Find a ceph client with specific addr and configuration. If
349 * found, bump its reference count.
350 */
351 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
352 {
353 struct rbd_client *client_node;
354 bool found = false;
355
356 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
357 return NULL;
358
359 spin_lock(&rbd_client_list_lock);
360 list_for_each_entry(client_node, &rbd_client_list, node) {
361 if (!ceph_compare_options(ceph_opts, client_node->client)) {
362 kref_get(&client_node->kref);
363 found = true;
364 break;
365 }
366 }
367 spin_unlock(&rbd_client_list_lock);
368
369 return found ? client_node : NULL;
370 }
371
372 /*
373 * mount options
374 */
375 enum {
376 Opt_last_int,
377 /* int args above */
378 Opt_last_string,
379 /* string args above */
380 Opt_read_only,
381 Opt_read_write,
382 /* Boolean args above */
383 Opt_last_bool,
384 };
385
386 static match_table_t rbd_opts_tokens = {
387 /* int args above */
388 /* string args above */
389 {Opt_read_only, "mapping.read_only"},
390 {Opt_read_only, "ro"}, /* Alternate spelling */
391 {Opt_read_write, "read_write"},
392 {Opt_read_write, "rw"}, /* Alternate spelling */
393 /* Boolean args above */
394 {-1, NULL}
395 };
396
397 static int parse_rbd_opts_token(char *c, void *private)
398 {
399 struct rbd_options *rbd_opts = private;
400 substring_t argstr[MAX_OPT_ARGS];
401 int token, intval, ret;
402
403 token = match_token(c, rbd_opts_tokens, argstr);
404 if (token < 0)
405 return -EINVAL;
406
407 if (token < Opt_last_int) {
408 ret = match_int(&argstr[0], &intval);
409 if (ret < 0) {
410 pr_err("bad mount option arg (not int) "
411 "at '%s'\n", c);
412 return ret;
413 }
414 dout("got int token %d val %d\n", token, intval);
415 } else if (token > Opt_last_int && token < Opt_last_string) {
416 dout("got string token %d val %s\n", token,
417 argstr[0].from);
418 } else if (token > Opt_last_string && token < Opt_last_bool) {
419 dout("got Boolean token %d\n", token);
420 } else {
421 dout("got token %d\n", token);
422 }
423
424 switch (token) {
425 case Opt_read_only:
426 rbd_opts->read_only = true;
427 break;
428 case Opt_read_write:
429 rbd_opts->read_only = false;
430 break;
431 default:
432 rbd_assert(false);
433 break;
434 }
435 return 0;
436 }
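
/*
 * A minimal userspace sketch of the boolean option handling above
 * (match_token() is kernel-only; this just mirrors the logic so the
 * token table is easy to follow):
 */
#if 0	/* illustration only -- not compiled */
#include <stdbool.h>
#include <string.h>

static int parse_opt_sketch(const char *tok, bool *read_only)
{
	if (!strcmp(tok, "read_only") || !strcmp(tok, "ro"))
		*read_only = true;	/* map the device read-only */
	else if (!strcmp(tok, "read_write") || !strcmp(tok, "rw"))
		*read_only = false;	/* the default */
	else
		return -1;		/* -EINVAL in the real code */
	return 0;
}
#endif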
437
438 /*
439 * Get a ceph client with specific addr and configuration; if one does
440 * not exist, create it.
441 */
442 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
443 size_t mon_addr_len, char *options)
444 {
445 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
446 struct ceph_options *ceph_opts;
447 struct rbd_client *rbdc;
448
449 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
450
451 ceph_opts = ceph_parse_options(options, mon_addr,
452 mon_addr + mon_addr_len,
453 parse_rbd_opts_token, rbd_opts);
454 if (IS_ERR(ceph_opts))
455 return PTR_ERR(ceph_opts);
456
457 rbdc = rbd_client_find(ceph_opts);
458 if (rbdc) {
459 /* using an existing client */
460 ceph_destroy_options(ceph_opts);
461 } else {
462 rbdc = rbd_client_create(ceph_opts);
463 if (IS_ERR(rbdc))
464 return PTR_ERR(rbdc);
465 }
466 rbd_dev->rbd_client = rbdc;
467
468 return 0;
469 }
470
471 /*
472 * Destroy ceph client
473 *
474 * Caller must hold rbd_client_list_lock.
475 */
476 static void rbd_client_release(struct kref *kref)
477 {
478 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
479
480 dout("rbd_release_client %p\n", rbdc);
481 spin_lock(&rbd_client_list_lock);
482 list_del(&rbdc->node);
483 spin_unlock(&rbd_client_list_lock);
484
485 ceph_destroy_client(rbdc->client);
486 kfree(rbdc);
487 }
488
489 /*
490 * Drop reference to ceph client node. If it's not referenced anymore, release
491 * it.
492 */
493 static void rbd_put_client(struct rbd_device *rbd_dev)
494 {
495 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
496 rbd_dev->rbd_client = NULL;
497 }
498
499 /*
500 * Destroy requests collection
501 */
502 static void rbd_coll_release(struct kref *kref)
503 {
504 struct rbd_req_coll *coll =
505 container_of(kref, struct rbd_req_coll, kref);
506
507 dout("rbd_coll_release %p\n", coll);
508 kfree(coll);
509 }
510
511 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
512 {
513 size_t size;
514 u32 snap_count;
515
516 /* The header has to start with the magic rbd header text */
517 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
518 return false;
519
520 /*
521 * The size of a snapshot header has to fit in a size_t, and
522 * that limits the number of snapshots.
523 */
524 snap_count = le32_to_cpu(ondisk->snap_count);
525 size = SIZE_MAX - sizeof (struct ceph_snap_context);
526 if (snap_count > size / sizeof (__le64))
527 return false;
528
529 /*
530 * Not only that, but the size of the entire snapshot
531 * header must also be representable in a size_t.
532 */
533 size -= snap_count * sizeof (__le64);
534 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
535 return false;
536
537 return true;
538 }
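
/*
 * The bound above, made concrete (userspace sketch; the
 * ceph_snap_context size used here is an assumption):
 */
#if 0	/* illustration only -- not compiled */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	size_t ctx_size = 40;	/* assumed sizeof(struct ceph_snap_context) */
	size_t max_count = (SIZE_MAX - ctx_size) / sizeof(uint64_t);

	/* any snap_count above this could overflow a size_t allocation */
	printf("max representable snap_count: %zu\n", max_count);
	return 0;
}
#endif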
539
540 /*
541 * Create a new header structure, translate header format from the on-disk
542 * header.
543 */
544 static int rbd_header_from_disk(struct rbd_image_header *header,
545 struct rbd_image_header_ondisk *ondisk)
546 {
547 u32 snap_count;
548 size_t len;
549 size_t size;
550 u32 i;
551
552 memset(header, 0, sizeof (*header));
553
554 snap_count = le32_to_cpu(ondisk->snap_count);
555
556 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
557 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
558 if (!header->object_prefix)
559 return -ENOMEM;
560 memcpy(header->object_prefix, ondisk->object_prefix, len);
561 header->object_prefix[len] = '\0';
562
563 if (snap_count) {
564 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
565
566 /* Save a copy of the snapshot names */
567
568 if (snap_names_len > (u64) SIZE_MAX)
569 return -EIO;
570 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
571 if (!header->snap_names)
572 goto out_err;
573 /*
574 * Note that rbd_dev_v1_header_read() guarantees
575 * the ondisk buffer we're working with has
576 * snap_names_len bytes beyond the end of the
577 * snapshot id array, this memcpy() is safe.
578 */
579 memcpy(header->snap_names, &ondisk->snaps[snap_count],
580 snap_names_len);
581
582 /* Record each snapshot's size */
583
584 size = snap_count * sizeof (*header->snap_sizes);
585 header->snap_sizes = kmalloc(size, GFP_KERNEL);
586 if (!header->snap_sizes)
587 goto out_err;
588 for (i = 0; i < snap_count; i++)
589 header->snap_sizes[i] =
590 le64_to_cpu(ondisk->snaps[i].image_size);
591 } else {
592 WARN_ON(ondisk->snap_names_len);
593 header->snap_names = NULL;
594 header->snap_sizes = NULL;
595 }
596
597 header->obj_order = ondisk->options.order;
598 header->crypt_type = ondisk->options.crypt_type;
599 header->comp_type = ondisk->options.comp_type;
600
601 /* Allocate and fill in the snapshot context */
602
603 header->image_size = le64_to_cpu(ondisk->image_size);
604 size = sizeof (struct ceph_snap_context);
605 size += snap_count * sizeof (header->snapc->snaps[0]);
606 header->snapc = kzalloc(size, GFP_KERNEL);
607 if (!header->snapc)
608 goto out_err;
609
610 atomic_set(&header->snapc->nref, 1);
611 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
612 header->snapc->num_snaps = snap_count;
613 for (i = 0; i < snap_count; i++)
614 header->snapc->snaps[i] =
615 le64_to_cpu(ondisk->snaps[i].id);
616
617 return 0;
618
619 out_err:
620 kfree(header->snap_sizes);
621 header->snap_sizes = NULL;
622 kfree(header->snap_names);
623 header->snap_names = NULL;
624 kfree(header->object_prefix);
625 header->object_prefix = NULL;
626
627 return -ENOMEM;
628 }
629
630 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
631 {
632
633 struct rbd_snap *snap;
634
635 list_for_each_entry(snap, &rbd_dev->snaps, node) {
636 if (!strcmp(snap_name, snap->name)) {
637 rbd_dev->mapping.snap_id = snap->id;
638 rbd_dev->mapping.size = snap->size;
639
640 return 0;
641 }
642 }
643
644 return -ENOENT;
645 }
646
647 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
648 {
649 int ret;
650
651 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
652 sizeof (RBD_SNAP_HEAD_NAME))) {
653 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
654 rbd_dev->mapping.size = rbd_dev->header.image_size;
655 rbd_dev->mapping.snap_exists = false;
656 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
657 ret = 0;
658 } else {
659 ret = snap_by_name(rbd_dev, snap_name);
660 if (ret < 0)
661 goto done;
662 rbd_dev->mapping.snap_exists = true;
663 rbd_dev->mapping.read_only = true;
664 }
665 rbd_dev->mapping.snap_name = snap_name;
666 done:
667 return ret;
668 }
669
670 static void rbd_header_free(struct rbd_image_header *header)
671 {
672 kfree(header->object_prefix);
673 header->object_prefix = NULL;
674 kfree(header->snap_sizes);
675 header->snap_sizes = NULL;
676 kfree(header->snap_names);
677 header->snap_names = NULL;
678 ceph_put_snap_context(header->snapc);
679 header->snapc = NULL;
680 }
681
682 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
683 {
684 char *name;
685 u64 segment;
686 int ret;
687
688 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
689 if (!name)
690 return NULL;
691 segment = offset >> rbd_dev->header.obj_order;
692 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
693 rbd_dev->header.object_prefix, segment);
694 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
695 pr_err("error formatting segment name for #%llu (%d)\n",
696 segment, ret);
697 kfree(name);
698 name = NULL;
699 }
700
701 return name;
702 }
703
704 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
705 {
706 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
707
708 return offset & (segment_size - 1);
709 }
710
711 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
712 u64 offset, u64 length)
713 {
714 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
715
716 offset &= segment_size - 1;
717
718 rbd_assert(length <= U64_MAX - offset);
719 if (offset + length > segment_size)
720 length = segment_size - offset;
721
722 return length;
723 }
724
725 static int rbd_get_num_segments(struct rbd_image_header *header,
726 u64 ofs, u64 len)
727 {
728 u64 start_seg;
729 u64 end_seg;
730
731 if (!len)
732 return 0;
733 if (len - 1 > U64_MAX - ofs)
734 return -ERANGE;
735
736 start_seg = ofs >> header->obj_order;
737 end_seg = (ofs + len - 1) >> header->obj_order;
738
739 return end_seg - start_seg + 1;
740 }
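
/*
 * Worked example of the segment helpers above (userspace sketch,
 * assuming the common obj_order of 22, i.e. 4 MiB objects):
 */
#if 0	/* illustration only -- not compiled */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	unsigned obj_order = 22;		/* assumed: 4 MiB objects */
	uint64_t seg_size = 1ULL << obj_order;
	uint64_t ofs = 3ULL << 20;		/* I/O starts at 3 MiB */
	uint64_t len = 8ULL << 20;		/* ... and spans 8 MiB */

	uint64_t start_seg = ofs >> obj_order;
	uint64_t end_seg = (ofs + len - 1) >> obj_order;
	uint64_t first_ofs = ofs & (seg_size - 1);
	uint64_t first_len = seg_size - first_ofs;

	/* prints: segments 0..2 (3 total); first piece is 1 MiB long */
	printf("segments %llu..%llu (%llu total)\n",
	       (unsigned long long) start_seg,
	       (unsigned long long) end_seg,
	       (unsigned long long) (end_seg - start_seg + 1));
	printf("first segment: offset %llu len %llu\n",
	       (unsigned long long) first_ofs,
	       (unsigned long long) first_len);
	return 0;
}
#endif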
741
742 /*
743 * returns the size of an object in the image
744 */
745 static u64 rbd_obj_bytes(struct rbd_image_header *header)
746 {
747 return 1ULL << header->obj_order;
748 }
749
750 /*
751 * bio helpers
752 */
753
754 static void bio_chain_put(struct bio *chain)
755 {
756 struct bio *tmp;
757
758 while (chain) {
759 tmp = chain;
760 chain = chain->bi_next;
761 bio_put(tmp);
762 }
763 }
764
765 /*
766 * zeros a bio chain, starting at specific offset
767 */
768 static void zero_bio_chain(struct bio *chain, int start_ofs)
769 {
770 struct bio_vec *bv;
771 unsigned long flags;
772 void *buf;
773 int i;
774 int pos = 0;
775
776 while (chain) {
777 bio_for_each_segment(bv, chain, i) {
778 if (pos + bv->bv_len > start_ofs) {
779 int remainder = max(start_ofs - pos, 0);
780 buf = bvec_kmap_irq(bv, &flags);
781 memset(buf + remainder, 0,
782 bv->bv_len - remainder);
783 bvec_kunmap_irq(buf, &flags);
784 }
785 pos += bv->bv_len;
786 }
787
788 chain = chain->bi_next;
789 }
790 }
791
792 /*
793 * bio_chain_clone - clone a chain of bios up to a certain length.
794 * might return a bio_pair that will need to be released.
795 */
796 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
797 struct bio_pair **bp,
798 int len, gfp_t gfpmask)
799 {
800 struct bio *old_chain = *old;
801 struct bio *new_chain = NULL;
802 struct bio *tail;
803 int total = 0;
804
805 if (*bp) {
806 bio_pair_release(*bp);
807 *bp = NULL;
808 }
809
810 while (old_chain && (total < len)) {
811 struct bio *tmp;
812
813 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
814 if (!tmp)
815 goto err_out;
816 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
817
818 if (total + old_chain->bi_size > len) {
819 struct bio_pair *bp;
820
821 /*
822 * this split can only happen with a single paged bio,
823 * split_bio will BUG_ON if this is not the case
824 */
825 dout("bio_chain_clone split! total=%d remaining=%d"
826 "bi_size=%u\n",
827 total, len - total, old_chain->bi_size);
828
829 /* split the bio. We'll release it either in the next
830 call, or it will have to be released outside */
831 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
832 if (!bp)
833 goto err_out;
834
835 __bio_clone(tmp, &bp->bio1);
836
837 *next = &bp->bio2;
838 } else {
839 __bio_clone(tmp, old_chain);
840 *next = old_chain->bi_next;
841 }
842
843 tmp->bi_bdev = NULL;
844 tmp->bi_next = NULL;
845 if (new_chain)
846 tail->bi_next = tmp;
847 else
848 new_chain = tmp;
849 tail = tmp;
850 old_chain = old_chain->bi_next;
851
852 total += tmp->bi_size;
853 }
854
855 rbd_assert(total == len);
856
857 *old = old_chain;
858
859 return new_chain;
860
861 err_out:
862 dout("bio_chain_clone with err\n");
863 bio_chain_put(new_chain);
864 return NULL;
865 }
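
/*
 * Illustration of the split case above: cloning len bytes out of a
 * chain whose boundary falls inside a bio.
 *
 *   old chain:  [ bio0 ][ bio1       ][ bio2 ] ...
 *                            ^-- len ends here
 *   new chain:  [ bio0'][ bio1a ]        (bio1 split via bio_split())
 *   *next = bio1b (the rest of bio1), *bp = the bio_pair to release
 *   later
 */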
866
867 /*
868 * helpers for osd request op vectors.
869 */
870 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
871 int opcode, u32 payload_len)
872 {
873 struct ceph_osd_req_op *ops;
874
875 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
876 if (!ops)
877 return NULL;
878
879 ops[0].op = opcode;
880
881 /*
882 * op extent offset and length will be set later on
883 * in calc_raw_layout()
884 */
885 ops[0].payload_len = payload_len;
886
887 return ops;
888 }
889
890 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
891 {
892 kfree(ops);
893 }
894
895 static void rbd_coll_end_req_index(struct request *rq,
896 struct rbd_req_coll *coll,
897 int index,
898 int ret, u64 len)
899 {
900 struct request_queue *q;
901 int min, max, i;
902
903 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
904 coll, index, ret, (unsigned long long) len);
905
906 if (!rq)
907 return;
908
909 if (!coll) {
910 blk_end_request(rq, ret, len);
911 return;
912 }
913
914 q = rq->q;
915
916 spin_lock_irq(q->queue_lock);
917 coll->status[index].done = 1;
918 coll->status[index].rc = ret;
919 coll->status[index].bytes = len;
920 max = min = coll->num_done;
921 while (max < coll->total && coll->status[max].done)
922 max++;
923
924 for (i = min; i<max; i++) {
925 __blk_end_request(rq, coll->status[i].rc,
926 coll->status[i].bytes);
927 coll->num_done++;
928 kref_put(&coll->kref, rbd_coll_release);
929 }
930 spin_unlock_irq(q->queue_lock);
931 }
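
/*
 * The loop above completes only a contiguous prefix of finished
 * segments: blk layer completion must proceed in offset order even
 * when OSD replies arrive out of order.  A userspace sketch of the
 * rule:
 */
#if 0	/* illustration only -- not compiled */
#include <stdbool.h>
#include <stdio.h>

int main(void)
{
	bool done[5] = { true, true, false, true, false };
	int num_done = 0;

	while (num_done < 5 && done[num_done]) {
		printf("completing segment %d\n", num_done);
		num_done++;
	}
	/* segments 3 and 4 stay pending until segment 2 finishes */
	return 0;
}
#endif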
932
933 static void rbd_coll_end_req(struct rbd_request *req,
934 int ret, u64 len)
935 {
936 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
937 }
938
939 /*
940 * Send ceph osd request
941 */
942 static int rbd_do_request(struct request *rq,
943 struct rbd_device *rbd_dev,
944 struct ceph_snap_context *snapc,
945 u64 snapid,
946 const char *object_name, u64 ofs, u64 len,
947 struct bio *bio,
948 struct page **pages,
949 int num_pages,
950 int flags,
951 struct ceph_osd_req_op *ops,
952 struct rbd_req_coll *coll,
953 int coll_index,
954 void (*rbd_cb)(struct ceph_osd_request *req,
955 struct ceph_msg *msg),
956 struct ceph_osd_request **linger_req,
957 u64 *ver)
958 {
959 struct ceph_osd_request *req;
960 struct ceph_file_layout *layout;
961 int ret;
962 u64 bno;
963 struct timespec mtime = CURRENT_TIME;
964 struct rbd_request *req_data;
965 struct ceph_osd_request_head *reqhead;
966 struct ceph_osd_client *osdc;
967
968 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
969 if (!req_data) {
970 if (coll)
971 rbd_coll_end_req_index(rq, coll, coll_index,
972 -ENOMEM, len);
973 return -ENOMEM;
974 }
975
976 if (coll) {
977 req_data->coll = coll;
978 req_data->coll_index = coll_index;
979 }
980
981 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
982 (unsigned long long) ofs, (unsigned long long) len);
983
984 osdc = &rbd_dev->rbd_client->client->osdc;
985 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
986 false, GFP_NOIO, pages, bio);
987 if (!req) {
988 ret = -ENOMEM;
989 goto done_pages;
990 }
991
992 req->r_callback = rbd_cb;
993
994 req_data->rq = rq;
995 req_data->bio = bio;
996 req_data->pages = pages;
997 req_data->len = len;
998
999 req->r_priv = req_data;
1000
1001 reqhead = req->r_request->front.iov_base;
1002 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1003
1004 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1005 req->r_oid_len = strlen(req->r_oid);
1006
1007 layout = &req->r_file_layout;
1008 memset(layout, 0, sizeof(*layout));
1009 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1010 layout->fl_stripe_count = cpu_to_le32(1);
1011 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1012 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1013 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1014 req, ops);
1015
1016 ceph_osdc_build_request(req, ofs, &len,
1017 ops,
1018 snapc,
1019 &mtime,
1020 req->r_oid, req->r_oid_len);
1021
1022 if (linger_req) {
1023 ceph_osdc_set_request_linger(osdc, req);
1024 *linger_req = req;
1025 }
1026
1027 ret = ceph_osdc_start_request(osdc, req, false);
1028 if (ret < 0)
1029 goto done_err;
1030
1031 if (!rbd_cb) {
1032 ret = ceph_osdc_wait_request(osdc, req);
1033 if (ver)
1034 *ver = le64_to_cpu(req->r_reassert_version.version);
1035 dout("reassert_ver=%llu\n",
1036 (unsigned long long)
1037 le64_to_cpu(req->r_reassert_version.version));
1038 ceph_osdc_put_request(req);
1039 }
1040 return ret;
1041
1042 done_err:
1043 bio_chain_put(req_data->bio);
1044 ceph_osdc_put_request(req);
1045 done_pages:
1046 rbd_coll_end_req(req_data, ret, len);
1047 kfree(req_data);
1048 return ret;
1049 }
1050
1051 /*
1052 * Ceph osd op callback
1053 */
1054 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1055 {
1056 struct rbd_request *req_data = req->r_priv;
1057 struct ceph_osd_reply_head *replyhead;
1058 struct ceph_osd_op *op;
1059 __s32 rc;
1060 u64 bytes;
1061 int read_op;
1062
1063 /* parse reply */
1064 replyhead = msg->front.iov_base;
1065 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1066 op = (void *)(replyhead + 1);
1067 rc = le32_to_cpu(replyhead->result);
1068 bytes = le64_to_cpu(op->extent.length);
1069 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1070
1071 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1072 (unsigned long long) bytes, read_op, (int) rc);
1073
1074 if (rc == -ENOENT && read_op) {
1075 zero_bio_chain(req_data->bio, 0);
1076 rc = 0;
1077 } else if (rc == 0 && read_op && bytes < req_data->len) {
1078 zero_bio_chain(req_data->bio, bytes);
1079 bytes = req_data->len;
1080 }
1081
1082 rbd_coll_end_req(req_data, rc, bytes);
1083
1084 if (req_data->bio)
1085 bio_chain_put(req_data->bio);
1086
1087 ceph_osdc_put_request(req);
1088 kfree(req_data);
1089 }
1090
1091 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1092 {
1093 ceph_osdc_put_request(req);
1094 }
1095
1096 /*
1097 * Do a synchronous ceph osd operation
1098 */
1099 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1100 struct ceph_snap_context *snapc,
1101 u64 snapid,
1102 int flags,
1103 struct ceph_osd_req_op *ops,
1104 const char *object_name,
1105 u64 ofs, u64 inbound_size,
1106 char *inbound,
1107 struct ceph_osd_request **linger_req,
1108 u64 *ver)
1109 {
1110 int ret;
1111 struct page **pages;
1112 int num_pages;
1113
1114 rbd_assert(ops != NULL);
1115
1116 num_pages = calc_pages_for(ofs, inbound_size);
1117 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1118 if (IS_ERR(pages))
1119 return PTR_ERR(pages);
1120
1121 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1122 object_name, ofs, inbound_size, NULL,
1123 pages, num_pages,
1124 flags,
1125 ops,
1126 NULL, 0,
1127 NULL,
1128 linger_req, ver);
1129 if (ret < 0)
1130 goto done;
1131
1132 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1133 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1134
1135 done:
1136 ceph_release_page_vector(pages, num_pages);
1137 return ret;
1138 }
1139
1140 /*
1141 * Do an asynchronous ceph osd operation
1142 */
1143 static int rbd_do_op(struct request *rq,
1144 struct rbd_device *rbd_dev,
1145 struct ceph_snap_context *snapc,
1146 u64 snapid,
1147 int opcode, int flags,
1148 u64 ofs, u64 len,
1149 struct bio *bio,
1150 struct rbd_req_coll *coll,
1151 int coll_index)
1152 {
1153 char *seg_name;
1154 u64 seg_ofs;
1155 u64 seg_len;
1156 int ret;
1157 struct ceph_osd_req_op *ops;
1158 u32 payload_len;
1159
1160 seg_name = rbd_segment_name(rbd_dev, ofs);
1161 if (!seg_name)
1162 return -ENOMEM;
1163 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1164 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1165
1166 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1167
1168 ret = -ENOMEM;
1169 ops = rbd_create_rw_ops(1, opcode, payload_len);
1170 if (!ops)
1171 goto done;
1172
1173 /* we've taken care of segment sizes earlier when we
1174 cloned the bios. We should never have a segment
1175 truncated at this point */
1176 rbd_assert(seg_len == len);
1177
1178 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1179 seg_name, seg_ofs, seg_len,
1180 bio,
1181 NULL, 0,
1182 flags,
1183 ops,
1184 coll, coll_index,
1185 rbd_req_cb, 0, NULL);
1186
1187 rbd_destroy_ops(ops);
1188 done:
1189 kfree(seg_name);
1190 return ret;
1191 }
1192
1193 /*
1194 * Request async osd write
1195 */
1196 static int rbd_req_write(struct request *rq,
1197 struct rbd_device *rbd_dev,
1198 struct ceph_snap_context *snapc,
1199 u64 ofs, u64 len,
1200 struct bio *bio,
1201 struct rbd_req_coll *coll,
1202 int coll_index)
1203 {
1204 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1205 CEPH_OSD_OP_WRITE,
1206 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1207 ofs, len, bio, coll, coll_index);
1208 }
1209
1210 /*
1211 * Request async osd read
1212 */
1213 static int rbd_req_read(struct request *rq,
1214 struct rbd_device *rbd_dev,
1215 u64 snapid,
1216 u64 ofs, u64 len,
1217 struct bio *bio,
1218 struct rbd_req_coll *coll,
1219 int coll_index)
1220 {
1221 return rbd_do_op(rq, rbd_dev, NULL,
1222 snapid,
1223 CEPH_OSD_OP_READ,
1224 CEPH_OSD_FLAG_READ,
1225 ofs, len, bio, coll, coll_index);
1226 }
1227
1228 /*
1229 * Request sync osd read
1230 */
1231 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1232 u64 snapid,
1233 const char *object_name,
1234 u64 ofs, u64 len,
1235 char *buf,
1236 u64 *ver)
1237 {
1238 struct ceph_osd_req_op *ops;
1239 int ret;
1240
1241 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1242 if (!ops)
1243 return -ENOMEM;
1244
1245 ret = rbd_req_sync_op(rbd_dev, NULL,
1246 snapid,
1247 CEPH_OSD_FLAG_READ,
1248 ops, object_name, ofs, len, buf, NULL, ver);
1249 rbd_destroy_ops(ops);
1250
1251 return ret;
1252 }
1253
1254 /*
1255 * Request sync osd notify ack
1256 */
1257 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1258 u64 ver,
1259 u64 notify_id)
1260 {
1261 struct ceph_osd_req_op *ops;
1262 int ret;
1263
1264 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1265 if (!ops)
1266 return -ENOMEM;
1267
1268 ops[0].watch.ver = cpu_to_le64(ver);
1269 ops[0].watch.cookie = notify_id;
1270 ops[0].watch.flag = 0;
1271
1272 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1273 rbd_dev->header_name, 0, 0, NULL,
1274 NULL, 0,
1275 CEPH_OSD_FLAG_READ,
1276 ops,
1277 NULL, 0,
1278 rbd_simple_req_cb, 0, NULL);
1279
1280 rbd_destroy_ops(ops);
1281 return ret;
1282 }
1283
1284 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1285 {
1286 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1287 u64 hver;
1288 int rc;
1289
1290 if (!rbd_dev)
1291 return;
1292
1293 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1294 rbd_dev->header_name, (unsigned long long) notify_id,
1295 (unsigned int) opcode);
1296 rc = rbd_refresh_header(rbd_dev, &hver);
1297 if (rc)
1298 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1299 " update snaps: %d\n", rbd_dev->major, rc);
1300
1301 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1302 }
1303
1304 /*
1305 * Request sync osd watch
1306 */
1307 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1308 {
1309 struct ceph_osd_req_op *ops;
1310 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1311 int ret;
1312
1313 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1314 if (!ops)
1315 return -ENOMEM;
1316
1317 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1318 (void *)rbd_dev, &rbd_dev->watch_event);
1319 if (ret < 0)
1320 goto fail;
1321
1322 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1323 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1324 ops[0].watch.flag = 1;
1325
1326 ret = rbd_req_sync_op(rbd_dev, NULL,
1327 CEPH_NOSNAP,
1328 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1329 ops,
1330 rbd_dev->header_name,
1331 0, 0, NULL,
1332 &rbd_dev->watch_request, NULL);
1333
1334 if (ret < 0)
1335 goto fail_event;
1336
1337 rbd_destroy_ops(ops);
1338 return 0;
1339
1340 fail_event:
1341 ceph_osdc_cancel_event(rbd_dev->watch_event);
1342 rbd_dev->watch_event = NULL;
1343 fail:
1344 rbd_destroy_ops(ops);
1345 return ret;
1346 }
1347
1348 /*
1349 * Request sync osd unwatch
1350 */
1351 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1352 {
1353 struct ceph_osd_req_op *ops;
1354 int ret;
1355
1356 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1357 if (!ops)
1358 return -ENOMEM;
1359
1360 ops[0].watch.ver = 0;
1361 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1362 ops[0].watch.flag = 0;
1363
1364 ret = rbd_req_sync_op(rbd_dev, NULL,
1365 CEPH_NOSNAP,
1366 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1367 ops,
1368 rbd_dev->header_name,
1369 0, 0, NULL, NULL, NULL);
1370
1371
1372 rbd_destroy_ops(ops);
1373 ceph_osdc_cancel_event(rbd_dev->watch_event);
1374 rbd_dev->watch_event = NULL;
1375 return ret;
1376 }
1377
1378 struct rbd_notify_info {
1379 struct rbd_device *rbd_dev;
1380 };
1381
1382 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1383 {
1384 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1385 if (!rbd_dev)
1386 return;
1387
1388 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1389 rbd_dev->header_name, (unsigned long long) notify_id,
1390 (unsigned int) opcode);
1391 }
1392
1393 /*
1394 * Request sync osd notify
1395 */
1396 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1397 {
1398 struct ceph_osd_req_op *ops;
1399 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1400 struct ceph_osd_event *event;
1401 struct rbd_notify_info info;
1402 int payload_len = sizeof(u32) + sizeof(u32);
1403 int ret;
1404
1405 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1406 if (!ops)
1407 return -ENOMEM;
1408
1409 info.rbd_dev = rbd_dev;
1410
1411 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1412 (void *)&info, &event);
1413 if (ret < 0)
1414 goto fail;
1415
1416 ops[0].watch.ver = 1;
1417 ops[0].watch.flag = 1;
1418 ops[0].watch.cookie = event->cookie;
1419 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1420 ops[0].watch.timeout = 12;
1421
1422 ret = rbd_req_sync_op(rbd_dev, NULL,
1423 CEPH_NOSNAP,
1424 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1425 ops,
1426 rbd_dev->header_name,
1427 0, 0, NULL, NULL, NULL);
1428 if (ret < 0)
1429 goto fail_event;
1430
1431 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1432 dout("ceph_osdc_wait_event returned %d\n", ret);
1433 rbd_destroy_ops(ops);
1434 return 0;
1435
1436 fail_event:
1437 ceph_osdc_cancel_event(event);
1438 fail:
1439 rbd_destroy_ops(ops);
1440 return ret;
1441 }
1442
1443 /*
1444 * Synchronous osd object method call
1445 */
1446 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1447 const char *object_name,
1448 const char *class_name,
1449 const char *method_name,
1450 const char *outbound,
1451 size_t outbound_size,
1452 char *inbound,
1453 size_t inbound_size,
1454 int flags,
1455 u64 *ver)
1456 {
1457 struct ceph_osd_req_op *ops;
1458 int class_name_len = strlen(class_name);
1459 int method_name_len = strlen(method_name);
1460 int payload_size;
1461 int ret;
1462
1463 /*
1464 * Any input parameters required by the method we're calling
1465 * will be sent along with the class and method names as
1466 * part of the message payload. That data and its size are
1467 * supplied via the indata and indata_len fields (named from
1468 * the perspective of the server side) in the OSD request
1469 * operation.
1470 */
1471 payload_size = class_name_len + method_name_len + outbound_size;
1472 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1473 if (!ops)
1474 return -ENOMEM;
1475
1476 ops[0].cls.class_name = class_name;
1477 ops[0].cls.class_len = (__u8) class_name_len;
1478 ops[0].cls.method_name = method_name;
1479 ops[0].cls.method_len = (__u8) method_name_len;
1480 ops[0].cls.argc = 0;
1481 ops[0].cls.indata = outbound;
1482 ops[0].cls.indata_len = outbound_size;
1483
1484 ret = rbd_req_sync_op(rbd_dev, NULL,
1485 CEPH_NOSNAP,
1486 flags, ops,
1487 object_name, 0, inbound_size, inbound,
1488 NULL, ver);
1489
1490 rbd_destroy_ops(ops);
1491
1492 dout("cls_exec returned %d\n", ret);
1493 return ret;
1494 }
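
/*
 * Payload layout built by the CEPH_OSD_OP_CALL above, for reference:
 *
 *   +------------+-------------+-----------------------+
 *   | class_name | method_name | outbound (input) data |
 *   +------------+-------------+-----------------------+
 *
 * rbd_header_add_snap() below is a concrete caller: class "rbd",
 * method "snap_add", with an encoded (name, snap_id) pair as the
 * outbound data.
 */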
1495
1496 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1497 {
1498 struct rbd_req_coll *coll =
1499 kzalloc(sizeof(struct rbd_req_coll) +
1500 sizeof(struct rbd_req_status) * num_reqs,
1501 GFP_ATOMIC);
1502
1503 if (!coll)
1504 return NULL;
1505 coll->total = num_reqs;
1506 kref_init(&coll->kref);
1507 return coll;
1508 }
1509
1510 /*
1511 * block device queue callback
1512 */
1513 static void rbd_rq_fn(struct request_queue *q)
1514 {
1515 struct rbd_device *rbd_dev = q->queuedata;
1516 struct request *rq;
1517 struct bio_pair *bp = NULL;
1518
1519 while ((rq = blk_fetch_request(q))) {
1520 struct bio *bio;
1521 struct bio *rq_bio, *next_bio = NULL;
1522 bool do_write;
1523 unsigned int size;
1524 u64 op_size = 0;
1525 u64 ofs;
1526 int num_segs, cur_seg = 0;
1527 struct rbd_req_coll *coll;
1528 struct ceph_snap_context *snapc;
1529
1530 dout("fetched request\n");
1531
1532 /* filter out block requests we don't understand */
1533 if (rq->cmd_type != REQ_TYPE_FS) {
1534 __blk_end_request_all(rq, 0);
1535 continue;
1536 }
1537
1538 /* deduce our operation (read, write) */
1539 do_write = (rq_data_dir(rq) == WRITE);
1540
1541 size = blk_rq_bytes(rq);
1542 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1543 rq_bio = rq->bio;
1544 if (do_write && rbd_dev->mapping.read_only) {
1545 __blk_end_request_all(rq, -EROFS);
1546 continue;
1547 }
1548
1549 spin_unlock_irq(q->queue_lock);
1550
1551 down_read(&rbd_dev->header_rwsem);
1552
1553 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1554 !rbd_dev->mapping.snap_exists) {
1555 up_read(&rbd_dev->header_rwsem);
1556 dout("request for non-existent snapshot");
1557 spin_lock_irq(q->queue_lock);
1558 __blk_end_request_all(rq, -ENXIO);
1559 continue;
1560 }
1561
1562 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1563
1564 up_read(&rbd_dev->header_rwsem);
1565
1566 dout("%s 0x%x bytes at 0x%llx\n",
1567 do_write ? "write" : "read",
1568 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1569
1570 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1571 if (num_segs <= 0) {
1572 spin_lock_irq(q->queue_lock);
1573 __blk_end_request_all(rq, num_segs);
1574 ceph_put_snap_context(snapc);
1575 continue;
1576 }
1577 coll = rbd_alloc_coll(num_segs);
1578 if (!coll) {
1579 spin_lock_irq(q->queue_lock);
1580 __blk_end_request_all(rq, -ENOMEM);
1581 ceph_put_snap_context(snapc);
1582 continue;
1583 }
1584
1585 do {
1586 /* a bio clone to be passed down to OSD req */
1587 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1588 op_size = rbd_segment_length(rbd_dev, ofs, size);
1589 kref_get(&coll->kref);
1590 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1591 op_size, GFP_ATOMIC);
1592 if (!bio) {
1593 rbd_coll_end_req_index(rq, coll, cur_seg,
1594 -ENOMEM, op_size);
1595 goto next_seg;
1596 }
1597
1598
1599 /* init OSD command: write or read */
1600 if (do_write)
1601 rbd_req_write(rq, rbd_dev,
1602 snapc,
1603 ofs,
1604 op_size, bio,
1605 coll, cur_seg);
1606 else
1607 rbd_req_read(rq, rbd_dev,
1608 rbd_dev->mapping.snap_id,
1609 ofs,
1610 op_size, bio,
1611 coll, cur_seg);
1612
1613 next_seg:
1614 size -= op_size;
1615 ofs += op_size;
1616
1617 cur_seg++;
1618 rq_bio = next_bio;
1619 } while (size > 0);
1620 kref_put(&coll->kref, rbd_coll_release);
1621
1622 if (bp)
1623 bio_pair_release(bp);
1624 spin_lock_irq(q->queue_lock);
1625
1626 ceph_put_snap_context(snapc);
1627 }
1628 }
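
/*
 * Request flow, for orientation:
 *
 *   rbd_rq_fn() splits each blk request into per-object segments,
 *   clones the bio chain per segment (bio_chain_clone()), and issues
 *   rbd_req_write()/rbd_req_read() -> rbd_do_op() -> rbd_do_request().
 *   Replies come back via rbd_req_cb(), which feeds rbd_coll_end_req()
 *   so the blk request completes in order.
 */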
1629
1630 /*
1631 * a queue callback. Makes sure that we don't create a bio that spans across
1632 * multiple osd objects. One exception would be single-page bios,
1633 * which we handle later in bio_chain_clone().
1634 */
1635 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1636 struct bio_vec *bvec)
1637 {
1638 struct rbd_device *rbd_dev = q->queuedata;
1639 unsigned int chunk_sectors;
1640 sector_t sector;
1641 unsigned int bio_sectors;
1642 int max;
1643
1644 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1645 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1646 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1647
1648 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1649 + bio_sectors)) << SECTOR_SHIFT;
1650 if (max < 0)
1651 max = 0; /* bio_add cannot handle a negative return */
1652 if (max <= bvec->bv_len && bio_sectors == 0)
1653 return bvec->bv_len;
1654 return max;
1655 }
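
/*
 * Worked example of the clamp above (userspace sketch, assuming 4 MiB
 * objects, i.e. obj_order 22 and so chunk_sectors 8192):
 */
#if 0	/* illustration only -- not compiled */
#include <stdio.h>

int main(void)
{
	unsigned chunk_sectors = 1u << (22 - 9);	/* 8192 sectors */
	unsigned long long sector = 8190;	/* 2 sectors from a boundary */
	unsigned bio_sectors = 0;		/* empty bio so far */

	int max = (chunk_sectors - ((sector & (chunk_sectors - 1))
			+ bio_sectors)) << 9;

	printf("max = %d bytes\n", max);	/* prints: max = 1024 bytes */
	return 0;
}
#endif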
1656
1657 static void rbd_free_disk(struct rbd_device *rbd_dev)
1658 {
1659 struct gendisk *disk = rbd_dev->disk;
1660
1661 if (!disk)
1662 return;
1663
1664 if (disk->flags & GENHD_FL_UP)
1665 del_gendisk(disk);
1666 if (disk->queue)
1667 blk_cleanup_queue(disk->queue);
1668 put_disk(disk);
1669 }
1670
1671 /*
1672 * Read the complete header for the given rbd device.
1673 *
1674 * Returns a pointer to a dynamically-allocated buffer containing
1675 * the complete and validated header. Caller can pass the address
1676 * of a variable that will be filled in with the version of the
1677 * header object at the time it was read.
1678 *
1679 * Returns a pointer-coded errno if a failure occurs.
1680 */
1681 static struct rbd_image_header_ondisk *
1682 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1683 {
1684 struct rbd_image_header_ondisk *ondisk = NULL;
1685 u32 snap_count = 0;
1686 u64 names_size = 0;
1687 u32 want_count;
1688 int ret;
1689
1690 /*
1691 * The complete header will include an array of its 64-bit
1692 * snapshot ids, followed by the names of those snapshots as
1693 * a contiguous block of NUL-terminated strings. Note that
1694 * the number of snapshots could change by the time we read
1695 * it in, in which case we re-read it.
1696 */
1697 do {
1698 size_t size;
1699
1700 kfree(ondisk);
1701
1702 size = sizeof (*ondisk);
1703 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1704 size += names_size;
1705 ondisk = kmalloc(size, GFP_KERNEL);
1706 if (!ondisk)
1707 return ERR_PTR(-ENOMEM);
1708
1709 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1710 rbd_dev->header_name,
1711 0, size,
1712 (char *) ondisk, version);
1713
1714 if (ret < 0)
1715 goto out_err;
1716 if (WARN_ON((size_t) ret < size)) {
1717 ret = -ENXIO;
1718 pr_warning("short header read for image %s"
1719 " (want %zd got %d)\n",
1720 rbd_dev->image_name, size, ret);
1721 goto out_err;
1722 }
1723 if (!rbd_dev_ondisk_valid(ondisk)) {
1724 ret = -ENXIO;
1725 pr_warning("invalid header for image %s\n",
1726 rbd_dev->image_name);
1727 goto out_err;
1728 }
1729
1730 names_size = le64_to_cpu(ondisk->snap_names_len);
1731 want_count = snap_count;
1732 snap_count = le32_to_cpu(ondisk->snap_count);
1733 } while (snap_count != want_count);
1734
1735 return ondisk;
1736
1737 out_err:
1738 kfree(ondisk);
1739
1740 return ERR_PTR(ret);
1741 }
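
/*
 * Note on the do/while loop above: the first pass runs with
 * snap_count == 0 and names_size == 0, so it fetches just the fixed
 * part of the header; the second pass re-reads using the sizes
 * learned from the first.  Further passes happen only if a snapshot
 * is created or deleted between the two reads.
 */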
1742
1743 /*
1744 * reload the on-disk header
1745 */
1746 static int rbd_read_header(struct rbd_device *rbd_dev,
1747 struct rbd_image_header *header)
1748 {
1749 struct rbd_image_header_ondisk *ondisk;
1750 u64 ver = 0;
1751 int ret;
1752
1753 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1754 if (IS_ERR(ondisk))
1755 return PTR_ERR(ondisk);
1756 ret = rbd_header_from_disk(header, ondisk);
1757 if (ret >= 0)
1758 header->obj_version = ver;
1759 kfree(ondisk);
1760
1761 return ret;
1762 }
1763
1764 /*
1765 * create a snapshot
1766 */
1767 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1768 const char *snap_name,
1769 gfp_t gfp_flags)
1770 {
1771 int name_len = strlen(snap_name);
1772 u64 new_snapid;
1773 int ret;
1774 void *data, *p, *e;
1775 struct ceph_mon_client *monc;
1776
1777 /* we should create a snapshot only if we're pointing at the head */
1778 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1779 return -EINVAL;
1780
1781 monc = &rbd_dev->rbd_client->client->monc;
1782 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1783 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1784 if (ret < 0)
1785 return ret;
1786
1787 data = kmalloc(name_len + 16, gfp_flags);
1788 if (!data)
1789 return -ENOMEM;
1790
1791 p = data;
1792 e = data + name_len + 16;
1793
1794 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1795 ceph_encode_64_safe(&p, e, new_snapid, bad);
1796
1797 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1798 "rbd", "snap_add",
1799 data, (size_t) (p - data), NULL, 0,
1800 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1801 NULL);
1802
1803 kfree(data);
1804
1805 return ret < 0 ? ret : 0;
1806 bad:
1807 return -ERANGE;
1808 }
1809
1810 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1811 {
1812 struct rbd_snap *snap;
1813 struct rbd_snap *next;
1814
1815 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1816 __rbd_remove_snap_dev(snap);
1817 }
1818
1819 /*
1820 * re-read the ondisk header and bring the in-memory copy up to date
1821 */
1822 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1823 {
1824 int ret;
1825 struct rbd_image_header h;
1826
1827 ret = rbd_read_header(rbd_dev, &h);
1828 if (ret < 0)
1829 return ret;
1830
1831 down_write(&rbd_dev->header_rwsem);
1832
1833 /* resized? */
1834 if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1835 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1836
1837 if (size != (sector_t) rbd_dev->mapping.size) {
1838 dout("setting size to %llu sectors",
1839 (unsigned long long) size);
1840 rbd_dev->mapping.size = (u64) size;
1841 set_capacity(rbd_dev->disk, size);
1842 }
1843 }
1844
1845 /* rbd_dev->header.object_prefix shouldn't change */
1846 kfree(rbd_dev->header.snap_sizes);
1847 kfree(rbd_dev->header.snap_names);
1848 /* osd requests may still refer to snapc */
1849 ceph_put_snap_context(rbd_dev->header.snapc);
1850
1851 if (hver)
1852 *hver = h.obj_version;
1853 rbd_dev->header.obj_version = h.obj_version;
1854 rbd_dev->header.image_size = h.image_size;
1855 rbd_dev->header.snapc = h.snapc;
1856 rbd_dev->header.snap_names = h.snap_names;
1857 rbd_dev->header.snap_sizes = h.snap_sizes;
1858 /* Free the extra copy of the object prefix */
1859 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1860 kfree(h.object_prefix);
1861
1862 ret = rbd_dev_snaps_update(rbd_dev);
1863 if (!ret)
1864 ret = rbd_dev_snaps_register(rbd_dev);
1865
1866 up_write(&rbd_dev->header_rwsem);
1867
1868 return ret;
1869 }
1870
1871 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1872 {
1873 int ret;
1874
1875 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1876 ret = __rbd_refresh_header(rbd_dev, hver);
1877 mutex_unlock(&ctl_mutex);
1878
1879 return ret;
1880 }
1881
1882 static int rbd_init_disk(struct rbd_device *rbd_dev)
1883 {
1884 struct gendisk *disk;
1885 struct request_queue *q;
1886 u64 segment_size;
1887
1888 /* create gendisk info */
1889 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1890 if (!disk)
1891 return -ENOMEM;
1892
1893 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1894 rbd_dev->dev_id);
1895 disk->major = rbd_dev->major;
1896 disk->first_minor = 0;
1897 disk->fops = &rbd_bd_ops;
1898 disk->private_data = rbd_dev;
1899
1900 /* init rq */
1901 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1902 if (!q)
1903 goto out_disk;
1904
1905 /* We use the default size, but let's be explicit about it. */
1906 blk_queue_physical_block_size(q, SECTOR_SIZE);
1907
1908 /* set io sizes to object size */
1909 segment_size = rbd_obj_bytes(&rbd_dev->header);
1910 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1911 blk_queue_max_segment_size(q, segment_size);
1912 blk_queue_io_min(q, segment_size);
1913 blk_queue_io_opt(q, segment_size);
1914
1915 blk_queue_merge_bvec(q, rbd_merge_bvec);
1916 disk->queue = q;
1917
1918 q->queuedata = rbd_dev;
1919
1920 rbd_dev->disk = disk;
1921
1922 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1923
1924 return 0;
1925 out_disk:
1926 put_disk(disk);
1927
1928 return -ENOMEM;
1929 }
1930
1931 /*
1932 sysfs
1933 */
1934
1935 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1936 {
1937 return container_of(dev, struct rbd_device, dev);
1938 }
1939
1940 static ssize_t rbd_size_show(struct device *dev,
1941 struct device_attribute *attr, char *buf)
1942 {
1943 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1944 sector_t size;
1945
1946 down_read(&rbd_dev->header_rwsem);
1947 size = get_capacity(rbd_dev->disk);
1948 up_read(&rbd_dev->header_rwsem);
1949
1950 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1951 }
1952
1953 static ssize_t rbd_major_show(struct device *dev,
1954 struct device_attribute *attr, char *buf)
1955 {
1956 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1957
1958 return sprintf(buf, "%d\n", rbd_dev->major);
1959 }
1960
1961 static ssize_t rbd_client_id_show(struct device *dev,
1962 struct device_attribute *attr, char *buf)
1963 {
1964 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1965
1966 return sprintf(buf, "client%lld\n",
1967 ceph_client_id(rbd_dev->rbd_client->client));
1968 }
1969
1970 static ssize_t rbd_pool_show(struct device *dev,
1971 struct device_attribute *attr, char *buf)
1972 {
1973 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1974
1975 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1976 }
1977
1978 static ssize_t rbd_pool_id_show(struct device *dev,
1979 struct device_attribute *attr, char *buf)
1980 {
1981 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1982
1983 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1984 }
1985
1986 static ssize_t rbd_name_show(struct device *dev,
1987 struct device_attribute *attr, char *buf)
1988 {
1989 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1990
1991 return sprintf(buf, "%s\n", rbd_dev->image_name);
1992 }
1993
1994 static ssize_t rbd_image_id_show(struct device *dev,
1995 struct device_attribute *attr, char *buf)
1996 {
1997 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1998
1999 return sprintf(buf, "%s\n", rbd_dev->image_id);
2000 }
2001
2002 static ssize_t rbd_snap_show(struct device *dev,
2003 struct device_attribute *attr,
2004 char *buf)
2005 {
2006 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2007
2008 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
2009 }
2010
2011 static ssize_t rbd_image_refresh(struct device *dev,
2012 struct device_attribute *attr,
2013 const char *buf,
2014 size_t size)
2015 {
2016 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2017 int ret;
2018
2019 ret = rbd_refresh_header(rbd_dev, NULL);
2020
2021 return ret < 0 ? ret : size;
2022 }
2023
2024 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2025 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2026 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2027 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2028 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2029 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2030 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2031 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2032 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2033 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
2034
2035 static struct attribute *rbd_attrs[] = {
2036 &dev_attr_size.attr,
2037 &dev_attr_major.attr,
2038 &dev_attr_client_id.attr,
2039 &dev_attr_pool.attr,
2040 &dev_attr_pool_id.attr,
2041 &dev_attr_name.attr,
2042 &dev_attr_image_id.attr,
2043 &dev_attr_current_snap.attr,
2044 &dev_attr_refresh.attr,
2045 &dev_attr_create_snap.attr,
2046 NULL
2047 };
2048
2049 static struct attribute_group rbd_attr_group = {
2050 .attrs = rbd_attrs,
2051 };
2052
2053 static const struct attribute_group *rbd_attr_groups[] = {
2054 &rbd_attr_group,
2055 NULL
2056 };
2057
2058 static void rbd_sysfs_dev_release(struct device *dev)
2059 {
2060 }
2061
2062 static struct device_type rbd_device_type = {
2063 .name = "rbd",
2064 .groups = rbd_attr_groups,
2065 .release = rbd_sysfs_dev_release,
2066 };
2067
2068
2069 /*
2070 sysfs - snapshots
2071 */
2072
2073 static ssize_t rbd_snap_size_show(struct device *dev,
2074 struct device_attribute *attr,
2075 char *buf)
2076 {
2077 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2078
2079 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2080 }
2081
2082 static ssize_t rbd_snap_id_show(struct device *dev,
2083 struct device_attribute *attr,
2084 char *buf)
2085 {
2086 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2087
2088 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2089 }
2090
2091 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2092 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2093
2094 static struct attribute *rbd_snap_attrs[] = {
2095 &dev_attr_snap_size.attr,
2096 &dev_attr_snap_id.attr,
2097 NULL,
2098 };
2099
2100 static struct attribute_group rbd_snap_attr_group = {
2101 .attrs = rbd_snap_attrs,
2102 };
2103
2104 static void rbd_snap_dev_release(struct device *dev)
2105 {
2106 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2107 kfree(snap->name);
2108 kfree(snap);
2109 }
2110
2111 static const struct attribute_group *rbd_snap_attr_groups[] = {
2112 &rbd_snap_attr_group,
2113 NULL
2114 };
2115
2116 static struct device_type rbd_snap_device_type = {
2117 .groups = rbd_snap_attr_groups,
2118 .release = rbd_snap_dev_release,
2119 };
2120
2121 static bool rbd_snap_registered(struct rbd_snap *snap)
2122 {
2123 bool ret = snap->dev.type == &rbd_snap_device_type;
2124 bool reg = device_is_registered(&snap->dev);
2125
2126 rbd_assert(!ret ^ reg);
2127
2128 return ret;
2129 }
2130
2131 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2132 {
2133 list_del(&snap->node);
2134 if (device_is_registered(&snap->dev))
2135 device_unregister(&snap->dev);
2136 }
2137
2138 static int rbd_register_snap_dev(struct rbd_snap *snap,
2139 struct device *parent)
2140 {
2141 struct device *dev = &snap->dev;
2142 int ret;
2143
2144 dev->type = &rbd_snap_device_type;
2145 dev->parent = parent;
2146 dev->release = rbd_snap_dev_release;
2147 dev_set_name(dev, "snap_%s", snap->name);
2148 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2149
2150 ret = device_register(dev);
2151
2152 return ret;
2153 }
2154
2155 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2156 int i, const char *name)
2157 {
2158 struct rbd_snap *snap;
2159 int ret;
2160
2161 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2162 if (!snap)
2163 return ERR_PTR(-ENOMEM);
2164
2165 ret = -ENOMEM;
2166 snap->name = kstrdup(name, GFP_KERNEL);
2167 if (!snap->name)
2168 goto err;
2169
2170 snap->size = rbd_dev->header.snap_sizes[i];
2171 snap->id = rbd_dev->header.snapc->snaps[i];
2172
2173 return snap;
2174
2175 err:
2176 kfree(snap->name);
2177 kfree(snap);
2178
2179 return ERR_PTR(ret);
2180 }
2181
2182 /*
2183 * Scan the rbd device's current snapshot list and compare it to the
2184 * newly-received snapshot context. Remove any existing snapshots
2185 * not present in the new snapshot context. Add a new snapshot for
2186  * any snapshots in the snapshot context not in the current list.
2187 * And verify there are no changes to snapshots we already know
2188 * about.
2189 *
2190 * Assumes the snapshots in the snapshot context are sorted by
2191 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2192 * are also maintained in that order.)
2193 */
2194 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2195 {
2196 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2197 const u32 snap_count = snapc->num_snaps;
2198 char *snap_name = rbd_dev->header.snap_names;
2199 struct list_head *head = &rbd_dev->snaps;
2200 struct list_head *links = head->next;
2201 u32 index = 0;
2202
2203 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2204 while (index < snap_count || links != head) {
2205 u64 snap_id;
2206 struct rbd_snap *snap;
2207
2208 snap_id = index < snap_count ? snapc->snaps[index]
2209 : CEPH_NOSNAP;
2210 snap = links != head ? list_entry(links, struct rbd_snap, node)
2211 : NULL;
2212 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2213
2214 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2215 struct list_head *next = links->next;
2216
2217 			/* Existing snapshot not in the new snap context */
2218
2219 			dout("%ssnap id %llu has been removed\n",
2220 				rbd_dev->mapping.snap_id == snap->id ?
2221 							"mapped " : "",
2222 				(unsigned long long) snap->id);
2223 			if (rbd_dev->mapping.snap_id == snap->id)
2224 				rbd_dev->mapping.snap_exists = false;
2225 			__rbd_remove_snap_dev(snap);	/* may drop the last ref and free snap */
2226
2227 /* Done with this list entry; advance */
2228
2229 links = next;
2230 continue;
2231 }
2232
2233 		dout("entry %u: snap_id = %llu\n", (unsigned int) index,
2234 			(unsigned long long) snap_id);
2235 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2236 struct rbd_snap *new_snap;
2237
2238 /* We haven't seen this snapshot before */
2239
2240 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2241 snap_name);
2242 if (IS_ERR(new_snap)) {
2243 int err = PTR_ERR(new_snap);
2244
2245 dout(" failed to add dev, error %d\n", err);
2246
2247 return err;
2248 }
2249
2250 /* New goes before existing, or at end of list */
2251
2252 			dout(" added dev%s\n", snap ? "" : " at end");
2253 if (snap)
2254 list_add_tail(&new_snap->node, &snap->node);
2255 else
2256 list_add_tail(&new_snap->node, head);
2257 } else {
2258 /* Already have this one */
2259
2260 dout(" already present\n");
2261
2262 rbd_assert(snap->size ==
2263 rbd_dev->header.snap_sizes[index]);
2264 rbd_assert(!strcmp(snap->name, snap_name));
2265
2266 /* Done with this list entry; advance */
2267
2268 links = links->next;
2269 }
2270
2271 /* Advance to the next entry in the snapshot context */
2272
2273 index++;
2274 snap_name += strlen(snap_name) + 1;
2275 }
2276 dout("%s: done\n", __func__);
2277
2278 return 0;
2279 }
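/*
 * Illustrative walk-through of the loop above (snapshot ids are made
 * up).  Suppose the device's list holds ids {8, 5, 2} and the new
 * snapshot context holds {8, 6, 2}, both highest-first:
 *
 *	id 8: in both, verified and kept, both cursors advance;
 *	id 6: absent from the list, a new rbd_snap is inserted before 5;
 *	id 5: absent from the new context, its device is unregistered;
 *	id 2: in both, verified and kept.
 *
 * Because both sequences are sorted the same way, a single pass over
 * each suffices, exactly like the merge step of a merge sort.
 */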
2280
2281 /*
2282 * Scan the list of snapshots and register the devices for any that
2283 * have not already been registered.
2284 */
2285 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2286 {
2287 struct rbd_snap *snap;
2288 int ret = 0;
2289
2290 dout("%s called\n", __func__);
2291 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2292 return -EIO;
2293
2294 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2295 if (!rbd_snap_registered(snap)) {
2296 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2297 if (ret < 0)
2298 break;
2299 }
2300 }
2301 dout("%s: returning %d\n", __func__, ret);
2302
2303 return ret;
2304 }
2305
2306 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2307 {
2308 struct device *dev;
2309 int ret;
2310
2311 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2312
2313 dev = &rbd_dev->dev;
2314 dev->bus = &rbd_bus_type;
2315 dev->type = &rbd_device_type;
2316 dev->parent = &rbd_root_dev;
2317 dev->release = rbd_dev_release;
2318 dev_set_name(dev, "%d", rbd_dev->dev_id);
2319 ret = device_register(dev);
2320
2321 mutex_unlock(&ctl_mutex);
2322
2323 return ret;
2324 }
2325
2326 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2327 {
2328 device_unregister(&rbd_dev->dev);
2329 }
2330
2331 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2332 {
2333 int ret, rc;
2334
2335 do {
2336 ret = rbd_req_sync_watch(rbd_dev);
2337 if (ret == -ERANGE) {
2338 rc = rbd_refresh_header(rbd_dev, NULL);
2339 if (rc < 0)
2340 return rc;
2341 }
2342 } while (ret == -ERANGE);
2343
2344 return ret;
2345 }
2346
2347 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2348
2349 /*
2350 * Get a unique rbd identifier for the given new rbd_dev, and add
2351 * the rbd_dev to the global list. The minimum rbd id is 1.
2352 */
2353 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2354 {
2355 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2356
2357 spin_lock(&rbd_dev_list_lock);
2358 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2359 spin_unlock(&rbd_dev_list_lock);
2360 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2361 (unsigned long long) rbd_dev->dev_id);
2362 }
2363
2364 /*
2365 * Remove an rbd_dev from the global list, and record that its
2366 * identifier is no longer in use.
2367 */
2368 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2369 {
2370 struct list_head *tmp;
2371 int rbd_id = rbd_dev->dev_id;
2372 int max_id;
2373
2374 rbd_assert(rbd_id > 0);
2375
2376 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2377 (unsigned long long) rbd_dev->dev_id);
2378 spin_lock(&rbd_dev_list_lock);
2379 list_del_init(&rbd_dev->node);
2380
2381 /*
2382 * If the id being "put" is not the current maximum, there
2383 * is nothing special we need to do.
2384 */
2385 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2386 spin_unlock(&rbd_dev_list_lock);
2387 return;
2388 }
2389
2390 /*
2391 * We need to update the current maximum id. Search the
2392 * list to find out what it is. We're more likely to find
2393 * the maximum at the end, so search the list backward.
2394 */
2395 max_id = 0;
2396 list_for_each_prev(tmp, &rbd_dev_list) {
2397 struct rbd_device *rbd_dev;
2398
2399 rbd_dev = list_entry(tmp, struct rbd_device, node);
2400 		if (rbd_dev->dev_id > max_id)
2401 			max_id = rbd_dev->dev_id;
2402 }
2403 spin_unlock(&rbd_dev_list_lock);
2404
2405 /*
2406 * The max id could have been updated by rbd_dev_id_get(), in
2407 * which case it now accurately reflects the new maximum.
2408 * Be careful not to overwrite the maximum value in that
2409 * case.
2410 */
2411 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2412 dout(" max dev id has been reset\n");
2413 }
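/*
 * Example of the race the cmpxchg above tolerates (timing made up):
 * if the maximum was 5 and we computed max_id = 3 after dropping
 * rbd_dev_list_lock, a concurrent rbd_dev_id_get() may already have
 * advanced rbd_dev_id_max to 6.  A plain store of 3 would lose that
 * update; atomic64_cmpxchg(&rbd_dev_id_max, 5, 3) instead fails
 * harmlessly and the newer maximum survives.
 */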
2414
2415 /*
2416 * Skips over white space at *buf, and updates *buf to point to the
2417 * first found non-space character (if any). Returns the length of
2418 * the token (string of non-white space characters) found. Note
2419 * that *buf must be terminated with '\0'.
2420 */
2421 static inline size_t next_token(const char **buf)
2422 {
2423 /*
2424 * These are the characters that produce nonzero for
2425 * isspace() in the "C" and "POSIX" locales.
2426 */
2427 const char *spaces = " \f\n\r\t\v";
2428
2429 *buf += strspn(*buf, spaces); /* Find start of token */
2430
2431 return strcspn(*buf, spaces); /* Return token length */
2432 }
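/*
 * For example (input made up): with *buf == "  rbd foo", next_token()
 * advances *buf past the leading spaces to "rbd foo" and returns 3.
 * The token itself is not consumed; callers step *buf past it.
 */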
2433
2434 /*
2435 * Finds the next token in *buf, and if the provided token buffer is
2436 * big enough, copies the found token into it. The result, if
2437 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2438 * must be terminated with '\0' on entry.
2439 *
2440 * Returns the length of the token found (not including the '\0').
2441 * Return value will be 0 if no token is found, and it will be >=
2442 * token_size if the token would not fit.
2443 *
2444 * The *buf pointer will be updated to point beyond the end of the
2445 * found token. Note that this occurs even if the token buffer is
2446 * too small to hold it.
2447 */
2448 static inline size_t copy_token(const char **buf,
2449 char *token,
2450 size_t token_size)
2451 {
2452 size_t len;
2453
2454 len = next_token(buf);
2455 if (len < token_size) {
2456 memcpy(token, *buf, len);
2457 *(token + len) = '\0';
2458 }
2459 *buf += len;
2460
2461 return len;
2462 }
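/*
 * For example (input made up): with *buf == "pool image" and a 5-byte
 * token buffer, copy_token() stores "pool\0", returns 4, and leaves
 * *buf at " image".  With a 4-byte buffer nothing is stored, but the
 * return value of 4 (>= token_size) flags the truncation, and *buf is
 * still advanced past the token.
 */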
2463
2464 /*
2465 * Finds the next token in *buf, dynamically allocates a buffer big
2466 * enough to hold a copy of it, and copies the token into the new
2467 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2468 * that a duplicate buffer is created even for a zero-length token.
2469 *
2470 * Returns a pointer to the newly-allocated duplicate, or a null
2471 * pointer if memory for the duplicate was not available. If
2472 * the lenp argument is a non-null pointer, the length of the token
2473 * (not including the '\0') is returned in *lenp.
2474 *
2475 * If successful, the *buf pointer will be updated to point beyond
2476 * the end of the found token.
2477 *
2478 * Note: uses GFP_KERNEL for allocation.
2479 */
2480 static inline char *dup_token(const char **buf, size_t *lenp)
2481 {
2482 char *dup;
2483 size_t len;
2484
2485 len = next_token(buf);
2486 dup = kmalloc(len + 1, GFP_KERNEL);
2487 if (!dup)
2488 return NULL;
2489
2490 memcpy(dup, *buf, len);
2491 *(dup + len) = '\0';
2492 *buf += len;
2493
2494 if (lenp)
2495 *lenp = len;
2496
2497 return dup;
2498 }
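/*
 * A minimal sketch of how the token helpers compose (illustrative
 * only; this function is not called anywhere in the driver and the
 * input string is made up).  dup_token() consumes what it copies, so
 * successive calls walk the buffer left to right.
 */
static size_t __maybe_unused rbd_token_demo(void)
{
	const char *buf = "rbd foo snap1";
	size_t len;
	char *word;

	word = dup_token(&buf, &len);	/* word = "rbd", len = 3 */
	kfree(word);			/* copies are kmalloc'd; kfree(NULL) is safe */
	word = dup_token(&buf, &len);	/* word = "foo", *buf left at " snap1" */
	kfree(word);

	return next_token(&buf);	/* *buf -> "snap1"; returns 5 */
}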
2499
2500 /*
2501 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2502 * rbd_md_name, and name fields of the given rbd_dev, based on the
2503 * list of monitor addresses and other options provided via
2504 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2505 * copy of the snapshot name to map if successful, or a
2506 * pointer-coded error otherwise.
2507 *
2508 * Note: rbd_dev is assumed to have been initially zero-filled.
2509 */
2510 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2511 const char *buf,
2512 const char **mon_addrs,
2513 size_t *mon_addrs_size,
2514 char *options,
2515 size_t options_size)
2516 {
2517 size_t len;
2518 char *err_ptr = ERR_PTR(-EINVAL);
2519 char *snap_name;
2520
2521 /* The first four tokens are required */
2522
2523 len = next_token(&buf);
2524 if (!len)
2525 return err_ptr;
2526 *mon_addrs_size = len + 1;
2527 *mon_addrs = buf;
2528
2529 buf += len;
2530
2531 len = copy_token(&buf, options, options_size);
2532 if (!len || len >= options_size)
2533 return err_ptr;
2534
2535 err_ptr = ERR_PTR(-ENOMEM);
2536 rbd_dev->pool_name = dup_token(&buf, NULL);
2537 if (!rbd_dev->pool_name)
2538 goto out_err;
2539
2540 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2541 if (!rbd_dev->image_name)
2542 goto out_err;
2543
2544 /* Snapshot name is optional */
2545 len = next_token(&buf);
2546 if (!len) {
2547 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2548 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2549 }
2550 snap_name = kmalloc(len + 1, GFP_KERNEL);
2551 if (!snap_name)
2552 goto out_err;
2553 memcpy(snap_name, buf, len);
2554 *(snap_name + len) = '\0';
2555
2556 	dout(" SNAP_NAME is <%s>, len is %zu\n", snap_name, len);
2557
2558 return snap_name;
2559
2560 out_err:
2561 kfree(rbd_dev->image_name);
2562 rbd_dev->image_name = NULL;
2563 rbd_dev->image_name_len = 0;
2564 kfree(rbd_dev->pool_name);
2565 rbd_dev->pool_name = NULL;
2566
2567 return err_ptr;
2568 }
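/*
 * Illustrative example (addresses and names made up): for the input
 *
 *	"1.2.3.4:6789 name=admin rbd foo snap1"
 *
 * *mon_addrs points at "1.2.3.4:6789" (with *mon_addrs_size covering
 * it plus a terminator), options receives "name=admin", pool_name is
 * "rbd", image_name is "foo", and "snap1" is returned as the snapshot
 * name.  Without a fifth token, RBD_SNAP_HEAD_NAME ("-") is returned,
 * meaning the base image itself is to be mapped.
 */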
2569
2570 /*
2571 * An rbd format 2 image has a unique identifier, distinct from the
2572 * name given to it by the user. Internally, that identifier is
2573 * what's used to specify the names of objects related to the image.
2574 *
2575 * A special "rbd id" object is used to map an rbd image name to its
2576 * id. If that object doesn't exist, then there is no v2 rbd image
2577 * with the supplied name.
2578 *
2579 * This function will record the given rbd_dev's image_id field if
2580 * it can be determined, and in that case will return 0. If any
2581 * errors occur a negative errno will be returned and the rbd_dev's
2582 * image_id field will be unchanged (and should be NULL).
2583 */
2584 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2585 {
2586 int ret;
2587 size_t size;
2588 char *object_name;
2589 void *response;
2590 void *p;
2591
2592 /*
2593 * First, see if the format 2 image id file exists, and if
2594 * so, get the image's persistent id from it.
2595 */
2596 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2597 object_name = kmalloc(size, GFP_NOIO);
2598 if (!object_name)
2599 return -ENOMEM;
2600 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2601 dout("rbd id object name is %s\n", object_name);
2602
2603 /* Response will be an encoded string, which includes a length */
2604
2605 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2606 response = kzalloc(size, GFP_NOIO);
2607 if (!response) {
2608 ret = -ENOMEM;
2609 goto out;
2610 }
2611
2612 ret = rbd_req_sync_exec(rbd_dev, object_name,
2613 "rbd", "get_id",
2614 NULL, 0,
2615 response, RBD_IMAGE_ID_LEN_MAX,
2616 CEPH_OSD_FLAG_READ, NULL);
2617 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2618 if (ret < 0)
2619 goto out;
2620
2621 p = response;
2622 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2623 p + RBD_IMAGE_ID_LEN_MAX,
2624 &rbd_dev->image_id_len,
2625 GFP_NOIO);
2626 if (IS_ERR(rbd_dev->image_id)) {
2627 ret = PTR_ERR(rbd_dev->image_id);
2628 rbd_dev->image_id = NULL;
2629 } else {
2630 dout("image_id is %s\n", rbd_dev->image_id);
2631 }
2632 out:
2633 kfree(response);
2634 kfree(object_name);
2635
2636 return ret;
2637 }
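/*
 * Illustrative example (name and id made up): for an image named
 * "foo" the object queried above is named RBD_ID_PREFIX "foo", e.g.
 * "rbd_id.foo".  The "get_id" method replies with a length-prefixed
 * string: a __le32 byte count followed by the id bytes, which
 * ceph_extract_encoded_string() turns into the NUL-terminated
 * image_id, e.g. "1a2b3c4d5e6f".
 */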
2638
2639 static ssize_t rbd_add(struct bus_type *bus,
2640 const char *buf,
2641 size_t count)
2642 {
2643 char *options;
2644 struct rbd_device *rbd_dev = NULL;
2645 const char *mon_addrs = NULL;
2646 size_t mon_addrs_size = 0;
2647 struct ceph_osd_client *osdc;
2648 int rc = -ENOMEM;
2649 char *snap_name;
2650
2651 if (!try_module_get(THIS_MODULE))
2652 return -ENODEV;
2653
2654 options = kmalloc(count, GFP_KERNEL);
2655 if (!options)
2656 goto err_out_mem;
2657 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2658 if (!rbd_dev)
2659 goto err_out_mem;
2660
2661 /* static rbd_device initialization */
2662 spin_lock_init(&rbd_dev->lock);
2663 INIT_LIST_HEAD(&rbd_dev->node);
2664 INIT_LIST_HEAD(&rbd_dev->snaps);
2665 init_rwsem(&rbd_dev->header_rwsem);
2666
2667 /* parse add command */
2668 snap_name = rbd_add_parse_args(rbd_dev, buf,
2669 &mon_addrs, &mon_addrs_size, options, count);
2670 if (IS_ERR(snap_name)) {
2671 rc = PTR_ERR(snap_name);
2672 goto err_out_mem;
2673 }
2674
2675 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2676 if (rc < 0)
2677 goto err_out_args;
2678
2679 /* pick the pool */
2680 osdc = &rbd_dev->rbd_client->client->osdc;
2681 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2682 if (rc < 0)
2683 goto err_out_client;
2684 rbd_dev->pool_id = rc;
2685
2686 rc = rbd_dev_image_id(rbd_dev);
2687 if (!rc) {
2688 rc = -ENOTSUPP; /* Not actually supporting format 2 yet */
2689 goto err_out_client;
2690 }
2691
2692 /* Version 1 images have no id; empty string is used */
2693
2694 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2695 if (!rbd_dev->image_id) {
2696 rc = -ENOMEM;
2697 goto err_out_client;
2698 }
2699 rbd_dev->image_id_len = 0;
2700
2701 /* Create the name of the header object */
2702 	rc = -ENOMEM;
2703 	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2704 + sizeof (RBD_SUFFIX),
2705 GFP_KERNEL);
2706 if (!rbd_dev->header_name)
2707 goto err_out_client;
2708 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2709
2710 /* Get information about the image being mapped */
2711
2712 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
2713 if (rc)
2714 goto err_out_client;
2715
2716 /* no need to lock here, as rbd_dev is not registered yet */
2717 rc = rbd_dev_snaps_update(rbd_dev);
2718 if (rc)
2719 goto err_out_header;
2720
2721 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
2722 if (rc)
2723 goto err_out_header;
2724
2725 /* generate unique id: find highest unique id, add one */
2726 rbd_dev_id_get(rbd_dev);
2727
2728 /* Fill in the device name, now that we have its id. */
2729 BUILD_BUG_ON(DEV_NAME_LEN
2730 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2731 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2732
2733 /* Get our block major device number. */
2734
2735 rc = register_blkdev(0, rbd_dev->name);
2736 if (rc < 0)
2737 goto err_out_id;
2738 rbd_dev->major = rc;
2739
2740 /* Set up the blkdev mapping. */
2741
2742 rc = rbd_init_disk(rbd_dev);
2743 if (rc)
2744 goto err_out_blkdev;
2745
2746 rc = rbd_bus_add_dev(rbd_dev);
2747 if (rc)
2748 goto err_out_disk;
2749
2750 /*
2751 * At this point cleanup in the event of an error is the job
2752 * of the sysfs code (initiated by rbd_bus_del_dev()).
2753 */
2754
2755 down_write(&rbd_dev->header_rwsem);
2756 rc = rbd_dev_snaps_register(rbd_dev);
2757 up_write(&rbd_dev->header_rwsem);
2758 if (rc)
2759 goto err_out_bus;
2760
2761 rc = rbd_init_watch_dev(rbd_dev);
2762 if (rc)
2763 goto err_out_bus;
2764
2765 /* Everything's ready. Announce the disk to the world. */
2766
2767 add_disk(rbd_dev->disk);
2768
2769 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
2770 (unsigned long long) rbd_dev->mapping.size);
2771 	kfree(options);		/* done with the options buffer */
2772 	return count;
2773
2774 err_out_bus:
2775 /* this will also clean up rest of rbd_dev stuff */
2776
2777 rbd_bus_del_dev(rbd_dev);
2778 kfree(options);
2779 return rc;
2780
2781 err_out_disk:
2782 rbd_free_disk(rbd_dev);
2783 err_out_blkdev:
2784 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2785 err_out_id:
2786 rbd_dev_id_put(rbd_dev);
2787 err_out_header:
2788 rbd_header_free(&rbd_dev->header);
2789 err_out_client:
2790 kfree(rbd_dev->header_name);
2791 rbd_put_client(rbd_dev);
2792 kfree(rbd_dev->image_id);
2793 err_out_args:
2794 kfree(rbd_dev->mapping.snap_name);
2795 kfree(rbd_dev->image_name);
2796 kfree(rbd_dev->pool_name);
2797 err_out_mem:
2798 kfree(rbd_dev);
2799 kfree(options);
2800
2801 dout("Error adding device %s\n", buf);
2802 module_put(THIS_MODULE);
2803
2804 return (ssize_t) rc;
2805 }
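/*
 * Example usage from user space (monitor address and names made up;
 * Documentation/ABI/testing/sysfs-bus-rbd is authoritative):
 *
 *	# echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * maps image "foo" from pool "rbd" and, on success, announces a disk
 * named rbd<id> (e.g. /dev/rbd0) at the image's size.
 */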
2806
2807 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2808 {
2809 struct list_head *tmp;
2810 struct rbd_device *rbd_dev;
2811
2812 spin_lock(&rbd_dev_list_lock);
2813 list_for_each(tmp, &rbd_dev_list) {
2814 rbd_dev = list_entry(tmp, struct rbd_device, node);
2815 if (rbd_dev->dev_id == dev_id) {
2816 spin_unlock(&rbd_dev_list_lock);
2817 return rbd_dev;
2818 }
2819 }
2820 spin_unlock(&rbd_dev_list_lock);
2821 return NULL;
2822 }
2823
2824 static void rbd_dev_release(struct device *dev)
2825 {
2826 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2827
2828 if (rbd_dev->watch_request) {
2829 struct ceph_client *client = rbd_dev->rbd_client->client;
2830
2831 ceph_osdc_unregister_linger_request(&client->osdc,
2832 rbd_dev->watch_request);
2833 }
2834 if (rbd_dev->watch_event)
2835 rbd_req_sync_unwatch(rbd_dev);
2836
2837 rbd_put_client(rbd_dev);
2838
2839 /* clean up and free blkdev */
2840 rbd_free_disk(rbd_dev);
2841 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2842
2843 /* release allocated disk header fields */
2844 rbd_header_free(&rbd_dev->header);
2845
2846 /* done with the id, and with the rbd_dev */
2847 kfree(rbd_dev->mapping.snap_name);
2848 kfree(rbd_dev->image_id);
2849 kfree(rbd_dev->header_name);
2850 kfree(rbd_dev->pool_name);
2851 kfree(rbd_dev->image_name);
2852 rbd_dev_id_put(rbd_dev);
2853 kfree(rbd_dev);
2854
2855 /* release module ref */
2856 module_put(THIS_MODULE);
2857 }
2858
2859 static ssize_t rbd_remove(struct bus_type *bus,
2860 const char *buf,
2861 size_t count)
2862 {
2863 struct rbd_device *rbd_dev = NULL;
2864 int target_id, rc;
2865 unsigned long ul;
2866 int ret = count;
2867
2868 rc = strict_strtoul(buf, 10, &ul);
2869 if (rc)
2870 return rc;
2871
2872 /* convert to int; abort if we lost anything in the conversion */
2873 target_id = (int) ul;
2874 if (target_id != ul)
2875 return -EINVAL;
2876
2877 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2878
2879 rbd_dev = __rbd_get_dev(target_id);
2880 if (!rbd_dev) {
2881 ret = -ENOENT;
2882 goto done;
2883 }
2884
2885 __rbd_remove_all_snaps(rbd_dev);
2886 rbd_bus_del_dev(rbd_dev);
2887
2888 done:
2889 mutex_unlock(&ctl_mutex);
2890
2891 return ret;
2892 }
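/*
 * Example usage (device id made up): writing the numeric id shown
 * under /sys/bus/rbd/devices back to the bus tears the mapping down:
 *
 *	# echo 0 > /sys/bus/rbd/remove
 */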
2893
2894 static ssize_t rbd_snap_add(struct device *dev,
2895 struct device_attribute *attr,
2896 const char *buf,
2897 size_t count)
2898 {
2899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2900 int ret;
2901 char *name = kmalloc(count + 1, GFP_KERNEL);
2902 if (!name)
2903 return -ENOMEM;
2904
2905 	snprintf(name, count, "%s", buf);	/* at most count - 1 bytes: drops sysfs's trailing newline */
2906
2907 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2908
2909 ret = rbd_header_add_snap(rbd_dev,
2910 name, GFP_KERNEL);
2911 if (ret < 0)
2912 goto err_unlock;
2913
2914 ret = __rbd_refresh_header(rbd_dev, NULL);
2915 if (ret < 0)
2916 goto err_unlock;
2917
2918 	/* We shouldn't hold ctl_mutex when notifying: the notify may
2919 	   trigger a watch callback that itself needs to take that mutex */
2920 mutex_unlock(&ctl_mutex);
2921
2922 /* make a best effort, don't error if failed */
2923 rbd_req_sync_notify(rbd_dev);
2924
2925 ret = count;
2926 kfree(name);
2927 return ret;
2928
2929 err_unlock:
2930 mutex_unlock(&ctl_mutex);
2931 kfree(name);
2932 return ret;
2933 }
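/*
 * Example usage (snapshot name made up): writing a name to a mapped
 * device's create_snap attribute snapshots the image, refreshes the
 * header, and then notifies watchers on a best-effort basis:
 *
 *	# echo snap1 > /sys/bus/rbd/devices/0/create_snap
 */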
2934
2935 /*
2936 * create control files in sysfs
2937 * /sys/bus/rbd/...
2938 */
2939 static int rbd_sysfs_init(void)
2940 {
2941 int ret;
2942
2943 ret = device_register(&rbd_root_dev);
2944 if (ret < 0)
2945 return ret;
2946
2947 ret = bus_register(&rbd_bus_type);
2948 if (ret < 0)
2949 device_unregister(&rbd_root_dev);
2950
2951 return ret;
2952 }
2953
2954 static void rbd_sysfs_cleanup(void)
2955 {
2956 bus_unregister(&rbd_bus_type);
2957 device_unregister(&rbd_root_dev);
2958 }
2959
2960 int __init rbd_init(void)
2961 {
2962 int rc;
2963
2964 rc = rbd_sysfs_init();
2965 if (rc)
2966 return rc;
2967 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2968 return 0;
2969 }
2970
2971 void __exit rbd_exit(void)
2972 {
2973 rbd_sysfs_cleanup();
2974 }
2975
2976 module_init(rbd_init);
2977 module_exit(rbd_exit);
2978
2979 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2980 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2981 MODULE_DESCRIPTION("rados block device");
2982
2983 /* following authorship retained from original osdblk.c */
2984 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2985
2986 MODULE_LICENSE("GPL");