drivers/staging/lustre/lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
	struct obdo		 *aa_oa;
	int			  aa_requested_nob;
	int			  aa_nio_count;
	u32			  aa_page_count;
	int			  aa_resends;
	struct brw_page		**aa_ppga;
	struct client_obd	 *aa_cli;
	struct list_head	  aa_oaps;
	struct list_head	  aa_exts;
	struct obd_capa		 *aa_ocapa;
	struct cl_req		 *aa_clerq;
};

struct osc_async_args {
	struct obd_info		*aa_oi;
};

struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct obd_info		*fa_oi;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_enqueue_args {
	struct obd_export	 *oa_exp;
	__u64			 *oa_flags;
	obd_enqueue_update_f	  oa_upcall;
	void			 *oa_cookie;
	struct ost_lvb		 *oa_lvb;
	struct lustre_handle	 *oa_lockh;
	struct ldlm_enqueue_info *oa_ei;
	unsigned int		  oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		return lmm_size;

	if (*lmmp != NULL && lsm == NULL) {
		OBD_FREE(*lmmp, lmm_size);
		*lmmp = NULL;
		return 0;
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (*lmmp == NULL) {
		OBD_ALLOC(*lmmp, lmm_size);
		if (*lmmp == NULL)
			return -ENOMEM;
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	return lmm_size;
}
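
/*
 * Illustrative note: like other obd *_packmd() methods, osc_packmd()
 * doubles as a size query.  A caller that only wants the buffer size
 * passes a NULL lmmp (sizeof(**lmmp) is evaluated on the type only, so
 * no dereference happens), e.g.:
 *
 *	int size = osc_packmd(exp, NULL, lsm);	// just returns lmm_size
 *
 * while a non-NULL *lmmp with a NULL lsm frees the previously packed
 * buffer instead of allocating one.
 */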

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		return lsm_size;

	if (*lsmp != NULL && lmm == NULL) {
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		return 0;
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			return -ENOMEM;
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			OBD_FREE(*lsmp, lsm_size);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
				 struct ost_body *body, void *capa)
{
	struct obd_capa *oc = (struct obd_capa *)capa;
	struct lustre_capa *c;

	if (!capa)
		return;

	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
	LASSERT(c);
	capa_cpy(c, oc);
	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
	DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
				     const struct req_msg_field *field,
				     struct obd_capa *oc)
{
	if (oc == NULL)
		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
	else
		/* it is already calculated as sizeof struct obd_capa */
		;
}

static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}
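
/*
 * Illustrative note: the CLASSERT/ptlrpc_req_async_args() pair above is
 * the standard ptlrpc idiom for per-request callback state.  The args
 * struct lives inline in req->rq_async_args rather than in a separate
 * allocation, so the compile-time assertion guarantees it fits:
 *
 *	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
 *	aa = ptlrpc_req_async_args(req);
 *
 * The same pattern recurs in the setattr, sync, and brw paths below.
 */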

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}
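
/*
 * Illustrative note: the rqset argument above selects one of three
 * dispatch modes, a convention shared by osc_punch_base() and
 * osc_sync_base() below:
 *
 *	osc_setattr_async_base(..., NULL);
 *		fire-and-forget via ptlrpcd, no interpret callback
 *	osc_setattr_async_base(..., PTLRPCD_SET);
 *		ptlrpcd sends it, upcall runs from the interpret callback
 *	osc_setattr_async_base(..., rqset);
 *		queued on the caller's set, sent when the caller waits
 */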

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct lov_stripe_md *lsm;
	int rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		goto out;
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out_req;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out_req;
	}

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}
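
/*
 * Illustrative note: as with OST_SYNC below, the byte range for the
 * OST_PUNCH request is assumed to arrive already packed into the obdo
 * by the caller (start/end overloading the size/blocks fields), so
 * this function only copies oinfo->oi_oa onto the wire and queues the
 * RPC; it does not interpret the range itself.
 */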

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;

	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		rc = -EPROTO;
		goto out;
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

/* Find and cancel locks locally that are matched by @mode in the resource
 * found by @objid. Found locks are added into the @cancels list. Returns
 * the number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes it from the case when ELC is not supported at
	 * all, where we still want to cancel locks in advance and just
	 * cancel them locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
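
/*
 * Illustrative note: osc_can_send_destroy() is an optimistic semaphore
 * over an atomic counter.  The slot is claimed first (atomic_inc_return);
 * if that overshoots the limit it is released again, and the check on the
 * decrement catches the race where another destroy completed in between,
 * waking a waiter so no wakeup is lost.  Callers loop on it, as
 * osc_destroy() below does:
 *
 *	l_wait_event_exclusive(cli->cl_destroy_waitq,
 *			       osc_can_send_destroy(cli), &lwi);
 */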

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		return osc_real_create(exp, oa, ea, oti);
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		return osc_real_create(exp, oa, ea, oti);

	/* we should not get here anymore */
	LBUG();

	return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export,
		       void *capa)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	LIST_HEAD(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	osc_pack_capa(req, body, (struct obd_capa *)capa);
	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying an unlink orphan, the request
	 * was sent from MDT to OST and must not be blocked here, because
	 * the process might be triggered by ptlrpcd, and it is not good
	 * to block a ptlrpcd thread (b=16006) */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	return 0;
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read()s and the atomic_inc()s are not covered
		 * by a lock, thus they may safely race and trip this
		 * CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
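
/*
 * Worked example (illustrative numbers): with cl_max_pages_per_rpc = 256
 * (1 MiB RPCs on 4 KiB pages) and cl_max_rpcs_in_flight = 8,
 * max_in_flight = 1 MiB * (8 + 1) = 9 MiB, so o_undirty asks the server
 * for enough grant to keep a full pipeline of RPCs plus one being built,
 * unless cl_dirty_max is larger.
 */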

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		goto out;
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
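
/*
 * Worked example (illustrative numbers): with 8 RPCs in flight and 1 MiB
 * RPCs, the first shrink targets (8 + 1) * 1 MiB = 9 MiB; once avail_grant
 * is already at or below that, the next shrink drops the target to a
 * single RPC's worth (1 MiB), matching the two-step policy described in
 * the comment above osc_shrink_grant().
 */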

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	unsigned long time = cfs_time_current();
	unsigned long next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
			    cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
		       client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expected to hold: if we've
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as in-flight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state has
	 * already left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
	       cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}
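
/*
 * Worked example (illustrative): a 3-page read (4 KiB pages) that comes
 * back with nob_read = 6144 leaves page 0 untouched, zero-fills the
 * second half of page 1 (bytes 2048..4095), and zero-fills all of page 2,
 * giving the caller the usual semantics of reading past EOF in a sparse
 * stripe.
 */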

static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}
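
/*
 * Illustrative note: two brw_pages merge into one remote niobuf only when
 * their flags match and they are byte-contiguous in file offset, e.g.:
 *
 *	p1->off = 0,    p1->count = 4096
 *	p2->off = 4096, p2->count = 4096	-> mergeable
 *	p2->off = 8192				-> gap, new niobuf
 *
 * osc_brw_prep_request() below uses this both to compute niocount and to
 * coalesce adjacent pages into single niobuf_remote entries.
 */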

static u32 osc_checksum_bulk(int nob, u32 pg_count,
			     struct brw_page **pga, int opc,
			     cksum_type_t cksum_type)
{
	__u32 cksum;
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	int err;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~CFS_PAGE_MASK,
					    count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request.  The actual number
	 * is decided when the RPC is finally sent in ptlrpc_register_bulk().
	 * It sends "max - 1" for old client compatibility sending "0", and
	 * also so the actual maximum is a power-of-two number, not one
	 * less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	return 0;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID" object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
	       client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32 server_cksum = body->oa.o_cksum;
		char *via;
		char *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode "DFID" object "DOSTID" extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}

static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		return rc;

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				return -EINTR;
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = get_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = get_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit the
	 * request set from the old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	return 0;
}

/*
 * Ugh, we want disk allocation on the target to happen in offset order.
 * We'll follow Sedgewick's advice and stick to the dead simple shellsort --
 * it'll do fine for our small page arrays and doesn't require allocation.
 * It's an insertion sort that swaps elements that are strides apart,
 * shrinking the stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride ; i < num ; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}
1751
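The stride loop above builds the classic 1, 4, 13, 40, ... gap sequence (stride = 3*stride + 1). A self-contained userspace version of the same sort over plain offsets, keeping the exact stride logic of sort_brw_pages():

#include <stdio.h>

/* Shellsort with the 3*h+1 gap sequence, matching the stride
 * logic in sort_brw_pages() but over a plain offset array. */
static void shellsort(unsigned long *a, int n)
{
	int stride, i, j;

	if (n <= 1)
		return;
	for (stride = 1; stride < n; stride = stride * 3 + 1)
		;
	do {
		stride /= 3;
		for (i = stride; i < n; i++) {
			unsigned long tmp = a[i];

			for (j = i; j >= stride && a[j - stride] > tmp;
			     j -= stride)
				a[j] = a[j - stride];
			a[j] = tmp;
		}
	} while (stride > 1);
}

int main(void)
{
	unsigned long off[] = { 40, 13, 1, 121, 4, 0 };
	int n = sizeof(off) / sizeof(off[0]);

	shellsort(off, n);
	for (int i = 0; i < n; i++)
		printf("%lu ", off[i]);
	printf("\n");
	return 0;
}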
21aef7d9 1752static void osc_release_ppga(struct brw_page **ppga, u32 count)
d7e09d03
PT
1753{
1754 LASSERT(ppga != NULL);
1755 OBD_FREE(ppga, sizeof(*ppga) * count);
1756}
1757
d7e09d03
PT
1758static int brw_interpret(const struct lu_env *env,
1759 struct ptlrpc_request *req, void *data, int rc)
1760{
1761 struct osc_brw_async_args *aa = data;
1762 struct osc_extent *ext;
1763 struct osc_extent *tmp;
1764 struct cl_object *obj = NULL;
1765 struct client_obd *cli = aa->aa_cli;
d7e09d03
PT
1766
1767 rc = osc_brw_fini_request(req, rc);
1768 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
 1769 /* When the server returns -EINPROGRESS, the client should always
 1770 * retry, regardless of how many times the bulk was already resent. */
1771 if (osc_recoverable_error(rc)) {
1772 if (req->rq_import_generation !=
1773 req->rq_import->imp_generation) {
1774 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1775 ""DOSTID", rc = %d.\n",
1776 req->rq_import->imp_obd->obd_name,
1777 POSTID(&aa->aa_oa->o_oi), rc);
1778 } else if (rc == -EINPROGRESS ||
1779 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1780 rc = osc_brw_redo_request(req, aa, rc);
1781 } else {
b0f5aad5 1782 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
d7e09d03
PT
1783 req->rq_import->imp_obd->obd_name,
1784 POSTID(&aa->aa_oa->o_oi), rc);
1785 }
1786
1787 if (rc == 0)
0a3bdb00 1788 return 0;
d7e09d03
PT
1789 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1790 rc = -EIO;
1791 }
1792
1793 if (aa->aa_ocapa) {
1794 capa_put(aa->aa_ocapa);
1795 aa->aa_ocapa = NULL;
1796 }
1797
1798 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1799 if (obj == NULL && rc == 0) {
1800 obj = osc2cl(ext->oe_obj);
1801 cl_object_get(obj);
1802 }
1803
1804 list_del_init(&ext->oe_link);
1805 osc_extent_finish(env, ext, 1, rc);
1806 }
1807 LASSERT(list_empty(&aa->aa_exts));
1808 LASSERT(list_empty(&aa->aa_oaps));
1809
1810 if (obj != NULL) {
1811 struct obdo *oa = aa->aa_oa;
1812 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1813 unsigned long valid = 0;
1814
1815 LASSERT(rc == 0);
1816 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1817 attr->cat_blocks = oa->o_blocks;
1818 valid |= CAT_BLOCKS;
1819 }
1820 if (oa->o_valid & OBD_MD_FLMTIME) {
1821 attr->cat_mtime = oa->o_mtime;
1822 valid |= CAT_MTIME;
1823 }
1824 if (oa->o_valid & OBD_MD_FLATIME) {
1825 attr->cat_atime = oa->o_atime;
1826 valid |= CAT_ATIME;
1827 }
1828 if (oa->o_valid & OBD_MD_FLCTIME) {
1829 attr->cat_ctime = oa->o_ctime;
1830 valid |= CAT_CTIME;
1831 }
1832 if (valid != 0) {
1833 cl_object_attr_lock(obj);
1834 cl_object_attr_set(env, obj, attr, valid);
1835 cl_object_attr_unlock(obj);
1836 }
1837 cl_object_put(env, obj);
1838 }
1839 OBDO_FREE(aa->aa_oa);
1840
1841 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1842 req->rq_bulk->bd_nob_transferred);
1843 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1844 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1845
1846 client_obd_list_lock(&cli->cl_loi_list_lock);
1847 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1848 * is called so we know whether to go to sync BRWs or wait for more
1849 * RPCs to complete */
1850 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1851 cli->cl_w_in_flight--;
1852 else
1853 cli->cl_r_in_flight--;
1854 osc_wake_cache_waiters(cli);
1855 client_obd_list_unlock(&cli->cl_loi_list_lock);
1856
1857 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
0a3bdb00 1858 return rc;
d7e09d03
PT
1859}
1860
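brw_interpret above copies an attribute out of the reply obdo only when the matching OBD_MD_FL* bit says the server actually filled it in, accumulating a `valid` mask so one locked cl_object_attr_set() call applies everything at once. A hypothetical userspace sketch of that bitmask-guarded merge (the type and flag names here are illustrative, not the Lustre ones):

#include <stdio.h>

#define MD_BLOCKS 0x1
#define MD_MTIME  0x2

struct reply { unsigned valid; long blocks; long mtime; };
struct attr  { long blocks; long mtime; };

/* Copy only the fields the sender marked valid and return a
 * mask of what was updated, mirroring the one-guarded-copy-
 * per-flag pattern in brw_interpret(). */
static unsigned merge_attrs(const struct reply *r, struct attr *a)
{
	unsigned updated = 0;

	if (r->valid & MD_BLOCKS) {
		a->blocks = r->blocks;
		updated |= MD_BLOCKS;
	}
	if (r->valid & MD_MTIME) {
		a->mtime = r->mtime;
		updated |= MD_MTIME;
	}
	return updated;
}

int main(void)
{
	struct reply r = { .valid = MD_MTIME, .mtime = 12345 };
	struct attr a = { 0 };
	unsigned m = merge_attrs(&r, &a);

	printf("updated mask %#x, mtime %ld\n", m, a.mtime);
	return 0;
}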
d7e09d03
PT
1861/**
1862 * Build an RPC by the list of extent @ext_list. The caller must ensure
1863 * that the total pages in this list are NOT over max pages per RPC.
1864 * Extents in the list must be in OES_RPC state.
1865 */
1866int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1867 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1868{
cad6fafa
BJ
1869 struct ptlrpc_request *req = NULL;
1870 struct osc_extent *ext;
1871 struct brw_page **pga = NULL;
1872 struct osc_brw_async_args *aa = NULL;
1873 struct obdo *oa = NULL;
1874 struct osc_async_page *oap;
1875 struct osc_async_page *tmp;
1876 struct cl_req *clerq = NULL;
1877 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1878 CRT_READ;
1879 struct ldlm_lock *lock = NULL;
1880 struct cl_req_attr *crattr = NULL;
21aef7d9
OD
1881 u64 starting_offset = OBD_OBJECT_EOF;
1882 u64 ending_offset = 0;
cad6fafa
BJ
1883 int mpflag = 0;
1884 int mem_tight = 0;
1885 int page_count = 0;
1886 int i;
1887 int rc;
d7e09d03 1888 LIST_HEAD(rpc_list);
d7e09d03 1889
d7e09d03
PT
1890 LASSERT(!list_empty(ext_list));
1891
1892 /* add pages into rpc_list to build BRW rpc */
1893 list_for_each_entry(ext, ext_list, oe_link) {
1894 LASSERT(ext->oe_state == OES_RPC);
1895 mem_tight |= ext->oe_memalloc;
1896 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1897 ++page_count;
1898 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1899 if (starting_offset > oap->oap_obj_off)
1900 starting_offset = oap->oap_obj_off;
1901 else
1902 LASSERT(oap->oap_page_off == 0);
1903 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1904 ending_offset = oap->oap_obj_off +
1905 oap->oap_count;
1906 else
1907 LASSERT(oap->oap_page_off + oap->oap_count ==
1908 PAGE_CACHE_SIZE);
1909 }
1910 }
1911
1912 if (mem_tight)
1913 mpflag = cfs_memory_pressure_get_and_set();
1914
cad6fafa 1915 OBD_ALLOC(crattr, sizeof(*crattr));
26c4ea46
TJ
1916 if (crattr == NULL) {
1917 rc = -ENOMEM;
1918 goto out;
1919 }
cad6fafa 1920
d7e09d03 1921 OBD_ALLOC(pga, sizeof(*pga) * page_count);
26c4ea46
TJ
1922 if (pga == NULL) {
1923 rc = -ENOMEM;
1924 goto out;
1925 }
d7e09d03
PT
1926
1927 OBDO_ALLOC(oa);
26c4ea46
TJ
1928 if (oa == NULL) {
1929 rc = -ENOMEM;
1930 goto out;
1931 }
d7e09d03
PT
1932
1933 i = 0;
1934 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1935 struct cl_page *page = oap2cl_page(oap);
1936 if (clerq == NULL) {
1937 clerq = cl_req_alloc(env, page, crt,
cad6fafa 1938 1 /* only 1-object rpcs for now */);
26c4ea46
TJ
1939 if (IS_ERR(clerq)) {
1940 rc = PTR_ERR(clerq);
1941 goto out;
1942 }
d7e09d03
PT
1943 lock = oap->oap_ldlm_lock;
1944 }
1945 if (mem_tight)
1946 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1947 pga[i] = &oap->oap_brw_page;
1948 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1949 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
cad6fafa
BJ
1950 pga[i]->pg, page_index(oap->oap_page), oap,
1951 pga[i]->flag);
d7e09d03
PT
1952 i++;
1953 cl_req_page_add(env, clerq, page);
1954 }
1955
1956 /* always get the data for the obdo for the rpc */
1957 LASSERT(clerq != NULL);
cad6fafa
BJ
1958 crattr->cra_oa = oa;
1959 cl_req_attr_set(env, clerq, crattr, ~0ULL);
d7e09d03
PT
1960 if (lock) {
1961 oa->o_handle = lock->l_remote_handle;
1962 oa->o_valid |= OBD_MD_FLHANDLE;
1963 }
1964
1965 rc = cl_req_prep(env, clerq);
1966 if (rc != 0) {
1967 CERROR("cl_req_prep failed: %d\n", rc);
26c4ea46 1968 goto out;
d7e09d03
PT
1969 }
1970
1971 sort_brw_pages(pga, page_count);
1972 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
cad6fafa 1973 pga, &req, crattr->cra_capa, 1, 0);
d7e09d03
PT
1974 if (rc != 0) {
1975 CERROR("prep_req failed: %d\n", rc);
26c4ea46 1976 goto out;
d7e09d03
PT
1977 }
1978
d7e09d03
PT
1979 req->rq_interpret_reply = brw_interpret;
1980
1981 if (mem_tight != 0)
1982 req->rq_memalloc = 1;
1983
1984 /* Need to update the timestamps after the request is built in case
1985 * we race with setattr (locally or in queue at OST). If OST gets
1986 * later setattr before earlier BRW (as determined by the request xid),
1987 * the OST will not use BRW timestamps. Sadly, there is no obvious
1988 * way to do this in a single call. bug 10150 */
cad6fafa 1989 cl_req_attr_set(env, clerq, crattr,
d7e09d03
PT
1990 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1991
cad6fafa 1992 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
d7e09d03
PT
1993
1994 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1995 aa = ptlrpc_req_async_args(req);
1996 INIT_LIST_HEAD(&aa->aa_oaps);
1997 list_splice_init(&rpc_list, &aa->aa_oaps);
1998 INIT_LIST_HEAD(&aa->aa_exts);
1999 list_splice_init(ext_list, &aa->aa_exts);
2000 aa->aa_clerq = clerq;
2001
2002 /* queued sync pages can be torn down while the pages
2003 * were between the pending list and the rpc */
2004 tmp = NULL;
2005 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2006 /* only one oap gets a request reference */
2007 if (tmp == NULL)
2008 tmp = oap;
2009 if (oap->oap_interrupted && !req->rq_intr) {
2010 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2011 oap, req);
2012 ptlrpc_mark_interrupted(req);
2013 }
2014 }
2015 if (tmp != NULL)
2016 tmp->oap_request = ptlrpc_request_addref(req);
2017
2018 client_obd_list_lock(&cli->cl_loi_list_lock);
2019 starting_offset >>= PAGE_CACHE_SHIFT;
2020 if (cmd == OBD_BRW_READ) {
2021 cli->cl_r_in_flight++;
2022 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2023 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2024 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2025 starting_offset + 1);
2026 } else {
2027 cli->cl_w_in_flight++;
2028 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2029 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2030 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2031 starting_offset + 1);
2032 }
2033 client_obd_list_unlock(&cli->cl_loi_list_lock);
2034
2035 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2036 page_count, aa, cli->cl_r_in_flight,
2037 cli->cl_w_in_flight);
2038
2039 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2040 * see which CPU/NUMA node the majority of pages were allocated
2041 * on, and try to assign the async RPC to the CPU core
2042 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2043 *
2044 * But on the other hand, we expect that multiple ptlrpcd
2045 * threads and the initial write sponsor can run in parallel,
2046 * especially when data checksum is enabled, which is CPU-bound
2047 * operation and single ptlrpcd thread cannot process in time.
2048 * So more ptlrpcd threads sharing BRW load
2049 * (with PDL_POLICY_ROUND) seems better.
2050 */
2051 ptlrpcd_add_req(req, pol, -1);
2052 rc = 0;
d7e09d03
PT
2053
2054out:
2055 if (mem_tight != 0)
2056 cfs_memory_pressure_restore(mpflag);
2057
cad6fafa
BJ
2058 if (crattr != NULL) {
2059 capa_put(crattr->cra_capa);
2060 OBD_FREE(crattr, sizeof(*crattr));
2061 }
2062
d7e09d03
PT
2063 if (rc != 0) {
2064 LASSERT(req == NULL);
2065
2066 if (oa)
2067 OBDO_FREE(oa);
2068 if (pga)
2069 OBD_FREE(pga, sizeof(*pga) * page_count);
2070 /* this should happen rarely and is pretty bad, it makes the
2071 * pending list not follow the dirty order */
2072 while (!list_empty(ext_list)) {
2073 ext = list_entry(ext_list->next, struct osc_extent,
2074 oe_link);
2075 list_del_init(&ext->oe_link);
2076 osc_extent_finish(env, ext, 0, rc);
2077 }
2078 if (clerq && !IS_ERR(clerq))
2079 cl_req_completion(env, clerq, rc);
2080 }
0a3bdb00 2081 return rc;
d7e09d03
PT
2082}
2083
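While collecting pages, osc_build_rpc folds each page's object offset into running minimum/maximum bounds (starting_offset/ending_offset) to learn the byte range the whole RPC covers. A reduced sketch of that fold, using a plain array of (offset, count) pairs instead of osc_async_page lists:

#include <stdio.h>

struct page_desc { unsigned long off; unsigned count; };

/* Fold page descriptors into the [start, end) byte range the
 * RPC will cover, like the starting_offset/ending_offset
 * computation in osc_build_rpc(). */
static void rpc_range(const struct page_desc *p, int n,
		      unsigned long *start, unsigned long *end)
{
	*start = ~0UL;	/* analogous to OBD_OBJECT_EOF */
	*end = 0;
	for (int i = 0; i < n; i++) {
		if (p[i].off < *start)
			*start = p[i].off;
		if (p[i].off + p[i].count > *end)
			*end = p[i].off + p[i].count;
	}
}

int main(void)
{
	struct page_desc pages[] = {
		{ 8192, 4096 }, { 0, 4096 }, { 4096, 4096 },
	};
	unsigned long s, e;

	rpc_range(pages, 3, &s, &e);
	printf("RPC covers [%lu, %lu)\n", s, e);
	return 0;
}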
2084static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2085 struct ldlm_enqueue_info *einfo)
2086{
2087 void *data = einfo->ei_cbdata;
2088 int set = 0;
2089
2090 LASSERT(lock != NULL);
2091 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2092 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2093 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2094 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2095
2096 lock_res_and_lock(lock);
2097 spin_lock(&osc_ast_guard);
2098
2099 if (lock->l_ast_data == NULL)
2100 lock->l_ast_data = data;
2101 if (lock->l_ast_data == data)
2102 set = 1;
2103
2104 spin_unlock(&osc_ast_guard);
2105 unlock_res_and_lock(lock);
2106
2107 return set;
2108}
2109
2110static int osc_set_data_with_check(struct lustre_handle *lockh,
2111 struct ldlm_enqueue_info *einfo)
2112{
2113 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2114 int set = 0;
2115
2116 if (lock != NULL) {
2117 set = osc_set_lock_data_with_check(lock, einfo);
2118 LDLM_LOCK_PUT(lock);
2119 } else
2120 CERROR("lockh %p, data %p - client evicted?\n",
2121 lockh, einfo->ei_cbdata);
2122 return set;
2123}
2124
d7e09d03
PT
2125/* find any ldlm lock of the inode in osc
2126 * return 0 if none is found
2127 * 1 if one is found
2128 * < 0 on error */
2129static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2130 ldlm_iterator_t replace, void *data)
2131{
2132 struct ldlm_res_id res_id;
2133 struct obd_device *obd = class_exp2obd(exp);
2134 int rc = 0;
2135
2136 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2137 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2138 if (rc == LDLM_ITER_STOP)
fbe7c6c7 2139 return 1;
d7e09d03 2140 if (rc == LDLM_ITER_CONTINUE)
fbe7c6c7
JL
2141 return 0;
2142 return rc;
d7e09d03
PT
2143}
2144
2145static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2146 obd_enqueue_update_f upcall, void *cookie,
2147 __u64 *flags, int agl, int rc)
2148{
2149 int intent = *flags & LDLM_FL_HAS_INTENT;
d7e09d03
PT
2150
2151 if (intent) {
2152 /* The request was created before ldlm_cli_enqueue call. */
2153 if (rc == ELDLM_LOCK_ABORTED) {
2154 struct ldlm_reply *rep;
2155 rep = req_capsule_server_get(&req->rq_pill,
2156 &RMF_DLM_REP);
2157
2158 LASSERT(rep != NULL);
2d58de78
LW
2159 rep->lock_policy_res1 =
2160 ptlrpc_status_ntoh(rep->lock_policy_res1);
d7e09d03
PT
2161 if (rep->lock_policy_res1)
2162 rc = rep->lock_policy_res1;
2163 }
2164 }
2165
2166 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2167 (rc == 0)) {
2168 *flags |= LDLM_FL_LVB_READY;
1d8cb70c 2169 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
d7e09d03
PT
2170 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2171 }
2172
2173 /* Call the update callback. */
2174 rc = (*upcall)(cookie, rc);
0a3bdb00 2175 return rc;
d7e09d03
PT
2176}
2177
2178static int osc_enqueue_interpret(const struct lu_env *env,
2179 struct ptlrpc_request *req,
2180 struct osc_enqueue_args *aa, int rc)
2181{
2182 struct ldlm_lock *lock;
2183 struct lustre_handle handle;
2184 __u32 mode;
2185 struct ost_lvb *lvb;
2186 __u32 lvb_len;
2187 __u64 *flags = aa->oa_flags;
2188
2189 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2190 * might be freed anytime after lock upcall has been called. */
2191 lustre_handle_copy(&handle, aa->oa_lockh);
2192 mode = aa->oa_ei->ei_mode;
2193
2194 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2195 * be valid. */
2196 lock = ldlm_handle2lock(&handle);
2197
2198 /* Take an additional reference so that a blocking AST that
 2199 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2200 * to arrive after an upcall has been executed by
2201 * osc_enqueue_fini(). */
2202 ldlm_lock_addref(&handle, mode);
2203
 2204 /* Let the CP AST grant the lock first. */
2205 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2206
2207 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2208 lvb = NULL;
2209 lvb_len = 0;
2210 } else {
2211 lvb = aa->oa_lvb;
2212 lvb_len = sizeof(*aa->oa_lvb);
2213 }
2214
2215 /* Complete obtaining the lock procedure. */
2216 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2217 mode, flags, lvb, lvb_len, &handle, rc);
2218 /* Complete osc stuff. */
2219 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2220 flags, aa->oa_agl, rc);
2221
2222 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2223
2224 /* Release the lock for async request. */
2225 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2226 /*
2227 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2228 * not already released by
2229 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2230 */
2231 ldlm_lock_decref(&handle, mode);
2232
2233 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2234 aa->oa_lockh, req, aa);
2235 ldlm_lock_decref(&handle, mode);
2236 LDLM_LOCK_PUT(lock);
2237 return rc;
2238}
2239
d7e09d03
PT
2240struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2241
2242/* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2243 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2244 * other synchronous requests; however, keeping some locks while trying to
2245 * obtain others may take a considerable amount of time in case of OST failure,
2246 * and when a client does not release a lock that other sync requests need, the
2247 * client is excluded from the cluster -- such scenarios make life difficult,
2248 * so release locks just after they are obtained. */
2249int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2250 __u64 *flags, ldlm_policy_data_t *policy,
2251 struct ost_lvb *lvb, int kms_valid,
2252 obd_enqueue_update_f upcall, void *cookie,
2253 struct ldlm_enqueue_info *einfo,
2254 struct lustre_handle *lockh,
2255 struct ptlrpc_request_set *rqset, int async, int agl)
2256{
2257 struct obd_device *obd = exp->exp_obd;
2258 struct ptlrpc_request *req = NULL;
2259 int intent = *flags & LDLM_FL_HAS_INTENT;
875332d4 2260 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
d7e09d03
PT
2261 ldlm_mode_t mode;
2262 int rc;
d7e09d03
PT
2263
2264 /* Filesystem lock extents are extended to page boundaries so that
2265 * dealing with the page cache is a little smoother. */
2266 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2267 policy->l_extent.end |= ~CFS_PAGE_MASK;
2268
2269 /*
2270 * kms is not valid when either object is completely fresh (so that no
2271 * locks are cached), or object was evicted. In the latter case cached
2272 * lock cannot be used, because it would prime inode state with
2273 * potentially stale LVB.
2274 */
2275 if (!kms_valid)
2276 goto no_match;
2277
2278 /* Next, search for already existing extent locks that will cover us */
2279 /* If we're trying to read, we also search for an existing PW lock. The
2280 * VFS and page cache already protect us locally, so lots of readers/
2281 * writers can share a single PW lock.
2282 *
2283 * There are problems with conversion deadlocks, so instead of
2284 * converting a read lock to a write lock, we'll just enqueue a new
2285 * one.
2286 *
2287 * At some point we should cancel the read lock instead of making them
2288 * send us a blocking callback, but there are problems with canceling
2289 * locks out from other users right now, too. */
2290 mode = einfo->ei_mode;
2291 if (einfo->ei_mode == LCK_PR)
2292 mode |= LCK_PW;
2293 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2294 einfo->ei_type, policy, mode, lockh, 0);
2295 if (mode) {
2296 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2297
2298 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
 2299 /* For AGL, if the enqueue RPC was sent but the lock was
 2300 * not granted, then skip processing this stripe.
 2301 * Return -ECANCELED to tell the caller. */
2302 ldlm_lock_decref(lockh, mode);
2303 LDLM_LOCK_PUT(matched);
0a3bdb00 2304 return -ECANCELED;
d7e09d03
PT
2305 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2306 *flags |= LDLM_FL_LVB_READY;
2307 /* addref the lock only if not async requests and PW
2308 * lock is matched whereas we asked for PR. */
2309 if (!rqset && einfo->ei_mode != mode)
2310 ldlm_lock_addref(lockh, LCK_PR);
2311 if (intent) {
2312 /* I would like to be able to ASSERT here that
2313 * rss <= kms, but I can't, for reasons which
2314 * are explained in lov_enqueue() */
2315 }
2316
2317 /* We already have a lock, and it's referenced.
2318 *
2319 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2320 * AGL upcall may change it to CLS_HELD directly. */
2321 (*upcall)(cookie, ELDLM_OK);
2322
2323 if (einfo->ei_mode != mode)
2324 ldlm_lock_decref(lockh, LCK_PW);
2325 else if (rqset)
2326 /* For async requests, decref the lock. */
2327 ldlm_lock_decref(lockh, einfo->ei_mode);
2328 LDLM_LOCK_PUT(matched);
0a3bdb00 2329 return ELDLM_OK;
d7e09d03
PT
2330 } else {
2331 ldlm_lock_decref(lockh, mode);
2332 LDLM_LOCK_PUT(matched);
2333 }
2334 }
2335
2336 no_match:
2337 if (intent) {
2338 LIST_HEAD(cancels);
2339 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2340 &RQF_LDLM_ENQUEUE_LVB);
2341 if (req == NULL)
0a3bdb00 2342 return -ENOMEM;
d7e09d03
PT
2343
2344 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2345 if (rc) {
2346 ptlrpc_request_free(req);
0a3bdb00 2347 return rc;
d7e09d03
PT
2348 }
2349
2350 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
ec83e611 2351 sizeof(*lvb));
d7e09d03
PT
2352 ptlrpc_request_set_replen(req);
2353 }
2354
2355 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2356 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2357
2358 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2359 sizeof(*lvb), LVB_T_OST, lockh, async);
2360 if (rqset) {
2361 if (!rc) {
2362 struct osc_enqueue_args *aa;
 2363 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2364 aa = ptlrpc_req_async_args(req);
2365 aa->oa_ei = einfo;
2366 aa->oa_exp = exp;
2367 aa->oa_flags = flags;
2368 aa->oa_upcall = upcall;
2369 aa->oa_cookie = cookie;
2370 aa->oa_lvb = lvb;
2371 aa->oa_lockh = lockh;
2372 aa->oa_agl = !!agl;
2373
2374 req->rq_interpret_reply =
2375 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2376 if (rqset == PTLRPCD_SET)
2377 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2378 else
2379 ptlrpc_set_add_req(rqset, req);
2380 } else if (intent) {
2381 ptlrpc_req_finished(req);
2382 }
0a3bdb00 2383 return rc;
d7e09d03
PT
2384 }
2385
2386 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2387 if (intent)
2388 ptlrpc_req_finished(req);
2389
0a3bdb00 2390 return rc;
d7e09d03
PT
2391}
2392
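Both osc_enqueue_base above and osc_match_base below widen a PR read request so an existing PW lock also matches: ldlm lock modes are bit flags, so ORing LCK_PW into the wanted mode lets one PW lock serve many readers and writers. A small sketch of the widening, with hypothetical mode bit values standing in for the real LCK_PR/LCK_PW definitions:

#include <stdio.h>

#define LCK_PW 0x2	/* protected write (illustrative values) */
#define LCK_PR 0x4	/* protected read */

/* Widen a PR request so an existing PW lock also matches,
 * as osc_enqueue_base()/osc_match_base() do before calling
 * ldlm_lock_match(). */
static unsigned match_modes(unsigned requested)
{
	unsigned m = requested;

	if (requested == LCK_PR)
		m |= LCK_PW;
	return m;
}

static int lock_matches(unsigned granted, unsigned wanted_mask)
{
	return (granted & wanted_mask) != 0;
}

int main(void)
{
	unsigned mask = match_modes(LCK_PR);

	printf("PW lock satisfies PR request: %d\n",
	       lock_matches(LCK_PW, mask));
	return 0;
}

Note the asymmetry this creates later in the real code: when a PW lock matched a PR request, the decref must name the mode that was actually matched, which is why both functions carefully pair LCK_PR addrefs with LCK_PW decrefs.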
d7e09d03
PT
2393int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2394 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
875332d4 2395 __u64 *flags, void *data, struct lustre_handle *lockh,
d7e09d03
PT
2396 int unref)
2397{
2398 struct obd_device *obd = exp->exp_obd;
875332d4 2399 __u64 lflags = *flags;
d7e09d03 2400 ldlm_mode_t rc;
d7e09d03
PT
2401
2402 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
0a3bdb00 2403 return -EIO;
d7e09d03
PT
2404
2405 /* Filesystem lock extents are extended to page boundaries so that
2406 * dealing with the page cache is a little smoother */
2407 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2408 policy->l_extent.end |= ~CFS_PAGE_MASK;
2409
2410 /* Next, search for already existing extent locks that will cover us */
2411 /* If we're trying to read, we also search for an existing PW lock. The
2412 * VFS and page cache already protect us locally, so lots of readers/
2413 * writers can share a single PW lock. */
2414 rc = mode;
2415 if (mode == LCK_PR)
2416 rc |= LCK_PW;
2417 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2418 res_id, type, policy, rc, lockh, unref);
2419 if (rc) {
2420 if (data != NULL) {
2421 if (!osc_set_data_with_check(lockh, data)) {
2422 if (!(lflags & LDLM_FL_TEST_LOCK))
2423 ldlm_lock_decref(lockh, rc);
0a3bdb00 2424 return 0;
d7e09d03
PT
2425 }
2426 }
2427 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2428 ldlm_lock_addref(lockh, LCK_PR);
2429 ldlm_lock_decref(lockh, LCK_PW);
2430 }
0a3bdb00 2431 return rc;
d7e09d03 2432 }
0a3bdb00 2433 return rc;
d7e09d03
PT
2434}
2435
2436int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2437{
d7e09d03
PT
2438 if (unlikely(mode == LCK_GROUP))
2439 ldlm_lock_decref_and_cancel(lockh, mode);
2440 else
2441 ldlm_lock_decref(lockh, mode);
2442
0a3bdb00 2443 return 0;
d7e09d03
PT
2444}
2445
d7e09d03
PT
2446static int osc_statfs_interpret(const struct lu_env *env,
2447 struct ptlrpc_request *req,
2448 struct osc_async_args *aa, int rc)
2449{
2450 struct obd_statfs *msfs;
d7e09d03
PT
2451
2452 if (rc == -EBADR)
2453 /* The request has in fact never been sent
2454 * due to issues at a higher level (LOV).
2455 * Exit immediately since the caller is
2456 * aware of the problem and takes care
2457 * of the clean up */
0a3bdb00 2458 return rc;
d7e09d03
PT
2459
2460 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
26c4ea46
TJ
2461 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2462 rc = 0;
2463 goto out;
2464 }
d7e09d03
PT
2465
2466 if (rc != 0)
26c4ea46 2467 goto out;
d7e09d03
PT
2468
2469 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2470 if (msfs == NULL) {
26c4ea46
TJ
2471 rc = -EPROTO;
2472 goto out;
d7e09d03
PT
2473 }
2474
2475 *aa->aa_oi->oi_osfs = *msfs;
2476out:
2477 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
0a3bdb00 2478 return rc;
d7e09d03
PT
2479}
2480
2481static int osc_statfs_async(struct obd_export *exp,
2482 struct obd_info *oinfo, __u64 max_age,
2483 struct ptlrpc_request_set *rqset)
2484{
2485 struct obd_device *obd = class_exp2obd(exp);
2486 struct ptlrpc_request *req;
2487 struct osc_async_args *aa;
2488 int rc;
d7e09d03
PT
2489
2490 /* We could possibly pass max_age in the request (as an absolute
2491 * timestamp or a "seconds.usec ago") so the target can avoid doing
2492 * extra calls into the filesystem if that isn't necessary (e.g.
2493 * during mount that would help a bit). Having relative timestamps
2494 * is not so great if request processing is slow, while absolute
2495 * timestamps are not ideal because they need time synchronization. */
2496 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2497 if (req == NULL)
0a3bdb00 2498 return -ENOMEM;
d7e09d03
PT
2499
2500 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2501 if (rc) {
2502 ptlrpc_request_free(req);
0a3bdb00 2503 return rc;
d7e09d03
PT
2504 }
2505 ptlrpc_request_set_replen(req);
2506 req->rq_request_portal = OST_CREATE_PORTAL;
2507 ptlrpc_at_set_req_timeout(req);
2508
2509 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
 2510 /* procfs requests should not wait for the stat, to avoid deadlock */
2511 req->rq_no_resend = 1;
2512 req->rq_no_delay = 1;
2513 }
2514
2515 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
 2516 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2517 aa = ptlrpc_req_async_args(req);
2518 aa->aa_oi = oinfo;
2519
2520 ptlrpc_set_add_req(rqset, req);
0a3bdb00 2521 return 0;
d7e09d03
PT
2522}
2523
2524static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2525 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2526{
2527 struct obd_device *obd = class_exp2obd(exp);
2528 struct obd_statfs *msfs;
2529 struct ptlrpc_request *req;
2530 struct obd_import *imp = NULL;
2531 int rc;
d7e09d03
PT
2532
 2533 /* Since the request might also come from lprocfs, we need to
 2534 * sync this with client_disconnect_export(). Bug15684 */
2535 down_read(&obd->u.cli.cl_sem);
2536 if (obd->u.cli.cl_import)
2537 imp = class_import_get(obd->u.cli.cl_import);
2538 up_read(&obd->u.cli.cl_sem);
2539 if (!imp)
0a3bdb00 2540 return -ENODEV;
d7e09d03
PT
2541
2542 /* We could possibly pass max_age in the request (as an absolute
2543 * timestamp or a "seconds.usec ago") so the target can avoid doing
2544 * extra calls into the filesystem if that isn't necessary (e.g.
2545 * during mount that would help a bit). Having relative timestamps
2546 * is not so great if request processing is slow, while absolute
2547 * timestamps are not ideal because they need time synchronization. */
2548 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2549
2550 class_import_put(imp);
2551
2552 if (req == NULL)
0a3bdb00 2553 return -ENOMEM;
d7e09d03
PT
2554
2555 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2556 if (rc) {
2557 ptlrpc_request_free(req);
0a3bdb00 2558 return rc;
d7e09d03
PT
2559 }
2560 ptlrpc_request_set_replen(req);
2561 req->rq_request_portal = OST_CREATE_PORTAL;
2562 ptlrpc_at_set_req_timeout(req);
2563
2564 if (flags & OBD_STATFS_NODELAY) {
 2565 /* procfs requests should not wait for the stat, to avoid deadlock */
2566 req->rq_no_resend = 1;
2567 req->rq_no_delay = 1;
2568 }
2569
2570 rc = ptlrpc_queue_wait(req);
2571 if (rc)
26c4ea46 2572 goto out;
d7e09d03
PT
2573
2574 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2575 if (msfs == NULL) {
26c4ea46
TJ
2576 rc = -EPROTO;
2577 goto out;
d7e09d03
PT
2578 }
2579
2580 *osfs = *msfs;
2581
d7e09d03
PT
2582 out:
2583 ptlrpc_req_finished(req);
2584 return rc;
2585}
2586
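osc_statfs takes cl_sem and grabs a reference on the import before dropping the lock, so a concurrent disconnect cannot free the import while the RPC is in flight: the lock protects the pointer, the refcount keeps the object alive afterwards. A hypothetical pthread sketch of the same take-a-ref-under-the-lock pattern (all names here are illustrative, not the Lustre API):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct import { int refs; };

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct import *current_import;

/* Look up the shared pointer under the lock and pin it with a
 * reference before releasing the lock, as osc_statfs() does
 * with cl_sem + class_import_get(). */
static struct import *import_get(void)
{
	struct import *imp;

	pthread_mutex_lock(&lock);
	imp = current_import;
	if (imp)
		imp->refs++;	/* pin while we still hold the lock */
	pthread_mutex_unlock(&lock);
	return imp;
}

static void import_put(struct import *imp)
{
	pthread_mutex_lock(&lock);
	imp->refs--;	/* the last put would free it; elided here */
	pthread_mutex_unlock(&lock);
}

int main(void)
{
	current_import = calloc(1, sizeof(*current_import));
	current_import->refs = 1;

	struct import *imp = import_get();
	if (imp) {
		printf("import pinned, refs=%d\n", imp->refs);
		import_put(imp);
	}
	free(current_import);
	return 0;
}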
2587/* Retrieve object striping information.
2588 *
2589 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2590 * the maximum number of OST indices which will fit in the user buffer.
2591 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2592 */
2593static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2594{
2595 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2596 struct lov_user_md_v3 lum, *lumk;
2597 struct lov_user_ost_data_v1 *lmm_objects;
2598 int rc = 0, lum_size;
d7e09d03
PT
2599
2600 if (!lsm)
0a3bdb00 2601 return -ENODATA;
d7e09d03
PT
2602
2603 /* we only need the header part from user space to get lmm_magic and
2604 * lmm_stripe_count, (the header part is common to v1 and v3) */
2605 lum_size = sizeof(struct lov_user_md_v1);
2606 if (copy_from_user(&lum, lump, lum_size))
0a3bdb00 2607 return -EFAULT;
d7e09d03
PT
2608
2609 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2610 (lum.lmm_magic != LOV_USER_MAGIC_V3))
0a3bdb00 2611 return -EINVAL;
d7e09d03
PT
2612
2613 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2614 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2615 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2616 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2617
2618 /* we can use lov_mds_md_size() to compute lum_size
2619 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2620 if (lum.lmm_stripe_count > 0) {
2621 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2622 OBD_ALLOC(lumk, lum_size);
2623 if (!lumk)
0a3bdb00 2624 return -ENOMEM;
d7e09d03
PT
2625
2626 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2627 lmm_objects =
2628 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2629 else
2630 lmm_objects = &(lumk->lmm_objects[0]);
2631 lmm_objects->l_ost_oi = lsm->lsm_oi;
2632 } else {
2633 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2634 lumk = &lum;
2635 }
2636
2637 lumk->lmm_oi = lsm->lsm_oi;
2638 lumk->lmm_stripe_count = 1;
2639
2640 if (copy_to_user(lump, lumk, lum_size))
2641 rc = -EFAULT;
2642
2643 if (lumk != &lum)
2644 OBD_FREE(lumk, lum_size);
2645
0a3bdb00 2646 return rc;
d7e09d03
PT
2647}
2648
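osc_getstripe above uses a common ioctl idiom: copy only the fixed-size header from user space first, validate lmm_magic and read lmm_stripe_count, then size and copy the full variable-length record. A hypothetical userspace model of that two-phase read, with memcpy standing in for copy_from_user and an invented magic value:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

struct hdr { unsigned magic; unsigned count; };
struct rec { struct hdr h; unsigned long objs[]; };

/* Two-phase copy: read the fixed header, validate it, then size
 * and read the full record, as osc_getstripe() does with its two
 * copy_from_user()-derived buffer sizes. */
static struct rec *read_record(const void *user_buf)
{
	struct hdr h;
	struct rec *r;
	size_t size;

	memcpy(&h, user_buf, sizeof(h));	/* phase 1: header only */
	if (h.magic != 0x0BD1)			/* illustrative magic */
		return NULL;			/* reject before allocating */

	size = sizeof(*r) + h.count * sizeof(r->objs[0]);
	r = malloc(size);
	if (r)
		memcpy(r, user_buf, size);	/* phase 2: full body */
	return r;
}

int main(void)
{
	struct { struct hdr h; unsigned long objs[2]; } buf = {
		{ 0x0BD1, 2 }, { 7, 9 }
	};
	struct rec *r = read_record(&buf);

	if (r)
		printf("magic ok, %u objects\n", r->h.count);
	free(r);
	return 0;
}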
2649
2650static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2651 void *karg, void *uarg)
2652{
2653 struct obd_device *obd = exp->exp_obd;
2654 struct obd_ioctl_data *data = karg;
2655 int err = 0;
d7e09d03
PT
2656
2657 if (!try_module_get(THIS_MODULE)) {
2658 CERROR("Can't get module. Is it alive?");
2659 return -EINVAL;
2660 }
2661 switch (cmd) {
2662 case OBD_IOC_LOV_GET_CONFIG: {
2663 char *buf;
2664 struct lov_desc *desc;
2665 struct obd_uuid uuid;
2666
2667 buf = NULL;
2668 len = 0;
26c4ea46
TJ
2669 if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2670 err = -EINVAL;
2671 goto out;
2672 }
d7e09d03
PT
2673
2674 data = (struct obd_ioctl_data *)buf;
2675
2676 if (sizeof(*desc) > data->ioc_inllen1) {
2677 obd_ioctl_freedata(buf, len);
26c4ea46
TJ
2678 err = -EINVAL;
2679 goto out;
d7e09d03
PT
2680 }
2681
2682 if (data->ioc_inllen2 < sizeof(uuid)) {
2683 obd_ioctl_freedata(buf, len);
26c4ea46
TJ
2684 err = -EINVAL;
2685 goto out;
d7e09d03
PT
2686 }
2687
2688 desc = (struct lov_desc *)data->ioc_inlbuf1;
2689 desc->ld_tgt_count = 1;
2690 desc->ld_active_tgt_count = 1;
2691 desc->ld_default_stripe_count = 1;
2692 desc->ld_default_stripe_size = 0;
2693 desc->ld_default_stripe_offset = 0;
2694 desc->ld_pattern = 0;
2695 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2696
2697 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2698
2699 err = copy_to_user((void *)uarg, buf, len);
2700 if (err)
2701 err = -EFAULT;
2702 obd_ioctl_freedata(buf, len);
26c4ea46 2703 goto out;
d7e09d03
PT
2704 }
2705 case LL_IOC_LOV_SETSTRIPE:
2706 err = obd_alloc_memmd(exp, karg);
2707 if (err > 0)
2708 err = 0;
26c4ea46 2709 goto out;
d7e09d03
PT
2710 case LL_IOC_LOV_GETSTRIPE:
2711 err = osc_getstripe(karg, uarg);
26c4ea46 2712 goto out;
d7e09d03
PT
2713 case OBD_IOC_CLIENT_RECOVER:
2714 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2715 data->ioc_inlbuf1, 0);
2716 if (err > 0)
2717 err = 0;
26c4ea46 2718 goto out;
d7e09d03
PT
2719 case IOC_OSC_SET_ACTIVE:
2720 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2721 data->ioc_offset);
26c4ea46 2722 goto out;
d7e09d03
PT
2723 case OBD_IOC_POLL_QUOTACHECK:
2724 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
26c4ea46 2725 goto out;
d7e09d03
PT
2726 case OBD_IOC_PING_TARGET:
2727 err = ptlrpc_obd_ping(obd);
26c4ea46 2728 goto out;
d7e09d03
PT
2729 default:
2730 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2731 cmd, current_comm());
26c4ea46
TJ
2732 err = -ENOTTY;
2733 goto out;
d7e09d03
PT
2734 }
2735out:
2736 module_put(THIS_MODULE);
2737 return err;
2738}
2739
2740static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
21aef7d9 2741 u32 keylen, void *key, __u32 *vallen, void *val,
d7e09d03
PT
2742 struct lov_stripe_md *lsm)
2743{
d7e09d03 2744 if (!vallen || !val)
0a3bdb00 2745 return -EFAULT;
d7e09d03
PT
2746
2747 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2748 __u32 *stripe = val;
2749 *vallen = sizeof(*stripe);
2750 *stripe = 0;
0a3bdb00 2751 return 0;
d7e09d03
PT
2752 } else if (KEY_IS(KEY_LAST_ID)) {
2753 struct ptlrpc_request *req;
21aef7d9 2754 u64 *reply;
d7e09d03
PT
2755 char *tmp;
2756 int rc;
2757
2758 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2759 &RQF_OST_GET_INFO_LAST_ID);
2760 if (req == NULL)
0a3bdb00 2761 return -ENOMEM;
d7e09d03
PT
2762
2763 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2764 RCL_CLIENT, keylen);
2765 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2766 if (rc) {
2767 ptlrpc_request_free(req);
0a3bdb00 2768 return rc;
d7e09d03
PT
2769 }
2770
2771 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2772 memcpy(tmp, key, keylen);
2773
2774 req->rq_no_delay = req->rq_no_resend = 1;
2775 ptlrpc_request_set_replen(req);
2776 rc = ptlrpc_queue_wait(req);
2777 if (rc)
26c4ea46 2778 goto out;
d7e09d03
PT
2779
2780 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
26c4ea46
TJ
2781 if (reply == NULL) {
2782 rc = -EPROTO;
2783 goto out;
2784 }
d7e09d03 2785
21aef7d9 2786 *((u64 *)val) = *reply;
d7e09d03
PT
2787 out:
2788 ptlrpc_req_finished(req);
0a3bdb00 2789 return rc;
d7e09d03 2790 } else if (KEY_IS(KEY_FIEMAP)) {
9d865439
AB
2791 struct ll_fiemap_info_key *fm_key =
2792 (struct ll_fiemap_info_key *)key;
2793 struct ldlm_res_id res_id;
2794 ldlm_policy_data_t policy;
2795 struct lustre_handle lockh;
2796 ldlm_mode_t mode = 0;
2797 struct ptlrpc_request *req;
2798 struct ll_user_fiemap *reply;
2799 char *tmp;
2800 int rc;
2801
2802 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2803 goto skip_locking;
2804
2805 policy.l_extent.start = fm_key->fiemap.fm_start &
2806 CFS_PAGE_MASK;
2807
2808 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2809 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2810 policy.l_extent.end = OBD_OBJECT_EOF;
2811 else
2812 policy.l_extent.end = (fm_key->fiemap.fm_start +
2813 fm_key->fiemap.fm_length +
2814 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2815
2816 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2817 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2818 LDLM_FL_BLOCK_GRANTED |
2819 LDLM_FL_LVB_READY,
2820 &res_id, LDLM_EXTENT, &policy,
2821 LCK_PR | LCK_PW, &lockh, 0);
2822 if (mode) { /* lock is cached on client */
2823 if (mode != LCK_PR) {
2824 ldlm_lock_addref(&lockh, LCK_PR);
2825 ldlm_lock_decref(&lockh, LCK_PW);
2826 }
2827 } else { /* no cached lock, needs acquire lock on server side */
2828 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2829 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2830 }
d7e09d03 2831
9d865439 2832skip_locking:
d7e09d03
PT
2833 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2834 &RQF_OST_GET_INFO_FIEMAP);
26c4ea46
TJ
2835 if (req == NULL) {
2836 rc = -ENOMEM;
2837 goto drop_lock;
2838 }
d7e09d03
PT
2839
2840 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2841 RCL_CLIENT, keylen);
2842 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2843 RCL_CLIENT, *vallen);
2844 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2845 RCL_SERVER, *vallen);
2846
2847 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2848 if (rc) {
2849 ptlrpc_request_free(req);
26c4ea46 2850 goto drop_lock;
d7e09d03
PT
2851 }
2852
2853 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2854 memcpy(tmp, key, keylen);
2855 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2856 memcpy(tmp, val, *vallen);
2857
2858 ptlrpc_request_set_replen(req);
2859 rc = ptlrpc_queue_wait(req);
2860 if (rc)
26c4ea46 2861 goto fini_req;
d7e09d03
PT
2862
2863 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
26c4ea46
TJ
2864 if (reply == NULL) {
2865 rc = -EPROTO;
2866 goto fini_req;
2867 }
d7e09d03
PT
2868
2869 memcpy(val, reply, *vallen);
9d865439 2870fini_req:
d7e09d03 2871 ptlrpc_req_finished(req);
9d865439
AB
2872drop_lock:
2873 if (mode)
2874 ldlm_lock_decref(&lockh, LCK_PR);
0a3bdb00 2875 return rc;
d7e09d03
PT
2876 }
2877
0a3bdb00 2878 return -EINVAL;
d7e09d03
PT
2879}
2880
2881static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
21aef7d9 2882 u32 keylen, void *key, u32 vallen,
d7e09d03
PT
2883 void *val, struct ptlrpc_request_set *set)
2884{
2885 struct ptlrpc_request *req;
2886 struct obd_device *obd = exp->exp_obd;
2887 struct obd_import *imp = class_exp2cliimp(exp);
2888 char *tmp;
2889 int rc;
d7e09d03
PT
2890
2891 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2892
2893 if (KEY_IS(KEY_CHECKSUM)) {
2894 if (vallen != sizeof(int))
0a3bdb00 2895 return -EINVAL;
d7e09d03 2896 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
0a3bdb00 2897 return 0;
d7e09d03
PT
2898 }
2899
2900 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2901 sptlrpc_conf_client_adapt(obd);
0a3bdb00 2902 return 0;
d7e09d03
PT
2903 }
2904
2905 if (KEY_IS(KEY_FLUSH_CTX)) {
2906 sptlrpc_import_flush_my_ctx(imp);
0a3bdb00 2907 return 0;
d7e09d03
PT
2908 }
2909
2910 if (KEY_IS(KEY_CACHE_SET)) {
2911 struct client_obd *cli = &obd->u.cli;
2912
2913 LASSERT(cli->cl_cache == NULL); /* only once */
2914 cli->cl_cache = (struct cl_client_cache *)val;
2915 atomic_inc(&cli->cl_cache->ccc_users);
2916 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2917
2918 /* add this osc into entity list */
2919 LASSERT(list_empty(&cli->cl_lru_osc));
2920 spin_lock(&cli->cl_cache->ccc_lru_lock);
2921 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2922 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2923
0a3bdb00 2924 return 0;
d7e09d03
PT
2925 }
2926
2927 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2928 struct client_obd *cli = &obd->u.cli;
2929 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2930 int target = *(int *)val;
2931
2932 nr = osc_lru_shrink(cli, min(nr, target));
2933 *(int *)val -= nr;
0a3bdb00 2934 return 0;
d7e09d03
PT
2935 }
2936
2937 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
0a3bdb00 2938 return -EINVAL;
d7e09d03
PT
2939
 2940 /* We pass all other commands directly to OST. Since nobody calls osc
 2941 * methods directly and everybody is supposed to go through LOV, we
 2942 * assume lov checked invalid values for us.
 2943 * The only recognised values so far are evict_by_nid and mds_conn.
 2944 * Even if something bad goes through, we'd get a -EINVAL from OST
 2945 * anyway. */
2946
2947 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2948 &RQF_OST_SET_GRANT_INFO :
2949 &RQF_OBD_SET_INFO);
2950 if (req == NULL)
0a3bdb00 2951 return -ENOMEM;
d7e09d03
PT
2952
2953 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2954 RCL_CLIENT, keylen);
2955 if (!KEY_IS(KEY_GRANT_SHRINK))
2956 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2957 RCL_CLIENT, vallen);
2958 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2959 if (rc) {
2960 ptlrpc_request_free(req);
0a3bdb00 2961 return rc;
d7e09d03
PT
2962 }
2963
2964 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2965 memcpy(tmp, key, keylen);
2966 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2967 &RMF_OST_BODY :
2968 &RMF_SETINFO_VAL);
2969 memcpy(tmp, val, vallen);
2970
2971 if (KEY_IS(KEY_GRANT_SHRINK)) {
f024bad4 2972 struct osc_brw_async_args *aa;
d7e09d03
PT
2973 struct obdo *oa;
2974
2975 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2976 aa = ptlrpc_req_async_args(req);
2977 OBDO_ALLOC(oa);
2978 if (!oa) {
2979 ptlrpc_req_finished(req);
0a3bdb00 2980 return -ENOMEM;
d7e09d03
PT
2981 }
2982 *oa = ((struct ost_body *)val)->oa;
2983 aa->aa_oa = oa;
2984 req->rq_interpret_reply = osc_shrink_grant_interpret;
2985 }
2986
2987 ptlrpc_request_set_replen(req);
2988 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2989 LASSERT(set != NULL);
2990 ptlrpc_set_add_req(set, req);
2991 ptlrpc_check_set(NULL, set);
2992 } else
2993 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2994
0a3bdb00 2995 return 0;
d7e09d03
PT
2996}
2997
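The KEY_CACHE_LRU_SHRINK branch above frees at most half of the pages currently on the LRU, bounded by the caller's remaining target, then reports progress by decrementing the target in place so the caller can spread the remainder across other OSCs. A reduced sketch of that accounting; osc_lru_shrink is modeled by a stub that always succeeds:

#include <stdio.h>

/* Stub standing in for osc_lru_shrink(): pretend we can always
 * reclaim exactly what we were asked for. */
static int lru_shrink(int want)
{
	return want;
}

/* Shrink at most half the LRU, bounded by *target, and report
 * progress by decrementing *target, as in the
 * KEY_CACHE_LRU_SHRINK branch of osc_set_info_async(). */
static void cache_lru_shrink(int lru_in_list, int *target)
{
	int nr = lru_in_list >> 1;

	if (nr > *target)
		nr = *target;
	*target -= lru_shrink(nr);
}

int main(void)
{
	int target = 100;

	cache_lru_shrink(64, &target);	/* frees min(32, 100) = 32 */
	printf("remaining target: %d\n", target);
	return 0;
}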
d7e09d03
PT
2998static int osc_reconnect(const struct lu_env *env,
2999 struct obd_export *exp, struct obd_device *obd,
3000 struct obd_uuid *cluuid,
3001 struct obd_connect_data *data,
3002 void *localdata)
3003{
3004 struct client_obd *cli = &obd->u.cli;
3005
3006 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3007 long lost_grant;
3008
3009 client_obd_list_lock(&cli->cl_loi_list_lock);
3010 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3011 2 * cli_brw_size(obd);
3012 lost_grant = cli->cl_lost_grant;
3013 cli->cl_lost_grant = 0;
3014 client_obd_list_unlock(&cli->cl_loi_list_lock);
3015
55f5a824 3016 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
d7e09d03
PT
3017 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3018 data->ocd_version, data->ocd_grant, lost_grant);
3019 }
3020
0a3bdb00 3021 return 0;
d7e09d03
PT
3022}
3023
3024static int osc_disconnect(struct obd_export *exp)
3025{
3026 struct obd_device *obd = class_exp2obd(exp);
d7e09d03
PT
3027 int rc;
3028
d7e09d03
PT
3029 rc = client_disconnect_export(exp);
3030 /**
3031 * Initially we put del_shrink_grant before disconnect_export, but it
3032 * causes the following problem if setup (connect) and cleanup
3033 * (disconnect) are tangled together.
3034 * connect p1 disconnect p2
3035 * ptlrpc_connect_import
3036 * ............... class_manual_cleanup
3037 * osc_disconnect
3038 * del_shrink_grant
3039 * ptlrpc_connect_interrupt
3040 * init_grant_shrink
3041 * add this client to shrink list
3042 * cleanup_osc
3043 * Bang! pinger trigger the shrink.
3044 * So the osc should be disconnected from the shrink list, after we
3045 * are sure the import has been destroyed. BUG18662
3046 */
3047 if (obd->u.cli.cl_import == NULL)
3048 osc_del_shrink_grant(&obd->u.cli);
3049 return rc;
3050}
3051
3052static int osc_import_event(struct obd_device *obd,
3053 struct obd_import *imp,
3054 enum obd_import_event event)
3055{
3056 struct client_obd *cli;
3057 int rc = 0;
3058
d7e09d03
PT
3059 LASSERT(imp->imp_obd == obd);
3060
3061 switch (event) {
3062 case IMP_EVENT_DISCON: {
3063 cli = &obd->u.cli;
3064 client_obd_list_lock(&cli->cl_loi_list_lock);
3065 cli->cl_avail_grant = 0;
3066 cli->cl_lost_grant = 0;
3067 client_obd_list_unlock(&cli->cl_loi_list_lock);
3068 break;
3069 }
3070 case IMP_EVENT_INACTIVE: {
3071 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3072 break;
3073 }
3074 case IMP_EVENT_INVALIDATE: {
3075 struct ldlm_namespace *ns = obd->obd_namespace;
3076 struct lu_env *env;
3077 int refcheck;
3078
3079 env = cl_env_get(&refcheck);
3080 if (!IS_ERR(env)) {
3081 /* Reset grants */
3082 cli = &obd->u.cli;
3083 /* all pages go to failing rpcs due to the invalid
3084 * import */
3085 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3086
3087 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3088 cl_env_put(env, &refcheck);
3089 } else
3090 rc = PTR_ERR(env);
3091 break;
3092 }
3093 case IMP_EVENT_ACTIVE: {
3094 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3095 break;
3096 }
3097 case IMP_EVENT_OCD: {
3098 struct obd_connect_data *ocd = &imp->imp_connect_data;
3099
3100 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3101 osc_init_grant(&obd->u.cli, ocd);
3102
3103 /* See bug 7198 */
3104 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
 3105 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3106
3107 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3108 break;
3109 }
3110 case IMP_EVENT_DEACTIVATE: {
3111 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3112 break;
3113 }
3114 case IMP_EVENT_ACTIVATE: {
3115 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3116 break;
3117 }
3118 default:
3119 CERROR("Unknown import event %d\n", event);
3120 LBUG();
3121 }
0a3bdb00 3122 return rc;
d7e09d03
PT
3123}
3124
3125/**
3126 * Determine whether the lock can be canceled before replaying the lock
3127 * during recovery, see bug16774 for detailed information.
3128 *
3129 * \retval zero the lock can't be canceled
3130 * \retval other ok to cancel
3131 */
3132static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3133{
3134 check_res_locked(lock->l_resource);
3135
3136 /*
3137 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3138 *
3139 * XXX as a future improvement, we can also cancel unused write lock
3140 * if it doesn't have dirty data and active mmaps.
3141 */
3142 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3143 (lock->l_granted_mode == LCK_PR ||
3144 lock->l_granted_mode == LCK_CR) &&
3145 (osc_dlm_lock_pageref(lock) == 0))
0a3bdb00 3146 return 1;
d7e09d03 3147
0a3bdb00 3148 return 0;
d7e09d03
PT
3149}
3150
3151static int brw_queue_work(const struct lu_env *env, void *data)
3152{
3153 struct client_obd *cli = data;
3154
3155 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3156
3157 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
0a3bdb00 3158 return 0;
d7e09d03
PT
3159}
3160
3161int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3162{
ea7893bb 3163 struct lprocfs_static_vars lvars = { NULL };
d7e09d03
PT
3164 struct client_obd *cli = &obd->u.cli;
3165 void *handler;
3166 int rc;
d7e09d03
PT
3167
3168 rc = ptlrpcd_addref();
3169 if (rc)
0a3bdb00 3170 return rc;
d7e09d03
PT
3171
3172 rc = client_obd_setup(obd, lcfg);
3173 if (rc)
26c4ea46 3174 goto out_ptlrpcd;
d7e09d03
PT
3175
3176 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
26c4ea46
TJ
3177 if (IS_ERR(handler)) {
3178 rc = PTR_ERR(handler);
3179 goto out_client_setup;
3180 }
d7e09d03
PT
3181 cli->cl_writeback_work = handler;
3182
3183 rc = osc_quota_setup(obd);
3184 if (rc)
26c4ea46 3185 goto out_ptlrpcd_work;
d7e09d03
PT
3186
3187 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3188 lprocfs_osc_init_vars(&lvars);
3189 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3190 lproc_osc_attach_seqstat(obd);
3191 sptlrpc_lprocfs_cliobd_attach(obd);
3192 ptlrpc_lprocfs_register_obd(obd);
3193 }
3194
 3195 /* We need to allocate a few extra requests, because
 3196 * brw_interpret tries to create new requests before freeing
 3197 * previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
 3198 * but that might waste too much RAM, so +2 is just a guess
 3199 * that should still work. */
3200 cli->cl_import->imp_rq_pool =
3201 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3202 OST_MAXREQSIZE,
3203 ptlrpc_add_rqs_to_pool);
3204
3205 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3206 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
0a3bdb00 3207 return rc;
d7e09d03
PT
3208
3209out_ptlrpcd_work:
3210 ptlrpcd_destroy_work(handler);
3211out_client_setup:
3212 client_obd_cleanup(obd);
3213out_ptlrpcd:
3214 ptlrpcd_decref();
0a3bdb00 3215 return rc;
d7e09d03
PT
3216}
3217
3218static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3219{
d7e09d03
PT
3220 switch (stage) {
3221 case OBD_CLEANUP_EARLY: {
3222 struct obd_import *imp;
3223 imp = obd->u.cli.cl_import;
3224 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3225 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3226 ptlrpc_deactivate_import(imp);
3227 spin_lock(&imp->imp_lock);
3228 imp->imp_pingable = 0;
3229 spin_unlock(&imp->imp_lock);
3230 break;
3231 }
3232 case OBD_CLEANUP_EXPORTS: {
3233 struct client_obd *cli = &obd->u.cli;
3234 /* LU-464
3235 * for echo client, export may be on zombie list, wait for
3236 * zombie thread to cull it, because cli.cl_import will be
3237 * cleared in client_disconnect_export():
3238 * class_export_destroy() -> obd_cleanup() ->
3239 * echo_device_free() -> echo_client_cleanup() ->
3240 * obd_disconnect() -> osc_disconnect() ->
3241 * client_disconnect_export()
3242 */
3243 obd_zombie_barrier();
3244 if (cli->cl_writeback_work) {
3245 ptlrpcd_destroy_work(cli->cl_writeback_work);
3246 cli->cl_writeback_work = NULL;
3247 }
3248 obd_cleanup_client_import(obd);
3249 ptlrpc_lprocfs_unregister_obd(obd);
3250 lprocfs_obd_cleanup(obd);
d7e09d03
PT
3251 break;
3252 }
3253 }
41f8d410 3254 return 0;
d7e09d03
PT
3255}
3256
3257int osc_cleanup(struct obd_device *obd)
3258{
3259 struct client_obd *cli = &obd->u.cli;
3260 int rc;
3261
d7e09d03
PT
3262 /* lru cleanup */
3263 if (cli->cl_cache != NULL) {
3264 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3265 spin_lock(&cli->cl_cache->ccc_lru_lock);
3266 list_del_init(&cli->cl_lru_osc);
3267 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3268 cli->cl_lru_left = NULL;
3269 atomic_dec(&cli->cl_cache->ccc_users);
3270 cli->cl_cache = NULL;
3271 }
3272
3273 /* free memory of osc quota cache */
3274 osc_quota_cleanup(obd);
3275
3276 rc = client_obd_cleanup(obd);
3277
3278 ptlrpcd_decref();
0a3bdb00 3279 return rc;
d7e09d03
PT
3280}
3281
3282int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3283{
ea7893bb 3284 struct lprocfs_static_vars lvars = { NULL };
d7e09d03
PT
3285 int rc = 0;
3286
3287 lprocfs_osc_init_vars(&lvars);
3288
3289 switch (lcfg->lcfg_command) {
3290 default:
3291 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3292 lcfg, obd);
3293 if (rc > 0)
3294 rc = 0;
3295 break;
3296 }
3297
fbe7c6c7 3298 return rc;
d7e09d03
PT
3299}
3300
21aef7d9 3301static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
d7e09d03
PT
3302{
3303 return osc_process_config_base(obd, buf);
3304}
3305
3306struct obd_ops osc_obd_ops = {
3307 .o_owner = THIS_MODULE,
3308 .o_setup = osc_setup,
3309 .o_precleanup = osc_precleanup,
3310 .o_cleanup = osc_cleanup,
3311 .o_add_conn = client_import_add_conn,
3312 .o_del_conn = client_import_del_conn,
3313 .o_connect = client_connect_import,
3314 .o_reconnect = osc_reconnect,
3315 .o_disconnect = osc_disconnect,
3316 .o_statfs = osc_statfs,
3317 .o_statfs_async = osc_statfs_async,
3318 .o_packmd = osc_packmd,
3319 .o_unpackmd = osc_unpackmd,
3320 .o_create = osc_create,
3321 .o_destroy = osc_destroy,
3322 .o_getattr = osc_getattr,
3323 .o_getattr_async = osc_getattr_async,
3324 .o_setattr = osc_setattr,
3325 .o_setattr_async = osc_setattr_async,
d7e09d03 3326 .o_find_cbdata = osc_find_cbdata,
d7e09d03
PT
3327 .o_iocontrol = osc_iocontrol,
3328 .o_get_info = osc_get_info,
3329 .o_set_info_async = osc_set_info_async,
3330 .o_import_event = osc_import_event,
d7e09d03
PT
3331 .o_process_config = osc_process_config,
3332 .o_quotactl = osc_quotactl,
3333 .o_quotacheck = osc_quotacheck,
3334};
3335
3336extern struct lu_kmem_descr osc_caches[];
3337extern spinlock_t osc_ast_guard;
3338extern struct lock_class_key osc_ast_guard_class;
3339
3340int __init osc_init(void)
3341{
ea7893bb 3342 struct lprocfs_static_vars lvars = { NULL };
d7e09d03 3343 int rc;
d7e09d03
PT
3344
3345 /* print an address of _any_ initialized kernel symbol from this
3346 * module, to allow debugging with gdb that doesn't support data
 3347 * symbols from modules. */
3348 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3349
3350 rc = lu_kmem_init(osc_caches);
a55e0f44 3351 if (rc)
0a3bdb00 3352 return rc;
d7e09d03
PT
3353
3354 lprocfs_osc_init_vars(&lvars);
3355
3356 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3357 LUSTRE_OSC_NAME, &osc_device_type);
3358 if (rc) {
3359 lu_kmem_fini(osc_caches);
0a3bdb00 3360 return rc;
d7e09d03
PT
3361 }
3362
3363 spin_lock_init(&osc_ast_guard);
3364 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3365
0a3bdb00 3366 return rc;
d7e09d03
PT
3367}
3368
3369static void /*__exit*/ osc_exit(void)
3370{
3371 class_unregister_type(LUSTRE_OSC_NAME);
3372 lu_kmem_fini(osc_caches);
3373}
3374
3375MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3376MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3377MODULE_LICENSE("GPL");
6960736c 3378MODULE_VERSION(LUSTRE_VERSION_STRING);
d7e09d03 3379
6960736c
GKH
3380module_init(osc_init);
3381module_exit(osc_exit);