68447d83288cc60f6d5276eb6664adfa2ac46f1f
[deliverable/linux.git] / drivers / block / rbd.c
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/*
 * Snapshots appear in sysfs as "snap_<name>"; the prefix eats into
 * the NAME_MAX budget available for the snapshot name itself.
 */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

/* Pseudo snapshot name used to map the base (writable) image */
#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
/* Decimal digits needed for an int: ~ceil(bits * log10(2)) + sign */
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false
94
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;	/* prefix for per-object names */
	u64 features;		/* RBD_FEATURE_* bits */
	__u8 obj_order;		/* log2 of the per-object size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;			/* image size in bytes */
	struct ceph_snap_context *snapc;
	char *snap_names;		/* consecutive NUL-terminated names */
	u64 *snap_sizes;		/* one size per snapshot */

	u64 obj_version;	/* header object version at last read */
};
114
/* User-supplied mapping options (parsed by parse_rbd_opts_token()) */
struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;	/* shared-client refcount */
	struct list_head	node;	/* entry in rbd_client_list */
};
127
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion result */
	u64 bytes;	/* bytes transferred */
};

/*
 * a collection of requests sharing a single block-layer request;
 * freed via kref when the last sub-request drops its reference
 */
struct rbd_req_coll {
	int			total;		/* number of sub-requests */
	int			num_done;	/* completed (in order) so far */
	struct kref		kref;
	struct rbd_req_status	status[0];	/* one slot per sub-request */
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;	/* sysfs device for this snapshot */
	const char		*name;
	u64			size;
	struct list_head	node;	/* entry in rbd_device->snaps */
	u64			id;
	u64			features;
};

/* Size/features/mode of the image or snapshot this device is mapped to */
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};
173
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* shared ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	bool			exists;		/* mapped snapshot still exists */
	char			*image_id;
	size_t			image_id_len;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;	/* header object name */
	char			*pool_name;
	u64			pool_id;

	char			*snap_name;	/* mapped snapshot (or HEAD) */
	u64			snap_id;

	/* header-change notification state */
	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
219
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for snapshot maintenance and sysfs teardown */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

/* /sys/bus/rbd/{add,remove}: write-only (S_IWUSR) control files */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

/* No per-device state to free; the method must exist nonetheless */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
258
#ifdef RBD_DEBUG
/*
 * rbd_assert() - verify an invariant, BUG() if it does not hold.
 *
 * Wrapped in do { } while (0) so the macro expands to a single
 * statement; the previous bare "if { }" form could silently capture a
 * following "else" in the caller (dangling-else hazard) when used in
 * an unbraced conditional.
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
271
/* Take a sysfs reference on the device while the blkdev is open */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

/* Drop the reference taken by rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
284
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
286 {
287 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
288
289 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
290 return -EROFS;
291
292 rbd_get_dev(rbd_dev);
293 set_device_ro(bdev, rbd_dev->mapping.read_only);
294
295 return 0;
296 }
297
/* Block-device release method: drop the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
312
313 /*
314 * Initialize an rbd client instance.
315 * We own *ceph_opts.
316 */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 {
319 struct rbd_client *rbdc;
320 int ret = -ENOMEM;
321
322 dout("rbd_client_create\n");
323 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
324 if (!rbdc)
325 goto out_opt;
326
327 kref_init(&rbdc->kref);
328 INIT_LIST_HEAD(&rbdc->node);
329
330 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331
332 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333 if (IS_ERR(rbdc->client))
334 goto out_mutex;
335 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336
337 ret = ceph_open_session(rbdc->client);
338 if (ret < 0)
339 goto out_err;
340
341 spin_lock(&rbd_client_list_lock);
342 list_add_tail(&rbdc->node, &rbd_client_list);
343 spin_unlock(&rbd_client_list_lock);
344
345 mutex_unlock(&ctl_mutex);
346
347 dout("rbd_client_create created %p\n", rbdc);
348 return rbdc;
349
350 out_err:
351 ceph_destroy_client(rbdc->client);
352 out_mutex:
353 mutex_unlock(&ctl_mutex);
354 kfree(rbdc);
355 out_opt:
356 if (ceph_opts)
357 ceph_destroy_options(ceph_opts);
358 return ERR_PTR(ret);
359 }
360
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.  Returns NULL when no match is
 * found or when the caller asked not to share (CEPH_OPT_NOSHARE).
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			/* Take a reference before dropping the list lock */
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
385
/*
 * mount options
 *
 * The Opt_last_* markers partition the token space by argument kind:
 * tokens below Opt_last_int take an integer argument, tokens between
 * Opt_last_int and Opt_last_string take a string, and tokens between
 * Opt_last_string and Opt_last_bool are bare Booleans.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
410
/*
 * Parse a single mount-option token into *private (a struct
 * rbd_options).  Returns 0 on success or a negative errno for an
 * unrecognized token or a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Classify the token's argument kind via the Opt_last_* markers */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* Every token in rbd_opts_tokens must be handled above */
		rbd_assert(false);
		break;
	}
	return 0;
}
451
452 /*
453 * Get a ceph client with specific addr and configuration, if one does
454 * not exist create it.
455 */
456 static int rbd_get_client(struct rbd_device *rbd_dev,
457 struct ceph_options *ceph_opts)
458 {
459 struct rbd_client *rbdc;
460
461 rbdc = rbd_client_find(ceph_opts);
462 if (rbdc) {
463 /* using an existing client */
464 ceph_destroy_options(ceph_opts);
465 } else {
466 rbdc = rbd_client_create(ceph_opts);
467 if (IS_ERR(rbdc))
468 return PTR_ERR(rbdc);
469 }
470 rbd_dev->rbd_client = rbdc;
471
472 return 0;
473 }
474
/*
 * Destroy ceph client
 *
 * Called as the kref release method; unlinks the client from
 * rbd_client_list (taking rbd_client_list_lock itself — the caller
 * must NOT already hold it) and tears down the ceph client.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}
502
/*
 * Destroy requests collection (kref release method for rbd_req_coll)
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

/* Only image formats 1 and 2 exist; anything else is invalid */
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
519
/*
 * Sanity-check an on-disk (format 1) image header before trusting
 * any of its fields: magic text, object order bounds, and that the
 * snapshot metadata sizes fit in a size_t.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
558
559 /*
560 * Create a new header structure, translate header format from the on-disk
561 * header.
562 */
563 static int rbd_header_from_disk(struct rbd_image_header *header,
564 struct rbd_image_header_ondisk *ondisk)
565 {
566 u32 snap_count;
567 size_t len;
568 size_t size;
569 u32 i;
570
571 memset(header, 0, sizeof (*header));
572
573 snap_count = le32_to_cpu(ondisk->snap_count);
574
575 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
576 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
577 if (!header->object_prefix)
578 return -ENOMEM;
579 memcpy(header->object_prefix, ondisk->object_prefix, len);
580 header->object_prefix[len] = '\0';
581
582 if (snap_count) {
583 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
584
585 /* Save a copy of the snapshot names */
586
587 if (snap_names_len > (u64) SIZE_MAX)
588 return -EIO;
589 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
590 if (!header->snap_names)
591 goto out_err;
592 /*
593 * Note that rbd_dev_v1_header_read() guarantees
594 * the ondisk buffer we're working with has
595 * snap_names_len bytes beyond the end of the
596 * snapshot id array, this memcpy() is safe.
597 */
598 memcpy(header->snap_names, &ondisk->snaps[snap_count],
599 snap_names_len);
600
601 /* Record each snapshot's size */
602
603 size = snap_count * sizeof (*header->snap_sizes);
604 header->snap_sizes = kmalloc(size, GFP_KERNEL);
605 if (!header->snap_sizes)
606 goto out_err;
607 for (i = 0; i < snap_count; i++)
608 header->snap_sizes[i] =
609 le64_to_cpu(ondisk->snaps[i].image_size);
610 } else {
611 WARN_ON(ondisk->snap_names_len);
612 header->snap_names = NULL;
613 header->snap_sizes = NULL;
614 }
615
616 header->features = 0; /* No features support in v1 images */
617 header->obj_order = ondisk->options.order;
618 header->crypt_type = ondisk->options.crypt_type;
619 header->comp_type = ondisk->options.comp_type;
620
621 /* Allocate and fill in the snapshot context */
622
623 header->image_size = le64_to_cpu(ondisk->image_size);
624 size = sizeof (struct ceph_snap_context);
625 size += snap_count * sizeof (header->snapc->snaps[0]);
626 header->snapc = kzalloc(size, GFP_KERNEL);
627 if (!header->snapc)
628 goto out_err;
629
630 atomic_set(&header->snapc->nref, 1);
631 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
632 header->snapc->num_snaps = snap_count;
633 for (i = 0; i < snap_count; i++)
634 header->snapc->snaps[i] =
635 le64_to_cpu(ondisk->snaps[i].id);
636
637 return 0;
638
639 out_err:
640 kfree(header->snap_sizes);
641 header->snap_sizes = NULL;
642 kfree(header->snap_names);
643 header->snap_names = NULL;
644 kfree(header->object_prefix);
645 header->object_prefix = NULL;
646
647 return -ENOMEM;
648 }
649
/*
 * Look up a snapshot by name in the device's snapshot list.  On
 * success, record the snapshot's id, size and features in the
 * device's mapping and return 0; return -ENOENT if no snapshot with
 * that name exists.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
667
/*
 * Set up the device's mapping for the given snapshot name.  The
 * special name RBD_SNAP_HEAD_NAME maps the writable base image;
 * any other name maps that snapshot read-only.  On success the
 * device takes (a pointer to) snap_name; on failure nothing is
 * recorded.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
	int ret;

	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, snap_name);
		if (ret < 0)
			goto done;
		/* Snapshots are immutable; force the mapping read-only */
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->snap_name = snap_name;
	rbd_dev->exists = true;
