rbd: dynamically allocate pool name
[deliverable/linux.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
/*
 * The basic unit of block I/O is a sector. It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes. These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */

/* header object name is the image name plus RBD_SUFFIX */
#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_SNAP_NAME_LEN 32
#define RBD_MAX_OPT_LEN 1024

/* pseudo snapshot name meaning "the image head" (see rbd_header_set_snap()) */
#define RBD_SNAP_HEAD_NAME "-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)

/* default for the notify_timeout mount option (seconds, presumably) */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
602adf40
YS
/*
 * block device image metadata (in-memory version)
 *
 * Filled in from the on-disk header by rbd_header_from_disk() and
 * torn down by rbd_header_free().
 */
struct rbd_image_header {
	u64 image_size;		/* from on-disk header */
	char object_prefix[32];	/* prefix for data object names */
	__u8 obj_order;		/* object size is 1 << obj_order */
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;	/* kmalloc'ed, refcounted */
	size_t snap_names_len;	/* total bytes in snap_names */
	u64 snap_seq;
	u32 total_snaps;	/* number of snapshots */

	char *snap_names;	/* packed NUL-terminated names */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;
};
94
/* rbd-specific mount options, parsed by parse_rbd_opts_token() */
struct rbd_options {
	int notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
98
/*
 * an instance of the client. multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct rbd_options *rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry on rbd_client_list */
};
108
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request completed */
	int rc;		/* completion result */
	u64 bytes;	/* bytes transferred */
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;	/* number of sub-requests in the collection */
	int num_done;	/* completed-in-order count, see rbd_coll_end_req_index() */
	struct kref kref;	/* released via rbd_coll_release() */
	struct rbd_req_status status[0];	/* one slot per sub-request */
};
127
f0f8cef5
AE
/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;
	int coll_index;			/* slot in coll->status[] */
	struct rbd_req_coll *coll;	/* parent collection (may be NULL) */
};
139
dfc5606d
YS
/* in-memory snapshot record; also a sysfs device (see dev member) */
struct rbd_snap {
	struct device dev;	/* embedded sysfs device */
	const char *name;	/* snapshot name */
	u64 size;		/* image size at this snapshot */
	struct list_head node;	/* entry on rbd_device->snaps */
	u64 id;			/* snapshot id */
};
147
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* shared, refcounted ceph client */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char *pool_name;	/* dynamically allocated */
	int pool_id;

	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	u64 snap_id;		/* current snapshot id */
	int read_only;		/* nonzero when a snapshot is mapped */

	struct list_head node;	/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
188
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);	/* protects rbd_dev_list */

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);	/* protects rbd_client_list */
602adf40 196
dfc5606d
YS
/* forward declarations for sysfs plumbing defined later in the file */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

/* /sys/bus/rbd control files: "add" and "remove", write-only, root-only */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name = "rbd",
	.bus_attrs = rbd_bus_attrs,
};
221
/* release callback for rbd_root_dev; the device is static, nothing to free */
static void rbd_root_dev_release(struct device *dev)
{
}

/* NOTE(review): presumably the sysfs parent of all rbd devices —
 * confirm at the device-registration site */
