2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_OPT_LEN 1024
67 #define RBD_SNAP_HEAD_NAME "-"
69 #define RBD_IMAGE_ID_LEN_MAX 64
72 * An RBD device name will be "rbd#", where the "rbd" comes from
73 * RBD_DRV_NAME above, and # is a unique integer identifier.
74 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
75 * enough to hold all possible device names.
77 #define DEV_NAME_LEN 32
78 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
80 #define RBD_READ_ONLY_DEFAULT false
83 * block device image metadata (in-memory version)
85 struct rbd_image_header
{
86 /* These four fields never change for a given rbd image */
92 /* The remaining fields need to be updated occasionally */
94 struct ceph_snap_context
*snapc
;
106 * an instance of the client. multiple devices may share an rbd client.
109 struct ceph_client
*client
;
111 struct list_head node
;
115 * a request completion status
117 struct rbd_req_status
{
124 * a collection of requests
126 struct rbd_req_coll
{
130 struct rbd_req_status status
[0];
134 * a single io request
137 struct request
*rq
; /* blk layer request */
138 struct bio
*bio
; /* cloned bio */
139 struct page
**pages
; /* list of used pages */
142 struct rbd_req_coll
*coll
;
149 struct list_head node
;
165 int dev_id
; /* blkdev unique id */
167 int major
; /* blkdev assigned major */
168 struct gendisk
*disk
; /* blkdev's gendisk and rq */
170 struct rbd_options rbd_opts
;
171 struct rbd_client
*rbd_client
;
173 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
175 spinlock_t lock
; /* queue lock */
177 struct rbd_image_header header
;
181 size_t image_name_len
;
186 struct ceph_osd_event
*watch_event
;
187 struct ceph_osd_request
*watch_request
;
189 /* protects updating the header */
190 struct rw_semaphore header_rwsem
;
192 struct rbd_mapping mapping
;
194 struct list_head node
;
196 /* list of snapshots */
197 struct list_head snaps
;
203 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
205 static LIST_HEAD(rbd_dev_list
); /* devices */
206 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
208 static LIST_HEAD(rbd_client_list
); /* clients */
209 static DEFINE_SPINLOCK(rbd_client_list_lock
);
211 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
);
212 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
);
214 static void rbd_dev_release(struct device
*dev
);
215 static ssize_t
rbd_snap_add(struct device
*dev
,
216 struct device_attribute
*attr
,
219 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
);
221 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
223 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
226 static struct bus_attribute rbd_bus_attrs
[] = {
227 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
228 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
232 static struct bus_type rbd_bus_type
= {
234 .bus_attrs
= rbd_bus_attrs
,
/*
 * Release callback for the rbd root device (installed as
 * rbd_root_dev.release).  Intentionally does nothing.
 */
static void rbd_root_dev_release(struct device *dev)
{
	/* nothing to free */
}
241 static struct device rbd_root_dev
= {
243 .release
= rbd_root_dev_release
,
247 #define rbd_assert(expr) \
248 if (unlikely(!(expr))) { \
249 printk(KERN_ERR "\nAssertion failure in %s() " \
251 "\trbd_assert(%s);\n\n", \
252 __func__, __LINE__, #expr); \
255 #else /* !RBD_DEBUG */
256 # define rbd_assert(expr) ((void) 0)
257 #endif /* !RBD_DEBUG */
259 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
261 return get_device(&rbd_dev
->dev
);
264 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
266 put_device(&rbd_dev
->dev
);
269 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
);
271 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
273 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
275 if ((mode
& FMODE_WRITE
) && rbd_dev
->mapping
.read_only
)
278 rbd_get_dev(rbd_dev
);
279 set_device_ro(bdev
, rbd_dev
->mapping
.read_only
);
284 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
286 struct rbd_device
*rbd_dev
= disk
->private_data
;
288 rbd_put_dev(rbd_dev
);
293 static const struct block_device_operations rbd_bd_ops
= {
294 .owner
= THIS_MODULE
,
296 .release
= rbd_release
,
300 * Initialize an rbd client instance.
303 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
305 struct rbd_client
*rbdc
;
308 dout("rbd_client_create\n");
309 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
313 kref_init(&rbdc
->kref
);
314 INIT_LIST_HEAD(&rbdc
->node
);
316 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
318 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
319 if (IS_ERR(rbdc
->client
))
321 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
323 ret
= ceph_open_session(rbdc
->client
);
327 spin_lock(&rbd_client_list_lock
);
328 list_add_tail(&rbdc
->node
, &rbd_client_list
);
329 spin_unlock(&rbd_client_list_lock
);
331 mutex_unlock(&ctl_mutex
);
333 dout("rbd_client_create created %p\n", rbdc
);
337 ceph_destroy_client(rbdc
->client
);
339 mutex_unlock(&ctl_mutex
);
343 ceph_destroy_options(ceph_opts
);
348 * Find a ceph client with specific addr and configuration. If
349 * found, bump its reference count.
351 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
353 struct rbd_client
*client_node
;
356 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
359 spin_lock(&rbd_client_list_lock
);
360 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
361 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
362 kref_get(&client_node
->kref
);
367 spin_unlock(&rbd_client_list_lock
);
369 return found
? client_node
: NULL
;
379 /* string args above */
382 /* Boolean args above */
386 static match_table_t rbd_opts_tokens
= {
388 /* string args above */
389 {Opt_read_only
, "mapping.read_only"},
390 {Opt_read_only
, "ro"}, /* Alternate spelling */
391 {Opt_read_write
, "read_write"},
392 {Opt_read_write
, "rw"}, /* Alternate spelling */
393 /* Boolean args above */
397 static int parse_rbd_opts_token(char *c
, void *private)
399 struct rbd_options
*rbd_opts
= private;
400 substring_t argstr
[MAX_OPT_ARGS
];
401 int token
, intval
, ret
;
403 token
= match_token(c
, rbd_opts_tokens
, argstr
);
407 if (token
< Opt_last_int
) {
408 ret
= match_int(&argstr
[0], &intval
);
410 pr_err("bad mount option arg (not int) "
414 dout("got int token %d val %d\n", token
, intval
);
415 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
416 dout("got string token %d val %s\n", token
,
418 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
419 dout("got Boolean token %d\n", token
);
421 dout("got token %d\n", token
);
426 rbd_opts
->read_only
= true;
429 rbd_opts
->read_only
= false;
439 * Get a ceph client with specific addr and configuration, if one does
440 * not exist create it.
442 static int rbd_get_client(struct rbd_device
*rbd_dev
, const char *mon_addr
,
443 size_t mon_addr_len
, char *options
)
445 struct rbd_options
*rbd_opts
= &rbd_dev
->rbd_opts
;
446 struct ceph_options
*ceph_opts
;
447 struct rbd_client
*rbdc
;
449 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
451 ceph_opts
= ceph_parse_options(options
, mon_addr
,
452 mon_addr
+ mon_addr_len
,
453 parse_rbd_opts_token
, rbd_opts
);
454 if (IS_ERR(ceph_opts
))
455 return PTR_ERR(ceph_opts
);
457 rbdc
= rbd_client_find(ceph_opts
);
459 /* using an existing client */
460 ceph_destroy_options(ceph_opts
);
462 rbdc
= rbd_client_create(ceph_opts
);
464 return PTR_ERR(rbdc
);
466 rbd_dev
->rbd_client
= rbdc
;
472 * Destroy ceph client
474 * Caller must not hold rbd_client_list_lock; it is taken here.
476 static void rbd_client_release(struct kref
*kref
)
478 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
480 dout("rbd_release_client %p\n", rbdc
);
481 spin_lock(&rbd_client_list_lock
);
482 list_del(&rbdc
->node
);
483 spin_unlock(&rbd_client_list_lock
);
485 ceph_destroy_client(rbdc
->client
);
490 * Drop reference to ceph client node. If it's not referenced anymore, release
493 static void rbd_put_client(struct rbd_device
*rbd_dev
)
495 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
496 rbd_dev
->rbd_client
= NULL
;
500 * Destroy requests collection
502 static void rbd_coll_release(struct kref
*kref
)
504 struct rbd_req_coll
*coll
=
505 container_of(kref
, struct rbd_req_coll
, kref
);
507 dout("rbd_coll_release %p\n", coll
);
511 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
516 /* The header has to start with the magic rbd header text */
517 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
521 * The size of a snapshot header has to fit in a size_t, and
522 * that limits the number of snapshots.
524 snap_count
= le32_to_cpu(ondisk
->snap_count
);
525 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
526 if (snap_count
> size
/ sizeof (__le64
))
530 * Not only that, but the size of the entire snapshot
531 * header must also be representable in a size_t.
533 size
-= snap_count
* sizeof (__le64
);
534 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
541 * Create a new header structure, translate header format from the on-disk
544 static int rbd_header_from_disk(struct rbd_image_header
*header
,
545 struct rbd_image_header_ondisk
*ondisk
)
552 memset(header
, 0, sizeof (*header
));
554 snap_count
= le32_to_cpu(ondisk
->snap_count
);
556 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
557 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
558 if (!header
->object_prefix
)
560 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
561 header
->object_prefix
[len
] = '\0';
564 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
566 /* Save a copy of the snapshot names */
568 if (snap_names_len
> (u64
) SIZE_MAX
)
570 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
571 if (!header
->snap_names
)
574 * Note that rbd_dev_v1_header_read() guarantees
575 * the ondisk buffer we're working with has
576 * snap_names_len bytes beyond the end of the
577 * snapshot id array, this memcpy() is safe.
579 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
582 /* Record each snapshot's size */
584 size
= snap_count
* sizeof (*header
->snap_sizes
);
585 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
586 if (!header
->snap_sizes
)
588 for (i
= 0; i
< snap_count
; i
++)
589 header
->snap_sizes
[i
] =
590 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
592 WARN_ON(ondisk
->snap_names_len
);
593 header
->snap_names
= NULL
;
594 header
->snap_sizes
= NULL
;
597 header
->obj_order
= ondisk
->options
.order
;
598 header
->crypt_type
= ondisk
->options
.crypt_type
;
599 header
->comp_type
= ondisk
->options
.comp_type
;
601 /* Allocate and fill in the snapshot context */
603 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
604 size
= sizeof (struct ceph_snap_context
);
605 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
606 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
610 atomic_set(&header
->snapc
->nref
, 1);
611 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
612 header
->snapc
->num_snaps
= snap_count
;
613 for (i
= 0; i
< snap_count
; i
++)
614 header
->snapc
->snaps
[i
] =
615 le64_to_cpu(ondisk
->snaps
[i
].id
);
620 kfree(header
->snap_sizes
);
621 header
->snap_sizes
= NULL
;
622 kfree(header
->snap_names
);
623 header
->snap_names
= NULL
;
624 kfree(header
->object_prefix
);
625 header
->object_prefix
= NULL
;
630 static int snap_by_name(struct rbd_device
*rbd_dev
, const char *snap_name
)
633 struct rbd_snap
*snap
;
635 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
636 if (!strcmp(snap_name
, snap
->name
)) {
637 rbd_dev
->mapping
.snap_id
= snap
->id
;
638 rbd_dev
->mapping
.size
= snap
->size
;
647 static int rbd_dev_set_mapping(struct rbd_device
*rbd_dev
, char *snap_name
)
651 if (!memcmp(snap_name
, RBD_SNAP_HEAD_NAME
,
652 sizeof (RBD_SNAP_HEAD_NAME
))) {
653 rbd_dev
->mapping
.snap_id
= CEPH_NOSNAP
;
654 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
655 rbd_dev
->mapping
.snap_exists
= false;
656 rbd_dev
->mapping
.read_only
= rbd_dev
->rbd_opts
.read_only
;
659 ret
= snap_by_name(rbd_dev
, snap_name
);
662 rbd_dev
->mapping
.snap_exists
= true;
663 rbd_dev
->mapping
.read_only
= true;
665 rbd_dev
->mapping
.snap_name
= snap_name
;
670 static void rbd_header_free(struct rbd_image_header
*header
)
672 kfree(header
->object_prefix
);
673 header
->object_prefix
= NULL
;
674 kfree(header
->snap_sizes
);
675 header
->snap_sizes
= NULL
;
676 kfree(header
->snap_names
);
677 header
->snap_names
= NULL
;
678 ceph_put_snap_context(header
->snapc
);
679 header
->snapc
= NULL
;
682 static char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
688 name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
691 segment
= offset
>> rbd_dev
->header
.obj_order
;
692 ret
= snprintf(name
, RBD_MAX_SEG_NAME_LEN
, "%s.%012llx",
693 rbd_dev
->header
.object_prefix
, segment
);
694 if (ret
< 0 || ret
>= RBD_MAX_SEG_NAME_LEN
) {
695 pr_err("error formatting segment name for #%llu (%d)\n",
704 static u64
rbd_segment_offset(struct rbd_device
*rbd_dev
, u64 offset
)
706 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
708 return offset
& (segment_size
- 1);
711 static u64
rbd_segment_length(struct rbd_device
*rbd_dev
,
712 u64 offset
, u64 length
)
714 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
716 offset
&= segment_size
- 1;
718 rbd_assert(length
<= U64_MAX
- offset
);
719 if (offset
+ length
> segment_size
)
720 length
= segment_size
- offset
;
725 static int rbd_get_num_segments(struct rbd_image_header
*header
,
733 if (len
- 1 > U64_MAX
- ofs
)
736 start_seg
= ofs
>> header
->obj_order
;
737 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
739 return end_seg
- start_seg
+ 1;
743 * returns the size of an object in the image
745 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
747 return 1 << header
->obj_order
;
754 static void bio_chain_put(struct bio
*chain
)
760 chain
= chain
->bi_next
;
766 * zeros a bio chain, starting at specific offset
768 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
777 bio_for_each_segment(bv
, chain
, i
) {
778 if (pos
+ bv
->bv_len
> start_ofs
) {
779 int remainder
= max(start_ofs
- pos
, 0);
780 buf
= bvec_kmap_irq(bv
, &flags
);
781 memset(buf
+ remainder
, 0,
782 bv
->bv_len
- remainder
);
783 bvec_kunmap_irq(buf
, &flags
);
788 chain
= chain
->bi_next
;
793 * bio_chain_clone - clone a chain of bios up to a certain length.
794 * might return a bio_pair that will need to be released.
796 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
797 struct bio_pair
**bp
,
798 int len
, gfp_t gfpmask
)
800 struct bio
*old_chain
= *old
;
801 struct bio
*new_chain
= NULL
;
806 bio_pair_release(*bp
);
810 while (old_chain
&& (total
< len
)) {
813 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
816 gfpmask
&= ~__GFP_WAIT
; /* can't wait after the first */
818 if (total
+ old_chain
->bi_size
> len
) {
822 * this split can only happen with a single paged bio,
823 * bio_split will BUG_ON if this is not the case
825 dout("bio_chain_clone split! total=%d remaining=%d"
827 total
, len
- total
, old_chain
->bi_size
);
829 /* split the bio. We'll release it either in the next
830 call, or it will have to be released outside */
831 bp
= bio_split(old_chain
, (len
- total
) / SECTOR_SIZE
);
835 __bio_clone(tmp
, &bp
->bio1
);
839 __bio_clone(tmp
, old_chain
);
840 *next
= old_chain
->bi_next
;
850 old_chain
= old_chain
->bi_next
;
852 total
+= tmp
->bi_size
;
855 rbd_assert(total
== len
);
862 dout("bio_chain_clone with err\n");
863 bio_chain_put(new_chain
);
868 * helpers for osd request op vectors.
870 static struct ceph_osd_req_op
*rbd_create_rw_ops(int num_ops
,
871 int opcode
, u32 payload_len
)
873 struct ceph_osd_req_op
*ops
;
875 ops
= kzalloc(sizeof (*ops
) * (num_ops
+ 1), GFP_NOIO
);
882 * op extent offset and length will be set later on
883 * in calc_raw_layout()
885 ops
[0].payload_len
= payload_len
;
890 static void rbd_destroy_ops(struct ceph_osd_req_op
*ops
)
895 static void rbd_coll_end_req_index(struct request
*rq
,
896 struct rbd_req_coll
*coll
,
900 struct request_queue
*q
;
903 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
904 coll
, index
, ret
, (unsigned long long) len
);
910 blk_end_request(rq
, ret
, len
);
916 spin_lock_irq(q
->queue_lock
);
917 coll
->status
[index
].done
= 1;
918 coll
->status
[index
].rc
= ret
;
919 coll
->status
[index
].bytes
= len
;
920 max
= min
= coll
->num_done
;
921 while (max
< coll
->total
&& coll
->status
[max
].done
)
924 for (i
= min
; i
<max
; i
++) {
925 __blk_end_request(rq
, coll
->status
[i
].rc
,
926 coll
->status
[i
].bytes
);
928 kref_put(&coll
->kref
, rbd_coll_release
);
930 spin_unlock_irq(q
->queue_lock
);
933 static void rbd_coll_end_req(struct rbd_request
*req
,
936 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
940 * Send ceph osd request
942 static int rbd_do_request(struct request
*rq
,
943 struct rbd_device
*rbd_dev
,
944 struct ceph_snap_context
*snapc
,
946 const char *object_name
, u64 ofs
, u64 len
,
951 struct ceph_osd_req_op
*ops
,
952 struct rbd_req_coll
*coll
,
954 void (*rbd_cb
)(struct ceph_osd_request
*req
,
955 struct ceph_msg
*msg
),
956 struct ceph_osd_request
**linger_req
,
959 struct ceph_osd_request
*req
;
960 struct ceph_file_layout
*layout
;
963 struct timespec mtime
= CURRENT_TIME
;
964 struct rbd_request
*req_data
;
965 struct ceph_osd_request_head
*reqhead
;
966 struct ceph_osd_client
*osdc
;
968 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
971 rbd_coll_end_req_index(rq
, coll
, coll_index
,
977 req_data
->coll
= coll
;
978 req_data
->coll_index
= coll_index
;
981 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name
,
982 (unsigned long long) ofs
, (unsigned long long) len
);
984 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
985 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
986 false, GFP_NOIO
, pages
, bio
);
992 req
->r_callback
= rbd_cb
;
996 req_data
->pages
= pages
;
999 req
->r_priv
= req_data
;
1001 reqhead
= req
->r_request
->front
.iov_base
;
1002 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
1004 strncpy(req
->r_oid
, object_name
, sizeof(req
->r_oid
));
1005 req
->r_oid_len
= strlen(req
->r_oid
);
1007 layout
= &req
->r_file_layout
;
1008 memset(layout
, 0, sizeof(*layout
));
1009 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
1010 layout
->fl_stripe_count
= cpu_to_le32(1);
1011 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
1012 layout
->fl_pg_pool
= cpu_to_le32(rbd_dev
->pool_id
);
1013 ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
1016 ceph_osdc_build_request(req
, ofs
, &len
,
1020 req
->r_oid
, req
->r_oid_len
);
1023 ceph_osdc_set_request_linger(osdc
, req
);
1027 ret
= ceph_osdc_start_request(osdc
, req
, false);
1032 ret
= ceph_osdc_wait_request(osdc
, req
);
1034 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
1035 dout("reassert_ver=%llu\n",
1036 (unsigned long long)
1037 le64_to_cpu(req
->r_reassert_version
.version
));
1038 ceph_osdc_put_request(req
);
1043 bio_chain_put(req_data
->bio
);
1044 ceph_osdc_put_request(req
);
1046 rbd_coll_end_req(req_data
, ret
, len
);
1052 * Ceph osd op callback
1054 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
1056 struct rbd_request
*req_data
= req
->r_priv
;
1057 struct ceph_osd_reply_head
*replyhead
;
1058 struct ceph_osd_op
*op
;
1064 replyhead
= msg
->front
.iov_base
;
1065 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
1066 op
= (void *)(replyhead
+ 1);
1067 rc
= le32_to_cpu(replyhead
->result
);
1068 bytes
= le64_to_cpu(op
->extent
.length
);
1069 read_op
= (le16_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
1071 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1072 (unsigned long long) bytes
, read_op
, (int) rc
);
1074 if (rc
== -ENOENT
&& read_op
) {
1075 zero_bio_chain(req_data
->bio
, 0);
1077 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
1078 zero_bio_chain(req_data
->bio
, bytes
);
1079 bytes
= req_data
->len
;
1082 rbd_coll_end_req(req_data
, rc
, bytes
);
1085 bio_chain_put(req_data
->bio
);
1087 ceph_osdc_put_request(req
);
/*
 * Minimal osd request callback: the only work done on completion is
 * handing @req to ceph_osdc_put_request().  @msg is not examined.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1097 * Do a synchronous ceph osd operation
1099 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1100 struct ceph_snap_context
*snapc
,
1103 struct ceph_osd_req_op
*ops
,
1104 const char *object_name
,
1105 u64 ofs
, u64 inbound_size
,
1107 struct ceph_osd_request
**linger_req
,
1111 struct page
**pages
;
1114 rbd_assert(ops
!= NULL
);
1116 num_pages
= calc_pages_for(ofs
, inbound_size
);
1117 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1119 return PTR_ERR(pages
);
1121 ret
= rbd_do_request(NULL
, rbd_dev
, snapc
, snapid
,
1122 object_name
, ofs
, inbound_size
, NULL
,
1132 if ((flags
& CEPH_OSD_FLAG_READ
) && inbound
)
1133 ret
= ceph_copy_from_page_vector(pages
, inbound
, ofs
, ret
);
1136 ceph_release_page_vector(pages
, num_pages
);
1141 * Do an asynchronous ceph osd operation
1143 static int rbd_do_op(struct request
*rq
,
1144 struct rbd_device
*rbd_dev
,
1145 struct ceph_snap_context
*snapc
,
1147 int opcode
, int flags
,
1150 struct rbd_req_coll
*coll
,
1157 struct ceph_osd_req_op
*ops
;
1160 seg_name
= rbd_segment_name(rbd_dev
, ofs
);
1163 seg_len
= rbd_segment_length(rbd_dev
, ofs
, len
);
1164 seg_ofs
= rbd_segment_offset(rbd_dev
, ofs
);
1166 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1169 ops
= rbd_create_rw_ops(1, opcode
, payload_len
);
1173 /* we've taken care of segment sizes earlier when we
1174 cloned the bios. We should never have a segment
1175 truncated at this point */
1176 rbd_assert(seg_len
== len
);
1178 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1179 seg_name
, seg_ofs
, seg_len
,
1185 rbd_req_cb
, 0, NULL
);
1187 rbd_destroy_ops(ops
);
1194 * Request async osd write
1196 static int rbd_req_write(struct request
*rq
,
1197 struct rbd_device
*rbd_dev
,
1198 struct ceph_snap_context
*snapc
,
1201 struct rbd_req_coll
*coll
,
1204 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1206 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1207 ofs
, len
, bio
, coll
, coll_index
);
1211 * Request async osd read
1213 static int rbd_req_read(struct request
*rq
,
1214 struct rbd_device
*rbd_dev
,
1218 struct rbd_req_coll
*coll
,
1221 return rbd_do_op(rq
, rbd_dev
, NULL
,
1225 ofs
, len
, bio
, coll
, coll_index
);
1229 * Request sync osd read
1231 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1233 const char *object_name
,
1238 struct ceph_osd_req_op
*ops
;
1241 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_READ
, 0);
1245 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1248 ops
, object_name
, ofs
, len
, buf
, NULL
, ver
);
1249 rbd_destroy_ops(ops
);
1255 * Request sync osd notify ack
1257 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1261 struct ceph_osd_req_op
*ops
;
1264 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1268 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1269 ops
[0].watch
.cookie
= notify_id
;
1270 ops
[0].watch
.flag
= 0;
1272 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1273 rbd_dev
->header_name
, 0, 0, NULL
,
1278 rbd_simple_req_cb
, 0, NULL
);
1280 rbd_destroy_ops(ops
);
1284 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1286 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1293 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1294 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1295 (unsigned int) opcode
);
1296 rc
= rbd_refresh_header(rbd_dev
, &hver
);
1298 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1299 " update snaps: %d\n", rbd_dev
->major
, rc
);
1301 rbd_req_sync_notify_ack(rbd_dev
, hver
, notify_id
);
1305 * Request sync osd watch
1307 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
)
1309 struct ceph_osd_req_op
*ops
;
1310 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1313 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1317 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1318 (void *)rbd_dev
, &rbd_dev
->watch_event
);
1322 ops
[0].watch
.ver
= cpu_to_le64(rbd_dev
->header
.obj_version
);
1323 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1324 ops
[0].watch
.flag
= 1;
1326 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1328 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1330 rbd_dev
->header_name
,
1332 &rbd_dev
->watch_request
, NULL
);
1337 rbd_destroy_ops(ops
);
1341 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1342 rbd_dev
->watch_event
= NULL
;
1344 rbd_destroy_ops(ops
);
1349 * Request sync osd unwatch
1351 static int rbd_req_sync_unwatch(struct rbd_device
*rbd_dev
)
1353 struct ceph_osd_req_op
*ops
;
1356 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1360 ops
[0].watch
.ver
= 0;
1361 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1362 ops
[0].watch
.flag
= 0;
1364 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1366 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1368 rbd_dev
->header_name
,
1369 0, 0, NULL
, NULL
, NULL
);
1372 rbd_destroy_ops(ops
);
1373 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1374 rbd_dev
->watch_event
= NULL
;
1378 struct rbd_notify_info
{
1379 struct rbd_device
*rbd_dev
;
1382 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1384 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1388 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1389 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1390 (unsigned int) opcode
);
1394 * Request sync osd notify
1396 static int rbd_req_sync_notify(struct rbd_device
*rbd_dev
)
1398 struct ceph_osd_req_op
*ops
;
1399 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1400 struct ceph_osd_event
*event
;
1401 struct rbd_notify_info info
;
1402 int payload_len
= sizeof(u32
) + sizeof(u32
);
1405 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1409 info
.rbd_dev
= rbd_dev
;
1411 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1412 (void *)&info
, &event
);
1416 ops
[0].watch
.ver
= 1;
1417 ops
[0].watch
.flag
= 1;
1418 ops
[0].watch
.cookie
= event
->cookie
;
1419 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1420 ops
[0].watch
.timeout
= 12;
1422 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1424 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1426 rbd_dev
->header_name
,
1427 0, 0, NULL
, NULL
, NULL
);
1431 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1432 dout("ceph_osdc_wait_event returned %d\n", ret
);
1433 rbd_destroy_ops(ops
);
1437 ceph_osdc_cancel_event(event
);
1439 rbd_destroy_ops(ops
);
1444 * Synchronous osd object method call
1446 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1447 const char *object_name
,
1448 const char *class_name
,
1449 const char *method_name
,
1450 const char *outbound
,
1451 size_t outbound_size
,
1453 size_t inbound_size
,
1457 struct ceph_osd_req_op
*ops
;
1458 int class_name_len
= strlen(class_name
);
1459 int method_name_len
= strlen(method_name
);
1464 * Any input parameters required by the method we're calling
1465 * will be sent along with the class and method names as
1466 * part of the message payload. That data and its size are
1467 * supplied via the indata and indata_len fields (named from
1468 * the perspective of the server side) in the OSD request
1471 payload_size
= class_name_len
+ method_name_len
+ outbound_size
;
1472 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_CALL
, payload_size
);
1476 ops
[0].cls
.class_name
= class_name
;
1477 ops
[0].cls
.class_len
= (__u8
) class_name_len
;
1478 ops
[0].cls
.method_name
= method_name
;
1479 ops
[0].cls
.method_len
= (__u8
) method_name_len
;
1480 ops
[0].cls
.argc
= 0;
1481 ops
[0].cls
.indata
= outbound
;
1482 ops
[0].cls
.indata_len
= outbound_size
;
1484 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1487 object_name
, 0, inbound_size
, inbound
,
1490 rbd_destroy_ops(ops
);
1492 dout("cls_exec returned %d\n", ret
);
1496 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1498 struct rbd_req_coll
*coll
=
1499 kzalloc(sizeof(struct rbd_req_coll
) +
1500 sizeof(struct rbd_req_status
) * num_reqs
,
1505 coll
->total
= num_reqs
;
1506 kref_init(&coll
->kref
);
1511 * block device queue callback
1513 static void rbd_rq_fn(struct request_queue
*q
)
1515 struct rbd_device
*rbd_dev
= q
->queuedata
;
1517 struct bio_pair
*bp
= NULL
;
1519 while ((rq
= blk_fetch_request(q
))) {
1521 struct bio
*rq_bio
, *next_bio
= NULL
;
1526 int num_segs
, cur_seg
= 0;
1527 struct rbd_req_coll
*coll
;
1528 struct ceph_snap_context
*snapc
;
1530 dout("fetched request\n");
1532 /* filter out block requests we don't understand */
1533 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1534 __blk_end_request_all(rq
, 0);
1538 /* deduce our operation (read, write) */
1539 do_write
= (rq_data_dir(rq
) == WRITE
);
1541 size
= blk_rq_bytes(rq
);
1542 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1544 if (do_write
&& rbd_dev
->mapping
.read_only
) {
1545 __blk_end_request_all(rq
, -EROFS
);
1549 spin_unlock_irq(q
->queue_lock
);
1551 down_read(&rbd_dev
->header_rwsem
);
1553 if (rbd_dev
->mapping
.snap_id
!= CEPH_NOSNAP
&&
1554 !rbd_dev
->mapping
.snap_exists
) {
1555 up_read(&rbd_dev
->header_rwsem
);
1556 dout("request for non-existent snapshot");
1557 spin_lock_irq(q
->queue_lock
);
1558 __blk_end_request_all(rq
, -ENXIO
);
1562 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1564 up_read(&rbd_dev
->header_rwsem
);
1566 dout("%s 0x%x bytes at 0x%llx\n",
1567 do_write
? "write" : "read",
1568 size
, (unsigned long long) blk_rq_pos(rq
) * SECTOR_SIZE
);
1570 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1571 if (num_segs
<= 0) {
1572 spin_lock_irq(q
->queue_lock
);
1573 __blk_end_request_all(rq
, num_segs
);
1574 ceph_put_snap_context(snapc
);
1577 coll
= rbd_alloc_coll(num_segs
);
1579 spin_lock_irq(q
->queue_lock
);
1580 __blk_end_request_all(rq
, -ENOMEM
);
1581 ceph_put_snap_context(snapc
);
1586 /* a bio clone to be passed down to OSD req */
1587 dout("rq->bio->bi_vcnt=%hu\n", rq
->bio
->bi_vcnt
);
1588 op_size
= rbd_segment_length(rbd_dev
, ofs
, size
);
1589 kref_get(&coll
->kref
);
1590 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1591 op_size
, GFP_ATOMIC
);
1593 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1599 /* init OSD command: write or read */
1601 rbd_req_write(rq
, rbd_dev
,
1607 rbd_req_read(rq
, rbd_dev
,
1608 rbd_dev
->mapping
.snap_id
,
1620 kref_put(&coll
->kref
, rbd_coll_release
);
1623 bio_pair_release(bp
);
1624 spin_lock_irq(q
->queue_lock
);
1626 ceph_put_snap_context(snapc
);
1631 * a queue callback. Makes sure that we don't create a bio that spans across
1632 * multiple osd objects. One exception would be with a single page bios,
1633 * which we handle later at bio_chain_clone
1635 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1636 struct bio_vec
*bvec
)
1638 struct rbd_device
*rbd_dev
= q
->queuedata
;
1639 unsigned int chunk_sectors
;
1641 unsigned int bio_sectors
;
1644 chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1645 sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1646 bio_sectors
= bmd
->bi_size
>> SECTOR_SHIFT
;
1648 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1649 + bio_sectors
)) << SECTOR_SHIFT
;
1651 max
= 0; /* bio_add cannot handle a negative return */
1652 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1653 return bvec
->bv_len
;
1657 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1659 struct gendisk
*disk
= rbd_dev
->disk
;
1664 if (disk
->flags
& GENHD_FL_UP
)
1667 blk_cleanup_queue(disk
->queue
);
1672 * Read the complete header for the given rbd device.
1674 * Returns a pointer to a dynamically-allocated buffer containing
1675 * the complete and validated header. Caller can pass the address
1676 * of a variable that will be filled in with the version of the
1677 * header object at the time it was read.
1679 * Returns a pointer-coded errno if a failure occurs.
1681 static struct rbd_image_header_ondisk
*
1682 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
1684 struct rbd_image_header_ondisk
*ondisk
= NULL
;
1691 * The complete header will include an array of its 64-bit
1692 * snapshot ids, followed by the names of those snapshots as
1693 * a contiguous block of NUL-terminated strings. Note that
1694 * the number of snapshots could change by the time we read
1695 * it in, in which case we re-read it.
1702 size
= sizeof (*ondisk
);
1703 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
1705 ondisk
= kmalloc(size
, GFP_KERNEL
);
1707 return ERR_PTR(-ENOMEM
);
1709 ret
= rbd_req_sync_read(rbd_dev
, CEPH_NOSNAP
,
1710 rbd_dev
->header_name
,
1712 (char *) ondisk
, version
);
1716 if (WARN_ON((size_t) ret
< size
)) {
1718 pr_warning("short header read for image %s"
1719 " (want %zd got %d)\n",
1720 rbd_dev
->image_name
, size
, ret
);
1723 if (!rbd_dev_ondisk_valid(ondisk
)) {
1725 pr_warning("invalid header for image %s\n",
1726 rbd_dev
->image_name
);
1730 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
1731 want_count
= snap_count
;
1732 snap_count
= le32_to_cpu(ondisk
->snap_count
);
1733 } while (snap_count
!= want_count
);
1740 return ERR_PTR(ret
);
1744 * reload the on-disk header
1746 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1747 struct rbd_image_header
*header
)
1749 struct rbd_image_header_ondisk
*ondisk
;
1753 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
1755 return PTR_ERR(ondisk
);
1756 ret
= rbd_header_from_disk(header
, ondisk
);
1758 header
->obj_version
= ver
;
1767 static int rbd_header_add_snap(struct rbd_device
*rbd_dev
,
1768 const char *snap_name
,
1771 int name_len
= strlen(snap_name
);
1775 struct ceph_mon_client
*monc
;
1777 /* we should create a snapshot only if we're pointing at the head */
1778 if (rbd_dev
->mapping
.snap_id
!= CEPH_NOSNAP
)
1781 monc
= &rbd_dev
->rbd_client
->client
->monc
;
1782 ret
= ceph_monc_create_snapid(monc
, rbd_dev
->pool_id
, &new_snapid
);
1783 dout("created snapid=%llu\n", (unsigned long long) new_snapid
);
1787 data
= kmalloc(name_len
+ 16, gfp_flags
);
1792 e
= data
+ name_len
+ 16;
1794 ceph_encode_string_safe(&p
, e
, snap_name
, name_len
, bad
);
1795 ceph_encode_64_safe(&p
, e
, new_snapid
, bad
);
1797 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
1799 data
, (size_t) (p
- data
), NULL
, 0,
1800 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1805 return ret
< 0 ? ret
: 0;
1810 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1812 struct rbd_snap
*snap
;
1813 struct rbd_snap
*next
;
1815 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
1816 __rbd_remove_snap_dev(snap
);
1820 * re-read the on-disk header and update the in-memory header and snapshot list
1822 static int __rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1825 struct rbd_image_header h
;
1827 ret
= rbd_read_header(rbd_dev
, &h
);
1831 down_write(&rbd_dev
->header_rwsem
);
1834 if (rbd_dev
->mapping
.snap_id
== CEPH_NOSNAP
) {
1835 sector_t size
= (sector_t
) h
.image_size
/ SECTOR_SIZE
;
1837 if (size
!= (sector_t
) rbd_dev
->mapping
.size
) {
1838 dout("setting size to %llu sectors",
1839 (unsigned long long) size
);
1840 rbd_dev
->mapping
.size
= (u64
) size
;
1841 set_capacity(rbd_dev
->disk
, size
);
1845 /* rbd_dev->header.object_prefix shouldn't change */
1846 kfree(rbd_dev
->header
.snap_sizes
);
1847 kfree(rbd_dev
->header
.snap_names
);
1848 /* osd requests may still refer to snapc */
1849 ceph_put_snap_context(rbd_dev
->header
.snapc
);
1852 *hver
= h
.obj_version
;
1853 rbd_dev
->header
.obj_version
= h
.obj_version
;
1854 rbd_dev
->header
.image_size
= h
.image_size
;
1855 rbd_dev
->header
.snapc
= h
.snapc
;
1856 rbd_dev
->header
.snap_names
= h
.snap_names
;
1857 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1858 /* Free the extra copy of the object prefix */
1859 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
1860 kfree(h
.object_prefix
);
1862 ret
= rbd_dev_snaps_update(rbd_dev
);
1864 ret
= rbd_dev_snaps_register(rbd_dev
);
1866 up_write(&rbd_dev
->header_rwsem
);
1871 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1875 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1876 ret
= __rbd_refresh_header(rbd_dev
, hver
);
1877 mutex_unlock(&ctl_mutex
);
1882 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1884 struct gendisk
*disk
;
1885 struct request_queue
*q
;
1888 /* create gendisk info */
1889 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1893 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
1895 disk
->major
= rbd_dev
->major
;
1896 disk
->first_minor
= 0;
1897 disk
->fops
= &rbd_bd_ops
;
1898 disk
->private_data
= rbd_dev
;
1901 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1905 /* We use the default size, but let's be explicit about it. */
1906 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
1908 /* set io sizes to object size */
1909 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
1910 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
1911 blk_queue_max_segment_size(q
, segment_size
);
1912 blk_queue_io_min(q
, segment_size
);
1913 blk_queue_io_opt(q
, segment_size
);
1915 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1918 q
->queuedata
= rbd_dev
;
1920 rbd_dev
->disk
= disk
;
1922 set_capacity(rbd_dev
->disk
, rbd_dev
->mapping
.size
/ SECTOR_SIZE
);
1935 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
1937 return container_of(dev
, struct rbd_device
, dev
);
1940 static ssize_t
rbd_size_show(struct device
*dev
,
1941 struct device_attribute
*attr
, char *buf
)
1943 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1946 down_read(&rbd_dev
->header_rwsem
);
1947 size
= get_capacity(rbd_dev
->disk
);
1948 up_read(&rbd_dev
->header_rwsem
);
1950 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
1953 static ssize_t
rbd_major_show(struct device
*dev
,
1954 struct device_attribute
*attr
, char *buf
)
1956 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1958 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1961 static ssize_t
rbd_client_id_show(struct device
*dev
,
1962 struct device_attribute
*attr
, char *buf
)
1964 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1966 return sprintf(buf
, "client%lld\n",
1967 ceph_client_id(rbd_dev
->rbd_client
->client
));
1970 static ssize_t
rbd_pool_show(struct device
*dev
,
1971 struct device_attribute
*attr
, char *buf
)
1973 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1975 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
1978 static ssize_t
rbd_pool_id_show(struct device
*dev
,
1979 struct device_attribute
*attr
, char *buf
)
1981 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1983 return sprintf(buf
, "%d\n", rbd_dev
->pool_id
);
1986 static ssize_t
rbd_name_show(struct device
*dev
,
1987 struct device_attribute
*attr
, char *buf
)
1989 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1991 return sprintf(buf
, "%s\n", rbd_dev
->image_name
);
1994 static ssize_t
rbd_image_id_show(struct device
*dev
,
1995 struct device_attribute
*attr
, char *buf
)
1997 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1999 return sprintf(buf
, "%s\n", rbd_dev
->image_id
);
2002 static ssize_t
rbd_snap_show(struct device
*dev
,
2003 struct device_attribute
*attr
,
2006 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2008 return sprintf(buf
, "%s\n", rbd_dev
->mapping
.snap_name
);
2011 static ssize_t
rbd_image_refresh(struct device
*dev
,
2012 struct device_attribute
*attr
,
2016 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2019 ret
= rbd_refresh_header(rbd_dev
, NULL
);
2021 return ret
< 0 ? ret
: size
;
2024 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
2025 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
2026 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
2027 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
2028 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
2029 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
2030 static DEVICE_ATTR(image_id
, S_IRUGO
, rbd_image_id_show
, NULL
);
2031 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
2032 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
2033 static DEVICE_ATTR(create_snap
, S_IWUSR
, NULL
, rbd_snap_add
);
2035 static struct attribute
*rbd_attrs
[] = {
2036 &dev_attr_size
.attr
,
2037 &dev_attr_major
.attr
,
2038 &dev_attr_client_id
.attr
,
2039 &dev_attr_pool
.attr
,
2040 &dev_attr_pool_id
.attr
,
2041 &dev_attr_name
.attr
,
2042 &dev_attr_image_id
.attr
,
2043 &dev_attr_current_snap
.attr
,
2044 &dev_attr_refresh
.attr
,
2045 &dev_attr_create_snap
.attr
,
2049 static struct attribute_group rbd_attr_group
= {
2053 static const struct attribute_group
*rbd_attr_groups
[] = {
2058 static void rbd_sysfs_dev_release(struct device
*dev
)
2062 static struct device_type rbd_device_type
= {
2064 .groups
= rbd_attr_groups
,
2065 .release
= rbd_sysfs_dev_release
,
2073 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2074 struct device_attribute
*attr
,
2077 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2079 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2082 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2083 struct device_attribute
*attr
,
2086 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2088 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
2091 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2092 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2094 static struct attribute
*rbd_snap_attrs
[] = {
2095 &dev_attr_snap_size
.attr
,
2096 &dev_attr_snap_id
.attr
,
2100 static struct attribute_group rbd_snap_attr_group
= {
2101 .attrs
= rbd_snap_attrs
,
2104 static void rbd_snap_dev_release(struct device
*dev
)
2106 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2111 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2112 &rbd_snap_attr_group
,
2116 static struct device_type rbd_snap_device_type
= {
2117 .groups
= rbd_snap_attr_groups
,
2118 .release
= rbd_snap_dev_release
,
2121 static bool rbd_snap_registered(struct rbd_snap
*snap
)
2123 bool ret
= snap
->dev
.type
== &rbd_snap_device_type
;
2124 bool reg
= device_is_registered(&snap
->dev
);
2126 rbd_assert(!ret
^ reg
);
2131 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
)
2133 list_del(&snap
->node
);
2134 if (device_is_registered(&snap
->dev
))
2135 device_unregister(&snap
->dev
);
2138 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2139 struct device
*parent
)
2141 struct device
*dev
= &snap
->dev
;
2144 dev
->type
= &rbd_snap_device_type
;
2145 dev
->parent
= parent
;
2146 dev
->release
= rbd_snap_dev_release
;
2147 dev_set_name(dev
, "snap_%s", snap
->name
);
2148 dout("%s: registering device for snapshot %s\n", __func__
, snap
->name
);
2150 ret
= device_register(dev
);
2155 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2156 int i
, const char *name
)
2158 struct rbd_snap
*snap
;
2161 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2163 return ERR_PTR(-ENOMEM
);
2166 snap
->name
= kstrdup(name
, GFP_KERNEL
);
2170 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
2171 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
2179 return ERR_PTR(ret
);
2183 * Scan the rbd device's current snapshot list and compare it to the
2184 * newly-received snapshot context. Remove any existing snapshots
2185 * not present in the new snapshot context. Add a new snapshot for
2186 * any snaphots in the snapshot context not in the current list.
2187 * And verify there are no changes to snapshots we already know
2190 * Assumes the snapshots in the snapshot context are sorted by
2191 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2192 * are also maintained in that order.)
2194 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
)
2196 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
2197 const u32 snap_count
= snapc
->num_snaps
;
2198 char *snap_name
= rbd_dev
->header
.snap_names
;
2199 struct list_head
*head
= &rbd_dev
->snaps
;
2200 struct list_head
*links
= head
->next
;
2203 dout("%s: snap count is %u\n", __func__
, (unsigned int) snap_count
);
2204 while (index
< snap_count
|| links
!= head
) {
2206 struct rbd_snap
*snap
;
2208 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
2210 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
2212 rbd_assert(!snap
|| snap
->id
!= CEPH_NOSNAP
);
2214 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
2215 struct list_head
*next
= links
->next
;
2217 /* Existing snapshot not in the new snap context */
2219 if (rbd_dev
->mapping
.snap_id
== snap
->id
)
2220 rbd_dev
->mapping
.snap_exists
= false;
2221 __rbd_remove_snap_dev(snap
);
2222 dout("%ssnap id %llu has been removed\n",
2223 rbd_dev
->mapping
.snap_id
== snap
->id
?
2225 (unsigned long long) snap
->id
);
2227 /* Done with this list entry; advance */
2233 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count
,
2234 (unsigned long long) snap_id
);
2235 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
2236 struct rbd_snap
*new_snap
;
2238 /* We haven't seen this snapshot before */
2240 new_snap
= __rbd_add_snap_dev(rbd_dev
, index
,
2242 if (IS_ERR(new_snap
)) {
2243 int err
= PTR_ERR(new_snap
);
2245 dout(" failed to add dev, error %d\n", err
);
2250 /* New goes before existing, or at end of list */
2252 dout(" added dev%s\n", snap
? "" : " at end\n");
2254 list_add_tail(&new_snap
->node
, &snap
->node
);
2256 list_add_tail(&new_snap
->node
, head
);
2258 /* Already have this one */
2260 dout(" already present\n");
2262 rbd_assert(snap
->size
==
2263 rbd_dev
->header
.snap_sizes
[index
]);
2264 rbd_assert(!strcmp(snap
->name
, snap_name
));
2266 /* Done with this list entry; advance */
2268 links
= links
->next
;
2271 /* Advance to the next entry in the snapshot context */
2274 snap_name
+= strlen(snap_name
) + 1;
2276 dout("%s: done\n", __func__
);
2282 * Scan the list of snapshots and register the devices for any that
2283 * have not already been registered.
2285 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
)
2287 struct rbd_snap
*snap
;
2290 dout("%s called\n", __func__
);
2291 if (WARN_ON(!device_is_registered(&rbd_dev
->dev
)))
2294 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
2295 if (!rbd_snap_registered(snap
)) {
2296 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2301 dout("%s: returning %d\n", __func__
, ret
);
2306 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
2311 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2313 dev
= &rbd_dev
->dev
;
2314 dev
->bus
= &rbd_bus_type
;
2315 dev
->type
= &rbd_device_type
;
2316 dev
->parent
= &rbd_root_dev
;
2317 dev
->release
= rbd_dev_release
;
2318 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
2319 ret
= device_register(dev
);
2321 mutex_unlock(&ctl_mutex
);
2326 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
2328 device_unregister(&rbd_dev
->dev
);
2331 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
2336 ret
= rbd_req_sync_watch(rbd_dev
);
2337 if (ret
== -ERANGE
) {
2338 rc
= rbd_refresh_header(rbd_dev
, NULL
);
2342 } while (ret
== -ERANGE
);
2347 static atomic64_t rbd_dev_id_max
= ATOMIC64_INIT(0);
2350 * Get a unique rbd identifier for the given new rbd_dev, and add
2351 * the rbd_dev to the global list. The minimum rbd id is 1.
2353 static void rbd_dev_id_get(struct rbd_device
*rbd_dev
)
2355 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_dev_id_max
);
2357 spin_lock(&rbd_dev_list_lock
);
2358 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2359 spin_unlock(&rbd_dev_list_lock
);
2360 dout("rbd_dev %p given dev id %llu\n", rbd_dev
,
2361 (unsigned long long) rbd_dev
->dev_id
);
2365 * Remove an rbd_dev from the global list, and record that its
2366 * identifier is no longer in use.
2368 static void rbd_dev_id_put(struct rbd_device
*rbd_dev
)
2370 struct list_head
*tmp
;
2371 int rbd_id
= rbd_dev
->dev_id
;
2374 rbd_assert(rbd_id
> 0);
2376 dout("rbd_dev %p released dev id %llu\n", rbd_dev
,
2377 (unsigned long long) rbd_dev
->dev_id
);
2378 spin_lock(&rbd_dev_list_lock
);
2379 list_del_init(&rbd_dev
->node
);
2382 * If the id being "put" is not the current maximum, there
2383 * is nothing special we need to do.
2385 if (rbd_id
!= atomic64_read(&rbd_dev_id_max
)) {
2386 spin_unlock(&rbd_dev_list_lock
);
2391 * We need to update the current maximum id. Search the
2392 * list to find out what it is. We're more likely to find
2393 * the maximum at the end, so search the list backward.
2396 list_for_each_prev(tmp
, &rbd_dev_list
) {
2397 struct rbd_device
*rbd_dev
;
2399 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2400 if (rbd_id
> max_id
)
2403 spin_unlock(&rbd_dev_list_lock
);
2406 * The max id could have been updated by rbd_dev_id_get(), in
2407 * which case it now accurately reflects the new maximum.
2408 * Be careful not to overwrite the maximum value in that
2411 atomic64_cmpxchg(&rbd_dev_id_max
, rbd_id
, max_id
);
2412 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;			/* Start of token */

	return strcspn(start, whitespace);	/* Token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t tok_len = next_token(buf);
	const char *tok_start = *buf;

	/* Always step past the token, whether or not it gets copied */
	*buf = tok_start + tok_len;

	if (tok_len >= token_size)
		return tok_len;

	memcpy(token, tok_start, tok_len);
	token[tok_len] = '\0';

	return tok_len;
}
2465 * Finds the next token in *buf, dynamically allocates a buffer big
2466 * enough to hold a copy of it, and copies the token into the new
2467 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2468 * that a duplicate buffer is created even for a zero-length token.
2470 * Returns a pointer to the newly-allocated duplicate, or a null
2471 * pointer if memory for the duplicate was not available. If
2472 * the lenp argument is a non-null pointer, the length of the token
2473 * (not including the '\0') is returned in *lenp.
2475 * If successful, the *buf pointer will be updated to point beyond
2476 * the end of the found token.
2478 * Note: uses GFP_KERNEL for allocation.
2480 static inline char *dup_token(const char **buf
, size_t *lenp
)
2485 len
= next_token(buf
);
2486 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
2490 memcpy(dup
, *buf
, len
);
2491 *(dup
+ len
) = '\0';
2501 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2502 * rbd_md_name, and name fields of the given rbd_dev, based on the
2503 * list of monitor addresses and other options provided via
2504 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2505 * copy of the snapshot name to map if successful, or a
2506 * pointer-coded error otherwise.
2508 * Note: rbd_dev is assumed to have been initially zero-filled.
2510 static char *rbd_add_parse_args(struct rbd_device
*rbd_dev
,
2512 const char **mon_addrs
,
2513 size_t *mon_addrs_size
,
2515 size_t options_size
)
2518 char *err_ptr
= ERR_PTR(-EINVAL
);
2521 /* The first four tokens are required */
2523 len
= next_token(&buf
);
2526 *mon_addrs_size
= len
+ 1;
2531 len
= copy_token(&buf
, options
, options_size
);
2532 if (!len
|| len
>= options_size
)
2535 err_ptr
= ERR_PTR(-ENOMEM
);
2536 rbd_dev
->pool_name
= dup_token(&buf
, NULL
);
2537 if (!rbd_dev
->pool_name
)
2540 rbd_dev
->image_name
= dup_token(&buf
, &rbd_dev
->image_name_len
);
2541 if (!rbd_dev
->image_name
)
2544 /* Snapshot name is optional */
2545 len
= next_token(&buf
);
2547 buf
= RBD_SNAP_HEAD_NAME
; /* No snapshot supplied */
2548 len
= sizeof (RBD_SNAP_HEAD_NAME
) - 1;
2550 snap_name
= kmalloc(len
+ 1, GFP_KERNEL
);
2553 memcpy(snap_name
, buf
, len
);
2554 *(snap_name
+ len
) = '\0';
2556 dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name
, len
);
2561 kfree(rbd_dev
->image_name
);
2562 rbd_dev
->image_name
= NULL
;
2563 rbd_dev
->image_name_len
= 0;
2564 kfree(rbd_dev
->pool_name
);
2565 rbd_dev
->pool_name
= NULL
;
2571 * An rbd format 2 image has a unique identifier, distinct from the
2572 * name given to it by the user. Internally, that identifier is
2573 * what's used to specify the names of objects related to the image.
2575 * A special "rbd id" object is used to map an rbd image name to its
2576 * id. If that object doesn't exist, then there is no v2 rbd image
2577 * with the supplied name.
2579 * This function will record the given rbd_dev's image_id field if
2580 * it can be determined, and in that case will return 0. If any
2581 * errors occur a negative errno will be returned and the rbd_dev's
2582 * image_id field will be unchanged (and should be NULL).
2584 static int rbd_dev_image_id(struct rbd_device
*rbd_dev
)
2593 * First, see if the format 2 image id file exists, and if
2594 * so, get the image's persistent id from it.
2596 size
= sizeof (RBD_ID_PREFIX
) + rbd_dev
->image_name_len
;
2597 object_name
= kmalloc(size
, GFP_NOIO
);
2600 sprintf(object_name
, "%s%s", RBD_ID_PREFIX
, rbd_dev
->image_name
);
2601 dout("rbd id object name is %s\n", object_name
);
2603 /* Response will be an encoded string, which includes a length */
2605 size
= sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
;
2606 response
= kzalloc(size
, GFP_NOIO
);
2612 ret
= rbd_req_sync_exec(rbd_dev
, object_name
,
2615 response
, RBD_IMAGE_ID_LEN_MAX
,
2616 CEPH_OSD_FLAG_READ
, NULL
);
2617 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2622 rbd_dev
->image_id
= ceph_extract_encoded_string(&p
,
2623 p
+ RBD_IMAGE_ID_LEN_MAX
,
2624 &rbd_dev
->image_id_len
,
2626 if (IS_ERR(rbd_dev
->image_id
)) {
2627 ret
= PTR_ERR(rbd_dev
->image_id
);
2628 rbd_dev
->image_id
= NULL
;
2630 dout("image_id is %s\n", rbd_dev
->image_id
);
2639 static ssize_t
rbd_add(struct bus_type
*bus
,
2644 struct rbd_device
*rbd_dev
= NULL
;
2645 const char *mon_addrs
= NULL
;
2646 size_t mon_addrs_size
= 0;
2647 struct ceph_osd_client
*osdc
;
2651 if (!try_module_get(THIS_MODULE
))
2654 options
= kmalloc(count
, GFP_KERNEL
);
2657 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2661 /* static rbd_device initialization */
2662 spin_lock_init(&rbd_dev
->lock
);
2663 INIT_LIST_HEAD(&rbd_dev
->node
);
2664 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2665 init_rwsem(&rbd_dev
->header_rwsem
);
2667 /* parse add command */
2668 snap_name
= rbd_add_parse_args(rbd_dev
, buf
,
2669 &mon_addrs
, &mon_addrs_size
, options
, count
);
2670 if (IS_ERR(snap_name
)) {
2671 rc
= PTR_ERR(snap_name
);
2675 rc
= rbd_get_client(rbd_dev
, mon_addrs
, mon_addrs_size
- 1, options
);
2680 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2681 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2683 goto err_out_client
;
2684 rbd_dev
->pool_id
= rc
;
2686 rc
= rbd_dev_image_id(rbd_dev
);
2688 rc
= -ENOTSUPP
; /* Not actually supporting format 2 yet */
2689 goto err_out_client
;
2692 /* Version 1 images have no id; empty string is used */
2694 rbd_dev
->image_id
= kstrdup("", GFP_KERNEL
);
2695 if (!rbd_dev
->image_id
) {
2697 goto err_out_client
;
2699 rbd_dev
->image_id_len
= 0;
2701 /* Create the name of the header object */
2703 rbd_dev
->header_name
= kmalloc(rbd_dev
->image_name_len
2704 + sizeof (RBD_SUFFIX
),
2706 if (!rbd_dev
->header_name
)
2707 goto err_out_client
;
2708 sprintf(rbd_dev
->header_name
, "%s%s", rbd_dev
->image_name
, RBD_SUFFIX
);
2710 /* Get information about the image being mapped */
2712 rc
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
2714 goto err_out_client
;
2716 /* no need to lock here, as rbd_dev is not registered yet */
2717 rc
= rbd_dev_snaps_update(rbd_dev
);
2719 goto err_out_header
;
2721 rc
= rbd_dev_set_mapping(rbd_dev
, snap_name
);
2723 goto err_out_header
;
2725 /* generate unique id: find highest unique id, add one */
2726 rbd_dev_id_get(rbd_dev
);
2728 /* Fill in the device name, now that we have its id. */
2729 BUILD_BUG_ON(DEV_NAME_LEN
2730 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
2731 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
2733 /* Get our block major device number. */
2735 rc
= register_blkdev(0, rbd_dev
->name
);
2738 rbd_dev
->major
= rc
;
2740 /* Set up the blkdev mapping. */
2742 rc
= rbd_init_disk(rbd_dev
);
2744 goto err_out_blkdev
;
2746 rc
= rbd_bus_add_dev(rbd_dev
);
2751 * At this point cleanup in the event of an error is the job
2752 * of the sysfs code (initiated by rbd_bus_del_dev()).
2755 down_write(&rbd_dev
->header_rwsem
);
2756 rc
= rbd_dev_snaps_register(rbd_dev
);
2757 up_write(&rbd_dev
->header_rwsem
);
2761 rc
= rbd_init_watch_dev(rbd_dev
);
2765 /* Everything's ready. Announce the disk to the world. */
2767 add_disk(rbd_dev
->disk
);
2769 pr_info("%s: added with size 0x%llx\n", rbd_dev
->disk
->disk_name
,
2770 (unsigned long long) rbd_dev
->mapping
.size
);
2775 /* this will also clean up rest of rbd_dev stuff */
2777 rbd_bus_del_dev(rbd_dev
);
2782 rbd_free_disk(rbd_dev
);
2784 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2786 rbd_dev_id_put(rbd_dev
);
2788 rbd_header_free(&rbd_dev
->header
);
2790 kfree(rbd_dev
->header_name
);
2791 rbd_put_client(rbd_dev
);
2792 kfree(rbd_dev
->image_id
);
2794 kfree(rbd_dev
->mapping
.snap_name
);
2795 kfree(rbd_dev
->image_name
);
2796 kfree(rbd_dev
->pool_name
);
2801 dout("Error adding device %s\n", buf
);
2802 module_put(THIS_MODULE
);
2804 return (ssize_t
) rc
;
2807 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
2809 struct list_head
*tmp
;
2810 struct rbd_device
*rbd_dev
;
2812 spin_lock(&rbd_dev_list_lock
);
2813 list_for_each(tmp
, &rbd_dev_list
) {
2814 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2815 if (rbd_dev
->dev_id
== dev_id
) {
2816 spin_unlock(&rbd_dev_list_lock
);
2820 spin_unlock(&rbd_dev_list_lock
);
2824 static void rbd_dev_release(struct device
*dev
)
2826 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2828 if (rbd_dev
->watch_request
) {
2829 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
2831 ceph_osdc_unregister_linger_request(&client
->osdc
,
2832 rbd_dev
->watch_request
);
2834 if (rbd_dev
->watch_event
)
2835 rbd_req_sync_unwatch(rbd_dev
);
2837 rbd_put_client(rbd_dev
);
2839 /* clean up and free blkdev */
2840 rbd_free_disk(rbd_dev
);
2841 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2843 /* release allocated disk header fields */
2844 rbd_header_free(&rbd_dev
->header
);
2846 /* done with the id, and with the rbd_dev */
2847 kfree(rbd_dev
->mapping
.snap_name
);
2848 kfree(rbd_dev
->image_id
);
2849 kfree(rbd_dev
->header_name
);
2850 kfree(rbd_dev
->pool_name
);
2851 kfree(rbd_dev
->image_name
);
2852 rbd_dev_id_put(rbd_dev
);
2855 /* release module ref */
2856 module_put(THIS_MODULE
);
2859 static ssize_t
rbd_remove(struct bus_type
*bus
,
2863 struct rbd_device
*rbd_dev
= NULL
;
2868 rc
= strict_strtoul(buf
, 10, &ul
);
2872 /* convert to int; abort if we lost anything in the conversion */
2873 target_id
= (int) ul
;
2874 if (target_id
!= ul
)
2877 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2879 rbd_dev
= __rbd_get_dev(target_id
);
2885 __rbd_remove_all_snaps(rbd_dev
);
2886 rbd_bus_del_dev(rbd_dev
);
2889 mutex_unlock(&ctl_mutex
);
2894 static ssize_t
rbd_snap_add(struct device
*dev
,
2895 struct device_attribute
*attr
,
2899 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2901 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2905 snprintf(name
, count
, "%s", buf
);
2907 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2909 ret
= rbd_header_add_snap(rbd_dev
,
2914 ret
= __rbd_refresh_header(rbd_dev
, NULL
);
2918 /* shouldn't hold ctl_mutex when notifying.. notify might
2919 trigger a watch callback that would need to get that mutex */
2920 mutex_unlock(&ctl_mutex
);
2922 /* make a best effort, don't error if failed */
2923 rbd_req_sync_notify(rbd_dev
);
2930 mutex_unlock(&ctl_mutex
);
2936 * create control files in sysfs
2939 static int rbd_sysfs_init(void)
2943 ret
= device_register(&rbd_root_dev
);
2947 ret
= bus_register(&rbd_bus_type
);
2949 device_unregister(&rbd_root_dev
);
2954 static void rbd_sysfs_cleanup(void)
2956 bus_unregister(&rbd_bus_type
);
2957 device_unregister(&rbd_root_dev
);
2960 int __init
rbd_init(void)
2964 rc
= rbd_sysfs_init();
2967 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
2971 void __exit
rbd_exit(void)
2973 rbd_sysfs_cleanup();
2976 module_init(rbd_init
);
2977 module_exit(rbd_exit
);
2979 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2980 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2981 MODULE_DESCRIPTION("rados block device");
2983 /* following authorship retained from original osdblk.c */
2984 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2986 MODULE_LICENSE("GPL");