done:
	return ret;
}
689
/*
 * Release everything rbd_header_from_disk() allocated, nulling each
 * pointer so a double free is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);	/* refcounted, not kfree'd */
	header->snapc = NULL;
}
701
702 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
703 {
704 char *name;
705 u64 segment;
706 int ret;
707
708 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
709 if (!name)
710 return NULL;
711 segment = offset >> rbd_dev->header.obj_order;
712 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
713 rbd_dev->header.object_prefix, segment);
714 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
715 pr_err("error formatting segment name for #%llu (%d)\n",
716 segment, ret);
717 kfree(name);
718 name = NULL;
719 }
720
721 return name;
722 }
723
/* Byte offset of an image offset within its containing segment */
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

/*
 * Clip a request length so it does not cross a segment boundary:
 * returns the number of bytes of [offset, offset+length) that fall
 * within offset's segment.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
744
/*
 * Number of segments touched by the byte range [ofs, ofs+len).
 * Returns 0 for an empty range and -ERANGE if the range would wrap
 * past U64_MAX.
 *
 * NOTE(review): the u64 segment difference is returned as an int;
 * presumably callers never span enough segments to overflow — verify
 * against the request-sizing code.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
761
762 /*
763 * returns the size of an object in the image
764 */
765 static u64 rbd_obj_bytes(struct rbd_image_header *header)
766 {
767 return 1 << header->obj_order;
768 }
769
770 /*
771 * bio helpers
772 */
773
774 static void bio_chain_put(struct bio *chain)
775 {
776 struct bio *tmp;
777
778 while (chain) {
779 tmp = chain;
780 chain = chain->bi_next;
781 bio_put(tmp);
782 }
783 }
784
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/*
				 * For the segment straddling start_ofs,
				 * zero only its tail; segments entirely
				 * past it get remainder == 0 and are
				 * zeroed in full.
				 */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
811
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the clone, or NULL on bad arguments or allocation failure.
 * The clone shares the source's pages (BIO_CLONED); only the bio_vec
 * bookkeeping is copied and trimmed.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;	/* byte offset into the first copied segment */
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	/* resid now holds the in-use length of the final segment */
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		/* First and last segment are the same one */
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
892
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;	/* where the next clone gets linked in */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi)
			goto out_err;	/* EINVAL; ran out of bio's */
		/* Clone at most the remainder of the current source bio */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* Exhausted this source bio; advance the chain */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	/* Undo any partial chain before reporting failure */
	bio_chain_put(chain);

	return NULL;
}
953
954 /*
955 * helpers for osd request op vectors.
956 */
957 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
958 int opcode, u32 payload_len)
959 {
960 struct ceph_osd_req_op *ops;
961
962 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
963 if (!ops)
964 return NULL;
965
966 ops[0].op = opcode;
967
968 /*
969 * op extent offset and length will be set later on
970 * in calc_raw_layout()
971 */
972 ops[0].payload_len = payload_len;
973
974 return ops;
975 }
976
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
981
/*
 * Record completion of one sub-request in a collection and complete
 * the block-layer request for every prefix of contiguously-finished
 * sub-requests.  Sub-requests must be reported to the block layer in
 * order, so completion only advances while status[num_done].done is
 * set.  With no collection, the whole request is completed at once.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* The queue lock serializes updates to the collection's status */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* Each completed sub-request drops its collection ref */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

/* Convenience wrapper taking the sub-request's own bookkeeping */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
1025
/*
 * Send ceph osd request
 *
 * Builds and submits an OSD request against object_name for
 * [ofs, ofs+len), carrying either a bio chain or a page vector.
 * With a callback (rbd_cb) the request completes asynchronously and
 * the callback owns the cleanup; without one this waits for
 * completion and returns the result.  On the error paths here,
 * req_data's bio chain reference is dropped and any collection slot
 * is completed with the error.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* Report the failure into the collection slot, if any */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len, coll, coll_index);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	/*
	 * The header's snapid is left at CEPH_NOSNAP here; the mapped
	 * snapshot id is supplied to ceph_calc_raw_layout() below.
	 */
	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* One object per stripe unit: stripe/object size = max object size */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		/* Caller wants the request re-sent across OSD map changes */
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous: wait here and clean up ourselves */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
1139
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous requests submitted by
 * rbd_do_request().  Translates read errors/short reads into zeroed
 * data, completes the collection slot, and frees everything the
 * submit path allocated.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* ops follow the reply head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Reading a hole (nonexistent object) returns zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero-fill the tail */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