static struct device rbd_root_dev = {
	.init_name = "rbd",
	.release = rbd_root_dev_release,
};
230
dfc5606d 231
dfc5606d
YS
232static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233{
234 return get_device(&rbd_dev->dev);
235}
236
237static void rbd_put_dev(struct rbd_device *rbd_dev)
238{
239 put_device(&rbd_dev->dev);
240}
602adf40 241
263c6ca0 242static int __rbd_refresh_header(struct rbd_device *rbd_dev);
59c2be1e 243
602adf40
YS
244static int rbd_open(struct block_device *bdev, fmode_t mode)
245{
f0f8cef5 246 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 247
dfc5606d
YS
248 rbd_get_dev(rbd_dev);
249
602adf40
YS
250 set_device_ro(bdev, rbd_dev->read_only);
251
252 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
253 return -EROFS;
254
255 return 0;
256}
257
dfc5606d
YS
258static int rbd_release(struct gendisk *disk, fmode_t mode)
259{
260 struct rbd_device *rbd_dev = disk->private_data;
261
262 rbd_put_dev(rbd_dev);
263
264 return 0;
265}
266
602adf40
YS
/* block device operations: only open/release are needed */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
272
273/*
274 * Initialize an rbd client instance.
275 * We own *opt.
276 */
59c2be1e
YS
277static struct rbd_client *rbd_client_create(struct ceph_options *opt,
278 struct rbd_options *rbd_opts)
602adf40
YS
279{
280 struct rbd_client *rbdc;
281 int ret = -ENOMEM;
282
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285 if (!rbdc)
286 goto out_opt;
287
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
290
bc534d86
AE
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
6ab00d46 293 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
602adf40 294 if (IS_ERR(rbdc->client))
bc534d86 295 goto out_mutex;
28f259b7 296 opt = NULL; /* Now rbdc->client is responsible for opt */
602adf40
YS
297
298 ret = ceph_open_session(rbdc->client);
299 if (ret < 0)
300 goto out_err;
301
59c2be1e
YS
302 rbdc->rbd_opts = rbd_opts;
303
432b8587 304 spin_lock(&rbd_client_list_lock);
602adf40 305 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 306 spin_unlock(&rbd_client_list_lock);
602adf40 307
bc534d86
AE
308 mutex_unlock(&ctl_mutex);
309
602adf40
YS
310 dout("rbd_client_create created %p\n", rbdc);
311 return rbdc;
312
313out_err:
314 ceph_destroy_client(rbdc->client);
bc534d86
AE
315out_mutex:
316 mutex_unlock(&ctl_mutex);
602adf40
YS
317 kfree(rbdc);
318out_opt:
28f259b7
VK
319 if (opt)
320 ceph_destroy_options(opt);
321 return ERR_PTR(ret);
602adf40
YS
322}
323
/*
 * Find a ceph client with specific addr and configuration.
 *
 * Caller must hold rbd_client_list_lock (see rbd_get_client()).
 * Returns NULL when no match exists, or when sharing is disabled
 * via CEPH_OPT_NOSHARE.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
{
	struct rbd_client *client_node;

	if (opt->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (ceph_compare_options(opt, client_node->client) == 0)
			return client_node;
	return NULL;
}
339
59c2be1e
YS
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument; values
 * between Opt_last_int and Opt_last_string take a string argument
 * (none currently).  parse_rbd_opts_token() relies on this ordering.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
357
/*
 * Parse one "key[=value]" mount-option token into *private (a struct
 * rbd_options).  Callback for ceph_parse_options().
 *
 * Returns 0 on success, -EINVAL for an unrecognized token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbdopt = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbdopt_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* ordering of the token enum tells us the argument type */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbdopt->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);	/* token table and switch out of sync */
	}
	return 0;
}
392
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * Ownership: opt is consumed (either destroyed here when an existing
 * client is reused, or handed to rbd_client_create()); rbd_opts is
 * freed here on every path except when a brand new client adopts it.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	opt = ceph_parse_options(options, mon_addr,
				 mon_addr + mon_addr_len,
				 parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		kfree(rbd_opts);
		return ERR_CAST(opt);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* opt and rbd_opts are not needed for a shared client */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	rbdc = rbd_client_create(opt, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
440
/*
 * Destroy ceph client
 *
 * kref release callback.  Takes rbd_client_list_lock itself to unlink
 * the client, so the caller must NOT already hold it.  (The previous
 * comment claiming the caller must hold the lock was stale: the
 * function has acquired the lock internally since the list_del was
 * moved here.)
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
459
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against use after drop */
}
469
1fec7093
YS
/*
 * Destroy requests collection (kref release callback for rbd_req_coll)
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
602adf40
YS
481
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * @allocated_snaps is how many snapshot slots the caller's ondisk
 * buffer actually holds; snapshot ids/sizes/names are only copied
 * when it matches the on-disk snap_count exactly.
 *
 * Returns 0, -ENXIO for a bad header magic, -EINVAL on a snap_count
 * that would overflow the allocation size, or -ENOMEM.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps,
				 gfp_t gfp_flags)
{
	u32 i, snap_count;

	/* verify the on-disk header magic text */
	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* reject snap counts whose allocation size would overflow */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}
	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	/* only copy snapshot data when the caller's buffer held it all */
	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names (packed after the snaps array) */
		memcpy(header->snap_names, &ondisk->snaps[i],
		       header->snap_names_len);
	}

	return 0;

err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
554
602adf40
YS
555static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
556 u64 *seq, u64 *size)
557{
558 int i;
559 char *p = header->snap_names;
560
00f1f36f
AE
561 for (i = 0; i < header->total_snaps; i++) {
562 if (!strcmp(snap_name, p)) {
602adf40 563
00f1f36f 564 /* Found it. Pass back its id and/or size */
602adf40 565
00f1f36f
AE
566 if (seq)
567 *seq = header->snapc->snaps[i];
568 if (size)
569 *size = header->snap_sizes[i];
570 return i;
571 }
572 p += strlen(p) + 1; /* Skip ahead to the next name */
573 }
574 return -ENOENT;
602adf40
YS
575}
576
/*
 * Set the device's current snapshot from dev->snap_name.
 *
 * Mapping the head (RBD_SNAP_HEAD_NAME) leaves the device writable
 * with snap_id CEPH_NOSNAP; mapping a real snapshot makes the device
 * read-only.  Optionally passes back the image size for the mapping.
 * Returns 0, or -ENOENT when the named snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));

	down_write(&dev->header_rwsem);

	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->snap_id = CEPH_NOSNAP;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;
		dev->snap_id = snapc->seq;
		dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&dev->header_rwsem);
	return ret;
}
610
/* Free the buffers allocated by rbd_header_from_disk(). */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->snapc);
	kfree(header->snap_names);
	kfree(header->snap_sizes);
}
617
618/*
619 * get the actual striped segment name, offset and length
620 */
621static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 622 const char *object_prefix,
602adf40
YS
623 u64 ofs, u64 len,
624 char *seg_name, u64 *segofs)
625{
626 u64 seg = ofs >> header->obj_order;
627
628 if (seg_name)
629 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 630 "%s.%012llx", object_prefix, seg);
602adf40
YS
631
632 ofs = ofs & ((1 << header->obj_order) - 1);
633 len = min_t(u64, len, (1 << header->obj_order) - ofs);
634
635 if (segofs)
636 *segofs = ofs;
637
638 return len;
639}
640
1fec7093
YS
641static int rbd_get_num_segments(struct rbd_image_header *header,
642 u64 ofs, u64 len)
643{
644 u64 start_seg = ofs >> header->obj_order;
645 u64 end_seg = (ofs + len - 1) >> header->obj_order;
646 return end_seg - start_seg + 1;
647}
648
029bcbd8
JD
649/*
650 * returns the size of an object in the image
651 */
652static u64 rbd_obj_bytes(struct rbd_image_header *header)
653{
654 return 1 << header->obj_order;
655}
656
602adf40
YS
657/*
658 * bio helpers
659 */
660
661static void bio_chain_put(struct bio *chain)
662{
663 struct bio *tmp;
664
665 while (chain) {
666 tmp = chain;
667 chain = chain->bi_next;
668 bio_put(tmp);
669 }
670}
671
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs (or the segment start,
				 * whichever is later) to the segment end */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
