drivers/staging/lustre/lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
        struct obdo       *aa_oa;
        int                aa_requested_nob;
        int                aa_nio_count;
        u32                aa_page_count;
        int                aa_resends;
        struct brw_page  **aa_ppga;
        struct client_obd *aa_cli;
        struct list_head   aa_oaps;
        struct list_head   aa_exts;
        struct obd_capa   *aa_ocapa;
        struct cl_req     *aa_clerq;
};

struct osc_async_args {
        struct obd_info *aa_oi;
};

struct osc_setattr_args {
        struct obdo          *sa_oa;
        obd_enqueue_update_f  sa_upcall;
        void                 *sa_cookie;
};

struct osc_fsync_args {
        struct obd_info      *fa_oi;
        obd_enqueue_update_f  fa_upcall;
        void                 *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export        *oa_exp;
        __u64                    *oa_flags;
        obd_enqueue_update_f      oa_upcall;
        void                     *oa_cookie;
        struct ost_lvb           *oa_lvb;
        struct lustre_handle     *oa_lockh;
        struct ldlm_enqueue_info *oa_ei;
        unsigned int              oa_agl:1;
};
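
/*
 * Note: each of the *_args structs above is stored directly inside
 * req->rq_async_args by ptlrpc_req_async_args(); the CLASSERT() at each
 * use site below verifies that the struct actually fits in that
 * embedded buffer.
 */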

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

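/*
 * The obd_packmd/obd_unpackmd convention used below: a NULL lmmp/lsmp
 * asks only for the required buffer size; a non-NULL buffer pointer
 * with a NULL source frees the buffer; otherwise the metadata is
 * (un)packed between the two representations.
 */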
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;

        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                return lmm_size;

        if (*lmmp != NULL && lsm == NULL) {
                kfree(*lmmp);
                *lmmp = NULL;
                return 0;
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (*lmmp == NULL) {
                *lmmp = kzalloc(lmm_size, GFP_NOFS);
                if (*lmmp == NULL)
                        return -ENOMEM;
        }

        if (lsm)
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        return -EINVAL;
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        return -EINVAL;
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                return lsm_size;

        if (*lsmp != NULL && lmm == NULL) {
                kfree((*lsmp)->lsm_oinfo[0]);
                kfree(*lsmp);
                *lsmp = NULL;
                return 0;
        }

        if (*lsmp == NULL) {
                *lsmp = kzalloc(lsm_size, GFP_NOFS);
                if (unlikely(*lsmp == NULL))
                        return -ENOMEM;
                (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
                                                GFP_NOFS);
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        kfree(*lsmp);
                        return -ENOMEM;
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}
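
/*
 * A capability (obd_capa) is an optional security token for the OST:
 * when present it is copied into the RMF_CAPA1 field of the request and
 * advertised via OBD_MD_FLOSSCAPA. A NULL capa is skipped, so callers
 * may pass oi_capa unconditionally.
 */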

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        return 0;
}
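
/*
 * A minimal caller sketch (hypothetical; the real callers live in the
 * LOV layer), showing how the async getattr is driven through a
 * request set:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *
 *      if (set != NULL) {
 *              rc = osc_getattr_async(exp, oinfo, set);
 *              if (rc == 0)
 *                      rc = ptlrpc_set_wait(set);
 *              ptlrpc_set_destroy(set);
 *      }
 */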

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        return 0;
}
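
/*
 * Note the three dispatch modes above: a NULL rqset fires the RPC
 * through ptlrpcd and forgets it (no interpret callback), the special
 * PTLRPCD_SET token also hands the request to ptlrpcd but keeps the
 * interpret callback and upcall, and any other set leaves the send to
 * the caller's own ptlrpc_set_wait().
 */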

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        return rc;
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL) {
                rc = -ENOMEM;
                goto out;
        }

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                goto out;
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out_req;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out_req;
        }

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}
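
/*
 * For OST_PUNCH the truncate range travels inside the obdo: the caller
 * is expected to have stored the start offset in o_size and the end in
 * o_blocks (the same field-overloading trick that osc_sync_base() below
 * notes explicitly for OST_SYNC).
 */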

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;

        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                goto out;
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                return 0;

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                return 0;

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
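
/*
 * osc_can_send_destroy() is an optimistic take on a counting semaphore:
 * the in-flight counter is bumped first and rolled back if it overshot
 * cl_max_rpcs_in_flight; the second comparison catches a concurrent
 * decrement between the two atomics so that no waiter on
 * cl_destroy_waitq misses its wake-up.
 */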

int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                return osc_real_create(exp, oa, ea, oti);
        }

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                return osc_real_create(exp, oa, ea, oti);

        /* we should not get here anymore */
        LBUG();

        return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        LIST_HEAD(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                return -EINVAL;
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                return -ENOMEM;
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying an unlink orphan sent from MDT
         * to OST, it should not be blocked here, because the process might
         * be triggered by ptlrpcd, and it is not good to block a ptlrpcd
         * thread (b=16006) */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        return 0;
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, thus they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key, u32 vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                goto out;
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                return 0;
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        body = kzalloc(sizeof(*body), GFP_NOFS);
        if (!body)
                return -ENOMEM;

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        kfree(body);
        return rc;
}
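
/*
 * The shrink itself is just a set_info RPC keyed by KEY_GRANT_SHRINK
 * carrying the ost_body prepared above; if the send fails, the grant
 * that was deducted from cl_avail_grant is added back locally via
 * __osc_update_grant().
 */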

static int osc_should_shrink_grant(struct client_obd *client)
{
        unsigned long time = cfs_time_current();
        unsigned long next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list,
                            cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if
         * we've been evicted, it's the new avail_grant amount, and cl_dirty
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           u32 page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
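
/*
 * Contiguous pages whose flags agree are merged into a single remote
 * niobuf by osc_brw_prep_request() below, so one descriptor can cover a
 * whole extent instead of one niobuf per page.
 */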

static u32 osc_checksum_bulk(int nob, u32 pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        int err;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                CDEBUG(D_PAGE,
                       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
                       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
                       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
                       page_private(pga[i]->pg),
                       (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
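
/*
 * bufsize is 4 because the bulk checksum algorithms used here (crc32,
 * adler, crc32c) all produce a 32-bit digest; the second
 * cfs_crypto_hash_final() call on the error path only releases the
 * hash descriptor.
 */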

static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                return -ENOMEM; /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                return -EINVAL; /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                return -ENOMEM;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL) {
                rc = -ENOMEM;
                goto out;
        }
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        return 0;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                u32 page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
               client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                return rc;
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                return -EPROTO;
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                return rc;

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        return -EPROTO;
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        return -EAGAIN;

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        return -EAGAIN;

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                goto out;
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0) {
                rc = -EAGAIN;
                goto out;
        }

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                return -EPROTO;
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                return -EPROTO;
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                char *via;
                char *router;
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        return rc;
}
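
/*
 * Note: a checksum mismatch surfaces as -EAGAIN, which
 * osc_recoverable_error() treats as retryable, so brw_interpret() below
 * replays the bulk through osc_brw_redo_request() instead of failing
 * the I/O outright.
 */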

static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                return rc;

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                return -EINTR;
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = get_seconds() + new_req->rq_timeout;
        else
                new_req->rq_sent = get_seconds() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        return 0;
}
1717
1718/*
1719 * ugh, we want disk allocation on the target to happen in offset order. we'll
1720 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1721 * fine for our small page arrays and doesn't require allocation. it's an
1722 * insertion sort that swaps elements that are strides apart, shrinking the
1723 * stride down until it's 1 and the array is sorted.
1724 */
1725static void sort_brw_pages(struct brw_page **array, int num)
1726{
1727 int stride, i, j;
1728 struct brw_page *tmp;
1729
1730 if (num == 1)
1731 return;
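 /* grow the stride through Knuth's h = 3h + 1 gap sequence
 * (1, 4, 13, 40, ...) until it reaches or passes num */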
1732 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1733 ;
1734
1735 do {
1736 stride /= 3;
1737 for (i = stride ; i < num ; i++) {
1738 tmp = array[i];
1739 j = i;
1740 while (j >= stride && array[j - stride]->off > tmp->off) {
1741 array[j] = array[j - stride];
1742 j -= stride;
1743 }
1744 array[j] = tmp;
1745 }
1746 } while (stride > 1);
1747}
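/*
 * A worked example (hypothetical sizes, for illustration only): with
 * num = 10 the gap loop in sort_brw_pages() stops at stride 13, so the
 * shrinking passes run with stride 4 and then stride 1 -- the array is
 * 4-sorted first and the final pass is a plain insertion sort over
 * nearly-ordered data, which is where shellsort gets its speed.
 */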
1748
21aef7d9 1749static void osc_release_ppga(struct brw_page **ppga, u32 count)
d7e09d03
PT
1750{
1751 LASSERT(ppga != NULL);
7795178d 1752 kfree(ppga);
d7e09d03
PT
1753}
1754
d7e09d03
PT
1755static int brw_interpret(const struct lu_env *env,
1756 struct ptlrpc_request *req, void *data, int rc)
1757{
1758 struct osc_brw_async_args *aa = data;
1759 struct osc_extent *ext;
1760 struct osc_extent *tmp;
1761 struct cl_object *obj = NULL;
1762 struct client_obd *cli = aa->aa_cli;
d7e09d03
PT
1763
1764 rc = osc_brw_fini_request(req, rc);
1765 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1766 /* When the server returns -EINPROGRESS, the client should always retry
1767 * regardless of how many times the bulk was already resent. */
1768 if (osc_recoverable_error(rc)) {
1769 if (req->rq_import_generation !=
1770 req->rq_import->imp_generation) {
2d00bd17 1771 CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
d7e09d03
PT
1772 req->rq_import->imp_obd->obd_name,
1773 POSTID(&aa->aa_oa->o_oi), rc);
1774 } else if (rc == -EINPROGRESS ||
1775 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1776 rc = osc_brw_redo_request(req, aa, rc);
1777 } else {
b0f5aad5 1778 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
d7e09d03
PT
1779 req->rq_import->imp_obd->obd_name,
1780 POSTID(&aa->aa_oa->o_oi), rc);
1781 }
1782
1783 if (rc == 0)
0a3bdb00 1784 return 0;
d7e09d03
PT
1785 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1786 rc = -EIO;
1787 }
1788
1789 if (aa->aa_ocapa) {
1790 capa_put(aa->aa_ocapa);
1791 aa->aa_ocapa = NULL;
1792 }
1793
1794 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1795 if (obj == NULL && rc == 0) {
1796 obj = osc2cl(ext->oe_obj);
1797 cl_object_get(obj);
1798 }
1799
1800 list_del_init(&ext->oe_link);
1801 osc_extent_finish(env, ext, 1, rc);
1802 }
1803 LASSERT(list_empty(&aa->aa_exts));
1804 LASSERT(list_empty(&aa->aa_oaps));
1805
1806 if (obj != NULL) {
1807 struct obdo *oa = aa->aa_oa;
1808 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1809 unsigned long valid = 0;
1810
1811 LASSERT(rc == 0);
1812 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1813 attr->cat_blocks = oa->o_blocks;
1814 valid |= CAT_BLOCKS;
1815 }
1816 if (oa->o_valid & OBD_MD_FLMTIME) {
1817 attr->cat_mtime = oa->o_mtime;
1818 valid |= CAT_MTIME;
1819 }
1820 if (oa->o_valid & OBD_MD_FLATIME) {
1821 attr->cat_atime = oa->o_atime;
1822 valid |= CAT_ATIME;
1823 }
1824 if (oa->o_valid & OBD_MD_FLCTIME) {
1825 attr->cat_ctime = oa->o_ctime;
1826 valid |= CAT_CTIME;
1827 }
1828 if (valid != 0) {
1829 cl_object_attr_lock(obj);
1830 cl_object_attr_set(env, obj, attr, valid);
1831 cl_object_attr_unlock(obj);
1832 }
1833 cl_object_put(env, obj);
1834 }
1835 OBDO_FREE(aa->aa_oa);
1836
1837 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1838 req->rq_bulk->bd_nob_transferred);
1839 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1840 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1841
1842 client_obd_list_lock(&cli->cl_loi_list_lock);
1843 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1844 * is called so we know whether to go to sync BRWs or wait for more
1845 * RPCs to complete */
1846 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1847 cli->cl_w_in_flight--;
1848 else
1849 cli->cl_r_in_flight--;
1850 osc_wake_cache_waiters(cli);
1851 client_obd_list_unlock(&cli->cl_loi_list_lock);
1852
1853 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
0a3bdb00 1854 return rc;
d7e09d03
PT
1855}
1856
d7e09d03
PT
1857/**
1858 * Build an RPC from the list of extents @ext_list. The caller must ensure
1859 * that the total number of pages in this list does NOT exceed the maximum
1860 * pages per RPC. Extents in the list must be in OES_RPC state.
1861 */
1862int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1863 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1864{
cad6fafa
BJ
1865 struct ptlrpc_request *req = NULL;
1866 struct osc_extent *ext;
1867 struct brw_page **pga = NULL;
1868 struct osc_brw_async_args *aa = NULL;
1869 struct obdo *oa = NULL;
1870 struct osc_async_page *oap;
1871 struct osc_async_page *tmp;
1872 struct cl_req *clerq = NULL;
1873 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1874 CRT_READ;
1875 struct ldlm_lock *lock = NULL;
1876 struct cl_req_attr *crattr = NULL;
21aef7d9
OD
1877 u64 starting_offset = OBD_OBJECT_EOF;
1878 u64 ending_offset = 0;
cad6fafa
BJ
1879 int mpflag = 0;
1880 int mem_tight = 0;
1881 int page_count = 0;
1882 int i;
1883 int rc;
3ce08cd7 1884 struct ost_body *body;
d7e09d03 1885 LIST_HEAD(rpc_list);
d7e09d03 1886
d7e09d03
PT
1887 LASSERT(!list_empty(ext_list));
1888
1889 /* add pages into rpc_list to build BRW rpc */
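 /* in effect only the first page of the RPC may start mid-page and
 * only the last may end short of PAGE_CACHE_SIZE; the LASSERTs
 * below check that every interior page covers a full page */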
1890 list_for_each_entry(ext, ext_list, oe_link) {
1891 LASSERT(ext->oe_state == OES_RPC);
1892 mem_tight |= ext->oe_memalloc;
1893 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1894 ++page_count;
1895 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1896 if (starting_offset > oap->oap_obj_off)
1897 starting_offset = oap->oap_obj_off;
1898 else
1899 LASSERT(oap->oap_page_off == 0);
1900 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1901 ending_offset = oap->oap_obj_off +
1902 oap->oap_count;
1903 else
1904 LASSERT(oap->oap_page_off + oap->oap_count ==
1905 PAGE_CACHE_SIZE);
1906 }
1907 }
1908
1909 if (mem_tight)
1910 mpflag = cfs_memory_pressure_get_and_set();
1911
7795178d 1912 crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
26c4ea46
TJ
1913 if (crattr == NULL) {
1914 rc = -ENOMEM;
1915 goto out;
1916 }
cad6fafa 1917
7795178d 1918 pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
26c4ea46
TJ
1919 if (pga == NULL) {
1920 rc = -ENOMEM;
1921 goto out;
1922 }
d7e09d03
PT
1923
1924 OBDO_ALLOC(oa);
26c4ea46
TJ
1925 if (oa == NULL) {
1926 rc = -ENOMEM;
1927 goto out;
1928 }
d7e09d03
PT
1929
1930 i = 0;
1931 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1932 struct cl_page *page = oap2cl_page(oap);
1933 if (clerq == NULL) {
1934 clerq = cl_req_alloc(env, page, crt,
cad6fafa 1935 1 /* only 1-object rpcs for now */);
26c4ea46
TJ
1936 if (IS_ERR(clerq)) {
1937 rc = PTR_ERR(clerq);
1938 goto out;
1939 }
d7e09d03
PT
1940 lock = oap->oap_ldlm_lock;
1941 }
1942 if (mem_tight)
1943 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1944 pga[i] = &oap->oap_brw_page;
1945 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1946 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
cad6fafa
BJ
1947 pga[i]->pg, page_index(oap->oap_page), oap,
1948 pga[i]->flag);
d7e09d03
PT
1949 i++;
1950 cl_req_page_add(env, clerq, page);
1951 }
1952
1953 /* always get the data for the obdo for the rpc */
1954 LASSERT(clerq != NULL);
cad6fafa
BJ
1955 crattr->cra_oa = oa;
1956 cl_req_attr_set(env, clerq, crattr, ~0ULL);
d7e09d03
PT
1957 if (lock) {
1958 oa->o_handle = lock->l_remote_handle;
1959 oa->o_valid |= OBD_MD_FLHANDLE;
1960 }
1961
1962 rc = cl_req_prep(env, clerq);
1963 if (rc != 0) {
1964 CERROR("cl_req_prep failed: %d\n", rc);
26c4ea46 1965 goto out;
d7e09d03
PT
1966 }
1967
1968 sort_brw_pages(pga, page_count);
1969 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
cad6fafa 1970 pga, &req, crattr->cra_capa, 1, 0);
d7e09d03
PT
1971 if (rc != 0) {
1972 CERROR("prep_req failed: %d\n", rc);
26c4ea46 1973 goto out;
d7e09d03
PT
1974 }
1975
d7e09d03
PT
1976 req->rq_interpret_reply = brw_interpret;
1977
1978 if (mem_tight != 0)
1979 req->rq_memalloc = 1;
1980
1981 /* Need to update the timestamps after the request is built in case
1982 * we race with setattr (locally or in queue at OST). If OST gets
1983 * later setattr before earlier BRW (as determined by the request xid),
1984 * the OST will not use BRW timestamps. Sadly, there is no obvious
1985 * way to do this in a single call. bug 10150 */
3ce08cd7
NY
1986 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1987 crattr->cra_oa = &body->oa;
cad6fafa 1988 cl_req_attr_set(env, clerq, crattr,
d7e09d03
PT
1989 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1990
cad6fafa 1991 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
d7e09d03
PT
1992
1993 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1994 aa = ptlrpc_req_async_args(req);
1995 INIT_LIST_HEAD(&aa->aa_oaps);
1996 list_splice_init(&rpc_list, &aa->aa_oaps);
1997 INIT_LIST_HEAD(&aa->aa_exts);
1998 list_splice_init(ext_list, &aa->aa_exts);
1999 aa->aa_clerq = clerq;
2000
2001 /* queued sync pages can be torn down while the pages
2002 * were between the pending list and the rpc */
2003 tmp = NULL;
2004 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2005 /* only one oap gets a request reference */
2006 if (tmp == NULL)
2007 tmp = oap;
2008 if (oap->oap_interrupted && !req->rq_intr) {
2009 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2010 oap, req);
2011 ptlrpc_mark_interrupted(req);
2012 }
2013 }
2014 if (tmp != NULL)
2015 tmp->oap_request = ptlrpc_request_addref(req);
2016
2017 client_obd_list_lock(&cli->cl_loi_list_lock);
2018 starting_offset >>= PAGE_CACHE_SHIFT;
2019 if (cmd == OBD_BRW_READ) {
2020 cli->cl_r_in_flight++;
2021 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2022 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2023 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2024 starting_offset + 1);
2025 } else {
2026 cli->cl_w_in_flight++;
2027 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2028 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2029 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2030 starting_offset + 1);
2031 }
2032 client_obd_list_unlock(&cli->cl_loi_list_lock);
2033
2034 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2035 page_count, aa, cli->cl_r_in_flight,
2036 cli->cl_w_in_flight);
2037
2038 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2039 * see which CPU/NUMA node the majority of pages were allocated
2040 * on, and try to assign the async RPC to the CPU core
2041 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2042 *
2043 * But on the other hand, we expect that multiple ptlrpcd
2044 * threads and the initial write sponsor can run in parallel,
2045 * especially when data checksum is enabled, which is CPU-bound
2046 * operation and single ptlrpcd thread cannot process in time.
2047 * So more ptlrpcd threads sharing BRW load
2048 * (with PDL_POLICY_ROUND) seems better.
2049 */
2050 ptlrpcd_add_req(req, pol, -1);
2051 rc = 0;
d7e09d03
PT
2052
2053out:
2054 if (mem_tight != 0)
2055 cfs_memory_pressure_restore(mpflag);
2056
cad6fafa
BJ
2057 if (crattr != NULL) {
2058 capa_put(crattr->cra_capa);
7795178d 2059 kfree(crattr);
cad6fafa
BJ
2060 }
2061
d7e09d03
PT
2062 if (rc != 0) {
2063 LASSERT(req == NULL);
2064
2065 if (oa)
2066 OBDO_FREE(oa);
59e267c0 2067 kfree(pga);
d7e09d03
PT
2068 /* this should happen rarely and is pretty bad, it makes the
2069 * pending list not follow the dirty order */
2070 while (!list_empty(ext_list)) {
2071 ext = list_entry(ext_list->next, struct osc_extent,
2072 oe_link);
2073 list_del_init(&ext->oe_link);
2074 osc_extent_finish(env, ext, 0, rc);
2075 }
2076 if (clerq && !IS_ERR(clerq))
2077 cl_req_completion(env, clerq, rc);
2078 }
0a3bdb00 2079 return rc;
d7e09d03
PT
2080}
2081
2082static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2083 struct ldlm_enqueue_info *einfo)
2084{
2085 void *data = einfo->ei_cbdata;
2086 int set = 0;
2087
2088 LASSERT(lock != NULL);
2089 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2090 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2091 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2092 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2093
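 /* claim the lock's l_ast_data for @data if it is still unset, then
 * report whether it now points at @data */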
2094 lock_res_and_lock(lock);
2095 spin_lock(&osc_ast_guard);
2096
2097 if (lock->l_ast_data == NULL)
2098 lock->l_ast_data = data;
2099 if (lock->l_ast_data == data)
2100 set = 1;
2101
2102 spin_unlock(&osc_ast_guard);
2103 unlock_res_and_lock(lock);
2104
2105 return set;
2106}
2107
2108static int osc_set_data_with_check(struct lustre_handle *lockh,
2109 struct ldlm_enqueue_info *einfo)
2110{
2111 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2112 int set = 0;
2113
2114 if (lock != NULL) {
2115 set = osc_set_lock_data_with_check(lock, einfo);
2116 LDLM_LOCK_PUT(lock);
2117 } else
2118 CERROR("lockh %p, data %p - client evicted?\n",
2119 lockh, einfo->ei_cbdata);
2120 return set;
2121}
2122
d7e09d03
PT
2123 /* find any ldlm lock of the inode in osc
2124 * return 0 if no matching lock is found
2125 * 1 if one is found
2126 * < 0 on error */
2127static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2128 ldlm_iterator_t replace, void *data)
2129{
2130 struct ldlm_res_id res_id;
2131 struct obd_device *obd = class_exp2obd(exp);
2132 int rc = 0;
2133
2134 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2135 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2136 if (rc == LDLM_ITER_STOP)
fbe7c6c7 2137 return 1;
d7e09d03 2138 if (rc == LDLM_ITER_CONTINUE)
fbe7c6c7
JL
2139 return 0;
2140 return rc;
d7e09d03
PT
2141}
2142
2143static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2144 obd_enqueue_update_f upcall, void *cookie,
2145 __u64 *flags, int agl, int rc)
2146{
2147 int intent = *flags & LDLM_FL_HAS_INTENT;
d7e09d03
PT
2148
2149 if (intent) {
2150 /* The request was created before ldlm_cli_enqueue call. */
2151 if (rc == ELDLM_LOCK_ABORTED) {
2152 struct ldlm_reply *rep;
2153 rep = req_capsule_server_get(&req->rq_pill,
2154 &RMF_DLM_REP);
2155
2156 LASSERT(rep != NULL);
2d58de78
LW
2157 rep->lock_policy_res1 =
2158 ptlrpc_status_ntoh(rep->lock_policy_res1);
d7e09d03
PT
2159 if (rep->lock_policy_res1)
2160 rc = rep->lock_policy_res1;
2161 }
2162 }
2163
2164 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2165 (rc == 0)) {
2166 *flags |= LDLM_FL_LVB_READY;
1d8cb70c 2167 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
d7e09d03
PT
2168 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2169 }
2170
2171 /* Call the update callback. */
2172 rc = (*upcall)(cookie, rc);
0a3bdb00 2173 return rc;
d7e09d03
PT
2174}
2175
2176static int osc_enqueue_interpret(const struct lu_env *env,
2177 struct ptlrpc_request *req,
2178 struct osc_enqueue_args *aa, int rc)
2179{
2180 struct ldlm_lock *lock;
2181 struct lustre_handle handle;
2182 __u32 mode;
2183 struct ost_lvb *lvb;
2184 __u32 lvb_len;
2185 __u64 *flags = aa->oa_flags;
2186
2187 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2188 * might be freed anytime after lock upcall has been called. */
2189 lustre_handle_copy(&handle, aa->oa_lockh);
2190 mode = aa->oa_ei->ei_mode;
2191
2192 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2193 * be valid. */
2194 lock = ldlm_handle2lock(&handle);
2195
2196 /* Take an additional reference so that a blocking AST that
2197 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2198 * to arrive after an upcall has been executed by
2199 * osc_enqueue_fini(). */
2200 ldlm_lock_addref(&handle, mode);
2201
2202 /* Let CP AST to grant the lock first. */
2203 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2204
2205 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2206 lvb = NULL;
2207 lvb_len = 0;
2208 } else {
2209 lvb = aa->oa_lvb;
2210 lvb_len = sizeof(*aa->oa_lvb);
2211 }
2212
2213 /* Complete obtaining the lock procedure. */
2214 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2215 mode, flags, lvb, lvb_len, &handle, rc);
2216 /* Complete osc stuff. */
2217 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2218 flags, aa->oa_agl, rc);
2219
2220 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2221
2222 /* Release the lock for async request. */
2223 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2224 /*
2225 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2226 * not already released by
2227 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2228 */
2229 ldlm_lock_decref(&handle, mode);
2230
2231 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2232 aa->oa_lockh, req, aa);
2233 ldlm_lock_decref(&handle, mode);
2234 LDLM_LOCK_PUT(lock);
2235 return rc;
2236}
2237
d7e09d03
PT
2238struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
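/* a sentinel, not a real set: passing PTLRPCD_SET as @rqset makes
 * osc_enqueue_base() hand the request straight to ptlrpcd */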
2239
2240/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2241 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2242 * other synchronous requests, however keeping some locks and trying to obtain
2243 * others may take a considerable amount of time in a case of ost failure; and
2244 * when other sync requests do not get released lock from a client, the client
2245 * is excluded from the cluster -- such scenarios make life difficult, so
2246 * release locks just after they are obtained. */
2247int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2248 __u64 *flags, ldlm_policy_data_t *policy,
2249 struct ost_lvb *lvb, int kms_valid,
2250 obd_enqueue_update_f upcall, void *cookie,
2251 struct ldlm_enqueue_info *einfo,
2252 struct lustre_handle *lockh,
2253 struct ptlrpc_request_set *rqset, int async, int agl)
2254{
2255 struct obd_device *obd = exp->exp_obd;
2256 struct ptlrpc_request *req = NULL;
2257 int intent = *flags & LDLM_FL_HAS_INTENT;
875332d4 2258 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
d7e09d03
PT
2259 ldlm_mode_t mode;
2260 int rc;
d7e09d03
PT
2261
2262 /* Filesystem lock extents are extended to page boundaries so that
2263 * dealing with the page cache is a little smoother. */
2264 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2265 policy->l_extent.end |= ~CFS_PAGE_MASK;
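 /* e.g. assuming 4 KiB pages (so ~CFS_PAGE_MASK == 0xfff; an
 * illustration only): start 0x1234 rounds down to 0x1000 and
 * end 0x5678 rounds up to 0x5fff */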
2266
2267 /*
2268 * kms is not valid when either object is completely fresh (so that no
2269 * locks are cached), or object was evicted. In the latter case cached
2270 * lock cannot be used, because it would prime inode state with
2271 * potentially stale LVB.
2272 */
2273 if (!kms_valid)
2274 goto no_match;
2275
2276 /* Next, search for already existing extent locks that will cover us */
2277 /* If we're trying to read, we also search for an existing PW lock. The
2278 * VFS and page cache already protect us locally, so lots of readers/
2279 * writers can share a single PW lock.
2280 *
2281 * There are problems with conversion deadlocks, so instead of
2282 * converting a read lock to a write lock, we'll just enqueue a new
2283 * one.
2284 *
2285 * At some point we should cancel the read lock instead of making them
2286 * send us a blocking callback, but there are problems with canceling
2287 * locks out from other users right now, too. */
2288 mode = einfo->ei_mode;
2289 if (einfo->ei_mode == LCK_PR)
2290 mode |= LCK_PW;
2291 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2292 einfo->ei_type, policy, mode, lockh, 0);
2293 if (mode) {
2294 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2295
2296 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2297 /* For AGL, if enqueue RPC is sent but the lock is not
2298 * granted, then skip processing this stripe.
2299 * Return -ECANCELED to tell the caller. */
2300 ldlm_lock_decref(lockh, mode);
2301 LDLM_LOCK_PUT(matched);
0a3bdb00 2302 return -ECANCELED;
d7e09d03
PT
2303 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2304 *flags |= LDLM_FL_LVB_READY;
2305 /* addref the lock only if not async requests and PW
2306 * lock is matched whereas we asked for PR. */
2307 if (!rqset && einfo->ei_mode != mode)
2308 ldlm_lock_addref(lockh, LCK_PR);
2309 if (intent) {
2310 /* I would like to be able to ASSERT here that
2311 * rss <= kms, but I can't, for reasons which
2312 * are explained in lov_enqueue() */
2313 }
2314
2315 /* We already have a lock, and it's referenced.
2316 *
2317 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2318 * AGL upcall may change it to CLS_HELD directly. */
2319 (*upcall)(cookie, ELDLM_OK);
2320
2321 if (einfo->ei_mode != mode)
2322 ldlm_lock_decref(lockh, LCK_PW);
2323 else if (rqset)
2324 /* For async requests, decref the lock. */
2325 ldlm_lock_decref(lockh, einfo->ei_mode);
2326 LDLM_LOCK_PUT(matched);
0a3bdb00 2327 return ELDLM_OK;
d7e09d03
PT
2328 } else {
2329 ldlm_lock_decref(lockh, mode);
2330 LDLM_LOCK_PUT(matched);
2331 }
2332 }
2333
2334 no_match:
2335 if (intent) {
2336 LIST_HEAD(cancels);
2337 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2338 &RQF_LDLM_ENQUEUE_LVB);
2339 if (req == NULL)
0a3bdb00 2340 return -ENOMEM;
d7e09d03
PT
2341
2342 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2343 if (rc) {
2344 ptlrpc_request_free(req);
0a3bdb00 2345 return rc;
d7e09d03
PT
2346 }
2347
2348 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
ec83e611 2349 sizeof(*lvb));
d7e09d03
PT
2350 ptlrpc_request_set_replen(req);
2351 }
2352
2353 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2354 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2355
2356 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2357 sizeof(*lvb), LVB_T_OST, lockh, async);
2358 if (rqset) {
2359 if (!rc) {
2360 struct osc_enqueue_args *aa;
2361 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2362 aa = ptlrpc_req_async_args(req);
2363 aa->oa_ei = einfo;
2364 aa->oa_exp = exp;
2365 aa->oa_flags = flags;
2366 aa->oa_upcall = upcall;
2367 aa->oa_cookie = cookie;
2368 aa->oa_lvb = lvb;
2369 aa->oa_lockh = lockh;
2370 aa->oa_agl = !!agl;
2371
2372 req->rq_interpret_reply =
2373 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2374 if (rqset == PTLRPCD_SET)
2375 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2376 else
2377 ptlrpc_set_add_req(rqset, req);
2378 } else if (intent) {
2379 ptlrpc_req_finished(req);
2380 }
0a3bdb00 2381 return rc;
d7e09d03
PT
2382 }
2383
2384 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2385 if (intent)
2386 ptlrpc_req_finished(req);
2387
0a3bdb00 2388 return rc;
d7e09d03
PT
2389}
2390
d7e09d03
PT
2391int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2392 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
875332d4 2393 __u64 *flags, void *data, struct lustre_handle *lockh,
d7e09d03
PT
2394 int unref)
2395{
2396 struct obd_device *obd = exp->exp_obd;
875332d4 2397 __u64 lflags = *flags;
d7e09d03 2398 ldlm_mode_t rc;
d7e09d03
PT
2399
2400 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
0a3bdb00 2401 return -EIO;
d7e09d03
PT
2402
2403 /* Filesystem lock extents are extended to page boundaries so that
2404 * dealing with the page cache is a little smoother */
2405 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2406 policy->l_extent.end |= ~CFS_PAGE_MASK;
2407
2408 /* Next, search for already existing extent locks that will cover us */
2409 /* If we're trying to read, we also search for an existing PW lock. The
2410 * VFS and page cache already protect us locally, so lots of readers/
2411 * writers can share a single PW lock. */
2412 rc = mode;
2413 if (mode == LCK_PR)
2414 rc |= LCK_PW;
2415 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2416 res_id, type, policy, rc, lockh, unref);
2417 if (rc) {
2418 if (data != NULL) {
2419 if (!osc_set_data_with_check(lockh, data)) {
2420 if (!(lflags & LDLM_FL_TEST_LOCK))
2421 ldlm_lock_decref(lockh, rc);
0a3bdb00 2422 return 0;
d7e09d03
PT
2423 }
2424 }
2425 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2426 ldlm_lock_addref(lockh, LCK_PR);
2427 ldlm_lock_decref(lockh, LCK_PW);
2428 }
0a3bdb00 2429 return rc;
d7e09d03 2430 }
0a3bdb00 2431 return rc;
d7e09d03
PT
2432}
2433
2434int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2435{
d7e09d03
PT
2436 if (unlikely(mode == LCK_GROUP))
2437 ldlm_lock_decref_and_cancel(lockh, mode);
2438 else
2439 ldlm_lock_decref(lockh, mode);
2440
0a3bdb00 2441 return 0;
d7e09d03
PT
2442}
2443
d7e09d03
PT
2444static int osc_statfs_interpret(const struct lu_env *env,
2445 struct ptlrpc_request *req,
2446 struct osc_async_args *aa, int rc)
2447{
2448 struct obd_statfs *msfs;
d7e09d03
PT
2449
2450 if (rc == -EBADR)
2451 /* The request has in fact never been sent
2452 * due to issues at a higher level (LOV).
2453 * Exit immediately since the caller is
2454 * aware of the problem and takes care
2455 * of the clean up */
0a3bdb00 2456 return rc;
d7e09d03
PT
2457
2458 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
26c4ea46
TJ
2459 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2460 rc = 0;
2461 goto out;
2462 }
d7e09d03
PT
2463
2464 if (rc != 0)
26c4ea46 2465 goto out;
d7e09d03
PT
2466
2467 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2468 if (msfs == NULL) {
26c4ea46
TJ
2469 rc = -EPROTO;
2470 goto out;
d7e09d03
PT
2471 }
2472
2473 *aa->aa_oi->oi_osfs = *msfs;
2474out:
2475 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
0a3bdb00 2476 return rc;
d7e09d03
PT
2477}
2478
2479static int osc_statfs_async(struct obd_export *exp,
2480 struct obd_info *oinfo, __u64 max_age,
2481 struct ptlrpc_request_set *rqset)
2482{
2483 struct obd_device *obd = class_exp2obd(exp);
2484 struct ptlrpc_request *req;
2485 struct osc_async_args *aa;
2486 int rc;
d7e09d03
PT
2487
2488 /* We could possibly pass max_age in the request (as an absolute
2489 * timestamp or a "seconds.usec ago") so the target can avoid doing
2490 * extra calls into the filesystem if that isn't necessary (e.g.
2491 * during mount that would help a bit). Having relative timestamps
2492 * is not so great if request processing is slow, while absolute
2493 * timestamps are not ideal because they need time synchronization. */
2494 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2495 if (req == NULL)
0a3bdb00 2496 return -ENOMEM;
d7e09d03
PT
2497
2498 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2499 if (rc) {
2500 ptlrpc_request_free(req);
0a3bdb00 2501 return rc;
d7e09d03
PT
2502 }
2503 ptlrpc_request_set_replen(req);
2504 req->rq_request_portal = OST_CREATE_PORTAL;
2505 ptlrpc_at_set_req_timeout(req);
2506
2507 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2508 /* statfs requests from procfs must not wait or resend, to avoid deadlock */
2509 req->rq_no_resend = 1;
2510 req->rq_no_delay = 1;
2511 }
2512
2513 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2514 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2515 aa = ptlrpc_req_async_args(req);
2516 aa->aa_oi = oinfo;
2517
2518 ptlrpc_set_add_req(rqset, req);
0a3bdb00 2519 return 0;
d7e09d03
PT
2520}
2521
2522static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2523 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2524{
2525 struct obd_device *obd = class_exp2obd(exp);
2526 struct obd_statfs *msfs;
2527 struct ptlrpc_request *req;
2528 struct obd_import *imp = NULL;
2529 int rc;
d7e09d03
PT
2530
2531 /* Since the request might also come from lprocfs, we need to
2532 * sync this with client_disconnect_export(). Bug15684 */
2533 down_read(&obd->u.cli.cl_sem);
2534 if (obd->u.cli.cl_import)
2535 imp = class_import_get(obd->u.cli.cl_import);
2536 up_read(&obd->u.cli.cl_sem);
2537 if (!imp)
0a3bdb00 2538 return -ENODEV;
d7e09d03
PT
2539
2540 /* We could possibly pass max_age in the request (as an absolute
2541 * timestamp or a "seconds.usec ago") so the target can avoid doing
2542 * extra calls into the filesystem if that isn't necessary (e.g.
2543 * during mount that would help a bit). Having relative timestamps
2544 * is not so great if request processing is slow, while absolute
2545 * timestamps are not ideal because they need time synchronization. */
2546 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2547
2548 class_import_put(imp);
2549
2550 if (req == NULL)
0a3bdb00 2551 return -ENOMEM;
d7e09d03
PT
2552
2553 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2554 if (rc) {
2555 ptlrpc_request_free(req);
0a3bdb00 2556 return rc;
d7e09d03
PT
2557 }
2558 ptlrpc_request_set_replen(req);
2559 req->rq_request_portal = OST_CREATE_PORTAL;
2560 ptlrpc_at_set_req_timeout(req);
2561
2562 if (flags & OBD_STATFS_NODELAY) {
2563 /* statfs requests from procfs must not wait or resend, to avoid deadlock */
2564 req->rq_no_resend = 1;
2565 req->rq_no_delay = 1;
2566 }
2567
2568 rc = ptlrpc_queue_wait(req);
2569 if (rc)
26c4ea46 2570 goto out;
d7e09d03
PT
2571
2572 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2573 if (msfs == NULL) {
26c4ea46
TJ
2574 rc = -EPROTO;
2575 goto out;
d7e09d03
PT
2576 }
2577
2578 *osfs = *msfs;
2579
d7e09d03
PT
2580 out:
2581 ptlrpc_req_finished(req);
2582 return rc;
2583}
2584
2585 /* Retrieve object striping information.
2586 *
2587 * @lump is a pointer to a userspace struct with lmm_stripe_count indicating
2588 * the maximum number of OST indices which will fit in the user buffer.
2589 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2590 */
2591static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2592{
2593 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2594 struct lov_user_md_v3 lum, *lumk;
2595 struct lov_user_ost_data_v1 *lmm_objects;
2596 int rc = 0, lum_size;
d7e09d03
PT
2597
2598 if (!lsm)
0a3bdb00 2599 return -ENODATA;
d7e09d03
PT
2600
2601 /* we only need the header part from user space to get lmm_magic and
2602 * lmm_stripe_count (the header part is common to v1 and v3) */
2603 lum_size = sizeof(struct lov_user_md_v1);
2604 if (copy_from_user(&lum, lump, lum_size))
0a3bdb00 2605 return -EFAULT;
d7e09d03
PT
2606
2607 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2608 (lum.lmm_magic != LOV_USER_MAGIC_V3))
0a3bdb00 2609 return -EINVAL;
d7e09d03
PT
2610
2611 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2612 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2613 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2614 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2615
2616 /* we can use lov_mds_md_size() to compute lum_size
2617 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2618 if (lum.lmm_stripe_count > 0) {
2619 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
7795178d 2620 lumk = kzalloc(lum_size, GFP_NOFS);
d7e09d03 2621 if (!lumk)
0a3bdb00 2622 return -ENOMEM;
d7e09d03
PT
2623
2624 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2625 lmm_objects =
2626 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2627 else
2628 lmm_objects = &(lumk->lmm_objects[0]);
2629 lmm_objects->l_ost_oi = lsm->lsm_oi;
2630 } else {
2631 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2632 lumk = &lum;
2633 }
2634
2635 lumk->lmm_oi = lsm->lsm_oi;
2636 lumk->lmm_stripe_count = 1;
2637
2638 if (copy_to_user(lump, lumk, lum_size))
2639 rc = -EFAULT;
2640
2641 if (lumk != &lum)
7795178d 2642 kfree(lumk);
d7e09d03 2643
0a3bdb00 2644 return rc;
d7e09d03
PT
2645}
2646
2647
2648static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2649 void *karg, void *uarg)
2650{
2651 struct obd_device *obd = exp->exp_obd;
2652 struct obd_ioctl_data *data = karg;
2653 int err = 0;
d7e09d03
PT
2654
2655 if (!try_module_get(THIS_MODULE)) {
2656 CERROR("Can't get module. Is it alive?");
2657 return -EINVAL;
2658 }
2659 switch (cmd) {
2660 case OBD_IOC_LOV_GET_CONFIG: {
2661 char *buf;
2662 struct lov_desc *desc;
2663 struct obd_uuid uuid;
2664
2665 buf = NULL;
2666 len = 0;
26c4ea46
TJ
2667 if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2668 err = -EINVAL;
2669 goto out;
2670 }
d7e09d03
PT
2671
2672 data = (struct obd_ioctl_data *)buf;
2673
2674 if (sizeof(*desc) > data->ioc_inllen1) {
2675 obd_ioctl_freedata(buf, len);
26c4ea46
TJ
2676 err = -EINVAL;
2677 goto out;
d7e09d03
PT
2678 }
2679
2680 if (data->ioc_inllen2 < sizeof(uuid)) {
2681 obd_ioctl_freedata(buf, len);
26c4ea46
TJ
2682 err = -EINVAL;
2683 goto out;
d7e09d03
PT
2684 }
2685
2686 desc = (struct lov_desc *)data->ioc_inlbuf1;
2687 desc->ld_tgt_count = 1;
2688 desc->ld_active_tgt_count = 1;
2689 desc->ld_default_stripe_count = 1;
2690 desc->ld_default_stripe_size = 0;
2691 desc->ld_default_stripe_offset = 0;
2692 desc->ld_pattern = 0;
2693 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2694
2695 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2696
2697 err = copy_to_user((void *)uarg, buf, len);
2698 if (err)
2699 err = -EFAULT;
2700 obd_ioctl_freedata(buf, len);
26c4ea46 2701 goto out;
d7e09d03
PT
2702 }
2703 case LL_IOC_LOV_SETSTRIPE:
2704 err = obd_alloc_memmd(exp, karg);
2705 if (err > 0)
2706 err = 0;
26c4ea46 2707 goto out;
d7e09d03
PT
2708 case LL_IOC_LOV_GETSTRIPE:
2709 err = osc_getstripe(karg, uarg);
26c4ea46 2710 goto out;
d7e09d03
PT
2711 case OBD_IOC_CLIENT_RECOVER:
2712 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2713 data->ioc_inlbuf1, 0);
2714 if (err > 0)
2715 err = 0;
26c4ea46 2716 goto out;
d7e09d03
PT
2717 case IOC_OSC_SET_ACTIVE:
2718 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2719 data->ioc_offset);
26c4ea46 2720 goto out;
d7e09d03
PT
2721 case OBD_IOC_POLL_QUOTACHECK:
2722 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
26c4ea46 2723 goto out;
d7e09d03
PT
2724 case OBD_IOC_PING_TARGET:
2725 err = ptlrpc_obd_ping(obd);
26c4ea46 2726 goto out;
d7e09d03
PT
2727 default:
2728 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2729 cmd, current_comm());
26c4ea46
TJ
2730 err = -ENOTTY;
2731 goto out;
d7e09d03
PT
2732 }
2733out:
2734 module_put(THIS_MODULE);
2735 return err;
2736}
2737
2738static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
21aef7d9 2739 u32 keylen, void *key, __u32 *vallen, void *val,
d7e09d03
PT
2740 struct lov_stripe_md *lsm)
2741{
d7e09d03 2742 if (!vallen || !val)
0a3bdb00 2743 return -EFAULT;
d7e09d03
PT
2744
2745 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2746 __u32 *stripe = val;
2747 *vallen = sizeof(*stripe);
2748 *stripe = 0;
0a3bdb00 2749 return 0;
d7e09d03
PT
2750 } else if (KEY_IS(KEY_LAST_ID)) {
2751 struct ptlrpc_request *req;
21aef7d9 2752 u64 *reply;
d7e09d03
PT
2753 char *tmp;
2754 int rc;
2755
2756 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2757 &RQF_OST_GET_INFO_LAST_ID);
2758 if (req == NULL)
0a3bdb00 2759 return -ENOMEM;
d7e09d03
PT
2760
2761 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2762 RCL_CLIENT, keylen);
2763 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2764 if (rc) {
2765 ptlrpc_request_free(req);
0a3bdb00 2766 return rc;
d7e09d03
PT
2767 }
2768
2769 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2770 memcpy(tmp, key, keylen);
2771
2772 req->rq_no_delay = req->rq_no_resend = 1;
2773 ptlrpc_request_set_replen(req);
2774 rc = ptlrpc_queue_wait(req);
2775 if (rc)
26c4ea46 2776 goto out;
d7e09d03
PT
2777
2778 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
26c4ea46
TJ
2779 if (reply == NULL) {
2780 rc = -EPROTO;
2781 goto out;
2782 }
d7e09d03 2783
21aef7d9 2784 *((u64 *)val) = *reply;
d7e09d03
PT
2785 out:
2786 ptlrpc_req_finished(req);
0a3bdb00 2787 return rc;
d7e09d03 2788 } else if (KEY_IS(KEY_FIEMAP)) {
9d865439
AB
2789 struct ll_fiemap_info_key *fm_key =
2790 (struct ll_fiemap_info_key *)key;
2791 struct ldlm_res_id res_id;
2792 ldlm_policy_data_t policy;
2793 struct lustre_handle lockh;
2794 ldlm_mode_t mode = 0;
2795 struct ptlrpc_request *req;
2796 struct ll_user_fiemap *reply;
2797 char *tmp;
2798 int rc;
2799
2800 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2801 goto skip_locking;
2802
2803 policy.l_extent.start = fm_key->fiemap.fm_start &
2804 CFS_PAGE_MASK;
2805
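 /* clamp the end to OBD_OBJECT_EOF when rounding
 * fm_start + fm_length up to a page boundary would
 * overflow or reach end-of-object */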
2806 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2807 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2808 policy.l_extent.end = OBD_OBJECT_EOF;
2809 else
2810 policy.l_extent.end = (fm_key->fiemap.fm_start +
2811 fm_key->fiemap.fm_length +
2812 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2813
2814 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2815 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2816 LDLM_FL_BLOCK_GRANTED |
2817 LDLM_FL_LVB_READY,
2818 &res_id, LDLM_EXTENT, &policy,
2819 LCK_PR | LCK_PW, &lockh, 0);
2820 if (mode) { /* lock is cached on client */
2821 if (mode != LCK_PR) {
2822 ldlm_lock_addref(&lockh, LCK_PR);
2823 ldlm_lock_decref(&lockh, LCK_PW);
2824 }
2825 } else { /* no cached lock, needs acquire lock on server side */
2826 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2827 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2828 }
d7e09d03 2829
9d865439 2830skip_locking:
d7e09d03
PT
2831 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2832 &RQF_OST_GET_INFO_FIEMAP);
26c4ea46
TJ
2833 if (req == NULL) {
2834 rc = -ENOMEM;
2835 goto drop_lock;
2836 }
d7e09d03
PT
2837
2838 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2839 RCL_CLIENT, keylen);
2840 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2841 RCL_CLIENT, *vallen);
2842 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2843 RCL_SERVER, *vallen);
2844
2845 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2846 if (rc) {
2847 ptlrpc_request_free(req);
26c4ea46 2848 goto drop_lock;
d7e09d03
PT
2849 }
2850
2851 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2852 memcpy(tmp, key, keylen);
2853 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2854 memcpy(tmp, val, *vallen);
2855
2856 ptlrpc_request_set_replen(req);
2857 rc = ptlrpc_queue_wait(req);
2858 if (rc)
26c4ea46 2859 goto fini_req;
d7e09d03
PT
2860
2861 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
26c4ea46
TJ
2862 if (reply == NULL) {
2863 rc = -EPROTO;
2864 goto fini_req;
2865 }
d7e09d03
PT
2866
2867 memcpy(val, reply, *vallen);
9d865439 2868fini_req:
d7e09d03 2869 ptlrpc_req_finished(req);
9d865439
AB
2870drop_lock:
2871 if (mode)
2872 ldlm_lock_decref(&lockh, LCK_PR);
0a3bdb00 2873 return rc;
d7e09d03
PT
2874 }
2875
0a3bdb00 2876 return -EINVAL;
d7e09d03
PT
2877}
2878
2879static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
21aef7d9 2880 u32 keylen, void *key, u32 vallen,
d7e09d03
PT
2881 void *val, struct ptlrpc_request_set *set)
2882{
2883 struct ptlrpc_request *req;
2884 struct obd_device *obd = exp->exp_obd;
2885 struct obd_import *imp = class_exp2cliimp(exp);
2886 char *tmp;
2887 int rc;
d7e09d03
PT
2888
2889 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2890
2891 if (KEY_IS(KEY_CHECKSUM)) {
2892 if (vallen != sizeof(int))
0a3bdb00 2893 return -EINVAL;
d7e09d03 2894 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
0a3bdb00 2895 return 0;
d7e09d03
PT
2896 }
2897
2898 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2899 sptlrpc_conf_client_adapt(obd);
0a3bdb00 2900 return 0;
d7e09d03
PT
2901 }
2902
2903 if (KEY_IS(KEY_FLUSH_CTX)) {
2904 sptlrpc_import_flush_my_ctx(imp);
0a3bdb00 2905 return 0;
d7e09d03
PT
2906 }
2907
2908 if (KEY_IS(KEY_CACHE_SET)) {
2909 struct client_obd *cli = &obd->u.cli;
2910
2911 LASSERT(cli->cl_cache == NULL); /* only once */
2912 cli->cl_cache = (struct cl_client_cache *)val;
2913 atomic_inc(&cli->cl_cache->ccc_users);
2914 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2915
2916 /* add this osc into entity list */
2917 LASSERT(list_empty(&cli->cl_lru_osc));
2918 spin_lock(&cli->cl_cache->ccc_lru_lock);
2919 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2920 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2921
0a3bdb00 2922 return 0;
d7e09d03
PT
2923 }
2924
2925 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2926 struct client_obd *cli = &obd->u.cli;
2927 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2928 int target = *(int *)val;
2929
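 /* shrink at most half of this OSC's cached LRU pages and
 * subtract what was freed from the caller's remaining target */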
2930 nr = osc_lru_shrink(cli, min(nr, target));
2931 *(int *)val -= nr;
0a3bdb00 2932 return 0;
d7e09d03
PT
2933 }
2934
2935 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
0a3bdb00 2936 return -EINVAL;
d7e09d03
PT
2937
2938 /* We pass all other commands directly to OST. Since nobody calls osc
2939 * methods directly and everybody is supposed to go through LOV, we
2940 * assume lov checked invalid values for us.
2941 * The only recognised values so far are evict_by_nid and mds_conn.
2942 * Even if something bad goes through, we'd get a -EINVAL from OST
2943 * anyway. */
2944
2945 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2946 &RQF_OST_SET_GRANT_INFO :
2947 &RQF_OBD_SET_INFO);
2948 if (req == NULL)
0a3bdb00 2949 return -ENOMEM;
d7e09d03
PT
2950
2951 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2952 RCL_CLIENT, keylen);
2953 if (!KEY_IS(KEY_GRANT_SHRINK))
2954 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2955 RCL_CLIENT, vallen);
2956 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2957 if (rc) {
2958 ptlrpc_request_free(req);
0a3bdb00 2959 return rc;
d7e09d03
PT
2960 }
2961
2962 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2963 memcpy(tmp, key, keylen);
2964 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2965 &RMF_OST_BODY :
2966 &RMF_SETINFO_VAL);
2967 memcpy(tmp, val, vallen);
2968
2969 if (KEY_IS(KEY_GRANT_SHRINK)) {
f024bad4 2970 struct osc_brw_async_args *aa;
d7e09d03
PT
2971 struct obdo *oa;
2972
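 /* the reply is handled asynchronously, so copy the obdo out
 * of the caller's buffer into memory that outlives this call */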
2973 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2974 aa = ptlrpc_req_async_args(req);
2975 OBDO_ALLOC(oa);
2976 if (!oa) {
2977 ptlrpc_req_finished(req);
0a3bdb00 2978 return -ENOMEM;
d7e09d03
PT
2979 }
2980 *oa = ((struct ost_body *)val)->oa;
2981 aa->aa_oa = oa;
2982 req->rq_interpret_reply = osc_shrink_grant_interpret;
2983 }
2984
2985 ptlrpc_request_set_replen(req);
2986 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2987 LASSERT(set != NULL);
2988 ptlrpc_set_add_req(set, req);
2989 ptlrpc_check_set(NULL, set);
2990 } else
2991 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2992
0a3bdb00 2993 return 0;
d7e09d03
PT
2994}
2995
d7e09d03
PT
2996static int osc_reconnect(const struct lu_env *env,
2997 struct obd_export *exp, struct obd_device *obd,
2998 struct obd_uuid *cluuid,
2999 struct obd_connect_data *data,
3000 void *localdata)
3001{
3002 struct client_obd *cli = &obd->u.cli;
3003
3004 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3005 long lost_grant;
3006
3007 client_obd_list_lock(&cli->cl_loi_list_lock);
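 /* ask the server to restore the grant we still account for
 * (available + dirty); if we hold none, request enough for
 * two full BRW RPCs */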
3008 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3009 2 * cli_brw_size(obd);
3010 lost_grant = cli->cl_lost_grant;
3011 cli->cl_lost_grant = 0;
3012 client_obd_list_unlock(&cli->cl_loi_list_lock);
3013
2d00bd17
JP
3014 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3015 data->ocd_connect_flags,
d7e09d03
PT
3016 data->ocd_version, data->ocd_grant, lost_grant);
3017 }
3018
0a3bdb00 3019 return 0;
d7e09d03
PT
3020}
3021
3022static int osc_disconnect(struct obd_export *exp)
3023{
3024 struct obd_device *obd = class_exp2obd(exp);
d7e09d03
PT
3025 int rc;
3026
d7e09d03
PT
3027 rc = client_disconnect_export(exp);
3028 /**
3029 * Initially we put del_shrink_grant before disconnect_export, but it
3030 * causes the following problem if setup (connect) and cleanup
3031 * (disconnect) are tangled together.
3032 * connect p1 disconnect p2
3033 * ptlrpc_connect_import
3034 * ............... class_manual_cleanup
3035 * osc_disconnect
3036 * del_shrink_grant
3037 * ptlrpc_connect_interrupt
3038 * init_grant_shrink
3039 * add this client to shrink list
3040 * cleanup_osc
3041 * Bang! pinger trigger the shrink.
3042 * So the osc should be disconnected from the shrink list, after we
3043 * are sure the import has been destroyed. BUG18662
3044 */
3045 if (obd->u.cli.cl_import == NULL)
3046 osc_del_shrink_grant(&obd->u.cli);
3047 return rc;
3048}
3049
3050static int osc_import_event(struct obd_device *obd,
3051 struct obd_import *imp,
3052 enum obd_import_event event)
3053{
3054 struct client_obd *cli;
3055 int rc = 0;
3056
d7e09d03
PT
3057 LASSERT(imp->imp_obd == obd);
3058
3059 switch (event) {
3060 case IMP_EVENT_DISCON: {
3061 cli = &obd->u.cli;
3062 client_obd_list_lock(&cli->cl_loi_list_lock);
3063 cli->cl_avail_grant = 0;
3064 cli->cl_lost_grant = 0;
3065 client_obd_list_unlock(&cli->cl_loi_list_lock);
3066 break;
3067 }
3068 case IMP_EVENT_INACTIVE: {
3069 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3070 break;
3071 }
3072 case IMP_EVENT_INVALIDATE: {
3073 struct ldlm_namespace *ns = obd->obd_namespace;
3074 struct lu_env *env;
3075 int refcheck;
3076
3077 env = cl_env_get(&refcheck);
3078 if (!IS_ERR(env)) {
3079 /* Reset grants */
3080 cli = &obd->u.cli;
3081 /* all pages go to failing rpcs due to the invalid
3082 * import */
3083 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3084
3085 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3086 cl_env_put(env, &refcheck);
3087 } else
3088 rc = PTR_ERR(env);
3089 break;
3090 }
3091 case IMP_EVENT_ACTIVE: {
3092 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3093 break;
3094 }
3095 case IMP_EVENT_OCD: {
3096 struct obd_connect_data *ocd = &imp->imp_connect_data;
3097
3098 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3099 osc_init_grant(&obd->u.cli, ocd);
3100
3101 /* See bug 7198 */
3102 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3103 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3104
3105 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3106 break;
3107 }
3108 case IMP_EVENT_DEACTIVATE: {
3109 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3110 break;
3111 }
3112 case IMP_EVENT_ACTIVATE: {
3113 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3114 break;
3115 }
3116 default:
3117 CERROR("Unknown import event %d\n", event);
3118 LBUG();
3119 }
0a3bdb00 3120 return rc;
d7e09d03
PT
3121}
3122
3123/**
3124 * Determine whether the lock can be canceled before replaying the lock
3125 * during recovery, see bug16774 for detailed information.
3126 *
3127 * \retval zero the lock can't be canceled
3128 * \retval other ok to cancel
3129 */
3130static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3131{
3132 check_res_locked(lock->l_resource);
3133
3134 /*
3135 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3136 *
3137 * XXX as a future improvement, we can also cancel unused write lock
3138 * if it doesn't have dirty data and active mmaps.
3139 */
3140 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3141 (lock->l_granted_mode == LCK_PR ||
3142 lock->l_granted_mode == LCK_CR) &&
3143 (osc_dlm_lock_pageref(lock) == 0))
0a3bdb00 3144 return 1;
d7e09d03 3145
0a3bdb00 3146 return 0;
d7e09d03
PT
3147}
3148
3149static int brw_queue_work(const struct lu_env *env, void *data)
3150{
3151 struct client_obd *cli = data;
3152
3153 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3154
3155 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
0a3bdb00 3156 return 0;
d7e09d03
PT
3157}
3158
3159int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3160{
ea7893bb 3161 struct lprocfs_static_vars lvars = { NULL };
d7e09d03
PT
3162 struct client_obd *cli = &obd->u.cli;
3163 void *handler;
3164 int rc;
d7e09d03
PT
3165
3166 rc = ptlrpcd_addref();
3167 if (rc)
0a3bdb00 3168 return rc;
d7e09d03
PT
3169
3170 rc = client_obd_setup(obd, lcfg);
3171 if (rc)
26c4ea46 3172 goto out_ptlrpcd;
d7e09d03
PT
3173
3174 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
26c4ea46
TJ
3175 if (IS_ERR(handler)) {
3176 rc = PTR_ERR(handler);
3177 goto out_client_setup;
3178 }
d7e09d03
PT
3179 cli->cl_writeback_work = handler;
3180
3181 rc = osc_quota_setup(obd);
3182 if (rc)
26c4ea46 3183 goto out_ptlrpcd_work;
d7e09d03
PT
3184
3185 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3186 lprocfs_osc_init_vars(&lvars);
3187 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3188 lproc_osc_attach_seqstat(obd);
3189 sptlrpc_lprocfs_cliobd_attach(obd);
3190 ptlrpc_lprocfs_register_obd(obd);
3191 }
3192
3193 /* We need to allocate a few requests more, because
3194 * brw_interpret tries to create new requests before freeing
3195 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3196 * reserved, but I'm afraid that might be too much wasted RAM
3197 * in fact, so 2 is just my guess and should still work. */
3198 cli->cl_import->imp_rq_pool =
3199 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3200 OST_MAXREQSIZE,
3201 ptlrpc_add_rqs_to_pool);
3202
3203 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3204 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
0a3bdb00 3205 return rc;
d7e09d03
PT
3206
3207out_ptlrpcd_work:
3208 ptlrpcd_destroy_work(handler);
3209out_client_setup:
3210 client_obd_cleanup(obd);
3211out_ptlrpcd:
3212 ptlrpcd_decref();
0a3bdb00 3213 return rc;
d7e09d03
PT
3214}
3215
3216static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3217{
d7e09d03
PT
3218 switch (stage) {
3219 case OBD_CLEANUP_EARLY: {
3220 struct obd_import *imp;
3221 imp = obd->u.cli.cl_import;
3222 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3223 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3224 ptlrpc_deactivate_import(imp);
3225 spin_lock(&imp->imp_lock);
3226 imp->imp_pingable = 0;
3227 spin_unlock(&imp->imp_lock);
3228 break;
3229 }
3230 case OBD_CLEANUP_EXPORTS: {
3231 struct client_obd *cli = &obd->u.cli;
3232 /* LU-464
3233 * for echo client, export may be on zombie list, wait for
3234 * zombie thread to cull it, because cli.cl_import will be
3235 * cleared in client_disconnect_export():
3236 * class_export_destroy() -> obd_cleanup() ->
3237 * echo_device_free() -> echo_client_cleanup() ->
3238 * obd_disconnect() -> osc_disconnect() ->
3239 * client_disconnect_export()
3240 */
3241 obd_zombie_barrier();
3242 if (cli->cl_writeback_work) {
3243 ptlrpcd_destroy_work(cli->cl_writeback_work);
3244 cli->cl_writeback_work = NULL;
3245 }
3246 obd_cleanup_client_import(obd);
3247 ptlrpc_lprocfs_unregister_obd(obd);
3248 lprocfs_obd_cleanup(obd);
d7e09d03
PT
3249 break;
3250 }
3251 }
41f8d410 3252 return 0;
d7e09d03
PT
3253}
3254
3255int osc_cleanup(struct obd_device *obd)
3256{
3257 struct client_obd *cli = &obd->u.cli;
3258 int rc;
3259
d7e09d03
PT
3260 /* lru cleanup */
3261 if (cli->cl_cache != NULL) {
3262 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3263 spin_lock(&cli->cl_cache->ccc_lru_lock);
3264 list_del_init(&cli->cl_lru_osc);
3265 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3266 cli->cl_lru_left = NULL;
3267 atomic_dec(&cli->cl_cache->ccc_users);
3268 cli->cl_cache = NULL;
3269 }
3270
3271 /* free memory of osc quota cache */
3272 osc_quota_cleanup(obd);
3273
3274 rc = client_obd_cleanup(obd);
3275
3276 ptlrpcd_decref();
0a3bdb00 3277 return rc;
d7e09d03
PT
3278}
3279
3280int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3281{
ea7893bb 3282 struct lprocfs_static_vars lvars = { NULL };
d7e09d03
PT
3283 int rc = 0;
3284
3285 lprocfs_osc_init_vars(&lvars);
3286
3287 switch (lcfg->lcfg_command) {
3288 default:
3289 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3290 lcfg, obd);
3291 if (rc > 0)
3292 rc = 0;
3293 break;
3294 }
3295
fbe7c6c7 3296 return rc;
d7e09d03
PT
3297}
3298
21aef7d9 3299static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
d7e09d03
PT
3300{
3301 return osc_process_config_base(obd, buf);
3302}
3303
3304struct obd_ops osc_obd_ops = {
3305 .o_owner = THIS_MODULE,
3306 .o_setup = osc_setup,
3307 .o_precleanup = osc_precleanup,
3308 .o_cleanup = osc_cleanup,
3309 .o_add_conn = client_import_add_conn,
3310 .o_del_conn = client_import_del_conn,
3311 .o_connect = client_connect_import,
3312 .o_reconnect = osc_reconnect,
3313 .o_disconnect = osc_disconnect,
3314 .o_statfs = osc_statfs,
3315 .o_statfs_async = osc_statfs_async,
3316 .o_packmd = osc_packmd,
3317 .o_unpackmd = osc_unpackmd,
3318 .o_create = osc_create,
3319 .o_destroy = osc_destroy,
3320 .o_getattr = osc_getattr,
3321 .o_getattr_async = osc_getattr_async,
3322 .o_setattr = osc_setattr,
3323 .o_setattr_async = osc_setattr_async,
d7e09d03 3324 .o_find_cbdata = osc_find_cbdata,
d7e09d03
PT
3325 .o_iocontrol = osc_iocontrol,
3326 .o_get_info = osc_get_info,
3327 .o_set_info_async = osc_set_info_async,
3328 .o_import_event = osc_import_event,
d7e09d03
PT
3329 .o_process_config = osc_process_config,
3330 .o_quotactl = osc_quotactl,
3331 .o_quotacheck = osc_quotacheck,
3332};
3333
3334extern struct lu_kmem_descr osc_caches[];
3335extern spinlock_t osc_ast_guard;
3336extern struct lock_class_key osc_ast_guard_class;
3337
b47ea4bb 3338static int __init osc_init(void)
d7e09d03 3339{
ea7893bb 3340 struct lprocfs_static_vars lvars = { NULL };
d7e09d03 3341 int rc;
d7e09d03
PT
3342
3343 /* print an address of _any_ initialized kernel symbol from this
3344 * module, to allow debugging with gdb that doesn't support data
3345 * symbols from modules.*/
3346 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3347
3348 rc = lu_kmem_init(osc_caches);
a55e0f44 3349 if (rc)
0a3bdb00 3350 return rc;
d7e09d03
PT
3351
3352 lprocfs_osc_init_vars(&lvars);
3353
3354 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3355 LUSTRE_OSC_NAME, &osc_device_type);
3356 if (rc) {
3357 lu_kmem_fini(osc_caches);
0a3bdb00 3358 return rc;
d7e09d03
PT
3359 }
3360
3361 spin_lock_init(&osc_ast_guard);
3362 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3363
0a3bdb00 3364 return rc;
d7e09d03
PT
3365}
3366
3367static void /*__exit*/ osc_exit(void)
3368{
3369 class_unregister_type(LUSTRE_OSC_NAME);
3370 lu_kmem_fini(osc_caches);
3371}
3372
3373MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3374MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3375MODULE_LICENSE("GPL");
6960736c 3376MODULE_VERSION(LUSTRE_VERSION_STRING);
d7e09d03 3377
6960736c
GKH
3378module_init(osc_init);
3379module_exit(osc_exit);