/* Callback for requests that need no processing, just a final put */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1184
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector to carry the data, waits for completion
 * via rbd_do_request() (no callback), and for reads copies the
 * result back into the caller's "inbound" buffer.  Returns the
 * request result (for reads, the byte count copied) or a negative
 * errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, inbound_size, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	/* For a read, ret is the number of bytes the OSD returned */
	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1228
/*
 * Do an asynchronous ceph osd operation
 *
 * Issues one read or write (direction taken from rq) against the
 * single segment containing [ofs, ofs+len); the caller has already
 * split the I/O so it never crosses a segment boundary.  Completion
 * is reported through the collection slot via rbd_req_cb().
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;
	int opcode;
	int flags;
	u64 snapid;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	if (rq_data_dir(rq) == WRITE) {
		opcode = CEPH_OSD_OP_WRITE;
		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
		snapid = CEPH_NOSNAP;	/* writes always go to HEAD */
		payload_len = seg_len;
	} else {
		opcode = CEPH_OSD_OP_READ;
		flags = CEPH_OSD_FLAG_READ;
		snapc = NULL;		/* snap context is write-only state */
		snapid = rbd_dev->snap_id;
		payload_len = 0;
	}

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1293
1294 /*
1295 * Request sync osd read
1296 */
1297 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1298 u64 snapid,
1299 const char *object_name,
1300 u64 ofs, u64 len,
1301 char *buf,
1302 u64 *ver)
1303 {
1304 struct ceph_osd_req_op *ops;
1305 int ret;
1306
1307 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1308 if (!ops)
1309 return -ENOMEM;
1310
1311 ret = rbd_req_sync_op(rbd_dev, NULL,
1312 snapid,
1313 CEPH_OSD_FLAG_READ,
1314 ops, object_name, ofs, len, buf, NULL, ver);
1315 rbd_destroy_ops(ops);
1316
1317 return ret;
1318 }
1319
1320 /*
1321 * Request sync osd watch
1322 */
1323 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1324 u64 ver,
1325 u64 notify_id)
1326 {
1327 struct ceph_osd_req_op *ops;
1328 int ret;
1329
1330 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1331 if (!ops)
1332 return -ENOMEM;
1333
1334 ops[0].watch.ver = cpu_to_le64(ver);
1335 ops[0].watch.cookie = notify_id;
1336 ops[0].watch.flag = 0;
1337
1338 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1339 rbd_dev->header_name, 0, 0, NULL,
1340 NULL, 0,
1341 CEPH_OSD_FLAG_READ,
1342 ops,
1343 NULL, 0,
1344 rbd_simple_req_cb, 0, NULL);
1345
1346 rbd_destroy_ops(ops);
1347 return ret;
1348 }
1349
1350 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1351 {
1352 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1353 u64 hver;
1354 int rc;
1355
1356 if (!rbd_dev)
1357 return;
1358
1359 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1360 rbd_dev->header_name, (unsigned long long) notify_id,
1361 (unsigned int) opcode);
1362 rc = rbd_dev_refresh(rbd_dev, &hver);
1363 if (rc)
1364 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1365 " update snaps: %d\n", rbd_dev->major, rc);
1366
1367 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1368 }
1369
/*
 * Request sync osd watch
 *
 * Register a watch on the image header object so the OSD notifies
 * us (via rbd_watch_cb()) when the header changes.  The osd request
 * is kept lingering in rbd_dev->watch_request so the watch stays
 * active.  Returns 0 on success or a negative errno; on failure the
 * event (if created) is cancelled again.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	/* The event routes incoming notifications to rbd_watch_cb() */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register (cf. 0 in rbd_req_sync_unwatch()) */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1413
1414 /*
1415 * Request sync osd unwatch
1416 */
1417 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1418 {
1419 struct ceph_osd_req_op *ops;
1420 int ret;
1421
1422 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1423 if (!ops)
1424 return -ENOMEM;
1425
1426 ops[0].watch.ver = 0;
1427 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1428 ops[0].watch.flag = 0;
1429
1430 ret = rbd_req_sync_op(rbd_dev, NULL,
1431 CEPH_NOSNAP,
1432 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1433 ops,
1434 rbd_dev->header_name,
1435 0, 0, NULL, NULL, NULL);
1436
1437
1438 rbd_destroy_ops(ops);
1439 ceph_osdc_cancel_event(rbd_dev->watch_event);
1440 rbd_dev->watch_event = NULL;
1441 return ret;
1442 }
1443
/*
 * Synchronous osd object method call
 *
 * Invoke class_name.method_name on object_name, sending
 * outbound/outbound_size as the method's input and copying up to
 * inbound_size bytes of the reply into inbound.  Returns the
 * rbd_req_sync_op() result (negative errno on failure) and
 * optionally the object version through *ver.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	/* Name lengths are truncated to __u8; callers use short names */
	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      flags, ops,
			      object_name, 0, inbound_size, inbound,
			      NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1496