698
699/*
700 * bio_chain_clone - clone a chain of bios up to a certain length.
701 * might return a bio_pair that will need to be released.
702 */
703static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
704 struct bio_pair **bp,
705 int len, gfp_t gfpmask)
706{
707 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
708 int total = 0;
709
710 if (*bp) {
711 bio_pair_release(*bp);
712 *bp = NULL;
713 }
714
715 while (old_chain && (total < len)) {
716 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
717 if (!tmp)
718 goto err_out;
719
720 if (total + old_chain->bi_size > len) {
721 struct bio_pair *bp;
722
723 /*
724 * this split can only happen with a single paged bio,
725 * split_bio will BUG_ON if this is not the case
726 */
727 dout("bio_chain_clone split! total=%d remaining=%d"
728 "bi_size=%d\n",
729 (int)total, (int)len-total,
730 (int)old_chain->bi_size);
731
732 /* split the bio. We'll release it either in the next
733 call, or it will have to be released outside */
593a9e7b 734 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
602adf40
YS
735 if (!bp)
736 goto err_out;
737
738 __bio_clone(tmp, &bp->bio1);
739
740 *next = &bp->bio2;
741 } else {
742 __bio_clone(tmp, old_chain);
743 *next = old_chain->bi_next;
744 }
745
746 tmp->bi_bdev = NULL;
747 gfpmask &= ~__GFP_WAIT;
748 tmp->bi_next = NULL;
749
750 if (!new_chain) {
751 new_chain = tail = tmp;
752 } else {
753 tail->bi_next = tmp;
754 tail = tmp;
755 }
756 old_chain = old_chain->bi_next;
757
758 total += tmp->bi_size;
759 }
760
761 BUG_ON(total < len);
762
763 if (tail)
764 tail->bi_next = NULL;
765
766 *old = old_chain;
767
768 return new_chain;
769
770err_out:
771 dout("bio_chain_clone with err\n");
772 bio_chain_put(new_chain);
773 return NULL;
774}
775
776/*
777 * helpers for osd request op vectors.
778 */
779static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
780 int num_ops,
781 int opcode,
782 u32 payload_len)
783{
784 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
785 GFP_NOIO);
786 if (!*ops)
787 return -ENOMEM;
788 (*ops)[0].op = opcode;
789 /*
790 * op extent offset and length will be set later on
791 * in calc_raw_layout()
792 */
793 (*ops)[0].payload_len = payload_len;
794 return 0;
795}
796
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
801
1fec7093
YS
/*
 * Record completion of sub-request @index in @coll, then complete (in
 * order) every leading run of finished sub-requests against the block
 * request.  Without a collection the whole request completes at once.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue lock serializes updates to the collection state */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	/* complete each newly-finished sub-request, dropping its ref */
	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
839
/* Complete this request's slot in its collection (if any). */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
845
602adf40
YS
/*
 * Send ceph osd request
 *
 * With a callback (@rbd_cb) the request is asynchronous and the
 * callback owns cleanup; without one we wait for completion and put
 * the request here.  @linger_req, when non-NULL, registers the
 * request as lingering and passes it back to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* still must report completion to the collection */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);

	/* hold the header stable while building the request */
	down_read(&dev->header_rwsem);

	osdc = &dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		up_read(&dev->header_rwsem);
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* one object per "stripe": unit == object size, count == 1 */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);
	up_read(&dev->header_rwsem);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous mode: wait and clean up here */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
960
/*
 * Ceph osd op callback
 *
 * Completes the rbd_request: a read of a nonexistent object succeeds
 * as all-zeros, and a short read has its tail zero-filled so the full
 * requested length is reported.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* object doesn't exist: the data reads as zeros */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the remainder */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
