2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header
{
83 struct ceph_snap_context
*snapc
;
84 size_t snap_names_len
;
99 * an instance of the client. multiple devices may share an rbd client.
102 struct ceph_client
*client
;
103 struct rbd_options
*rbd_opts
;
105 struct list_head node
;
109 * a request completion status
111 struct rbd_req_status
{
118 * a collection of requests
120 struct rbd_req_coll
{
124 struct rbd_req_status status
[0];
128 * a single io request
131 struct request
*rq
; /* blk layer request */
132 struct bio
*bio
; /* cloned bio */
133 struct page
**pages
; /* list of used pages */
136 struct rbd_req_coll
*coll
;
143 struct list_head node
;
151 int id
; /* blkdev unique id */
153 int major
; /* blkdev assigned major */
154 struct gendisk
*disk
; /* blkdev's gendisk and rq */
155 struct request_queue
*q
;
157 struct rbd_client
*rbd_client
;
159 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
161 spinlock_t lock
; /* queue lock */
163 struct rbd_image_header header
;
164 char *obj
; /* rbd image name */
166 char *obj_md_name
; /* hdr nm. */
170 struct ceph_osd_event
*watch_event
;
171 struct ceph_osd_request
*watch_request
;
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem
;
176 u64 snap_id
; /* current snapshot id */
179 struct list_head node
;
181 /* list of snapshots */
182 struct list_head snaps
;
188 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
190 static LIST_HEAD(rbd_dev_list
); /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
193 static LIST_HEAD(rbd_client_list
); /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock
);
196 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
);
197 static void rbd_dev_release(struct device
*dev
);
198 static ssize_t
rbd_snap_add(struct device
*dev
,
199 struct device_attribute
*attr
,
202 static void __rbd_remove_snap_dev(struct rbd_device
*rbd_dev
,
203 struct rbd_snap
*snap
);
205 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
207 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
210 static struct bus_attribute rbd_bus_attrs
[] = {
211 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
212 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
216 static struct bus_type rbd_bus_type
= {
218 .bus_attrs
= rbd_bus_attrs
,
/*
 * Release callback installed in rbd_root_dev's .release field.
 * The body is intentionally empty: no cleanup is performed here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
225 static struct device rbd_root_dev
= {
227 .release
= rbd_root_dev_release
,
231 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
233 return get_device(&rbd_dev
->dev
);
236 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
238 put_device(&rbd_dev
->dev
);
241 static int __rbd_refresh_header(struct rbd_device
*rbd_dev
);
/*
 * Block-device open handler.  Looks up the rbd device through
 * bdev->bd_disk->private_data, calls rbd_get_dev() on it, propagates
 * the device's read_only flag to the block device with set_device_ro(),
 * and finally tests for a write-mode open of a read-only device.
 *
 * NOTE(review): rbd_get_dev() is called before the
 * FMODE_WRITE/read_only test.  The statement guarded by that test is
 * not visible in this listing; if it returns an error, the reference
 * taken above looks like it is never dropped -- confirm, and consider
 * performing the check before taking the reference.
 */
243 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
245 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
247 rbd_get_dev(rbd_dev
);
249 set_device_ro(bdev
, rbd_dev
->read_only
);
251 if ((mode
& FMODE_WRITE
) && rbd_dev
->read_only
)
257 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
259 struct rbd_device
*rbd_dev
= disk
->private_data
;
261 rbd_put_dev(rbd_dev
);
266 static const struct block_device_operations rbd_bd_ops
= {
267 .owner
= THIS_MODULE
,
269 .release
= rbd_release
,
273 * Initialize an rbd client instance.
276 static struct rbd_client
*rbd_client_create(struct ceph_options
*opt
,
277 struct rbd_options
*rbd_opts
)
279 struct rbd_client
*rbdc
;
282 dout("rbd_client_create\n");
283 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
287 kref_init(&rbdc
->kref
);
288 INIT_LIST_HEAD(&rbdc
->node
);
290 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
292 rbdc
->client
= ceph_create_client(opt
, rbdc
, 0, 0);
293 if (IS_ERR(rbdc
->client
))
295 opt
= NULL
; /* Now rbdc->client is responsible for opt */
297 ret
= ceph_open_session(rbdc
->client
);
301 rbdc
->rbd_opts
= rbd_opts
;
303 spin_lock(&rbd_client_list_lock
);
304 list_add_tail(&rbdc
->node
, &rbd_client_list
);
305 spin_unlock(&rbd_client_list_lock
);
307 mutex_unlock(&ctl_mutex
);
309 dout("rbd_client_create created %p\n", rbdc
);
313 ceph_destroy_client(rbdc
->client
);
315 mutex_unlock(&ctl_mutex
);
319 ceph_destroy_options(opt
);
324 * Find a ceph client with specific addr and configuration.
326 static struct rbd_client
*__rbd_client_find(struct ceph_options
*opt
)
328 struct rbd_client
*client_node
;
330 if (opt
->flags
& CEPH_OPT_NOSHARE
)
333 list_for_each_entry(client_node
, &rbd_client_list
, node
)
334 if (ceph_compare_options(opt
, client_node
->client
) == 0)
347 /* string args above */
350 static match_table_t rbdopt_tokens
= {
351 {Opt_notify_timeout
, "notify_timeout=%d"},
353 /* string args above */
357 static int parse_rbd_opts_token(char *c
, void *private)
359 struct rbd_options
*rbdopt
= private;
360 substring_t argstr
[MAX_OPT_ARGS
];
361 int token
, intval
, ret
;
363 token
= match_token(c
, rbdopt_tokens
, argstr
);
367 if (token
< Opt_last_int
) {
368 ret
= match_int(&argstr
[0], &intval
);
370 pr_err("bad mount option arg (not int) "
374 dout("got int token %d val %d\n", token
, intval
);
375 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
376 dout("got string token %d val %s\n", token
,
379 dout("got token %d\n", token
);
383 case Opt_notify_timeout
:
384 rbdopt
->notify_timeout
= intval
;
393 * Get a ceph client with specific addr and configuration, if one does
394 * not exist create it.
396 static struct rbd_client
*rbd_get_client(const char *mon_addr
,
400 struct rbd_client
*rbdc
;
401 struct ceph_options
*opt
;
402 struct rbd_options
*rbd_opts
;
404 rbd_opts
= kzalloc(sizeof(*rbd_opts
), GFP_KERNEL
);
406 return ERR_PTR(-ENOMEM
);
408 rbd_opts
->notify_timeout
= RBD_NOTIFY_TIMEOUT_DEFAULT
;
410 opt
= ceph_parse_options(options
, mon_addr
,
411 mon_addr
+ mon_addr_len
,
412 parse_rbd_opts_token
, rbd_opts
);
415 return ERR_CAST(opt
);
418 spin_lock(&rbd_client_list_lock
);
419 rbdc
= __rbd_client_find(opt
);
421 /* using an existing client */
422 kref_get(&rbdc
->kref
);
423 spin_unlock(&rbd_client_list_lock
);
425 ceph_destroy_options(opt
);
430 spin_unlock(&rbd_client_list_lock
);
432 rbdc
= rbd_client_create(opt
, rbd_opts
);
441 * Destroy ceph client
443 * Takes rbd_client_list_lock itself to unlink the client, so the caller must not hold it.
445 static void rbd_client_release(struct kref
*kref
)
447 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
449 dout("rbd_release_client %p\n", rbdc
);
450 spin_lock(&rbd_client_list_lock
);
451 list_del(&rbdc
->node
);
452 spin_unlock(&rbd_client_list_lock
);
454 ceph_destroy_client(rbdc
->client
);
455 kfree(rbdc
->rbd_opts
);
460 * Drop reference to ceph client node. If it's not referenced anymore, release
463 static void rbd_put_client(struct rbd_device
*rbd_dev
)
465 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
466 rbd_dev
->rbd_client
= NULL
;
470 * Destroy requests collection
472 static void rbd_coll_release(struct kref
*kref
)
474 struct rbd_req_coll
*coll
=
475 container_of(kref
, struct rbd_req_coll
, kref
);
477 dout("rbd_coll_release %p\n", coll
);
482 * Create a new header structure, translate header format from the on-disk
485 static int rbd_header_from_disk(struct rbd_image_header
*header
,
486 struct rbd_image_header_ondisk
*ondisk
,
492 if (memcmp(ondisk
, RBD_HEADER_TEXT
, sizeof(RBD_HEADER_TEXT
)))
495 snap_count
= le32_to_cpu(ondisk
->snap_count
);
496 if (snap_count
> (UINT_MAX
- sizeof(struct ceph_snap_context
))
499 header
->snapc
= kmalloc(sizeof(struct ceph_snap_context
) +
500 snap_count
* sizeof(u64
),
505 header
->snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
507 header
->snap_names
= kmalloc(header
->snap_names_len
,
509 if (!header
->snap_names
)
511 header
->snap_sizes
= kmalloc(snap_count
* sizeof(u64
),
513 if (!header
->snap_sizes
)
516 header
->snap_names
= NULL
;
517 header
->snap_sizes
= NULL
;
520 header
->object_prefix
= kmalloc(sizeof (ondisk
->block_name
) + 1,
522 if (!header
->object_prefix
)
525 memcpy(header
->object_prefix
, ondisk
->block_name
,
526 sizeof(ondisk
->block_name
));
527 header
->object_prefix
[sizeof (ondisk
->block_name
)] = '\0';
529 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
530 header
->obj_order
= ondisk
->options
.order
;
531 header
->crypt_type
= ondisk
->options
.crypt_type
;
532 header
->comp_type
= ondisk
->options
.comp_type
;
534 atomic_set(&header
->snapc
->nref
, 1);
535 header
->snap_seq
= le64_to_cpu(ondisk
->snap_seq
);
536 header
->snapc
->num_snaps
= snap_count
;
537 header
->total_snaps
= snap_count
;
539 if (snap_count
&& allocated_snaps
== snap_count
) {
540 for (i
= 0; i
< snap_count
; i
++) {
541 header
->snapc
->snaps
[i
] =
542 le64_to_cpu(ondisk
->snaps
[i
].id
);
543 header
->snap_sizes
[i
] =
544 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
547 /* copy snapshot names */
548 memcpy(header
->snap_names
, &ondisk
->snaps
[i
],
549 header
->snap_names_len
);
555 kfree(header
->snap_sizes
);
557 kfree(header
->snap_names
);
559 kfree(header
->snapc
);
563 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
567 char *p
= header
->snap_names
;
569 for (i
= 0; i
< header
->total_snaps
; i
++) {
570 if (!strcmp(snap_name
, p
)) {
572 /* Found it. Pass back its id and/or size */
575 *seq
= header
->snapc
->snaps
[i
];
577 *size
= header
->snap_sizes
[i
];
580 p
+= strlen(p
) + 1; /* Skip ahead to the next name */
585 static int rbd_header_set_snap(struct rbd_device
*rbd_dev
, u64
*size
)
587 struct rbd_image_header
*header
= &rbd_dev
->header
;
588 struct ceph_snap_context
*snapc
= header
->snapc
;
591 down_write(&rbd_dev
->header_rwsem
);
593 if (!memcmp(rbd_dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
594 sizeof (RBD_SNAP_HEAD_NAME
))) {
595 if (header
->total_snaps
)
596 snapc
->seq
= header
->snap_seq
;
599 rbd_dev
->snap_id
= CEPH_NOSNAP
;
600 rbd_dev
->read_only
= 0;
602 *size
= header
->image_size
;
604 ret
= snap_by_name(header
, rbd_dev
->snap_name
,
608 rbd_dev
->snap_id
= snapc
->seq
;
609 rbd_dev
->read_only
= 1;
614 up_write(&rbd_dev
->header_rwsem
);
618 static void rbd_header_free(struct rbd_image_header
*header
)
620 kfree(header
->object_prefix
);
621 kfree(header
->snap_sizes
);
622 kfree(header
->snap_names
);
623 kfree(header
->snapc
);
627 * get the actual striped segment name, offset and length
629 static u64
rbd_get_segment(struct rbd_image_header
*header
,
630 const char *object_prefix
,
632 char *seg_name
, u64
*segofs
)
634 u64 seg
= ofs
>> header
->obj_order
;
637 snprintf(seg_name
, RBD_MAX_SEG_NAME_LEN
,
638 "%s.%012llx", object_prefix
, seg
);
640 ofs
= ofs
& ((1 << header
->obj_order
) - 1);
641 len
= min_t(u64
, len
, (1 << header
->obj_order
) - ofs
);
649 static int rbd_get_num_segments(struct rbd_image_header
*header
,
652 u64 start_seg
= ofs
>> header
->obj_order
;
653 u64 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
654 return end_seg
- start_seg
+ 1;
658 * returns the size of an object in the image
660 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
662 return 1 << header
->obj_order
;
669 static void bio_chain_put(struct bio
*chain
)
675 chain
= chain
->bi_next
;
681 * zeros a bio chain, starting at specific offset
683 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
692 bio_for_each_segment(bv
, chain
, i
) {
693 if (pos
+ bv
->bv_len
> start_ofs
) {
694 int remainder
= max(start_ofs
- pos
, 0);
695 buf
= bvec_kmap_irq(bv
, &flags
);
696 memset(buf
+ remainder
, 0,
697 bv
->bv_len
- remainder
);
698 bvec_kunmap_irq(buf
, &flags
);
703 chain
= chain
->bi_next
;
708 * bio_chain_clone - clone a chain of bios up to a certain length.
709 * might return a bio_pair that will need to be released.
711 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
712 struct bio_pair
**bp
,
713 int len
, gfp_t gfpmask
)
715 struct bio
*tmp
, *old_chain
= *old
, *new_chain
= NULL
, *tail
= NULL
;
719 bio_pair_release(*bp
);
723 while (old_chain
&& (total
< len
)) {
724 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
728 if (total
+ old_chain
->bi_size
> len
) {
732 * this split can only happen with a single paged bio,
733 * bio_split will BUG_ON if this is not the case
735 dout("bio_chain_clone split! total=%d remaining=%d"
737 (int)total
, (int)len
-total
,
738 (int)old_chain
->bi_size
);
740 /* split the bio. We'll release it either in the next
741 call, or it will have to be released outside */
742 bp
= bio_split(old_chain
, (len
- total
) / SECTOR_SIZE
);
746 __bio_clone(tmp
, &bp
->bio1
);
750 __bio_clone(tmp
, old_chain
);
751 *next
= old_chain
->bi_next
;
755 gfpmask
&= ~__GFP_WAIT
;
759 new_chain
= tail
= tmp
;
764 old_chain
= old_chain
->bi_next
;
766 total
+= tmp
->bi_size
;
772 tail
->bi_next
= NULL
;
779 dout("bio_chain_clone with err\n");
780 bio_chain_put(new_chain
);
785 * helpers for osd request op vectors.
787 static int rbd_create_rw_ops(struct ceph_osd_req_op
**ops
,
792 *ops
= kzalloc(sizeof(struct ceph_osd_req_op
) * (num_ops
+ 1),
796 (*ops
)[0].op
= opcode
;
798 * op extent offset and length will be set later on
799 * in calc_raw_layout()
801 (*ops
)[0].payload_len
= payload_len
;
805 static void rbd_destroy_ops(struct ceph_osd_req_op
*ops
)
810 static void rbd_coll_end_req_index(struct request
*rq
,
811 struct rbd_req_coll
*coll
,
815 struct request_queue
*q
;
818 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
819 coll
, index
, ret
, len
);
825 blk_end_request(rq
, ret
, len
);
831 spin_lock_irq(q
->queue_lock
);
832 coll
->status
[index
].done
= 1;
833 coll
->status
[index
].rc
= ret
;
834 coll
->status
[index
].bytes
= len
;
835 max
= min
= coll
->num_done
;
836 while (max
< coll
->total
&& coll
->status
[max
].done
)
839 for (i
= min
; i
<max
; i
++) {
840 __blk_end_request(rq
, coll
->status
[i
].rc
,
841 coll
->status
[i
].bytes
);
843 kref_put(&coll
->kref
, rbd_coll_release
);
845 spin_unlock_irq(q
->queue_lock
);
848 static void rbd_coll_end_req(struct rbd_request
*req
,
851 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
855 * Send ceph osd request
857 static int rbd_do_request(struct request
*rq
,
858 struct rbd_device
*rbd_dev
,
859 struct ceph_snap_context
*snapc
,
861 const char *obj
, u64 ofs
, u64 len
,
866 struct ceph_osd_req_op
*ops
,
868 struct rbd_req_coll
*coll
,
870 void (*rbd_cb
)(struct ceph_osd_request
*req
,
871 struct ceph_msg
*msg
),
872 struct ceph_osd_request
**linger_req
,
875 struct ceph_osd_request
*req
;
876 struct ceph_file_layout
*layout
;
879 struct timespec mtime
= CURRENT_TIME
;
880 struct rbd_request
*req_data
;
881 struct ceph_osd_request_head
*reqhead
;
882 struct ceph_osd_client
*osdc
;
884 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
887 rbd_coll_end_req_index(rq
, coll
, coll_index
,
893 req_data
->coll
= coll
;
894 req_data
->coll_index
= coll_index
;
897 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj
, len
, ofs
);
899 down_read(&rbd_dev
->header_rwsem
);
901 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
902 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
903 false, GFP_NOIO
, pages
, bio
);
905 up_read(&rbd_dev
->header_rwsem
);
910 req
->r_callback
= rbd_cb
;
914 req_data
->pages
= pages
;
917 req
->r_priv
= req_data
;
919 reqhead
= req
->r_request
->front
.iov_base
;
920 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
922 strncpy(req
->r_oid
, obj
, sizeof(req
->r_oid
));
923 req
->r_oid_len
= strlen(req
->r_oid
);
925 layout
= &req
->r_file_layout
;
926 memset(layout
, 0, sizeof(*layout
));
927 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
928 layout
->fl_stripe_count
= cpu_to_le32(1);
929 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
930 layout
->fl_pg_pool
= cpu_to_le32(rbd_dev
->pool_id
);
931 ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
934 ceph_osdc_build_request(req
, ofs
, &len
,
938 req
->r_oid
, req
->r_oid_len
);
939 up_read(&rbd_dev
->header_rwsem
);
942 ceph_osdc_set_request_linger(osdc
, req
);
946 ret
= ceph_osdc_start_request(osdc
, req
, false);
951 ret
= ceph_osdc_wait_request(osdc
, req
);
953 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
954 dout("reassert_ver=%lld\n",
955 le64_to_cpu(req
->r_reassert_version
.version
));
956 ceph_osdc_put_request(req
);
961 bio_chain_put(req_data
->bio
);
962 ceph_osdc_put_request(req
);
964 rbd_coll_end_req(req_data
, ret
, len
);
970 * Ceph osd op callback
972 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
974 struct rbd_request
*req_data
= req
->r_priv
;
975 struct ceph_osd_reply_head
*replyhead
;
976 struct ceph_osd_op
*op
;
982 replyhead
= msg
->front
.iov_base
;
983 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
984 op
= (void *)(replyhead
+ 1);
985 rc
= le32_to_cpu(replyhead
->result
);
986 bytes
= le64_to_cpu(op
->extent
.length
);
987 read_op
= (le16_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
989 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes
, read_op
, rc
);
991 if (rc
== -ENOENT
&& read_op
) {
992 zero_bio_chain(req_data
->bio
, 0);
994 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
995 zero_bio_chain(req_data
->bio
, bytes
);
996 bytes
= req_data
->len
;
999 rbd_coll_end_req(req_data
, rc
, bytes
);
1002 bio_chain_put(req_data
->bio
);
1004 ceph_osdc_put_request(req
);
/*
 * Minimal osd request callback: it does no result processing and only
 * passes the request to ceph_osdc_put_request().  @msg is not used.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1014 * Do a synchronous ceph osd operation
1016 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1017 struct ceph_snap_context
*snapc
,
1021 struct ceph_osd_req_op
*orig_ops
,
1026 struct ceph_osd_request
**linger_req
,
1030 struct page
**pages
;
1032 struct ceph_osd_req_op
*ops
= orig_ops
;
1035 num_pages
= calc_pages_for(ofs
, len
);
1036 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1038 return PTR_ERR(pages
);
1041 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? len
: 0);
1042 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
1046 if ((flags
& CEPH_OSD_FLAG_WRITE
) && buf
) {
1047 ret
= ceph_copy_to_page_vector(pages
, buf
, ofs
, len
);
1053 ret
= rbd_do_request(NULL
, rbd_dev
, snapc
, snapid
,
1054 obj
, ofs
, len
, NULL
,
1065 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
1066 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
1070 rbd_destroy_ops(ops
);
1072 ceph_release_page_vector(pages
, num_pages
);
1077 * Do an asynchronous ceph osd operation
1079 static int rbd_do_op(struct request
*rq
,
1080 struct rbd_device
*rbd_dev
,
1081 struct ceph_snap_context
*snapc
,
1083 int opcode
, int flags
, int num_reply
,
1086 struct rbd_req_coll
*coll
,
1093 struct ceph_osd_req_op
*ops
;
1096 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
1100 seg_len
= rbd_get_segment(&rbd_dev
->header
,
1101 rbd_dev
->header
.object_prefix
,
1103 seg_name
, &seg_ofs
);
1105 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1107 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
1111 /* we've taken care of segment sizes earlier when we
1112 cloned the bios. We should never have a segment
1113 truncated at this point */
1114 BUG_ON(seg_len
< len
);
1116 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1117 seg_name
, seg_ofs
, seg_len
,
1124 rbd_req_cb
, 0, NULL
);
1126 rbd_destroy_ops(ops
);
1133 * Request async osd write
1135 static int rbd_req_write(struct request
*rq
,
1136 struct rbd_device
*rbd_dev
,
1137 struct ceph_snap_context
*snapc
,
1140 struct rbd_req_coll
*coll
,
1143 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1145 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1147 ofs
, len
, bio
, coll
, coll_index
);
1151 * Request async osd read
1153 static int rbd_req_read(struct request
*rq
,
1154 struct rbd_device
*rbd_dev
,
1158 struct rbd_req_coll
*coll
,
1161 return rbd_do_op(rq
, rbd_dev
, NULL
,
1166 ofs
, len
, bio
, coll
, coll_index
);
1170 * Request sync osd read
1172 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1173 struct ceph_snap_context
*snapc
,
1180 return rbd_req_sync_op(rbd_dev
, NULL
,
1185 1, obj
, ofs
, len
, buf
, NULL
, ver
);
1189 * Request sync osd notify ack
1191 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1196 struct ceph_osd_req_op
*ops
;
1199 ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1203 ops
[0].watch
.ver
= cpu_to_le64(rbd_dev
->header
.obj_version
);
1204 ops
[0].watch
.cookie
= notify_id
;
1205 ops
[0].watch
.flag
= 0;
1207 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1214 rbd_simple_req_cb
, 0, NULL
);
1216 rbd_destroy_ops(ops
);
1220 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1222 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1228 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", rbd_dev
->obj_md_name
,
1229 notify_id
, (int)opcode
);
1230 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1231 rc
= __rbd_refresh_header(rbd_dev
);
1232 mutex_unlock(&ctl_mutex
);
1234 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1235 " update snaps: %d\n", rbd_dev
->major
, rc
);
1237 rbd_req_sync_notify_ack(rbd_dev
, ver
, notify_id
, rbd_dev
->obj_md_name
);
1241 * Request sync osd watch
1243 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
,
1247 struct ceph_osd_req_op
*ops
;
1248 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1250 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1254 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1255 (void *)rbd_dev
, &rbd_dev
->watch_event
);
1259 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1260 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1261 ops
[0].watch
.flag
= 1;
1263 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1266 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1269 &rbd_dev
->watch_request
, NULL
);
1274 rbd_destroy_ops(ops
);
1278 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1279 rbd_dev
->watch_event
= NULL
;
1281 rbd_destroy_ops(ops
);
1286 * Request sync osd unwatch
1288 static int rbd_req_sync_unwatch(struct rbd_device
*rbd_dev
,
1291 struct ceph_osd_req_op
*ops
;
1293 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1297 ops
[0].watch
.ver
= 0;
1298 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1299 ops
[0].watch
.flag
= 0;
1301 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1304 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1306 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1308 rbd_destroy_ops(ops
);
1309 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1310 rbd_dev
->watch_event
= NULL
;
1314 struct rbd_notify_info
{
1315 struct rbd_device
*rbd_dev
;
/*
 * Event callback used for notify requests.  In the visible code it only
 * emits a debug message containing the header object name, the notify
 * id and the opcode.
 *
 * NOTE(review): @data is cast directly to struct rbd_device *, but
 * rbd_req_sync_notify() registers this callback with
 * "(void *)&info", i.e. a pointer to a struct rbd_notify_info whose
 * member rbd_dev holds the device pointer.  If that pointer is what
 * arrives here as @data, rbd_dev->obj_md_name below reads from the
 * wrong object -- confirm, and unwrap via the info structure if so.
 */
1318 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1320 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1324 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1325 rbd_dev
->obj_md_name
,
1326 notify_id
, (int)opcode
);
1330 * Request sync osd notify
1332 static int rbd_req_sync_notify(struct rbd_device
*rbd_dev
,
1335 struct ceph_osd_req_op
*ops
;
1336 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1337 struct ceph_osd_event
*event
;
1338 struct rbd_notify_info info
;
1339 int payload_len
= sizeof(u32
) + sizeof(u32
);
1342 ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1346 info
.rbd_dev
= rbd_dev
;
1348 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1349 (void *)&info
, &event
);
1353 ops
[0].watch
.ver
= 1;
1354 ops
[0].watch
.flag
= 1;
1355 ops
[0].watch
.cookie
= event
->cookie
;
1356 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1357 ops
[0].watch
.timeout
= 12;
1359 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1362 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1364 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1368 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1369 dout("ceph_osdc_wait_event returned %d\n", ret
);
1370 rbd_destroy_ops(ops
);
1374 ceph_osdc_cancel_event(event
);
1376 rbd_destroy_ops(ops
);
1381 * Request sync osd class method call (exec)
1383 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1391 struct ceph_osd_req_op
*ops
;
1392 int cls_len
= strlen(cls
);
1393 int method_len
= strlen(method
);
1394 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_CALL
,
1395 cls_len
+ method_len
+ len
);
1399 ops
[0].cls
.class_name
= cls
;
1400 ops
[0].cls
.class_len
= (__u8
)cls_len
;
1401 ops
[0].cls
.method_name
= method
;
1402 ops
[0].cls
.method_len
= (__u8
)method_len
;
1403 ops
[0].cls
.argc
= 0;
1404 ops
[0].cls
.indata
= data
;
1405 ops
[0].cls
.indata_len
= len
;
1407 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1410 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1412 1, obj
, 0, 0, NULL
, NULL
, ver
);
1414 rbd_destroy_ops(ops
);
1416 dout("cls_exec returned %d\n", ret
);
1420 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1422 struct rbd_req_coll
*coll
=
1423 kzalloc(sizeof(struct rbd_req_coll
) +
1424 sizeof(struct rbd_req_status
) * num_reqs
,
1429 coll
->total
= num_reqs
;
1430 kref_init(&coll
->kref
);
1435 * block device queue callback
1437 static void rbd_rq_fn(struct request_queue
*q
)
1439 struct rbd_device
*rbd_dev
= q
->queuedata
;
1441 struct bio_pair
*bp
= NULL
;
1443 while ((rq
= blk_fetch_request(q
))) {
1445 struct bio
*rq_bio
, *next_bio
= NULL
;
1447 int size
, op_size
= 0;
1449 int num_segs
, cur_seg
= 0;
1450 struct rbd_req_coll
*coll
;
1452 /* peek at request from block layer */
1456 dout("fetched request\n");
1458 /* filter out block requests we don't understand */
1459 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1460 __blk_end_request_all(rq
, 0);
1464 /* deduce our operation (read, write) */
1465 do_write
= (rq_data_dir(rq
) == WRITE
);
1467 size
= blk_rq_bytes(rq
);
1468 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1470 if (do_write
&& rbd_dev
->read_only
) {
1471 __blk_end_request_all(rq
, -EROFS
);
1475 spin_unlock_irq(q
->queue_lock
);
1477 dout("%s 0x%x bytes at 0x%llx\n",
1478 do_write
? "write" : "read",
1479 size
, blk_rq_pos(rq
) * SECTOR_SIZE
);
1481 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1482 coll
= rbd_alloc_coll(num_segs
);
1484 spin_lock_irq(q
->queue_lock
);
1485 __blk_end_request_all(rq
, -ENOMEM
);
1490 /* a bio clone to be passed down to OSD req */
1491 dout("rq->bio->bi_vcnt=%d\n", rq
->bio
->bi_vcnt
);
1492 op_size
= rbd_get_segment(&rbd_dev
->header
,
1493 rbd_dev
->header
.object_prefix
,
1496 kref_get(&coll
->kref
);
1497 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1498 op_size
, GFP_ATOMIC
);
1500 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1506 /* init OSD command: write or read */
1508 rbd_req_write(rq
, rbd_dev
,
1509 rbd_dev
->header
.snapc
,
1514 rbd_req_read(rq
, rbd_dev
,
1527 kref_put(&coll
->kref
, rbd_coll_release
);
1530 bio_pair_release(bp
);
1531 spin_lock_irq(q
->queue_lock
);
1536 * a queue callback. Makes sure that we don't create a bio that spans across
1537 * multiple osd objects. One exception would be single-page bios,
1538 * which we handle later at bio_chain_clone
1540 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1541 struct bio_vec
*bvec
)
1543 struct rbd_device
*rbd_dev
= q
->queuedata
;
1544 unsigned int chunk_sectors
;
1546 unsigned int bio_sectors
;
1549 chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1550 sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1551 bio_sectors
= bmd
->bi_size
>> SECTOR_SHIFT
;
1553 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1554 + bio_sectors
)) << SECTOR_SHIFT
;
1556 max
= 0; /* bio_add cannot handle a negative return */
1557 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1558 return bvec
->bv_len
;
1562 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1564 struct gendisk
*disk
= rbd_dev
->disk
;
1569 rbd_header_free(&rbd_dev
->header
);
1571 if (disk
->flags
& GENHD_FL_UP
)
1574 blk_cleanup_queue(disk
->queue
);
1579 * reload the on-disk header
1581 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1582 struct rbd_image_header
*header
)
1585 struct rbd_image_header_ondisk
*dh
;
1591 * First reads the fixed-size header to determine the number
1592 * of snapshots, then re-reads it, along with all snapshot
1593 * records as well as their stored names.
1597 dh
= kmalloc(len
, GFP_KERNEL
);
1601 rc
= rbd_req_sync_read(rbd_dev
,
1603 rbd_dev
->obj_md_name
,
1609 rc
= rbd_header_from_disk(header
, dh
, snap_count
, GFP_KERNEL
);
1612 pr_warning("unrecognized header format"
1613 " for image %s", rbd_dev
->obj
);
1617 if (snap_count
== header
->total_snaps
)
1620 snap_count
= header
->total_snaps
;
1621 len
= sizeof (*dh
) +
1622 snap_count
* sizeof(struct rbd_image_snap_ondisk
) +
1623 header
->snap_names_len
;
1625 rbd_header_free(header
);
1628 header
->obj_version
= ver
;
1638 static int rbd_header_add_snap(struct rbd_device
*rbd_dev
,
1639 const char *snap_name
,
1642 int name_len
= strlen(snap_name
);
1647 struct ceph_mon_client
*monc
;
1649 /* we should create a snapshot only if we're pointing at the head */
1650 if (rbd_dev
->snap_id
!= CEPH_NOSNAP
)
1653 monc
= &rbd_dev
->rbd_client
->client
->monc
;
1654 ret
= ceph_monc_create_snapid(monc
, rbd_dev
->pool_id
, &new_snapid
);
1655 dout("created snapid=%lld\n", new_snapid
);
1659 data
= kmalloc(name_len
+ 16, gfp_flags
);
1664 e
= data
+ name_len
+ 16;
1666 ceph_encode_string_safe(&p
, e
, snap_name
, name_len
, bad
);
1667 ceph_encode_64_safe(&p
, e
, new_snapid
, bad
);
1669 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->obj_md_name
,
1671 data
, p
- data
, &ver
);
1678 down_write(&rbd_dev
->header_rwsem
);
1679 rbd_dev
->header
.snapc
->seq
= new_snapid
;
1680 up_write(&rbd_dev
->header_rwsem
);
1687 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1689 struct rbd_snap
*snap
;
1691 while (!list_empty(&rbd_dev
->snaps
)) {
1692 snap
= list_first_entry(&rbd_dev
->snaps
, struct rbd_snap
, node
);
1693 __rbd_remove_snap_dev(rbd_dev
, snap
);
1698 * re-read the on-disk header, including the snapshot info, and refresh the in-memory copy
1700 static int __rbd_refresh_header(struct rbd_device
*rbd_dev
)
1703 struct rbd_image_header h
;
1707 ret
= rbd_read_header(rbd_dev
, &h
);
1712 set_capacity(rbd_dev
->disk
, h
.image_size
/ SECTOR_SIZE
);
1714 down_write(&rbd_dev
->header_rwsem
);
1716 snap_seq
= rbd_dev
->header
.snapc
->seq
;
1717 if (rbd_dev
->header
.total_snaps
&&
1718 rbd_dev
->header
.snapc
->snaps
[0] == snap_seq
)
1719 /* pointing at the head, will need to follow that
1723 /* rbd_dev->header.object_prefix shouldn't change */
1724 kfree(rbd_dev
->header
.snap_sizes
);
1725 kfree(rbd_dev
->header
.snap_names
);
1726 kfree(rbd_dev
->header
.snapc
);
1728 rbd_dev
->header
.total_snaps
= h
.total_snaps
;
1729 rbd_dev
->header
.snapc
= h
.snapc
;
1730 rbd_dev
->header
.snap_names
= h
.snap_names
;
1731 rbd_dev
->header
.snap_names_len
= h
.snap_names_len
;
1732 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1733 /* Free the extra copy of the object prefix */
1734 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
1735 kfree(h
.object_prefix
);
1738 rbd_dev
->header
.snapc
->seq
= rbd_dev
->header
.snapc
->snaps
[0];
1740 rbd_dev
->header
.snapc
->seq
= snap_seq
;
1742 ret
= __rbd_init_snaps_header(rbd_dev
);
1744 up_write(&rbd_dev
->header_rwsem
);
1749 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1751 struct gendisk
*disk
;
1752 struct request_queue
*q
;
1757 /* contact OSD, request size info about the object being mapped */
1758 rc
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
1762 /* no need to lock here, as rbd_dev is not registered yet */
1763 rc
= __rbd_init_snaps_header(rbd_dev
);
1767 rc
= rbd_header_set_snap(rbd_dev
, &total_size
);
1771 /* create gendisk info */
1773 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1777 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
1779 disk
->major
= rbd_dev
->major
;
1780 disk
->first_minor
= 0;
1781 disk
->fops
= &rbd_bd_ops
;
1782 disk
->private_data
= rbd_dev
;
1786 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1790 /* We use the default size, but let's be explicit about it. */
1791 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
1793 /* set io sizes to object size */
1794 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
1795 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
1796 blk_queue_max_segment_size(q
, segment_size
);
1797 blk_queue_io_min(q
, segment_size
);
1798 blk_queue_io_opt(q
, segment_size
);
1800 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1803 q
->queuedata
= rbd_dev
;
1805 rbd_dev
->disk
= disk
;
1808 /* finally, announce the disk to the world */
1809 set_capacity(disk
, total_size
/ SECTOR_SIZE
);
1812 pr_info("%s: added with size 0x%llx\n",
1813 disk
->disk_name
, (unsigned long long)total_size
);
1826 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
1828 return container_of(dev
, struct rbd_device
, dev
);
1831 static ssize_t
rbd_size_show(struct device
*dev
,
1832 struct device_attribute
*attr
, char *buf
)
1834 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1836 return sprintf(buf
, "%llu\n", (unsigned long long)rbd_dev
->header
.image_size
);
1839 static ssize_t
rbd_major_show(struct device
*dev
,
1840 struct device_attribute
*attr
, char *buf
)
1842 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1844 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1847 static ssize_t
rbd_client_id_show(struct device
*dev
,
1848 struct device_attribute
*attr
, char *buf
)
1850 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1852 return sprintf(buf
, "client%lld\n",
1853 ceph_client_id(rbd_dev
->rbd_client
->client
));
1856 static ssize_t
rbd_pool_show(struct device
*dev
,
1857 struct device_attribute
*attr
, char *buf
)
1859 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1861 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
1864 static ssize_t
rbd_pool_id_show(struct device
*dev
,
1865 struct device_attribute
*attr
, char *buf
)
1867 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1869 return sprintf(buf
, "%d\n", rbd_dev
->pool_id
);
1872 static ssize_t
rbd_name_show(struct device
*dev
,
1873 struct device_attribute
*attr
, char *buf
)
1875 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1877 return sprintf(buf
, "%s\n", rbd_dev
->obj
);
1880 static ssize_t
rbd_snap_show(struct device
*dev
,
1881 struct device_attribute
*attr
,
1884 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1886 return sprintf(buf
, "%s\n", rbd_dev
->snap_name
);
1889 static ssize_t
rbd_image_refresh(struct device
*dev
,
1890 struct device_attribute
*attr
,
1894 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1898 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1900 rc
= __rbd_refresh_header(rbd_dev
);
1904 mutex_unlock(&ctl_mutex
);
1908 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
1909 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
1910 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
1911 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
1912 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
1913 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
1914 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
1915 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
1916 static DEVICE_ATTR(create_snap
, S_IWUSR
, NULL
, rbd_snap_add
);
1918 static struct attribute
*rbd_attrs
[] = {
1919 &dev_attr_size
.attr
,
1920 &dev_attr_major
.attr
,
1921 &dev_attr_client_id
.attr
,
1922 &dev_attr_pool
.attr
,
1923 &dev_attr_pool_id
.attr
,
1924 &dev_attr_name
.attr
,
1925 &dev_attr_current_snap
.attr
,
1926 &dev_attr_refresh
.attr
,
1927 &dev_attr_create_snap
.attr
,
1931 static struct attribute_group rbd_attr_group
= {
1935 static const struct attribute_group
*rbd_attr_groups
[] = {
1940 static void rbd_sysfs_dev_release(struct device
*dev
)
1944 static struct device_type rbd_device_type
= {
1946 .groups
= rbd_attr_groups
,
1947 .release
= rbd_sysfs_dev_release
,
1955 static ssize_t
rbd_snap_size_show(struct device
*dev
,
1956 struct device_attribute
*attr
,
1959 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1961 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
1964 static ssize_t
rbd_snap_id_show(struct device
*dev
,
1965 struct device_attribute
*attr
,
1968 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1970 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
1973 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
1974 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
1976 static struct attribute
*rbd_snap_attrs
[] = {
1977 &dev_attr_snap_size
.attr
,
1978 &dev_attr_snap_id
.attr
,
1982 static struct attribute_group rbd_snap_attr_group
= {
1983 .attrs
= rbd_snap_attrs
,
1986 static void rbd_snap_dev_release(struct device
*dev
)
1988 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1993 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
1994 &rbd_snap_attr_group
,
1998 static struct device_type rbd_snap_device_type
= {
1999 .groups
= rbd_snap_attr_groups
,
2000 .release
= rbd_snap_dev_release
,
2003 static void __rbd_remove_snap_dev(struct rbd_device
*rbd_dev
,
2004 struct rbd_snap
*snap
)
2006 list_del(&snap
->node
);
2007 device_unregister(&snap
->dev
);
2010 static int rbd_register_snap_dev(struct rbd_device
*rbd_dev
,
2011 struct rbd_snap
*snap
,
2012 struct device
*parent
)
2014 struct device
*dev
= &snap
->dev
;
2017 dev
->type
= &rbd_snap_device_type
;
2018 dev
->parent
= parent
;
2019 dev
->release
= rbd_snap_dev_release
;
2020 dev_set_name(dev
, "snap_%s", snap
->name
);
2021 ret
= device_register(dev
);
2026 static int __rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2027 int i
, const char *name
,
2028 struct rbd_snap
**snapp
)
2031 struct rbd_snap
*snap
= kzalloc(sizeof(*snap
), GFP_KERNEL
);
2034 snap
->name
= kstrdup(name
, GFP_KERNEL
);
2035 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
2036 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
2037 if (device_is_registered(&rbd_dev
->dev
)) {
2038 ret
= rbd_register_snap_dev(rbd_dev
, snap
,
2052 * search for the previous snap in a null delimited string list
2054 const char *rbd_prev_snap_name(const char *name
, const char *start
)
2056 if (name
< start
+ 2)
2069 * compare the old list of snapshots that we have to what's in the header
2070 * and update it accordingly. Note that the header holds the snapshots
2071 * in a reverse order (from newest to oldest) and we need to go from
2072 * older to new so that we don't get a duplicate snap name when
2073 * doing the process (e.g., removed snapshot and recreated a new
2074 * one with the same name).
2076 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
)
2078 const char *name
, *first_name
;
2079 int i
= rbd_dev
->header
.total_snaps
;
2080 struct rbd_snap
*snap
, *old_snap
= NULL
;
2082 struct list_head
*p
, *n
;
2084 first_name
= rbd_dev
->header
.snap_names
;
2085 name
= first_name
+ rbd_dev
->header
.snap_names_len
;
2087 list_for_each_prev_safe(p
, n
, &rbd_dev
->snaps
) {
2090 old_snap
= list_entry(p
, struct rbd_snap
, node
);
2093 cur_id
= rbd_dev
->header
.snapc
->snaps
[i
- 1];
2095 if (!i
|| old_snap
->id
< cur_id
) {
2096 /* old_snap->id was skipped, thus was removed */
2097 __rbd_remove_snap_dev(rbd_dev
, old_snap
);
2100 if (old_snap
->id
== cur_id
) {
2101 /* we have this snapshot already */
2103 name
= rbd_prev_snap_name(name
, first_name
);
2107 i
--, name
= rbd_prev_snap_name(name
, first_name
)) {
2112 cur_id
= rbd_dev
->header
.snapc
->snaps
[i
];
2113 /* snapshot removal? handle it above */
2114 if (cur_id
>= old_snap
->id
)
2116 /* a new snapshot */
2117 ret
= __rbd_add_snap_dev(rbd_dev
, i
- 1, name
, &snap
);
2121 /* note that we add it backward so using n and not p */
2122 list_add(&snap
->node
, n
);
2126 /* we're done going over the old snap list, just add what's left */
2127 for (; i
> 0; i
--) {
2128 name
= rbd_prev_snap_name(name
, first_name
);
2133 ret
= __rbd_add_snap_dev(rbd_dev
, i
- 1, name
, &snap
);
2136 list_add(&snap
->node
, &rbd_dev
->snaps
);
2142 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
2146 struct rbd_snap
*snap
;
2148 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2149 dev
= &rbd_dev
->dev
;
2151 dev
->bus
= &rbd_bus_type
;
2152 dev
->type
= &rbd_device_type
;
2153 dev
->parent
= &rbd_root_dev
;
2154 dev
->release
= rbd_dev_release
;
2155 dev_set_name(dev
, "%d", rbd_dev
->id
);
2156 ret
= device_register(dev
);
2160 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
2161 ret
= rbd_register_snap_dev(rbd_dev
, snap
,
2167 mutex_unlock(&ctl_mutex
);
2171 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
2173 device_unregister(&rbd_dev
->dev
);
2176 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
2181 ret
= rbd_req_sync_watch(rbd_dev
, rbd_dev
->obj_md_name
,
2182 rbd_dev
->header
.obj_version
);
2183 if (ret
== -ERANGE
) {
2184 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2185 rc
= __rbd_refresh_header(rbd_dev
);
2186 mutex_unlock(&ctl_mutex
);
2190 } while (ret
== -ERANGE
);
2195 static atomic64_t rbd_id_max
= ATOMIC64_INIT(0);
2198 * Get a unique rbd identifier for the given new rbd_dev, and add
2199 * the rbd_dev to the global list. The minimum rbd id is 1.
2201 static void rbd_id_get(struct rbd_device
*rbd_dev
)
2203 rbd_dev
->id
= atomic64_inc_return(&rbd_id_max
);
2205 spin_lock(&rbd_dev_list_lock
);
2206 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2207 spin_unlock(&rbd_dev_list_lock
);
2211 * Remove an rbd_dev from the global list, and record that its
2212 * identifier is no longer in use.
2214 static void rbd_id_put(struct rbd_device
*rbd_dev
)
2216 struct list_head
*tmp
;
2217 int rbd_id
= rbd_dev
->id
;
2222 spin_lock(&rbd_dev_list_lock
);
2223 list_del_init(&rbd_dev
->node
);
2226 * If the id being "put" is not the current maximum, there
2227 * is nothing special we need to do.
2229 if (rbd_id
!= atomic64_read(&rbd_id_max
)) {
2230 spin_unlock(&rbd_dev_list_lock
);
2235 * We need to update the current maximum id. Search the
2236 * list to find out what it is. We're more likely to find
2237 * the maximum at the end, so search the list backward.
2240 list_for_each_prev(tmp
, &rbd_dev_list
) {
2241 struct rbd_device
*rbd_dev
;
2243 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2244 if (rbd_id
> max_id
)
2247 spin_unlock(&rbd_dev_list_lock
);
2250 * The max id could have been updated by rbd_id_get(), in
2251 * which case it now accurately reflects the new maximum.
2252 * Be careful not to overwrite the maximum value in that
2255 atomic64_cmpxchg(&rbd_id_max
, rbd_id
, max_id
);
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' when there is
 * none), and return the length of the token that starts there, i.e.
 * the run of non-white-space characters.  *buf must be terminated
 * with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	static const char white_space[] = " \f\n\r\t\v";
	const char *token_start = *buf + strspn(*buf, white_space);

	*buf = token_start;

	return strcspn(token_start, white_space);
}
2278 * Finds the next token in *buf, and if the provided token buffer is
2279 * big enough, copies the found token into it. The result, if
2280 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2281 * must be terminated with '\0' on entry.
2283 * Returns the length of the token found (not including the '\0').
2284 * Return value will be 0 if no token is found, and it will be >=
2285 * token_size if the token would not fit.
2287 * The *buf pointer will be updated to point beyond the end of the
2288 * found token. Note that this occurs even if the token buffer is
2289 * too small to hold it.
2291 static inline size_t copy_token(const char **buf
,
2297 len
= next_token(buf
);
2298 if (len
< token_size
) {
2299 memcpy(token
, *buf
, len
);
2300 *(token
+ len
) = '\0';
2308 * Finds the next token in *buf, dynamically allocates a buffer big
2309 * enough to hold a copy of it, and copies the token into the new
2310 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2311 * that a duplicate buffer is created even for a zero-length token.
2313 * Returns a pointer to the newly-allocated duplicate, or a null
2314 * pointer if memory for the duplicate was not available. If
2315 * the lenp argument is a non-null pointer, the length of the token
2316 * (not including the '\0') is returned in *lenp.
2318 * If successful, the *buf pointer will be updated to point beyond
2319 * the end of the found token.
2321 * Note: uses GFP_KERNEL for allocation.
2323 static inline char *dup_token(const char **buf
, size_t *lenp
)
2328 len
= next_token(buf
);
2329 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
2333 memcpy(dup
, *buf
, len
);
2334 *(dup
+ len
) = '\0';
2344 * This fills in the pool_name, obj, obj_len, obj_md_name, and
2345 * snap_name fields of the given rbd_dev, based
2346 * on the list of monitor addresses and other options provided via
2349 * Note: rbd_dev is assumed to have been initially zero-filled.
2351 static int rbd_add_parse_args(struct rbd_device
*rbd_dev
,
2353 const char **mon_addrs
,
2354 size_t *mon_addrs_size
,
2356 size_t options_size
)
2361 /* The first four tokens are required */
2363 len
= next_token(&buf
);
2366 *mon_addrs_size
= len
+ 1;
2371 len
= copy_token(&buf
, options
, options_size
);
2372 if (!len
|| len
>= options_size
)
2376 rbd_dev
->pool_name
= dup_token(&buf
, NULL
);
2377 if (!rbd_dev
->pool_name
)
2380 rbd_dev
->obj
= dup_token(&buf
, &rbd_dev
->obj_len
);
2384 /* Create the name of the header object */
2386 rbd_dev
->obj_md_name
= kmalloc(rbd_dev
->obj_len
2387 + sizeof (RBD_SUFFIX
),
2389 if (!rbd_dev
->obj_md_name
)
2391 sprintf(rbd_dev
->obj_md_name
, "%s%s", rbd_dev
->obj
, RBD_SUFFIX
);
2394 * The snapshot name is optional. If none is supplied,
2395 * we use the default value.
2397 rbd_dev
->snap_name
= dup_token(&buf
, &len
);
2398 if (!rbd_dev
->snap_name
)
2401 /* Replace the empty name with the default */
2402 kfree(rbd_dev
->snap_name
);
2404 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME
), GFP_KERNEL
);
2405 if (!rbd_dev
->snap_name
)
2408 memcpy(rbd_dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
2409 sizeof (RBD_SNAP_HEAD_NAME
));
2415 kfree(rbd_dev
->obj_md_name
);
2416 kfree(rbd_dev
->obj
);
2417 kfree(rbd_dev
->pool_name
);
2418 rbd_dev
->pool_name
= NULL
;
2423 static ssize_t
rbd_add(struct bus_type
*bus
,
2428 struct rbd_device
*rbd_dev
= NULL
;
2429 const char *mon_addrs
= NULL
;
2430 size_t mon_addrs_size
= 0;
2431 struct ceph_osd_client
*osdc
;
2434 if (!try_module_get(THIS_MODULE
))
2437 options
= kmalloc(count
, GFP_KERNEL
);
2440 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2444 /* static rbd_device initialization */
2445 spin_lock_init(&rbd_dev
->lock
);
2446 INIT_LIST_HEAD(&rbd_dev
->node
);
2447 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2448 init_rwsem(&rbd_dev
->header_rwsem
);
2450 init_rwsem(&rbd_dev
->header_rwsem
);
2452 /* generate unique id: find highest unique id, add one */
2453 rbd_id_get(rbd_dev
);
2455 /* Fill in the device name, now that we have its id. */
2456 BUILD_BUG_ON(DEV_NAME_LEN
2457 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
2458 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->id
);
2460 /* parse add command */
2461 rc
= rbd_add_parse_args(rbd_dev
, buf
, &mon_addrs
, &mon_addrs_size
,
2466 rbd_dev
->rbd_client
= rbd_get_client(mon_addrs
, mon_addrs_size
- 1,
2468 if (IS_ERR(rbd_dev
->rbd_client
)) {
2469 rc
= PTR_ERR(rbd_dev
->rbd_client
);
2474 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2475 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2477 goto err_out_client
;
2478 rbd_dev
->pool_id
= rc
;
2480 /* register our block device */
2481 rc
= register_blkdev(0, rbd_dev
->name
);
2483 goto err_out_client
;
2484 rbd_dev
->major
= rc
;
2486 rc
= rbd_bus_add_dev(rbd_dev
);
2488 goto err_out_blkdev
;
2491 * At this point cleanup in the event of an error is the job
2492 * of the sysfs code (initiated by rbd_bus_del_dev()).
2494 * Set up and announce blkdev mapping.
2496 rc
= rbd_init_disk(rbd_dev
);
2500 rc
= rbd_init_watch_dev(rbd_dev
);
2507 /* this will also clean up rest of rbd_dev stuff */
2509 rbd_bus_del_dev(rbd_dev
);
2514 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2516 rbd_put_client(rbd_dev
);
2518 if (rbd_dev
->pool_name
) {
2519 kfree(rbd_dev
->snap_name
);
2520 kfree(rbd_dev
->obj_md_name
);
2521 kfree(rbd_dev
->obj
);
2522 kfree(rbd_dev
->pool_name
);
2524 rbd_id_put(rbd_dev
);
2529 dout("Error adding device %s\n", buf
);
2530 module_put(THIS_MODULE
);
2532 return (ssize_t
) rc
;
2535 static struct rbd_device
*__rbd_get_dev(unsigned long id
)
2537 struct list_head
*tmp
;
2538 struct rbd_device
*rbd_dev
;
2540 spin_lock(&rbd_dev_list_lock
);
2541 list_for_each(tmp
, &rbd_dev_list
) {
2542 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2543 if (rbd_dev
->id
== id
) {
2544 spin_unlock(&rbd_dev_list_lock
);
2548 spin_unlock(&rbd_dev_list_lock
);
2552 static void rbd_dev_release(struct device
*dev
)
2554 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2556 if (rbd_dev
->watch_request
) {
2557 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
2559 ceph_osdc_unregister_linger_request(&client
->osdc
,
2560 rbd_dev
->watch_request
);
2562 if (rbd_dev
->watch_event
)
2563 rbd_req_sync_unwatch(rbd_dev
, rbd_dev
->obj_md_name
);
2565 rbd_put_client(rbd_dev
);
2567 /* clean up and free blkdev */
2568 rbd_free_disk(rbd_dev
);
2569 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2571 /* done with the id, and with the rbd_dev */
2572 kfree(rbd_dev
->snap_name
);
2573 kfree(rbd_dev
->obj_md_name
);
2574 kfree(rbd_dev
->pool_name
);
2575 kfree(rbd_dev
->obj
);
2576 rbd_id_put(rbd_dev
);
2579 /* release module ref */
2580 module_put(THIS_MODULE
);
2583 static ssize_t
rbd_remove(struct bus_type
*bus
,
2587 struct rbd_device
*rbd_dev
= NULL
;
2592 rc
= strict_strtoul(buf
, 10, &ul
);
2596 /* convert to int; abort if we lost anything in the conversion */
2597 target_id
= (int) ul
;
2598 if (target_id
!= ul
)
2601 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2603 rbd_dev
= __rbd_get_dev(target_id
);
2609 __rbd_remove_all_snaps(rbd_dev
);
2610 rbd_bus_del_dev(rbd_dev
);
2613 mutex_unlock(&ctl_mutex
);
2617 static ssize_t
rbd_snap_add(struct device
*dev
,
2618 struct device_attribute
*attr
,
2622 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2624 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2628 snprintf(name
, count
, "%s", buf
);
2630 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2632 ret
= rbd_header_add_snap(rbd_dev
,
2637 ret
= __rbd_refresh_header(rbd_dev
);
2641 /* shouldn't hold ctl_mutex when notifying.. notify might
2642 trigger a watch callback that would need to get that mutex */
2643 mutex_unlock(&ctl_mutex
);
2645 /* make a best effort, don't error if failed */
2646 rbd_req_sync_notify(rbd_dev
, rbd_dev
->obj_md_name
);
2653 mutex_unlock(&ctl_mutex
);
2659 * create control files in sysfs
2662 static int rbd_sysfs_init(void)
2666 ret
= device_register(&rbd_root_dev
);
2670 ret
= bus_register(&rbd_bus_type
);
2672 device_unregister(&rbd_root_dev
);
2677 static void rbd_sysfs_cleanup(void)
2679 bus_unregister(&rbd_bus_type
);
2680 device_unregister(&rbd_root_dev
);
2683 int __init
rbd_init(void)
2687 rc
= rbd_sysfs_init();
2690 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
2694 void __exit
rbd_exit(void)
2696 rbd_sysfs_cleanup();
2699 module_init(rbd_init
);
2700 module_exit(rbd_exit
);
2702 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2703 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2704 MODULE_DESCRIPTION("rados block device");
2706 /* following authorship retained from original osdblk.c */
2707 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2709 MODULE_LICENSE("GPL");