1497 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1498 {
1499 struct rbd_req_coll *coll =
1500 kzalloc(sizeof(struct rbd_req_coll) +
1501 sizeof(struct rbd_req_status) * num_reqs,
1502 GFP_ATOMIC);
1503
1504 if (!coll)
1505 return NULL;
1506 coll->total = num_reqs;
1507 kref_init(&coll->kref);
1508 return coll;
1509 }
1510
/*
 * block device queue callback
 *
 * Drain the request queue, splitting each filesystem request into
 * per-object-segment osd requests tracked by a shared collection.
 * Per the request_fn contract the queue lock is held on entry; it
 * is dropped while we sleep on the header semaphore and submit osd
 * requests, and retaken before completing requests or returning.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		bool do_write;
		unsigned int size;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;
		unsigned int bio_offset;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* Drop the queue lock: we may sleep below */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* The mapped snapshot may have been deleted under us */
		if (!rbd_dev->exists) {
			rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* Pin the snapshot context this request is issued under */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		bio = rq->bio;

		dout("%s 0x%x bytes at 0x%llx\n",
			do_write ? "write" : "read",
			size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		/* One collection tracks completion of all segments */
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		bio_offset = 0;
		do {
			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
			unsigned int chain_size;
			struct bio *bio_chain;

			BUG_ON(limit > (u64) UINT_MAX);
			chain_size = (unsigned int) limit;
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

			/* Ref for this segment; dropped on its completion */
			kref_get(&coll->kref);

			/* Pass a cloned bio chain via an osd request */

			bio_chain = bio_chain_clone_range(&bio,
						&bio_offset, chain_size,
						GFP_ATOMIC);
			if (bio_chain)
				(void) rbd_do_op(rq, rbd_dev, snapc,
						ofs, chain_size,
						bio_chain, coll, cur_seg);
			else
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, chain_size);
			size -= chain_size;
			ofs += chain_size;

			cur_seg++;
		} while (size > 0);
		/* Drop our own reference; segment requests hold the rest */
		kref_put(&coll->kref, rbd_coll_release);

		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1620
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of bvec that may be added to the bio
 * described by bmd (possibly 0, possibly less than bvec->bv_len).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 * Note: the (int) cast binds to the parenthesized difference,
	 * so the sector count is narrowed to int *before* the shift.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
1666
1667 static void rbd_free_disk(struct rbd_device *rbd_dev)
1668 {
1669 struct gendisk *disk = rbd_dev->disk;
1670
1671 if (!disk)
1672 return;
1673
1674 if (disk->flags & GENHD_FL_UP)
1675 del_gendisk(disk);
1676 if (disk->queue)
1677 blk_cleanup_queue(disk->queue);
1678 put_disk(disk);
1679 }
1680
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header. Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* Free the previous (too small) attempt, if any */
		kfree(ondisk);

		/* Size based on the snap count seen on the last pass */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		/* Retry if the snapshot count changed while reading */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1752
1753 /*
1754 * reload the ondisk the header
1755 */
1756 static int rbd_read_header(struct rbd_device *rbd_dev,
1757 struct rbd_image_header *header)
1758 {
1759 struct rbd_image_header_ondisk *ondisk;
1760 u64 ver = 0;
1761 int ret;
1762
1763 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1764 if (IS_ERR(ondisk))
1765 return PTR_ERR(ondisk);
1766 ret = rbd_header_from_disk(header, ondisk);
1767 if (ret >= 0)
1768 header->obj_version = ver;
1769 kfree(ondisk);
1770
1771 return ret;
1772 }
1773
1774 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1775 {
1776 struct rbd_snap *snap;
1777 struct rbd_snap *next;
1778
1779 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1780 rbd_remove_snap_dev(snap);
1781 }
1782
1783 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1784 {
1785 sector_t size;
1786
1787 if (rbd_dev->snap_id != CEPH_NOSNAP)
1788 return;
1789
1790 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1791 dout("setting size to %llu sectors", (unsigned long long) size);
1792 rbd_dev->mapping.size = (u64) size;
1793 set_capacity(rbd_dev->disk, size);
1794 }
1795
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the v1 header and swaps the fresh size, snapshot context,
 * names and sizes into rbd_dev->header under the header rwsem, then
 * re-syncs the snapshot device list.  Returns 0 on success or a
 * negative errno; the new header object version is reported through
 * *hver when non-NULL.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	/* image_size was already set above; this assignment is redundant */
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1839
1840 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1841 {
1842 int ret;
1843
1844 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1845 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1846 if (rbd_dev->image_format == 1)
1847 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1848 else
1849 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1850 mutex_unlock(&ctl_mutex);
1851
1852 return ret;
1853 }
1854
/*
 * Allocate and configure the gendisk and request queue for the
 * mapped image.  I/O limits are aligned to the rbd object size so
 * requests split cleanly on object boundaries.  Returns 0 on
 * success or -ENOMEM (allocation is the only failure mode here);
 * teardown is done by rbd_free_disk().
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* Keep bios from spanning objects (see rbd_merge_bvec()) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
1903
1904 /*
1905 sysfs
1906 */
1907
/* Map a sysfs struct device back to its containing rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1912
1913 static ssize_t rbd_size_show(struct device *dev,
1914 struct device_attribute *attr, char *buf)
1915 {
1916 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1917 sector_t size;
1918
1919 down_read(&rbd_dev->header_rwsem);
1920 size = get_capacity(rbd_dev->disk);
1921 up_read(&rbd_dev->header_rwsem);
1922
1923 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1924 }
1925
1926 /*
1927 * Note this shows the features for whatever's mapped, which is not
1928 * necessarily the base image.
1929 */
1930 static ssize_t rbd_features_show(struct device *dev,
1931 struct device_attribute *attr, char *buf)
1932 {
1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934
1935 return sprintf(buf, "0x%016llx\n",
1936 (unsigned long long) rbd_dev->mapping.features);
1937 }
1938
1939 static ssize_t rbd_major_show(struct device *dev,
1940 struct device_attribute *attr, char *buf)
1941 {
1942 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943
1944 return sprintf(buf, "%d\n", rbd_dev->major);
1945 }
1946
1947 static ssize_t rbd_client_id_show(struct device *dev,
1948 struct device_attribute *attr, char *buf)
1949 {
1950 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951
1952 return sprintf(buf, "client%lld\n",
1953 ceph_client_id(rbd_dev->rbd_client->client));
1954 }
1955
1956 static ssize_t rbd_pool_show(struct device *dev,
1957 struct device_attribute *attr, char *buf)
1958 {
1959 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960
1961 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1962 }
1963
1964 static ssize_t rbd_pool_id_show(struct device *dev,
1965 struct device_attribute *attr, char *buf)
1966 {
1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968
1969 return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
1970 }
1971
1972 static ssize_t rbd_name_show(struct device *dev,
1973 struct device_attribute *attr, char *buf)
1974 {
1975 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976
1977 return sprintf(buf, "%s\n", rbd_dev->image_name);
1978 }
1979
1980 static ssize_t rbd_image_id_show(struct device *dev,
1981 struct device_attribute *attr, char *buf)
1982 {
1983 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984
1985 return sprintf(buf, "%s\n", rbd_dev->image_id);
1986 }
1987
1988 /*
1989 * Shows the name of the currently-mapped snapshot (or
1990 * RBD_SNAP_HEAD_NAME for the base image).
1991 */
1992 static ssize_t rbd_snap_show(struct device *dev,
1993 struct device_attribute *attr,
1994 char *buf)
1995 {
1996 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1997
1998 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1999 }
2000
2001 static ssize_t rbd_image_refresh(struct device *dev,
2002 struct device_attribute *attr,
2003 const char *buf,
2004 size_t size)
2005 {
2006 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2007 int ret;
2008
2009 ret = rbd_dev_refresh(rbd_dev, NULL);
2010
2011 return ret < 0 ? ret : size;
2012 }
2013
/* Per-device sysfs attributes; documented in
 * Documentation/ABI/testing/sysfs-bus-rbd.  All are read-only except
 * "refresh", which is a write-only trigger. */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Intentionally empty: the embedded struct device does not own the
 * rbd_device.  NOTE(review): presumably the rbd_device is freed on a
 * separate teardown path -- confirm against the device-removal code.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2057
2058
2059 /*
2060 sysfs - snapshots
2061 */
2062
2063 static ssize_t rbd_snap_size_show(struct device *dev,
2064 struct device_attribute *attr,
2065 char *buf)
2066 {
2067 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2068
2069 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2070 }
2071
2072 static ssize_t rbd_snap_id_show(struct device *dev,
2073 struct device_attribute *attr,
2074 char *buf)
2075 {
2076 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2077
2078 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2079 }
2080
2081 static ssize_t rbd_snap_features_show(struct device *dev,
2082 struct device_attribute *attr,
2083 char *buf)
2084 {
2085 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2086
2087 return sprintf(buf, "0x%016llx\n",
2088 (unsigned long long) snap->features);
2089 }
2090
/* Per-snapshot sysfs attributes (all read-only) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Device-model release callback: frees the rbd_snap embedding the
 * device, along with its name (see __rbd_add_snap_dev()). */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2122
2123 static bool rbd_snap_registered(struct rbd_snap *snap)
2124 {
2125 bool ret = snap->dev.type == &rbd_snap_device_type;
2126 bool reg = device_is_registered(&snap->dev);
2127
2128 rbd_assert(!ret ^ reg);
2129
2130 return ret;
2131 }
2132
/*
 * Unlink a snapshot from its image's list and, if its sysfs device
 * was registered, unregister it (rbd_snap_dev_release() then frees
 * the rbd_snap itself).
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2139
2140 static int rbd_register_snap_dev(struct rbd_snap *snap,
2141 struct device *parent)
2142 {
2143 struct device *dev = &snap->dev;
2144 int ret;
2145
2146 dev->type = &rbd_snap_device_type;
2147 dev->parent = parent;
2148 dev->release = rbd_snap_dev_release;
2149 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2150 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2151
2152 ret = device_register(dev);
2153
2154 return ret;
2155 }
2156
2157 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2158 const char *snap_name,
2159 u64 snap_id, u64 snap_size,
2160 u64 snap_features)
2161 {
2162 struct rbd_snap *snap;
2163 int ret;
2164
2165 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2166 if (!snap)
2167 return ERR_PTR(-ENOMEM);
2168
2169 ret = -ENOMEM;
2170 snap->name = kstrdup(snap_name, GFP_KERNEL);
2171 if (!snap->name)
2172 goto err;
2173
2174 snap->id = snap_id;
2175 snap->size = snap_size;
2176 snap->features = snap_features;
2177
2178 return snap;
2179
2180 err:
2181 kfree(snap->name);
2182 kfree(snap);
2183
2184 return ERR_PTR(ret);
2185 }
2186
2187 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2188 u64 *snap_size, u64 *snap_features)
2189 {
2190 char *snap_name;
2191
2192 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2193
2194 *snap_size = rbd_dev->header.snap_sizes[which];
2195 *snap_features = 0; /* No features for v1 */
2196
2197 /* Skip over names until we find the one we are looking for */
2198
2199 snap_name = rbd_dev->header.snap_names;
2200 while (which--)
2201 snap_name += strlen(snap_name) + 1;
2202
2203 return snap_name;
2204 }
2205
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 *
 * Returns 0 on success or a negative errno.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	/* Matches the wire format of the "get_size" reply */
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
2239
/* Fetch the base image's object order and size into the in-core header */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
2246
/*
 * Fetch the object name prefix for a format 2 image and store a
 * freshly-allocated copy in rbd_dev->header.object_prefix.
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	/* NOTE(review): GFP_NOIO here, unlike GFP_KERNEL elsewhere in
	 * this file -- confirm whether that distinction is intended. */
	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2284