999
59c2be1e
YS
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1004
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * When @orig_ops is NULL a single op of @opcode is built here (and a
 * write's payload copied from @buf); otherwise the caller's op vector
 * is used as-is and the caller keeps ownership of it.  For reads the
 * result is copied back into @buf.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* no callback: rbd_do_request() waits for completion */
	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1067
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image byte range onto its containing segment object and
 * issues the op with rbd_req_cb as the completion callback.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1123
/*
 * Request async osd write (always against the head, CEPH_NOSNAP)
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1141
/*
 * Request async osd read (from the given snapshot id)
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1160
/*
 * Request sync osd read
 *
 * NOTE(review): the snapc parameter is unused — NULL is always passed
 * through to rbd_req_sync_op(); consider dropping it at the callers.
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			     struct ceph_snap_context *snapc,
			     u64 snapid,
			     const char *obj,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       snapid,
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1179
/*
 * Acknowledge a watch notification (the old "Request sync osd watch"
 * comment was misleading).  The ack is sent asynchronously; the
 * completion callback merely drops the request reference.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     NULL, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1211
1212static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1213{
1214 struct rbd_device *dev = (struct rbd_device *)data;
13143d2d
SW
1215 int rc;
1216
59c2be1e
YS
1217 if (!dev)
1218 return;
1219
1220 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1221 notify_id, (int)opcode);
1222 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
263c6ca0 1223 rc = __rbd_refresh_header(dev);
59c2be1e 1224 mutex_unlock(&ctl_mutex);
13143d2d 1225 if (rc)
f0f8cef5
AE
1226 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1227 " update snaps: %d\n", dev->major, rc);
59c2be1e
YS
1228
1229 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1230}
1231
1232/*
1233 * Request sync osd watch
1234 */
1235static int rbd_req_sync_watch(struct rbd_device *dev,
1236 const char *obj,
1237 u64 ver)
1238{
1239 struct ceph_osd_req_op *ops;
1dbb4399 1240 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1241
1242 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1243 if (ret < 0)
1244 return ret;
1245
1246 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1247 (void *)dev, &dev->watch_event);
1248 if (ret < 0)
1249 goto fail;
1250
1251 ops[0].watch.ver = cpu_to_le64(ver);
1252 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1253 ops[0].watch.flag = 1;
1254
1255 ret = rbd_req_sync_op(dev, NULL,
1256 CEPH_NOSNAP,
1257 0,
1258 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1259 ops,
1260 1, obj, 0, 0, NULL,
1261 &dev->watch_request, NULL);
1262
1263 if (ret < 0)
1264 goto fail_event;
1265
1266 rbd_destroy_ops(ops);
1267 return 0;
1268
1269fail_event:
1270 ceph_osdc_cancel_event(dev->watch_event);
1271 dev->watch_event = NULL;
1272fail:
1273 rbd_destroy_ops(ops);
1274 return ret;
1275}
1276
79e3057c
YS
1277/*
1278 * Request sync osd unwatch
1279 */
1280static int rbd_req_sync_unwatch(struct rbd_device *dev,
1281 const char *obj)
1282{
1283 struct ceph_osd_req_op *ops;
1284
1285 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1286 if (ret < 0)
1287 return ret;
1288
1289 ops[0].watch.ver = 0;
1290 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1291 ops[0].watch.flag = 0;
1292
1293 ret = rbd_req_sync_op(dev, NULL,
1294 CEPH_NOSNAP,
1295 0,
1296 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1297 ops,
1298 1, obj, 0, 0, NULL, NULL, NULL);
1299
1300 rbd_destroy_ops(ops);
1301 ceph_osdc_cancel_event(dev->watch_event);
1302 dev->watch_event = NULL;
1303 return ret;
1304}
1305
59c2be1e
YS
1306struct rbd_notify_info {
1307 struct rbd_device *dev;
1308};
1309
1310static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1311{
1312 struct rbd_device *dev = (struct rbd_device *)data;
1313 if (!dev)
1314 return;
1315
1316 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1317 notify_id, (int)opcode);
1318}
1319
/*
 * Request sync osd notify: poke all watchers of object @obj and wait
 * (bounded) for their acknowledgements.
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
		          const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload carries two u32s (version + timeout) */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	/* one-shot event used only to wake the wait below */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/*
	 * NOTE(review): the wait result is only logged, and on timeout
	 * the event is not cancelled here -- looks like a potential
	 * event leak; verify against the osd_client one-shot semantics.
	 */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1370
/*
 * Request sync osd class-method execution ("exec" call), e.g. the
 * "rbd"/"snap_add" method.  (Header comment previously said "read".)
 *
 * @cls/@method name the object class and method; @data/@len is the
 * opaque input payload; the resulting object version is returned via
 * @ver when non-NULL.
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	/* payload holds class name + method name + input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1410
1fec7093
YS
1411static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1412{
1413 struct rbd_req_coll *coll =
1414 kzalloc(sizeof(struct rbd_req_coll) +
1415 sizeof(struct rbd_req_status) * num_reqs,
1416 GFP_ATOMIC);
1417
1418 if (!coll)
1419 return NULL;
1420 coll->total = num_reqs;
1421 kref_init(&coll->kref);
1422 return coll;
1423}
1424
/*
 * block device queue callback
 *
 * Called with q->queue_lock held.  For each fetched request the lock
 * is dropped while the request is split into per-object segments and
 * submitted, then re-taken before fetching the next request.  Each
 * segment's completion is tracked through a shared rbd_req_coll.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		/* (redundant: the loop condition already guarantees rq) */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* one coll reference per in-flight segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* record the failure and keep going with
				 * the remaining segments */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the submission reference taken by rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
	}
}
1525
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes that may still be added at this position
 * without crossing an object (chunk) boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* object size in sectors; obj_order is log2 of the object size */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes left in the current chunk after the existing bio data */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* always accept at least one bvec for an otherwise-empty bio */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1552
1553static void rbd_free_disk(struct rbd_device *rbd_dev)
1554{
1555 struct gendisk *disk = rbd_dev->disk;
1556
1557 if (!disk)
1558 return;
1559
1560 rbd_header_free(&rbd_dev->header);
1561
1562 if (disk->flags & GENHD_FL_UP)
1563 del_gendisk(disk);
1564 if (disk->queue)
1565 blk_cleanup_queue(disk->queue);
1566 put_disk(disk);
1567}
1568
/*
 * reload the ondisk the header
 *
 * Reads the on-disk header into @header.  Because the snapshot count
 * can change between the sizing read and the full read, the read is
 * retried until two consecutive reads agree on total_snaps.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s", rbd_dev->obj);
			goto out_dh;
		}

		/* stable snapshot count: the read is complete */
		if (snap_count == header->total_snaps)
			break;

		/* grow the buffer to cover all snap records + names */
		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1625
