staging/lustre/debug: quiet noisy console error messages
drivers/staging/lustre/lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/libcfs/libcfs.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                RETURN(lmm_size);

        if (*lmmp != NULL && lsm == NULL) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (*lmmp == NULL) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (*lmmp == NULL)
                        RETURN(-ENOMEM);
        }

        if (lsm)
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

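/*
 * Reply callback for an async OST_GETATTR: unpack the returned ost_body
 * into the caller's obdo and hand the result to the oi_cb_up upcall.
 */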
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

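/*
 * Common reply handler for setattr-style RPCs (also reused by OST_PUNCH
 * below): unpack the reply body into sa_oa, then run the sa_upcall.
 */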
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

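/*
 * Send an OST_SETATTR asynchronously.  With a NULL @rqset the request is
 * fire-and-forget via ptlrpcd; otherwise it is added to @rqset (or to
 * the shared ptlrpcd set for PTLRPCD_SET) and @upcall runs on completion.
 */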
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

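/*
 * Synchronous OST_CREATE.  Allocates an in-memory lov_stripe_md if the
 * caller did not pass one in, sends the create, and copies the returned
 * object id and transno back out through @ea and @oti.
 */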
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

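/*
 * Send an OST_PUNCH (truncate) request.  The start/end of the punched
 * extent travel in the oa o_size/o_blocks fields, as set up by the
 * osc_punch() wrapper below.
 */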
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(const struct lu_env *env, struct obd_export *exp,
                     struct obd_info *oinfo, struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_sync(const struct lu_env *env, struct obd_export *exp,
                    struct obd_info *oinfo, obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        oinfo->oi_oa->o_size = start;
        oinfo->oi_oa->o_blocks = end;
        oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported at
         * all, where we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

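/*
 * Throttle destroy RPCs to cl_max_rpcs_in_flight.  The counter is bumped
 * optimistically; if that overshoots the limit it is dropped again, and
 * a waiter is woken only if another thread raced in between the two
 * atomic operations.  Returns 1 if the destroy may be sent now.
 */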
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                RETURN(osc_real_create(exp, oa, ea, oti));
        }

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                RETURN(osc_real_create(exp, oa, ea, oti));

        /* we should not get here anymore */
        LBUG();

        RETURN(rc);
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying an unlink orphan, it is sent
         * from the MDT to the OST and should not be blocked here, because
         * the process might be triggered by ptlrpcd, and it is not good to
         * block a ptlrpcd thread (b=16006) */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

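/*
 * Fill in the dirty/undirty/grant accounting that is piggy-backed on
 * every request, so the OST can track how much cache and grant this
 * client is holding.  o_undirty is clamped to 0 in the (logged) cases
 * where the counters look inconsistent.
 */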
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_unstable_pages) +
                            atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read()s allowing the atomic_inc()s are not
                 * covered by a lock, thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_read(&obd_unstable_pages),
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

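/*
 * Give grant back to the OST until only @target_bytes remain locally,
 * but never below one RPC worth of grant.  For example (illustrative
 * numbers only): with 256 pages per RPC and 4KB pages, the floor is
 * 256 * 4096 = 1MB, so any smaller target is rounded up to that.
 */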
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

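/*
 * Decide whether it is time to shrink this client's grant: the server
 * must support OBD_CONNECT_GRANT_SHRINK, the shrink interval must have
 * (nearly) expired, the import must be FULL, and we must hold more
 * grant than a single RPC would consume.
 */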
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list,
                            cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we are expected to hold: if we
         * have been evicted, it's the new avail_grant amount, and cl_dirty
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
               "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

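/*
 * Validate the per-niobuf return codes of a BRW_WRITE reply: every rc
 * must be zero and the bulk must have moved exactly @requested_nob
 * bytes, otherwise the reply is treated as a protocol error.
 */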
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, "
                              "please report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

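/*
 * Compute a bulk checksum over the first @nob bytes of @pga using the
 * algorithm selected by @cksum_type.  The OBD_FAIL_* hooks deliberately
 * corrupt the data (reads) or the checksum (writes) for testing.
 */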
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        int err;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;
                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

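/*
 * Build a bulk read/write RPC: pack the body, ioobj and niobufs, attach
 * the bulk descriptor, compute checksums if enabled, and stash the I/O
 * state in the request's async args for the reply handlers below.
 */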
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so that the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

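/*
 * Diagnose a write checksum mismatch by re-checksumming the local pages:
 * the comparison distinguishes a server using the wrong algorithm, pages
 * redirtied after the client checksummed them (e.g. mmap IO), and data
 * corrupted in transit.  Returns 1 if the mismatch is confirmed.
 */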
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                return -EPROTO;
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                char *via;
                char *router;
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please notify on http://bugs.whamcloud.com/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(aa->aa_oa, &body->oa);

        RETURN(rc);
}

static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int rc;
        wait_queue_head_t waitq;
        int generation, resends = 0;
        struct l_wait_info lwi;

        ENTRY;

        init_waitqueue_head(&waitq);
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                return rc;

        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        /* When the server returns -EINPROGRESS, the client should always
         * retry regardless of how many times the bulk was already resent. */
        if (osc_recoverable_error(rc)) {
                resends++;
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }
                if (generation !=
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }

                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
out:
        if (rc == -EAGAIN || rc == -EINPROGRESS)
                rc = -EIO;
        RETURN(rc);
}

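/*
 * Resend a failed bulk RPC: build a fresh request for the same pages and
 * transplant the async args, oaps and extents onto it, capping the resend
 * backoff at the request timeout much as ptlrpc's after_reply() does.
 */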
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}

/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        if (num == 1)
                return;
        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride ; i < num ; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}

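/*
 * Count how many leading pages of @pg form one gapless, page-aligned run
 * (only the first page may start late and only the last may end early),
 * i.e. the largest prefix that can go into a single unfragmented brw.
 */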
1815static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1816{
1817 int count = 1;
1818 int offset;
1819 int i = 0;
1820
1821 LASSERT (pages > 0);
1822 offset = pg[i]->off & ~CFS_PAGE_MASK;
1823
1824 for (;;) {
1825 pages--;
1826 if (pages == 0) /* that's all */
1827 return count;
1828
1829 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1830 return count; /* doesn't end on page boundary */
1831
1832 i++;
1833 offset = pg[i]->off & ~CFS_PAGE_MASK;
1834 if (offset != 0) /* doesn't start on page boundary */
1835 return count;
1836
1837 count++;
1838 }
1839}
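/* Illustrative example (not in the original source), assuming 4 KiB
 * pages: for pga entries with (in-page offset, count) of (0, 4096),
 * (0, 2048), (0, 4096), the run stops at the second page because it
 * ends short of a page boundary, so the function returns 2. */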
1840
1841static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1842{
1843 struct brw_page **ppga;
1844 int i;
1845
1846 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1847 if (ppga == NULL)
1848 return NULL;
1849
1850 for (i = 0; i < count; i++)
1851 ppga[i] = pga + i;
1852 return ppga;
1853}
1854
1855static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1856{
1857 LASSERT(ppga != NULL);
1858 OBD_FREE(ppga, sizeof(*ppga) * count);
1859}
1860
1861static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1862 obd_count page_count, struct brw_page *pga,
1863 struct obd_trans_info *oti)
1864{
1865 struct obdo *saved_oa = NULL;
1866 struct brw_page **ppga, **orig;
1867 struct obd_import *imp = class_exp2cliimp(exp);
1868 struct client_obd *cli;
1869 int rc, page_count_orig;
1870 ENTRY;
1871
1872 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1873 cli = &imp->imp_obd->u.cli;
1874
1875 if (cmd & OBD_BRW_CHECK) {
1876 /* The caller just wants to know if there's a chance that this
1877 * I/O can succeed */
1878
1879 if (imp->imp_invalid)
1880 RETURN(-EIO);
1881 RETURN(0);
1882 }
1883
1884 /* test_brw with a failed create can trip this, maybe others. */
1885 LASSERT(cli->cl_max_pages_per_rpc);
1886
1887 rc = 0;
1888
1889 orig = ppga = osc_build_ppga(pga, page_count);
1890 if (ppga == NULL)
1891 RETURN(-ENOMEM);
1892 page_count_orig = page_count;
1893
1894 sort_brw_pages(ppga, page_count);
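	/* Illustrative example (not in the original source): 600 sorted
	 * pages with cl_max_pages_per_rpc == 256 go out as chunks of at
	 * most 256, 256 and 88 pages, each possibly trimmed further by
	 * max_unfragmented_pages() so no chunk straddles a partial page. */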
1895 while (page_count) {
1896 obd_count pages_per_brw;
1897
1898 if (page_count > cli->cl_max_pages_per_rpc)
1899 pages_per_brw = cli->cl_max_pages_per_rpc;
1900 else
1901 pages_per_brw = page_count;
1902
1903 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1904
1905 if (saved_oa != NULL) {
1906 /* restore previously saved oa */
1907 *oinfo->oi_oa = *saved_oa;
1908 } else if (page_count > pages_per_brw) {
1909 /* save a copy of oa (brw will clobber it) */
1910 OBDO_ALLOC(saved_oa);
1911 if (saved_oa == NULL)
1912 GOTO(out, rc = -ENOMEM);
1913 *saved_oa = *oinfo->oi_oa;
1914 }
1915
1916 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1917 pages_per_brw, ppga, oinfo->oi_capa);
1918
1919 if (rc != 0)
1920 break;
1921
1922 page_count -= pages_per_brw;
1923 ppga += pages_per_brw;
1924 }
1925
1926out:
1927 osc_release_ppga(orig, page_count_orig);
1928
1929 if (saved_oa != NULL)
1930 OBDO_FREE(saved_oa);
1931
1932 RETURN(rc);
1933}
1934
1935static int brw_interpret(const struct lu_env *env,
1936 struct ptlrpc_request *req, void *data, int rc)
1937{
1938 struct osc_brw_async_args *aa = data;
1939 struct osc_extent *ext;
1940 struct osc_extent *tmp;
1941 struct cl_object *obj = NULL;
1942 struct client_obd *cli = aa->aa_cli;
1943 ENTRY;
1944
1945 rc = osc_brw_fini_request(req, rc);
1946 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1947	/* When the server returns -EINPROGRESS, the client should always retry,
1948	 * regardless of how many times the bulk was already resent. */
1949 if (osc_recoverable_error(rc)) {
1950 if (req->rq_import_generation !=
1951 req->rq_import->imp_generation) {
1952 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1953 ""DOSTID", rc = %d.\n",
1954 req->rq_import->imp_obd->obd_name,
1955 POSTID(&aa->aa_oa->o_oi), rc);
1956 } else if (rc == -EINPROGRESS ||
1957 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1958 rc = osc_brw_redo_request(req, aa, rc);
1959 } else {
1960			CERROR("%s: too many resend retries for object: "
1961 ""LPU64":"LPU64", rc = %d.\n",
1962 req->rq_import->imp_obd->obd_name,
1963 POSTID(&aa->aa_oa->o_oi), rc);
1964 }
1965
1966 if (rc == 0)
1967 RETURN(0);
1968 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1969 rc = -EIO;
1970 }
1971
1972 if (aa->aa_ocapa) {
1973 capa_put(aa->aa_ocapa);
1974 aa->aa_ocapa = NULL;
1975 }
1976
1977 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1978 if (obj == NULL && rc == 0) {
1979 obj = osc2cl(ext->oe_obj);
1980 cl_object_get(obj);
1981 }
1982
1983 list_del_init(&ext->oe_link);
1984 osc_extent_finish(env, ext, 1, rc);
1985 }
1986 LASSERT(list_empty(&aa->aa_exts));
1987 LASSERT(list_empty(&aa->aa_oaps));
1988
1989 if (obj != NULL) {
1990 struct obdo *oa = aa->aa_oa;
1991 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1992 unsigned long valid = 0;
1993
1994 LASSERT(rc == 0);
1995 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1996 attr->cat_blocks = oa->o_blocks;
1997 valid |= CAT_BLOCKS;
1998 }
1999 if (oa->o_valid & OBD_MD_FLMTIME) {
2000 attr->cat_mtime = oa->o_mtime;
2001 valid |= CAT_MTIME;
2002 }
2003 if (oa->o_valid & OBD_MD_FLATIME) {
2004 attr->cat_atime = oa->o_atime;
2005 valid |= CAT_ATIME;
2006 }
2007 if (oa->o_valid & OBD_MD_FLCTIME) {
2008 attr->cat_ctime = oa->o_ctime;
2009 valid |= CAT_CTIME;
2010 }
2011 if (valid != 0) {
2012 cl_object_attr_lock(obj);
2013 cl_object_attr_set(env, obj, attr, valid);
2014 cl_object_attr_unlock(obj);
2015 }
2016 cl_object_put(env, obj);
2017 }
2018 OBDO_FREE(aa->aa_oa);
2019
2020 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2021 req->rq_bulk->bd_nob_transferred);
2022 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2023 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2024
2025 client_obd_list_lock(&cli->cl_loi_list_lock);
2026 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2027 * is called so we know whether to go to sync BRWs or wait for more
2028 * RPCs to complete */
2029 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2030 cli->cl_w_in_flight--;
2031 else
2032 cli->cl_r_in_flight--;
2033 osc_wake_cache_waiters(cli);
2034 client_obd_list_unlock(&cli->cl_loi_list_lock);
2035
2036 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2037 RETURN(rc);
2038}
2039
2040static void brw_commit(struct ptlrpc_request *req)
2041{
2042 spin_lock(&req->rq_lock);
2043	/* If osc_inc_unstable_pages (via osc_extent_finish) races with
2044	 * this function, called via rq_commit_cb, we need to ensure that
2045	 * osc_dec_unstable_pages is still called. Otherwise unstable
2046	 * pages may be leaked. */
2047 if (req->rq_unstable)
2048 osc_dec_unstable_pages(req);
2049 else
2050 req->rq_committed = 1;
2051 spin_unlock(&req->rq_lock);
2052}
2053
2054/**
2055 * Build an RPC from the list of extents @ext_list. The caller must
2056 * ensure that the total number of pages in this list does NOT exceed
2057 * the maximum pages per RPC. Extents in the list must be in OES_RPC state.
2058 */
2059int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2060 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2061{
2062 struct ptlrpc_request *req = NULL;
2063 struct osc_extent *ext;
2064 LIST_HEAD(rpc_list);
2065 struct brw_page **pga = NULL;
2066 struct osc_brw_async_args *aa = NULL;
2067 struct obdo *oa = NULL;
2068 struct osc_async_page *oap;
2069 struct osc_async_page *tmp;
2070 struct cl_req *clerq = NULL;
2071 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2072 struct ldlm_lock *lock = NULL;
2073 struct cl_req_attr crattr;
2074 obd_off starting_offset = OBD_OBJECT_EOF;
2075 obd_off ending_offset = 0;
2076 int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2077
2078 ENTRY;
2079 LASSERT(!list_empty(ext_list));
2080
2081 /* add pages into rpc_list to build BRW rpc */
2082 list_for_each_entry(ext, ext_list, oe_link) {
2083 LASSERT(ext->oe_state == OES_RPC);
2084 mem_tight |= ext->oe_memalloc;
2085 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2086 ++page_count;
2087 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2088 if (starting_offset > oap->oap_obj_off)
2089 starting_offset = oap->oap_obj_off;
2090 else
2091 LASSERT(oap->oap_page_off == 0);
2092 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2093 ending_offset = oap->oap_obj_off +
2094 oap->oap_count;
2095 else
2096 LASSERT(oap->oap_page_off + oap->oap_count ==
2097 PAGE_CACHE_SIZE);
2098 }
2099 }
2100
2101 if (mem_tight)
2102 mpflag = cfs_memory_pressure_get_and_set();
2103
2104 memset(&crattr, 0, sizeof crattr);
2105 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2106 if (pga == NULL)
2107 GOTO(out, rc = -ENOMEM);
2108
2109 OBDO_ALLOC(oa);
2110 if (oa == NULL)
2111 GOTO(out, rc = -ENOMEM);
2112
2113 i = 0;
2114 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2115 struct cl_page *page = oap2cl_page(oap);
2116 if (clerq == NULL) {
2117 clerq = cl_req_alloc(env, page, crt,
2118 1 /* only 1-object rpcs for
2119 * now */);
2120 if (IS_ERR(clerq))
2121 GOTO(out, rc = PTR_ERR(clerq));
2122 lock = oap->oap_ldlm_lock;
2123 }
2124 if (mem_tight)
2125 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2126 pga[i] = &oap->oap_brw_page;
2127 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2128 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2129 pga[i]->pg, page_index(oap->oap_page), oap, pga[i]->flag);
2130 i++;
2131 cl_req_page_add(env, clerq, page);
2132 }
2133
2134 /* always get the data for the obdo for the rpc */
2135 LASSERT(clerq != NULL);
2136 crattr.cra_oa = oa;
2137 crattr.cra_capa = NULL;
2138 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2139 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2140 if (lock) {
2141 oa->o_handle = lock->l_remote_handle;
2142 oa->o_valid |= OBD_MD_FLHANDLE;
2143 }
2144
2145 rc = cl_req_prep(env, clerq);
2146 if (rc != 0) {
2147 CERROR("cl_req_prep failed: %d\n", rc);
2148 GOTO(out, rc);
2149 }
2150
2151 sort_brw_pages(pga, page_count);
2152 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2153 pga, &req, crattr.cra_capa, 1, 0);
2154 if (rc != 0) {
2155 CERROR("prep_req failed: %d\n", rc);
2156 GOTO(out, rc);
2157 }
2158
2159 req->rq_commit_cb = brw_commit;
2160 req->rq_interpret_reply = brw_interpret;
2161
2162 if (mem_tight != 0)
2163 req->rq_memalloc = 1;
2164
2165	/* Need to update the timestamps after the request is built, in case
2166	 * we race with setattr (locally or in queue at the OST). If the OST
2167	 * gets the later setattr before the earlier BRW (as determined by the
2168	 * request xid), the OST will not use the BRW timestamps. Sadly, there
2169	 * is no obvious way to do this in a single call. bug 10150 */
2170 cl_req_attr_set(env, clerq, &crattr,
2171 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2172
2173 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2174
2175 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2176 aa = ptlrpc_req_async_args(req);
2177 INIT_LIST_HEAD(&aa->aa_oaps);
2178 list_splice_init(&rpc_list, &aa->aa_oaps);
2179 INIT_LIST_HEAD(&aa->aa_exts);
2180 list_splice_init(ext_list, &aa->aa_exts);
2181 aa->aa_clerq = clerq;
2182
2183	/* queued sync pages can be torn down while the pages
2184	 * are between the pending list and the RPC */
2185 tmp = NULL;
2186 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2187 /* only one oap gets a request reference */
2188 if (tmp == NULL)
2189 tmp = oap;
2190 if (oap->oap_interrupted && !req->rq_intr) {
2191 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2192 oap, req);
2193 ptlrpc_mark_interrupted(req);
2194 }
2195 }
2196 if (tmp != NULL)
2197 tmp->oap_request = ptlrpc_request_addref(req);
2198
2199 client_obd_list_lock(&cli->cl_loi_list_lock);
2200 starting_offset >>= PAGE_CACHE_SHIFT;
2201 if (cmd == OBD_BRW_READ) {
2202 cli->cl_r_in_flight++;
2203 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2204 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2205 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2206 starting_offset + 1);
2207 } else {
2208 cli->cl_w_in_flight++;
2209 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2210 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2211 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2212 starting_offset + 1);
2213 }
2214 client_obd_list_unlock(&cli->cl_loi_list_lock);
2215
2216 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2217 page_count, aa, cli->cl_r_in_flight,
2218 cli->cl_w_in_flight);
2219
2220 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2221 * see which CPU/NUMA node the majority of pages were allocated
2222 * on, and try to assign the async RPC to the CPU core
2223 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2224 *
2225 * But on the other hand, we expect that multiple ptlrpcd
2226 * threads and the initial write sponsor can run in parallel,
2227 * especially when data checksum is enabled, which is CPU-bound
2228 * operation and single ptlrpcd thread cannot process in time.
2229 * So more ptlrpcd threads sharing BRW load
2230 * (with PDL_POLICY_ROUND) seems better.
2231 */
2232 ptlrpcd_add_req(req, pol, -1);
2233 rc = 0;
2234 EXIT;
2235
2236out:
2237 if (mem_tight != 0)
2238 cfs_memory_pressure_restore(mpflag);
2239
2240 capa_put(crattr.cra_capa);
2241 if (rc != 0) {
2242 LASSERT(req == NULL);
2243
2244 if (oa)
2245 OBDO_FREE(oa);
2246 if (pga)
2247 OBD_FREE(pga, sizeof(*pga) * page_count);
2248		/* this should happen rarely and is pretty bad; it makes the
2249		 * pending list not follow the dirty order */
2250 while (!list_empty(ext_list)) {
2251 ext = list_entry(ext_list->next, struct osc_extent,
2252 oe_link);
2253 list_del_init(&ext->oe_link);
2254 osc_extent_finish(env, ext, 0, rc);
2255 }
2256 if (clerq && !IS_ERR(clerq))
2257 cl_req_completion(env, clerq, rc);
2258 }
2259 RETURN(rc);
2260}
2261
2262static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2263 struct ldlm_enqueue_info *einfo)
2264{
2265 void *data = einfo->ei_cbdata;
2266 int set = 0;
2267
2268 LASSERT(lock != NULL);
2269 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2270 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2271 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2272 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2273
2274 lock_res_and_lock(lock);
2275 spin_lock(&osc_ast_guard);
2276
2277 if (lock->l_ast_data == NULL)
2278 lock->l_ast_data = data;
2279 if (lock->l_ast_data == data)
2280 set = 1;
2281
2282 spin_unlock(&osc_ast_guard);
2283 unlock_res_and_lock(lock);
2284
2285 return set;
2286}
2287
2288static int osc_set_data_with_check(struct lustre_handle *lockh,
2289 struct ldlm_enqueue_info *einfo)
2290{
2291 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2292 int set = 0;
2293
2294 if (lock != NULL) {
2295 set = osc_set_lock_data_with_check(lock, einfo);
2296 LDLM_LOCK_PUT(lock);
2297 } else
2298 CERROR("lockh %p, data %p - client evicted?\n",
2299 lockh, einfo->ei_cbdata);
2300 return set;
2301}
2302
2303static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2304 ldlm_iterator_t replace, void *data)
2305{
2306 struct ldlm_res_id res_id;
2307 struct obd_device *obd = class_exp2obd(exp);
2308
2309 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2310 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2311 return 0;
2312}
2313
2314/* Find any LDLM lock of the inode in OSC.
2315 * Return 0 if none is found,
2316 *        1 if one is found,
2317 *      < 0 on error. */
2318static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2319 ldlm_iterator_t replace, void *data)
2320{
2321 struct ldlm_res_id res_id;
2322 struct obd_device *obd = class_exp2obd(exp);
2323 int rc = 0;
2324
2325 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2326 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2327 if (rc == LDLM_ITER_STOP)
2328 return(1);
2329 if (rc == LDLM_ITER_CONTINUE)
2330 return(0);
2331 return(rc);
2332}
2333
2334static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2335 obd_enqueue_update_f upcall, void *cookie,
2336 __u64 *flags, int agl, int rc)
2337{
2338 int intent = *flags & LDLM_FL_HAS_INTENT;
2339 ENTRY;
2340
2341 if (intent) {
2342 /* The request was created before ldlm_cli_enqueue call. */
2343 if (rc == ELDLM_LOCK_ABORTED) {
2344 struct ldlm_reply *rep;
2345 rep = req_capsule_server_get(&req->rq_pill,
2346 &RMF_DLM_REP);
2347
2348 LASSERT(rep != NULL);
2349 if (rep->lock_policy_res1)
2350 rc = rep->lock_policy_res1;
2351 }
2352 }
2353
2354 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2355 (rc == 0)) {
2356 *flags |= LDLM_FL_LVB_READY;
2357		CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2358 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2359 }
2360
2361 /* Call the update callback. */
2362 rc = (*upcall)(cookie, rc);
2363 RETURN(rc);
2364}
2365
2366static int osc_enqueue_interpret(const struct lu_env *env,
2367 struct ptlrpc_request *req,
2368 struct osc_enqueue_args *aa, int rc)
2369{
2370 struct ldlm_lock *lock;
2371 struct lustre_handle handle;
2372 __u32 mode;
2373 struct ost_lvb *lvb;
2374 __u32 lvb_len;
2375 __u64 *flags = aa->oa_flags;
2376
2377 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2378 * might be freed anytime after lock upcall has been called. */
2379 lustre_handle_copy(&handle, aa->oa_lockh);
2380 mode = aa->oa_ei->ei_mode;
2381
2382 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2383 * be valid. */
2384 lock = ldlm_handle2lock(&handle);
2385
2386 /* Take an additional reference so that a blocking AST that
2387 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2388 * to arrive after an upcall has been executed by
2389 * osc_enqueue_fini(). */
2390 ldlm_lock_addref(&handle, mode);
2391
2392	/* Let the CP AST grant the lock first. */
2393 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2394
2395 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2396 lvb = NULL;
2397 lvb_len = 0;
2398 } else {
2399 lvb = aa->oa_lvb;
2400 lvb_len = sizeof(*aa->oa_lvb);
2401 }
2402
2403	/* Complete the lock acquisition procedure. */
2404 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2405 mode, flags, lvb, lvb_len, &handle, rc);
2406 /* Complete osc stuff. */
2407 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2408 flags, aa->oa_agl, rc);
2409
2410 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2411
2412 /* Release the lock for async request. */
2413 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2414 /*
2415 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2416 * not already released by
2417 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2418 */
2419 ldlm_lock_decref(&handle, mode);
2420
2421 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2422 aa->oa_lockh, req, aa);
2423 ldlm_lock_decref(&handle, mode);
2424 LDLM_LOCK_PUT(lock);
2425 return rc;
2426}
2427
2428void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2429 struct lov_oinfo *loi, int flags,
2430 struct ost_lvb *lvb, __u32 mode, int rc)
2431{
2432 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2433
2434 if (rc == ELDLM_OK) {
2435 __u64 tmp;
2436
2437 LASSERT(lock != NULL);
2438 loi->loi_lvb = *lvb;
2439 tmp = loi->loi_lvb.lvb_size;
2440 /* Extend KMS up to the end of this lock and no further
2441 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2442 if (tmp > lock->l_policy_data.l_extent.end)
2443 tmp = lock->l_policy_data.l_extent.end + 1;
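		/* Illustrative example (not in the original source): if
		 * lvb_size is 10000 but the lock only covers extent
		 * [0, 8191], KMS may be raised to at most 8192 under
		 * this lock. */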
2444 if (tmp >= loi->loi_kms) {
2445 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2446 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2447 loi_kms_set(loi, tmp);
2448 } else {
2449 LDLM_DEBUG(lock, "lock acquired, setting rss="
2450 LPU64"; leaving kms="LPU64", end="LPU64,
2451 loi->loi_lvb.lvb_size, loi->loi_kms,
2452 lock->l_policy_data.l_extent.end);
2453 }
2454 ldlm_lock_allow_match(lock);
2455 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2456 LASSERT(lock != NULL);
2457 loi->loi_lvb = *lvb;
2458 ldlm_lock_allow_match(lock);
2459 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2460 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2461 rc = ELDLM_OK;
2462 }
2463
2464 if (lock != NULL) {
2465 if (rc != ELDLM_OK)
2466 ldlm_lock_fail_match(lock);
2467
2468 LDLM_LOCK_PUT(lock);
2469 }
2470}
2471EXPORT_SYMBOL(osc_update_enqueue);
2472
2473struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2474
2475/* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2476 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2477 * other synchronous requests, but keeping some locks while trying to obtain
2478 * others may take a considerable amount of time in case of OST failure; and
2479 * when other sync requests cannot get a lock released by a client, that client
2480 * is excluded from the cluster -- such scenarios make life difficult, so
2481 * release locks just after they are obtained. */
2482int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2483 __u64 *flags, ldlm_policy_data_t *policy,
2484 struct ost_lvb *lvb, int kms_valid,
2485 obd_enqueue_update_f upcall, void *cookie,
2486 struct ldlm_enqueue_info *einfo,
2487 struct lustre_handle *lockh,
2488 struct ptlrpc_request_set *rqset, int async, int agl)
2489{
2490 struct obd_device *obd = exp->exp_obd;
2491 struct ptlrpc_request *req = NULL;
2492 int intent = *flags & LDLM_FL_HAS_INTENT;
2493 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2494 ldlm_mode_t mode;
2495 int rc;
2496 ENTRY;
2497
2498 /* Filesystem lock extents are extended to page boundaries so that
2499 * dealing with the page cache is a little smoother. */
2500 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2501 policy->l_extent.end |= ~CFS_PAGE_MASK;
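	/* Illustrative example (not in the original source), assuming a
	 * 4 KiB page size: a request for bytes [5000, 6000] becomes a lock
	 * on [4096, 8191]: 5000 - (5000 & 4095) == 4096, and
	 * 6000 | 4095 == 8191. */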
2502
2503 /*
2504 * kms is not valid when either object is completely fresh (so that no
2505 * locks are cached), or object was evicted. In the latter case cached
2506 * lock cannot be used, because it would prime inode state with
2507 * potentially stale LVB.
2508 */
2509 if (!kms_valid)
2510 goto no_match;
2511
2512 /* Next, search for already existing extent locks that will cover us */
2513 /* If we're trying to read, we also search for an existing PW lock. The
2514 * VFS and page cache already protect us locally, so lots of readers/
2515 * writers can share a single PW lock.
2516 *
2517 * There are problems with conversion deadlocks, so instead of
2518 * converting a read lock to a write lock, we'll just enqueue a new
2519 * one.
2520 *
2521 * At some point we should cancel the read lock instead of making them
2522 * send us a blocking callback, but there are problems with canceling
2523 * locks out from other users right now, too. */
2524 mode = einfo->ei_mode;
2525 if (einfo->ei_mode == LCK_PR)
2526 mode |= LCK_PW;
2527 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2528 einfo->ei_type, policy, mode, lockh, 0);
2529 if (mode) {
2530 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2531
2532 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2533			/* For AGL, if the enqueue RPC is sent but the lock is not
2534			 * granted, then skip processing this stripe.
2535			 * Return -ECANCELED to tell the caller. */
2536 ldlm_lock_decref(lockh, mode);
2537 LDLM_LOCK_PUT(matched);
2538 RETURN(-ECANCELED);
2539 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2540 *flags |= LDLM_FL_LVB_READY;
2541 /* addref the lock only if not async requests and PW
2542 * lock is matched whereas we asked for PR. */
2543 if (!rqset && einfo->ei_mode != mode)
2544 ldlm_lock_addref(lockh, LCK_PR);
2545 if (intent) {
2546 /* I would like to be able to ASSERT here that
2547 * rss <= kms, but I can't, for reasons which
2548 * are explained in lov_enqueue() */
2549 }
2550
2551 /* We already have a lock, and it's referenced.
2552 *
2553 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2554 * AGL upcall may change it to CLS_HELD directly. */
2555 (*upcall)(cookie, ELDLM_OK);
2556
2557 if (einfo->ei_mode != mode)
2558 ldlm_lock_decref(lockh, LCK_PW);
2559 else if (rqset)
2560 /* For async requests, decref the lock. */
2561 ldlm_lock_decref(lockh, einfo->ei_mode);
2562 LDLM_LOCK_PUT(matched);
2563 RETURN(ELDLM_OK);
2564 } else {
2565 ldlm_lock_decref(lockh, mode);
2566 LDLM_LOCK_PUT(matched);
2567 }
2568 }
2569
2570 no_match:
2571 if (intent) {
2572 LIST_HEAD(cancels);
2573 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2574 &RQF_LDLM_ENQUEUE_LVB);
2575 if (req == NULL)
2576 RETURN(-ENOMEM);
2577
2578 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2579 if (rc) {
2580 ptlrpc_request_free(req);
2581 RETURN(rc);
2582 }
2583
2584 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2585 sizeof *lvb);
2586 ptlrpc_request_set_replen(req);
2587 }
2588
2589 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2590 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2591
2592 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2593 sizeof(*lvb), LVB_T_OST, lockh, async);
2594 if (rqset) {
2595 if (!rc) {
2596 struct osc_enqueue_args *aa;
2597 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2598 aa = ptlrpc_req_async_args(req);
2599 aa->oa_ei = einfo;
2600 aa->oa_exp = exp;
2601 aa->oa_flags = flags;
2602 aa->oa_upcall = upcall;
2603 aa->oa_cookie = cookie;
2604 aa->oa_lvb = lvb;
2605 aa->oa_lockh = lockh;
2606 aa->oa_agl = !!agl;
2607
2608 req->rq_interpret_reply =
2609 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2610 if (rqset == PTLRPCD_SET)
2611 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2612 else
2613 ptlrpc_set_add_req(rqset, req);
2614 } else if (intent) {
2615 ptlrpc_req_finished(req);
2616 }
2617 RETURN(rc);
2618 }
2619
2620 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2621 if (intent)
2622 ptlrpc_req_finished(req);
2623
2624 RETURN(rc);
2625}
2626
2627static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2628 struct ldlm_enqueue_info *einfo,
2629 struct ptlrpc_request_set *rqset)
2630{
2631 struct ldlm_res_id res_id;
2632 int rc;
2633 ENTRY;
2634
2635 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2636 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2637 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2638 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2639 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2640 rqset, rqset != NULL, 0);
2641 RETURN(rc);
2642}
2643
2644int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2645 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2646 int *flags, void *data, struct lustre_handle *lockh,
2647 int unref)
2648{
2649 struct obd_device *obd = exp->exp_obd;
2650 int lflags = *flags;
2651 ldlm_mode_t rc;
2652 ENTRY;
2653
2654 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2655 RETURN(-EIO);
2656
2657 /* Filesystem lock extents are extended to page boundaries so that
2658 * dealing with the page cache is a little smoother */
2659 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2660 policy->l_extent.end |= ~CFS_PAGE_MASK;
2661
2662 /* Next, search for already existing extent locks that will cover us */
2663 /* If we're trying to read, we also search for an existing PW lock. The
2664 * VFS and page cache already protect us locally, so lots of readers/
2665 * writers can share a single PW lock. */
2666 rc = mode;
2667 if (mode == LCK_PR)
2668 rc |= LCK_PW;
2669 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2670 res_id, type, policy, rc, lockh, unref);
2671 if (rc) {
2672 if (data != NULL) {
2673 if (!osc_set_data_with_check(lockh, data)) {
2674 if (!(lflags & LDLM_FL_TEST_LOCK))
2675 ldlm_lock_decref(lockh, rc);
2676 RETURN(0);
2677 }
2678 }
2679 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2680 ldlm_lock_addref(lockh, LCK_PR);
2681 ldlm_lock_decref(lockh, LCK_PW);
2682 }
2683 RETURN(rc);
2684 }
2685 RETURN(rc);
2686}
2687
2688int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2689{
2690 ENTRY;
2691
2692 if (unlikely(mode == LCK_GROUP))
2693 ldlm_lock_decref_and_cancel(lockh, mode);
2694 else
2695 ldlm_lock_decref(lockh, mode);
2696
2697 RETURN(0);
2698}
2699
2700static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2701 __u32 mode, struct lustre_handle *lockh)
2702{
2703 ENTRY;
2704 RETURN(osc_cancel_base(lockh, mode));
2705}
2706
2707static int osc_cancel_unused(struct obd_export *exp,
2708 struct lov_stripe_md *lsm,
2709 ldlm_cancel_flags_t flags,
2710 void *opaque)
2711{
2712 struct obd_device *obd = class_exp2obd(exp);
2713 struct ldlm_res_id res_id, *resp = NULL;
2714
2715 if (lsm != NULL) {
2716 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2717 resp = &res_id;
2718 }
2719
2720 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2721}
2722
2723static int osc_statfs_interpret(const struct lu_env *env,
2724 struct ptlrpc_request *req,
2725 struct osc_async_args *aa, int rc)
2726{
2727 struct obd_statfs *msfs;
2728 ENTRY;
2729
2730 if (rc == -EBADR)
2731 /* The request has in fact never been sent
2732 * due to issues at a higher level (LOV).
2733 * Exit immediately since the caller is
2734 * aware of the problem and takes care
2735 * of the clean up */
2736 RETURN(rc);
2737
2738 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2739 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2740 GOTO(out, rc = 0);
2741
2742 if (rc != 0)
2743 GOTO(out, rc);
2744
2745 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2746 if (msfs == NULL) {
2747 GOTO(out, rc = -EPROTO);
2748 }
2749
2750 *aa->aa_oi->oi_osfs = *msfs;
2751out:
2752 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2753 RETURN(rc);
2754}
2755
2756static int osc_statfs_async(struct obd_export *exp,
2757 struct obd_info *oinfo, __u64 max_age,
2758 struct ptlrpc_request_set *rqset)
2759{
2760 struct obd_device *obd = class_exp2obd(exp);
2761 struct ptlrpc_request *req;
2762 struct osc_async_args *aa;
2763 int rc;
2764 ENTRY;
2765
2766 /* We could possibly pass max_age in the request (as an absolute
2767 * timestamp or a "seconds.usec ago") so the target can avoid doing
2768 * extra calls into the filesystem if that isn't necessary (e.g.
2769 * during mount that would help a bit). Having relative timestamps
2770 * is not so great if request processing is slow, while absolute
2771 * timestamps are not ideal because they need time synchronization. */
2772 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2773 if (req == NULL)
2774 RETURN(-ENOMEM);
2775
2776 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2777 if (rc) {
2778 ptlrpc_request_free(req);
2779 RETURN(rc);
2780 }
2781 ptlrpc_request_set_replen(req);
2782 req->rq_request_portal = OST_CREATE_PORTAL;
2783 ptlrpc_at_set_req_timeout(req);
2784
2785 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2786		/* procfs requests should not wait for statfs, to avoid deadlock */
2787 req->rq_no_resend = 1;
2788 req->rq_no_delay = 1;
2789 }
2790
2791 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2792 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2793 aa = ptlrpc_req_async_args(req);
2794 aa->aa_oi = oinfo;
2795
2796 ptlrpc_set_add_req(rqset, req);
2797 RETURN(0);
2798}
2799
2800static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2801 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2802{
2803 struct obd_device *obd = class_exp2obd(exp);
2804 struct obd_statfs *msfs;
2805 struct ptlrpc_request *req;
2806 struct obd_import *imp = NULL;
2807 int rc;
2808 ENTRY;
2809
2810	/* Since the request might also come from lprocfs, we need to
2811	 * sync this with client_disconnect_export(). Bug15684 */
2812 down_read(&obd->u.cli.cl_sem);
2813 if (obd->u.cli.cl_import)
2814 imp = class_import_get(obd->u.cli.cl_import);
2815 up_read(&obd->u.cli.cl_sem);
2816 if (!imp)
2817 RETURN(-ENODEV);
2818
2819 /* We could possibly pass max_age in the request (as an absolute
2820 * timestamp or a "seconds.usec ago") so the target can avoid doing
2821 * extra calls into the filesystem if that isn't necessary (e.g.
2822 * during mount that would help a bit). Having relative timestamps
2823 * is not so great if request processing is slow, while absolute
2824 * timestamps are not ideal because they need time synchronization. */
2825 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2826
2827 class_import_put(imp);
2828
2829 if (req == NULL)
2830 RETURN(-ENOMEM);
2831
2832 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2833 if (rc) {
2834 ptlrpc_request_free(req);
2835 RETURN(rc);
2836 }
2837 ptlrpc_request_set_replen(req);
2838 req->rq_request_portal = OST_CREATE_PORTAL;
2839 ptlrpc_at_set_req_timeout(req);
2840
2841 if (flags & OBD_STATFS_NODELAY) {
2842		/* procfs requests should not wait for statfs, to avoid deadlock */
2843 req->rq_no_resend = 1;
2844 req->rq_no_delay = 1;
2845 }
2846
2847 rc = ptlrpc_queue_wait(req);
2848 if (rc)
2849 GOTO(out, rc);
2850
2851 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2852 if (msfs == NULL) {
2853 GOTO(out, rc = -EPROTO);
2854 }
2855
2856 *osfs = *msfs;
2857
2858 EXIT;
2859 out:
2860 ptlrpc_req_finished(req);
2861 return rc;
2862}
2863
2864/* Retrieve object striping information.
2865 *
2866 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2867 * the maximum number of OST indices which will fit in the user buffer.
2868 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2869 */
2870static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2871{
2872 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2873 struct lov_user_md_v3 lum, *lumk;
2874 struct lov_user_ost_data_v1 *lmm_objects;
2875 int rc = 0, lum_size;
2876 ENTRY;
2877
2878 if (!lsm)
2879 RETURN(-ENODATA);
2880
2881	/* we only need the header part from user space to get lmm_magic and
2882	 * lmm_stripe_count (the header part is common to v1 and v3) */
2883 lum_size = sizeof(struct lov_user_md_v1);
2884 if (copy_from_user(&lum, lump, lum_size))
2885 RETURN(-EFAULT);
2886
2887 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2888 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2889 RETURN(-EINVAL);
2890
2891 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2892 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2893 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2894 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2895
2896 /* we can use lov_mds_md_size() to compute lum_size
2897 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2898 if (lum.lmm_stripe_count > 0) {
2899 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2900 OBD_ALLOC(lumk, lum_size);
2901 if (!lumk)
2902 RETURN(-ENOMEM);
2903
2904 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2905 lmm_objects =
2906 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2907 else
2908 lmm_objects = &(lumk->lmm_objects[0]);
2909 lmm_objects->l_ost_oi = lsm->lsm_oi;
2910 } else {
2911 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2912 lumk = &lum;
2913 }
2914
2915 lumk->lmm_oi = lsm->lsm_oi;
2916 lumk->lmm_stripe_count = 1;
2917
2918 if (copy_to_user(lump, lumk, lum_size))
2919 rc = -EFAULT;
2920
2921 if (lumk != &lum)
2922 OBD_FREE(lumk, lum_size);
2923
2924 RETURN(rc);
2925}
2926
2927
2928static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2929 void *karg, void *uarg)
2930{
2931 struct obd_device *obd = exp->exp_obd;
2932 struct obd_ioctl_data *data = karg;
2933 int err = 0;
2934 ENTRY;
2935
2936 if (!try_module_get(THIS_MODULE)) {
2937		CERROR("Can't get module. Is it alive?\n");
2938 return -EINVAL;
2939 }
2940 switch (cmd) {
2941 case OBD_IOC_LOV_GET_CONFIG: {
2942 char *buf;
2943 struct lov_desc *desc;
2944 struct obd_uuid uuid;
2945
2946 buf = NULL;
2947 len = 0;
2948 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2949 GOTO(out, err = -EINVAL);
2950
2951 data = (struct obd_ioctl_data *)buf;
2952
2953 if (sizeof(*desc) > data->ioc_inllen1) {
2954 obd_ioctl_freedata(buf, len);
2955 GOTO(out, err = -EINVAL);
2956 }
2957
2958 if (data->ioc_inllen2 < sizeof(uuid)) {
2959 obd_ioctl_freedata(buf, len);
2960 GOTO(out, err = -EINVAL);
2961 }
2962
2963 desc = (struct lov_desc *)data->ioc_inlbuf1;
2964 desc->ld_tgt_count = 1;
2965 desc->ld_active_tgt_count = 1;
2966 desc->ld_default_stripe_count = 1;
2967 desc->ld_default_stripe_size = 0;
2968 desc->ld_default_stripe_offset = 0;
2969 desc->ld_pattern = 0;
2970 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2971
2972 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2973
2974 err = copy_to_user((void *)uarg, buf, len);
2975 if (err)
2976 err = -EFAULT;
2977 obd_ioctl_freedata(buf, len);
2978 GOTO(out, err);
2979 }
2980 case LL_IOC_LOV_SETSTRIPE:
2981 err = obd_alloc_memmd(exp, karg);
2982 if (err > 0)
2983 err = 0;
2984 GOTO(out, err);
2985 case LL_IOC_LOV_GETSTRIPE:
2986 err = osc_getstripe(karg, uarg);
2987 GOTO(out, err);
2988 case OBD_IOC_CLIENT_RECOVER:
2989 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2990 data->ioc_inlbuf1, 0);
2991 if (err > 0)
2992 err = 0;
2993 GOTO(out, err);
2994 case IOC_OSC_SET_ACTIVE:
2995 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2996 data->ioc_offset);
2997 GOTO(out, err);
2998 case OBD_IOC_POLL_QUOTACHECK:
2999 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3000 GOTO(out, err);
3001 case OBD_IOC_PING_TARGET:
3002 err = ptlrpc_obd_ping(obd);
3003 GOTO(out, err);
3004 default:
3005 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3006 cmd, current_comm());
3007 GOTO(out, err = -ENOTTY);
3008 }
3009out:
3010 module_put(THIS_MODULE);
3011 return err;
3012}
3013
3014static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3015 obd_count keylen, void *key, __u32 *vallen, void *val,
3016 struct lov_stripe_md *lsm)
3017{
3018 ENTRY;
3019 if (!vallen || !val)
3020 RETURN(-EFAULT);
3021
3022 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3023 __u32 *stripe = val;
3024 *vallen = sizeof(*stripe);
3025 *stripe = 0;
3026 RETURN(0);
3027 } else if (KEY_IS(KEY_LAST_ID)) {
3028 struct ptlrpc_request *req;
3029 obd_id *reply;
3030 char *tmp;
3031 int rc;
3032
3033 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3034 &RQF_OST_GET_INFO_LAST_ID);
3035 if (req == NULL)
3036 RETURN(-ENOMEM);
3037
3038 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3039 RCL_CLIENT, keylen);
3040 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3041 if (rc) {
3042 ptlrpc_request_free(req);
3043 RETURN(rc);
3044 }
3045
3046 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3047 memcpy(tmp, key, keylen);
3048
3049 req->rq_no_delay = req->rq_no_resend = 1;
3050 ptlrpc_request_set_replen(req);
3051 rc = ptlrpc_queue_wait(req);
3052 if (rc)
3053 GOTO(out, rc);
3054
3055 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3056 if (reply == NULL)
3057 GOTO(out, rc = -EPROTO);
3058
3059 *((obd_id *)val) = *reply;
3060 out:
3061 ptlrpc_req_finished(req);
3062 RETURN(rc);
3063 } else if (KEY_IS(KEY_FIEMAP)) {
3064 struct ptlrpc_request *req;
3065 struct ll_user_fiemap *reply;
3066 char *tmp;
3067 int rc;
3068
3069 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3070 &RQF_OST_GET_INFO_FIEMAP);
3071 if (req == NULL)
3072 RETURN(-ENOMEM);
3073
3074 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3075 RCL_CLIENT, keylen);
3076 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3077 RCL_CLIENT, *vallen);
3078 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3079 RCL_SERVER, *vallen);
3080
3081 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3082 if (rc) {
3083 ptlrpc_request_free(req);
3084 RETURN(rc);
3085 }
3086
3087 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3088 memcpy(tmp, key, keylen);
3089 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3090 memcpy(tmp, val, *vallen);
3091
3092 ptlrpc_request_set_replen(req);
3093 rc = ptlrpc_queue_wait(req);
3094 if (rc)
3095 GOTO(out1, rc);
3096
3097 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3098 if (reply == NULL)
3099 GOTO(out1, rc = -EPROTO);
3100
3101 memcpy(val, reply, *vallen);
3102 out1:
3103 ptlrpc_req_finished(req);
3104
3105 RETURN(rc);
3106 }
3107
3108 RETURN(-EINVAL);
3109}
3110
3111static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3112 obd_count keylen, void *key, obd_count vallen,
3113 void *val, struct ptlrpc_request_set *set)
3114{
3115 struct ptlrpc_request *req;
3116 struct obd_device *obd = exp->exp_obd;
3117 struct obd_import *imp = class_exp2cliimp(exp);
3118 char *tmp;
3119 int rc;
3120 ENTRY;
3121
3122 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3123
3124 if (KEY_IS(KEY_CHECKSUM)) {
3125 if (vallen != sizeof(int))
3126 RETURN(-EINVAL);
3127 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3128 RETURN(0);
3129 }
3130
3131 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3132 sptlrpc_conf_client_adapt(obd);
3133 RETURN(0);
3134 }
3135
3136 if (KEY_IS(KEY_FLUSH_CTX)) {
3137 sptlrpc_import_flush_my_ctx(imp);
3138 RETURN(0);
3139 }
3140
3141 if (KEY_IS(KEY_CACHE_SET)) {
3142 struct client_obd *cli = &obd->u.cli;
3143
3144 LASSERT(cli->cl_cache == NULL); /* only once */
3145 cli->cl_cache = (struct cl_client_cache *)val;
3146 atomic_inc(&cli->cl_cache->ccc_users);
3147 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3148
3149 /* add this osc into entity list */
3150 LASSERT(list_empty(&cli->cl_lru_osc));
3151 spin_lock(&cli->cl_cache->ccc_lru_lock);
3152 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3153 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3154
3155 RETURN(0);
3156 }
3157
3158 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3159 struct client_obd *cli = &obd->u.cli;
3160 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3161 int target = *(int *)val;
3162
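		/* Illustrative example (not in the original source): with
		 * 1000 pages in the LRU list and *val == 300, the shrinker
		 * is asked for min(500, 300) pages; if it frees 250, *val
		 * drops to 50, the amount left to reclaim elsewhere. */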
3163 nr = osc_lru_shrink(cli, min(nr, target));
3164 *(int *)val -= nr;
3165 RETURN(0);
3166 }
3167
3168 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3169 RETURN(-EINVAL);
3170
3171 /* We pass all other commands directly to OST. Since nobody calls osc
3172 methods directly and everybody is supposed to go through LOV, we
3173 assume lov checked invalid values for us.
3174 The only recognised values so far are evict_by_nid and mds_conn.
3175 Even if something bad goes through, we'd get a -EINVAL from OST
3176 anyway. */
3177
3178 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3179 &RQF_OST_SET_GRANT_INFO :
3180 &RQF_OBD_SET_INFO);
3181 if (req == NULL)
3182 RETURN(-ENOMEM);
3183
3184 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3185 RCL_CLIENT, keylen);
3186 if (!KEY_IS(KEY_GRANT_SHRINK))
3187 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3188 RCL_CLIENT, vallen);
3189 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3190 if (rc) {
3191 ptlrpc_request_free(req);
3192 RETURN(rc);
3193 }
3194
3195 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3196 memcpy(tmp, key, keylen);
3197 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3198 &RMF_OST_BODY :
3199 &RMF_SETINFO_VAL);
3200 memcpy(tmp, val, vallen);
3201
3202 if (KEY_IS(KEY_GRANT_SHRINK)) {
3203 struct osc_grant_args *aa;
3204 struct obdo *oa;
3205
3206 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3207 aa = ptlrpc_req_async_args(req);
3208 OBDO_ALLOC(oa);
3209 if (!oa) {
3210 ptlrpc_req_finished(req);
3211 RETURN(-ENOMEM);
3212 }
3213 *oa = ((struct ost_body *)val)->oa;
3214 aa->aa_oa = oa;
3215 req->rq_interpret_reply = osc_shrink_grant_interpret;
3216 }
3217
3218 ptlrpc_request_set_replen(req);
3219 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3220 LASSERT(set != NULL);
3221 ptlrpc_set_add_req(set, req);
3222 ptlrpc_check_set(NULL, set);
3223 } else
3224 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3225
3226 RETURN(0);
3227}
3228
3229
3230static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3231 struct obd_device *disk_obd, int *index)
3232{
3233	/* this code is not supposed to be used with LOD/OSP;
3234	 * to be removed soon */
3235 LBUG();
3236 return 0;
3237}
3238
3239static int osc_llog_finish(struct obd_device *obd, int count)
3240{
3241 struct llog_ctxt *ctxt;
3242
3243 ENTRY;
3244
3245 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3246 if (ctxt) {
3247 llog_cat_close(NULL, ctxt->loc_handle);
3248 llog_cleanup(NULL, ctxt);
3249 }
3250
3251 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3252 if (ctxt)
3253 llog_cleanup(NULL, ctxt);
3254 RETURN(0);
3255}
3256
3257static int osc_reconnect(const struct lu_env *env,
3258 struct obd_export *exp, struct obd_device *obd,
3259 struct obd_uuid *cluuid,
3260 struct obd_connect_data *data,
3261 void *localdata)
3262{
3263 struct client_obd *cli = &obd->u.cli;
3264
3265 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3266 long lost_grant;
3267
3268 client_obd_list_lock(&cli->cl_loi_list_lock);
3269 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3270 2 * cli_brw_size(obd);
3271 lost_grant = cli->cl_lost_grant;
3272 cli->cl_lost_grant = 0;
3273 client_obd_list_unlock(&cli->cl_loi_list_lock);
3274
3275 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3276 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3277 data->ocd_version, data->ocd_grant, lost_grant);
3278 }
3279
3280 RETURN(0);
3281}
3282
3283static int osc_disconnect(struct obd_export *exp)
3284{
3285 struct obd_device *obd = class_exp2obd(exp);
3286 struct llog_ctxt *ctxt;
3287 int rc;
3288
3289 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3290 if (ctxt) {
3291 if (obd->u.cli.cl_conn_count == 1) {
3292 /* Flush any remaining cancel messages out to the
3293 * target */
3294 llog_sync(ctxt, exp, 0);
3295 }
3296 llog_ctxt_put(ctxt);
3297 } else {
3298 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3299 obd);
3300 }
3301
3302 rc = client_disconnect_export(exp);
3303 /**
3304 * Initially we put del_shrink_grant before disconnect_export, but it
3305 * causes the following problem if setup (connect) and cleanup
3306 * (disconnect) are tangled together.
3307 * connect p1 disconnect p2
3308 * ptlrpc_connect_import
3309 * ............... class_manual_cleanup
3310 * osc_disconnect
3311 * del_shrink_grant
3312 * ptlrpc_connect_interrupt
3313 * init_grant_shrink
3314 * add this client to shrink list
3315 * cleanup_osc
3316	 * Bang! The pinger triggers the shrink.
3317	 * So the osc should be disconnected from the shrink list only after
3318	 * we are sure the import has been destroyed. BUG18662
3319 */
3320 if (obd->u.cli.cl_import == NULL)
3321 osc_del_shrink_grant(&obd->u.cli);
3322 return rc;
3323}
3324
3325static int osc_import_event(struct obd_device *obd,
3326 struct obd_import *imp,
3327 enum obd_import_event event)
3328{
3329 struct client_obd *cli;
3330 int rc = 0;
3331
3332 ENTRY;
3333 LASSERT(imp->imp_obd == obd);
3334
3335 switch (event) {
3336 case IMP_EVENT_DISCON: {
3337 cli = &obd->u.cli;
3338 client_obd_list_lock(&cli->cl_loi_list_lock);
3339 cli->cl_avail_grant = 0;
3340 cli->cl_lost_grant = 0;
3341 client_obd_list_unlock(&cli->cl_loi_list_lock);
3342 break;
3343 }
3344 case IMP_EVENT_INACTIVE: {
3345 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3346 break;
3347 }
3348 case IMP_EVENT_INVALIDATE: {
3349 struct ldlm_namespace *ns = obd->obd_namespace;
3350 struct lu_env *env;
3351 int refcheck;
3352
3353 env = cl_env_get(&refcheck);
3354 if (!IS_ERR(env)) {
3355 /* Reset grants */
3356 cli = &obd->u.cli;
3357 /* all pages go to failing rpcs due to the invalid
3358 * import */
3359 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3360
3361 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3362 cl_env_put(env, &refcheck);
3363 } else
3364 rc = PTR_ERR(env);
3365 break;
3366 }
3367 case IMP_EVENT_ACTIVE: {
3368 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3369 break;
3370 }
3371 case IMP_EVENT_OCD: {
3372 struct obd_connect_data *ocd = &imp->imp_connect_data;
3373
3374 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3375 osc_init_grant(&obd->u.cli, ocd);
3376
3377 /* See bug 7198 */
3378 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3379 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3380
3381 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3382 break;
3383 }
3384 case IMP_EVENT_DEACTIVATE: {
3385 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3386 break;
3387 }
3388 case IMP_EVENT_ACTIVATE: {
3389 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3390 break;
3391 }
3392 default:
3393 CERROR("Unknown import event %d\n", event);
3394 LBUG();
3395 }
3396 RETURN(rc);
3397}
3398
3399/**
3400 * Determine whether the lock can be canceled before replaying the lock
3401 * during recovery, see bug16774 for detailed information.
3402 *
3403 * \retval zero the lock can't be canceled
3404 * \retval other ok to cancel
3405 */
3406static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3407{
3408 check_res_locked(lock->l_resource);
3409
3410 /*
3411 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3412 *
3413 * XXX as a future improvement, we can also cancel unused write lock
3414 * if it doesn't have dirty data and active mmaps.
3415 */
3416 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3417 (lock->l_granted_mode == LCK_PR ||
3418 lock->l_granted_mode == LCK_CR) &&
3419 (osc_dlm_lock_pageref(lock) == 0))
3420 RETURN(1);
3421
3422 RETURN(0);
3423}
3424
3425static int brw_queue_work(const struct lu_env *env, void *data)
3426{
3427 struct client_obd *cli = data;
3428
3429 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3430
3431 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3432 RETURN(0);
3433}
3434
3435int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3436{
3437 struct lprocfs_static_vars lvars = { 0 };
3438 struct client_obd *cli = &obd->u.cli;
3439 void *handler;
3440 int rc;
3441 ENTRY;
3442
3443 rc = ptlrpcd_addref();
3444 if (rc)
3445 RETURN(rc);
3446
3447 rc = client_obd_setup(obd, lcfg);
3448 if (rc)
3449 GOTO(out_ptlrpcd, rc);
3450
3451 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3452 if (IS_ERR(handler))
3453 GOTO(out_client_setup, rc = PTR_ERR(handler));
3454 cli->cl_writeback_work = handler;
3455
3456 rc = osc_quota_setup(obd);
3457 if (rc)
3458 GOTO(out_ptlrpcd_work, rc);
3459
3460 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3461 lprocfs_osc_init_vars(&lvars);
3462 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3463 lproc_osc_attach_seqstat(obd);
3464 sptlrpc_lprocfs_cliobd_attach(obd);
3465 ptlrpc_lprocfs_register_obd(obd);
3466 }
3467
3468	/* We need to allocate a few extra requests, because
3469	 * brw_interpret tries to create new requests before freeing
3470	 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3471	 * reserved, but I'm afraid that might be too much wasted RAM
3472	 * in fact, so 2 is just my guess and should still work. */
3473 cli->cl_import->imp_rq_pool =
3474 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3475 OST_MAXREQSIZE,
3476 ptlrpc_add_rqs_to_pool);
3477
3478 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3479 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3480 RETURN(rc);
3481
3482out_ptlrpcd_work:
3483 ptlrpcd_destroy_work(handler);
3484out_client_setup:
3485 client_obd_cleanup(obd);
3486out_ptlrpcd:
3487 ptlrpcd_decref();
3488 RETURN(rc);
3489}
3490
3491static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3492{
3493 int rc = 0;
3494 ENTRY;
3495
3496 switch (stage) {
3497 case OBD_CLEANUP_EARLY: {
3498 struct obd_import *imp;
3499 imp = obd->u.cli.cl_import;
3500 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3501 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3502 ptlrpc_deactivate_import(imp);
3503 spin_lock(&imp->imp_lock);
3504 imp->imp_pingable = 0;
3505 spin_unlock(&imp->imp_lock);
3506 break;
3507 }
3508 case OBD_CLEANUP_EXPORTS: {
3509 struct client_obd *cli = &obd->u.cli;
3510 /* LU-464
3511 * for echo client, export may be on zombie list, wait for
3512 * zombie thread to cull it, because cli.cl_import will be
3513 * cleared in client_disconnect_export():
3514 * class_export_destroy() -> obd_cleanup() ->
3515 * echo_device_free() -> echo_client_cleanup() ->
3516 * obd_disconnect() -> osc_disconnect() ->
3517 * client_disconnect_export()
3518 */
3519 obd_zombie_barrier();
3520 if (cli->cl_writeback_work) {
3521 ptlrpcd_destroy_work(cli->cl_writeback_work);
3522 cli->cl_writeback_work = NULL;
3523 }
3524 obd_cleanup_client_import(obd);
3525 ptlrpc_lprocfs_unregister_obd(obd);
3526 lprocfs_obd_cleanup(obd);
3527 rc = obd_llog_finish(obd, 0);
3528 if (rc != 0)
3529 CERROR("failed to cleanup llogging subsystems\n");
3530 break;
3531 }
3532 }
3533 RETURN(rc);
3534}
3535
3536int osc_cleanup(struct obd_device *obd)
3537{
3538 struct client_obd *cli = &obd->u.cli;
3539 int rc;
3540
3541 ENTRY;
3542
3543 /* lru cleanup */
3544 if (cli->cl_cache != NULL) {
3545 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3546 spin_lock(&cli->cl_cache->ccc_lru_lock);
3547 list_del_init(&cli->cl_lru_osc);
3548 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3549 cli->cl_lru_left = NULL;
3550 atomic_dec(&cli->cl_cache->ccc_users);
3551 cli->cl_cache = NULL;
3552 }
3553
3554 /* free memory of osc quota cache */
3555 osc_quota_cleanup(obd);
3556
3557 rc = client_obd_cleanup(obd);
3558
3559 ptlrpcd_decref();
3560 RETURN(rc);
3561}
3562
3563int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3564{
3565 struct lprocfs_static_vars lvars = { 0 };
3566 int rc = 0;
3567
3568 lprocfs_osc_init_vars(&lvars);
3569
3570 switch (lcfg->lcfg_command) {
3571 default:
3572 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3573 lcfg, obd);
3574 if (rc > 0)
3575 rc = 0;
3576 break;
3577 }
3578
3579 return(rc);
3580}
3581
3582static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3583{
3584 return osc_process_config_base(obd, buf);
3585}
3586
3587struct obd_ops osc_obd_ops = {
3588 .o_owner = THIS_MODULE,
3589 .o_setup = osc_setup,
3590 .o_precleanup = osc_precleanup,
3591 .o_cleanup = osc_cleanup,
3592 .o_add_conn = client_import_add_conn,
3593 .o_del_conn = client_import_del_conn,
3594 .o_connect = client_connect_import,
3595 .o_reconnect = osc_reconnect,
3596 .o_disconnect = osc_disconnect,
3597 .o_statfs = osc_statfs,
3598 .o_statfs_async = osc_statfs_async,
3599 .o_packmd = osc_packmd,
3600 .o_unpackmd = osc_unpackmd,
3601 .o_create = osc_create,
3602 .o_destroy = osc_destroy,
3603 .o_getattr = osc_getattr,
3604 .o_getattr_async = osc_getattr_async,
3605 .o_setattr = osc_setattr,
3606 .o_setattr_async = osc_setattr_async,
3607 .o_brw = osc_brw,
3608 .o_punch = osc_punch,
3609 .o_sync = osc_sync,
3610 .o_enqueue = osc_enqueue,
3611 .o_change_cbdata = osc_change_cbdata,
3612 .o_find_cbdata = osc_find_cbdata,
3613 .o_cancel = osc_cancel,
3614 .o_cancel_unused = osc_cancel_unused,
3615 .o_iocontrol = osc_iocontrol,
3616 .o_get_info = osc_get_info,
3617 .o_set_info_async = osc_set_info_async,
3618 .o_import_event = osc_import_event,
3619 .o_llog_init = osc_llog_init,
3620 .o_llog_finish = osc_llog_finish,
3621 .o_process_config = osc_process_config,
3622 .o_quotactl = osc_quotactl,
3623 .o_quotacheck = osc_quotacheck,
3624};
3625
3626extern struct lu_kmem_descr osc_caches[];
3627extern spinlock_t osc_ast_guard;
3628extern struct lock_class_key osc_ast_guard_class;
3629
3630int __init osc_init(void)
3631{
3632 struct lprocfs_static_vars lvars = { 0 };
3633 int rc;
3634 ENTRY;
3635
3636 /* print an address of _any_ initialized kernel symbol from this
3637 * module, to allow debugging with gdb that doesn't support data
3638 * symbols from modules.*/
3639 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3640
3641 rc = lu_kmem_init(osc_caches);
3642
3643 lprocfs_osc_init_vars(&lvars);
3644
3645 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3646 LUSTRE_OSC_NAME, &osc_device_type);
3647 if (rc) {
3648 lu_kmem_fini(osc_caches);
3649 RETURN(rc);
3650 }
3651
3652 spin_lock_init(&osc_ast_guard);
3653 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3654
3655 RETURN(rc);
3656}
3657
3658static void /*__exit*/ osc_exit(void)
3659{
3660 class_unregister_type(LUSTRE_OSC_NAME);
3661 lu_kmem_fini(osc_caches);
3662}
3663
3664MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3665MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3666MODULE_LICENSE("GPL");
3667
3668cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);