4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
50 #include "echo_internal.h"
52 /** \defgroup echo_client Echo Client
57 struct cl_device ed_cl
;
58 struct echo_client_obd
*ed_ec
;
60 struct cl_site ed_site_myself
;
61 struct cl_site
*ed_site
;
62 struct lu_device
*ed_next
;
66 struct cl_object eo_cl
;
67 struct cl_object_header eo_hdr
;
69 struct echo_device
*eo_dev
;
70 struct list_head eo_obj_chain
;
71 struct lov_stripe_md
*eo_lsm
;
76 struct echo_object_conf
{
77 struct cl_object_conf eoc_cl
;
78 struct lov_stripe_md
**eoc_md
;
82 struct cl_page_slice ep_cl
;
87 struct cl_lock_slice el_cl
;
88 struct list_head el_chain
;
89 struct echo_object
*el_object
;
94 static int echo_client_setup(const struct lu_env
*env
,
95 struct obd_device
*obddev
,
96 struct lustre_cfg
*lcfg
);
97 static int echo_client_cleanup(struct obd_device
*obddev
);
99 /** \defgroup echo_helpers Helper functions
102 static inline struct echo_device
*cl2echo_dev(const struct cl_device
*dev
)
104 return container_of0(dev
, struct echo_device
, ed_cl
);
107 static inline struct cl_device
*echo_dev2cl(struct echo_device
*d
)
112 static inline struct echo_device
*obd2echo_dev(const struct obd_device
*obd
)
114 return cl2echo_dev(lu2cl_dev(obd
->obd_lu_dev
));
117 static inline struct cl_object
*echo_obj2cl(struct echo_object
*eco
)
122 static inline struct echo_object
*cl2echo_obj(const struct cl_object
*o
)
124 return container_of(o
, struct echo_object
, eo_cl
);
127 static inline struct echo_page
*cl2echo_page(const struct cl_page_slice
*s
)
129 return container_of(s
, struct echo_page
, ep_cl
);
132 static inline struct echo_lock
*cl2echo_lock(const struct cl_lock_slice
*s
)
134 return container_of(s
, struct echo_lock
, el_cl
);
137 static inline struct cl_lock
*echo_lock2cl(const struct echo_lock
*ecl
)
139 return ecl
->el_cl
.cls_lock
;
142 static struct lu_context_key echo_thread_key
;
143 static inline struct echo_thread_info
*echo_env_info(const struct lu_env
*env
)
145 struct echo_thread_info
*info
;
147 info
= lu_context_key_get(&env
->le_ctx
, &echo_thread_key
);
153 struct echo_object_conf
*cl2echo_conf(const struct cl_object_conf
*c
)
155 return container_of(c
, struct echo_object_conf
, eoc_cl
);
158 /** @} echo_helpers */
160 static struct echo_object
*cl_echo_object_find(struct echo_device
*d
,
161 struct lov_stripe_md
**lsm
);
162 static int cl_echo_object_put(struct echo_object
*eco
);
163 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
164 struct page
**pages
, int npages
, int async
);
166 struct echo_thread_info
{
167 struct echo_object_conf eti_conf
;
168 struct lustre_md eti_md
;
170 struct cl_2queue eti_queue
;
172 struct cl_lock eti_lock
;
173 struct lu_fid eti_fid
;
174 struct lu_fid eti_fid2
;
177 /* No session used right now */
178 struct echo_session_info
{
182 static struct kmem_cache
*echo_lock_kmem
;
183 static struct kmem_cache
*echo_object_kmem
;
184 static struct kmem_cache
*echo_thread_kmem
;
185 static struct kmem_cache
*echo_session_kmem
;
187 static struct lu_kmem_descr echo_caches
[] = {
189 .ckd_cache
= &echo_lock_kmem
,
190 .ckd_name
= "echo_lock_kmem",
191 .ckd_size
= sizeof(struct echo_lock
)
194 .ckd_cache
= &echo_object_kmem
,
195 .ckd_name
= "echo_object_kmem",
196 .ckd_size
= sizeof(struct echo_object
)
199 .ckd_cache
= &echo_thread_kmem
,
200 .ckd_name
= "echo_thread_kmem",
201 .ckd_size
= sizeof(struct echo_thread_info
)
204 .ckd_cache
= &echo_session_kmem
,
205 .ckd_name
= "echo_session_kmem",
206 .ckd_size
= sizeof(struct echo_session_info
)
213 /** \defgroup echo_page Page operations
215 * Echo page operations.
219 static int echo_page_own(const struct lu_env
*env
,
220 const struct cl_page_slice
*slice
,
221 struct cl_io
*io
, int nonblock
)
223 struct echo_page
*ep
= cl2echo_page(slice
);
226 mutex_lock(&ep
->ep_lock
);
227 else if (!mutex_trylock(&ep
->ep_lock
))
232 static void echo_page_disown(const struct lu_env
*env
,
233 const struct cl_page_slice
*slice
,
236 struct echo_page
*ep
= cl2echo_page(slice
);
238 LASSERT(mutex_is_locked(&ep
->ep_lock
));
239 mutex_unlock(&ep
->ep_lock
);
242 static void echo_page_discard(const struct lu_env
*env
,
243 const struct cl_page_slice
*slice
,
244 struct cl_io
*unused
)
246 cl_page_delete(env
, slice
->cpl_page
);
249 static int echo_page_is_vmlocked(const struct lu_env
*env
,
250 const struct cl_page_slice
*slice
)
252 if (mutex_is_locked(&cl2echo_page(slice
)->ep_lock
))
257 static void echo_page_completion(const struct lu_env
*env
,
258 const struct cl_page_slice
*slice
,
261 LASSERT(slice
->cpl_page
->cp_sync_io
);
264 static void echo_page_fini(const struct lu_env
*env
,
265 struct cl_page_slice
*slice
)
267 struct echo_object
*eco
= cl2echo_obj(slice
->cpl_obj
);
269 atomic_dec(&eco
->eo_npages
);
270 put_page(slice
->cpl_page
->cp_vmpage
);
273 static int echo_page_prep(const struct lu_env
*env
,
274 const struct cl_page_slice
*slice
,
275 struct cl_io
*unused
)
280 static int echo_page_print(const struct lu_env
*env
,
281 const struct cl_page_slice
*slice
,
282 void *cookie
, lu_printer_t printer
)
284 struct echo_page
*ep
= cl2echo_page(slice
);
286 (*printer
)(env
, cookie
, LUSTRE_ECHO_CLIENT_NAME
"-page@%p %d vm@%p\n",
287 ep
, mutex_is_locked(&ep
->ep_lock
),
288 slice
->cpl_page
->cp_vmpage
);
292 static const struct cl_page_operations echo_page_ops
= {
293 .cpo_own
= echo_page_own
,
294 .cpo_disown
= echo_page_disown
,
295 .cpo_discard
= echo_page_discard
,
296 .cpo_fini
= echo_page_fini
,
297 .cpo_print
= echo_page_print
,
298 .cpo_is_vmlocked
= echo_page_is_vmlocked
,
301 .cpo_prep
= echo_page_prep
,
302 .cpo_completion
= echo_page_completion
,
305 .cpo_prep
= echo_page_prep
,
306 .cpo_completion
= echo_page_completion
,
313 /** \defgroup echo_lock Locking
315 * echo lock operations
319 static void echo_lock_fini(const struct lu_env
*env
,
320 struct cl_lock_slice
*slice
)
322 struct echo_lock
*ecl
= cl2echo_lock(slice
);
324 LASSERT(list_empty(&ecl
->el_chain
));
325 kmem_cache_free(echo_lock_kmem
, ecl
);
328 static struct cl_lock_operations echo_lock_ops
= {
329 .clo_fini
= echo_lock_fini
,
334 /** \defgroup echo_cl_ops cl_object operations
336 * operations for cl_object
340 static int echo_page_init(const struct lu_env
*env
, struct cl_object
*obj
,
341 struct cl_page
*page
, pgoff_t index
)
343 struct echo_page
*ep
= cl_object_page_slice(obj
, page
);
344 struct echo_object
*eco
= cl2echo_obj(obj
);
346 get_page(page
->cp_vmpage
);
347 mutex_init(&ep
->ep_lock
);
348 cl_page_slice_add(page
, &ep
->ep_cl
, obj
, index
, &echo_page_ops
);
349 atomic_inc(&eco
->eo_npages
);
353 static int echo_io_init(const struct lu_env
*env
, struct cl_object
*obj
,
359 static int echo_lock_init(const struct lu_env
*env
,
360 struct cl_object
*obj
, struct cl_lock
*lock
,
361 const struct cl_io
*unused
)
363 struct echo_lock
*el
;
365 el
= kmem_cache_zalloc(echo_lock_kmem
, GFP_NOFS
);
367 cl_lock_slice_add(lock
, &el
->el_cl
, obj
, &echo_lock_ops
);
368 el
->el_object
= cl2echo_obj(obj
);
369 INIT_LIST_HEAD(&el
->el_chain
);
370 atomic_set(&el
->el_refcount
, 0);
372 return !el
? -ENOMEM
: 0;
375 static int echo_conf_set(const struct lu_env
*env
, struct cl_object
*obj
,
376 const struct cl_object_conf
*conf
)
381 static const struct cl_object_operations echo_cl_obj_ops
= {
382 .coo_page_init
= echo_page_init
,
383 .coo_lock_init
= echo_lock_init
,
384 .coo_io_init
= echo_io_init
,
385 .coo_conf_set
= echo_conf_set
388 /** @} echo_cl_ops */
390 /** \defgroup echo_lu_ops lu_object operations
392 * operations for echo lu object.
396 static int echo_object_init(const struct lu_env
*env
, struct lu_object
*obj
,
397 const struct lu_object_conf
*conf
)
399 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(obj
->lo_dev
));
400 struct echo_client_obd
*ec
= ed
->ed_ec
;
401 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
402 const struct cl_object_conf
*cconf
;
403 struct echo_object_conf
*econf
;
406 struct lu_object
*below
;
407 struct lu_device
*under
;
410 below
= under
->ld_ops
->ldo_object_alloc(env
, obj
->lo_header
,
414 lu_object_add(obj
, below
);
417 cconf
= lu2cl_conf(conf
);
418 econf
= cl2echo_conf(cconf
);
420 LASSERT(econf
->eoc_md
);
421 eco
->eo_lsm
= *econf
->eoc_md
;
422 /* clear the lsm pointer so that it won't get freed. */
423 *econf
->eoc_md
= NULL
;
426 atomic_set(&eco
->eo_npages
, 0);
427 cl_object_page_init(lu2cl(obj
), sizeof(struct echo_page
));
429 spin_lock(&ec
->ec_lock
);
430 list_add_tail(&eco
->eo_obj_chain
, &ec
->ec_objects
);
431 spin_unlock(&ec
->ec_lock
);
436 /* taken from osc_unpackmd() */
437 static int echo_alloc_memmd(struct echo_device
*ed
,
438 struct lov_stripe_md
**lsmp
)
442 /* If export is lov/osc then use their obd method */
444 return obd_alloc_memmd(ed
->ed_ec
->ec_exp
, lsmp
);
445 /* OFD has no unpackmd method, do everything here */
446 lsm_size
= lov_stripe_md_size(1);
449 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
453 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
), GFP_NOFS
);
454 if (!(*lsmp
)->lsm_oinfo
[0]) {
459 loi_init((*lsmp
)->lsm_oinfo
[0]);
460 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
461 ostid_set_seq_echo(&(*lsmp
)->lsm_oi
);
466 static int echo_free_memmd(struct echo_device
*ed
, struct lov_stripe_md
**lsmp
)
470 /* If export is lov/osc then use their obd method */
472 return obd_free_memmd(ed
->ed_ec
->ec_exp
, lsmp
);
473 /* OFD has no unpackmd method, do everything here */
474 lsm_size
= lov_stripe_md_size(1);
476 kfree((*lsmp
)->lsm_oinfo
[0]);
482 static void echo_object_free(const struct lu_env
*env
, struct lu_object
*obj
)
484 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
485 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
487 LASSERT(atomic_read(&eco
->eo_npages
) == 0);
489 spin_lock(&ec
->ec_lock
);
490 list_del_init(&eco
->eo_obj_chain
);
491 spin_unlock(&ec
->ec_lock
);
494 lu_object_header_fini(obj
->lo_header
);
497 echo_free_memmd(eco
->eo_dev
, &eco
->eo_lsm
);
498 kmem_cache_free(echo_object_kmem
, eco
);
501 static int echo_object_print(const struct lu_env
*env
, void *cookie
,
502 lu_printer_t p
, const struct lu_object
*o
)
504 struct echo_object
*obj
= cl2echo_obj(lu2cl(o
));
506 return (*p
)(env
, cookie
, "echoclient-object@%p", obj
);
509 static const struct lu_object_operations echo_lu_obj_ops
= {
510 .loo_object_init
= echo_object_init
,
511 .loo_object_delete
= NULL
,
512 .loo_object_release
= NULL
,
513 .loo_object_free
= echo_object_free
,
514 .loo_object_print
= echo_object_print
,
515 .loo_object_invariant
= NULL
518 /** @} echo_lu_ops */
520 /** \defgroup echo_lu_dev_ops lu_device operations
522 * Operations for echo lu device.
526 static struct lu_object
*echo_object_alloc(const struct lu_env
*env
,
527 const struct lu_object_header
*hdr
,
528 struct lu_device
*dev
)
530 struct echo_object
*eco
;
531 struct lu_object
*obj
= NULL
;
533 /* we're the top dev. */
535 eco
= kmem_cache_zalloc(echo_object_kmem
, GFP_NOFS
);
537 struct cl_object_header
*hdr
= &eco
->eo_hdr
;
539 obj
= &echo_obj2cl(eco
)->co_lu
;
540 cl_object_header_init(hdr
);
541 hdr
->coh_page_bufsize
= cfs_size_round(sizeof(struct cl_page
));
543 lu_object_init(obj
, &hdr
->coh_lu
, dev
);
544 lu_object_add_top(&hdr
->coh_lu
, obj
);
546 eco
->eo_cl
.co_ops
= &echo_cl_obj_ops
;
547 obj
->lo_ops
= &echo_lu_obj_ops
;
552 static const struct lu_device_operations echo_device_lu_ops
= {
553 .ldo_object_alloc
= echo_object_alloc
,
556 /** @} echo_lu_dev_ops */
558 static const struct cl_device_operations echo_device_cl_ops
= {
561 /** \defgroup echo_init Setup and teardown
563 * Init and fini functions for echo client.
567 static int echo_site_init(const struct lu_env
*env
, struct echo_device
*ed
)
569 struct cl_site
*site
= &ed
->ed_site_myself
;
572 /* initialize site */
573 rc
= cl_site_init(site
, &ed
->ed_cl
);
575 CERROR("Cannot initialize site for echo client(%d)\n", rc
);
579 rc
= lu_site_init_finish(&site
->cs_lu
);
587 static void echo_site_fini(const struct lu_env
*env
, struct echo_device
*ed
)
590 cl_site_fini(ed
->ed_site
);
595 static void *echo_thread_key_init(const struct lu_context
*ctx
,
596 struct lu_context_key
*key
)
598 struct echo_thread_info
*info
;
600 info
= kmem_cache_zalloc(echo_thread_kmem
, GFP_NOFS
);
602 info
= ERR_PTR(-ENOMEM
);
606 static void echo_thread_key_fini(const struct lu_context
*ctx
,
607 struct lu_context_key
*key
, void *data
)
609 struct echo_thread_info
*info
= data
;
611 kmem_cache_free(echo_thread_kmem
, info
);
614 static void echo_thread_key_exit(const struct lu_context
*ctx
,
615 struct lu_context_key
*key
, void *data
)
619 static struct lu_context_key echo_thread_key
= {
620 .lct_tags
= LCT_CL_THREAD
,
621 .lct_init
= echo_thread_key_init
,
622 .lct_fini
= echo_thread_key_fini
,
623 .lct_exit
= echo_thread_key_exit
626 static void *echo_session_key_init(const struct lu_context
*ctx
,
627 struct lu_context_key
*key
)
629 struct echo_session_info
*session
;
631 session
= kmem_cache_zalloc(echo_session_kmem
, GFP_NOFS
);
633 session
= ERR_PTR(-ENOMEM
);
637 static void echo_session_key_fini(const struct lu_context
*ctx
,
638 struct lu_context_key
*key
, void *data
)
640 struct echo_session_info
*session
= data
;
642 kmem_cache_free(echo_session_kmem
, session
);
645 static void echo_session_key_exit(const struct lu_context
*ctx
,
646 struct lu_context_key
*key
, void *data
)
650 static struct lu_context_key echo_session_key
= {
651 .lct_tags
= LCT_SESSION
,
652 .lct_init
= echo_session_key_init
,
653 .lct_fini
= echo_session_key_fini
,
654 .lct_exit
= echo_session_key_exit
657 LU_TYPE_INIT_FINI(echo
, &echo_thread_key
, &echo_session_key
);
659 static struct lu_device
*echo_device_alloc(const struct lu_env
*env
,
660 struct lu_device_type
*t
,
661 struct lustre_cfg
*cfg
)
663 struct lu_device
*next
;
664 struct echo_device
*ed
;
665 struct cl_device
*cd
;
666 struct obd_device
*obd
= NULL
; /* to keep compiler happy */
667 struct obd_device
*tgt
;
668 const char *tgt_type_name
;
671 ed
= kzalloc(sizeof(*ed
), GFP_NOFS
);
678 rc
= cl_device_init(cd
, t
);
682 cd
->cd_lu_dev
.ld_ops
= &echo_device_lu_ops
;
683 cd
->cd_ops
= &echo_device_cl_ops
;
685 obd
= class_name2obd(lustre_cfg_string(cfg
, 0));
689 tgt
= class_name2obd(lustre_cfg_string(cfg
, 1));
691 CERROR("Can not find tgt device %s\n",
692 lustre_cfg_string(cfg
, 1));
694 goto out_device_fini
;
697 next
= tgt
->obd_lu_dev
;
698 if (!strcmp(tgt
->obd_type
->typ_name
, LUSTRE_MDT_NAME
)) {
699 CERROR("echo MDT client must be run on server\n");
701 goto out_device_fini
;
704 rc
= echo_site_init(env
, ed
);
706 goto out_device_fini
;
708 rc
= echo_client_setup(env
, obd
, cfg
);
712 ed
->ed_ec
= &obd
->u
.echo_client
;
714 /* if echo client is to be stacked upon ost device, the next is
715 * NULL since ost is not a clio device so far
717 if (next
&& !lu_device_is_cl(next
))
720 tgt_type_name
= tgt
->obd_type
->typ_name
;
727 next
->ld_site
= &ed
->ed_site
->cs_lu
;
728 rc
= next
->ld_type
->ldt_ops
->ldto_device_init(env
, next
,
729 next
->ld_type
->ldt_name
,
735 LASSERT(strcmp(tgt_type_name
, LUSTRE_OST_NAME
) == 0);
739 return &cd
->cd_lu_dev
;
742 err
= echo_client_cleanup(obd
);
744 CERROR("Cleanup obd device %s error(%d)\n",
747 echo_site_fini(env
, ed
);
749 cl_device_fini(&ed
->ed_cl
);
756 static int echo_device_init(const struct lu_env
*env
, struct lu_device
*d
,
757 const char *name
, struct lu_device
*next
)
763 static struct lu_device
*echo_device_fini(const struct lu_env
*env
,
766 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
767 struct lu_device
*next
= ed
->ed_next
;
770 next
= next
->ld_type
->ldt_ops
->ldto_device_fini(env
, next
);
774 static void echo_lock_release(const struct lu_env
*env
,
775 struct echo_lock
*ecl
,
778 struct cl_lock
*clk
= echo_lock2cl(ecl
);
780 cl_lock_release(env
, clk
);
783 static struct lu_device
*echo_device_free(const struct lu_env
*env
,
786 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
787 struct echo_client_obd
*ec
= ed
->ed_ec
;
788 struct echo_object
*eco
;
789 struct lu_device
*next
= ed
->ed_next
;
791 CDEBUG(D_INFO
, "echo device:%p is going to be freed, next = %p\n",
794 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
796 /* check if there are objects still alive.
797 * It shouldn't have any object because lu_site_purge would cleanup
798 * all of cached objects. Anyway, probably the echo device is being
799 * parallelly accessed.
801 spin_lock(&ec
->ec_lock
);
802 list_for_each_entry(eco
, &ec
->ec_objects
, eo_obj_chain
)
804 spin_unlock(&ec
->ec_lock
);
807 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
810 "Waiting for the reference of echo object to be dropped\n");
812 /* Wait for the last reference to be dropped. */
813 spin_lock(&ec
->ec_lock
);
814 while (!list_empty(&ec
->ec_objects
)) {
815 spin_unlock(&ec
->ec_lock
);
816 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
817 set_current_state(TASK_UNINTERRUPTIBLE
);
818 schedule_timeout(cfs_time_seconds(1));
819 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
820 spin_lock(&ec
->ec_lock
);
822 spin_unlock(&ec
->ec_lock
);
824 LASSERT(list_empty(&ec
->ec_locks
));
826 CDEBUG(D_INFO
, "No object exists, exiting...\n");
828 echo_client_cleanup(d
->ld_obd
);
831 next
= next
->ld_type
->ldt_ops
->ldto_device_free(env
, next
);
833 LASSERT(ed
->ed_site
== lu2cl_site(d
->ld_site
));
834 echo_site_fini(env
, ed
);
835 cl_device_fini(&ed
->ed_cl
);
841 static const struct lu_device_type_operations echo_device_type_ops
= {
842 .ldto_init
= echo_type_init
,
843 .ldto_fini
= echo_type_fini
,
845 .ldto_start
= echo_type_start
,
846 .ldto_stop
= echo_type_stop
,
848 .ldto_device_alloc
= echo_device_alloc
,
849 .ldto_device_free
= echo_device_free
,
850 .ldto_device_init
= echo_device_init
,
851 .ldto_device_fini
= echo_device_fini
854 static struct lu_device_type echo_device_type
= {
855 .ldt_tags
= LU_DEVICE_CL
,
856 .ldt_name
= LUSTRE_ECHO_CLIENT_NAME
,
857 .ldt_ops
= &echo_device_type_ops
,
858 .ldt_ctx_tags
= LCT_CL_THREAD
,
863 /** \defgroup echo_exports Exported operations
865 * exporting functions to echo client
870 /* Interfaces to echo client obd device */
871 static struct echo_object
*cl_echo_object_find(struct echo_device
*d
,
872 struct lov_stripe_md
**lsmp
)
875 struct echo_thread_info
*info
;
876 struct echo_object_conf
*conf
;
877 struct lov_stripe_md
*lsm
;
878 struct echo_object
*eco
;
879 struct cl_object
*obj
;
887 LASSERTF(ostid_id(&lsm
->lsm_oi
) != 0, DOSTID
"\n", POSTID(&lsm
->lsm_oi
));
888 LASSERTF(ostid_seq(&lsm
->lsm_oi
) == FID_SEQ_ECHO
, DOSTID
"\n",
889 POSTID(&lsm
->lsm_oi
));
891 /* Never return an object if the obd is to be freed. */
892 if (echo_dev2cl(d
)->cd_lu_dev
.ld_obd
->obd_stopping
)
893 return ERR_PTR(-ENODEV
);
895 env
= cl_env_get(&refcheck
);
899 info
= echo_env_info(env
);
900 conf
= &info
->eti_conf
;
902 struct lov_oinfo
*oinfo
= lsm
->lsm_oinfo
[0];
905 oinfo
->loi_oi
= lsm
->lsm_oi
;
906 conf
->eoc_cl
.u
.coc_oinfo
= oinfo
;
910 fid
= &info
->eti_fid
;
911 rc
= ostid_to_fid(fid
, &lsm
->lsm_oi
, 0);
917 /* In the function below, .hs_keycmp resolves to
918 * lu_obj_hop_keycmp()
920 /* coverity[overrun-buffer-val] */
921 obj
= cl_object_find(env
, echo_dev2cl(d
), fid
, &conf
->eoc_cl
);
927 eco
= cl2echo_obj(obj
);
928 if (eco
->eo_deleted
) {
929 cl_object_put(env
, obj
);
930 eco
= ERR_PTR(-EAGAIN
);
934 cl_env_put(env
, &refcheck
);
938 static int cl_echo_object_put(struct echo_object
*eco
)
941 struct cl_object
*obj
= echo_obj2cl(eco
);
944 env
= cl_env_get(&refcheck
);
948 /* an external function to kill an object? */
949 if (eco
->eo_deleted
) {
950 struct lu_object_header
*loh
= obj
->co_lu
.lo_header
;
952 LASSERT(&eco
->eo_hdr
== luh2coh(loh
));
953 set_bit(LU_OBJECT_HEARD_BANSHEE
, &loh
->loh_flags
);
956 cl_object_put(env
, obj
);
957 cl_env_put(env
, &refcheck
);
961 static int cl_echo_enqueue0(struct lu_env
*env
, struct echo_object
*eco
,
962 u64 start
, u64 end
, int mode
,
963 __u64
*cookie
, __u32 enqflags
)
967 struct cl_object
*obj
;
968 struct cl_lock_descr
*descr
;
969 struct echo_thread_info
*info
;
972 info
= echo_env_info(env
);
974 lck
= &info
->eti_lock
;
975 obj
= echo_obj2cl(eco
);
977 memset(lck
, 0, sizeof(*lck
));
978 descr
= &lck
->cll_descr
;
979 descr
->cld_obj
= obj
;
980 descr
->cld_start
= cl_index(obj
, start
);
981 descr
->cld_end
= cl_index(obj
, end
);
982 descr
->cld_mode
= mode
== LCK_PW
? CLM_WRITE
: CLM_READ
;
983 descr
->cld_enq_flags
= enqflags
;
986 rc
= cl_lock_request(env
, io
, lck
);
988 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
989 struct echo_lock
*el
;
991 el
= cl2echo_lock(cl_lock_at(lck
, &echo_device_type
));
992 spin_lock(&ec
->ec_lock
);
993 if (list_empty(&el
->el_chain
)) {
994 list_add(&el
->el_chain
, &ec
->ec_locks
);
995 el
->el_cookie
= ++ec
->ec_unique
;
997 atomic_inc(&el
->el_refcount
);
998 *cookie
= el
->el_cookie
;
999 spin_unlock(&ec
->ec_lock
);
1004 static int cl_echo_cancel0(struct lu_env
*env
, struct echo_device
*ed
,
1007 struct echo_client_obd
*ec
= ed
->ed_ec
;
1008 struct echo_lock
*ecl
= NULL
;
1009 struct list_head
*el
;
1010 int found
= 0, still_used
= 0;
1012 spin_lock(&ec
->ec_lock
);
1013 list_for_each(el
, &ec
->ec_locks
) {
1014 ecl
= list_entry(el
, struct echo_lock
, el_chain
);
1015 CDEBUG(D_INFO
, "ecl: %p, cookie: %#llx\n", ecl
, ecl
->el_cookie
);
1016 found
= (ecl
->el_cookie
== cookie
);
1018 if (atomic_dec_and_test(&ecl
->el_refcount
))
1019 list_del_init(&ecl
->el_chain
);
1025 spin_unlock(&ec
->ec_lock
);
1030 echo_lock_release(env
, ecl
, still_used
);
1034 static void echo_commit_callback(const struct lu_env
*env
, struct cl_io
*io
,
1035 struct cl_page
*page
)
1037 struct echo_thread_info
*info
;
1038 struct cl_2queue
*queue
;
1040 info
= echo_env_info(env
);
1041 LASSERT(io
== &info
->eti_io
);
1043 queue
= &info
->eti_queue
;
1044 cl_page_list_add(&queue
->c2_qout
, page
);
1047 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
1048 struct page
**pages
, int npages
, int async
)
1051 struct echo_thread_info
*info
;
1052 struct cl_object
*obj
= echo_obj2cl(eco
);
1053 struct echo_device
*ed
= eco
->eo_dev
;
1054 struct cl_2queue
*queue
;
1056 struct cl_page
*clp
;
1057 struct lustre_handle lh
= { 0 };
1058 int page_size
= cl_page_size(obj
);
1063 LASSERT((offset
& ~PAGE_MASK
) == 0);
1064 LASSERT(ed
->ed_next
);
1065 env
= cl_env_get(&refcheck
);
1067 return PTR_ERR(env
);
1069 info
= echo_env_info(env
);
1071 queue
= &info
->eti_queue
;
1073 cl_2queue_init(queue
);
1075 io
->ci_ignore_layout
= 1;
1076 rc
= cl_io_init(env
, io
, CIT_MISC
, obj
);
1081 rc
= cl_echo_enqueue0(env
, eco
, offset
,
1082 offset
+ npages
* PAGE_SIZE
- 1,
1083 rw
== READ
? LCK_PR
: LCK_PW
, &lh
.cookie
,
1088 for (i
= 0; i
< npages
; i
++) {
1090 clp
= cl_page_find(env
, obj
, cl_index(obj
, offset
),
1091 pages
[i
], CPT_TRANSIENT
);
1096 LASSERT(clp
->cp_type
== CPT_TRANSIENT
);
1098 rc
= cl_page_own(env
, io
, clp
);
1100 LASSERT(clp
->cp_state
== CPS_FREEING
);
1101 cl_page_put(env
, clp
);
1105 * Add a page to the incoming page list of 2-queue.
1107 cl_page_list_add(&queue
->c2_qin
, clp
);
1109 /* drop the reference count for cl_page_find, so that the page
1110 * will be freed in cl_2queue_fini.
1112 cl_page_put(env
, clp
);
1113 cl_page_clip(env
, clp
, 0, page_size
);
1115 offset
+= page_size
;
1119 enum cl_req_type typ
= rw
== READ
? CRT_READ
: CRT_WRITE
;
1121 async
= async
&& (typ
== CRT_WRITE
);
1123 rc
= cl_io_commit_async(env
, io
, &queue
->c2_qin
,
1125 echo_commit_callback
);
1127 rc
= cl_io_submit_sync(env
, io
, typ
, queue
, 0);
1128 CDEBUG(D_INFO
, "echo_client %s write returns %d\n",
1129 async
? "async" : "sync", rc
);
1132 cl_echo_cancel0(env
, ed
, lh
.cookie
);
1134 cl_2queue_discard(env
, io
, queue
);
1135 cl_2queue_disown(env
, io
, queue
);
1136 cl_2queue_fini(env
, queue
);
1137 cl_io_fini(env
, io
);
1139 cl_env_put(env
, &refcheck
);
1143 /** @} echo_exports */
1145 static u64 last_object_id
;
1147 static int echo_create_object(const struct lu_env
*env
, struct echo_device
*ed
,
1148 struct obdo
*oa
, struct obd_trans_info
*oti
)
1150 struct echo_object
*eco
;
1151 struct echo_client_obd
*ec
= ed
->ed_ec
;
1152 struct lov_stripe_md
*lsm
= NULL
;
1156 if (!(oa
->o_valid
& OBD_MD_FLID
) ||
1157 !(oa
->o_valid
& OBD_MD_FLGROUP
) ||
1158 !fid_seq_is_echo(ostid_seq(&oa
->o_oi
))) {
1159 CERROR("invalid oid " DOSTID
"\n", POSTID(&oa
->o_oi
));
1163 rc
= echo_alloc_memmd(ed
, &lsm
);
1165 CERROR("Cannot allocate md: rc = %d\n", rc
);
1169 /* setup object ID here */
1170 lsm
->lsm_oi
= oa
->o_oi
;
1172 if (ostid_id(&lsm
->lsm_oi
) == 0)
1173 ostid_set_id(&lsm
->lsm_oi
, ++last_object_id
);
1175 rc
= obd_create(env
, ec
->ec_exp
, oa
, &lsm
, oti
);
1177 CERROR("Cannot create objects: rc = %d\n", rc
);
1182 /* See what object ID we were given */
1183 oa
->o_oi
= lsm
->lsm_oi
;
1184 oa
->o_valid
|= OBD_MD_FLID
;
1186 eco
= cl_echo_object_find(ed
, &lsm
);
1191 cl_echo_object_put(eco
);
1193 CDEBUG(D_INFO
, "oa oid "DOSTID
"\n", POSTID(&oa
->o_oi
));
1197 obd_destroy(env
, ec
->ec_exp
, oa
, lsm
, oti
, NULL
);
1199 echo_free_memmd(ed
, &lsm
);
1201 CERROR("create object failed with: rc = %d\n", rc
);
1205 static int echo_get_object(struct echo_object
**ecop
, struct echo_device
*ed
,
1208 struct lov_stripe_md
*lsm
= NULL
;
1209 struct echo_object
*eco
;
1212 if ((oa
->o_valid
& OBD_MD_FLID
) == 0 || ostid_id(&oa
->o_oi
) == 0) {
1213 /* disallow use of object id 0 */
1214 CERROR("No valid oid\n");
1218 rc
= echo_alloc_memmd(ed
, &lsm
);
1222 lsm
->lsm_oi
= oa
->o_oi
;
1223 if (!(oa
->o_valid
& OBD_MD_FLGROUP
))
1224 ostid_set_seq_echo(&lsm
->lsm_oi
);
1227 eco
= cl_echo_object_find(ed
, &lsm
);
1233 echo_free_memmd(ed
, &lsm
);
1237 static void echo_put_object(struct echo_object
*eco
)
1241 rc
= cl_echo_object_put(eco
);
1243 CERROR("%s: echo client drop an object failed: rc = %d\n",
1244 eco
->eo_dev
->ed_ec
->ec_exp
->exp_obd
->obd_name
, rc
);
1248 echo_client_page_debug_setup(struct page
*page
, int rw
, u64 id
,
1249 u64 offset
, u64 count
)
1256 /* no partial pages on the client */
1257 LASSERT(count
== PAGE_SIZE
);
1261 for (delta
= 0; delta
< PAGE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1262 if (rw
== OBD_BRW_WRITE
) {
1263 stripe_off
= offset
+ delta
;
1266 stripe_off
= 0xdeadbeef00c0ffeeULL
;
1267 stripe_id
= 0xdeadbeef00c0ffeeULL
;
1269 block_debug_setup(addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1270 stripe_off
, stripe_id
);
1276 static int echo_client_page_debug_check(struct page
*page
, u64 id
,
1277 u64 offset
, u64 count
)
1286 /* no partial pages on the client */
1287 LASSERT(count
== PAGE_SIZE
);
1291 for (rc
= delta
= 0; delta
< PAGE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1292 stripe_off
= offset
+ delta
;
1295 rc2
= block_debug_check("test_brw",
1296 addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1297 stripe_off
, stripe_id
);
1299 CERROR("Error in echo object %#llx\n", id
);
1308 static int echo_client_kbrw(struct echo_device
*ed
, int rw
, struct obdo
*oa
,
1309 struct echo_object
*eco
, u64 offset
,
1310 u64 count
, int async
,
1311 struct obd_trans_info
*oti
)
1314 struct brw_page
*pga
;
1315 struct brw_page
*pgp
;
1316 struct page
**pages
;
1324 verify
= (ostid_id(&oa
->o_oi
) != ECHO_PERSISTENT_OBJID
&&
1325 (oa
->o_valid
& OBD_MD_FLFLAGS
) != 0 &&
1326 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) != 0);
1328 gfp_mask
= ((ostid_id(&oa
->o_oi
) & 2) == 0) ? GFP_KERNEL
: GFP_HIGHUSER
;
1330 LASSERT(rw
== OBD_BRW_WRITE
|| rw
== OBD_BRW_READ
);
1333 (count
& (~PAGE_MASK
)) != 0)
1336 /* XXX think again with misaligned I/O */
1337 npages
= count
>> PAGE_SHIFT
;
1339 if (rw
== OBD_BRW_WRITE
)
1340 brw_flags
= OBD_BRW_ASYNC
;
1342 pga
= kcalloc(npages
, sizeof(*pga
), GFP_NOFS
);
1346 pages
= kcalloc(npages
, sizeof(*pages
), GFP_NOFS
);
1352 for (i
= 0, pgp
= pga
, off
= offset
;
1354 i
++, pgp
++, off
+= PAGE_SIZE
) {
1355 LASSERT(!pgp
->pg
); /* for cleanup */
1358 pgp
->pg
= alloc_page(gfp_mask
);
1363 pgp
->count
= PAGE_SIZE
;
1365 pgp
->flag
= brw_flags
;
1368 echo_client_page_debug_setup(pgp
->pg
, rw
,
1369 ostid_id(&oa
->o_oi
), off
,
1373 /* brw mode can only be used at client */
1374 LASSERT(ed
->ed_next
);
1375 rc
= cl_echo_object_brw(eco
, rw
, offset
, pages
, npages
, async
);
1378 if (rc
!= 0 || rw
!= OBD_BRW_READ
)
1381 for (i
= 0, pgp
= pga
; i
< npages
; i
++, pgp
++) {
1388 vrc
= echo_client_page_debug_check(pgp
->pg
,
1389 ostid_id(&oa
->o_oi
),
1390 pgp
->off
, pgp
->count
);
1391 if (vrc
!= 0 && rc
== 0)
1394 __free_page(pgp
->pg
);
1401 static int echo_client_prep_commit(const struct lu_env
*env
,
1402 struct obd_export
*exp
, int rw
,
1403 struct obdo
*oa
, struct echo_object
*eco
,
1404 u64 offset
, u64 count
,
1405 u64 batch
, struct obd_trans_info
*oti
,
1408 struct obd_ioobj ioo
;
1409 struct niobuf_local
*lnb
;
1410 struct niobuf_remote
*rnb
;
1412 u64 npages
, tot_pages
;
1413 int i
, ret
= 0, brw_flags
= 0;
1415 if (count
<= 0 || (count
& (~PAGE_MASK
)) != 0)
1418 npages
= batch
>> PAGE_SHIFT
;
1419 tot_pages
= count
>> PAGE_SHIFT
;
1421 lnb
= kcalloc(npages
, sizeof(struct niobuf_local
), GFP_NOFS
);
1422 rnb
= kcalloc(npages
, sizeof(struct niobuf_remote
), GFP_NOFS
);
1429 if (rw
== OBD_BRW_WRITE
&& async
)
1430 brw_flags
|= OBD_BRW_ASYNC
;
1432 obdo_to_ioobj(oa
, &ioo
);
1436 for (; tot_pages
; tot_pages
-= npages
) {
1439 if (tot_pages
< npages
)
1442 for (i
= 0; i
< npages
; i
++, off
+= PAGE_SIZE
) {
1443 rnb
[i
].offset
= off
;
1444 rnb
[i
].len
= PAGE_SIZE
;
1445 rnb
[i
].flags
= brw_flags
;
1448 ioo
.ioo_bufcnt
= npages
;
1449 oti
->oti_transno
= 0;
1452 ret
= obd_preprw(env
, rw
, exp
, oa
, 1, &ioo
, rnb
, &lpages
,
1456 LASSERT(lpages
== npages
);
1458 for (i
= 0; i
< lpages
; i
++) {
1459 struct page
*page
= lnb
[i
].page
;
1461 /* read past eof? */
1462 if (!page
&& lnb
[i
].rc
== 0)
1466 lnb
[i
].flags
|= OBD_BRW_ASYNC
;
1468 if (ostid_id(&oa
->o_oi
) == ECHO_PERSISTENT_OBJID
||
1469 (oa
->o_valid
& OBD_MD_FLFLAGS
) == 0 ||
1470 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) == 0)
1473 if (rw
== OBD_BRW_WRITE
)
1474 echo_client_page_debug_setup(page
, rw
,
1475 ostid_id(&oa
->o_oi
),
1479 echo_client_page_debug_check(page
,
1480 ostid_id(&oa
->o_oi
),
1485 ret
= obd_commitrw(env
, rw
, exp
, oa
, 1, &ioo
,
1486 rnb
, npages
, lnb
, oti
, ret
);
1490 /* Reset oti otherwise it would confuse ldiskfs. */
1491 memset(oti
, 0, sizeof(*oti
));
1493 /* Reuse env context. */
1494 lu_context_exit((struct lu_context
*)&env
->le_ctx
);
1495 lu_context_enter((struct lu_context
*)&env
->le_ctx
);
1504 static int echo_client_brw_ioctl(const struct lu_env
*env
, int rw
,
1505 struct obd_export
*exp
,
1506 struct obd_ioctl_data
*data
,
1507 struct obd_trans_info
*dummy_oti
)
1509 struct obd_device
*obd
= class_exp2obd(exp
);
1510 struct echo_device
*ed
= obd2echo_dev(obd
);
1511 struct echo_client_obd
*ec
= ed
->ed_ec
;
1512 struct obdo
*oa
= &data
->ioc_obdo1
;
1513 struct echo_object
*eco
;
1518 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
1520 rc
= echo_get_object(&eco
, ed
, oa
);
1524 oa
->o_valid
&= ~OBD_MD_FLHANDLE
;
1526 /* OFD/obdfilter works only via prep/commit */
1527 test_mode
= (long)data
->ioc_pbuf1
;
1531 if (!ed
->ed_next
&& test_mode
!= 3) {
1533 data
->ioc_plen1
= data
->ioc_count
;
1536 /* Truncate batch size to maximum */
1537 if (data
->ioc_plen1
> PTLRPC_MAX_BRW_SIZE
)
1538 data
->ioc_plen1
= PTLRPC_MAX_BRW_SIZE
;
1540 switch (test_mode
) {
1544 rc
= echo_client_kbrw(ed
, rw
, oa
,
1545 eco
, data
->ioc_offset
,
1546 data
->ioc_count
, async
, dummy_oti
);
1549 rc
= echo_client_prep_commit(env
, ec
->ec_exp
, rw
, oa
,
1550 eco
, data
->ioc_offset
,
1551 data
->ioc_count
, data
->ioc_plen1
,
1557 echo_put_object(eco
);
1562 echo_client_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
1563 void *karg
, void __user
*uarg
)
1565 struct obd_device
*obd
= exp
->exp_obd
;
1566 struct echo_device
*ed
= obd2echo_dev(obd
);
1567 struct echo_client_obd
*ec
= ed
->ed_ec
;
1568 struct echo_object
*eco
;
1569 struct obd_ioctl_data
*data
= karg
;
1570 struct obd_trans_info dummy_oti
;
1572 struct oti_req_ack_lock
*ack_lock
;
1575 int rw
= OBD_BRW_READ
;
1579 memset(&dummy_oti
, 0, sizeof(dummy_oti
));
1581 oa
= &data
->ioc_obdo1
;
1582 if (!(oa
->o_valid
& OBD_MD_FLGROUP
)) {
1583 oa
->o_valid
|= OBD_MD_FLGROUP
;
1584 ostid_set_seq_echo(&oa
->o_oi
);
1587 /* This FID is unpacked just for validation at this point */
1588 rc
= ostid_to_fid(&fid
, &oa
->o_oi
, 0);
1592 env
= kzalloc(sizeof(*env
), GFP_NOFS
);
1596 rc
= lu_env_init(env
, LCT_DT_THREAD
);
1603 case OBD_IOC_CREATE
: /* may create echo object */
1604 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1609 rc
= echo_create_object(env
, ed
, oa
, &dummy_oti
);
1612 case OBD_IOC_DESTROY
:
1613 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1618 rc
= echo_get_object(&eco
, ed
, oa
);
1620 rc
= obd_destroy(env
, ec
->ec_exp
, oa
, NULL
,
1623 eco
->eo_deleted
= 1;
1624 echo_put_object(eco
);
1628 case OBD_IOC_GETATTR
:
1629 rc
= echo_get_object(&eco
, ed
, oa
);
1631 struct obd_info oinfo
= {
1635 rc
= obd_getattr(env
, ec
->ec_exp
, &oinfo
);
1636 echo_put_object(eco
);
1640 case OBD_IOC_SETATTR
:
1641 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1646 rc
= echo_get_object(&eco
, ed
, oa
);
1648 struct obd_info oinfo
= {
1652 rc
= obd_setattr(env
, ec
->ec_exp
, &oinfo
, NULL
);
1653 echo_put_object(eco
);
1657 case OBD_IOC_BRW_WRITE
:
1658 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1665 case OBD_IOC_BRW_READ
:
1666 rc
= echo_client_brw_ioctl(env
, rw
, exp
, data
, &dummy_oti
);
1670 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd
);
1679 /* XXX this should be in a helper also called by target_send_reply */
1680 for (ack_lock
= dummy_oti
.oti_ack_locks
, i
= 0; i
< 4;
1682 if (!ack_lock
->mode
)
1684 ldlm_lock_decref(&ack_lock
->lock
, ack_lock
->mode
);
1690 static int echo_client_setup(const struct lu_env
*env
,
1691 struct obd_device
*obddev
, struct lustre_cfg
*lcfg
)
1693 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
1694 struct obd_device
*tgt
;
1695 struct obd_uuid echo_uuid
= { "ECHO_UUID" };
1696 struct obd_connect_data
*ocd
= NULL
;
1699 if (lcfg
->lcfg_bufcount
< 2 || LUSTRE_CFG_BUFLEN(lcfg
, 1) < 1) {
1700 CERROR("requires a TARGET OBD name\n");
1704 tgt
= class_name2obd(lustre_cfg_string(lcfg
, 1));
1705 if (!tgt
|| !tgt
->obd_attached
|| !tgt
->obd_set_up
) {
1706 CERROR("device not attached or not set up (%s)\n",
1707 lustre_cfg_string(lcfg
, 1));
1711 spin_lock_init(&ec
->ec_lock
);
1712 INIT_LIST_HEAD(&ec
->ec_objects
);
1713 INIT_LIST_HEAD(&ec
->ec_locks
);
1716 ocd
= kzalloc(sizeof(*ocd
), GFP_NOFS
);
1720 ocd
->ocd_connect_flags
= OBD_CONNECT_VERSION
| OBD_CONNECT_REQPORTAL
|
1721 OBD_CONNECT_BRW_SIZE
|
1722 OBD_CONNECT_GRANT
| OBD_CONNECT_FULL20
|
1723 OBD_CONNECT_64BITHASH
| OBD_CONNECT_LVB_TYPE
|
1725 ocd
->ocd_brw_size
= DT_MAX_BRW_SIZE
;
1726 ocd
->ocd_version
= LUSTRE_VERSION_CODE
;
1727 ocd
->ocd_group
= FID_SEQ_ECHO
;
1729 rc
= obd_connect(env
, &ec
->ec_exp
, tgt
, &echo_uuid
, ocd
, NULL
);
1734 CERROR("fail to connect to device %s\n",
1735 lustre_cfg_string(lcfg
, 1));
1742 static int echo_client_cleanup(struct obd_device
*obddev
)
1744 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
1747 if (!list_empty(&obddev
->obd_exports
)) {
1748 CERROR("still has clients!\n");
1752 LASSERT(atomic_read(&ec
->ec_exp
->exp_refcount
) > 0);
1753 rc
= obd_disconnect(ec
->ec_exp
);
1755 CERROR("fail to disconnect device: %d\n", rc
);
1760 static int echo_client_connect(const struct lu_env
*env
,
1761 struct obd_export
**exp
,
1762 struct obd_device
*src
, struct obd_uuid
*cluuid
,
1763 struct obd_connect_data
*data
, void *localdata
)
1766 struct lustre_handle conn
= { 0 };
1768 rc
= class_connect(&conn
, src
, cluuid
);
1770 *exp
= class_conn2export(&conn
);
1776 static int echo_client_disconnect(struct obd_export
*exp
)
1785 rc
= class_disconnect(exp
);
1791 static struct obd_ops echo_client_obd_ops
= {
1792 .owner
= THIS_MODULE
,
1793 .iocontrol
= echo_client_iocontrol
,
1794 .connect
= echo_client_connect
,
1795 .disconnect
= echo_client_disconnect
1798 static int echo_client_init(void)
1802 rc
= lu_kmem_init(echo_caches
);
1804 rc
= class_register_type(&echo_client_obd_ops
, NULL
,
1805 LUSTRE_ECHO_CLIENT_NAME
,
1808 lu_kmem_fini(echo_caches
);
1813 static void echo_client_exit(void)
1815 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME
);
1816 lu_kmem_fini(echo_caches
);
1819 static int __init
obdecho_init(void)
1821 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1823 LASSERT(PAGE_SIZE
% OBD_ECHO_BLOCK_SIZE
== 0);
1825 return echo_client_init();
/*
 * Module exit point; undoes obdecho_init().
 *
 * NOTE(review): the body was missing from the fragmented source chunk;
 * the single echo_client_exit() call is reconstructed — verify upstream.
 */
static void /*__exit*/ obdecho_exit(void)
{
	echo_client_exit();
}
1833 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1834 MODULE_DESCRIPTION("Lustre Echo Client test driver");
1835 MODULE_VERSION(LUSTRE_VERSION_STRING
);
1836 MODULE_LICENSE("GPL");
1838 module_init(obdecho_init
);
1839 module_exit(obdecho_exit
);
1841 /** @} echo_client */