1626/*
1627 * create a snapshot
1628 */
1629static int rbd_header_add_snap(struct rbd_device *dev,
1630 const char *snap_name,
1631 gfp_t gfp_flags)
1632{
1633 int name_len = strlen(snap_name);
1634 u64 new_snapid;
1635 int ret;
916d4d67 1636 void *data, *p, *e;
59c2be1e 1637 u64 ver;
1dbb4399 1638 struct ceph_mon_client *monc;
602adf40
YS
1639
1640 /* we should create a snapshot only if we're pointing at the head */
77dfe99f 1641 if (dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1642 return -EINVAL;
1643
1dbb4399 1644 monc = &dev->rbd_client->client->monc;
9bb2f334 1645 ret = ceph_monc_create_snapid(monc, dev->pool_id, &new_snapid);
602adf40
YS
1646 dout("created snapid=%lld\n", new_snapid);
1647 if (ret < 0)
1648 return ret;
1649
1650 data = kmalloc(name_len + 16, gfp_flags);
1651 if (!data)
1652 return -ENOMEM;
1653
916d4d67
SW
1654 p = data;
1655 e = data + name_len + 16;
602adf40 1656
916d4d67
SW
1657 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1658 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1659
1660 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1661 data, p - data, &ver);
602adf40 1662
916d4d67 1663 kfree(data);
602adf40
YS
1664
1665 if (ret < 0)
1666 return ret;
1667
403f24d3
JD
1668 down_write(&dev->header_rwsem);
1669 dev->header.snapc->seq = new_snapid;
1670 up_write(&dev->header_rwsem);
602adf40
YS
1671
1672 return 0;
1673bad:
1674 return -ERANGE;
1675}
1676
dfc5606d
YS
1677static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1678{
1679 struct rbd_snap *snap;
1680
1681 while (!list_empty(&rbd_dev->snaps)) {
1682 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1683 __rbd_remove_snap_dev(rbd_dev, snap);
1684 }
1685}
1686
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header and swaps the freshly-read snapshot context,
 * names and sizes into rbd_dev->header under header_rwsem.  If we
 * were pointing at the head snapshot, keep following it.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	/* NOTE(review): capacity is updated outside header_rwsem -- verify */
	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);

	down_write(&rbd_dev->header_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* free the old copies; ownership of h's buffers moves below */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1733
/*
 * Read the image header, build the snapshot list, pick the mapped
 * snapshot, then allocate and publish the gendisk + request queue.
 * Returns 0 on success or a negative errno; on failure everything
 * allocated here is released again.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1806
/*
  sysfs
*/

/* map a sysfs struct device back to its owning rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1815
dfc5606d
YS
1816static ssize_t rbd_size_show(struct device *dev,
1817 struct device_attribute *attr, char *buf)
1818{
593a9e7b 1819 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1820
1821 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1822}
1823
1824static ssize_t rbd_major_show(struct device *dev,
1825 struct device_attribute *attr, char *buf)
1826{
593a9e7b 1827 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1828
dfc5606d
YS
1829 return sprintf(buf, "%d\n", rbd_dev->major);
1830}
1831
1832static ssize_t rbd_client_id_show(struct device *dev,
1833 struct device_attribute *attr, char *buf)
602adf40 1834{
593a9e7b 1835 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1836
1dbb4399
AE
1837 return sprintf(buf, "client%lld\n",
1838 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1839}
1840
dfc5606d
YS
1841static ssize_t rbd_pool_show(struct device *dev,
1842 struct device_attribute *attr, char *buf)
602adf40 1843{
593a9e7b 1844 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1845
1846 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1847}
1848
9bb2f334
AE
1849static ssize_t rbd_pool_id_show(struct device *dev,
1850 struct device_attribute *attr, char *buf)
1851{
1852 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1853
1854 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1855}
1856
dfc5606d
YS
1857static ssize_t rbd_name_show(struct device *dev,
1858 struct device_attribute *attr, char *buf)
1859{
593a9e7b 1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1861
1862 return sprintf(buf, "%s\n", rbd_dev->obj);
1863}
1864
1865static ssize_t rbd_snap_show(struct device *dev,
1866 struct device_attribute *attr,
1867 char *buf)
1868{
593a9e7b 1869 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1870
1871 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1872}
1873
1874static ssize_t rbd_image_refresh(struct device *dev,
1875 struct device_attribute *attr,
1876 const char *buf,
1877 size_t size)
1878{
593a9e7b 1879 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1880 int rc;
1881 int ret = size;
602adf40
YS
1882
1883 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1884
263c6ca0 1885 rc = __rbd_refresh_header(rbd_dev);
dfc5606d
YS
1886 if (rc < 0)
1887 ret = rc;
602adf40 1888
dfc5606d
YS
1889 mutex_unlock(&ctl_mutex);
1890 return ret;
1891}
602adf40 1892
/* per-device sysfs attributes (exposed under /sys/bus/rbd/devices/N) */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1924
/* no-op release: rbd_device lifetime is managed by rbd_dev_release */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1934
1935
1936/*
1937 sysfs - snapshots
1938*/
1939
1940static ssize_t rbd_snap_size_show(struct device *dev,
1941 struct device_attribute *attr,
1942 char *buf)
1943{
1944 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1945
3591538f 1946 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
1947}
1948
1949static ssize_t rbd_snap_id_show(struct device *dev,
1950 struct device_attribute *attr,
1951 char *buf)
1952{
1953 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1954
3591538f 1955 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
1956}
1957
/* per-snapshot sysfs attributes */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1970
1971static void rbd_snap_dev_release(struct device *dev)
1972{
1973 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1974 kfree(snap->name);
1975 kfree(snap);
1976}
1977
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* device_type for snapshot child devices under the rbd device */
static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
};
1987
/*
 * Drop @snap from the device's snapshot list and unregister its
 * sysfs device; the release callback frees the rbd_snap itself.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1994
1995static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1996 struct rbd_snap *snap,
1997 struct device *parent)
1998{
1999 struct device *dev = &snap->dev;
2000 int ret;
2001
2002 dev->type = &rbd_snap_device_type;
2003 dev->parent = parent;
2004 dev->release = rbd_snap_dev_release;
2005 dev_set_name(dev, "snap_%s", snap->name);
2006 ret = device_register(dev);
2007
2008 return ret;
2009}
2010
2011static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2012 int i, const char *name,
2013 struct rbd_snap **snapp)
2014{
2015 int ret;
2016 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2017 if (!snap)
2018 return -ENOMEM;
2019 snap->name = kstrdup(name, GFP_KERNEL);
2020 snap->size = rbd_dev->header.snap_sizes[i];
2021 snap->id = rbd_dev->header.snapc->snaps[i];
2022 if (device_is_registered(&rbd_dev->dev)) {
2023 ret = rbd_register_snap_dev(rbd_dev, snap,
2024 &rbd_dev->dev);
2025 if (ret < 0)
2026 goto err;
2027 }
2028 *snapp = snap;
2029 return 0;
2030err:
2031 kfree(snap->name);
2032 kfree(snap);
2033 return ret;
2034}
2035
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points just past a name's terminating NUL (or one past the
 * end of the list); returns a pointer to the start of the preceding
 * name, or NULL when already at the first entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	if (name < start + 2)
		return NULL;

	/* step over the NUL of the previous name, then walk back */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2052
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* name starts one past the end; walked backward via
	 * rbd_prev_snap_name() as i decreases */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		/* cur_id only read when i != 0 (short-circuit below) */
		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/*
			 * NOTE(review): this reads snaps[i] while the code
			 * above and __rbd_add_snap_dev use index i - 1;
			 * verify the intended indexing here.
			 */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2126