/*
 * Get the feature bits for an image snapshot (or, with CEPH_NOSNAP,
 * for the base image).  Fails with -ENOTSUPP if the image requires
 * incompatible features this client does not implement.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	/* Matches the wire format of the "get_features" reply */
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	/* Refuse images whose required features we don't support */
	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENOTSUPP;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2318
/* Fetch the base image's feature bits into the in-core header */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
2324
2325 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2326 {
2327 size_t size;
2328 int ret;
2329 void *reply_buf;
2330 void *p;
2331 void *end;
2332 u64 seq;
2333 u32 snap_count;
2334 struct ceph_snap_context *snapc;
2335 u32 i;
2336
2337 /*
2338 * We'll need room for the seq value (maximum snapshot id),
2339 * snapshot count, and array of that many snapshot ids.
2340 * For now we have a fixed upper limit on the number we're
2341 * prepared to receive.
2342 */
2343 size = sizeof (__le64) + sizeof (__le32) +
2344 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2345 reply_buf = kzalloc(size, GFP_KERNEL);
2346 if (!reply_buf)
2347 return -ENOMEM;
2348
2349 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2350 "rbd", "get_snapcontext",
2351 NULL, 0,
2352 reply_buf, size,
2353 CEPH_OSD_FLAG_READ, ver);
2354 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2355 if (ret < 0)
2356 goto out;
2357
2358 ret = -ERANGE;
2359 p = reply_buf;
2360 end = (char *) reply_buf + size;
2361 ceph_decode_64_safe(&p, end, seq, out);
2362 ceph_decode_32_safe(&p, end, snap_count, out);
2363
2364 /*
2365 * Make sure the reported number of snapshot ids wouldn't go
2366 * beyond the end of our buffer. But before checking that,
2367 * make sure the computed size of the snapshot context we
2368 * allocate is representable in a size_t.
2369 */
2370 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2371 / sizeof (u64)) {
2372 ret = -EINVAL;
2373 goto out;
2374 }
2375 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2376 goto out;
2377
2378 size = sizeof (struct ceph_snap_context) +
2379 snap_count * sizeof (snapc->snaps[0]);
2380 snapc = kmalloc(size, GFP_KERNEL);
2381 if (!snapc) {
2382 ret = -ENOMEM;
2383 goto out;
2384 }
2385
2386 atomic_set(&snapc->nref, 1);
2387 snapc->seq = seq;
2388 snapc->num_snaps = snap_count;
2389 for (i = 0; i < snap_count; i++)
2390 snapc->snaps[i] = ceph_decode_64(&p);
2391
2392 rbd_dev->header.snapc = snapc;
2393
2394 dout(" snap context seq = %llu, snap_count = %u\n",
2395 (unsigned long long) seq, (unsigned int) snap_count);
2396
2397 out:
2398 kfree(reply_buf);
2399
2400 return 0;
2401 }
2402
2403 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2404 {
2405 size_t size;
2406 void *reply_buf;
2407 __le64 snap_id;
2408 int ret;
2409 void *p;
2410 void *end;
2411 size_t snap_name_len;
2412 char *snap_name;
2413
2414 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2415 reply_buf = kmalloc(size, GFP_KERNEL);
2416 if (!reply_buf)
2417 return ERR_PTR(-ENOMEM);
2418
2419 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2420 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2421 "rbd", "get_snapshot_name",
2422 (char *) &snap_id, sizeof (snap_id),
2423 reply_buf, size,
2424 CEPH_OSD_FLAG_READ, NULL);
2425 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2426 if (ret < 0)
2427 goto out;
2428
2429 p = reply_buf;
2430 end = (char *) reply_buf + size;
2431 snap_name_len = 0;
2432 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2433 GFP_KERNEL);
2434 if (IS_ERR(snap_name)) {
2435 ret = PTR_ERR(snap_name);
2436 goto out;
2437 } else {
2438 dout(" snap_id 0x%016llx snap_name = %s\n",
2439 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2440 }
2441 kfree(reply_buf);
2442
2443 return snap_name;
2444 out:
2445 kfree(reply_buf);
2446
2447 return ERR_PTR(ret);
2448 }
2449
2450 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2451 u64 *snap_size, u64 *snap_features)
2452 {
2453 __le64 snap_id;
2454 u8 order;
2455 int ret;
2456
2457 snap_id = rbd_dev->header.snapc->snaps[which];
2458 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2459 if (ret)
2460 return ERR_PTR(ret);
2461 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2462 if (ret)
2463 return ERR_PTR(ret);
2464
2465 return rbd_dev_v2_snap_name(rbd_dev, which);
2466 }
2467
2468 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2469 u64 *snap_size, u64 *snap_features)
2470 {
2471 if (rbd_dev->image_format == 1)
2472 return rbd_dev_v1_snap_info(rbd_dev, which,
2473 snap_size, snap_features);
2474 if (rbd_dev->image_format == 2)
2475 return rbd_dev_v2_snap_info(rbd_dev, which,
2476 snap_size, snap_features);
2477 return ERR_PTR(-EINVAL);
2478 }
2479
2480 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2481 {
2482 int ret;
2483 __u8 obj_order;
2484
2485 down_write(&rbd_dev->header_rwsem);
2486
2487 /* Grab old order first, to see if it changes */
2488
2489 obj_order = rbd_dev->header.obj_order,
2490 ret = rbd_dev_v2_image_size(rbd_dev);
2491 if (ret)
2492 goto out;
2493 if (rbd_dev->header.obj_order != obj_order) {
2494 ret = -EIO;
2495 goto out;
2496 }
2497 rbd_update_mapping_size(rbd_dev);
2498
2499 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2500 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2501 if (ret)
2502 goto out;
2503 ret = rbd_dev_snaps_update(rbd_dev);
2504 dout("rbd_dev_snaps_update returned %d\n", ret);
2505 if (ret)
2506 goto out;
2507 ret = rbd_dev_snaps_register(rbd_dev);
2508 dout("rbd_dev_snaps_register returned %d\n", ret);
2509 out:
2510 up_write(&rbd_dev->header_rwsem);
2511
2512 return ret;
2513 }
2514
2515 /*
2516 * Scan the rbd device's current snapshot list and compare it to the
2517 * newly-received snapshot context. Remove any existing snapshots
2518 * not present in the new snapshot context. Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
2520 * And verify there are no changes to snapshots we already know
2521 * about.
2522 *
2523 * Assumes the snapshots in the snapshot context are sorted by
2524 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2525 * are also maintained in that order.)
2526 */
2527 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2528 {
2529 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2530 const u32 snap_count = snapc->num_snaps;
2531 struct list_head *head = &rbd_dev->snaps;
2532 struct list_head *links = head->next;
2533 u32 index = 0;
2534
2535 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2536 while (index < snap_count || links != head) {
2537 u64 snap_id;
2538 struct rbd_snap *snap;
2539 char *snap_name;
2540 u64 snap_size = 0;
2541 u64 snap_features = 0;
2542
2543 snap_id = index < snap_count ? snapc->snaps[index]
2544 : CEPH_NOSNAP;
2545 snap = links != head ? list_entry(links, struct rbd_snap, node)
2546 : NULL;
2547 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2548
2549 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2550 struct list_head *next = links->next;
2551
2552 /* Existing snapshot not in the new snap context */
2553
2554 if (rbd_dev->snap_id == snap->id)
2555 rbd_dev->exists = false;
2556 rbd_remove_snap_dev(snap);
2557 dout("%ssnap id %llu has been removed\n",
2558 rbd_dev->snap_id == snap->id ? "mapped " : "",
2559 (unsigned long long) snap->id);
2560
2561 /* Done with this list entry; advance */
2562
2563 links = next;
2564 continue;
2565 }
2566
2567 snap_name = rbd_dev_snap_info(rbd_dev, index,
2568 &snap_size, &snap_features);
2569 if (IS_ERR(snap_name))
2570 return PTR_ERR(snap_name);
2571
2572 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2573 (unsigned long long) snap_id);
2574 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2575 struct rbd_snap *new_snap;
2576
2577 /* We haven't seen this snapshot before */
2578
2579 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2580 snap_id, snap_size, snap_features);
2581 if (IS_ERR(new_snap)) {
2582 int err = PTR_ERR(new_snap);
2583
2584 dout(" failed to add dev, error %d\n", err);
2585
2586 return err;
2587 }
2588
2589 /* New goes before existing, or at end of list */
2590
2591 dout(" added dev%s\n", snap ? "" : " at end\n");
2592 if (snap)
2593 list_add_tail(&new_snap->node, &snap->node);
2594 else
2595 list_add_tail(&new_snap->node, head);
2596 } else {
2597 /* Already have this one */
2598
2599 dout(" already present\n");
2600
2601 rbd_assert(snap->size == snap_size);
2602 rbd_assert(!strcmp(snap->name, snap_name));
2603 rbd_assert(snap->features == snap_features);
2604
2605 /* Done with this list entry; advance */
2606
2607 links = links->next;
2608 }
2609
2610 /* Advance to the next entry in the snapshot context */
2611
2612 index++;
2613 }
2614 dout("%s: done\n", __func__);
2615
2616 return 0;
2617 }
2618
2619 /*
2620 * Scan the list of snapshots and register the devices for any that
2621 * have not already been registered.
2622 */
2623 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2624 {
2625 struct rbd_snap *snap;
2626 int ret = 0;
2627
2628 dout("%s called\n", __func__);
2629 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2630 return -EIO;
2631
2632 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2633 if (!rbd_snap_registered(snap)) {
2634 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2635 if (ret < 0)
2636 break;
2637 }
2638 }
2639 dout("%s: returning %d\n", __func__, ret);
2640
2641 return ret;
2642 }
2643
2644 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2645 {
2646 struct device *dev;
2647 int ret;
2648
2649 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2650
2651 dev = &rbd_dev->dev;
2652 dev->bus = &rbd_bus_type;
2653 dev->type = &rbd_device_type;
2654 dev->parent = &rbd_root_dev;
2655 dev->release = rbd_dev_release;
2656 dev_set_name(dev, "%d", rbd_dev->dev_id);
2657 ret = device_register(dev);
2658
2659 mutex_unlock(&ctl_mutex);
2660
2661 return ret;
2662 }
2663
/*
 * Unregister the device from sysfs.  Actual teardown happens in the
 * release callback (rbd_dev_release, installed by rbd_bus_add_dev)
 * once the last reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2668
2669 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2670 {
2671 int ret, rc;
2672
2673 do {
2674 ret = rbd_req_sync_watch(rbd_dev);
2675 if (ret == -ERANGE) {
2676 rc = rbd_dev_refresh(rbd_dev, NULL);
2677 if (rc < 0)
2678 return rc;
2679 }
2680 } while (ret == -ERANGE);
2681
2682 return ret;
2683 }
2684
/* Highest device id handed out so far (ids start at 1; see rbd_dev_id_get) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2686
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* The atomic increment alone guarantees id uniqueness */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	/* The device list itself is protected by rbd_dev_list_lock */
	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
