rbd: kill rbd_dev_clear_mapping()
drivers/block/rbd.c (deliverable/linux.git)
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME "-"

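/*
 * A plausible reading of the 510 limit above, assuming the encoded
 * snap context layout (a u64 seq, a u32 count, then count u64 ids):
 * 8 + 4 + 510 * 8 = 4092 bytes still fits in a 4KB buffer, while
 * 511 snapshots would need 4100 bytes.
 */
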
#define BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

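/*
 * Sketch of the arithmetic, for illustration: each byte of an int
 * contributes at most log10(256), about 2.41, decimal digits, which
 * the 5/2 = 2.5 digits-per-byte factor over-approximates; the + 1
 * leaves room for a minus sign.  For a 4-byte int this yields 11
 * characters, enough for "-2147483648".
 */
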
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 stripe_unit;
	u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

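/*
 * Illustrative only (see Documentation/ABI/testing/sysfs-bus-rbd for
 * the authoritative syntax): these tokens come from the option field
 * of the string written when mapping an image, e.g. something like
 *
 *	echo "1.2.3.4:6789 name=admin,ro rbd foo" > /sys/bus/rbd/add
 *
 * where the "ro" token would end up in parse_rbd_opts_token() below.
 */
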
struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * rbd_client_list_lock is acquired here, so the caller must not
 * already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore,
 * release it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

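/*
 * Worked example of the bound above, assuming a 32-bit size_t:
 * SIZE_MAX is 4294967295, so after subtracting the snap context
 * header the snapshot count is capped near 2^29 entries (8 bytes
 * each).  The check exists for overflow safety rather than as a
 * practical limit on snapshots.
 */
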
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

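/*
 * Illustration of the snap_names layout assumed above: the names are
 * stored back to back, each NUL-terminated, in the same order as the
 * snapshot id array.  For names "a", "bb" and "ccc" the buffer is
 * "a\0bb\0ccc\0", and which == 2 skips 2 then 3 bytes to land on
 * "ccc".
 */
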
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

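/*
 * Example (hypothetical values): with snapc->snaps = { 12, 7, 3 },
 * snap_id 7 yields index 1 while snap_id 5 yields BAD_SNAP_INDEX,
 * since the array is sorted highest-first as noted above.
 */
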
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	/* If we are mapping a snapshot it must be marked read-only */

	if (snap_id != CEPH_NOSNAP)
		rbd_dev->mapping.read_only = true;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		/* name came from rbd_segment_name_cache, not kmalloc() */
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

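/*
 * Worked example, assuming obj_order == 22 (4MB objects): image
 * offset 10MB falls in segment 2 (10MB >> 22), at byte offset 2MB
 * within it, so rbd_segment_length() clips a 3MB request starting
 * there down to the 2MB remaining in that segment.
 */
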
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

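/*
 * Example of the index arithmetic above (hypothetical bio): for a
 * bio with two 2048-byte vecs, cloning offset 1024 for length 2048
 * finds idx == 0 (voff 1024) and end_idx == 1, so vcnt == 2 and the
 * final resid of 1024 becomes the last vec's length.
 */
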
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

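/*
 * The two flags above encode three meaningful states:
 *	KNOWN == 0			- existence not yet determined
 *	KNOWN == 1, EXISTS == 0		- object known not to exist
 *	KNOWN == 1, EXISTS == 1		- object known to exist
 */
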
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

1626
9d4df01f 1627static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1628{
1629 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1630 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1631 u64 snap_id;
430c28c3 1632
8c042b0d 1633 rbd_assert(osd_req != NULL);
430c28c3 1634
9d4df01f 1635 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
8c042b0d 1636 ceph_osdc_build_request(osd_req, obj_request->offset,
9d4df01f
AE
1637 NULL, snap_id, NULL);
1638}
1639
1640static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1641{
1642 struct rbd_img_request *img_request = obj_request->img_request;
1643 struct ceph_osd_request *osd_req = obj_request->osd_req;
1644 struct ceph_snap_context *snapc;
1645 struct timespec mtime = CURRENT_TIME;
1646
1647 rbd_assert(osd_req != NULL);
1648
1649 snapc = img_request ? img_request->snapc : NULL;
1650 ceph_osdc_build_request(osd_req, obj_request->offset,
1651 snapc, CEPH_NOSNAP, &mtime);
430c28c3
AE
1652}
1653
bf0d5f50
AE
1654static struct ceph_osd_request *rbd_osd_req_create(
1655 struct rbd_device *rbd_dev,
1656 bool write_request,
430c28c3 1657 struct rbd_obj_request *obj_request)
bf0d5f50 1658{
bf0d5f50
AE
1659 struct ceph_snap_context *snapc = NULL;
1660 struct ceph_osd_client *osdc;
1661 struct ceph_osd_request *osd_req;
bf0d5f50 1662
6365d33a
AE
1663 if (obj_request_img_data_test(obj_request)) {
1664 struct rbd_img_request *img_request = obj_request->img_request;
1665
0c425248
AE
1666 rbd_assert(write_request ==
1667 img_request_write_test(img_request));
1668 if (write_request)
bf0d5f50 1669 snapc = img_request->snapc;
bf0d5f50
AE
1670 }
1671
1672 /* Allocate and initialize the request, for the single op */
1673
1674 osdc = &rbd_dev->rbd_client->client->osdc;
1675 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1676 if (!osd_req)
1677 return NULL; /* ENOMEM */
bf0d5f50 1678
430c28c3 1679 if (write_request)
bf0d5f50 1680 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 1681 else
bf0d5f50 1682 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
1683
1684 osd_req->r_callback = rbd_osd_req_callback;
1685 osd_req->r_priv = obj_request;
1686
1687 osd_req->r_oid_len = strlen(obj_request->object_name);
1688 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1689 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1690
1691 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1692
bf0d5f50
AE
1693 return osd_req;
1694}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops:
 * a copyup method call and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
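
/*
 * Illustrative note (not part of the original source): the two ops of a
 * copyup request allocated here are filled in later, in
 * rbd_img_obj_parent_read_full_callback(), as:
 *
 *	op 0: CEPH_OSD_OP_CALL ("rbd", "copyup"), data = parent object pages
 *	op 1: CEPH_OSD_OP_WRITE at the original offset/length, data = the
 *	      original write's bio chain
 */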

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
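
/*
 * Worked example (not part of the original source) of how the loop above
 * segments an image request, assuming the default 4 MiB objects
 * (obj_order 22): a 6 MiB write at image offset 3 MiB becomes three
 * object requests:
 *
 *	object 0: offset 3 MiB, length 1 MiB	(up to the object boundary)
 *	object 1: offset 0,     length 4 MiB	(a whole object)
 *	object 2: offset 0,     length 1 MiB	(the remainder)
 *
 * That is, offset = img_offset & (obj_size - 1), and length is capped at
 * obj_size - offset, which is what rbd_segment_offset() and
 * rbd_segment_length() compute.
 */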

static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	int result;
	u64 obj_size;
	u64 xferred;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
	result = img_request->result;
	obj_size = img_request->length;
	xferred = img_request->xferred;

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

	rbd_img_request_put(img_request);

	if (result)
		goto out_err;

	/* Allocate the new copyup osd request for the original request */

	result = -ENOMEM;
	rbd_assert(!orig_request->osd_req);
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
						false, false);

	/* Then the original write request op */

	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					orig_request->offset,
					orig_request->length, 0, 0);
	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
					orig_request->length);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	result = rbd_obj_request_submit(osdc, orig_request);
	if (!result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * First things first.  The original osd request is of no
	 * use to us any more, we'll need a new one that can hold
	 * the two ops in a copyup request.  We'll get that later,
	 * but for now we can release the old one.
	 */
	rbd_osd_req_destroy(obj_request->osd_req);
	obj_request->osd_req = NULL;

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length,
						false, true);
	if (!parent_request)
		goto out_err;
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
	rbd_obj_request_put(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
						OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
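
/*
 * Sketch (not part of the original source): a packed struct matching the
 * STAT reply layout described above.  The type and field names are
 * hypothetical; only the sizes matter to the buffer sized above
 * (8 + 4 + 4 = 16 bytes).
 */
struct rbd_obj_stat_reply {
	__le64 length;			/* object size, in bytes */
	struct {
		__le32 tv_sec;		/* last modification time */
		__le32 tv_nsec;
	} mtime;
} __attribute__ ((packed));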

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}
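
/*
 * Summary (not part of the original source) of the dispatch above:
 *
 *	read, non-layered write, write starting
 *	beyond the parent overlap, or target
 *	object known to exist			-> rbd_obj_request_submit()
 *
 *	layered write, target known missing	-> rbd_img_obj_parent_read_full()
 *
 *	layered write, existence unknown	-> rbd_img_obj_exists_submit()
 */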

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;

	rbd_assert(img_request_child_test(img_request));

	obj_request = img_request->obj_request;
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);

	obj_request->result = img_request->result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	rbd_dev = obj_request->img_request->rbd_dev;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_request->xferred, xferred);
	} else {
		obj_request->xferred = img_request->xferred;
	}
out:
	rbd_img_request_put(img_request);
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev;
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	rbd_dev = obj_request->img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);
	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_img_request_create(rbd_dev->parent,
						obj_request->img_offset,
						obj_request->length,
						false, true);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	rbd_obj_request_get(obj_request);
	img_request->obj_request = obj_request;

	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
					obj_request->bio_list);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);

	rbd_obj_notify_ack(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
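
/*
 * Illustrative usage (not part of the original source): a caller mapping
 * an image would establish the header watch with start=1 and tear it
 * down again with start=0:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	-- begin watching
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	-- tear down
 */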

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
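
/*
 * Usage sketch (not part of the original source), modeled on
 * _rbd_dev_v2_snap_size() below: invoke the "get_size" method of the
 * "rbd" object class on the header object, passing a snapshot id and
 * reading back a packed { order, size } result.
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */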

static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request, false);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  One exception would be with single page
 * bios, which we handle later at bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
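
/*
 * Worked example (not part of the original source), assuming the default
 * 4 MiB objects (obj_order 22):
 *
 *	sectors_per_obj = 1 << (22 - 9) = 8192
 *	device-relative start sector 8000 -> obj_sector_offset = 8000
 *	bytes to the object boundary = (8192 - 8000) << 9 = 98304
 *
 * so a bio starting there can accept at most 98304 bytes (less whatever
 * bi_size it already carries) before it would span two objects.
 */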

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out_err;
		if ((size_t)ret < size) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
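
/*
 * Illustration (not part of the original source) of the re-read loop
 * above: the first pass, with snap_count and names_size both zero, reads
 * only the fixed-size part of the header.  If that reply reports, say,
 * 3 snapshots and 40 bytes of snapshot names, the next pass allocates
 * sizeof (*ondisk) + 3 * sizeof (struct rbd_image_snap_ondisk) + 40
 * bytes and reads again.  The loop exits only when a full read returns
 * the same snapshot count the buffer was sized for.
 */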

/*
 * Reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	kfree(ondisk);

	return ret;
}

/*
 * Reread the header and update the in-core copy (format 1 images).
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
			rbd_dev->mapping.size = rbd_dev->header.image_size;

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
	kfree(h.object_prefix);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mapping_size = rbd_dev->mapping.size;
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev);
	else
		ret = rbd_dev_v2_refresh(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	mutex_unlock(&ctl_mutex);
	if (mapping_size != rbd_dev->mapping.size) {
		sector_t size;

		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}

	return ret;
}
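
/*
 * Illustration (not part of the original source): SECTOR_SIZE is 512
 * bytes, so a refresh that grows the mapping to 1 GiB sets the capacity
 * to 1073741824 / 512 = 2097152 sectors before revalidating the disk.
 */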

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}
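
/*
 * Example (not part of the original source, with made-up values) of what
 * reading the parent attribute of a cloned image might produce:
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1018e1f21bc3
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */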

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
3443
8b8fb99c
AE
3444static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3445{
3446 kref_get(&spec->kref);
3447
3448 return spec;
3449}
3450
3451static void rbd_spec_free(struct kref *kref);
3452static void rbd_spec_put(struct rbd_spec *spec)
3453{
3454 if (spec)
3455 kref_put(&spec->kref, rbd_spec_free);
3456}
3457
3458static struct rbd_spec *rbd_spec_alloc(void)
3459{
3460 struct rbd_spec *spec;
3461
3462 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3463 if (!spec)
3464 return NULL;
3465 kref_init(&spec->kref);
3466
8b8fb99c
AE
3467 return spec;
3468}
3469
3470static void rbd_spec_free(struct kref *kref)
3471{
3472 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3473
3474 kfree(spec->pool_name);
3475 kfree(spec->image_id);
3476 kfree(spec->image_name);
3477 kfree(spec->snap_name);
3478 kfree(spec);
3479}
3480
cc344fa1 3481static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3482 struct rbd_spec *spec)
3483{
3484 struct rbd_device *rbd_dev;
3485
3486 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3487 if (!rbd_dev)
3488 return NULL;
3489
3490 spin_lock_init(&rbd_dev->lock);
6d292906 3491 rbd_dev->flags = 0;
c53d5893 3492 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3493 init_rwsem(&rbd_dev->header_rwsem);
3494
3495 rbd_dev->spec = spec;
3496 rbd_dev->rbd_client = rbdc;
3497
0903e875
AE
3498 /* Initialize the layout used for all rbd requests */
3499
3500 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3501 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3502 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3503 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3504
c53d5893
AE
3505 return rbd_dev;
3506}
3507
3508static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3509{
c53d5893
AE
3510 rbd_put_client(rbd_dev->rbd_client);
3511 rbd_spec_put(rbd_dev->spec);
3512 kfree(rbd_dev);
3513}
3514
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	/* order is optional; don't dereference it if the caller passed NULL */
	if (order)
		*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long)snap_id, (unsigned int)size_buf.order,
		(unsigned long long)*snap_size);

	return 0;
}
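/*
 * Editor's note (background, not from this file): "order" is the
 * log2 of the image's backing object size.  An image created with
 * the common default order of 22, for instance, is carved into
 * 1 << 22 = 4 MiB RADOS objects.
 */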

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)incompat);

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +			/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
		sizeof (__le64) +			/* snap_id */
		sizeof (__le64);			/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (parent_spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
			(unsigned long long)parent_spec->pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}
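/*
 * For reference, the "get_parent" reply decoded above is laid out
 * as follows (field order as consumed by the decode calls):
 *
 *	__le64	pool_id		(CEPH_NOPOOL means "no parent")
 *	__le32	image_id_len, followed by the image_id bytes
 *	__le64	snap_id
 *	__le64	overlap		(bytes of the child backed by the parent)
 */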

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	ret = -EINVAL;
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
				"(got %llu want %llu)",
				stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
				"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}
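/*
 * Sketch of the only accepted values (editor's illustration): for an
 * image with obj_order 22, "get_stripe_unit_count" must report
 * stripe_unit == 4 MiB and stripe_count == 1, i.e. the STRIPINGV2
 * defaults that behave identically to a non-striped image.
 */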

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}
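/*
 * Layout reminder (illustrative names): a format 1 header stores
 * snapshot names as consecutive NUL-terminated strings, parallel to
 * the id array in the snapshot context, which is what the walk
 * above relies on:
 *
 *	header.snap_names:	"snap1\0snap2\0snap3\0"
 *	snapc->snaps[]:		{ id1, id2, id3 }
 */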

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name))
			break;
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	/*
	 * An image being mapped will have the pool name (etc.), but
	 * we need to look up the snapshot id.
	 */
	if (spec->pool_name) {
		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
			u64 snap_id;

			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
			if (snap_id == CEPH_NOSNAP)
				return -ENOENT;
			spec->snap_id = snap_id;
		} else {
			spec->snap_id = CEPH_NOSNAP;
		}

		return 0;
	}

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name, and make a copy */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (!snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;
out_err:
	kfree(image_name);
	kfree(pool_name);

	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}
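/*
 * The "get_snapcontext" reply decoded above has this shape:
 *
 *	__le64	seq		(highest snapshot id issued so far)
 *	__le32	snap_count
 *	__le64	snaps[snap_count]
 *
 * RBD_MAX_SNAP_COUNT bounds snap_count, which is what makes the
 * fixed-size reply_buf allocated above sufficient.
 */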

static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
			rbd_dev->mapping.size = rbd_dev->header.image_size;

	ret = rbd_dev_v2_snap_context(rbd_dev);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
	if (ret)
		goto out;
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_device_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
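/*
 * Tokenizer walk-through (illustrative input): given
 *
 *	buf = "1.2.3.4:6789 name=admin rbd myimage"
 *
 * four successive dup_token() calls return "1.2.3.4:6789",
 * "name=admin", "rbd" and "myimage"; each call skips leading
 * whitespace, duplicates the next token, and advances *buf past it.
 */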

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
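/*
 * Example "rbd add" write (illustrative monitor address, key and
 * names only; the snapshot name is omitted, so the image head is
 * mapped):
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *		> /sys/bus/rbd/add
 */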

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret > sizeof (__le32)) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
		if (!ret)
			rbd_dev->image_format = 2;
	} else {
		ret = -EINVAL;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
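/*
 * Naming sketch (assuming the RBD_ID_PREFIX of "rbd_id." defined in
 * rbd_types.h): probing a format 2 image named "myimage" reads its
 * id from an object called "rbd_id.myimage"; a format 1 image has
 * no such object, which is how the two formats are distinguished.
 */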

/* Undo whatever state changes are made by v1 or v2 image probe */

static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out_err;
		/*
		 * Print a warning if this image has a parent.
		 * Don't print it if the image now being probed
		 * is itself a parent.  We can tell at this point
		 * because we won't know its pool name yet (just its
		 * pool id).
		 */
		if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
			rbd_warn(rbd_dev, "WARNING: kernel layering "
					"is EXPERIMENTAL!");
	}

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev);
	if (ret)
		goto out_err;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec;
	struct rbd_client *rbdc;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;
	/*
	 * We need to pass a reference to the client and the parent
	 * spec when creating the parent rbd_dev.  Images related by
	 * parent/child relationships always share both.
	 */
	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
	rbdc = __rbd_get_client(rbd_dev->rbd_client);

	ret = -ENOMEM;
	parent = rbd_dev_create(rbdc, parent_spec);
	if (!parent)
		goto out_err;

	ret = rbd_dev_image_probe(parent);
	if (ret < 0)
		goto out_err;
	rbd_dev->parent = parent;

	return 0;
out_err:
	if (parent) {
		rbd_spec_put(rbd_dev->parent_spec);
		kfree(rbd_dev->header_name);
		rbd_dev_destroy(parent);
	} else {
		rbd_put_client(rbdc);
		rbd_spec_put(parent_spec);
	}

	return ret;
}

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		return ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/* Everything's ready.  Announce the disk to the world. */

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);
	return 0;
}
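/*
 * Concrete examples (assuming the RBD_SUFFIX ".rbd" and
 * RBD_HEADER_PREFIX "rbd_header." values from rbd_types.h):
 *
 *	format 1, image name "foo"	  -> header object "foo.rbd"
 *	format 2, image id "10052ae8944a" -> "rbd_header.10052ae8944a"
 */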

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	int ret;

	rbd_dev_unprobe(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (!ret)
		return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
				(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_image_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	rc = rbd_dev_device_setup(rbd_dev);
	if (!rc)
		return count;

	/*
	 * rbd_dev_image_release() already destroys rbd_dev, so don't
	 * fall through into the destroy below.
	 */
	rbd_dev_image_release(rbd_dev);
	goto err_out_module;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	ret = count;
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
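/*
 * Example removal (illustrative device id): unmap the device that
 * was registered with id 0, provided nothing still holds it open:
 *
 *	# echo 0 > /sys/bus/rbd/remove
 */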

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");