/*
 * Register the rbd device on the rbd bus under ctl_mutex, then
 * register a sysfs child device for each known snapshot.  Returns 0
 * or the first registration error (snapshots registered before a
 * failure are left in place).
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2155
/* remove the rbd device (and its children) from the bus/sysfs */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2160
59c2be1e
YS
2161static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2162{
2163 int ret, rc;
2164
2165 do {
2166 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2167 rbd_dev->header.obj_version);
2168 if (ret == -ERANGE) {
2169 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
263c6ca0 2170 rc = __rbd_refresh_header(rbd_dev);
59c2be1e
YS
2171 mutex_unlock(&ctl_mutex);
2172 if (rc < 0)
2173 return rc;
2174 }
2175 } while (ret == -ERANGE);
2176
2177 return ret;
2178}
2179
/* highest id ever handed out; ids start at 1 */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list. The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2194
1ddbe94e 2195/*
499afd5b
AE
2196 * Remove an rbd_dev from the global list, and record that its
2197 * identifier is no longer in use.
1ddbe94e 2198 */
499afd5b 2199static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2200{
d184f6bf
AE
2201 struct list_head *tmp;
2202 int rbd_id = rbd_dev->id;
2203 int max_id;
2204
2205 BUG_ON(rbd_id < 1);
499afd5b
AE
2206
2207 spin_lock(&rbd_dev_list_lock);
2208 list_del_init(&rbd_dev->node);
d184f6bf
AE
2209
2210 /*
2211 * If the id being "put" is not the current maximum, there
2212 * is nothing special we need to do.
2213 */
2214 if (rbd_id != atomic64_read(&rbd_id_max)) {
2215 spin_unlock(&rbd_dev_list_lock);
2216 return;
2217 }
2218
2219 /*
2220 * We need to update the current maximum id. Search the
2221 * list to find out what it is. We're more likely to find
2222 * the maximum at the end, so search the list backward.
2223 */
2224 max_id = 0;
2225 list_for_each_prev(tmp, &rbd_dev_list) {
2226 struct rbd_device *rbd_dev;
2227
2228 rbd_dev = list_entry(tmp, struct rbd_device, node);
2229 if (rbd_id > max_id)
2230 max_id = rbd_id;
2231 }
499afd5b 2232 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2233
1ddbe94e 2234 /*
d184f6bf
AE
2235 * The max id could have been updated by rbd_id_get(), in
2236 * which case it now accurately reflects the new maximum.
2237 * Be careful not to overwrite the maximum value in that
2238 * case.
1ddbe94e 2239 */
d184f6bf 2240 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2241}
2242
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *delims = " \f\n\r\t\v";
	size_t token_len;

	*buf += strspn(*buf, delims);		/* Find start of token */
	token_len = strcspn(*buf, delims);	/* Token length */

	return token_len;
}
2261
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit (in which case the token
 * buffer is left untouched).
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token, even when the token buffer is too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	/* same delimiter set as next_token(); inlined here */
	const char *spaces = " \f\n\r\t\v";
	size_t len;

	*buf += strspn(*buf, spaces);
	len = strcspn(*buf, spaces);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
