4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include "../../include/linux/libcfs/libcfs.h"
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
47 #include "../include/lustre_ha.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_debug.h"
50 #include "../include/lustre_param.h"
51 #include "../include/lustre_fid.h"
52 #include "../include/obd_class.h"
53 #include "../include/obd.h"
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
/* Shared OSC request-pool state.
 * NOTE(review): the stray leading digits and hard line breaks below are
 * extraction artifacts of this file copy; all original tokens are
 * preserved byte-for-byte. */
/* Number of requests currently accounted to the pool — presumably
 * incremented/decremented where pool requests are taken/freed; the
 * users are not visible in this fragment (TODO confirm). */
57 atomic_t osc_pool_req_count
;
/* Upper bound on how many requests the pool may hold; derived from
 * osc_reqpool_mem_max elsewhere (not visible here — verify). */
58 unsigned int osc_reqpool_maxreqcount
;
/* The shared ptlrpc request pool itself; allocated/freed outside
 * this fragment. */
59 struct ptlrpc_request_pool
*osc_rq_pool
;
61 /* max memory used for request pool, unit is MB */
62 static unsigned int osc_reqpool_mem_max
= 5;
/* Expose osc_reqpool_mem_max as a read-only (0444) module parameter. */
63 module_param(osc_reqpool_mem_max
, uint
, 0444);
65 struct osc_brw_async_args
{
71 struct brw_page
**aa_ppga
;
72 struct client_obd
*aa_cli
;
73 struct list_head aa_oaps
;
74 struct list_head aa_exts
;
75 struct obd_capa
*aa_ocapa
;
76 struct cl_req
*aa_clerq
;
79 struct osc_async_args
{
80 struct obd_info
*aa_oi
;
83 struct osc_setattr_args
{
85 obd_enqueue_update_f sa_upcall
;
89 struct osc_fsync_args
{
90 struct obd_info
*fa_oi
;
91 obd_enqueue_update_f fa_upcall
;
95 struct osc_enqueue_args
{
96 struct obd_export
*oa_exp
;
98 obd_enqueue_update_f oa_upcall
;
100 struct ost_lvb
*oa_lvb
;
101 struct lustre_handle
*oa_lockh
;
102 struct ldlm_enqueue_info
*oa_ei
;
103 unsigned int oa_agl
:1;
/* Forward declarations — definitions appear later in this file. */
/* Release a brw_page pointer array of @count entries. */
106 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
);
/* Reply-interpret callback for bulk read/write (BRW) RPCs. */
107 static int brw_interpret(const struct lu_env
*env
,
108 struct ptlrpc_request
*req
, void *data
, int rc
);
/* Tear down OSC state for @obd; non-static, also used outside this
 * file (presumably from the obd_ops table — confirm against header). */
109 int osc_cleanup(struct obd_device
*obd
);
111 /* Pack OSC object metadata for disk storage (LE byte order). */
112 static int osc_packmd(struct obd_export
*exp
, struct lov_mds_md
**lmmp
,
113 struct lov_stripe_md
*lsm
)
117 lmm_size
= sizeof(**lmmp
);
121 if (*lmmp
!= NULL
&& lsm
== NULL
) {
125 } else if (unlikely(lsm
!= NULL
&& ostid_id(&lsm
->lsm_oi
) == 0)) {
130 *lmmp
= kzalloc(lmm_size
, GFP_NOFS
);
136 ostid_cpu_to_le(&lsm
->lsm_oi
, &(*lmmp
)->lmm_oi
);
141 /* Unpack OSC object metadata from disk storage (LE byte order). */
142 static int osc_unpackmd(struct obd_export
*exp
, struct lov_stripe_md
**lsmp
,
143 struct lov_mds_md
*lmm
, int lmm_bytes
)
146 struct obd_import
*imp
= class_exp2cliimp(exp
);
149 if (lmm_bytes
< sizeof(*lmm
)) {
150 CERROR("%s: lov_mds_md too small: %d, need %d\n",
151 exp
->exp_obd
->obd_name
, lmm_bytes
,
155 /* XXX LOV_MAGIC etc check? */
157 if (unlikely(ostid_id(&lmm
->lmm_oi
) == 0)) {
158 CERROR("%s: zero lmm_object_id: rc = %d\n",
159 exp
->exp_obd
->obd_name
, -EINVAL
);
164 lsm_size
= lov_stripe_md_size(1);
168 if (*lsmp
!= NULL
&& lmm
== NULL
) {
169 kfree((*lsmp
)->lsm_oinfo
[0]);
176 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
177 if (unlikely(*lsmp
== NULL
))
179 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
),
181 if (unlikely((*lsmp
)->lsm_oinfo
[0] == NULL
)) {
185 loi_init((*lsmp
)->lsm_oinfo
[0]);
186 } else if (unlikely(ostid_id(&(*lsmp
)->lsm_oi
) == 0)) {
191 /* XXX zero *lsmp? */
192 ostid_le_to_cpu(&lmm
->lmm_oi
, &(*lsmp
)->lsm_oi
);
195 (imp
->imp_connect_data
.ocd_connect_flags
& OBD_CONNECT_MAXBYTES
))
196 (*lsmp
)->lsm_maxbytes
= imp
->imp_connect_data
.ocd_maxbytes
;
198 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
203 static inline void osc_pack_capa(struct ptlrpc_request
*req
,
204 struct ost_body
*body
, void *capa
)
206 struct obd_capa
*oc
= (struct obd_capa
*)capa
;
207 struct lustre_capa
*c
;
212 c
= req_capsule_client_get(&req
->rq_pill
, &RMF_CAPA1
);
215 body
->oa
.o_valid
|= OBD_MD_FLOSSCAPA
;
216 DEBUG_CAPA(D_SEC
, c
, "pack");
219 static inline void osc_pack_req_body(struct ptlrpc_request
*req
,
220 struct obd_info
*oinfo
)
222 struct ost_body
*body
;
224 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
227 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
229 osc_pack_capa(req
, body
, oinfo
->oi_capa
);
232 static inline void osc_set_capa_size(struct ptlrpc_request
*req
,
233 const struct req_msg_field
*field
,
237 req_capsule_set_size(&req
->rq_pill
, field
, RCL_CLIENT
, 0);
239 /* it is already calculated as sizeof struct obd_capa */
243 static int osc_getattr_interpret(const struct lu_env
*env
,
244 struct ptlrpc_request
*req
,
245 struct osc_async_args
*aa
, int rc
)
247 struct ost_body
*body
;
252 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
254 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
255 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
256 aa
->aa_oi
->oi_oa
, &body
->oa
);
258 /* This should really be sent by the OST */
259 aa
->aa_oi
->oi_oa
->o_blksize
= DT_MAX_BRW_SIZE
;
260 aa
->aa_oi
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
262 CDEBUG(D_INFO
, "can't unpack ost_body\n");
264 aa
->aa_oi
->oi_oa
->o_valid
= 0;
267 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
271 static int osc_getattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
272 struct ptlrpc_request_set
*set
)
274 struct ptlrpc_request
*req
;
275 struct osc_async_args
*aa
;
278 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
282 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
283 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
285 ptlrpc_request_free(req
);
289 osc_pack_req_body(req
, oinfo
);
291 ptlrpc_request_set_replen(req
);
292 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_getattr_interpret
;
294 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
295 aa
= ptlrpc_req_async_args(req
);
298 ptlrpc_set_add_req(set
, req
);
302 static int osc_getattr(const struct lu_env
*env
, struct obd_export
*exp
,
303 struct obd_info
*oinfo
)
305 struct ptlrpc_request
*req
;
306 struct ost_body
*body
;
309 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
313 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
314 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
316 ptlrpc_request_free(req
);
320 osc_pack_req_body(req
, oinfo
);
322 ptlrpc_request_set_replen(req
);
324 rc
= ptlrpc_queue_wait(req
);
328 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
334 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
335 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
338 oinfo
->oi_oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
339 oinfo
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
342 ptlrpc_req_finished(req
);
346 static int osc_setattr(const struct lu_env
*env
, struct obd_export
*exp
,
347 struct obd_info
*oinfo
, struct obd_trans_info
*oti
)
349 struct ptlrpc_request
*req
;
350 struct ost_body
*body
;
353 LASSERT(oinfo
->oi_oa
->o_valid
& OBD_MD_FLGROUP
);
355 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
359 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
360 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
362 ptlrpc_request_free(req
);
366 osc_pack_req_body(req
, oinfo
);
368 ptlrpc_request_set_replen(req
);
370 rc
= ptlrpc_queue_wait(req
);
374 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
380 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
384 ptlrpc_req_finished(req
);
388 static int osc_setattr_interpret(const struct lu_env
*env
,
389 struct ptlrpc_request
*req
,
390 struct osc_setattr_args
*sa
, int rc
)
392 struct ost_body
*body
;
397 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
403 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, sa
->sa_oa
,
406 rc
= sa
->sa_upcall(sa
->sa_cookie
, rc
);
410 int osc_setattr_async_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
411 struct obd_trans_info
*oti
,
412 obd_enqueue_update_f upcall
, void *cookie
,
413 struct ptlrpc_request_set
*rqset
)
415 struct ptlrpc_request
*req
;
416 struct osc_setattr_args
*sa
;
419 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
423 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
424 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
426 ptlrpc_request_free(req
);
430 if (oti
&& oinfo
->oi_oa
->o_valid
& OBD_MD_FLCOOKIE
)
431 oinfo
->oi_oa
->o_lcookie
= *oti
->oti_logcookies
;
433 osc_pack_req_body(req
, oinfo
);
435 ptlrpc_request_set_replen(req
);
437 /* do mds to ost setattr asynchronously */
439 /* Do not wait for response. */
440 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
442 req
->rq_interpret_reply
=
443 (ptlrpc_interpterer_t
)osc_setattr_interpret
;
445 CLASSERT(sizeof(*sa
) <= sizeof(req
->rq_async_args
));
446 sa
= ptlrpc_req_async_args(req
);
447 sa
->sa_oa
= oinfo
->oi_oa
;
448 sa
->sa_upcall
= upcall
;
449 sa
->sa_cookie
= cookie
;
451 if (rqset
== PTLRPCD_SET
)
452 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
454 ptlrpc_set_add_req(rqset
, req
);
460 static int osc_setattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
461 struct obd_trans_info
*oti
,
462 struct ptlrpc_request_set
*rqset
)
464 return osc_setattr_async_base(exp
, oinfo
, oti
,
465 oinfo
->oi_cb_up
, oinfo
, rqset
);
468 int osc_real_create(struct obd_export
*exp
, struct obdo
*oa
,
469 struct lov_stripe_md
**ea
, struct obd_trans_info
*oti
)
471 struct ptlrpc_request
*req
;
472 struct ost_body
*body
;
473 struct lov_stripe_md
*lsm
;
481 rc
= obd_alloc_memmd(exp
, &lsm
);
486 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_CREATE
);
492 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_CREATE
);
494 ptlrpc_request_free(req
);
498 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
501 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
503 ptlrpc_request_set_replen(req
);
505 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
506 oa
->o_flags
== OBD_FL_DELORPHAN
) {
508 "delorphan from OST integration");
509 /* Don't resend the delorphan req */
510 req
->rq_no_resend
= req
->rq_no_delay
= 1;
513 rc
= ptlrpc_queue_wait(req
);
517 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
523 CDEBUG(D_INFO
, "oa flags %x\n", oa
->o_flags
);
524 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oa
, &body
->oa
);
526 oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
527 oa
->o_valid
|= OBD_MD_FLBLKSZ
;
529 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
530 * have valid lsm_oinfo data structs, so don't go touching that.
531 * This needs to be fixed in a big way.
533 lsm
->lsm_oi
= oa
->o_oi
;
537 oti
->oti_transno
= lustre_msg_get_transno(req
->rq_repmsg
);
539 if (oa
->o_valid
& OBD_MD_FLCOOKIE
) {
540 if (!oti
->oti_logcookies
)
541 oti_alloc_cookies(oti
, 1);
542 *oti
->oti_logcookies
= oa
->o_lcookie
;
546 CDEBUG(D_HA
, "transno: %lld\n",
547 lustre_msg_get_transno(req
->rq_repmsg
));
549 ptlrpc_req_finished(req
);
552 obd_free_memmd(exp
, &lsm
);
556 int osc_punch_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
557 obd_enqueue_update_f upcall
, void *cookie
,
558 struct ptlrpc_request_set
*rqset
)
560 struct ptlrpc_request
*req
;
561 struct osc_setattr_args
*sa
;
562 struct ost_body
*body
;
565 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_PUNCH
);
569 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
570 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_PUNCH
);
572 ptlrpc_request_free(req
);
575 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
576 ptlrpc_at_set_req_timeout(req
);
578 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
580 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
582 osc_pack_capa(req
, body
, oinfo
->oi_capa
);
584 ptlrpc_request_set_replen(req
);
586 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_setattr_interpret
;
587 CLASSERT(sizeof(*sa
) <= sizeof(req
->rq_async_args
));
588 sa
= ptlrpc_req_async_args(req
);
589 sa
->sa_oa
= oinfo
->oi_oa
;
590 sa
->sa_upcall
= upcall
;
591 sa
->sa_cookie
= cookie
;
592 if (rqset
== PTLRPCD_SET
)
593 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
595 ptlrpc_set_add_req(rqset
, req
);
600 static int osc_sync_interpret(const struct lu_env
*env
,
601 struct ptlrpc_request
*req
,
604 struct osc_fsync_args
*fa
= arg
;
605 struct ost_body
*body
;
610 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
612 CERROR("can't unpack ost_body\n");
617 *fa
->fa_oi
->oi_oa
= body
->oa
;
619 rc
= fa
->fa_upcall(fa
->fa_cookie
, rc
);
623 int osc_sync_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
624 obd_enqueue_update_f upcall
, void *cookie
,
625 struct ptlrpc_request_set
*rqset
)
627 struct ptlrpc_request
*req
;
628 struct ost_body
*body
;
629 struct osc_fsync_args
*fa
;
632 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SYNC
);
636 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
637 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SYNC
);
639 ptlrpc_request_free(req
);
643 /* overload the size and blocks fields in the oa with start/end */
644 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
646 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
648 osc_pack_capa(req
, body
, oinfo
->oi_capa
);
650 ptlrpc_request_set_replen(req
);
651 req
->rq_interpret_reply
= osc_sync_interpret
;
653 CLASSERT(sizeof(*fa
) <= sizeof(req
->rq_async_args
));
654 fa
= ptlrpc_req_async_args(req
);
656 fa
->fa_upcall
= upcall
;
657 fa
->fa_cookie
= cookie
;
659 if (rqset
== PTLRPCD_SET
)
660 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
662 ptlrpc_set_add_req(rqset
, req
);
667 /* Find and cancel locally locks matched by @mode in the resource found by
668 * @objid. Found locks are added into @cancel list. Returns the amount of
669 * locks added to @cancels list. */
670 static int osc_resource_get_unused(struct obd_export
*exp
, struct obdo
*oa
,
671 struct list_head
*cancels
,
672 ldlm_mode_t mode
, __u64 lock_flags
)
674 struct ldlm_namespace
*ns
= exp
->exp_obd
->obd_namespace
;
675 struct ldlm_res_id res_id
;
676 struct ldlm_resource
*res
;
679 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
680 * export) but disabled through procfs (flag in NS).
682 * This distinguishes from a case when ELC is not supported originally,
683 * when we still want to cancel locks in advance and just cancel them
684 * locally, without sending any RPC. */
685 if (exp_connect_cancelset(exp
) && !ns_connect_cancelset(ns
))
688 ostid_build_res_name(&oa
->o_oi
, &res_id
);
689 res
= ldlm_resource_get(ns
, NULL
, &res_id
, 0, 0);
693 LDLM_RESOURCE_ADDREF(res
);
694 count
= ldlm_cancel_resource_local(res
, cancels
, NULL
, mode
,
695 lock_flags
, 0, NULL
);
696 LDLM_RESOURCE_DELREF(res
);
697 ldlm_resource_putref(res
);
701 static int osc_destroy_interpret(const struct lu_env
*env
,
702 struct ptlrpc_request
*req
, void *data
,
705 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
707 atomic_dec(&cli
->cl_destroy_in_flight
);
708 wake_up(&cli
->cl_destroy_waitq
);
712 static int osc_can_send_destroy(struct client_obd
*cli
)
714 if (atomic_inc_return(&cli
->cl_destroy_in_flight
) <=
715 cli
->cl_max_rpcs_in_flight
) {
716 /* The destroy request can be sent */
719 if (atomic_dec_return(&cli
->cl_destroy_in_flight
) <
720 cli
->cl_max_rpcs_in_flight
) {
722 * The counter has been modified between the two atomic
725 wake_up(&cli
->cl_destroy_waitq
);
730 int osc_create(const struct lu_env
*env
, struct obd_export
*exp
,
731 struct obdo
*oa
, struct lov_stripe_md
**ea
,
732 struct obd_trans_info
*oti
)
738 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
740 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
741 oa
->o_flags
== OBD_FL_RECREATE_OBJS
) {
742 return osc_real_create(exp
, oa
, ea
, oti
);
745 if (!fid_seq_is_mdt(ostid_seq(&oa
->o_oi
)))
746 return osc_real_create(exp
, oa
, ea
, oti
);
748 /* we should not get here anymore */
754 /* Destroy requests can be async always on the client, and we don't even really
755 * care about the return code since the client cannot do anything at all about
757 * When the MDS is unlinking a filename, it saves the file objects into a
758 * recovery llog, and these object records are cancelled when the OST reports
759 * they were destroyed and sync'd to disk (i.e. transaction committed).
760 * If the client dies, or the OST is down when the object should be destroyed,
761 * the records are not cancelled, and when the OST reconnects to the MDS next,
762 * it will retrieve the llog unlink logs and then sends the log cancellation
763 * cookies to the MDS after committing destroy transactions. */
764 static int osc_destroy(const struct lu_env
*env
, struct obd_export
*exp
,
765 struct obdo
*oa
, struct lov_stripe_md
*ea
,
766 struct obd_trans_info
*oti
, struct obd_export
*md_export
,
769 struct client_obd
*cli
= &exp
->exp_obd
->u
.cli
;
770 struct ptlrpc_request
*req
;
771 struct ost_body
*body
;
776 CDEBUG(D_INFO
, "oa NULL\n");
780 count
= osc_resource_get_unused(exp
, oa
, &cancels
, LCK_PW
,
781 LDLM_FL_DISCARD_DATA
);
783 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_DESTROY
);
785 ldlm_lock_list_put(&cancels
, l_bl_ast
, count
);
789 osc_set_capa_size(req
, &RMF_CAPA1
, (struct obd_capa
*)capa
);
790 rc
= ldlm_prep_elc_req(exp
, req
, LUSTRE_OST_VERSION
, OST_DESTROY
,
793 ptlrpc_request_free(req
);
797 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
798 ptlrpc_at_set_req_timeout(req
);
800 if (oti
!= NULL
&& oa
->o_valid
& OBD_MD_FLCOOKIE
)
801 oa
->o_lcookie
= *oti
->oti_logcookies
;
802 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
804 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
806 osc_pack_capa(req
, body
, (struct obd_capa
*)capa
);
807 ptlrpc_request_set_replen(req
);
809 /* If osc_destroy is for destroying the unlink orphan,
810 * sent from MDT to OST, which should not be blocked here,
811 * because the process might be triggered by ptlrpcd, and
812 * it is not good to block ptlrpcd thread (b=16006)*/
813 if (!(oa
->o_flags
& OBD_FL_DELORPHAN
)) {
814 req
->rq_interpret_reply
= osc_destroy_interpret
;
815 if (!osc_can_send_destroy(cli
)) {
816 struct l_wait_info lwi
= LWI_INTR(LWI_ON_SIGNAL_NOOP
,
820 * Wait until the number of on-going destroy RPCs drops
821 * under max_rpc_in_flight
823 l_wait_event_exclusive(cli
->cl_destroy_waitq
,
824 osc_can_send_destroy(cli
), &lwi
);
828 /* Do not wait for response */
829 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
833 static void osc_announce_cached(struct client_obd
*cli
, struct obdo
*oa
,
836 u32 bits
= OBD_MD_FLBLOCKS
|OBD_MD_FLGRANT
;
838 LASSERT(!(oa
->o_valid
& bits
));
841 client_obd_list_lock(&cli
->cl_loi_list_lock
);
842 oa
->o_dirty
= cli
->cl_dirty
;
843 if (unlikely(cli
->cl_dirty
- cli
->cl_dirty_transit
>
844 cli
->cl_dirty_max
)) {
845 CERROR("dirty %lu - %lu > dirty_max %lu\n",
846 cli
->cl_dirty
, cli
->cl_dirty_transit
, cli
->cl_dirty_max
);
848 } else if (unlikely(atomic_read(&obd_dirty_pages
) -
849 atomic_read(&obd_dirty_transit_pages
) >
850 (long)(obd_max_dirty_pages
+ 1))) {
851 /* The atomic_read() allowing the atomic_inc() are
852 * not covered by a lock thus they may safely race and trip
853 * this CERROR() unless we add in a small fudge factor (+1). */
854 CERROR("dirty %d - %d > system dirty_max %d\n",
855 atomic_read(&obd_dirty_pages
),
856 atomic_read(&obd_dirty_transit_pages
),
857 obd_max_dirty_pages
);
859 } else if (unlikely(cli
->cl_dirty_max
- cli
->cl_dirty
> 0x7fffffff)) {
860 CERROR("dirty %lu - dirty_max %lu too big???\n",
861 cli
->cl_dirty
, cli
->cl_dirty_max
);
864 long max_in_flight
= (cli
->cl_max_pages_per_rpc
<<
866 (cli
->cl_max_rpcs_in_flight
+ 1);
867 oa
->o_undirty
= max(cli
->cl_dirty_max
, max_in_flight
);
869 oa
->o_grant
= cli
->cl_avail_grant
+ cli
->cl_reserved_grant
;
870 oa
->o_dropped
= cli
->cl_lost_grant
;
871 cli
->cl_lost_grant
= 0;
872 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
873 CDEBUG(D_CACHE
, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
874 oa
->o_dirty
, oa
->o_undirty
, oa
->o_dropped
, oa
->o_grant
);
878 void osc_update_next_shrink(struct client_obd
*cli
)
880 cli
->cl_next_shrink_grant
=
881 cfs_time_shift(cli
->cl_grant_shrink_interval
);
882 CDEBUG(D_CACHE
, "next time %ld to shrink grant \n",
883 cli
->cl_next_shrink_grant
);
886 static void __osc_update_grant(struct client_obd
*cli
, u64 grant
)
888 client_obd_list_lock(&cli
->cl_loi_list_lock
);
889 cli
->cl_avail_grant
+= grant
;
890 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
893 static void osc_update_grant(struct client_obd
*cli
, struct ost_body
*body
)
895 if (body
->oa
.o_valid
& OBD_MD_FLGRANT
) {
896 CDEBUG(D_CACHE
, "got %llu extra grant\n", body
->oa
.o_grant
);
897 __osc_update_grant(cli
, body
->oa
.o_grant
);
/* Forward declaration: asynchronous set_info handler (defined later in
 * this file); used just below by osc_shrink_grant_to_target to send
 * KEY_GRANT_SHRINK to the server. */
901 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
902 u32 keylen
, void *key
, u32 vallen
,
903 void *val
, struct ptlrpc_request_set
*set
);
905 static int osc_shrink_grant_interpret(const struct lu_env
*env
,
906 struct ptlrpc_request
*req
,
909 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
910 struct obdo
*oa
= ((struct osc_brw_async_args
*)aa
)->aa_oa
;
911 struct ost_body
*body
;
914 __osc_update_grant(cli
, oa
->o_grant
);
918 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
920 osc_update_grant(cli
, body
);
926 static void osc_shrink_grant_local(struct client_obd
*cli
, struct obdo
*oa
)
928 client_obd_list_lock(&cli
->cl_loi_list_lock
);
929 oa
->o_grant
= cli
->cl_avail_grant
/ 4;
930 cli
->cl_avail_grant
-= oa
->o_grant
;
931 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
932 if (!(oa
->o_valid
& OBD_MD_FLFLAGS
)) {
933 oa
->o_valid
|= OBD_MD_FLFLAGS
;
936 oa
->o_flags
|= OBD_FL_SHRINK_GRANT
;
937 osc_update_next_shrink(cli
);
940 /* Shrink the current grant, either from some large amount to enough for a
941 * full set of in-flight RPCs, or if we have already shrunk to that limit
942 * then to enough for a single RPC. This avoids keeping more grant than
943 * needed, and avoids shrinking the grant piecemeal. */
944 static int osc_shrink_grant(struct client_obd
*cli
)
946 __u64 target_bytes
= (cli
->cl_max_rpcs_in_flight
+ 1) *
947 (cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
);
949 client_obd_list_lock(&cli
->cl_loi_list_lock
);
950 if (cli
->cl_avail_grant
<= target_bytes
)
951 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
952 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
954 return osc_shrink_grant_to_target(cli
, target_bytes
);
957 int osc_shrink_grant_to_target(struct client_obd
*cli
, __u64 target_bytes
)
960 struct ost_body
*body
;
962 client_obd_list_lock(&cli
->cl_loi_list_lock
);
963 /* Don't shrink if we are already above or below the desired limit
964 * We don't want to shrink below a single RPC, as that will negatively
965 * impact block allocation and long-term performance. */
966 if (target_bytes
< cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
)
967 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
969 if (target_bytes
>= cli
->cl_avail_grant
) {
970 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
973 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
975 body
= kzalloc(sizeof(*body
), GFP_NOFS
);
979 osc_announce_cached(cli
, &body
->oa
, 0);
981 client_obd_list_lock(&cli
->cl_loi_list_lock
);
982 body
->oa
.o_grant
= cli
->cl_avail_grant
- target_bytes
;
983 cli
->cl_avail_grant
= target_bytes
;
984 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
985 if (!(body
->oa
.o_valid
& OBD_MD_FLFLAGS
)) {
986 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
987 body
->oa
.o_flags
= 0;
989 body
->oa
.o_flags
|= OBD_FL_SHRINK_GRANT
;
990 osc_update_next_shrink(cli
);
992 rc
= osc_set_info_async(NULL
, cli
->cl_import
->imp_obd
->obd_self_export
,
993 sizeof(KEY_GRANT_SHRINK
), KEY_GRANT_SHRINK
,
994 sizeof(*body
), body
, NULL
);
996 __osc_update_grant(cli
, body
->oa
.o_grant
);
1001 static int osc_should_shrink_grant(struct client_obd
*client
)
1003 unsigned long time
= cfs_time_current();
1004 unsigned long next_shrink
= client
->cl_next_shrink_grant
;
1006 if ((client
->cl_import
->imp_connect_data
.ocd_connect_flags
&
1007 OBD_CONNECT_GRANT_SHRINK
) == 0)
1010 if (cfs_time_aftereq(time
, next_shrink
- 5 * CFS_TICK
)) {
1011 /* Get the current RPC size directly, instead of going via:
1012 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1013 * Keep comment here so that it can be found by searching. */
1014 int brw_size
= client
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
1016 if (client
->cl_import
->imp_state
== LUSTRE_IMP_FULL
&&
1017 client
->cl_avail_grant
> brw_size
)
1020 osc_update_next_shrink(client
);
1025 static int osc_grant_shrink_grant_cb(struct timeout_item
*item
, void *data
)
1027 struct client_obd
*client
;
1029 list_for_each_entry(client
, &item
->ti_obd_list
,
1030 cl_grant_shrink_list
) {
1031 if (osc_should_shrink_grant(client
))
1032 osc_shrink_grant(client
);
1037 static int osc_add_shrink_grant(struct client_obd
*client
)
1041 rc
= ptlrpc_add_timeout_client(client
->cl_grant_shrink_interval
,
1043 osc_grant_shrink_grant_cb
, NULL
,
1044 &client
->cl_grant_shrink_list
);
1046 CERROR("add grant client %s error %d\n",
1047 client
->cl_import
->imp_obd
->obd_name
, rc
);
1050 CDEBUG(D_CACHE
, "add grant client %s \n",
1051 client
->cl_import
->imp_obd
->obd_name
);
1052 osc_update_next_shrink(client
);
1056 static int osc_del_shrink_grant(struct client_obd
*client
)
1058 return ptlrpc_del_timeout_client(&client
->cl_grant_shrink_list
,
1062 static void osc_init_grant(struct client_obd
*cli
, struct obd_connect_data
*ocd
)
1065 * ocd_grant is the total grant amount we're expect to hold: if we've
1066 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1067 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1069 * race is tolerable here: if we're evicted, but imp_state already
1070 * left EVICTED state, then cl_dirty must be 0 already.
1072 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1073 if (cli
->cl_import
->imp_state
== LUSTRE_IMP_EVICTED
)
1074 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1076 cli
->cl_avail_grant
= ocd
->ocd_grant
- cli
->cl_dirty
;
1078 if (cli
->cl_avail_grant
< 0) {
1079 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1080 cli
->cl_import
->imp_obd
->obd_name
, cli
->cl_avail_grant
,
1081 ocd
->ocd_grant
, cli
->cl_dirty
);
1082 /* workaround for servers which do not have the patch from
1084 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1087 /* determine the appropriate chunk size used by osc_extent. */
1088 cli
->cl_chunkbits
= max_t(int, PAGE_CACHE_SHIFT
, ocd
->ocd_blocksize
);
1089 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
1091 CDEBUG(D_CACHE
, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1092 cli
->cl_import
->imp_obd
->obd_name
,
1093 cli
->cl_avail_grant
, cli
->cl_lost_grant
, cli
->cl_chunkbits
);
1095 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT_SHRINK
&&
1096 list_empty(&cli
->cl_grant_shrink_list
))
1097 osc_add_shrink_grant(cli
);
1100 /* We assume that the reason this OSC got a short read is because it read
1101 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1102 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1103 * this stripe never got written at or beyond this stripe offset yet. */
1104 static void handle_short_read(int nob_read
, u32 page_count
,
1105 struct brw_page
**pga
)
1110 /* skip bytes read OK */
1111 while (nob_read
> 0) {
1112 LASSERT(page_count
> 0);
1114 if (pga
[i
]->count
> nob_read
) {
1115 /* EOF inside this page */
1116 ptr
= kmap(pga
[i
]->pg
) +
1117 (pga
[i
]->off
& ~CFS_PAGE_MASK
);
1118 memset(ptr
+ nob_read
, 0, pga
[i
]->count
- nob_read
);
1125 nob_read
-= pga
[i
]->count
;
1130 /* zero remaining pages */
1131 while (page_count
-- > 0) {
1132 ptr
= kmap(pga
[i
]->pg
) + (pga
[i
]->off
& ~CFS_PAGE_MASK
);
1133 memset(ptr
, 0, pga
[i
]->count
);
1139 static int check_write_rcs(struct ptlrpc_request
*req
,
1140 int requested_nob
, int niocount
,
1141 u32 page_count
, struct brw_page
**pga
)
1146 remote_rcs
= req_capsule_server_sized_get(&req
->rq_pill
, &RMF_RCS
,
1147 sizeof(*remote_rcs
) *
1149 if (remote_rcs
== NULL
) {
1150 CDEBUG(D_INFO
, "Missing/short RC vector on BRW_WRITE reply\n");
1154 /* return error if any niobuf was in error */
1155 for (i
= 0; i
< niocount
; i
++) {
1156 if ((int)remote_rcs
[i
] < 0)
1157 return remote_rcs
[i
];
1159 if (remote_rcs
[i
] != 0) {
1160 CDEBUG(D_INFO
, "rc[%d] invalid (%d) req %p\n",
1161 i
, remote_rcs
[i
], req
);
1166 if (req
->rq_bulk
->bd_nob_transferred
!= requested_nob
) {
1167 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1168 req
->rq_bulk
->bd_nob_transferred
, requested_nob
);
1175 static inline int can_merge_pages(struct brw_page
*p1
, struct brw_page
*p2
)
1177 if (p1
->flag
!= p2
->flag
) {
1178 unsigned mask
= ~(OBD_BRW_FROM_GRANT
| OBD_BRW_NOCACHE
|
1179 OBD_BRW_SYNC
| OBD_BRW_ASYNC
|OBD_BRW_NOQUOTA
);
1181 /* warn if we try to combine flags that we don't know to be
1182 * safe to combine */
1183 if (unlikely((p1
->flag
& mask
) != (p2
->flag
& mask
))) {
1184 CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1185 p1
->flag
, p2
->flag
);
1190 return (p1
->off
+ p1
->count
== p2
->off
);
1193 static u32
osc_checksum_bulk(int nob
, u32 pg_count
,
1194 struct brw_page
**pga
, int opc
,
1195 cksum_type_t cksum_type
)
1199 struct cfs_crypto_hash_desc
*hdesc
;
1200 unsigned int bufsize
;
1202 unsigned char cfs_alg
= cksum_obd2cfs(cksum_type
);
1204 LASSERT(pg_count
> 0);
1206 hdesc
= cfs_crypto_hash_init(cfs_alg
, NULL
, 0);
1207 if (IS_ERR(hdesc
)) {
1208 CERROR("Unable to initialize checksum hash %s\n",
1209 cfs_crypto_hash_name(cfs_alg
));
1210 return PTR_ERR(hdesc
);
1213 while (nob
> 0 && pg_count
> 0) {
1214 int count
= pga
[i
]->count
> nob
? nob
: pga
[i
]->count
;
1216 /* corrupt the data before we compute the checksum, to
1217 * simulate an OST->client data error */
1218 if (i
== 0 && opc
== OST_READ
&&
1219 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE
)) {
1220 unsigned char *ptr
= kmap(pga
[i
]->pg
);
1221 int off
= pga
[i
]->off
& ~CFS_PAGE_MASK
;
1222 memcpy(ptr
+ off
, "bad1", min(4, nob
));
1225 cfs_crypto_hash_update_page(hdesc
, pga
[i
]->pg
,
1226 pga
[i
]->off
& ~CFS_PAGE_MASK
,
1229 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1230 pga
[i
]->pg
, pga
[i
]->pg
->mapping
, pga
[i
]->pg
->index
,
1231 (long)pga
[i
]->pg
->flags
, page_count(pga
[i
]->pg
),
1232 page_private(pga
[i
]->pg
),
1233 (int)(pga
[i
]->off
& ~CFS_PAGE_MASK
));
1235 nob
-= pga
[i
]->count
;
1241 err
= cfs_crypto_hash_final(hdesc
, (unsigned char *)&cksum
, &bufsize
);
1244 cfs_crypto_hash_final(hdesc
, NULL
, NULL
);
1246 /* For sending we only compute the wrong checksum instead
1247 * of corrupting the data so it is still correct on a redo */
1248 if (opc
== OST_WRITE
&& OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND
))
1254 static int osc_brw_prep_request(int cmd
, struct client_obd
*cli
,
1256 struct lov_stripe_md
*lsm
, u32 page_count
,
1257 struct brw_page
**pga
,
1258 struct ptlrpc_request
**reqp
,
1259 struct obd_capa
*ocapa
, int reserve
,
1262 struct ptlrpc_request
*req
;
1263 struct ptlrpc_bulk_desc
*desc
;
1264 struct ost_body
*body
;
1265 struct obd_ioobj
*ioobj
;
1266 struct niobuf_remote
*niobuf
;
1267 int niocount
, i
, requested_nob
, opc
, rc
;
1268 struct osc_brw_async_args
*aa
;
1269 struct req_capsule
*pill
;
1270 struct brw_page
*pg_prev
;
1272 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ
))
1273 return -ENOMEM
; /* Recoverable */
1274 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2
))
1275 return -EINVAL
; /* Fatal */
1277 if ((cmd
& OBD_BRW_WRITE
) != 0) {
1279 req
= ptlrpc_request_alloc_pool(cli
->cl_import
,
1281 &RQF_OST_BRW_WRITE
);
1284 req
= ptlrpc_request_alloc(cli
->cl_import
, &RQF_OST_BRW_READ
);
1289 for (niocount
= i
= 1; i
< page_count
; i
++) {
1290 if (!can_merge_pages(pga
[i
- 1], pga
[i
]))
1294 pill
= &req
->rq_pill
;
1295 req_capsule_set_size(pill
, &RMF_OBD_IOOBJ
, RCL_CLIENT
,
1297 req_capsule_set_size(pill
, &RMF_NIOBUF_REMOTE
, RCL_CLIENT
,
1298 niocount
* sizeof(*niobuf
));
1299 osc_set_capa_size(req
, &RMF_CAPA1
, ocapa
);
1301 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, opc
);
1303 ptlrpc_request_free(req
);
1306 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
1307 ptlrpc_at_set_req_timeout(req
);
1308 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1310 req
->rq_no_retry_einprogress
= 1;
1312 desc
= ptlrpc_prep_bulk_imp(req
, page_count
,
1313 cli
->cl_import
->imp_connect_data
.ocd_brw_size
>> LNET_MTU_BITS
,
1314 opc
== OST_WRITE
? BULK_GET_SOURCE
: BULK_PUT_SINK
,
1321 /* NB request now owns desc and will free it when it gets freed */
1323 body
= req_capsule_client_get(pill
, &RMF_OST_BODY
);
1324 ioobj
= req_capsule_client_get(pill
, &RMF_OBD_IOOBJ
);
1325 niobuf
= req_capsule_client_get(pill
, &RMF_NIOBUF_REMOTE
);
1326 LASSERT(body
!= NULL
&& ioobj
!= NULL
&& niobuf
!= NULL
);
1328 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
1330 obdo_to_ioobj(oa
, ioobj
);
1331 ioobj
->ioo_bufcnt
= niocount
;
1332 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1333 * that might be send for this request. The actual number is decided
1334 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1335 * "max - 1" for old client compatibility sending "0", and also so the
1336 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1337 ioobj_max_brw_set(ioobj
, desc
->bd_md_max_brw
);
1338 osc_pack_capa(req
, body
, ocapa
);
1339 LASSERT(page_count
> 0);
1341 for (requested_nob
= i
= 0; i
< page_count
; i
++, niobuf
++) {
1342 struct brw_page
*pg
= pga
[i
];
1343 int poff
= pg
->off
& ~CFS_PAGE_MASK
;
1345 LASSERT(pg
->count
> 0);
1346 /* make sure there is no gap in the middle of page array */
1347 LASSERTF(page_count
== 1 ||
1348 (ergo(i
== 0, poff
+ pg
->count
== PAGE_CACHE_SIZE
) &&
1349 ergo(i
> 0 && i
< page_count
- 1,
1350 poff
== 0 && pg
->count
== PAGE_CACHE_SIZE
) &&
1351 ergo(i
== page_count
- 1, poff
== 0)),
1352 "i: %d/%d pg: %p off: %llu, count: %u\n",
1353 i
, page_count
, pg
, pg
->off
, pg
->count
);
1354 LASSERTF(i
== 0 || pg
->off
> pg_prev
->off
,
1355 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1357 pg
->pg
, page_private(pg
->pg
), pg
->pg
->index
, pg
->off
,
1358 pg_prev
->pg
, page_private(pg_prev
->pg
),
1359 pg_prev
->pg
->index
, pg_prev
->off
);
1360 LASSERT((pga
[0]->flag
& OBD_BRW_SRVLOCK
) ==
1361 (pg
->flag
& OBD_BRW_SRVLOCK
));
1363 ptlrpc_prep_bulk_page_pin(desc
, pg
->pg
, poff
, pg
->count
);
1364 requested_nob
+= pg
->count
;
1366 if (i
> 0 && can_merge_pages(pg_prev
, pg
)) {
1368 niobuf
->len
+= pg
->count
;
1370 niobuf
->offset
= pg
->off
;
1371 niobuf
->len
= pg
->count
;
1372 niobuf
->flags
= pg
->flag
;
1377 LASSERTF((void *)(niobuf
- niocount
) ==
1378 req_capsule_client_get(&req
->rq_pill
, &RMF_NIOBUF_REMOTE
),
1379 "want %p - real %p\n", req_capsule_client_get(&req
->rq_pill
,
1380 &RMF_NIOBUF_REMOTE
), (void *)(niobuf
- niocount
));
1382 osc_announce_cached(cli
, &body
->oa
, opc
== OST_WRITE
? requested_nob
:0);
1384 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1385 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
1386 body
->oa
.o_flags
= 0;
1388 body
->oa
.o_flags
|= OBD_FL_RECOV_RESEND
;
1391 if (osc_should_shrink_grant(cli
))
1392 osc_shrink_grant_local(cli
, &body
->oa
);
1394 /* size[REQ_REC_OFF] still sizeof (*body) */
1395 if (opc
== OST_WRITE
) {
1396 if (cli
->cl_checksum
&&
1397 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1398 /* store cl_cksum_type in a local variable since
1399 * it can be changed via lprocfs */
1400 cksum_type_t cksum_type
= cli
->cl_cksum_type
;
1402 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1403 oa
->o_flags
&= OBD_FL_LOCAL_MASK
;
1404 body
->oa
.o_flags
= 0;
1406 body
->oa
.o_flags
|= cksum_type_pack(cksum_type
);
1407 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1408 body
->oa
.o_cksum
= osc_checksum_bulk(requested_nob
,
1412 CDEBUG(D_PAGE
, "checksum at write origin: %x\n",
1414 /* save this in 'oa', too, for later checking */
1415 oa
->o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1416 oa
->o_flags
|= cksum_type_pack(cksum_type
);
1418 /* clear out the checksum flag, in case this is a
1419 * resend but cl_checksum is no longer set. b=11238 */
1420 oa
->o_valid
&= ~OBD_MD_FLCKSUM
;
1422 oa
->o_cksum
= body
->oa
.o_cksum
;
1423 /* 1 RC per niobuf */
1424 req_capsule_set_size(pill
, &RMF_RCS
, RCL_SERVER
,
1425 sizeof(__u32
) * niocount
);
1427 if (cli
->cl_checksum
&&
1428 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1429 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0)
1430 body
->oa
.o_flags
= 0;
1431 body
->oa
.o_flags
|= cksum_type_pack(cli
->cl_cksum_type
);
1432 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1435 ptlrpc_request_set_replen(req
);
1437 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
1438 aa
= ptlrpc_req_async_args(req
);
1440 aa
->aa_requested_nob
= requested_nob
;
1441 aa
->aa_nio_count
= niocount
;
1442 aa
->aa_page_count
= page_count
;
1446 INIT_LIST_HEAD(&aa
->aa_oaps
);
1447 if (ocapa
&& reserve
)
1448 aa
->aa_ocapa
= capa_get(ocapa
);
1454 ptlrpc_req_finished(req
);
1458 static int check_write_checksum(struct obdo
*oa
, const lnet_process_id_t
*peer
,
1459 __u32 client_cksum
, __u32 server_cksum
, int nob
,
1460 u32 page_count
, struct brw_page
**pga
,
1461 cksum_type_t client_cksum_type
)
1465 cksum_type_t cksum_type
;
1467 if (server_cksum
== client_cksum
) {
1468 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1472 cksum_type
= cksum_type_unpack(oa
->o_valid
& OBD_MD_FLFLAGS
?
1474 new_cksum
= osc_checksum_bulk(nob
, page_count
, pga
, OST_WRITE
,
1477 if (cksum_type
!= client_cksum_type
)
1478 msg
= "the server did not use the checksum type specified in the original request - likely a protocol problem"
1480 else if (new_cksum
== server_cksum
)
1481 msg
= "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1483 else if (new_cksum
== client_cksum
)
1484 msg
= "changed in transit before arrival at OST";
1486 msg
= "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1489 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1490 " object "DOSTID
" extent [%llu-%llu]\n",
1491 msg
, libcfs_nid2str(peer
->nid
),
1492 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_seq
: (__u64
)0,
1493 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_oid
: 0,
1494 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_ver
: 0,
1495 POSTID(&oa
->o_oi
), pga
[0]->off
,
1496 pga
[page_count
-1]->off
+ pga
[page_count
-1]->count
- 1);
1497 CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1498 client_cksum
, client_cksum_type
,
1499 server_cksum
, cksum_type
, new_cksum
);
1503 /* Note rc enters this function as number of bytes transferred */
1504 static int osc_brw_fini_request(struct ptlrpc_request
*req
, int rc
)
1506 struct osc_brw_async_args
*aa
= (void *)&req
->rq_async_args
;
1507 const lnet_process_id_t
*peer
=
1508 &req
->rq_import
->imp_connection
->c_peer
;
1509 struct client_obd
*cli
= aa
->aa_cli
;
1510 struct ost_body
*body
;
1511 __u32 client_cksum
= 0;
1513 if (rc
< 0 && rc
!= -EDQUOT
) {
1514 DEBUG_REQ(D_INFO
, req
, "Failed request with rc = %d\n", rc
);
1518 LASSERTF(req
->rq_repmsg
!= NULL
, "rc = %d\n", rc
);
1519 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
1521 DEBUG_REQ(D_INFO
, req
, "Can't unpack body\n");
1525 /* set/clear over quota flag for a uid/gid */
1526 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
&&
1527 body
->oa
.o_valid
& (OBD_MD_FLUSRQUOTA
| OBD_MD_FLGRPQUOTA
)) {
1528 unsigned int qid
[MAXQUOTAS
] = { body
->oa
.o_uid
, body
->oa
.o_gid
};
1530 CDEBUG(D_QUOTA
, "setdq for [%u %u] with valid %#llx, flags %x\n",
1531 body
->oa
.o_uid
, body
->oa
.o_gid
, body
->oa
.o_valid
,
1533 osc_quota_setdq(cli
, qid
, body
->oa
.o_valid
, body
->oa
.o_flags
);
1536 osc_update_grant(cli
, body
);
1541 if (aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
)
1542 client_cksum
= aa
->aa_oa
->o_cksum
; /* save for later */
1544 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
) {
1546 CERROR("Unexpected +ve rc %d\n", rc
);
1549 LASSERT(req
->rq_bulk
->bd_nob
== aa
->aa_requested_nob
);
1551 if (sptlrpc_cli_unwrap_bulk_write(req
, req
->rq_bulk
))
1554 if ((aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
) && client_cksum
&&
1555 check_write_checksum(&body
->oa
, peer
, client_cksum
,
1556 body
->oa
.o_cksum
, aa
->aa_requested_nob
,
1557 aa
->aa_page_count
, aa
->aa_ppga
,
1558 cksum_type_unpack(aa
->aa_oa
->o_flags
)))
1561 rc
= check_write_rcs(req
, aa
->aa_requested_nob
,
1563 aa
->aa_page_count
, aa
->aa_ppga
);
1567 /* The rest of this function executes only for OST_READs */
1569 /* if unwrap_bulk failed, return -EAGAIN to retry */
1570 rc
= sptlrpc_cli_unwrap_bulk_read(req
, req
->rq_bulk
, rc
);
1576 if (rc
> aa
->aa_requested_nob
) {
1577 CERROR("Unexpected rc %d (%d requested)\n", rc
,
1578 aa
->aa_requested_nob
);
1582 if (rc
!= req
->rq_bulk
->bd_nob_transferred
) {
1583 CERROR("Unexpected rc %d (%d transferred)\n",
1584 rc
, req
->rq_bulk
->bd_nob_transferred
);
1588 if (rc
< aa
->aa_requested_nob
)
1589 handle_short_read(rc
, aa
->aa_page_count
, aa
->aa_ppga
);
1591 if (body
->oa
.o_valid
& OBD_MD_FLCKSUM
) {
1592 static int cksum_counter
;
1593 __u32 server_cksum
= body
->oa
.o_cksum
;
1596 cksum_type_t cksum_type
;
1598 cksum_type
= cksum_type_unpack(body
->oa
.o_valid
&OBD_MD_FLFLAGS
?
1599 body
->oa
.o_flags
: 0);
1600 client_cksum
= osc_checksum_bulk(rc
, aa
->aa_page_count
,
1601 aa
->aa_ppga
, OST_READ
,
1604 if (peer
->nid
== req
->rq_bulk
->bd_sender
) {
1608 router
= libcfs_nid2str(req
->rq_bulk
->bd_sender
);
1611 if (server_cksum
!= client_cksum
) {
1612 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID
" object " DOSTID
" extent [%llu-%llu]\n",
1613 req
->rq_import
->imp_obd
->obd_name
,
1614 libcfs_nid2str(peer
->nid
),
1616 body
->oa
.o_valid
& OBD_MD_FLFID
?
1617 body
->oa
.o_parent_seq
: (__u64
)0,
1618 body
->oa
.o_valid
& OBD_MD_FLFID
?
1619 body
->oa
.o_parent_oid
: 0,
1620 body
->oa
.o_valid
& OBD_MD_FLFID
?
1621 body
->oa
.o_parent_ver
: 0,
1622 POSTID(&body
->oa
.o_oi
),
1623 aa
->aa_ppga
[0]->off
,
1624 aa
->aa_ppga
[aa
->aa_page_count
-1]->off
+
1625 aa
->aa_ppga
[aa
->aa_page_count
-1]->count
-
1627 CERROR("client %x, server %x, cksum_type %x\n",
1628 client_cksum
, server_cksum
, cksum_type
);
1630 aa
->aa_oa
->o_cksum
= client_cksum
;
1634 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1637 } else if (unlikely(client_cksum
)) {
1638 static int cksum_missed
;
1641 if ((cksum_missed
& (-cksum_missed
)) == cksum_missed
)
1642 CERROR("Checksum %u requested from %s but not sent\n",
1643 cksum_missed
, libcfs_nid2str(peer
->nid
));
1649 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
1650 aa
->aa_oa
, &body
->oa
);
1655 static int osc_brw_redo_request(struct ptlrpc_request
*request
,
1656 struct osc_brw_async_args
*aa
, int rc
)
1658 struct ptlrpc_request
*new_req
;
1659 struct osc_brw_async_args
*new_aa
;
1660 struct osc_async_page
*oap
;
1662 DEBUG_REQ(rc
== -EINPROGRESS
? D_RPCTRACE
: D_ERROR
, request
,
1663 "redo for recoverable error %d", rc
);
1665 rc
= osc_brw_prep_request(lustre_msg_get_opc(request
->rq_reqmsg
) ==
1666 OST_WRITE
? OBD_BRW_WRITE
: OBD_BRW_READ
,
1667 aa
->aa_cli
, aa
->aa_oa
,
1668 NULL
/* lsm unused by osc currently */,
1669 aa
->aa_page_count
, aa
->aa_ppga
,
1670 &new_req
, aa
->aa_ocapa
, 0, 1);
1674 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
1675 if (oap
->oap_request
!= NULL
) {
1676 LASSERTF(request
== oap
->oap_request
,
1677 "request %p != oap_request %p\n",
1678 request
, oap
->oap_request
);
1679 if (oap
->oap_interrupted
) {
1680 ptlrpc_req_finished(new_req
);
1685 /* New request takes over pga and oaps from old request.
1686 * Note that copying a list_head doesn't work, need to move it... */
1688 new_req
->rq_interpret_reply
= request
->rq_interpret_reply
;
1689 new_req
->rq_async_args
= request
->rq_async_args
;
1690 /* cap resend delay to the current request timeout, this is similar to
1691 * what ptlrpc does (see after_reply()) */
1692 if (aa
->aa_resends
> new_req
->rq_timeout
)
1693 new_req
->rq_sent
= get_seconds() + new_req
->rq_timeout
;
1695 new_req
->rq_sent
= get_seconds() + aa
->aa_resends
;
1696 new_req
->rq_generation_set
= 1;
1697 new_req
->rq_import_generation
= request
->rq_import_generation
;
1699 new_aa
= ptlrpc_req_async_args(new_req
);
1701 INIT_LIST_HEAD(&new_aa
->aa_oaps
);
1702 list_splice_init(&aa
->aa_oaps
, &new_aa
->aa_oaps
);
1703 INIT_LIST_HEAD(&new_aa
->aa_exts
);
1704 list_splice_init(&aa
->aa_exts
, &new_aa
->aa_exts
);
1705 new_aa
->aa_resends
= aa
->aa_resends
;
1707 list_for_each_entry(oap
, &new_aa
->aa_oaps
, oap_rpc_item
) {
1708 if (oap
->oap_request
) {
1709 ptlrpc_req_finished(oap
->oap_request
);
1710 oap
->oap_request
= ptlrpc_request_addref(new_req
);
1714 new_aa
->aa_ocapa
= aa
->aa_ocapa
;
1715 aa
->aa_ocapa
= NULL
;
1717 /* XXX: This code will run into problem if we're going to support
1718 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1719 * and wait for all of them to be finished. We should inherit request
1720 * set from old request. */
1721 ptlrpcd_add_req(new_req
, PDL_POLICY_SAME
, -1);
1723 DEBUG_REQ(D_INFO
, new_req
, "new request");
1728 * ugh, we want disk allocation on the target to happen in offset order. we'll
1729 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1730 * fine for our small page arrays and doesn't require allocation. its an
1731 * insertion sort that swaps elements that are strides apart, shrinking the
1732 * stride down until its '1' and the array is sorted.
1734 static void sort_brw_pages(struct brw_page
**array
, int num
)
1737 struct brw_page
*tmp
;
1741 for (stride
= 1; stride
< num
; stride
= (stride
* 3) + 1)
1746 for (i
= stride
; i
< num
; i
++) {
1749 while (j
>= stride
&& array
[j
- stride
]->off
> tmp
->off
) {
1750 array
[j
] = array
[j
- stride
];
1755 } while (stride
> 1);
1758 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
)
1760 LASSERT(ppga
!= NULL
);
1764 static int brw_interpret(const struct lu_env
*env
,
1765 struct ptlrpc_request
*req
, void *data
, int rc
)
1767 struct osc_brw_async_args
*aa
= data
;
1768 struct osc_extent
*ext
;
1769 struct osc_extent
*tmp
;
1770 struct cl_object
*obj
= NULL
;
1771 struct client_obd
*cli
= aa
->aa_cli
;
1773 rc
= osc_brw_fini_request(req
, rc
);
1774 CDEBUG(D_INODE
, "request %p aa %p rc %d\n", req
, aa
, rc
);
1775 /* When server return -EINPROGRESS, client should always retry
1776 * regardless of the number of times the bulk was resent already. */
1777 if (osc_recoverable_error(rc
)) {
1778 if (req
->rq_import_generation
!=
1779 req
->rq_import
->imp_generation
) {
1780 CDEBUG(D_HA
, "%s: resend cross eviction for object: " DOSTID
", rc = %d.\n",
1781 req
->rq_import
->imp_obd
->obd_name
,
1782 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1783 } else if (rc
== -EINPROGRESS
||
1784 client_should_resend(aa
->aa_resends
, aa
->aa_cli
)) {
1785 rc
= osc_brw_redo_request(req
, aa
, rc
);
1787 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1788 req
->rq_import
->imp_obd
->obd_name
,
1789 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1794 else if (rc
== -EAGAIN
|| rc
== -EINPROGRESS
)
1799 capa_put(aa
->aa_ocapa
);
1800 aa
->aa_ocapa
= NULL
;
1803 list_for_each_entry_safe(ext
, tmp
, &aa
->aa_exts
, oe_link
) {
1804 if (obj
== NULL
&& rc
== 0) {
1805 obj
= osc2cl(ext
->oe_obj
);
1809 list_del_init(&ext
->oe_link
);
1810 osc_extent_finish(env
, ext
, 1, rc
);
1812 LASSERT(list_empty(&aa
->aa_exts
));
1813 LASSERT(list_empty(&aa
->aa_oaps
));
1816 struct obdo
*oa
= aa
->aa_oa
;
1817 struct cl_attr
*attr
= &osc_env_info(env
)->oti_attr
;
1818 unsigned long valid
= 0;
1821 if (oa
->o_valid
& OBD_MD_FLBLOCKS
) {
1822 attr
->cat_blocks
= oa
->o_blocks
;
1823 valid
|= CAT_BLOCKS
;
1825 if (oa
->o_valid
& OBD_MD_FLMTIME
) {
1826 attr
->cat_mtime
= oa
->o_mtime
;
1829 if (oa
->o_valid
& OBD_MD_FLATIME
) {
1830 attr
->cat_atime
= oa
->o_atime
;
1833 if (oa
->o_valid
& OBD_MD_FLCTIME
) {
1834 attr
->cat_ctime
= oa
->o_ctime
;
1838 cl_object_attr_lock(obj
);
1839 cl_object_attr_set(env
, obj
, attr
, valid
);
1840 cl_object_attr_unlock(obj
);
1842 cl_object_put(env
, obj
);
1844 OBDO_FREE(aa
->aa_oa
);
1846 cl_req_completion(env
, aa
->aa_clerq
, rc
< 0 ? rc
:
1847 req
->rq_bulk
->bd_nob_transferred
);
1848 osc_release_ppga(aa
->aa_ppga
, aa
->aa_page_count
);
1849 ptlrpc_lprocfs_brw(req
, req
->rq_bulk
->bd_nob_transferred
);
1851 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1852 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1853 * is called so we know whether to go to sync BRWs or wait for more
1854 * RPCs to complete */
1855 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
)
1856 cli
->cl_w_in_flight
--;
1858 cli
->cl_r_in_flight
--;
1859 osc_wake_cache_waiters(cli
);
1860 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
1862 osc_io_unplug(env
, cli
, NULL
, PDL_POLICY_SAME
);
1867 * Build an RPC by the list of extent @ext_list. The caller must ensure
1868 * that the total pages in this list are NOT over max pages per RPC.
1869 * Extents in the list must be in OES_RPC state.
1871 int osc_build_rpc(const struct lu_env
*env
, struct client_obd
*cli
,
1872 struct list_head
*ext_list
, int cmd
, pdl_policy_t pol
)
1874 struct ptlrpc_request
*req
= NULL
;
1875 struct osc_extent
*ext
;
1876 struct brw_page
**pga
= NULL
;
1877 struct osc_brw_async_args
*aa
= NULL
;
1878 struct obdo
*oa
= NULL
;
1879 struct osc_async_page
*oap
;
1880 struct osc_async_page
*tmp
;
1881 struct cl_req
*clerq
= NULL
;
1882 enum cl_req_type crt
= (cmd
& OBD_BRW_WRITE
) ? CRT_WRITE
: CRT_READ
;
1883 struct ldlm_lock
*lock
= NULL
;
1884 struct cl_req_attr
*crattr
= NULL
;
1885 u64 starting_offset
= OBD_OBJECT_EOF
;
1886 u64 ending_offset
= 0;
1892 struct ost_body
*body
;
1893 LIST_HEAD(rpc_list
);
1895 LASSERT(!list_empty(ext_list
));
1897 /* add pages into rpc_list to build BRW rpc */
1898 list_for_each_entry(ext
, ext_list
, oe_link
) {
1899 LASSERT(ext
->oe_state
== OES_RPC
);
1900 mem_tight
|= ext
->oe_memalloc
;
1901 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
1903 list_add_tail(&oap
->oap_rpc_item
, &rpc_list
);
1904 if (starting_offset
> oap
->oap_obj_off
)
1905 starting_offset
= oap
->oap_obj_off
;
1907 LASSERT(oap
->oap_page_off
== 0);
1908 if (ending_offset
< oap
->oap_obj_off
+ oap
->oap_count
)
1909 ending_offset
= oap
->oap_obj_off
+
1912 LASSERT(oap
->oap_page_off
+ oap
->oap_count
==
1918 mpflag
= cfs_memory_pressure_get_and_set();
1920 crattr
= kzalloc(sizeof(*crattr
), GFP_NOFS
);
1926 pga
= kcalloc(page_count
, sizeof(*pga
), GFP_NOFS
);
1939 list_for_each_entry(oap
, &rpc_list
, oap_rpc_item
) {
1940 struct cl_page
*page
= oap2cl_page(oap
);
1941 if (clerq
== NULL
) {
1942 clerq
= cl_req_alloc(env
, page
, crt
,
1943 1 /* only 1-object rpcs for now */);
1944 if (IS_ERR(clerq
)) {
1945 rc
= PTR_ERR(clerq
);
1948 lock
= oap
->oap_ldlm_lock
;
1951 oap
->oap_brw_flags
|= OBD_BRW_MEMALLOC
;
1952 pga
[i
] = &oap
->oap_brw_page
;
1953 pga
[i
]->off
= oap
->oap_obj_off
+ oap
->oap_page_off
;
1954 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1955 pga
[i
]->pg
, page_index(oap
->oap_page
), oap
,
1958 cl_req_page_add(env
, clerq
, page
);
1961 /* always get the data for the obdo for the rpc */
1962 LASSERT(clerq
!= NULL
);
1963 crattr
->cra_oa
= oa
;
1964 cl_req_attr_set(env
, clerq
, crattr
, ~0ULL);
1966 oa
->o_handle
= lock
->l_remote_handle
;
1967 oa
->o_valid
|= OBD_MD_FLHANDLE
;
1970 rc
= cl_req_prep(env
, clerq
);
1972 CERROR("cl_req_prep failed: %d\n", rc
);
1976 sort_brw_pages(pga
, page_count
);
1977 rc
= osc_brw_prep_request(cmd
, cli
, oa
, NULL
, page_count
,
1978 pga
, &req
, crattr
->cra_capa
, 1, 0);
1980 CERROR("prep_req failed: %d\n", rc
);
1984 req
->rq_interpret_reply
= brw_interpret
;
1987 req
->rq_memalloc
= 1;
1989 /* Need to update the timestamps after the request is built in case
1990 * we race with setattr (locally or in queue at OST). If OST gets
1991 * later setattr before earlier BRW (as determined by the request xid),
1992 * the OST will not use BRW timestamps. Sadly, there is no obvious
1993 * way to do this in a single call. bug 10150 */
1994 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
1995 crattr
->cra_oa
= &body
->oa
;
1996 cl_req_attr_set(env
, clerq
, crattr
,
1997 OBD_MD_FLMTIME
|OBD_MD_FLCTIME
|OBD_MD_FLATIME
);
1999 lustre_msg_set_jobid(req
->rq_reqmsg
, crattr
->cra_jobid
);
2001 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2002 aa
= ptlrpc_req_async_args(req
);
2003 INIT_LIST_HEAD(&aa
->aa_oaps
);
2004 list_splice_init(&rpc_list
, &aa
->aa_oaps
);
2005 INIT_LIST_HEAD(&aa
->aa_exts
);
2006 list_splice_init(ext_list
, &aa
->aa_exts
);
2007 aa
->aa_clerq
= clerq
;
2009 /* queued sync pages can be torn down while the pages
2010 * were between the pending list and the rpc */
2012 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
2013 /* only one oap gets a request reference */
2016 if (oap
->oap_interrupted
&& !req
->rq_intr
) {
2017 CDEBUG(D_INODE
, "oap %p in req %p interrupted\n",
2019 ptlrpc_mark_interrupted(req
);
2023 tmp
->oap_request
= ptlrpc_request_addref(req
);
2025 client_obd_list_lock(&cli
->cl_loi_list_lock
);
2026 starting_offset
>>= PAGE_CACHE_SHIFT
;
2027 if (cmd
== OBD_BRW_READ
) {
2028 cli
->cl_r_in_flight
++;
2029 lprocfs_oh_tally_log2(&cli
->cl_read_page_hist
, page_count
);
2030 lprocfs_oh_tally(&cli
->cl_read_rpc_hist
, cli
->cl_r_in_flight
);
2031 lprocfs_oh_tally_log2(&cli
->cl_read_offset_hist
,
2032 starting_offset
+ 1);
2034 cli
->cl_w_in_flight
++;
2035 lprocfs_oh_tally_log2(&cli
->cl_write_page_hist
, page_count
);
2036 lprocfs_oh_tally(&cli
->cl_write_rpc_hist
, cli
->cl_w_in_flight
);
2037 lprocfs_oh_tally_log2(&cli
->cl_write_offset_hist
,
2038 starting_offset
+ 1);
2040 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
2042 DEBUG_REQ(D_INODE
, req
, "%d pages, aa %p. now %dr/%dw in flight",
2043 page_count
, aa
, cli
->cl_r_in_flight
,
2044 cli
->cl_w_in_flight
);
2046 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2047 * see which CPU/NUMA node the majority of pages were allocated
2048 * on, and try to assign the async RPC to the CPU core
2049 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2051 * But on the other hand, we expect that multiple ptlrpcd
2052 * threads and the initial write sponsor can run in parallel,
2053 * especially when data checksum is enabled, which is CPU-bound
2054 * operation and single ptlrpcd thread cannot process in time.
2055 * So more ptlrpcd threads sharing BRW load
2056 * (with PDL_POLICY_ROUND) seems better.
2058 ptlrpcd_add_req(req
, pol
, -1);
2063 cfs_memory_pressure_restore(mpflag
);
2065 if (crattr
!= NULL
) {
2066 capa_put(crattr
->cra_capa
);
2071 LASSERT(req
== NULL
);
2076 /* this should happen rarely and is pretty bad, it makes the
2077 * pending list not follow the dirty order */
2078 while (!list_empty(ext_list
)) {
2079 ext
= list_entry(ext_list
->next
, struct osc_extent
,
2081 list_del_init(&ext
->oe_link
);
2082 osc_extent_finish(env
, ext
, 0, rc
);
2084 if (clerq
&& !IS_ERR(clerq
))
2085 cl_req_completion(env
, clerq
, rc
);
2090 static int osc_set_lock_data_with_check(struct ldlm_lock
*lock
,
2091 struct ldlm_enqueue_info
*einfo
)
2093 void *data
= einfo
->ei_cbdata
;
2096 LASSERT(lock
!= NULL
);
2097 LASSERT(lock
->l_blocking_ast
== einfo
->ei_cb_bl
);
2098 LASSERT(lock
->l_resource
->lr_type
== einfo
->ei_type
);
2099 LASSERT(lock
->l_completion_ast
== einfo
->ei_cb_cp
);
2100 LASSERT(lock
->l_glimpse_ast
== einfo
->ei_cb_gl
);
2102 lock_res_and_lock(lock
);
2103 spin_lock(&osc_ast_guard
);
2105 if (lock
->l_ast_data
== NULL
)
2106 lock
->l_ast_data
= data
;
2107 if (lock
->l_ast_data
== data
)
2110 spin_unlock(&osc_ast_guard
);
2111 unlock_res_and_lock(lock
);
2116 static int osc_set_data_with_check(struct lustre_handle
*lockh
,
2117 struct ldlm_enqueue_info
*einfo
)
2119 struct ldlm_lock
*lock
= ldlm_handle2lock(lockh
);
2123 set
= osc_set_lock_data_with_check(lock
, einfo
);
2124 LDLM_LOCK_PUT(lock
);
2126 CERROR("lockh %p, data %p - client evicted?\n",
2127 lockh
, einfo
->ei_cbdata
);
2131 /* find any ldlm lock of the inode in osc
2135 static int osc_find_cbdata(struct obd_export
*exp
, struct lov_stripe_md
*lsm
,
2136 ldlm_iterator_t replace
, void *data
)
2138 struct ldlm_res_id res_id
;
2139 struct obd_device
*obd
= class_exp2obd(exp
);
2142 ostid_build_res_name(&lsm
->lsm_oi
, &res_id
);
2143 rc
= ldlm_resource_iterate(obd
->obd_namespace
, &res_id
, replace
, data
);
2144 if (rc
== LDLM_ITER_STOP
)
2146 if (rc
== LDLM_ITER_CONTINUE
)
2151 static int osc_enqueue_fini(struct ptlrpc_request
*req
, struct ost_lvb
*lvb
,
2152 obd_enqueue_update_f upcall
, void *cookie
,
2153 __u64
*flags
, int agl
, int rc
)
2155 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2158 /* The request was created before ldlm_cli_enqueue call. */
2159 if (rc
== ELDLM_LOCK_ABORTED
) {
2160 struct ldlm_reply
*rep
;
2161 rep
= req_capsule_server_get(&req
->rq_pill
,
2164 LASSERT(rep
!= NULL
);
2165 rep
->lock_policy_res1
=
2166 ptlrpc_status_ntoh(rep
->lock_policy_res1
);
2167 if (rep
->lock_policy_res1
)
2168 rc
= rep
->lock_policy_res1
;
2172 if ((intent
!= 0 && rc
== ELDLM_LOCK_ABORTED
&& agl
== 0) ||
2174 *flags
|= LDLM_FL_LVB_READY
;
2175 CDEBUG(D_INODE
, "got kms %llu blocks %llu mtime %llu\n",
2176 lvb
->lvb_size
, lvb
->lvb_blocks
, lvb
->lvb_mtime
);
2179 /* Call the update callback. */
2180 rc
= (*upcall
)(cookie
, rc
);
2184 static int osc_enqueue_interpret(const struct lu_env
*env
,
2185 struct ptlrpc_request
*req
,
2186 struct osc_enqueue_args
*aa
, int rc
)
2188 struct ldlm_lock
*lock
;
2189 struct lustre_handle handle
;
2191 struct ost_lvb
*lvb
;
2193 __u64
*flags
= aa
->oa_flags
;
2195 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2196 * might be freed anytime after lock upcall has been called. */
2197 lustre_handle_copy(&handle
, aa
->oa_lockh
);
2198 mode
= aa
->oa_ei
->ei_mode
;
2200 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2202 lock
= ldlm_handle2lock(&handle
);
2204 /* Take an additional reference so that a blocking AST that
2205 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2206 * to arrive after an upcall has been executed by
2207 * osc_enqueue_fini(). */
2208 ldlm_lock_addref(&handle
, mode
);
2210 /* Let CP AST to grant the lock first. */
2211 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE
, 1);
2213 if (aa
->oa_agl
&& rc
== ELDLM_LOCK_ABORTED
) {
2218 lvb_len
= sizeof(*aa
->oa_lvb
);
2221 /* Complete obtaining the lock procedure. */
2222 rc
= ldlm_cli_enqueue_fini(aa
->oa_exp
, req
, aa
->oa_ei
->ei_type
, 1,
2223 mode
, flags
, lvb
, lvb_len
, &handle
, rc
);
2224 /* Complete osc stuff. */
2225 rc
= osc_enqueue_fini(req
, aa
->oa_lvb
, aa
->oa_upcall
, aa
->oa_cookie
,
2226 flags
, aa
->oa_agl
, rc
);
2228 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE
, 10);
2230 /* Release the lock for async request. */
2231 if (lustre_handle_is_used(&handle
) && rc
== ELDLM_OK
)
2233 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2234 * not already released by
2235 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2237 ldlm_lock_decref(&handle
, mode
);
2239 LASSERTF(lock
!= NULL
, "lockh %p, req %p, aa %p - client evicted?\n",
2240 aa
->oa_lockh
, req
, aa
);
2241 ldlm_lock_decref(&handle
, mode
);
2242 LDLM_LOCK_PUT(lock
);
/* Sentinel request-set value meaning "dispatch through ptlrpcd" rather than
 * a caller-owned set; compared by address, never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2248 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2249 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2250 * other synchronous requests, however keeping some locks and trying to obtain
2251 * others may take a considerable amount of time in a case of ost failure; and
2252 * when other sync requests do not get released lock from a client, the client
2253 * is excluded from the cluster -- such scenarious make the life difficult, so
2254 * release locks just after they are obtained. */
2255 int osc_enqueue_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2256 __u64
*flags
, ldlm_policy_data_t
*policy
,
2257 struct ost_lvb
*lvb
, int kms_valid
,
2258 obd_enqueue_update_f upcall
, void *cookie
,
2259 struct ldlm_enqueue_info
*einfo
,
2260 struct lustre_handle
*lockh
,
2261 struct ptlrpc_request_set
*rqset
, int async
, int agl
)
2263 struct obd_device
*obd
= exp
->exp_obd
;
2264 struct ptlrpc_request
*req
= NULL
;
2265 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2266 __u64 match_lvb
= (agl
!= 0 ? 0 : LDLM_FL_LVB_READY
);
2270 /* Filesystem lock extents are extended to page boundaries so that
2271 * dealing with the page cache is a little smoother. */
2272 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~CFS_PAGE_MASK
;
2273 policy
->l_extent
.end
|= ~CFS_PAGE_MASK
;
2276 * kms is not valid when either object is completely fresh (so that no
2277 * locks are cached), or object was evicted. In the latter case cached
2278 * lock cannot be used, because it would prime inode state with
2279 * potentially stale LVB.
2284 /* Next, search for already existing extent locks that will cover us */
2285 /* If we're trying to read, we also search for an existing PW lock. The
2286 * VFS and page cache already protect us locally, so lots of readers/
2287 * writers can share a single PW lock.
2289 * There are problems with conversion deadlocks, so instead of
2290 * converting a read lock to a write lock, we'll just enqueue a new
2293 * At some point we should cancel the read lock instead of making them
2294 * send us a blocking callback, but there are problems with canceling
2295 * locks out from other users right now, too. */
2296 mode
= einfo
->ei_mode
;
2297 if (einfo
->ei_mode
== LCK_PR
)
2299 mode
= ldlm_lock_match(obd
->obd_namespace
, *flags
| match_lvb
, res_id
,
2300 einfo
->ei_type
, policy
, mode
, lockh
, 0);
2302 struct ldlm_lock
*matched
= ldlm_handle2lock(lockh
);
2304 if ((agl
!= 0) && !(matched
->l_flags
& LDLM_FL_LVB_READY
)) {
2305 /* For AGL, if enqueue RPC is sent but the lock is not
2306 * granted, then skip to process this strpe.
2307 * Return -ECANCELED to tell the caller. */
2308 ldlm_lock_decref(lockh
, mode
);
2309 LDLM_LOCK_PUT(matched
);
2313 if (osc_set_lock_data_with_check(matched
, einfo
)) {
2314 *flags
|= LDLM_FL_LVB_READY
;
2315 /* addref the lock only if not async requests and PW
2316 * lock is matched whereas we asked for PR. */
2317 if (!rqset
&& einfo
->ei_mode
!= mode
)
2318 ldlm_lock_addref(lockh
, LCK_PR
);
2320 /* I would like to be able to ASSERT here that
2321 * rss <= kms, but I can't, for reasons which
2322 * are explained in lov_enqueue() */
2325 /* We already have a lock, and it's referenced.
2327 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2328 * AGL upcall may change it to CLS_HELD directly. */
2329 (*upcall
)(cookie
, ELDLM_OK
);
2331 if (einfo
->ei_mode
!= mode
)
2332 ldlm_lock_decref(lockh
, LCK_PW
);
2334 /* For async requests, decref the lock. */
2335 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
2336 LDLM_LOCK_PUT(matched
);
2340 ldlm_lock_decref(lockh
, mode
);
2341 LDLM_LOCK_PUT(matched
);
2347 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2348 &RQF_LDLM_ENQUEUE_LVB
);
2352 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, 0);
2354 ptlrpc_request_free(req
);
2358 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
,
2360 ptlrpc_request_set_replen(req
);
2363 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2364 *flags
&= ~LDLM_FL_BLOCK_GRANTED
;
2366 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, res_id
, policy
, flags
, lvb
,
2367 sizeof(*lvb
), LVB_T_OST
, lockh
, async
);
2370 struct osc_enqueue_args
*aa
;
2371 CLASSERT (sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2372 aa
= ptlrpc_req_async_args(req
);
2375 aa
->oa_flags
= flags
;
2376 aa
->oa_upcall
= upcall
;
2377 aa
->oa_cookie
= cookie
;
2379 aa
->oa_lockh
= lockh
;
2382 req
->rq_interpret_reply
=
2383 (ptlrpc_interpterer_t
)osc_enqueue_interpret
;
2384 if (rqset
== PTLRPCD_SET
)
2385 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
2387 ptlrpc_set_add_req(rqset
, req
);
2388 } else if (intent
) {
2389 ptlrpc_req_finished(req
);
2394 rc
= osc_enqueue_fini(req
, lvb
, upcall
, cookie
, flags
, agl
, rc
);
2396 ptlrpc_req_finished(req
);
2401 int osc_match_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2402 __u32 type
, ldlm_policy_data_t
*policy
, __u32 mode
,
2403 __u64
*flags
, void *data
, struct lustre_handle
*lockh
,
2406 struct obd_device
*obd
= exp
->exp_obd
;
2407 __u64 lflags
= *flags
;
2410 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH
))
2413 /* Filesystem lock extents are extended to page boundaries so that
2414 * dealing with the page cache is a little smoother */
2415 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~CFS_PAGE_MASK
;
2416 policy
->l_extent
.end
|= ~CFS_PAGE_MASK
;
2418 /* Next, search for already existing extent locks that will cover us */
2419 /* If we're trying to read, we also search for an existing PW lock. The
2420 * VFS and page cache already protect us locally, so lots of readers/
2421 * writers can share a single PW lock. */
2425 rc
= ldlm_lock_match(obd
->obd_namespace
, lflags
,
2426 res_id
, type
, policy
, rc
, lockh
, unref
);
2429 if (!osc_set_data_with_check(lockh
, data
)) {
2430 if (!(lflags
& LDLM_FL_TEST_LOCK
))
2431 ldlm_lock_decref(lockh
, rc
);
2435 if (!(lflags
& LDLM_FL_TEST_LOCK
) && mode
!= rc
) {
2436 ldlm_lock_addref(lockh
, LCK_PR
);
2437 ldlm_lock_decref(lockh
, LCK_PW
);
2444 int osc_cancel_base(struct lustre_handle
*lockh
, __u32 mode
)
2446 if (unlikely(mode
== LCK_GROUP
))
2447 ldlm_lock_decref_and_cancel(lockh
, mode
);
2449 ldlm_lock_decref(lockh
, mode
);
2454 static int osc_statfs_interpret(const struct lu_env
*env
,
2455 struct ptlrpc_request
*req
,
2456 struct osc_async_args
*aa
, int rc
)
2458 struct obd_statfs
*msfs
;
2461 /* The request has in fact never been sent
2462 * due to issues at a higher level (LOV).
2463 * Exit immediately since the caller is
2464 * aware of the problem and takes care
2465 * of the clean up */
2468 if ((rc
== -ENOTCONN
|| rc
== -EAGAIN
) &&
2469 (aa
->aa_oi
->oi_flags
& OBD_STATFS_NODELAY
)) {
2477 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2483 *aa
->aa_oi
->oi_osfs
= *msfs
;
2485 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
2489 static int osc_statfs_async(struct obd_export
*exp
,
2490 struct obd_info
*oinfo
, __u64 max_age
,
2491 struct ptlrpc_request_set
*rqset
)
2493 struct obd_device
*obd
= class_exp2obd(exp
);
2494 struct ptlrpc_request
*req
;
2495 struct osc_async_args
*aa
;
2498 /* We could possibly pass max_age in the request (as an absolute
2499 * timestamp or a "seconds.usec ago") so the target can avoid doing
2500 * extra calls into the filesystem if that isn't necessary (e.g.
2501 * during mount that would help a bit). Having relative timestamps
2502 * is not so great if request processing is slow, while absolute
2503 * timestamps are not ideal because they need time synchronization. */
2504 req
= ptlrpc_request_alloc(obd
->u
.cli
.cl_import
, &RQF_OST_STATFS
);
2508 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2510 ptlrpc_request_free(req
);
2513 ptlrpc_request_set_replen(req
);
2514 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2515 ptlrpc_at_set_req_timeout(req
);
2517 if (oinfo
->oi_flags
& OBD_STATFS_NODELAY
) {
2518 /* procfs requests not want stat in wait for avoid deadlock */
2519 req
->rq_no_resend
= 1;
2520 req
->rq_no_delay
= 1;
2523 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_statfs_interpret
;
2524 CLASSERT (sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2525 aa
= ptlrpc_req_async_args(req
);
2528 ptlrpc_set_add_req(rqset
, req
);
2532 static int osc_statfs(const struct lu_env
*env
, struct obd_export
*exp
,
2533 struct obd_statfs
*osfs
, __u64 max_age
, __u32 flags
)
2535 struct obd_device
*obd
= class_exp2obd(exp
);
2536 struct obd_statfs
*msfs
;
2537 struct ptlrpc_request
*req
;
2538 struct obd_import
*imp
= NULL
;
2541 /*Since the request might also come from lprocfs, so we need
2542 *sync this with client_disconnect_export Bug15684*/
2543 down_read(&obd
->u
.cli
.cl_sem
);
2544 if (obd
->u
.cli
.cl_import
)
2545 imp
= class_import_get(obd
->u
.cli
.cl_import
);
2546 up_read(&obd
->u
.cli
.cl_sem
);
2550 /* We could possibly pass max_age in the request (as an absolute
2551 * timestamp or a "seconds.usec ago") so the target can avoid doing
2552 * extra calls into the filesystem if that isn't necessary (e.g.
2553 * during mount that would help a bit). Having relative timestamps
2554 * is not so great if request processing is slow, while absolute
2555 * timestamps are not ideal because they need time synchronization. */
2556 req
= ptlrpc_request_alloc(imp
, &RQF_OST_STATFS
);
2558 class_import_put(imp
);
2563 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2565 ptlrpc_request_free(req
);
2568 ptlrpc_request_set_replen(req
);
2569 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2570 ptlrpc_at_set_req_timeout(req
);
2572 if (flags
& OBD_STATFS_NODELAY
) {
2573 /* procfs requests not want stat in wait for avoid deadlock */
2574 req
->rq_no_resend
= 1;
2575 req
->rq_no_delay
= 1;
2578 rc
= ptlrpc_queue_wait(req
);
2582 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2591 ptlrpc_req_finished(req
);
2595 /* Retrieve object striping information.
2597 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2598 * the maximum number of OST indices which will fit in the user buffer.
2599 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2601 static int osc_getstripe(struct lov_stripe_md
*lsm
, struct lov_user_md
*lump
)
2603 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2604 struct lov_user_md_v3 lum
, *lumk
;
2605 struct lov_user_ost_data_v1
*lmm_objects
;
2606 int rc
= 0, lum_size
;
2611 /* we only need the header part from user space to get lmm_magic and
2612 * lmm_stripe_count, (the header part is common to v1 and v3) */
2613 lum_size
= sizeof(struct lov_user_md_v1
);
2614 if (copy_from_user(&lum
, lump
, lum_size
))
2617 if ((lum
.lmm_magic
!= LOV_USER_MAGIC_V1
) &&
2618 (lum
.lmm_magic
!= LOV_USER_MAGIC_V3
))
2621 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2622 LASSERT(sizeof(struct lov_user_md_v1
) == sizeof(struct lov_mds_md_v1
));
2623 LASSERT(sizeof(struct lov_user_md_v3
) == sizeof(struct lov_mds_md_v3
));
2624 LASSERT(sizeof(lum
.lmm_objects
[0]) == sizeof(lumk
->lmm_objects
[0]));
2626 /* we can use lov_mds_md_size() to compute lum_size
2627 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2628 if (lum
.lmm_stripe_count
> 0) {
2629 lum_size
= lov_mds_md_size(lum
.lmm_stripe_count
, lum
.lmm_magic
);
2630 lumk
= kzalloc(lum_size
, GFP_NOFS
);
2634 if (lum
.lmm_magic
== LOV_USER_MAGIC_V1
)
2636 &(((struct lov_user_md_v1
*)lumk
)->lmm_objects
[0]);
2638 lmm_objects
= &(lumk
->lmm_objects
[0]);
2639 lmm_objects
->l_ost_oi
= lsm
->lsm_oi
;
2641 lum_size
= lov_mds_md_size(0, lum
.lmm_magic
);
2645 lumk
->lmm_oi
= lsm
->lsm_oi
;
2646 lumk
->lmm_stripe_count
= 1;
2648 if (copy_to_user(lump
, lumk
, lum_size
))
2658 static int osc_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
2659 void *karg
, void *uarg
)
2661 struct obd_device
*obd
= exp
->exp_obd
;
2662 struct obd_ioctl_data
*data
= karg
;
2665 if (!try_module_get(THIS_MODULE
)) {
2666 CERROR("Can't get module. Is it alive?");
2670 case OBD_IOC_LOV_GET_CONFIG
: {
2672 struct lov_desc
*desc
;
2673 struct obd_uuid uuid
;
2677 if (obd_ioctl_getdata(&buf
, &len
, uarg
)) {
2682 data
= (struct obd_ioctl_data
*)buf
;
2684 if (sizeof(*desc
) > data
->ioc_inllen1
) {
2685 obd_ioctl_freedata(buf
, len
);
2690 if (data
->ioc_inllen2
< sizeof(uuid
)) {
2691 obd_ioctl_freedata(buf
, len
);
2696 desc
= (struct lov_desc
*)data
->ioc_inlbuf1
;
2697 desc
->ld_tgt_count
= 1;
2698 desc
->ld_active_tgt_count
= 1;
2699 desc
->ld_default_stripe_count
= 1;
2700 desc
->ld_default_stripe_size
= 0;
2701 desc
->ld_default_stripe_offset
= 0;
2702 desc
->ld_pattern
= 0;
2703 memcpy(&desc
->ld_uuid
, &obd
->obd_uuid
, sizeof(uuid
));
2705 memcpy(data
->ioc_inlbuf2
, &obd
->obd_uuid
, sizeof(uuid
));
2707 err
= copy_to_user(uarg
, buf
, len
);
2710 obd_ioctl_freedata(buf
, len
);
2713 case LL_IOC_LOV_SETSTRIPE
:
2714 err
= obd_alloc_memmd(exp
, karg
);
2718 case LL_IOC_LOV_GETSTRIPE
:
2719 err
= osc_getstripe(karg
, uarg
);
2721 case OBD_IOC_CLIENT_RECOVER
:
2722 err
= ptlrpc_recover_import(obd
->u
.cli
.cl_import
,
2723 data
->ioc_inlbuf1
, 0);
2727 case IOC_OSC_SET_ACTIVE
:
2728 err
= ptlrpc_set_import_active(obd
->u
.cli
.cl_import
,
2731 case OBD_IOC_POLL_QUOTACHECK
:
2732 err
= osc_quota_poll_check(exp
, (struct if_quotacheck
*)karg
);
2734 case OBD_IOC_PING_TARGET
:
2735 err
= ptlrpc_obd_ping(obd
);
2738 CDEBUG(D_INODE
, "unrecognised ioctl %#x by %s\n",
2739 cmd
, current_comm());
2744 module_put(THIS_MODULE
);
2748 static int osc_get_info(const struct lu_env
*env
, struct obd_export
*exp
,
2749 u32 keylen
, void *key
, __u32
*vallen
, void *val
,
2750 struct lov_stripe_md
*lsm
)
2752 if (!vallen
|| !val
)
2755 if (KEY_IS(KEY_LOCK_TO_STRIPE
)) {
2756 __u32
*stripe
= val
;
2757 *vallen
= sizeof(*stripe
);
2760 } else if (KEY_IS(KEY_LAST_ID
)) {
2761 struct ptlrpc_request
*req
;
2766 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2767 &RQF_OST_GET_INFO_LAST_ID
);
2771 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2772 RCL_CLIENT
, keylen
);
2773 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2775 ptlrpc_request_free(req
);
2779 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2780 memcpy(tmp
, key
, keylen
);
2782 req
->rq_no_delay
= req
->rq_no_resend
= 1;
2783 ptlrpc_request_set_replen(req
);
2784 rc
= ptlrpc_queue_wait(req
);
2788 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_ID
);
2789 if (reply
== NULL
) {
2794 *((u64
*)val
) = *reply
;
2796 ptlrpc_req_finished(req
);
2798 } else if (KEY_IS(KEY_FIEMAP
)) {
2799 struct ll_fiemap_info_key
*fm_key
=
2800 (struct ll_fiemap_info_key
*)key
;
2801 struct ldlm_res_id res_id
;
2802 ldlm_policy_data_t policy
;
2803 struct lustre_handle lockh
;
2804 ldlm_mode_t mode
= 0;
2805 struct ptlrpc_request
*req
;
2806 struct ll_user_fiemap
*reply
;
2810 if (!(fm_key
->fiemap
.fm_flags
& FIEMAP_FLAG_SYNC
))
2813 policy
.l_extent
.start
= fm_key
->fiemap
.fm_start
&
2816 if (OBD_OBJECT_EOF
- fm_key
->fiemap
.fm_length
<=
2817 fm_key
->fiemap
.fm_start
+ PAGE_CACHE_SIZE
- 1)
2818 policy
.l_extent
.end
= OBD_OBJECT_EOF
;
2820 policy
.l_extent
.end
= (fm_key
->fiemap
.fm_start
+
2821 fm_key
->fiemap
.fm_length
+
2822 PAGE_CACHE_SIZE
- 1) & CFS_PAGE_MASK
;
2824 ostid_build_res_name(&fm_key
->oa
.o_oi
, &res_id
);
2825 mode
= ldlm_lock_match(exp
->exp_obd
->obd_namespace
,
2826 LDLM_FL_BLOCK_GRANTED
|
2828 &res_id
, LDLM_EXTENT
, &policy
,
2829 LCK_PR
| LCK_PW
, &lockh
, 0);
2830 if (mode
) { /* lock is cached on client */
2831 if (mode
!= LCK_PR
) {
2832 ldlm_lock_addref(&lockh
, LCK_PR
);
2833 ldlm_lock_decref(&lockh
, LCK_PW
);
2835 } else { /* no cached lock, needs acquire lock on server side */
2836 fm_key
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
2837 fm_key
->oa
.o_flags
|= OBD_FL_SRVLOCK
;
2841 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2842 &RQF_OST_GET_INFO_FIEMAP
);
2848 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_KEY
,
2849 RCL_CLIENT
, keylen
);
2850 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2851 RCL_CLIENT
, *vallen
);
2852 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2853 RCL_SERVER
, *vallen
);
2855 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2857 ptlrpc_request_free(req
);
2861 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_KEY
);
2862 memcpy(tmp
, key
, keylen
);
2863 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2864 memcpy(tmp
, val
, *vallen
);
2866 ptlrpc_request_set_replen(req
);
2867 rc
= ptlrpc_queue_wait(req
);
2871 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2872 if (reply
== NULL
) {
2877 memcpy(val
, reply
, *vallen
);
2879 ptlrpc_req_finished(req
);
2882 ldlm_lock_decref(&lockh
, LCK_PR
);
2889 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
2890 u32 keylen
, void *key
, u32 vallen
,
2891 void *val
, struct ptlrpc_request_set
*set
)
2893 struct ptlrpc_request
*req
;
2894 struct obd_device
*obd
= exp
->exp_obd
;
2895 struct obd_import
*imp
= class_exp2cliimp(exp
);
2899 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN
, 10);
2901 if (KEY_IS(KEY_CHECKSUM
)) {
2902 if (vallen
!= sizeof(int))
2904 exp
->exp_obd
->u
.cli
.cl_checksum
= (*(int *)val
) ? 1 : 0;
2908 if (KEY_IS(KEY_SPTLRPC_CONF
)) {
2909 sptlrpc_conf_client_adapt(obd
);
2913 if (KEY_IS(KEY_FLUSH_CTX
)) {
2914 sptlrpc_import_flush_my_ctx(imp
);
2918 if (KEY_IS(KEY_CACHE_SET
)) {
2919 struct client_obd
*cli
= &obd
->u
.cli
;
2921 LASSERT(cli
->cl_cache
== NULL
); /* only once */
2922 cli
->cl_cache
= (struct cl_client_cache
*)val
;
2923 atomic_inc(&cli
->cl_cache
->ccc_users
);
2924 cli
->cl_lru_left
= &cli
->cl_cache
->ccc_lru_left
;
2926 /* add this osc into entity list */
2927 LASSERT(list_empty(&cli
->cl_lru_osc
));
2928 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
2929 list_add(&cli
->cl_lru_osc
, &cli
->cl_cache
->ccc_lru
);
2930 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
2935 if (KEY_IS(KEY_CACHE_LRU_SHRINK
)) {
2936 struct client_obd
*cli
= &obd
->u
.cli
;
2937 int nr
= atomic_read(&cli
->cl_lru_in_list
) >> 1;
2938 int target
= *(int *)val
;
2940 nr
= osc_lru_shrink(cli
, min(nr
, target
));
2945 if (!set
&& !KEY_IS(KEY_GRANT_SHRINK
))
2948 /* We pass all other commands directly to OST. Since nobody calls osc
2949 methods directly and everybody is supposed to go through LOV, we
2950 assume lov checked invalid values for us.
2951 The only recognised values so far are evict_by_nid and mds_conn.
2952 Even if something bad goes through, we'd get a -EINVAL from OST
2955 req
= ptlrpc_request_alloc(imp
, KEY_IS(KEY_GRANT_SHRINK
) ?
2956 &RQF_OST_SET_GRANT_INFO
:
2961 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2962 RCL_CLIENT
, keylen
);
2963 if (!KEY_IS(KEY_GRANT_SHRINK
))
2964 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_VAL
,
2965 RCL_CLIENT
, vallen
);
2966 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SET_INFO
);
2968 ptlrpc_request_free(req
);
2972 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2973 memcpy(tmp
, key
, keylen
);
2974 tmp
= req_capsule_client_get(&req
->rq_pill
, KEY_IS(KEY_GRANT_SHRINK
) ?
2977 memcpy(tmp
, val
, vallen
);
2979 if (KEY_IS(KEY_GRANT_SHRINK
)) {
2980 struct osc_brw_async_args
*aa
;
2983 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2984 aa
= ptlrpc_req_async_args(req
);
2987 ptlrpc_req_finished(req
);
2990 *oa
= ((struct ost_body
*)val
)->oa
;
2992 req
->rq_interpret_reply
= osc_shrink_grant_interpret
;
2995 ptlrpc_request_set_replen(req
);
2996 if (!KEY_IS(KEY_GRANT_SHRINK
)) {
2997 LASSERT(set
!= NULL
);
2998 ptlrpc_set_add_req(set
, req
);
2999 ptlrpc_check_set(NULL
, set
);
3001 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
3006 static int osc_reconnect(const struct lu_env
*env
,
3007 struct obd_export
*exp
, struct obd_device
*obd
,
3008 struct obd_uuid
*cluuid
,
3009 struct obd_connect_data
*data
,
3012 struct client_obd
*cli
= &obd
->u
.cli
;
3014 if (data
!= NULL
&& (data
->ocd_connect_flags
& OBD_CONNECT_GRANT
)) {
3017 client_obd_list_lock(&cli
->cl_loi_list_lock
);
3018 data
->ocd_grant
= (cli
->cl_avail_grant
+ cli
->cl_dirty
) ?:
3019 2 * cli_brw_size(obd
);
3020 lost_grant
= cli
->cl_lost_grant
;
3021 cli
->cl_lost_grant
= 0;
3022 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
3024 CDEBUG(D_RPCTRACE
, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3025 data
->ocd_connect_flags
,
3026 data
->ocd_version
, data
->ocd_grant
, lost_grant
);
3032 static int osc_disconnect(struct obd_export
*exp
)
3034 struct obd_device
*obd
= class_exp2obd(exp
);
3037 rc
= client_disconnect_export(exp
);
3039 * Initially we put del_shrink_grant before disconnect_export, but it
3040 * causes the following problem if setup (connect) and cleanup
3041 * (disconnect) are tangled together.
3042 * connect p1 disconnect p2
3043 * ptlrpc_connect_import
3044 * ............... class_manual_cleanup
3047 * ptlrpc_connect_interrupt
3049 * add this client to shrink list
3051 * Bang! pinger trigger the shrink.
3052 * So the osc should be disconnected from the shrink list, after we
3053 * are sure the import has been destroyed. BUG18662
3055 if (obd
->u
.cli
.cl_import
== NULL
)
3056 osc_del_shrink_grant(&obd
->u
.cli
);
3060 static int osc_import_event(struct obd_device
*obd
,
3061 struct obd_import
*imp
,
3062 enum obd_import_event event
)
3064 struct client_obd
*cli
;
3067 LASSERT(imp
->imp_obd
== obd
);
3070 case IMP_EVENT_DISCON
: {
3072 client_obd_list_lock(&cli
->cl_loi_list_lock
);
3073 cli
->cl_avail_grant
= 0;
3074 cli
->cl_lost_grant
= 0;
3075 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
3078 case IMP_EVENT_INACTIVE
: {
3079 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_INACTIVE
, NULL
);
3082 case IMP_EVENT_INVALIDATE
: {
3083 struct ldlm_namespace
*ns
= obd
->obd_namespace
;
3087 env
= cl_env_get(&refcheck
);
3091 /* all pages go to failing rpcs due to the invalid
3093 osc_io_unplug(env
, cli
, NULL
, PDL_POLICY_ROUND
);
3095 ldlm_namespace_cleanup(ns
, LDLM_FL_LOCAL_ONLY
);
3096 cl_env_put(env
, &refcheck
);
3101 case IMP_EVENT_ACTIVE
: {
3102 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVE
, NULL
);
3105 case IMP_EVENT_OCD
: {
3106 struct obd_connect_data
*ocd
= &imp
->imp_connect_data
;
3108 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT
)
3109 osc_init_grant(&obd
->u
.cli
, ocd
);
3112 if (ocd
->ocd_connect_flags
& OBD_CONNECT_REQPORTAL
)
3113 imp
->imp_client
->cli_request_portal
= OST_REQUEST_PORTAL
;
3115 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_OCD
, NULL
);
3118 case IMP_EVENT_DEACTIVATE
: {
3119 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_DEACTIVATE
, NULL
);
3122 case IMP_EVENT_ACTIVATE
: {
3123 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVATE
, NULL
);
3127 CERROR("Unknown import event %d\n", event
);
3134 * Determine whether the lock can be canceled before replaying the lock
3135 * during recovery, see bug16774 for detailed information.
3137 * \retval zero the lock can't be canceled
3138 * \retval other ok to cancel
3140 static int osc_cancel_for_recovery(struct ldlm_lock
*lock
)
3142 check_res_locked(lock
->l_resource
);
3145 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3147 * XXX as a future improvement, we can also cancel unused write lock
3148 * if it doesn't have dirty data and active mmaps.
3150 if (lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
3151 (lock
->l_granted_mode
== LCK_PR
||
3152 lock
->l_granted_mode
== LCK_CR
) &&
3153 (osc_dlm_lock_pageref(lock
) == 0))
3159 static int brw_queue_work(const struct lu_env
*env
, void *data
)
3161 struct client_obd
*cli
= data
;
3163 CDEBUG(D_CACHE
, "Run writeback work for client obd %p.\n", cli
);
3165 osc_io_unplug(env
, cli
, NULL
, PDL_POLICY_SAME
);
3169 int osc_setup(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3171 struct lprocfs_static_vars lvars
= { NULL
};
3172 struct client_obd
*cli
= &obd
->u
.cli
;
3179 rc
= ptlrpcd_addref();
3183 rc
= client_obd_setup(obd
, lcfg
);
3187 handler
= ptlrpcd_alloc_work(cli
->cl_import
, brw_queue_work
, cli
);
3188 if (IS_ERR(handler
)) {
3189 rc
= PTR_ERR(handler
);
3190 goto out_client_setup
;
3192 cli
->cl_writeback_work
= handler
;
3194 rc
= osc_quota_setup(obd
);
3196 goto out_ptlrpcd_work
;
3198 cli
->cl_grant_shrink_interval
= GRANT_SHRINK_INTERVAL
;
3199 lprocfs_osc_init_vars(&lvars
);
3200 if (lprocfs_obd_setup(obd
, lvars
.obd_vars
, lvars
.sysfs_vars
) == 0) {
3201 lproc_osc_attach_seqstat(obd
);
3202 sptlrpc_lprocfs_cliobd_attach(obd
);
3203 ptlrpc_lprocfs_register_obd(obd
);
3207 * We try to control the total number of requests with a upper limit
3208 * osc_reqpool_maxreqcount. There might be some race which will cause
3209 * over-limit allocation, but it is fine.
3211 req_count
= atomic_read(&osc_pool_req_count
);
3212 if (req_count
< osc_reqpool_maxreqcount
) {
3213 adding
= cli
->cl_max_rpcs_in_flight
+ 2;
3214 if (req_count
+ adding
> osc_reqpool_maxreqcount
)
3215 adding
= osc_reqpool_maxreqcount
- req_count
;
3217 added
= ptlrpc_add_rqs_to_pool(osc_rq_pool
, adding
);
3218 atomic_add(added
, &osc_pool_req_count
);
3221 INIT_LIST_HEAD(&cli
->cl_grant_shrink_list
);
3222 ns_register_cancel(obd
->obd_namespace
, osc_cancel_for_recovery
);
3226 ptlrpcd_destroy_work(handler
);
3228 client_obd_cleanup(obd
);
3234 static int osc_precleanup(struct obd_device
*obd
, enum obd_cleanup_stage stage
)
3237 case OBD_CLEANUP_EARLY
: {
3238 struct obd_import
*imp
;
3239 imp
= obd
->u
.cli
.cl_import
;
3240 CDEBUG(D_HA
, "Deactivating import %s\n", obd
->obd_name
);
3241 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3242 ptlrpc_deactivate_import(imp
);
3243 spin_lock(&imp
->imp_lock
);
3244 imp
->imp_pingable
= 0;
3245 spin_unlock(&imp
->imp_lock
);
3248 case OBD_CLEANUP_EXPORTS
: {
3249 struct client_obd
*cli
= &obd
->u
.cli
;
3251 * for echo client, export may be on zombie list, wait for
3252 * zombie thread to cull it, because cli.cl_import will be
3253 * cleared in client_disconnect_export():
3254 * class_export_destroy() -> obd_cleanup() ->
3255 * echo_device_free() -> echo_client_cleanup() ->
3256 * obd_disconnect() -> osc_disconnect() ->
3257 * client_disconnect_export()
3259 obd_zombie_barrier();
3260 if (cli
->cl_writeback_work
) {
3261 ptlrpcd_destroy_work(cli
->cl_writeback_work
);
3262 cli
->cl_writeback_work
= NULL
;
3264 obd_cleanup_client_import(obd
);
3265 ptlrpc_lprocfs_unregister_obd(obd
);
3266 lprocfs_obd_cleanup(obd
);
3273 int osc_cleanup(struct obd_device
*obd
)
3275 struct client_obd
*cli
= &obd
->u
.cli
;
3279 if (cli
->cl_cache
!= NULL
) {
3280 LASSERT(atomic_read(&cli
->cl_cache
->ccc_users
) > 0);
3281 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
3282 list_del_init(&cli
->cl_lru_osc
);
3283 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
3284 cli
->cl_lru_left
= NULL
;
3285 atomic_dec(&cli
->cl_cache
->ccc_users
);
3286 cli
->cl_cache
= NULL
;
3289 /* free memory of osc quota cache */
3290 osc_quota_cleanup(obd
);
3292 rc
= client_obd_cleanup(obd
);
3298 int osc_process_config_base(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3300 struct lprocfs_static_vars lvars
= { NULL
};
3303 lprocfs_osc_init_vars(&lvars
);
3305 switch (lcfg
->lcfg_command
) {
3307 rc
= class_process_proc_param(PARAM_OSC
, lvars
.obd_vars
,
3317 static int osc_process_config(struct obd_device
*obd
, u32 len
, void *buf
)
3319 return osc_process_config_base(obd
, buf
);
3322 struct obd_ops osc_obd_ops
= {
3323 .o_owner
= THIS_MODULE
,
3324 .o_setup
= osc_setup
,
3325 .o_precleanup
= osc_precleanup
,
3326 .o_cleanup
= osc_cleanup
,
3327 .o_add_conn
= client_import_add_conn
,
3328 .o_del_conn
= client_import_del_conn
,
3329 .o_connect
= client_connect_import
,
3330 .o_reconnect
= osc_reconnect
,
3331 .o_disconnect
= osc_disconnect
,
3332 .o_statfs
= osc_statfs
,
3333 .o_statfs_async
= osc_statfs_async
,
3334 .o_packmd
= osc_packmd
,
3335 .o_unpackmd
= osc_unpackmd
,
3336 .o_create
= osc_create
,
3337 .o_destroy
= osc_destroy
,
3338 .o_getattr
= osc_getattr
,
3339 .o_getattr_async
= osc_getattr_async
,
3340 .o_setattr
= osc_setattr
,
3341 .o_setattr_async
= osc_setattr_async
,
3342 .o_find_cbdata
= osc_find_cbdata
,
3343 .o_iocontrol
= osc_iocontrol
,
3344 .o_get_info
= osc_get_info
,
3345 .o_set_info_async
= osc_set_info_async
,
3346 .o_import_event
= osc_import_event
,
3347 .o_process_config
= osc_process_config
,
3348 .o_quotactl
= osc_quotactl
,
3349 .o_quotacheck
= osc_quotacheck
,
3352 extern struct lu_kmem_descr osc_caches
[];
3353 extern spinlock_t osc_ast_guard
;
3354 extern struct lock_class_key osc_ast_guard_class
;
3356 static int __init
osc_init(void)
3358 struct lprocfs_static_vars lvars
= { NULL
};
3359 unsigned int reqpool_size
;
3360 unsigned int reqsize
;
3363 /* print an address of _any_ initialized kernel symbol from this
3364 * module, to allow debugging with gdb that doesn't support data
3365 * symbols from modules.*/
3366 CDEBUG(D_INFO
, "Lustre OSC module (%p).\n", &osc_caches
);
3368 rc
= lu_kmem_init(osc_caches
);
3372 lprocfs_osc_init_vars(&lvars
);
3374 rc
= class_register_type(&osc_obd_ops
, NULL
,
3375 LUSTRE_OSC_NAME
, &osc_device_type
);
3379 spin_lock_init(&osc_ast_guard
);
3380 lockdep_set_class(&osc_ast_guard
, &osc_ast_guard_class
);
3382 /* This is obviously too much memory, only prevent overflow here */
3383 if (osc_reqpool_mem_max
>= 1 << 12 || osc_reqpool_mem_max
== 0) {
3388 reqpool_size
= osc_reqpool_mem_max
<< 20;
3391 while (reqsize
< OST_MAXREQSIZE
)
3392 reqsize
= reqsize
<< 1;
3395 * We don't enlarge the request count in OSC pool according to
3396 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3397 * tried after normal allocation failed. So a small OSC pool won't
3398 * cause much performance degression in most of cases.
3400 osc_reqpool_maxreqcount
= reqpool_size
/ reqsize
;
3402 atomic_set(&osc_pool_req_count
, 0);
3403 osc_rq_pool
= ptlrpc_init_rq_pool(0, OST_MAXREQSIZE
,
3404 ptlrpc_add_rqs_to_pool
);
3412 class_unregister_type(LUSTRE_OSC_NAME
);
3414 lu_kmem_fini(osc_caches
);
3418 static void /*__exit*/ osc_exit(void)
3420 class_unregister_type(LUSTRE_OSC_NAME
);
3421 lu_kmem_fini(osc_caches
);
3422 ptlrpc_free_rq_pool(osc_rq_pool
);
3425 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3426 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3427 MODULE_LICENSE("GPL");
3428 MODULE_VERSION(LUSTRE_VERSION_STRING
);
3430 module_init(osc_init
);
3431 module_exit(osc_exit
);