staging: lustre: echo: remove echo_env_info() regions from echo_client.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2015, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
39
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
49
50 #include "echo_internal.h"
51
52 /** \defgroup echo_client Echo Client
53 * @{
54 */
55
56 struct echo_device {
57 struct cl_device ed_cl;
58 struct echo_client_obd *ed_ec;
59
60 struct cl_site ed_site_myself;
61 struct cl_site *ed_site;
62 struct lu_device *ed_next;
63 };
64
65 struct echo_object {
66 struct cl_object eo_cl;
67 struct cl_object_header eo_hdr;
68
69 struct echo_device *eo_dev;
70 struct list_head eo_obj_chain;
71 struct lov_stripe_md *eo_lsm;
72 atomic_t eo_npages;
73 int eo_deleted;
74 };
75
76 struct echo_object_conf {
77 struct cl_object_conf eoc_cl;
78 struct lov_stripe_md **eoc_md;
79 };
80
81 struct echo_page {
82 struct cl_page_slice ep_cl;
83 struct mutex ep_lock;
84 };
85
86 struct echo_lock {
87 struct cl_lock_slice el_cl;
88 struct list_head el_chain;
89 struct echo_object *el_object;
90 __u64 el_cookie;
91 atomic_t el_refcount;
92 };
93
94 static int echo_client_setup(const struct lu_env *env,
95 struct obd_device *obddev,
96 struct lustre_cfg *lcfg);
97 static int echo_client_cleanup(struct obd_device *obddev);
98
99 /** \defgroup echo_helpers Helper functions
100 * @{
101 */
102 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
103 {
104 return container_of0(dev, struct echo_device, ed_cl);
105 }
106
107 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
108 {
109 return &d->ed_cl;
110 }
111
112 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
113 {
114 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
115 }
116
117 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
118 {
119 return &eco->eo_cl;
120 }
121
122 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
123 {
124 return container_of(o, struct echo_object, eo_cl);
125 }
126
127 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
128 {
129 return container_of(s, struct echo_page, ep_cl);
130 }
131
132 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
133 {
134 return container_of(s, struct echo_lock, el_cl);
135 }
136
137 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
138 {
139 return ecl->el_cl.cls_lock;
140 }
141
142 static struct lu_context_key echo_thread_key;
143 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
144 {
145 struct echo_thread_info *info;
146
147 info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
148 LASSERT(info);
149 return info;
150 }
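/*
 * A minimal sketch of the usual access pattern around echo_env_info(),
 * assuming the caller already holds an echo_object (eco below stands in
 * for such an object and is not a symbol defined in this file): take a
 * cl_env reference, look up the per-thread info, and drop the reference
 * when done.
 *
 *	struct lu_env *env;
 *	struct echo_thread_info *info;
 *	int refcheck;
 *
 *	env = cl_env_get(&refcheck);
 *	if (IS_ERR(env))
 *		return PTR_ERR(env);
 *	info = echo_env_info(env);
 *	info->eti_io.ci_obj = echo_obj2cl(eco);
 *	... issue the io ...
 *	cl_env_put(env, &refcheck);
 */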
151
152 static inline
153 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
154 {
155 return container_of(c, struct echo_object_conf, eoc_cl);
156 }
157
158 /** @} echo_helpers */
159
160 static struct echo_object *cl_echo_object_find(struct echo_device *d,
161 struct lov_stripe_md **lsm);
162 static int cl_echo_object_put(struct echo_object *eco);
163 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
164 struct page **pages, int npages, int async);
165
166 struct echo_thread_info {
167 struct echo_object_conf eti_conf;
168 struct lustre_md eti_md;
169
170 struct cl_2queue eti_queue;
171 struct cl_io eti_io;
172 struct cl_lock eti_lock;
173 struct lu_fid eti_fid;
174 struct lu_fid eti_fid2;
175 };
176
177 /* No session used right now */
178 struct echo_session_info {
179 unsigned long dummy;
180 };
181
182 static struct kmem_cache *echo_lock_kmem;
183 static struct kmem_cache *echo_object_kmem;
184 static struct kmem_cache *echo_thread_kmem;
185 static struct kmem_cache *echo_session_kmem;
186
187 static struct lu_kmem_descr echo_caches[] = {
188 {
189 .ckd_cache = &echo_lock_kmem,
190 .ckd_name = "echo_lock_kmem",
191 .ckd_size = sizeof(struct echo_lock)
192 },
193 {
194 .ckd_cache = &echo_object_kmem,
195 .ckd_name = "echo_object_kmem",
196 .ckd_size = sizeof(struct echo_object)
197 },
198 {
199 .ckd_cache = &echo_thread_kmem,
200 .ckd_name = "echo_thread_kmem",
201 .ckd_size = sizeof(struct echo_thread_info)
202 },
203 {
204 .ckd_cache = &echo_session_kmem,
205 .ckd_name = "echo_session_kmem",
206 .ckd_size = sizeof(struct echo_session_info)
207 },
208 {
209 .ckd_cache = NULL
210 }
211 };
212
213 /** \defgroup echo_page Page operations
214 *
215 * Echo page operations.
216 *
217 * @{
218 */
219 static int echo_page_own(const struct lu_env *env,
220 const struct cl_page_slice *slice,
221 struct cl_io *io, int nonblock)
222 {
223 struct echo_page *ep = cl2echo_page(slice);
224
225 if (!nonblock)
226 mutex_lock(&ep->ep_lock);
227 else if (!mutex_trylock(&ep->ep_lock))
228 return -EAGAIN;
229 return 0;
230 }
231
232 static void echo_page_disown(const struct lu_env *env,
233 const struct cl_page_slice *slice,
234 struct cl_io *io)
235 {
236 struct echo_page *ep = cl2echo_page(slice);
237
238 LASSERT(mutex_is_locked(&ep->ep_lock));
239 mutex_unlock(&ep->ep_lock);
240 }
241
242 static void echo_page_discard(const struct lu_env *env,
243 const struct cl_page_slice *slice,
244 struct cl_io *unused)
245 {
246 cl_page_delete(env, slice->cpl_page);
247 }
248
249 static int echo_page_is_vmlocked(const struct lu_env *env,
250 const struct cl_page_slice *slice)
251 {
252 if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
253 return -EBUSY;
254 return -ENODATA;
255 }
256
257 static void echo_page_completion(const struct lu_env *env,
258 const struct cl_page_slice *slice,
259 int ioret)
260 {
261 LASSERT(slice->cpl_page->cp_sync_io);
262 }
263
264 static void echo_page_fini(const struct lu_env *env,
265 struct cl_page_slice *slice)
266 {
267 struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
268
269 atomic_dec(&eco->eo_npages);
270 put_page(slice->cpl_page->cp_vmpage);
271 }
272
273 static int echo_page_prep(const struct lu_env *env,
274 const struct cl_page_slice *slice,
275 struct cl_io *unused)
276 {
277 return 0;
278 }
279
280 static int echo_page_print(const struct lu_env *env,
281 const struct cl_page_slice *slice,
282 void *cookie, lu_printer_t printer)
283 {
284 struct echo_page *ep = cl2echo_page(slice);
285
286 (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
287 ep, mutex_is_locked(&ep->ep_lock),
288 slice->cpl_page->cp_vmpage);
289 return 0;
290 }
291
292 static const struct cl_page_operations echo_page_ops = {
293 .cpo_own = echo_page_own,
294 .cpo_disown = echo_page_disown,
295 .cpo_discard = echo_page_discard,
296 .cpo_fini = echo_page_fini,
297 .cpo_print = echo_page_print,
298 .cpo_is_vmlocked = echo_page_is_vmlocked,
299 .io = {
300 [CRT_READ] = {
301 .cpo_prep = echo_page_prep,
302 .cpo_completion = echo_page_completion,
303 },
304 [CRT_WRITE] = {
305 .cpo_prep = echo_page_prep,
306 .cpo_completion = echo_page_completion,
307 }
308 }
309 };
310
311 /** @} echo_page */
312
313 /** \defgroup echo_lock Locking
314 *
315 * echo lock operations
316 *
317 * @{
318 */
319 static void echo_lock_fini(const struct lu_env *env,
320 struct cl_lock_slice *slice)
321 {
322 struct echo_lock *ecl = cl2echo_lock(slice);
323
324 LASSERT(list_empty(&ecl->el_chain));
325 kmem_cache_free(echo_lock_kmem, ecl);
326 }
327
328 static struct cl_lock_operations echo_lock_ops = {
329 .clo_fini = echo_lock_fini,
330 };
331
332 /** @} echo_lock */
333
334 /** \defgroup echo_cl_ops cl_object operations
335 *
336 * operations for cl_object
337 *
338 * @{
339 */
340 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
341 struct cl_page *page, pgoff_t index)
342 {
343 struct echo_page *ep = cl_object_page_slice(obj, page);
344 struct echo_object *eco = cl2echo_obj(obj);
345
346 get_page(page->cp_vmpage);
347 mutex_init(&ep->ep_lock);
348 cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
349 atomic_inc(&eco->eo_npages);
350 return 0;
351 }
352
353 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
354 struct cl_io *io)
355 {
356 return 0;
357 }
358
359 static int echo_lock_init(const struct lu_env *env,
360 struct cl_object *obj, struct cl_lock *lock,
361 const struct cl_io *unused)
362 {
363 struct echo_lock *el;
364
365 el = kmem_cache_zalloc(echo_lock_kmem, GFP_NOFS);
366 if (el) {
367 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
368 el->el_object = cl2echo_obj(obj);
369 INIT_LIST_HEAD(&el->el_chain);
370 atomic_set(&el->el_refcount, 0);
371 }
372 return !el ? -ENOMEM : 0;
373 }
374
375 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
376 const struct cl_object_conf *conf)
377 {
378 return 0;
379 }
380
381 static const struct cl_object_operations echo_cl_obj_ops = {
382 .coo_page_init = echo_page_init,
383 .coo_lock_init = echo_lock_init,
384 .coo_io_init = echo_io_init,
385 .coo_conf_set = echo_conf_set
386 };
387
388 /** @} echo_cl_ops */
389
390 /** \defgroup echo_lu_ops lu_object operations
391 *
392 * operations for echo lu object.
393 *
394 * @{
395 */
396 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
397 const struct lu_object_conf *conf)
398 {
399 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
400 struct echo_client_obd *ec = ed->ed_ec;
401 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
402 const struct cl_object_conf *cconf;
403 struct echo_object_conf *econf;
404
405 if (ed->ed_next) {
406 struct lu_object *below;
407 struct lu_device *under;
408
409 under = ed->ed_next;
410 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
411 under);
412 if (!below)
413 return -ENOMEM;
414 lu_object_add(obj, below);
415 }
416
417 cconf = lu2cl_conf(conf);
418 econf = cl2echo_conf(cconf);
419
420 LASSERT(econf->eoc_md);
421 eco->eo_lsm = *econf->eoc_md;
422 /* clear the lsm pointer so that it won't get freed. */
423 *econf->eoc_md = NULL;
424
425 eco->eo_dev = ed;
426 atomic_set(&eco->eo_npages, 0);
427 cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
428
429 spin_lock(&ec->ec_lock);
430 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
431 spin_unlock(&ec->ec_lock);
432
433 return 0;
434 }
435
436 /* taken from osc_unpackmd() */
437 static int echo_alloc_memmd(struct echo_device *ed,
438 struct lov_stripe_md **lsmp)
439 {
440 int lsm_size;
441
442 /* If export is lov/osc then use their obd method */
443 if (ed->ed_next)
444 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
445 /* OFD has no unpackmd method, do everything here */
446 lsm_size = lov_stripe_md_size(1);
447
448 LASSERT(!*lsmp);
449 *lsmp = kzalloc(lsm_size, GFP_NOFS);
450 if (!*lsmp)
451 return -ENOMEM;
452
453 (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo), GFP_NOFS);
454 if (!(*lsmp)->lsm_oinfo[0]) {
455 kfree(*lsmp);
456 return -ENOMEM;
457 }
458
459 loi_init((*lsmp)->lsm_oinfo[0]);
460 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
461 ostid_set_seq_echo(&(*lsmp)->lsm_oi);
462
463 return lsm_size;
464 }
465
466 static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
467 {
468 int lsm_size;
469
470 /* If export is lov/osc then use their obd method */
471 if (ed->ed_next)
472 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
473 /* OFD has no unpackmd method, do everything here */
474 lsm_size = lov_stripe_md_size(1);
475
476 kfree((*lsmp)->lsm_oinfo[0]);
477 kfree(*lsmp);
478 *lsmp = NULL;
479 return 0;
480 }
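/*
 * A short sketch of how the two helpers above pair up; this is the
 * pattern echo_create_object() and echo_get_object() follow further
 * down (oa stands for a caller-supplied obdo, ed for the echo_device):
 *
 *	struct lov_stripe_md *lsm = NULL;
 *	int rc;
 *
 *	rc = echo_alloc_memmd(ed, &lsm);	(lsm_size on success, -errno on failure)
 *	if (rc < 0)
 *		return rc;
 *	lsm->lsm_oi = oa->o_oi;			(fill in the object id)
 *	eco = cl_echo_object_find(ed, &lsm);	(may take ownership of lsm and clear it)
 *	...
 *	if (lsm)
 *		echo_free_memmd(ed, &lsm);	(free whatever is left over)
 */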
481
482 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
483 {
484 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
485 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
486
487 LASSERT(atomic_read(&eco->eo_npages) == 0);
488
489 spin_lock(&ec->ec_lock);
490 list_del_init(&eco->eo_obj_chain);
491 spin_unlock(&ec->ec_lock);
492
493 lu_object_fini(obj);
494 lu_object_header_fini(obj->lo_header);
495
496 if (eco->eo_lsm)
497 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
498 kmem_cache_free(echo_object_kmem, eco);
499 }
500
501 static int echo_object_print(const struct lu_env *env, void *cookie,
502 lu_printer_t p, const struct lu_object *o)
503 {
504 struct echo_object *obj = cl2echo_obj(lu2cl(o));
505
506 return (*p)(env, cookie, "echoclient-object@%p", obj);
507 }
508
509 static const struct lu_object_operations echo_lu_obj_ops = {
510 .loo_object_init = echo_object_init,
511 .loo_object_delete = NULL,
512 .loo_object_release = NULL,
513 .loo_object_free = echo_object_free,
514 .loo_object_print = echo_object_print,
515 .loo_object_invariant = NULL
516 };
517
518 /** @} echo_lu_ops */
519
520 /** \defgroup echo_lu_dev_ops lu_device operations
521 *
522 * Operations for echo lu device.
523 *
524 * @{
525 */
526 static struct lu_object *echo_object_alloc(const struct lu_env *env,
527 const struct lu_object_header *hdr,
528 struct lu_device *dev)
529 {
530 struct echo_object *eco;
531 struct lu_object *obj = NULL;
532
533 /* we're the top dev. */
534 LASSERT(!hdr);
535 eco = kmem_cache_zalloc(echo_object_kmem, GFP_NOFS);
536 if (eco) {
537 struct cl_object_header *hdr = &eco->eo_hdr;
538
539 obj = &echo_obj2cl(eco)->co_lu;
540 cl_object_header_init(hdr);
541 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
542
543 lu_object_init(obj, &hdr->coh_lu, dev);
544 lu_object_add_top(&hdr->coh_lu, obj);
545
546 eco->eo_cl.co_ops = &echo_cl_obj_ops;
547 obj->lo_ops = &echo_lu_obj_ops;
548 }
549 return obj;
550 }
551
552 static const struct lu_device_operations echo_device_lu_ops = {
553 .ldo_object_alloc = echo_object_alloc,
554 };
555
556 /** @} echo_lu_dev_ops */
557
558 static const struct cl_device_operations echo_device_cl_ops = {
559 };
560
561 /** \defgroup echo_init Setup and teardown
562 *
563 * Init and fini functions for echo client.
564 *
565 * @{
566 */
567 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
568 {
569 struct cl_site *site = &ed->ed_site_myself;
570 int rc;
571
572 /* initialize site */
573 rc = cl_site_init(site, &ed->ed_cl);
574 if (rc) {
575 CERROR("Cannot initialize site for echo client(%d)\n", rc);
576 return rc;
577 }
578
579 rc = lu_site_init_finish(&site->cs_lu);
580 if (rc)
581 return rc;
582
583 ed->ed_site = site;
584 return 0;
585 }
586
587 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
588 {
589 if (ed->ed_site) {
590 cl_site_fini(ed->ed_site);
591 ed->ed_site = NULL;
592 }
593 }
594
595 static void *echo_thread_key_init(const struct lu_context *ctx,
596 struct lu_context_key *key)
597 {
598 struct echo_thread_info *info;
599
600 info = kmem_cache_zalloc(echo_thread_kmem, GFP_NOFS);
601 if (!info)
602 info = ERR_PTR(-ENOMEM);
603 return info;
604 }
605
606 static void echo_thread_key_fini(const struct lu_context *ctx,
607 struct lu_context_key *key, void *data)
608 {
609 struct echo_thread_info *info = data;
610
611 kmem_cache_free(echo_thread_kmem, info);
612 }
613
614 static void echo_thread_key_exit(const struct lu_context *ctx,
615 struct lu_context_key *key, void *data)
616 {
617 }
618
619 static struct lu_context_key echo_thread_key = {
620 .lct_tags = LCT_CL_THREAD,
621 .lct_init = echo_thread_key_init,
622 .lct_fini = echo_thread_key_fini,
623 .lct_exit = echo_thread_key_exit
624 };
625
626 static void *echo_session_key_init(const struct lu_context *ctx,
627 struct lu_context_key *key)
628 {
629 struct echo_session_info *session;
630
631 session = kmem_cache_zalloc(echo_session_kmem, GFP_NOFS);
632 if (!session)
633 session = ERR_PTR(-ENOMEM);
634 return session;
635 }
636
637 static void echo_session_key_fini(const struct lu_context *ctx,
638 struct lu_context_key *key, void *data)
639 {
640 struct echo_session_info *session = data;
641
642 kmem_cache_free(echo_session_kmem, session);
643 }
644
645 static void echo_session_key_exit(const struct lu_context *ctx,
646 struct lu_context_key *key, void *data)
647 {
648 }
649
650 static struct lu_context_key echo_session_key = {
651 .lct_tags = LCT_SESSION,
652 .lct_init = echo_session_key_init,
653 .lct_fini = echo_session_key_fini,
654 .lct_exit = echo_session_key_exit
655 };
656
657 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
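/*
 * LU_TYPE_INIT_FINI() generates the echo_type_{init,fini,start,stop}
 * helpers referenced from echo_device_type_ops below; they register,
 * revive, quiesce and unregister the echo_thread_key and
 * echo_session_key context keys for this device type.
 */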
658
659 static struct lu_device *echo_device_alloc(const struct lu_env *env,
660 struct lu_device_type *t,
661 struct lustre_cfg *cfg)
662 {
663 struct lu_device *next;
664 struct echo_device *ed;
665 struct cl_device *cd;
666 struct obd_device *obd = NULL; /* to keep compiler happy */
667 struct obd_device *tgt;
668 const char *tgt_type_name;
669 int rc, err;
670
671 ed = kzalloc(sizeof(*ed), GFP_NOFS);
672 if (!ed) {
673 rc = -ENOMEM;
674 goto out;
675 }
676
677 cd = &ed->ed_cl;
678 rc = cl_device_init(cd, t);
679 if (rc)
680 goto out_free;
681
682 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
683 cd->cd_ops = &echo_device_cl_ops;
684
685 obd = class_name2obd(lustre_cfg_string(cfg, 0));
686 LASSERT(obd);
687 LASSERT(env);
688
689 tgt = class_name2obd(lustre_cfg_string(cfg, 1));
690 if (!tgt) {
691 CERROR("Can not find tgt device %s\n",
692 lustre_cfg_string(cfg, 1));
693 rc = -ENODEV;
694 goto out_device_fini;
695 }
696
697 next = tgt->obd_lu_dev;
698 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
699 CERROR("echo MDT client must be run on server\n");
700 rc = -EOPNOTSUPP;
701 goto out_device_fini;
702 }
703
704 rc = echo_site_init(env, ed);
705 if (rc)
706 goto out_device_fini;
707
708 rc = echo_client_setup(env, obd, cfg);
709 if (rc)
710 goto out_site_fini;
711
712 ed->ed_ec = &obd->u.echo_client;
713
714 /* if the echo client is stacked on an OST device, next is NULL,
715 * since the OST is not a CLIO device so far
716 */
717 if (next && !lu_device_is_cl(next))
718 next = NULL;
719
720 tgt_type_name = tgt->obd_type->typ_name;
721 if (next) {
722 if (next->ld_site) {
723 rc = -EBUSY;
724 goto out_cleanup;
725 }
726
727 next->ld_site = &ed->ed_site->cs_lu;
728 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
729 next->ld_type->ldt_name,
730 NULL);
731 if (rc)
732 goto out_cleanup;
733
734 } else {
735 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
736 }
737
738 ed->ed_next = next;
739 return &cd->cd_lu_dev;
740
741 out_cleanup:
742 err = echo_client_cleanup(obd);
743 if (err)
744 CERROR("Cleanup obd device %s error(%d)\n",
745 obd->obd_name, err);
746 out_site_fini:
747 echo_site_fini(env, ed);
748 out_device_fini:
749 cl_device_fini(&ed->ed_cl);
750 out_free:
751 kfree(ed);
752 out:
753 return ERR_PTR(rc);
754 }
755
756 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
757 const char *name, struct lu_device *next)
758 {
759 LBUG();
760 return 0;
761 }
762
763 static struct lu_device *echo_device_fini(const struct lu_env *env,
764 struct lu_device *d)
765 {
766 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
767 struct lu_device *next = ed->ed_next;
768
769 while (next)
770 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
771 return NULL;
772 }
773
774 static void echo_lock_release(const struct lu_env *env,
775 struct echo_lock *ecl,
776 int still_used)
777 {
778 struct cl_lock *clk = echo_lock2cl(ecl);
779
780 cl_lock_release(env, clk);
781 }
782
783 static struct lu_device *echo_device_free(const struct lu_env *env,
784 struct lu_device *d)
785 {
786 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
787 struct echo_client_obd *ec = ed->ed_ec;
788 struct echo_object *eco;
789 struct lu_device *next = ed->ed_next;
790
791 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
792 ed, next);
793
794 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
795
796 /* Check whether any objects are still alive.
797 * There should be none left, since lu_site_purge() cleans up all
798 * cached objects. If some remain, the echo device is most likely
799 * being accessed concurrently.
800 */
801 spin_lock(&ec->ec_lock);
802 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
803 eco->eo_deleted = 1;
804 spin_unlock(&ec->ec_lock);
805
806 /* purge again */
807 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
808
809 CDEBUG(D_INFO,
810 "Waiting for the reference of echo object to be dropped\n");
811
812 /* Wait for the last reference to be dropped. */
813 spin_lock(&ec->ec_lock);
814 while (!list_empty(&ec->ec_objects)) {
815 spin_unlock(&ec->ec_lock);
816 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
817 set_current_state(TASK_UNINTERRUPTIBLE);
818 schedule_timeout(cfs_time_seconds(1));
819 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
820 spin_lock(&ec->ec_lock);
821 }
822 spin_unlock(&ec->ec_lock);
823
824 LASSERT(list_empty(&ec->ec_locks));
825
826 CDEBUG(D_INFO, "No object exists, exiting...\n");
827
828 echo_client_cleanup(d->ld_obd);
829
830 while (next)
831 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
832
833 LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
834 echo_site_fini(env, ed);
835 cl_device_fini(&ed->ed_cl);
836 kfree(ed);
837
838 return NULL;
839 }
840
841 static const struct lu_device_type_operations echo_device_type_ops = {
842 .ldto_init = echo_type_init,
843 .ldto_fini = echo_type_fini,
844
845 .ldto_start = echo_type_start,
846 .ldto_stop = echo_type_stop,
847
848 .ldto_device_alloc = echo_device_alloc,
849 .ldto_device_free = echo_device_free,
850 .ldto_device_init = echo_device_init,
851 .ldto_device_fini = echo_device_fini
852 };
853
854 static struct lu_device_type echo_device_type = {
855 .ldt_tags = LU_DEVICE_CL,
856 .ldt_name = LUSTRE_ECHO_CLIENT_NAME,
857 .ldt_ops = &echo_device_type_ops,
858 .ldt_ctx_tags = LCT_CL_THREAD,
859 };
860
861 /** @} echo_init */
862
863 /** \defgroup echo_exports Exported operations
864 *
865 * exporting functions to echo client
866 *
867 * @{
868 */
869
870 /* Interfaces to echo client obd device */
871 static struct echo_object *cl_echo_object_find(struct echo_device *d,
872 struct lov_stripe_md **lsmp)
873 {
874 struct lu_env *env;
875 struct echo_thread_info *info;
876 struct echo_object_conf *conf;
877 struct lov_stripe_md *lsm;
878 struct echo_object *eco;
879 struct cl_object *obj;
880 struct lu_fid *fid;
881 int refcheck;
882 int rc;
883
884 LASSERT(lsmp);
885 lsm = *lsmp;
886 LASSERT(lsm);
887 LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
888 LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
889 POSTID(&lsm->lsm_oi));
890
891 /* Never return an object if the obd is to be freed. */
892 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
893 return ERR_PTR(-ENODEV);
894
895 env = cl_env_get(&refcheck);
896 if (IS_ERR(env))
897 return (void *)env;
898
899 info = echo_env_info(env);
900 conf = &info->eti_conf;
901 if (d->ed_next) {
902 struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
903
904 LASSERT(oinfo);
905 oinfo->loi_oi = lsm->lsm_oi;
906 conf->eoc_cl.u.coc_oinfo = oinfo;
907 }
908 conf->eoc_md = lsmp;
909
910 fid = &info->eti_fid;
911 rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
912 if (rc != 0) {
913 eco = ERR_PTR(rc);
914 goto out;
915 }
916
917 /* In the function below, .hs_keycmp resolves to
918 * lu_obj_hop_keycmp()
919 */
920 /* coverity[overrun-buffer-val] */
921 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
922 if (IS_ERR(obj)) {
923 eco = (void *)obj;
924 goto out;
925 }
926
927 eco = cl2echo_obj(obj);
928 if (eco->eo_deleted) {
929 cl_object_put(env, obj);
930 eco = ERR_PTR(-EAGAIN);
931 }
932
933 out:
934 cl_env_put(env, &refcheck);
935 return eco;
936 }
937
938 static int cl_echo_object_put(struct echo_object *eco)
939 {
940 struct lu_env *env;
941 struct cl_object *obj = echo_obj2cl(eco);
942 int refcheck;
943
944 env = cl_env_get(&refcheck);
945 if (IS_ERR(env))
946 return PTR_ERR(env);
947
948 /* the object has been deleted; make sure it is freed on the last put */
949 if (eco->eo_deleted) {
950 struct lu_object_header *loh = obj->co_lu.lo_header;
951
952 LASSERT(&eco->eo_hdr == luh2coh(loh));
953 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
954 }
955
956 cl_object_put(env, obj);
957 cl_env_put(env, &refcheck);
958 return 0;
959 }
960
961 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
962 u64 start, u64 end, int mode,
963 __u64 *cookie, __u32 enqflags)
964 {
965 struct cl_io *io;
966 struct cl_lock *lck;
967 struct cl_object *obj;
968 struct cl_lock_descr *descr;
969 struct echo_thread_info *info;
970 int rc = -ENOMEM;
971
972 info = echo_env_info(env);
973 io = &info->eti_io;
974 lck = &info->eti_lock;
975 obj = echo_obj2cl(eco);
976
977 memset(lck, 0, sizeof(*lck));
978 descr = &lck->cll_descr;
979 descr->cld_obj = obj;
980 descr->cld_start = cl_index(obj, start);
981 descr->cld_end = cl_index(obj, end);
982 descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
983 descr->cld_enq_flags = enqflags;
984 io->ci_obj = obj;
985
986 rc = cl_lock_request(env, io, lck);
987 if (rc == 0) {
988 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
989 struct echo_lock *el;
990
991 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
992 spin_lock(&ec->ec_lock);
993 if (list_empty(&el->el_chain)) {
994 list_add(&el->el_chain, &ec->ec_locks);
995 el->el_cookie = ++ec->ec_unique;
996 }
997 atomic_inc(&el->el_refcount);
998 *cookie = el->el_cookie;
999 spin_unlock(&ec->ec_lock);
1000 }
1001 return rc;
1002 }
1003
1004 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1005 __u64 cookie)
1006 {
1007 struct echo_client_obd *ec = ed->ed_ec;
1008 struct echo_lock *ecl = NULL;
1009 struct list_head *el;
1010 int found = 0, still_used = 0;
1011
1012 spin_lock(&ec->ec_lock);
1013 list_for_each(el, &ec->ec_locks) {
1014 ecl = list_entry(el, struct echo_lock, el_chain);
1015 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1016 found = (ecl->el_cookie == cookie);
1017 if (found) {
1018 if (atomic_dec_and_test(&ecl->el_refcount))
1019 list_del_init(&ecl->el_chain);
1020 else
1021 still_used = 1;
1022 break;
1023 }
1024 }
1025 spin_unlock(&ec->ec_lock);
1026
1027 if (!found)
1028 return -ENOENT;
1029
1030 echo_lock_release(env, ecl, still_used);
1031 return 0;
1032 }
1033
1034 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1035 struct cl_page *page)
1036 {
1037 struct echo_thread_info *info;
1038 struct cl_2queue *queue;
1039
1040 info = echo_env_info(env);
1041 LASSERT(io == &info->eti_io);
1042
1043 queue = &info->eti_queue;
1044 cl_page_list_add(&queue->c2_qout, page);
1045 }
1046
1047 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1048 struct page **pages, int npages, int async)
1049 {
1050 struct lu_env *env;
1051 struct echo_thread_info *info;
1052 struct cl_object *obj = echo_obj2cl(eco);
1053 struct echo_device *ed = eco->eo_dev;
1054 struct cl_2queue *queue;
1055 struct cl_io *io;
1056 struct cl_page *clp;
1057 struct lustre_handle lh = { 0 };
1058 int page_size = cl_page_size(obj);
1059 int refcheck;
1060 int rc;
1061 int i;
1062
1063 LASSERT((offset & ~PAGE_MASK) == 0);
1064 LASSERT(ed->ed_next);
1065 env = cl_env_get(&refcheck);
1066 if (IS_ERR(env))
1067 return PTR_ERR(env);
1068
1069 info = echo_env_info(env);
1070 io = &info->eti_io;
1071 queue = &info->eti_queue;
1072
1073 cl_2queue_init(queue);
1074
1075 io->ci_ignore_layout = 1;
1076 rc = cl_io_init(env, io, CIT_MISC, obj);
1077 if (rc < 0)
1078 goto out;
1079 LASSERT(rc == 0);
1080
1081 rc = cl_echo_enqueue0(env, eco, offset,
1082 offset + npages * PAGE_SIZE - 1,
1083 rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1084 CEF_NEVER);
1085 if (rc < 0)
1086 goto error_lock;
1087
1088 for (i = 0; i < npages; i++) {
1089 LASSERT(pages[i]);
1090 clp = cl_page_find(env, obj, cl_index(obj, offset),
1091 pages[i], CPT_TRANSIENT);
1092 if (IS_ERR(clp)) {
1093 rc = PTR_ERR(clp);
1094 break;
1095 }
1096 LASSERT(clp->cp_type == CPT_TRANSIENT);
1097
1098 rc = cl_page_own(env, io, clp);
1099 if (rc) {
1100 LASSERT(clp->cp_state == CPS_FREEING);
1101 cl_page_put(env, clp);
1102 break;
1103 }
1104 /*
1105 * Add a page to the incoming page list of 2-queue.
1106 */
1107 cl_page_list_add(&queue->c2_qin, clp);
1108
1109 /* drop the reference count for cl_page_find, so that the page
1110 * will be freed in cl_2queue_fini.
1111 */
1112 cl_page_put(env, clp);
1113 cl_page_clip(env, clp, 0, page_size);
1114
1115 offset += page_size;
1116 }
1117
1118 if (rc == 0) {
1119 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1120
1121 async = async && (typ == CRT_WRITE);
1122 if (async)
1123 rc = cl_io_commit_async(env, io, &queue->c2_qin,
1124 0, PAGE_SIZE,
1125 echo_commit_callback);
1126 else
1127 rc = cl_io_submit_sync(env, io, typ, queue, 0);
1128 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1129 async ? "async" : "sync", rc);
1130 }
1131
1132 cl_echo_cancel0(env, ed, lh.cookie);
1133 error_lock:
1134 cl_2queue_discard(env, io, queue);
1135 cl_2queue_disown(env, io, queue);
1136 cl_2queue_fini(env, queue);
1137 cl_io_fini(env, io);
1138 out:
1139 cl_env_put(env, &refcheck);
1140 return rc;
1141 }
1142
1143 /** @} echo_exports */
1144
1145 static u64 last_object_id;
1146
1147 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1148 struct obdo *oa, struct obd_trans_info *oti)
1149 {
1150 struct echo_object *eco;
1151 struct echo_client_obd *ec = ed->ed_ec;
1152 struct lov_stripe_md *lsm = NULL;
1153 int rc;
1154 int created = 0;
1155
1156 if (!(oa->o_valid & OBD_MD_FLID) ||
1157 !(oa->o_valid & OBD_MD_FLGROUP) ||
1158 !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1159 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1160 return -EINVAL;
1161 }
1162
1163 rc = echo_alloc_memmd(ed, &lsm);
1164 if (rc < 0) {
1165 CERROR("Cannot allocate md: rc = %d\n", rc);
1166 goto failed;
1167 }
1168
1169 /* setup object ID here */
1170 lsm->lsm_oi = oa->o_oi;
1171
1172 if (ostid_id(&lsm->lsm_oi) == 0)
1173 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1174
1175 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1176 if (rc != 0) {
1177 CERROR("Cannot create objects: rc = %d\n", rc);
1178 goto failed;
1179 }
1180 created = 1;
1181
1182 /* See what object ID we were given */
1183 oa->o_oi = lsm->lsm_oi;
1184 oa->o_valid |= OBD_MD_FLID;
1185
1186 eco = cl_echo_object_find(ed, &lsm);
1187 if (IS_ERR(eco)) {
1188 rc = PTR_ERR(eco);
1189 goto failed;
1190 }
1191 cl_echo_object_put(eco);
1192
1193 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1194
1195 failed:
1196 if (created && rc)
1197 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL);
1198 if (lsm)
1199 echo_free_memmd(ed, &lsm);
1200 if (rc)
1201 CERROR("create object failed with: rc = %d\n", rc);
1202 return rc;
1203 }
1204
1205 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1206 struct obdo *oa)
1207 {
1208 struct lov_stripe_md *lsm = NULL;
1209 struct echo_object *eco;
1210 int rc;
1211
1212 if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1213 /* disallow use of object id 0 */
1214 CERROR("No valid oid\n");
1215 return -EINVAL;
1216 }
1217
1218 rc = echo_alloc_memmd(ed, &lsm);
1219 if (rc < 0)
1220 return rc;
1221
1222 lsm->lsm_oi = oa->o_oi;
1223 if (!(oa->o_valid & OBD_MD_FLGROUP))
1224 ostid_set_seq_echo(&lsm->lsm_oi);
1225
1226 rc = 0;
1227 eco = cl_echo_object_find(ed, &lsm);
1228 if (!IS_ERR(eco))
1229 *ecop = eco;
1230 else
1231 rc = PTR_ERR(eco);
1232 if (lsm)
1233 echo_free_memmd(ed, &lsm);
1234 return rc;
1235 }
1236
1237 static void echo_put_object(struct echo_object *eco)
1238 {
1239 int rc;
1240
1241 rc = cl_echo_object_put(eco);
1242 if (rc)
1243 CERROR("%s: echo client drop an object failed: rc = %d\n",
1244 eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
1245 }
1246
1247 static void
1248 echo_client_page_debug_setup(struct page *page, int rw, u64 id,
1249 u64 offset, u64 count)
1250 {
1251 char *addr;
1252 u64 stripe_off;
1253 u64 stripe_id;
1254 int delta;
1255
1256 /* no partial pages on the client */
1257 LASSERT(count == PAGE_SIZE);
1258
1259 addr = kmap(page);
1260
1261 for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1262 if (rw == OBD_BRW_WRITE) {
1263 stripe_off = offset + delta;
1264 stripe_id = id;
1265 } else {
1266 stripe_off = 0xdeadbeef00c0ffeeULL;
1267 stripe_id = 0xdeadbeef00c0ffeeULL;
1268 }
1269 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1270 stripe_off, stripe_id);
1271 }
1272
1273 kunmap(page);
1274 }
1275
1276 static int echo_client_page_debug_check(struct page *page, u64 id,
1277 u64 offset, u64 count)
1278 {
1279 u64 stripe_off;
1280 u64 stripe_id;
1281 char *addr;
1282 int delta;
1283 int rc;
1284 int rc2;
1285
1286 /* no partial pages on the client */
1287 LASSERT(count == PAGE_SIZE);
1288
1289 addr = kmap(page);
1290
1291 for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1292 stripe_off = offset + delta;
1293 stripe_id = id;
1294
1295 rc2 = block_debug_check("test_brw",
1296 addr + delta, OBD_ECHO_BLOCK_SIZE,
1297 stripe_off, stripe_id);
1298 if (rc2 != 0) {
1299 CERROR("Error in echo object %#llx\n", id);
1300 rc = rc2;
1301 }
1302 }
1303
1304 kunmap(page);
1305 return rc;
1306 }
1307
1308 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1309 struct echo_object *eco, u64 offset,
1310 u64 count, int async,
1311 struct obd_trans_info *oti)
1312 {
1313 u32 npages;
1314 struct brw_page *pga;
1315 struct brw_page *pgp;
1316 struct page **pages;
1317 u64 off;
1318 int i;
1319 int rc;
1320 int verify;
1321 gfp_t gfp_mask;
1322 int brw_flags = 0;
1323
1324 verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1325 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1326 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1327
1328 gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1329
1330 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1331
1332 if (count <= 0 ||
1333 (count & (~PAGE_MASK)) != 0)
1334 return -EINVAL;
1335
1336 /* XXX think again with misaligned I/O */
1337 npages = count >> PAGE_SHIFT;
1338
1339 if (rw == OBD_BRW_WRITE)
1340 brw_flags = OBD_BRW_ASYNC;
1341
1342 pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1343 if (!pga)
1344 return -ENOMEM;
1345
1346 pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1347 if (!pages) {
1348 kfree(pga);
1349 return -ENOMEM;
1350 }
1351
1352 for (i = 0, pgp = pga, off = offset;
1353 i < npages;
1354 i++, pgp++, off += PAGE_SIZE) {
1355 LASSERT(!pgp->pg); /* for cleanup */
1356
1357 rc = -ENOMEM;
1358 pgp->pg = alloc_page(gfp_mask);
1359 if (!pgp->pg)
1360 goto out;
1361
1362 pages[i] = pgp->pg;
1363 pgp->count = PAGE_SIZE;
1364 pgp->off = off;
1365 pgp->flag = brw_flags;
1366
1367 if (verify)
1368 echo_client_page_debug_setup(pgp->pg, rw,
1369 ostid_id(&oa->o_oi), off,
1370 pgp->count);
1371 }
1372
1373 /* brw mode can only be used at client */
1374 LASSERT(ed->ed_next);
1375 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1376
1377 out:
1378 if (rc != 0 || rw != OBD_BRW_READ)
1379 verify = 0;
1380
1381 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1382 if (!pgp->pg)
1383 continue;
1384
1385 if (verify) {
1386 int vrc;
1387
1388 vrc = echo_client_page_debug_check(pgp->pg,
1389 ostid_id(&oa->o_oi),
1390 pgp->off, pgp->count);
1391 if (vrc != 0 && rc == 0)
1392 rc = vrc;
1393 }
1394 __free_page(pgp->pg);
1395 }
1396 kfree(pga);
1397 kfree(pages);
1398 return rc;
1399 }
1400
1401 static int echo_client_prep_commit(const struct lu_env *env,
1402 struct obd_export *exp, int rw,
1403 struct obdo *oa, struct echo_object *eco,
1404 u64 offset, u64 count,
1405 u64 batch, struct obd_trans_info *oti,
1406 int async)
1407 {
1408 struct obd_ioobj ioo;
1409 struct niobuf_local *lnb;
1410 struct niobuf_remote *rnb;
1411 u64 off;
1412 u64 npages, tot_pages;
1413 int i, ret = 0, brw_flags = 0;
1414
1415 if (count <= 0 || (count & (~PAGE_MASK)) != 0)
1416 return -EINVAL;
1417
1418 npages = batch >> PAGE_SHIFT;
1419 tot_pages = count >> PAGE_SHIFT;
1420
1421 lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1422 rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);
1423
1424 if (!lnb || !rnb) {
1425 ret = -ENOMEM;
1426 goto out;
1427 }
1428
1429 if (rw == OBD_BRW_WRITE && async)
1430 brw_flags |= OBD_BRW_ASYNC;
1431
1432 obdo_to_ioobj(oa, &ioo);
1433
1434 off = offset;
1435
1436 for (; tot_pages; tot_pages -= npages) {
1437 int lpages;
1438
1439 if (tot_pages < npages)
1440 npages = tot_pages;
1441
1442 for (i = 0; i < npages; i++, off += PAGE_SIZE) {
1443 rnb[i].offset = off;
1444 rnb[i].len = PAGE_SIZE;
1445 rnb[i].flags = brw_flags;
1446 }
1447
1448 ioo.ioo_bufcnt = npages;
1449 oti->oti_transno = 0;
1450
1451 lpages = npages;
1452 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1453 lnb, oti);
1454 if (ret != 0)
1455 goto out;
1456 LASSERT(lpages == npages);
1457
1458 for (i = 0; i < lpages; i++) {
1459 struct page *page = lnb[i].page;
1460
1461 /* read past eof? */
1462 if (!page && lnb[i].rc == 0)
1463 continue;
1464
1465 if (async)
1466 lnb[i].flags |= OBD_BRW_ASYNC;
1467
1468 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1469 (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1470 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1471 continue;
1472
1473 if (rw == OBD_BRW_WRITE)
1474 echo_client_page_debug_setup(page, rw,
1475 ostid_id(&oa->o_oi),
1476 rnb[i].offset,
1477 rnb[i].len);
1478 else
1479 echo_client_page_debug_check(page,
1480 ostid_id(&oa->o_oi),
1481 rnb[i].offset,
1482 rnb[i].len);
1483 }
1484
1485 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1486 rnb, npages, lnb, oti, ret);
1487 if (ret != 0)
1488 goto out;
1489
1490 /* Reset oti otherwise it would confuse ldiskfs. */
1491 memset(oti, 0, sizeof(*oti));
1492
1493 /* Reuse env context. */
1494 lu_context_exit((struct lu_context *)&env->le_ctx);
1495 lu_context_enter((struct lu_context *)&env->le_ctx);
1496 }
1497
1498 out:
1499 kfree(lnb);
1500 kfree(rnb);
1501 return ret;
1502 }
1503
1504 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1505 struct obd_export *exp,
1506 struct obd_ioctl_data *data,
1507 struct obd_trans_info *dummy_oti)
1508 {
1509 struct obd_device *obd = class_exp2obd(exp);
1510 struct echo_device *ed = obd2echo_dev(obd);
1511 struct echo_client_obd *ec = ed->ed_ec;
1512 struct obdo *oa = &data->ioc_obdo1;
1513 struct echo_object *eco;
1514 int rc;
1515 int async = 1;
1516 long test_mode;
1517
1518 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1519
1520 rc = echo_get_object(&eco, ed, oa);
1521 if (rc)
1522 return rc;
1523
1524 oa->o_valid &= ~OBD_MD_FLHANDLE;
1525
1526 /* OFD/obdfilter works only via prep/commit */
1527 test_mode = (long)data->ioc_pbuf1;
1528 if (test_mode == 1)
1529 async = 0;
1530
1531 if (!ed->ed_next && test_mode != 3) {
1532 test_mode = 3;
1533 data->ioc_plen1 = data->ioc_count;
1534 }
1535
1536 /* Truncate batch size to maximum */
1537 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1538 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1539
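/*
 * test_mode 1 runs the brw synchronously through the client stack,
 * mode 2 runs the same path asynchronously, and mode 3 bypasses the
 * client stack and drives the target through obd_preprw()/obd_commitrw()
 * in batches of at most ioc_plen1 bytes.
 */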
1540 switch (test_mode) {
1541 case 1:
1542 /* fall through */
1543 case 2:
1544 rc = echo_client_kbrw(ed, rw, oa,
1545 eco, data->ioc_offset,
1546 data->ioc_count, async, dummy_oti);
1547 break;
1548 case 3:
1549 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1550 eco, data->ioc_offset,
1551 data->ioc_count, data->ioc_plen1,
1552 dummy_oti, async);
1553 break;
1554 default:
1555 rc = -EINVAL;
1556 }
1557 echo_put_object(eco);
1558 return rc;
1559 }
1560
1561 static int
1562 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1563 void *karg, void __user *uarg)
1564 {
1565 struct obd_device *obd = exp->exp_obd;
1566 struct echo_device *ed = obd2echo_dev(obd);
1567 struct echo_client_obd *ec = ed->ed_ec;
1568 struct echo_object *eco;
1569 struct obd_ioctl_data *data = karg;
1570 struct obd_trans_info dummy_oti;
1571 struct lu_env *env;
1572 struct oti_req_ack_lock *ack_lock;
1573 struct obdo *oa;
1574 struct lu_fid fid;
1575 int rw = OBD_BRW_READ;
1576 int rc = 0;
1577 int i;
1578
1579 memset(&dummy_oti, 0, sizeof(dummy_oti));
1580
1581 oa = &data->ioc_obdo1;
1582 if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1583 oa->o_valid |= OBD_MD_FLGROUP;
1584 ostid_set_seq_echo(&oa->o_oi);
1585 }
1586
1587 /* This FID is unpacked just for validation at this point */
1588 rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1589 if (rc < 0)
1590 return rc;
1591
1592 env = kzalloc(sizeof(*env), GFP_NOFS);
1593 if (!env)
1594 return -ENOMEM;
1595
1596 rc = lu_env_init(env, LCT_DT_THREAD);
1597 if (rc) {
1598 rc = -ENOMEM;
1599 goto out;
1600 }
1601
1602 switch (cmd) {
1603 case OBD_IOC_CREATE: /* may create echo object */
1604 if (!capable(CFS_CAP_SYS_ADMIN)) {
1605 rc = -EPERM;
1606 goto out;
1607 }
1608
1609 rc = echo_create_object(env, ed, oa, &dummy_oti);
1610 goto out;
1611
1612 case OBD_IOC_DESTROY:
1613 if (!capable(CFS_CAP_SYS_ADMIN)) {
1614 rc = -EPERM;
1615 goto out;
1616 }
1617
1618 rc = echo_get_object(&eco, ed, oa);
1619 if (rc == 0) {
1620 rc = obd_destroy(env, ec->ec_exp, oa, NULL,
1621 &dummy_oti, NULL);
1622 if (rc == 0)
1623 eco->eo_deleted = 1;
1624 echo_put_object(eco);
1625 }
1626 goto out;
1627
1628 case OBD_IOC_GETATTR:
1629 rc = echo_get_object(&eco, ed, oa);
1630 if (rc == 0) {
1631 struct obd_info oinfo = {
1632 .oi_oa = oa,
1633 };
1634
1635 rc = obd_getattr(env, ec->ec_exp, &oinfo);
1636 echo_put_object(eco);
1637 }
1638 goto out;
1639
1640 case OBD_IOC_SETATTR:
1641 if (!capable(CFS_CAP_SYS_ADMIN)) {
1642 rc = -EPERM;
1643 goto out;
1644 }
1645
1646 rc = echo_get_object(&eco, ed, oa);
1647 if (rc == 0) {
1648 struct obd_info oinfo = {
1649 .oi_oa = oa,
1650 };
1651
1652 rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1653 echo_put_object(eco);
1654 }
1655 goto out;
1656
1657 case OBD_IOC_BRW_WRITE:
1658 if (!capable(CFS_CAP_SYS_ADMIN)) {
1659 rc = -EPERM;
1660 goto out;
1661 }
1662
1663 rw = OBD_BRW_WRITE;
1664 /* fall through */
1665 case OBD_IOC_BRW_READ:
1666 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1667 goto out;
1668
1669 default:
1670 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1671 rc = -ENOTTY;
1672 goto out;
1673 }
1674
1675 out:
1676 lu_env_fini(env);
1677 kfree(env);
1678
1679 /* XXX this should be in a helper also called by target_send_reply */
1680 for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1681 i++, ack_lock++) {
1682 if (!ack_lock->mode)
1683 break;
1684 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1685 }
1686
1687 return rc;
1688 }
1689
1690 static int echo_client_setup(const struct lu_env *env,
1691 struct obd_device *obddev, struct lustre_cfg *lcfg)
1692 {
1693 struct echo_client_obd *ec = &obddev->u.echo_client;
1694 struct obd_device *tgt;
1695 struct obd_uuid echo_uuid = { "ECHO_UUID" };
1696 struct obd_connect_data *ocd = NULL;
1697 int rc;
1698
1699 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1700 CERROR("requires a TARGET OBD name\n");
1701 return -EINVAL;
1702 }
1703
1704 tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1705 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1706 CERROR("device not attached or not set up (%s)\n",
1707 lustre_cfg_string(lcfg, 1));
1708 return -EINVAL;
1709 }
1710
1711 spin_lock_init(&ec->ec_lock);
1712 INIT_LIST_HEAD(&ec->ec_objects);
1713 INIT_LIST_HEAD(&ec->ec_locks);
1714 ec->ec_unique = 0;
1715
1716 ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
1717 if (!ocd)
1718 return -ENOMEM;
1719
1720 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1721 OBD_CONNECT_BRW_SIZE |
1722 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
1723 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
1724 OBD_CONNECT_FID;
1725 ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
1726 ocd->ocd_version = LUSTRE_VERSION_CODE;
1727 ocd->ocd_group = FID_SEQ_ECHO;
1728
1729 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
1730
1731 kfree(ocd);
1732
1733 if (rc != 0) {
1734 CERROR("fail to connect to device %s\n",
1735 lustre_cfg_string(lcfg, 1));
1736 return rc;
1737 }
1738
1739 return rc;
1740 }
1741
1742 static int echo_client_cleanup(struct obd_device *obddev)
1743 {
1744 struct echo_client_obd *ec = &obddev->u.echo_client;
1745 int rc;
1746
1747 if (!list_empty(&obddev->obd_exports)) {
1748 CERROR("still has clients!\n");
1749 return -EBUSY;
1750 }
1751
1752 LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
1753 rc = obd_disconnect(ec->ec_exp);
1754 if (rc != 0)
1755 CERROR("fail to disconnect device: %d\n", rc);
1756
1757 return rc;
1758 }
1759
1760 static int echo_client_connect(const struct lu_env *env,
1761 struct obd_export **exp,
1762 struct obd_device *src, struct obd_uuid *cluuid,
1763 struct obd_connect_data *data, void *localdata)
1764 {
1765 int rc;
1766 struct lustre_handle conn = { 0 };
1767
1768 rc = class_connect(&conn, src, cluuid);
1769 if (rc == 0) {
1770 *exp = class_conn2export(&conn);
1771 }
1772
1773 return rc;
1774 }
1775
1776 static int echo_client_disconnect(struct obd_export *exp)
1777 {
1778 int rc;
1779
1780 if (!exp) {
1781 rc = -EINVAL;
1782 goto out;
1783 }
1784
1785 rc = class_disconnect(exp);
1786 goto out;
1787 out:
1788 return rc;
1789 }
1790
1791 static struct obd_ops echo_client_obd_ops = {
1792 .owner = THIS_MODULE,
1793 .iocontrol = echo_client_iocontrol,
1794 .connect = echo_client_connect,
1795 .disconnect = echo_client_disconnect
1796 };
1797
1798 static int echo_client_init(void)
1799 {
1800 int rc;
1801
1802 rc = lu_kmem_init(echo_caches);
1803 if (rc == 0) {
1804 rc = class_register_type(&echo_client_obd_ops, NULL,
1805 LUSTRE_ECHO_CLIENT_NAME,
1806 &echo_device_type);
1807 if (rc)
1808 lu_kmem_fini(echo_caches);
1809 }
1810 return rc;
1811 }
1812
1813 static void echo_client_exit(void)
1814 {
1815 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
1816 lu_kmem_fini(echo_caches);
1817 }
1818
1819 static int __init obdecho_init(void)
1820 {
1821 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1822
1823 LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
1824
1825 return echo_client_init();
1826 }
1827
1828 static void /*__exit*/ obdecho_exit(void)
1829 {
1830 echo_client_exit();
1831 }
1832
1833 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1834 MODULE_DESCRIPTION("Lustre Echo Client test driver");
1835 MODULE_VERSION(LUSTRE_VERSION_STRING);
1836 MODULE_LICENSE("GPL");
1837
1838 module_init(obdecho_init);
1839 module_exit(obdecho_exit);
1840
1841 /** @} echo_client */