2291
ea3352f4
AE
2292/*
2293 * Finds the next token in *buf, dynamically allocates a buffer big
2294 * enough to hold a copy of it, and copies the token into the new
2295 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2296 * that a duplicate buffer is created even for a zero-length token.
2297 *
2298 * Returns a pointer to the newly-allocated duplicate, or a null
2299 * pointer if memory for the duplicate was not available. If
2300 * the lenp argument is a non-null pointer, the length of the token
2301 * (not including the '\0') is returned in *lenp.
2302 *
2303 * If successful, the *buf pointer will be updated to point beyond
2304 * the end of the found token.
2305 *
2306 * Note: uses GFP_KERNEL for allocation.
2307 */
2308static inline char *dup_token(const char **buf, size_t *lenp)
2309{
2310 char *dup;
2311 size_t len;
2312
2313 len = next_token(buf);
2314 dup = kmalloc(len + 1, GFP_KERNEL);
2315 if (!dup)
2316 return NULL;
2317
2318 memcpy(dup, *buf, len);
2319 *(dup + len) = '\0';
2320 *buf += len;
2321
2322 if (lenp)
2323 *lenp = len;
2324
2325 return dup;
2326}
2327
a725f65e
AE
/*
 * Parse the argument string written to /sys/bus/rbd/add and fill in
 * the given rbd_dev.  The expected layout is:
 *
 *	<mon_addrs> <options> <pool_name> <obj> [<snap_name>]
 *
 * On success:
 *   - *mon_addrs points INTO the caller's buf (not a copy), and
 *     *mon_addrs_size is the token length plus one;
 *   - the options token is copied into the caller-supplied buffer;
 *   - rbd_dev->pool_name is a kmalloc()ed copy the caller must
 *     eventually kfree();
 *   - rbd_dev->obj, obj_len and obj_md_name describe the image;
 *   - rbd_dev->snap_name holds the snapshot name, defaulting to
 *     RBD_SNAP_HEAD_NAME when none was supplied.
 *
 * Returns 0 on success, -EINVAL for a missing or oversized token,
 * or -ENOMEM if the pool name cannot be duplicated.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;	/* aliases buf; valid only while buf is */

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	/* Pool name is dynamically allocated; freed below on later error */
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		return -ENOMEM;

	ret = -EINVAL;
	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
	if (!len || len >= sizeof (rbd_dev->obj))
		goto out_err;

	/* We have the object length in hand, save it. */

	rbd_dev->obj_len = len;

	/* The metadata object name is the image name with RBD_SUFFIX appended */
	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);

	/*
	 * The snapshot name is optional, but it's an error if it's
	 * too long.  If no snapshot is supplied, fill in the default.
	 */
	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
	if (!len)
		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	else if (len >= sizeof (rbd_dev->snap_name))
		goto out_err;

	return 0;

out_err:
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2396
59c2be1e
YS
2397static ssize_t rbd_add(struct bus_type *bus,
2398 const char *buf,
2399 size_t count)
602adf40 2400{
602adf40 2401 struct rbd_device *rbd_dev;
7ef3214a
AE
2402 const char *mon_addrs = NULL;
2403 size_t mon_addrs_size = 0;
27cc2594
AE
2404 char *options = NULL;
2405 struct ceph_osd_client *osdc;
2406 int rc = -ENOMEM;
602adf40
YS
2407
2408 if (!try_module_get(THIS_MODULE))
2409 return -ENODEV;
2410
27cc2594
AE
2411 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2412 if (!rbd_dev)
2413 goto err_nomem;
60571c7d 2414 options = kmalloc(count, GFP_KERNEL);
602adf40 2415 if (!options)
27cc2594 2416 goto err_nomem;
602adf40
YS
2417
2418 /* static rbd_device initialization */
2419 spin_lock_init(&rbd_dev->lock);
2420 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2421 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2422 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2423
c666601a 2424 init_rwsem(&rbd_dev->header_rwsem);
0e805a1d 2425
d184f6bf 2426 /* generate unique id: find highest unique id, add one */
499afd5b 2427 rbd_id_get(rbd_dev);
602adf40 2428
a725f65e 2429 /* Fill in the device name, now that we have its id. */
81a89793
AE
2430 BUILD_BUG_ON(DEV_NAME_LEN
2431 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2432 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
a725f65e 2433
602adf40 2434 /* parse add command */
7ef3214a 2435 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2436 options, count);
a725f65e 2437 if (rc)
f0f8cef5 2438 goto err_put_id;
e124a82f 2439
5214ecc4
AE
2440 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2441 options);
d720bcb0
AE
2442 if (IS_ERR(rbd_dev->rbd_client)) {
2443 rc = PTR_ERR(rbd_dev->rbd_client);
f0f8cef5 2444 goto err_put_id;
d720bcb0 2445 }
602adf40 2446
602adf40 2447 /* pick the pool */
1dbb4399 2448 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2449 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2450 if (rc < 0)
2451 goto err_out_client;
9bb2f334 2452 rbd_dev->pool_id = rc;
602adf40
YS
2453
2454 /* register our block device */
27cc2594
AE
2455 rc = register_blkdev(0, rbd_dev->name);
2456 if (rc < 0)
602adf40 2457 goto err_out_client;
27cc2594 2458 rbd_dev->major = rc;
602adf40 2459
dfc5606d
YS
2460 rc = rbd_bus_add_dev(rbd_dev);
2461 if (rc)
766fc439
YS
2462 goto err_out_blkdev;
2463
32eec68d
AE
2464 /*
2465 * At this point cleanup in the event of an error is the job
2466 * of the sysfs code (initiated by rbd_bus_del_dev()).
2467 *
2468 * Set up and announce blkdev mapping.
2469 */
602adf40
YS
2470 rc = rbd_init_disk(rbd_dev);
2471 if (rc)
766fc439 2472 goto err_out_bus;
602adf40 2473
59c2be1e
YS
2474 rc = rbd_init_watch_dev(rbd_dev);
2475 if (rc)
2476 goto err_out_bus;
2477
602adf40
YS
2478 return count;
2479
766fc439 2480err_out_bus:
766fc439
YS
2481 /* this will also clean up rest of rbd_dev stuff */
2482
2483 rbd_bus_del_dev(rbd_dev);
2484 kfree(options);
766fc439
YS
2485 return rc;
2486
602adf40
YS
2487err_out_blkdev:
2488 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2489err_out_client:
2490 rbd_put_client(rbd_dev);
f0f8cef5 2491err_put_id:
d22f76e7 2492 kfree(rbd_dev->pool_name);
499afd5b 2493 rbd_id_put(rbd_dev);
27cc2594 2494err_nomem:
602adf40 2495 kfree(options);
27cc2594
AE
2496 kfree(rbd_dev);
2497
602adf40
YS
2498 dout("Error adding device %s\n", buf);
2499 module_put(THIS_MODULE);
27cc2594
AE
2500
2501 return (ssize_t) rc;
602adf40
YS
2502}
2503
/*
 * Look up an rbd device by its id on the global rbd_dev_list.
 *
 * rbd_dev_list_lock is held only for the duration of the walk; the
 * returned pointer is not reference counted, so the caller must
 * otherwise keep the device from being torn down concurrently (the
 * caller in this file holds ctl_mutex across lookup and use).
 *
 * Returns NULL when no device with the given id exists.
 */