2701
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.  If the departing id was the
 * current maximum, recompute the maximum from the remaining devices.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;	/* deliberately shadows the parameter */

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
}
2752
/*
 * Advance *buf past any leading white space and return the length
 * of the token (maximal run of non-space characters) now at *buf.
 * The string at *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/* The characters isspace() matches in the "C"/"POSIX" locales */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* skip to start of token */

	return strcspn(*buf, spaces);	/* length of token found */
}
2771
/*
 * Copy the next token in *buf into "token" when it fits in
 * token_size bytes; a copied token is always '\0'-terminated.
 * *buf must be '\0'-terminated on entry and is advanced past the
 * token even when the token was too large to copy.
 *
 * Returns the token length (not counting the '\0'): 0 means no
 * token was found, and a value >= token_size means it did not fit
 * and was not copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2801
2802 /*
2803 * Finds the next token in *buf, dynamically allocates a buffer big
2804 * enough to hold a copy of it, and copies the token into the new
2805 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2806 * that a duplicate buffer is created even for a zero-length token.
2807 *
2808 * Returns a pointer to the newly-allocated duplicate, or a null
2809 * pointer if memory for the duplicate was not available. If
2810 * the lenp argument is a non-null pointer, the length of the token
2811 * (not including the '\0') is returned in *lenp.
2812 *
2813 * If successful, the *buf pointer will be updated to point beyond
2814 * the end of the found token.
2815 *
2816 * Note: uses GFP_KERNEL for allocation.
2817 */
2818 static inline char *dup_token(const char **buf, size_t *lenp)
2819 {
2820 char *dup;
2821 size_t len;
2822
2823 len = next_token(buf);
2824 dup = kmalloc(len + 1, GFP_KERNEL);
2825 if (!dup)
2826 return NULL;
2827
2828 memcpy(dup, *buf, len);
2829 *(dup + len) = '\0';
2830 *buf += len;
2831
2832 if (lenp)
2833 *lenp = len;
2834
2835 return dup;
2836 }
2837
2838 /*
2839 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2840 * rbd_md_name, and name fields of the given rbd_dev, based on the
2841 * list of monitor addresses and other options provided via
2842 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2843 * copy of the snapshot name to map if successful, or a
2844 * pointer-coded error otherwise.
2845 *
2846 * Note: rbd_dev is assumed to have been initially zero-filled.
2847 */
static struct ceph_options *rbd_add_parse_args(struct rbd_device *rbd_dev,
				const char *buf,
				char *options,
				size_t options_size,
				char **snap_name,
				size_t *snap_name_len)
{
	size_t len;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_options rbd_opts;
	struct ceph_options *ceph_opts;
	struct ceph_options *err_ptr = ERR_PTR(-EINVAL);

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return err_ptr;
	/* +1 so the size passed on covers a terminating '\0' */
	mon_addrs_size = len + 1;
	mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	/* len >= options_size means the options token was truncated */
	if (!len || len >= options_size)
		return err_ptr;

	/* Remaining failures below are allocation failures */
	err_ptr = ERR_PTR(-ENOMEM);
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Snapshot name is optional; default is to use "head" */

	len = next_token(&buf);
	if (len > RBD_MAX_SNAP_NAME_LEN) {
		err_ptr = ERR_PTR(-ENAMETOOLONG);
		goto out_err;
	}
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	}
	/* Caller owns *snap_name on success */
	*snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!*snap_name)
		goto out_err;
	memcpy(*snap_name, buf, len);
	*(*snap_name + len) = '\0';
	*snap_name_len = len;
	/* Initialize all rbd options to the defaults */

	rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, &rbd_opts);

	/* Record the parsed rbd options */

	if (!IS_ERR(ceph_opts)) {
		rbd_dev->mapping.read_only = rbd_opts.read_only;
	}

	return ceph_opts;
