4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2010, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdecho/echo.c
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Andreas Dilger <adilger@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_ECHO
44 #include "../include/obd_support.h"
45 #include "../include/obd_class.h"
46 #include "../include/lustre_debug.h"
47 #include "../include/lustre_dlm.h"
48 #include "../include/lprocfs_status.h"
50 #include "echo_internal.h"
52 /* The echo objid needs to be below 2^32, because regular FID numbers are
53 * limited to 2^32 objects in f_oid for the FID_SEQ_ECHO range. b=23335 */
54 #define ECHO_INIT_OID 0x10000000ULL
55 #define ECHO_HANDLE_MAGIC 0xabcd0123fedc9876ULL
57 #define ECHO_PERSISTENT_PAGES (ECHO_PERSISTENT_SIZE >> PAGE_CACHE_SHIFT)
58 static struct page
*echo_persistent_pages
[ECHO_PERSISTENT_PAGES
];
/* Indices of the per-device byte counters registered in echo_setup(). */
enum {
	LPROC_ECHO_READ_BYTES	= 1,
	LPROC_ECHO_WRITE_BYTES	= 2,
	LPROC_ECHO_LAST		= LPROC_ECHO_WRITE_BYTES + 1
};
66 static int echo_connect(const struct lu_env
*env
,
67 struct obd_export
**exp
, struct obd_device
*obd
,
68 struct obd_uuid
*cluuid
, struct obd_connect_data
*data
,
71 struct lustre_handle conn
= { 0 };
74 data
->ocd_connect_flags
&= ECHO_CONNECT_SUPPORTED
;
75 rc
= class_connect(&conn
, obd
, cluuid
);
77 CERROR("can't connect %d\n", rc
);
80 *exp
= class_conn2export(&conn
);
85 static int echo_disconnect(struct obd_export
*exp
)
87 LASSERT (exp
!= NULL
);
89 return server_disconnect_export(exp
);
/* Export set-up hook: only the LDLM side of the export needs initialising. */
static int echo_init_export(struct obd_export *exp)
{
	int rc = ldlm_init_export(exp);

	return rc;
}
/*
 * Export tear-down hook: release the target state first, then the LDLM
 * state attached to the export.  Always reports success.
 */
static int echo_destroy_export(struct obd_export *exp)
{
	target_destroy_export(exp);
	ldlm_destroy_export(exp);

	return 0;
}
105 static __u64
echo_next_id(struct obd_device
*obddev
)
109 spin_lock(&obddev
->u
.echo
.eo_lock
);
110 id
= ++obddev
->u
.echo
.eo_lastino
;
111 spin_unlock(&obddev
->u
.echo
.eo_lock
);
116 static int echo_create(const struct lu_env
*env
, struct obd_export
*exp
,
117 struct obdo
*oa
, struct lov_stripe_md
**ea
,
118 struct obd_trans_info
*oti
)
120 struct obd_device
*obd
= class_exp2obd(exp
);
123 CERROR("invalid client cookie %#llx\n",
124 exp
->exp_handle
.h_cookie
);
128 if (!(oa
->o_mode
&& S_IFMT
)) {
129 CERROR("echo obd: no type!\n");
133 if (!(oa
->o_valid
& OBD_MD_FLTYPE
)) {
134 CERROR("invalid o_valid %#llx\n", oa
->o_valid
);
138 ostid_set_seq_echo(&oa
->o_oi
);
139 ostid_set_id(&oa
->o_oi
, echo_next_id(obd
));
140 oa
->o_valid
= OBD_MD_FLID
;
145 static int echo_destroy(const struct lu_env
*env
, struct obd_export
*exp
,
146 struct obdo
*oa
, struct lov_stripe_md
*ea
,
147 struct obd_trans_info
*oti
, struct obd_export
*md_exp
,
150 struct obd_device
*obd
= class_exp2obd(exp
);
153 CERROR("invalid client cookie %#llx\n",
154 exp
->exp_handle
.h_cookie
);
158 if (!(oa
->o_valid
& OBD_MD_FLID
)) {
159 CERROR("obdo missing FLID valid flag: %#llx\n", oa
->o_valid
);
163 if (ostid_id(&oa
->o_oi
) > obd
->u
.echo
.eo_lastino
||
164 ostid_id(&oa
->o_oi
) < ECHO_INIT_OID
) {
165 CERROR("bad destroy objid: "DOSTID
"\n", POSTID(&oa
->o_oi
));
172 static int echo_getattr(const struct lu_env
*env
, struct obd_export
*exp
,
173 struct obd_info
*oinfo
)
175 struct obd_device
*obd
= class_exp2obd(exp
);
176 obd_id id
= ostid_id(&oinfo
->oi_oa
->o_oi
);
179 CERROR("invalid client cookie %#llx\n",
180 exp
->exp_handle
.h_cookie
);
184 if (!(oinfo
->oi_oa
->o_valid
& OBD_MD_FLID
)) {
185 CERROR("obdo missing FLID valid flag: %#llx\n",
186 oinfo
->oi_oa
->o_valid
);
190 obdo_cpy_md(oinfo
->oi_oa
, &obd
->u
.echo
.eo_oa
, oinfo
->oi_oa
->o_valid
);
191 ostid_set_seq_echo(&oinfo
->oi_oa
->o_oi
);
192 ostid_set_id(&oinfo
->oi_oa
->o_oi
, id
);
197 static int echo_setattr(const struct lu_env
*env
, struct obd_export
*exp
,
198 struct obd_info
*oinfo
, struct obd_trans_info
*oti
)
200 struct obd_device
*obd
= class_exp2obd(exp
);
203 CERROR("invalid client cookie %#llx\n",
204 exp
->exp_handle
.h_cookie
);
208 if (!(oinfo
->oi_oa
->o_valid
& OBD_MD_FLID
)) {
209 CERROR("obdo missing FLID valid flag: %#llx\n",
210 oinfo
->oi_oa
->o_valid
);
214 memcpy(&obd
->u
.echo
.eo_oa
, oinfo
->oi_oa
, sizeof(*oinfo
->oi_oa
));
216 if (ostid_id(&oinfo
->oi_oa
->o_oi
) & 4) {
217 /* Save lock to force ACKed reply */
218 ldlm_lock_addref (&obd
->u
.echo
.eo_nl_lock
, LCK_NL
);
219 oti
->oti_ack_locks
[0].mode
= LCK_NL
;
220 oti
->oti_ack_locks
[0].lock
= obd
->u
.echo
.eo_nl_lock
;
227 echo_page_debug_setup(struct page
*page
, int rw
, obd_id id
,
228 __u64 offset
, int len
)
230 int page_offset
= offset
& ~CFS_PAGE_MASK
;
231 char *addr
= ((char *)kmap(page
)) + page_offset
;
233 if (len
% OBD_ECHO_BLOCK_SIZE
!= 0)
234 CERROR("Unexpected block size %d\n", len
);
237 if (rw
& OBD_BRW_READ
)
238 block_debug_setup(addr
, OBD_ECHO_BLOCK_SIZE
,
241 block_debug_setup(addr
, OBD_ECHO_BLOCK_SIZE
,
242 0xecc0ecc0ecc0ecc0ULL
,
243 0xecc0ecc0ecc0ecc0ULL
);
245 addr
+= OBD_ECHO_BLOCK_SIZE
;
246 offset
+= OBD_ECHO_BLOCK_SIZE
;
247 len
-= OBD_ECHO_BLOCK_SIZE
;
254 echo_page_debug_check(struct page
*page
, obd_id id
,
255 __u64 offset
, int len
)
257 int page_offset
= offset
& ~CFS_PAGE_MASK
;
258 char *addr
= ((char *)kmap(page
)) + page_offset
;
262 if (len
% OBD_ECHO_BLOCK_SIZE
!= 0)
263 CERROR("Unexpected block size %d\n", len
);
266 rc2
= block_debug_check("echo", addr
, OBD_ECHO_BLOCK_SIZE
,
269 if (rc2
!= 0 && rc
== 0)
272 addr
+= OBD_ECHO_BLOCK_SIZE
;
273 offset
+= OBD_ECHO_BLOCK_SIZE
;
274 len
-= OBD_ECHO_BLOCK_SIZE
;
/*
 * Cookie stored in oti_handle by echo_preprw() and asserted in
 * echo_commitrw(): lets us verify that desc_private is passed unmolested.
 */
#define DESC_PRIV	0x10293847
285 static int echo_map_nb_to_lb(struct obdo
*oa
, struct obd_ioobj
*obj
,
286 struct niobuf_remote
*nb
, int *pages
,
287 struct niobuf_local
*lb
, int cmd
, int *left
)
289 int gfp_mask
= (ostid_id(&obj
->ioo_oid
) & 1) ?
290 GFP_HIGHUSER
: GFP_IOFS
;
291 int ispersistent
= ostid_id(&obj
->ioo_oid
) == ECHO_PERSISTENT_OBJID
;
292 int debug_setup
= (!ispersistent
&&
293 (oa
->o_valid
& OBD_MD_FLFLAGS
) != 0 &&
294 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) != 0);
295 struct niobuf_local
*res
= lb
;
296 obd_off offset
= nb
->offset
;
300 int plen
= PAGE_CACHE_SIZE
- (offset
& (PAGE_CACHE_SIZE
-1));
304 /* check for local buf overflow */
308 res
->lnb_file_offset
= offset
;
310 LASSERT((res
->lnb_file_offset
& ~CFS_PAGE_MASK
) + res
->len
<=
314 ((res
->lnb_file_offset
>> PAGE_CACHE_SHIFT
) <
315 ECHO_PERSISTENT_PAGES
)) {
317 echo_persistent_pages
[res
->lnb_file_offset
>>
319 /* Take extra ref so __free_pages() can be called OK */
320 get_page (res
->page
);
322 OBD_PAGE_ALLOC(res
->page
, gfp_mask
);
323 if (res
->page
== NULL
) {
324 CERROR("can't get page for id " DOSTID
"\n",
325 POSTID(&obj
->ioo_oid
));
330 CDEBUG(D_PAGE
, "$$$$ get page %p @ %llu for %d\n",
331 res
->page
, res
->lnb_file_offset
, res
->len
);
333 if (cmd
& OBD_BRW_READ
)
337 echo_page_debug_setup(res
->page
, cmd
,
338 ostid_id(&obj
->ioo_oid
),
339 res
->lnb_file_offset
, res
->len
);
352 static int echo_finalize_lb(struct obdo
*oa
, struct obd_ioobj
*obj
,
353 struct niobuf_remote
*rb
, int *pgs
,
354 struct niobuf_local
*lb
, int verify
)
356 struct niobuf_local
*res
= lb
;
357 obd_off start
= rb
->offset
>> PAGE_CACHE_SHIFT
;
358 obd_off end
= (rb
->offset
+ rb
->len
+ PAGE_CACHE_SIZE
- 1) >> PAGE_CACHE_SHIFT
;
359 int count
= (int)(end
- start
);
363 for (i
= 0; i
< count
; i
++, (*pgs
) ++, res
++) {
364 struct page
*page
= res
->page
;
368 CERROR("null page objid %llu:%p, buf %d/%d\n",
369 ostid_id(&obj
->ioo_oid
), page
, i
,
376 CDEBUG(D_PAGE
, "$$$$ use page %p, addr %p@%llu\n",
377 res
->page
, addr
, res
->lnb_file_offset
);
380 int vrc
= echo_page_debug_check(page
,
381 ostid_id(&obj
->ioo_oid
),
382 res
->lnb_file_offset
,
384 /* check all the pages always */
385 if (vrc
!= 0 && rc
== 0)
390 /* NB see comment above regarding persistent pages */
397 static int echo_preprw(const struct lu_env
*env
, int cmd
,
398 struct obd_export
*export
, struct obdo
*oa
,
399 int objcount
, struct obd_ioobj
*obj
,
400 struct niobuf_remote
*nb
, int *pages
,
401 struct niobuf_local
*res
, struct obd_trans_info
*oti
,
402 struct lustre_capa
*unused
)
404 struct obd_device
*obd
;
409 obd
= export
->exp_obd
;
413 /* Temp fix to stop falling foul of osc_announce_cached() */
414 oa
->o_valid
&= ~(OBD_MD_FLBLOCKS
| OBD_MD_FLGRANT
);
416 memset(res
, 0, sizeof(*res
) * *pages
);
418 CDEBUG(D_PAGE
, "%s %d obdos with %d IOs\n",
419 cmd
== OBD_BRW_READ
? "reading" : "writing", objcount
, *pages
);
422 oti
->oti_handle
= (void *)DESC_PRIV
;
427 for (i
= 0; i
< objcount
; i
++, obj
++) {
430 for (j
= 0 ; j
< obj
->ioo_bufcnt
; j
++, nb
++) {
432 rc
= echo_map_nb_to_lb(oa
, obj
, nb
, pages
,
433 res
+ *pages
, cmd
, &left
);
435 GOTO(preprw_cleanup
, rc
);
437 tot_bytes
+= nb
->len
;
441 atomic_add(*pages
, &obd
->u
.echo
.eo_prep
);
443 if (cmd
& OBD_BRW_READ
)
444 lprocfs_counter_add(obd
->obd_stats
, LPROC_ECHO_READ_BYTES
,
447 lprocfs_counter_add(obd
->obd_stats
, LPROC_ECHO_WRITE_BYTES
,
450 CDEBUG(D_PAGE
, "%d pages allocated after prep\n",
451 atomic_read(&obd
->u
.echo
.eo_prep
));
456 /* It is possible that we would rather handle errors by allow
457 * any already-set-up pages to complete, rather than tearing them
458 * all down again. I believe that this is what the in-kernel
459 * prep/commit operations do.
461 CERROR("cleaning up %u pages (%d obdos)\n", *pages
, objcount
);
462 for (i
= 0; i
< *pages
; i
++) {
464 /* NB if this is a persistent page, __free_pages will just
465 * lose the extra ref gained above */
466 OBD_PAGE_FREE(res
[i
].page
);
468 atomic_dec(&obd
->u
.echo
.eo_prep
);
474 static int echo_commitrw(const struct lu_env
*env
, int cmd
,
475 struct obd_export
*export
, struct obdo
*oa
,
476 int objcount
, struct obd_ioobj
*obj
,
477 struct niobuf_remote
*rb
, int niocount
,
478 struct niobuf_local
*res
, struct obd_trans_info
*oti
,
481 struct obd_device
*obd
;
485 obd
= export
->exp_obd
;
490 GOTO(commitrw_cleanup
, rc
);
492 if ((cmd
& OBD_BRW_RWMASK
) == OBD_BRW_READ
) {
493 CDEBUG(D_PAGE
, "reading %d obdos with %d IOs\n",
496 CDEBUG(D_PAGE
, "writing %d obdos with %d IOs\n",
500 if (niocount
&& res
== NULL
) {
501 CERROR("NULL res niobuf with niocount %d\n", niocount
);
505 LASSERT(oti
== NULL
|| oti
->oti_handle
== (void *)DESC_PRIV
);
507 for (i
= 0; i
< objcount
; i
++, obj
++) {
508 int verify
= (rc
== 0 &&
509 ostid_id(&obj
->ioo_oid
) != ECHO_PERSISTENT_OBJID
&&
510 (oa
->o_valid
& OBD_MD_FLFLAGS
) != 0 &&
511 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) != 0);
514 for (j
= 0 ; j
< obj
->ioo_bufcnt
; j
++, rb
++) {
515 int vrc
= echo_finalize_lb(oa
, obj
, rb
, &pgs
, &res
[pgs
],
521 GOTO(commitrw_cleanup
, rc
= vrc
);
529 atomic_sub(pgs
, &obd
->u
.echo
.eo_prep
);
531 CDEBUG(D_PAGE
, "%d pages remain after commit\n",
532 atomic_read(&obd
->u
.echo
.eo_prep
));
536 atomic_sub(pgs
, &obd
->u
.echo
.eo_prep
);
538 CERROR("cleaning up %d pages (%d obdos)\n",
539 niocount
- pgs
- 1, objcount
);
541 while (pgs
< niocount
) {
542 struct page
*page
= res
[pgs
++].page
;
547 /* NB see comment above regarding persistent pages */
549 atomic_dec(&obd
->u
.echo
.eo_prep
);
554 static int echo_setup(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
556 struct lprocfs_static_vars lvars
;
558 __u64 lock_flags
= 0;
559 struct ldlm_res_id res_id
= {.name
= {1}};
562 obd
->u
.echo
.eo_obt
.obt_magic
= OBT_MAGIC
;
563 spin_lock_init(&obd
->u
.echo
.eo_lock
);
564 obd
->u
.echo
.eo_lastino
= ECHO_INIT_OID
;
566 sprintf(ns_name
, "echotgt-%s", obd
->obd_uuid
.uuid
);
567 obd
->obd_namespace
= ldlm_namespace_new(obd
, ns_name
,
568 LDLM_NAMESPACE_SERVER
,
569 LDLM_NAMESPACE_MODEST
,
571 if (obd
->obd_namespace
== NULL
) {
576 rc
= ldlm_cli_enqueue_local(obd
->obd_namespace
, &res_id
, LDLM_PLAIN
,
577 NULL
, LCK_NL
, &lock_flags
, NULL
,
578 ldlm_completion_ast
, NULL
, NULL
, 0,
579 LVB_T_NONE
, NULL
, &obd
->u
.echo
.eo_nl_lock
);
580 LASSERT (rc
== ELDLM_OK
);
582 lprocfs_echo_init_vars(&lvars
);
583 if (lprocfs_obd_setup(obd
, lvars
.obd_vars
) == 0 &&
584 lprocfs_alloc_obd_stats(obd
, LPROC_ECHO_LAST
) == 0) {
585 lprocfs_counter_init(obd
->obd_stats
, LPROC_ECHO_READ_BYTES
,
586 LPROCFS_CNTR_AVGMINMAX
,
587 "read_bytes", "bytes");
588 lprocfs_counter_init(obd
->obd_stats
, LPROC_ECHO_WRITE_BYTES
,
589 LPROCFS_CNTR_AVGMINMAX
,
590 "write_bytes", "bytes");
593 ptlrpc_init_client (LDLM_CB_REQUEST_PORTAL
, LDLM_CB_REPLY_PORTAL
,
594 "echo_ldlm_cb_client", &obd
->obd_ldlm_client
);
598 static int echo_cleanup(struct obd_device
*obd
)
602 lprocfs_obd_cleanup(obd
);
603 lprocfs_free_obd_stats(obd
);
605 ldlm_lock_decref(&obd
->u
.echo
.eo_nl_lock
, LCK_NL
);
607 /* XXX Bug 3413; wait for a bit to ensure the BL callback has
608 * happened before calling ldlm_namespace_free() */
609 set_current_state(TASK_UNINTERRUPTIBLE
);
610 schedule_timeout(cfs_time_seconds(1));
612 ldlm_namespace_free(obd
->obd_namespace
, NULL
, obd
->obd_force
);
613 obd
->obd_namespace
= NULL
;
615 leaked
= atomic_read(&obd
->u
.echo
.eo_prep
);
617 CERROR("%d prep/commitrw pages leaked\n", leaked
);
622 struct obd_ops echo_obd_ops
= {
623 .o_owner
= THIS_MODULE
,
624 .o_connect
= echo_connect
,
625 .o_disconnect
= echo_disconnect
,
626 .o_init_export
= echo_init_export
,
627 .o_destroy_export
= echo_destroy_export
,
628 .o_create
= echo_create
,
629 .o_destroy
= echo_destroy
,
630 .o_getattr
= echo_getattr
,
631 .o_setattr
= echo_setattr
,
632 .o_preprw
= echo_preprw
,
633 .o_commitrw
= echo_commitrw
,
634 .o_setup
= echo_setup
,
635 .o_cleanup
= echo_cleanup
638 void echo_persistent_pages_fini(void)
642 for (i
= 0; i
< ECHO_PERSISTENT_PAGES
; i
++)
643 if (echo_persistent_pages
[i
] != NULL
) {
644 OBD_PAGE_FREE(echo_persistent_pages
[i
]);
645 echo_persistent_pages
[i
] = NULL
;
649 int echo_persistent_pages_init(void)
654 for (i
= 0; i
< ECHO_PERSISTENT_PAGES
; i
++) {
655 int gfp_mask
= (i
< ECHO_PERSISTENT_PAGES
/2) ?
656 GFP_IOFS
: GFP_HIGHUSER
;
658 OBD_PAGE_ALLOC(pg
, gfp_mask
);
660 echo_persistent_pages_fini ();
664 memset (kmap (pg
), 0, PAGE_CACHE_SIZE
);
667 echo_persistent_pages
[i
] = pg
;