static struct rbd_device *__rbd_get_dev(unsigned long id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id == id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}
2520
dfc5606d 2521static void rbd_dev_release(struct device *dev)
602adf40 2522{
593a9e7b 2523 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 2524
1dbb4399
AE
2525 if (rbd_dev->watch_request) {
2526 struct ceph_client *client = rbd_dev->rbd_client->client;
2527
2528 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 2529 rbd_dev->watch_request);
1dbb4399 2530 }
59c2be1e 2531 if (rbd_dev->watch_event)
79e3057c 2532 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
59c2be1e 2533
602adf40
YS
2534 rbd_put_client(rbd_dev);
2535
2536 /* clean up and free blkdev */
2537 rbd_free_disk(rbd_dev);
2538 unregister_blkdev(rbd_dev->major, rbd_dev->name);
32eec68d
AE
2539
2540 /* done with the id, and with the rbd_dev */
d22f76e7 2541 kfree(rbd_dev->pool_name);
32eec68d 2542 rbd_id_put(rbd_dev);
602adf40
YS
2543 kfree(rbd_dev);
2544
2545 /* release module ref */
2546 module_put(THIS_MODULE);
602adf40
YS
2547}
2548
dfc5606d
YS
/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the
 * decimal id of the device to remove (the id assigned at add time).
 *
 * Returns count on success, a strict_strtoul() error for unparsable
 * input, -EINVAL if the value does not fit in an int, or -ENOENT if
 * no device with that id exists.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	/* ctl_mutex serializes this against add/snap operations */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* Deleting the bus device triggers the rest of the teardown */
	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2582
dfc5606d
YS
2583static ssize_t rbd_snap_add(struct device *dev,
2584 struct device_attribute *attr,
2585 const char *buf,
2586 size_t count)
602adf40 2587{
593a9e7b 2588 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
2589 int ret;
2590 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2591 if (!name)
2592 return -ENOMEM;
2593
dfc5606d 2594 snprintf(name, count, "%s", buf);
602adf40
YS
2595
2596 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2597
602adf40
YS
2598 ret = rbd_header_add_snap(rbd_dev,
2599 name, GFP_KERNEL);
2600 if (ret < 0)
59c2be1e 2601 goto err_unlock;
602adf40 2602
263c6ca0 2603 ret = __rbd_refresh_header(rbd_dev);
602adf40 2604 if (ret < 0)
59c2be1e
YS
2605 goto err_unlock;
2606
2607 /* shouldn't hold ctl_mutex when notifying.. notify might
2608 trigger a watch callback that would need to get that mutex */
2609 mutex_unlock(&ctl_mutex);
2610
2611 /* make a best effort, don't error if failed */
2612 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2613
2614 ret = count;
59c2be1e
YS
2615 kfree(name);
2616 return ret;
2617
2618err_unlock:
602adf40 2619 mutex_unlock(&ctl_mutex);
602adf40
YS
2620 kfree(name);
2621 return ret;
2622}
2623
602adf40
YS
2624/*
2625 * create control files in sysfs
dfc5606d 2626 * /sys/bus/rbd/...
602adf40
YS
2627 */
2628static int rbd_sysfs_init(void)
2629{
dfc5606d 2630 int ret;
602adf40 2631
fed4c143 2632 ret = device_register(&rbd_root_dev);
21079786 2633 if (ret < 0)
dfc5606d 2634 return ret;
602adf40 2635
fed4c143
AE
2636 ret = bus_register(&rbd_bus_type);
2637 if (ret < 0)
2638 device_unregister(&rbd_root_dev);
602adf40 2639
602adf40
YS
2640 return ret;
2641}
2642
/*
 * Tear down the sysfs hierarchy created by rbd_sysfs_init(), in the
 * reverse of registration order: bus first, then the root device.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2648
2649int __init rbd_init(void)
2650{
2651 int rc;
2652
2653 rc = rbd_sysfs_init();
2654 if (rc)
2655 return rc;
f0f8cef5 2656 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2657 return 0;
2658}
2659
/* Module exit: remove the /sys/bus/rbd hierarchy. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2664
/* Module entry/exit points and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
This page took 0.313515 seconds and 5 git commands to generate.