out_err:
	/* Undo partial parsing; rbd_dev name fields go back to NULL */
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return err_ptr;
}
2926
2927 /*
2928 * An rbd format 2 image has a unique identifier, distinct from the
2929 * name given to it by the user. Internally, that identifier is
2930 * what's used to specify the names of objects related to the image.
2931 *
2932 * A special "rbd id" object is used to map an rbd image name to its
2933 * id. If that object doesn't exist, then there is no v2 rbd image
2934 * with the supplied name.
2935 *
2936 * This function will record the given rbd_dev's image_id field if
2937 * it can be determined, and in that case will return 0. If any
2938 * errors occur a negative errno will be returned and the rbd_dev's
2939 * image_id field will be unchanged (and should be NULL).
2940 */
2941 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2942 {
2943 int ret;
2944 size_t size;
2945 char *object_name;
2946 void *response;
2947 void *p;
2948
2949 /*
2950 * First, see if the format 2 image id file exists, and if
2951 * so, get the image's persistent id from it.
2952 */
2953 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2954 object_name = kmalloc(size, GFP_NOIO);
2955 if (!object_name)
2956 return -ENOMEM;
2957 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2958 dout("rbd id object name is %s\n", object_name);
2959
2960 /* Response will be an encoded string, which includes a length */
2961
2962 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2963 response = kzalloc(size, GFP_NOIO);
2964 if (!response) {
2965 ret = -ENOMEM;
2966 goto out;
2967 }
2968
2969 ret = rbd_req_sync_exec(rbd_dev, object_name,
2970 "rbd", "get_id",
2971 NULL, 0,
2972 response, RBD_IMAGE_ID_LEN_MAX,
2973 CEPH_OSD_FLAG_READ, NULL);
2974 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2975 if (ret < 0)
2976 goto out;
2977 ret = 0; /* rbd_req_sync_exec() can return positive */
2978
2979 p = response;
2980 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2981 p + RBD_IMAGE_ID_LEN_MAX,
2982 &rbd_dev->image_id_len,
2983 GFP_NOIO);
2984 if (IS_ERR(rbd_dev->image_id)) {
2985 ret = PTR_ERR(rbd_dev->image_id);
2986 rbd_dev->image_id = NULL;
2987 } else {
2988 dout("image_id is %s\n", rbd_dev->image_id);
2989 }
2990 out:
2991 kfree(response);
2992 kfree(object_name);
2993
2994 return ret;
2995 }
2996
2997 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2998 {
2999 int ret;
3000 size_t size;
3001
3002 /* Version 1 images have no id; empty string is used */
3003
3004 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3005 if (!rbd_dev->image_id)
3006 return -ENOMEM;
3007 rbd_dev->image_id_len = 0;
3008
3009 /* Record the header object name for this rbd image. */
3010
3011 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3012 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3013 if (!rbd_dev->header_name) {
3014 ret = -ENOMEM;
3015 goto out_err;
3016 }
3017 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3018
3019 /* Populate rbd image metadata */
3020
3021 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3022 if (ret < 0)
3023 goto out_err;
3024 rbd_dev->image_format = 1;
3025
3026 dout("discovered version 1 image, header name is %s\n",
3027 rbd_dev->header_name);
3028
3029 return 0;
3030
3031 out_err:
3032 kfree(rbd_dev->header_name);
3033 rbd_dev->header_name = NULL;
3034 kfree(rbd_dev->image_id);
3035 rbd_dev->image_id = NULL;
3036
3037 return ret;
3038 }
3039
/*
 * Probe the rbd device as a format 2 image: build the header object
 * name from the (already determined) image id, then read the image
 * size, object prefix, features and snapshot context.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Undo anything allocated above; fields go back to NULL */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3101
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to get the id from the image id object first.  If
	 * that fails (e.g. ENOENT for a non-format-2 image), assume
	 * this is a format 1 image.
	 */
	if (rbd_dev_image_id(rbd_dev) == 0)
		ret = rbd_dev_v2_probe(rbd_dev);
	else
		ret = rbd_dev_v1_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3126
