drivers/staging/lustre/lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "../include/obd.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
	struct obdo *aa_oa;
	int aa_requested_nob;
	int aa_nio_count;
	u32 aa_page_count;
	int aa_resends;
	struct brw_page **aa_ppga;
	struct client_obd *aa_cli;
	struct list_head aa_oaps;
	struct list_head aa_exts;
	struct cl_req *aa_clerq;
};

struct osc_async_args {
	struct obd_info *aa_oi;
};

struct osc_setattr_args {
	struct obdo *sa_oa;
	obd_enqueue_update_f sa_upcall;
	void *sa_cookie;
};

struct osc_fsync_args {
	struct obd_info *fa_oi;
	obd_enqueue_update_f fa_upcall;
	void *fa_cookie;
};

struct osc_enqueue_args {
	struct obd_export *oa_exp;
	__u64 *oa_flags;
	obd_enqueue_update_f oa_upcall;
	void *oa_cookie;
	struct ost_lvb *oa_lvb;
	struct lustre_handle *oa_lockh;
	struct ldlm_enqueue_info *oa_ei;
	unsigned int oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		return lmm_size;

	if (*lmmp != NULL && lsm == NULL) {
		kfree(*lmmp);
		*lmmp = NULL;
		return 0;
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (*lmmp == NULL) {
		*lmmp = kzalloc(lmm_size, GFP_NOFS);
		if (!*lmmp)
			return -ENOMEM;
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		return lsm_size;

	if (*lsmp != NULL && lmm == NULL) {
		kfree((*lsmp)->lsm_oinfo[0]);
		kfree(*lsmp);
		*lsmp = NULL;
		return 0;
	}

	if (*lsmp == NULL) {
		*lsmp = kzalloc(lsm_size, GFP_NOFS);
		if (unlikely(*lsmp == NULL))
			return -ENOMEM;
		(*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
						GFP_NOFS);
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			kfree(*lsmp);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}

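/* Copy the caller's obdo into the request's ost_body in wire format. */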
static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
}

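/* Completion callback for async getattr: unpack the reply's ost_body
 * into the caller's obdo and run the oi_cb_up upcall. */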
static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

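/* Non-blocking getattr: pack an OST_GETATTR request, attach
 * osc_getattr_interpret as the reply handler and queue the request on
 * the caller's set. */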
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}

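/* Synchronous getattr: send OST_GETATTR and wait for the reply. */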
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
	ptlrpc_req_finished(req);
	return rc;
}

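/* Synchronous setattr: send OST_SETATTR and wait for the reply. */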
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	return rc;
}

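/* Asynchronous setattr: with a NULL @rqset the request is fired and
 * forgotten via ptlrpcd; otherwise osc_setattr_interpret runs the
 * caller's upcall once the reply arrives. */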
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

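/* Create an object on the OST: allocate stripe metadata if the caller
 * supplied none, send a synchronous OST_CREATE and copy the new object
 * id back into the stripe metadata returned through @ea. */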
int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct lov_stripe_md *lsm;
	int rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		goto out;
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out_req;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out_req;
	}

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}

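/* Punch (truncate) an object asynchronously: send OST_PUNCH on the IO
 * portal and run the caller's upcall from osc_setattr_interpret when
 * the reply arrives. */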
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;

	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		rc = -EPROTO;
		goto out;
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	return rc;
}

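/* Flush an object range to disk on the OST: send OST_SYNC and run the
 * caller's upcall on completion. */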
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

/* Find and cancel locally the locks matched by @mode in the resource
 * for the object named by @oa. Found locks are added to the @cancels
 * list. Returns the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

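/* Try to reserve a slot for a destroy RPC: return 1 with the in-flight
 * counter incremented if we are under cl_max_rpcs_in_flight, otherwise
 * back the increment out and return 0. */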
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		return osc_real_create(exp, oa, ea, oti);
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		return osc_real_create(exp, oa, ea, oti);

	/* we should not get here anymore */
	LBUG();

	return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	LIST_HEAD(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying the unlink orphan,
	 * sent from MDT to OST, it should not be blocked here,
	 * because the process might be triggered by ptlrpcd, and
	 * it is not good to block a ptlrpcd thread (b=16006) */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	return 0;
}

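/* Fill the cache accounting fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client_obd state, under cl_loi_list_lock. */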
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() calls and the atomic_inc() are not
		 * covered by a lock, thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set);

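/* Completion handler for a grant shrink RPC: on failure take back the
 * grant we tried to return, on success apply any grant the server sent
 * in its reply. */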
static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		goto out;
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

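/* Return grant to the server until only @target_bytes remain, clamped
 * to at least one RPC's worth, by sending the difference in a
 * KEY_GRANT_SHRINK set_info RPC. */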
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	body = kzalloc(sizeof(*body), GFP_NOFS);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	kfree(body);
	return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	unsigned long time = cfs_time_current();
	unsigned long next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;

		osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
			    cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
		       client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expected to hold: if we've
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as in-flight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
	       cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

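/* Sanity-check a BRW write reply: every per-niobuf RC must be zero and
 * the bulk must have transferred exactly the requested number of bytes,
 * otherwise the reply is treated as a protocol error. */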
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

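/* Compute the bulk checksum over the first @nob bytes of the page
 * array; the OBD_FAIL hooks deliberately corrupt the data (reads) or
 * the checksum (writes) to exercise the error paths. */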
static u32 osc_checksum_bulk(int nob, u32 pg_count,
			     struct brw_page **pga, int opc,
			     cksum_type_t cksum_type)
{
	__u32 cksum;
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	int err;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~CFS_PAGE_MASK,
					    count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

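/* Build a BRW RPC (OST_READ or OST_WRITE): allocate the request (from
 * the shared pool for writes), set up the bulk descriptor, merge
 * contiguous pages into niobufs, attach grant and checksum data, and
 * stash osc_brw_async_args in the request for the completion handler. */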
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request. The actual number is
	 * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
	 * sends "max - 1" for old client compatibility sending "0", and also
	 * so the actual maximum is a power-of-two number, not one less.
	 * LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	return 0;

out:
	ptlrpc_req_finished(req);
	return rc;
}

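/* Diagnose a write checksum mismatch: recompute the checksum locally
 * to guess where the data changed, log the details and return 1 so the
 * caller retries (a match returns 0). */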
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count - 1]->off +
			   pga[page_count - 1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
	       client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32 server_cksum = body->oa.o_cksum;
		char *via;
		char *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count - 1]->off +
					   aa->aa_ppga[aa->aa_page_count - 1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}

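/* Rebuild and resend a BRW that failed with a recoverable error: the
 * new request takes over the pages and async args of the old one and
 * is handed to ptlrpcd with a capped resend delay. */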
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, 0, 1);
	if (rc)
		return rc;

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				return -EINTR;
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problems if we're going to support
	 * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit the
	 * request set from the old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	return 0;
}

1680/*
1681 * ugh, we want disk allocation on the target to happen in offset order. we'll
1682 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1683 * fine for our small page arrays and doesn't require allocation. its an
1684 * insertion sort that swaps elements that are strides apart, shrinking the
1685 * stride down until its '1' and the array is sorted.
1686 */
1687static void sort_brw_pages(struct brw_page **array, int num)
1688{
1689 int stride, i, j;
1690 struct brw_page *tmp;
1691
1692 if (num == 1)
1693 return;
1694 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1695 ;
1696
1697 do {
1698 stride /= 3;
1699 for (i = stride ; i < num ; i++) {
1700 tmp = array[i];
1701 j = i;
1702 while (j >= stride && array[j - stride]->off > tmp->off) {
1703 array[j] = array[j - stride];
1704 j -= stride;
1705 }
1706 array[j] = tmp;
1707 }
1708 } while (stride > 1);
1709}
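
/*
 * Editor's sketch (not part of the driver, compiled out): the same
 * 3x+1 shellsort over plain offsets, runnable standalone. For num = 10
 * the stride sequence built by the first loop is 1, 4, 13; the do-while
 * then sorts with strides 4 and 1.
 */
#if 0
#include <stdio.h>

static void shellsort(unsigned long *a, int num)
{
	int stride, i, j;
	unsigned long tmp;

	for (stride = 1; stride < num; stride = stride * 3 + 1)
		;
	do {
		stride /= 3;
		for (i = stride; i < num; i++) {
			tmp = a[i];
			for (j = i; j >= stride && a[j - stride] > tmp;
			     j -= stride)
				a[j] = a[j - stride];
			a[j] = tmp;
		}
	} while (stride > 1);
}

int main(void)
{
	unsigned long off[5] = { 40960, 0, 8192, 4096, 12288 };
	int i;

	shellsort(off, 5);
	for (i = 0; i < 5; i++)
		printf("%lu ", off[i]);	/* 0 4096 8192 12288 40960 */
	printf("\n");
	return 0;
}
#endif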

static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
	LASSERT(ppga != NULL);
	kfree(ppga);
}

static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct cl_object *obj = NULL;
	struct client_obd *cli = aa->aa_cli;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When the server returns -EINPROGRESS, the client should always
	 * retry, regardless of how many times the bulk was already resent. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
			   client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resend retries for object: %llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			return 0;
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		if (obj == NULL && rc == 0) {
			obj = osc2cl(ext->oe_obj);
			cl_object_get(obj);
		}

		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	if (obj != NULL) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;

		LASSERT(rc == 0);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}
		if (valid != 0) {
			cl_object_attr_lock(obj);
			cl_object_attr_set(env, obj, attr, valid);
			cl_object_attr_unlock(obj);
		}
		cl_object_put(env, obj);
	}
	OBDO_FREE(aa->aa_oa);

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete. */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	return rc;
}
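
/*
 * Editor's sketch (not part of the driver, compiled out): the retry
 * policy in brw_interpret() above, reduced to a pure decision function.
 * The enum and function names are hypothetical; only the branch
 * structure mirrors the code.
 */
#if 0
#include <errno.h>

enum redo_action { REDO_FAIL, REDO_RESEND, REDO_LOG_EVICTION };

static enum redo_action brw_redo_action(int rc, int generation_changed,
					int resends_allowed)
{
	/* eviction: only log it; the extents are failed with rc below */
	if (generation_changed)
		return REDO_LOG_EVICTION;
	/* -EINPROGRESS always retries; other errors only within budget */
	if (rc == -EINPROGRESS || resends_allowed)
		return REDO_RESEND;
	return REDO_FAIL;
}
#endif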

/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list do not exceed the max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd)
{
	struct ptlrpc_request *req = NULL;
	struct osc_extent *ext;
	struct brw_page **pga = NULL;
	struct osc_brw_async_args *aa = NULL;
	struct obdo *oa = NULL;
	struct osc_async_page *oap;
	struct osc_async_page *tmp;
	struct cl_req *clerq = NULL;
	enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
	struct ldlm_lock *lock = NULL;
	struct cl_req_attr *crattr = NULL;
	u64 starting_offset = OBD_OBJECT_EOF;
	u64 ending_offset = 0;
	int mpflag = 0;
	int mem_tight = 0;
	int page_count = 0;
	int i;
	int rc;
	struct ost_body *body;
	LIST_HEAD(rpc_list);

	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
		}
	}

	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
	if (!crattr) {
		rc = -ENOMEM;
		goto out;
	}

	pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
	if (pga == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	OBDO_ALLOC(oa);
	if (oa == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);

		if (clerq == NULL) {
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq)) {
				rc = PTR_ERR(clerq);
				goto out;
			}
			lock = oap->oap_ldlm_lock;
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, page_index(oap->oap_page), oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq != NULL);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);
	if (lock) {
		oa->o_handle = lock->l_remote_handle;
		oa->o_valid |= OBD_MD_FLHANDLE;
	}

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		goto out;
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
				  pga, &req, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		goto out;
	}

	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST). If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps. Sadly, there is no obvious
	 * way to do this in a single call. bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
			       oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	ptlrpcd_add_req(req);
	rc = 0;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	kfree(crattr);

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		kfree(pga);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	return rc;
}

static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
					struct ldlm_enqueue_info *einfo)
{
	void *data = einfo->ei_cbdata;
	int set = 0;

	LASSERT(lock != NULL);
	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

	lock_res_and_lock(lock);
	spin_lock(&osc_ast_guard);

	if (lock->l_ast_data == NULL)
		lock->l_ast_data = data;
	if (lock->l_ast_data == data)
		set = 1;

	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(lock);

	return set;
}

static int osc_set_data_with_check(struct lustre_handle *lockh,
				   struct ldlm_enqueue_info *einfo)
{
	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
	int set = 0;

	if (lock != NULL) {
		set = osc_set_lock_data_with_check(lock, einfo);
		LDLM_LOCK_PUT(lock);
	} else {
		CERROR("lockh %p, data %p - client evicted?\n",
		       lockh, einfo->ei_cbdata);
	}
	return set;
}

/* find any ldlm lock of the inode in osc
 * return 0 if no lock is found
 *        1 if a lock is found
 *        < 0 on error */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
			   ldlm_iterator_t replace, void *data)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc = 0;

	ostid_build_res_name(&lsm->lsm_oi, &res_id);
	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
}

static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
			    obd_enqueue_update_f upcall, void *cookie,
			    __u64 *flags, int agl, int rc)
{
	int intent = *flags & LDLM_FL_HAS_INTENT;

	if (intent) {
		/* The request was created before the ldlm_cli_enqueue call. */
		if (rc == ELDLM_LOCK_ABORTED) {
			struct ldlm_reply *rep;

			rep = req_capsule_server_get(&req->rq_pill,
						     &RMF_DLM_REP);

			LASSERT(rep != NULL);
			rep->lock_policy_res1 =
				ptlrpc_status_ntoh(rep->lock_policy_res1);
			if (rep->lock_policy_res1)
				rc = rep->lock_policy_res1;
		}
	}

	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
	    (rc == 0)) {
		*flags |= LDLM_FL_LVB_READY;
		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, rc);
	return rc;
}

static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle handle;
	__u32 mode;
	struct ost_lvb *lvb;
	__u32 lvb_len;
	__u64 *flags = aa->oa_flags;

	/* Make a local copy of the lock handle and the mode, because aa->oa_*
	 * might be freed anytime after the lock upcall has been called. */
	lustre_handle_copy(&handle, aa->oa_lockh);
	mode = aa->oa_ei->ei_mode;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(&handle);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(&handle, mode);

	/* Let the CP AST grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
		lvb = NULL;
		lvb_len = 0;
	} else {
		lvb = aa->oa_lvb;
		lvb_len = sizeof(*aa->oa_lvb);
	}

	/* Complete the lock acquisition. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
				   mode, flags, lvb, lvb_len, &handle, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
			      flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Release the lock for async request. */
	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
		/*
		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
		 * not already released by
		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
		 */
		ldlm_lock_decref(&handle, mode);

	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
		 aa->oa_lockh, req, aa);
	ldlm_lock_decref(&handle, mode);
	LDLM_LOCK_PUT(lock);
	return rc;
}

struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;

/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests; however, keeping some locks while trying to
 * obtain others may take a considerable amount of time in a case of ost
 * failure, and when other sync requests do not get a released lock from a
 * client, the client is excluded from the cluster -- such scenarios make life
 * difficult, so release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* For AGL, if an enqueue RPC is sent but the lock is
			 * not granted, then skip processing this stripe.
			 * Return -ECANCELED to tell the caller. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		}

		if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* addref the lock only if not async requests and PW
			 * lock is matched whereas we asked for PR. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* I would like to be able to ASSERT here that
				 * rss <= kms, but I can't, for reasons which
				 * are explained in lov_enqueue() */
			}

			/* We already have a lock, and it's referenced.
			 *
			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
			 * AGL upcall may change it to CLS_HELD directly. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* For async requests, decref the lock. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		}

		ldlm_lock_decref(lockh, mode);
		LDLM_LOCK_PUT(matched);
	}

no_match:
	if (intent) {
		LIST_HEAD(cancels);

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			struct osc_enqueue_args *aa;

			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb = lvb;
			aa->oa_lockh = lockh;
			aa->oa_agl = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}
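
/*
 * Editor's sketch (not part of the driver, compiled out): the lock
 * extent rounding used by osc_enqueue_base() and osc_match_base().
 * Assuming 4K pages, CFS_PAGE_MASK behaves like ~0xFFFULL: the start is
 * rounded down to a page boundary and the end is rounded up to the last
 * byte of its page.
 */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned long long mask = ~0xFFFULL;	/* hypothetical 4K CFS_PAGE_MASK */
	unsigned long long start = 5000, end = 9000;

	start -= start & ~mask;	/* 4096: round down */
	end |= ~mask;		/* 12287: round up to page end */
	printf("[%llu, %llu]\n", start, end);
	return 0;
}
#endif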

int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	ldlm_mode_t rc;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		return -EIO;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data != NULL) {
			if (!osc_set_data_with_check(lockh, data)) {
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				return 0;
			}
		}
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		return rc;
	}
	return rc;
}
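
/*
 * Editor's note: "rc |= LCK_PW" above works because ldlm lock modes are
 * single bits that can be OR-ed into a match mask, and ldlm_lock_match()
 * returns the one mode it actually matched. A compiled-out standalone
 * sketch with hypothetical bit values (the real LCK_* constants may
 * differ):
 */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned pw = 0x2, pr = 0x4;	/* hypothetical single-bit modes */
	unsigned want = pr | pw;	/* a PR request also accepts PW */

	printf("PW satisfies the mask: %s\n", (want & pw) ? "yes" : "no");
	printf("PR satisfies the mask: %s\n", (want & pr) ? "yes" : "no");
	return 0;
}
#endif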

int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	if (unlikely(mode == LCK_GROUP))
		ldlm_lock_decref_and_cancel(lockh, mode);
	else
		ldlm_lock_decref(lockh, mode);

	return 0;
}

static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;

	if (rc == -EBADR)
		/* The request has in fact never been sent
		 * due to issues at a higher level (LOV).
		 * Exit immediately since the caller is
		 * aware of the problem and takes care
		 * of the clean up */
		return rc;

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
		rc = 0;
		goto out;
	}

	if (rc != 0)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not wait for the stat reply,
		 * to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	return 0;
}

static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;

	/* Since the request might also come from lprocfs, we need to
	 * sync this with client_disconnect_export() (bug 15684) */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		return -ENODEV;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not wait for the stat reply,
		 * to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*osfs = *msfs;

out:
	ptlrpc_req_finished(req);
	return rc;
}

/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;

	if (!lsm)
		return -ENODATA;

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count (the header part is common to v1 and v3) */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		return -EFAULT;

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		return -EINVAL;

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
	if (lum.lmm_stripe_count > 0) {
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		lumk = kzalloc(lum_size, GFP_NOFS);
		if (!lumk)
			return -ENOMEM;

		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &(lumk->lmm_objects[0]);
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		kfree(lumk);

	return rc;
}

static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("Can't get module. Is it alive?");
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_LOV_GET_CONFIG: {
		char *buf;
		struct lov_desc *desc;
		struct obd_uuid uuid;

		buf = NULL;
		len = 0;
		if (obd_ioctl_getdata(&buf, &len, uarg)) {
			err = -EINVAL;
			goto out;
		}

		data = (struct obd_ioctl_data *)buf;

		if (sizeof(*desc) > data->ioc_inllen1) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		if (data->ioc_inllen2 < sizeof(uuid)) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		desc = (struct lov_desc *)data->ioc_inlbuf1;
		desc->ld_tgt_count = 1;
		desc->ld_active_tgt_count = 1;
		desc->ld_default_stripe_count = 1;
		desc->ld_default_stripe_size = 0;
		desc->ld_default_stripe_offset = 0;
		desc->ld_pattern = 0;
		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

		err = copy_to_user(uarg, buf, len);
		if (err)
			err = -EFAULT;
		obd_ioctl_freedata(buf, len);
		goto out;
	}
	case LL_IOC_LOV_SETSTRIPE:
		err = obd_alloc_memmd(exp, karg);
		if (err > 0)
			err = 0;
		goto out;
	case LL_IOC_LOV_GETSTRIPE:
		err = osc_getstripe(karg, uarg);
		goto out;
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		if (err > 0)
			err = 0;
		goto out;
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		goto out;
	case OBD_IOC_POLL_QUOTACHECK:
		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
		goto out;
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		goto out;
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		err = -ENOTTY;
		goto out;
	}
out:
	module_put(THIS_MODULE);
	return err;
}

static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		u64 *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto out;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL) {
			rc = -EPROTO;
			goto out;
		}

		*((u64 *)val) = *reply;
	out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		ldlm_mode_t mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
					       fm_key->fiemap.fm_length +
					       PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, need to acquire it on the server */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL) {
			rc = -ENOMEM;
			goto drop_lock;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			goto drop_lock;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto fini_req;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL) {
			rc = -EPROTO;
			goto fini_req;
		}

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);
	char *tmp;
	int rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = (struct cl_client_cache *)val;
		atomic_inc(&cli->cl_cache->ccc_users);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

		nr = osc_lru_shrink(cli, min(nr, target));
		*(int *)val -= nr;
		return 0;
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

	/* We pass all other commands directly to OST. Since nobody calls osc
	 * methods directly and everybody is supposed to go through LOV, we
	 * assume lov checked invalid values for us.
	 * The only recognised values so far are evict_by_nid and mds_conn.
	 * Even if something bad goes through, we'd get a -EINVAL from OST
	 * anyway. */

	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
				   &RQF_OST_SET_GRANT_INFO :
				   &RQF_OBD_SET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_brw_async_args *aa;
		struct obdo *oa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		OBDO_ALLOC(oa);
		if (!oa) {
			ptlrpc_req_finished(req);
			return -ENOMEM;
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else {
		ptlrpcd_add_req(req);
	}

	return 0;
}
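
/*
 * Editor's sketch (not part of the driver, compiled out): the
 * KEY_CACHE_LRU_SHRINK arithmetic above -- shrink at most half of the
 * LRU list, bounded by the caller's remaining target, and report back
 * what was done by decrementing the in/out value:
 */
#if 0
#include <stdio.h>

int main(void)
{
	int lru_in_list = 1000, target = 300;
	int nr = lru_in_list >> 1;	/* at most half the list: 500 */

	nr = nr < target ? nr : target;	/* min(nr, target): 300 */
	target -= nr;			/* caller sees 0 left to shrink */
	printf("shrunk %d, remaining target %d\n", nr, target);
	return 0;
}
#endif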

static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
			 void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;

		client_obd_list_lock(&cli->cl_loi_list_lock);
		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
				  2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
		       data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	return 0;
}
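
/*
 * Editor's note: "x ?: y" above is the GCC "Elvis" extension -- it
 * evaluates to x when x is non-zero, else y, without evaluating x twice.
 * A compiled-out standalone equivalent of the grant request computed in
 * osc_reconnect(), with hypothetical numbers:
 */
#if 0
#include <stdio.h>

int main(void)
{
	long avail = 0, dirty = 0, brw_size = 1 << 20;
	long grant = (avail + dirty) ?: 2 * brw_size;

	/* nothing cached yet, so ask for two full-sized BRWs worth of grant */
	printf("%ld\n", grant);	/* 2097152 */
	return 0;
}
#endif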

static int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *	connect p1			disconnect p2
	 *	ptlrpc_connect_import
	 *	...............			class_manual_cleanup
	 *					osc_disconnect
	 *					del_shrink_grant
	 *	ptlrpc_connect_interrupt
	 *	init_grant_shrink
	 *	add this client to shrink list
	 *					cleanup_osc
	 * Bang! The pinger triggers the shrink.
	 * So the osc should only be disconnected from the shrink list once we
	 * are sure the import has been destroyed. BUG18662
	 */
	if (obd->u.cli.cl_import == NULL)
		osc_del_shrink_grant(&obd->u.cli);
	return rc;
}

static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		cli = &obd->u.cli;
		client_obd_list_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env *env;
		int refcheck;

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			/* Reset grants */
			cli = &obd->u.cli;
			/* all pages go to failing rpcs due to the invalid
			 * import */
			osc_io_unplug(env, cli, NULL);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
			cl_env_put(env, &refcheck);
		} else {
			rc = PTR_ERR(env);
		}
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		/* See bug 7198 */
		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	return rc;
}

/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_for_recovery(struct ldlm_lock *lock)
{
	check_res_locked(lock->l_resource);

	/*
	 * Cancel all unused extent locks in granted mode LCK_PR or LCK_CR.
	 *
	 * XXX as a future improvement, we could also cancel an unused write
	 * lock if it doesn't have dirty data and active mmaps.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    (lock->l_granted_mode == LCK_PR ||
	     lock->l_granted_mode == LCK_CR) &&
	    (osc_dlm_lock_pageref(lock) == 0))
		return 1;

	return 0;
}
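
/*
 * Editor's sketch (not part of the driver, compiled out): the
 * cancel-before-replay predicate above as a standalone function; all
 * three conditions must hold. The parameter names are hypothetical.
 */
#if 0
static int cancelable_before_replay(int is_extent_lock, int is_pr_or_cr,
				    int pageref)
{
	/* extent type, read-compatible granted mode, and no page users */
	return is_extent_lock && is_pr_or_cr && pageref == 0;
}
#endif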

static int brw_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL);
	return 0;
}

int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct client_obd *cli = &obd->u.cli;
	void *handler;
	int rc;
	int adding;
	int added;
	int req_count;

	rc = ptlrpcd_addref();
	if (rc)
		return rc;

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		goto out_ptlrpcd;

	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_client_setup;
	}
	cli->cl_writeback_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		goto out_ptlrpcd_work;

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	lprocfs_osc_init_vars(&lvars);
	if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);
	}

	/*
	 * We try to control the total number of requests with an upper limit,
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);
	}

	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
	return rc;

out_ptlrpcd_work:
	ptlrpcd_destroy_work(handler);
out_client_setup:
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	return rc;
}
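
/*
 * Editor's sketch (not part of the driver, compiled out): the pool
 * top-up above adds max_rpcs_in_flight + 2 requests but never pushes
 * the global count past osc_reqpool_maxreqcount. Hypothetical numbers:
 */
#if 0
#include <stdio.h>

int main(void)
{
	int maxreqcount = 100, req_count = 95, max_rpcs_in_flight = 8;

	if (req_count < maxreqcount) {
		int adding = max_rpcs_in_flight + 2;	/* wants 10 */

		if (req_count + adding > maxreqcount)
			adding = maxreqcount - req_count;	/* capped at 5 */
		printf("adding %d requests\n", adding);
	}
	return 0;
}
#endif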

static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
	switch (stage) {
	case OBD_CLEANUP_EARLY: {
		struct obd_import *imp;

		imp = obd->u.cli.cl_import;
		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
		ptlrpc_deactivate_import(imp);
		spin_lock(&imp->imp_lock);
		imp->imp_pingable = 0;
		spin_unlock(&imp->imp_lock);
		break;
	}
	case OBD_CLEANUP_EXPORTS: {
		struct client_obd *cli = &obd->u.cli;
		/* LU-464
		 * for echo client, export may be on zombie list, wait for
		 * zombie thread to cull it, because cli.cl_import will be
		 * cleared in client_disconnect_export():
		 *   class_export_destroy() -> obd_cleanup() ->
		 *   echo_device_free() -> echo_client_cleanup() ->
		 *   obd_disconnect() -> osc_disconnect() ->
		 *   client_disconnect_export()
		 */
		obd_zombie_barrier();
		if (cli->cl_writeback_work) {
			ptlrpcd_destroy_work(cli->cl_writeback_work);
			cli->cl_writeback_work = NULL;
		}
		obd_cleanup_client_import(obd);
		ptlrpc_lprocfs_unregister_obd(obd);
		lprocfs_obd_cleanup(obd);
		break;
	}
	}
	return 0;
}

int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		atomic_dec(&cli->cl_cache->ccc_users);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	return rc;
}

int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	int rc = 0;

	lprocfs_osc_init_vars(&lvars);

	switch (lcfg->lcfg_command) {
	default:
		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
					      lcfg, obd);
		if (rc > 0)
			rc = 0;
		break;
	}

	return rc;
}

static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	return osc_process_config_base(obd, buf);
}

struct obd_ops osc_obd_ops = {
	.o_owner		= THIS_MODULE,
	.o_setup		= osc_setup,
	.o_precleanup		= osc_precleanup,
	.o_cleanup		= osc_cleanup,
	.o_add_conn		= client_import_add_conn,
	.o_del_conn		= client_import_del_conn,
	.o_connect		= client_connect_import,
	.o_reconnect		= osc_reconnect,
	.o_disconnect		= osc_disconnect,
	.o_statfs		= osc_statfs,
	.o_statfs_async		= osc_statfs_async,
	.o_packmd		= osc_packmd,
	.o_unpackmd		= osc_unpackmd,
	.o_create		= osc_create,
	.o_destroy		= osc_destroy,
	.o_getattr		= osc_getattr,
	.o_getattr_async	= osc_getattr_async,
	.o_setattr		= osc_setattr,
	.o_setattr_async	= osc_setattr_async,
	.o_find_cbdata		= osc_find_cbdata,
	.o_iocontrol		= osc_iocontrol,
	.o_get_info		= osc_get_info,
	.o_set_info_async	= osc_set_info_async,
	.o_import_event		= osc_import_event,
	.o_process_config	= osc_process_config,
	.o_quotactl		= osc_quotactl,
	.o_quotacheck		= osc_quotacheck,
};

extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

static int __init osc_init(void)
{
	struct lprocfs_static_vars lvars = { NULL };
	unsigned int reqpool_size;
	unsigned int reqsize;
	int rc;

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		return rc;

	lprocfs_osc_init_vars(&lvars);

	rc = class_register_type(&osc_obd_ops, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc)
		goto out_kmem;

	spin_lock_init(&osc_ast_guard);
	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

	/* This is obviously too much memory, only prevent overflow here */
	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
		rc = -EINVAL;
		goto out_type;
	}

	reqpool_size = osc_reqpool_mem_max << 20;

	reqsize = 1;
	while (reqsize < OST_MAXREQSIZE)
		reqsize = reqsize << 1;

	/*
	 * We don't enlarge the request count in the OSC pool according to
	 * cl_max_rpcs_in_flight. Allocation from the pool is only tried
	 * after a normal allocation has failed, so a small OSC pool won't
	 * cause much performance degradation in most cases.
	 */
	osc_reqpool_maxreqcount = reqpool_size / reqsize;

	atomic_set(&osc_pool_req_count, 0);
	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
					  ptlrpc_add_rqs_to_pool);

	if (osc_rq_pool)
		return 0;

	rc = -ENOMEM;

out_type:
	class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
	lu_kmem_fini(osc_caches);
	return rc;
}
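
/*
 * Editor's sketch (not part of the driver, compiled out): the pool
 * sizing in osc_init() rounds the request size up to the next power of
 * two and divides the memory budget by it. Assuming a hypothetical 5 MB
 * budget and a 17000-byte OST_MAXREQSIZE:
 */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned int mem_max_mb = 5, maxreqsize = 17000;
	unsigned int reqpool_size = mem_max_mb << 20;	/* 5 MB in bytes */
	unsigned int reqsize = 1;

	while (reqsize < maxreqsize)
		reqsize <<= 1;	/* rounds 17000 up to 32768 */
	printf("%u requests\n", reqpool_size / reqsize);	/* 160 */
	return 0;
}
#endif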

static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);