3127 static ssize_t rbd_add(struct bus_type *bus,
3128 const char *buf,
3129 size_t count)
3130 {
3131 char *options;
3132 struct rbd_device *rbd_dev = NULL;
3133 char *snap_name;
3134 size_t snap_name_len = 0;
3135 struct ceph_options *ceph_opts;
3136 struct ceph_osd_client *osdc;
3137 int rc = -ENOMEM;
3138
3139 if (!try_module_get(THIS_MODULE))
3140 return -ENODEV;
3141
3142 options = kmalloc(count, GFP_KERNEL);
3143 if (!options)
3144 goto err_out_mem;
3145 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3146 if (!rbd_dev)
3147 goto err_out_mem;
3148
3149 /* static rbd_device initialization */
3150 spin_lock_init(&rbd_dev->lock);
3151 INIT_LIST_HEAD(&rbd_dev->node);
3152 INIT_LIST_HEAD(&rbd_dev->snaps);
3153 init_rwsem(&rbd_dev->header_rwsem);
3154
3155 /* parse add command */
3156 ceph_opts = rbd_add_parse_args(rbd_dev, buf, options, count,
3157 &snap_name, &snap_name_len);
3158 if (IS_ERR(ceph_opts)) {
3159 rc = PTR_ERR(ceph_opts);
3160 goto err_out_mem;
3161 }
3162
3163 rc = rbd_get_client(rbd_dev, ceph_opts);
3164 if (rc < 0)
3165 goto err_out_args;
3166 ceph_opts = NULL; /* ceph_opts now owned by rbd_dev client */
3167
3168 /* pick the pool */
3169 osdc = &rbd_dev->rbd_client->client->osdc;
3170 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3171 if (rc < 0)
3172 goto err_out_client;
3173 rbd_dev->pool_id = (u64) rc;
3174
3175 rc = rbd_dev_probe(rbd_dev);
3176 if (rc < 0)
3177 goto err_out_client;
3178
3179 /* no need to lock here, as rbd_dev is not registered yet */
3180 rc = rbd_dev_snaps_update(rbd_dev);
3181 if (rc)
3182 goto err_out_probe;
3183
3184 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3185 if (rc)
3186 goto err_out_snaps;
3187
3188 /* generate unique id: find highest unique id, add one */
3189 rbd_dev_id_get(rbd_dev);
3190
3191 /* Fill in the device name, now that we have its id. */
3192 BUILD_BUG_ON(DEV_NAME_LEN
3193 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3194 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3195
3196 /* Get our block major device number. */
3197
3198 rc = register_blkdev(0, rbd_dev->name);
3199 if (rc < 0)
3200 goto err_out_id;
3201 rbd_dev->major = rc;
3202
3203 /* Set up the blkdev mapping. */
3204
3205 rc = rbd_init_disk(rbd_dev);
3206 if (rc)
3207 goto err_out_blkdev;
3208
3209 rc = rbd_bus_add_dev(rbd_dev);
3210 if (rc)
3211 goto err_out_disk;
3212
3213 /*
3214 * At this point cleanup in the event of an error is the job
3215 * of the sysfs code (initiated by rbd_bus_del_dev()).
3216 */
3217
3218 down_write(&rbd_dev->header_rwsem);
3219 rc = rbd_dev_snaps_register(rbd_dev);
3220 up_write(&rbd_dev->header_rwsem);
3221 if (rc)
3222 goto err_out_bus;
3223
3224 rc = rbd_init_watch_dev(rbd_dev);
3225 if (rc)
3226 goto err_out_bus;
3227
3228 /* Everything's ready. Announce the disk to the world. */
3229
3230 add_disk(rbd_dev->disk);
3231
3232 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3233 (unsigned long long) rbd_dev->mapping.size);
3234
3235 return count;
3236
3237 err_out_bus:
3238 /* this will also clean up rest of rbd_dev stuff */
3239
3240 rbd_bus_del_dev(rbd_dev);
3241 kfree(options);
3242 return rc;
3243
3244 err_out_disk:
3245 rbd_free_disk(rbd_dev);
3246 err_out_blkdev:
3247 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3248 err_out_id:
3249 rbd_dev_id_put(rbd_dev);
3250 err_out_snaps:
3251 rbd_remove_all_snaps(rbd_dev);
3252 err_out_probe:
3253 rbd_header_free(&rbd_dev->header);
3254 err_out_client:
3255 kfree(rbd_dev->header_name);
3256 rbd_put_client(rbd_dev);
3257 kfree(rbd_dev->image_id);
3258 err_out_args:
3259 if (ceph_opts)
3260 ceph_destroy_options(ceph_opts);
3261 kfree(rbd_dev->snap_name);
3262 kfree(rbd_dev->image_name);
3263 kfree(rbd_dev->pool_name);
3264 err_out_mem:
3265 kfree(rbd_dev);
3266 kfree(options);
3267
3268 dout("Error adding device %s\n", buf);
3269 module_put(THIS_MODULE);
3270
3271 return (ssize_t) rc;
3272 }
3273
3274 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3275 {
3276 struct list_head *tmp;
3277 struct rbd_device *rbd_dev;
3278
3279 spin_lock(&rbd_dev_list_lock);
3280 list_for_each(tmp, &rbd_dev_list) {
3281 rbd_dev = list_entry(tmp, struct rbd_device, node);
3282 if (rbd_dev->dev_id == dev_id) {
3283 spin_unlock(&rbd_dev_list_lock);
3284 return rbd_dev;
3285 }
3286 }
3287 spin_unlock(&rbd_dev_list_lock);
3288 return NULL;
3289 }
3290
/*
 * Driver-model release callback for an rbd device: tear down the
 * watch, block device, header and allocated name strings, roughly
 * in the reverse order of their setup in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request before unwatching */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->image_id);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref taken in rbd_add() */
	module_put(THIS_MODULE);
}
3325
3326 static ssize_t rbd_remove(struct bus_type *bus,
3327 const char *buf,
3328 size_t count)
3329 {
3330 struct rbd_device *rbd_dev = NULL;
3331 int target_id, rc;
3332 unsigned long ul;
3333 int ret = count;
3334
3335 rc = strict_strtoul(buf, 10, &ul);
3336 if (rc)
3337 return rc;
3338
3339 /* convert to int; abort if we lost anything in the conversion */
3340 target_id = (int) ul;
3341 if (target_id != ul)
3342 return -EINVAL;
3343
3344 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3345
3346 rbd_dev = __rbd_get_dev(target_id);
3347 if (!rbd_dev) {
3348 ret = -ENOENT;
3349 goto done;
3350 }
3351
3352 rbd_remove_all_snaps(rbd_dev);
3353 rbd_bus_del_dev(rbd_dev);
3354
3355 done:
3356 mutex_unlock(&ctl_mutex);
3357
3358 return ret;
3359 }
3360
3361 /*
3362 * create control files in sysfs
3363 * /sys/bus/rbd/...
3364 */
3365 static int rbd_sysfs_init(void)
3366 {
3367 int ret;
3368
3369 ret = device_register(&rbd_root_dev);
3370 if (ret < 0)
3371 return ret;
3372
3373 ret = bus_register(&rbd_bus_type);
3374 if (ret < 0)
3375 device_unregister(&rbd_root_dev);
3376
3377 return ret;
3378 }
3379
/* Remove the sysfs control files, in the reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
3385
3386 int __init rbd_init(void)
3387 {
3388 int rc;
3389
3390 rc = rbd_sysfs_init();
3391 if (rc)
3392 return rc;
3393 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3394 return 0;
3395 }
3396
/* Module unload: remove the sysfs control files */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
3401
3402 module_init(rbd_init);
3403 module_exit(rbd_exit);
3404
3405 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3406 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3407 MODULE_DESCRIPTION("rados block device");
3408
3409 /* following authorship retained from original osdblk.c */
3410 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3411
3412 MODULE_LICENSE("GPL");