/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "../include/obd.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
        struct obdo *aa_oa;
        int aa_requested_nob;
        int aa_nio_count;
        u32 aa_page_count;
        int aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd *aa_cli;
        struct list_head aa_oaps;
        struct list_head aa_exts;
        struct obd_capa *aa_ocapa;
        struct cl_req *aa_clerq;
};

struct osc_async_args {
        struct obd_info *aa_oi;
};

struct osc_setattr_args {
        struct obdo *sa_oa;
        obd_enqueue_update_f sa_upcall;
        void *sa_cookie;
};

struct osc_fsync_args {
        struct obd_info *fa_oi;
        obd_enqueue_update_f fa_upcall;
        void *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export *oa_exp;
        __u64 *oa_flags;
        obd_enqueue_update_f oa_upcall;
        void *oa_cookie;
        struct ost_lvb *oa_lvb;
        struct lustre_handle *oa_lockh;
        struct ldlm_enqueue_info *oa_ei;
        unsigned int oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;

        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                return lmm_size;

        if (*lmmp != NULL && lsm == NULL) {
                kfree(*lmmp);
                *lmmp = NULL;
                return 0;
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (*lmmp == NULL) {
                *lmmp = kzalloc(lmm_size, GFP_NOFS);
                if (!*lmmp)
                        return -ENOMEM;
        }

        if (lsm)
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        return lmm_size;
}
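
/*
 * Usage sketch (hypothetical caller, not from this file): osc_packmd()
 * follows the usual obd_packmd() convention of three call modes, so a
 * caller might look like:
 *
 *      struct lov_mds_md *lmm = NULL;
 *      int size;
 *
 *      size = osc_packmd(exp, NULL, lsm);      query wire size only
 *      size = osc_packmd(exp, &lmm, lsm);      allocate and pack *lmm
 *      osc_packmd(exp, &lmm, NULL);            free *lmm again
 */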

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        return -EINVAL;
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        return -EINVAL;
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                return lsm_size;

        if (*lsmp != NULL && lmm == NULL) {
                kfree((*lsmp)->lsm_oinfo[0]);
                kfree(*lsmp);
                *lsmp = NULL;
                return 0;
        }

        if (*lsmp == NULL) {
                *lsmp = kzalloc(lsm_size, GFP_NOFS);
                if (unlikely(*lsmp == NULL))
                        return -ENOMEM;
                (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
                                                GFP_NOFS);
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        kfree(*lsmp);
                        return -ENOMEM;
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        return 0;
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        return 0;
}
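
/*
 * Usage sketch (hypothetical, for illustration only): the rqset argument
 * selects one of the three delivery modes implemented above.
 *
 *      fire and forget, the reply is ignored and the upcall never runs:
 *              rc = osc_setattr_async_base(exp, oinfo, oti, upcall,
 *                                          cookie, NULL);
 *
 *      let a ptlrpcd thread drive the RPC and run the upcall:
 *              rc = osc_setattr_async_base(exp, oinfo, oti, upcall,
 *                                          cookie, PTLRPCD_SET);
 *
 *      queue on a caller-owned set, e.g. driven by ptlrpc_set_wait():
 *              rc = osc_setattr_async_base(exp, oinfo, oti, upcall,
 *                                          cookie, rqset);
 */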

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        return rc;
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL) {
                rc = -ENOMEM;
                goto out;
        }

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                goto out;
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out_req;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out_req;
        }

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;

        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                goto out;
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

/* Locally find and cancel locks matched by @mode in the resource found by
 * @objid. Found locks are added to the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                return 0;

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                return 0;

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                return osc_real_create(exp, oa, ea, oti);
        }

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                return osc_real_create(exp, oa, ea, oti);

        /* we should not get here anymore */
        LBUG();

        return rc;
}

/* Destroy requests can always be async on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        LIST_HEAD(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                return -EINVAL;
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                return -ENOMEM;
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan, sent from
         * MDT to OST, it should not be blocked here, because the process
         * might be triggered by ptlrpcd, and it is not good to block a
         * ptlrpcd thread (b=16006) */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        return 0;
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() calls and the atomic_inc() they pair with
                 * are not covered by a lock, thus they may safely race and
                 * trip this CERROR() unless we add in a small fudge
                 * factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
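
/*
 * Worked example (illustrative numbers, not from a real system): assuming
 * cl_max_pages_per_rpc = 256, PAGE_CACHE_SHIFT = 12 and
 * cl_max_rpcs_in_flight = 8, the final branch above advertises
 *
 *      max_in_flight = (256 << 12) * (8 + 1) = 9 MiB
 *
 * so o_undirty = max(cl_dirty_max, 9 MiB): the client asks for enough
 * grant to keep a full pipeline of write RPCs dirty.
 */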

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key, u32 vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                goto out;
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
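
/*
 * Worked example (illustrative): assuming a 1 MiB RPC size and
 * cl_max_rpcs_in_flight = 8, the first target above is
 * (8 + 1) * 1 MiB = 9 MiB. If cl_avail_grant is already at or below
 * 9 MiB, the target drops to a single RPC worth (1 MiB), matching the
 * two-step policy described in the comment above.
 */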

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                return 0;
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        body = kzalloc(sizeof(*body), GFP_NOFS);
        if (!body)
                return -ENOMEM;

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        kfree(body);
        return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        unsigned long time = cfs_time_current();
        unsigned long next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;

                osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list,
                            cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
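
/*
 * Example (hypothetical values): for a 3-page read (pga[0..2], 4096 bytes
 * each) where the OST returned nob_read = 6000, the first loop above leaves
 * pga[0] intact, zeroes bytes 1904..4095 of pga[1] (EOF fell inside that
 * page), and the second loop zero-fills all of pga[2], so the caller always
 * sees fully initialized pages.
 */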

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           u32 page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC |
                                  OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
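
/*
 * Example (hypothetical): two 4 KiB pages with p1->off = 8192,
 * p1->count = 4096 and p2->off = 12288 merge into one contiguous niobuf
 * (8192 + 4096 == 12288), provided their flags match exactly. Any flag
 * difference prevents merging, and a warning is logged only when the
 * differing bits fall outside the known-safe mask above.
 */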

static u32 osc_checksum_bulk(int nob, u32 pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        int err;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                CDEBUG(D_PAGE,
                       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
                       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
                       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
                       page_private(pga[i]->pg),
                       (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                return -ENOMEM; /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                return -EINVAL; /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                return -ENOMEM;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL) {
                rc = -ENOMEM;
                goto out;
        }
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        return 0;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                u32 page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
               client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                return rc;
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                return -EPROTO;
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                return rc;

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        return -EPROTO;
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        return -EAGAIN;

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        return -EAGAIN;

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                goto out;
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0) {
                rc = -EAGAIN;
                goto out;
        }

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                return -EPROTO;
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                return -EPROTO;
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                char *via;
                char *router;
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        return rc;
}

static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                return rc;

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                return -EINTR;
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = get_seconds() + new_req->rq_timeout;
        else
                new_req->rq_sent = get_seconds() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problems if we're going to support
         * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and waiting for all of them to finish. We should inherit the
         * request set from the old request. */
        ptlrpcd_add_req(new_req);

        DEBUG_REQ(D_INFO, new_req, "new request");
        return 0;
}
1726
/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride ; i < num ; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}
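/*
 * Example: for num == 20 the gap loop in sort_brw_pages() yields the
 * Knuth sequence 1, 4, 13, 40 and exits with stride == 40; the do/while
 * then makes insertion passes with strides 13, 4 and finally 1.
 */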

static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
	LASSERT(ppga != NULL);
	kfree(ppga);
}

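/*
 * Completion callback for bulk read/write RPCs: finish the request,
 * redo recoverable failures, push size and timestamp attributes from
 * the reply into the cl_object, then release the pages, extents and
 * per-client in-flight counters.
 */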
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct cl_object *obj = NULL;
	struct client_obd *cli = aa->aa_cli;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When the server returns -EINPROGRESS, the client should always
	 * retry regardless of the number of times the bulk was resent
	 * already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
			   client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			return 0;
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (aa->aa_ocapa) {
		capa_put(aa->aa_ocapa);
		aa->aa_ocapa = NULL;
	}

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		if (obj == NULL && rc == 0) {
			obj = osc2cl(ext->oe_obj);
			cl_object_get(obj);
		}

		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	if (obj != NULL) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;

		LASSERT(rc == 0);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}
		if (valid != 0) {
			cl_object_attr_lock(obj);
			cl_object_attr_set(env, obj, attr, valid);
			cl_object_attr_unlock(obj);
		}
		cl_object_put(env, obj);
	}
	OBDO_FREE(aa->aa_oa);

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	return rc;
}

/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd)
{
	struct ptlrpc_request *req = NULL;
	struct osc_extent *ext;
	struct brw_page **pga = NULL;
	struct osc_brw_async_args *aa = NULL;
	struct obdo *oa = NULL;
	struct osc_async_page *oap;
	struct osc_async_page *tmp;
	struct cl_req *clerq = NULL;
	enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
	struct ldlm_lock *lock = NULL;
	struct cl_req_attr *crattr = NULL;
	u64 starting_offset = OBD_OBJECT_EOF;
	u64 ending_offset = 0;
	int mpflag = 0;
	int mem_tight = 0;
	int page_count = 0;
	int i;
	int rc;
	struct ost_body *body;
	LIST_HEAD(rpc_list);

	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
		}
	}

	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
	if (!crattr) {
		rc = -ENOMEM;
		goto out;
	}

	pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
	if (pga == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	OBDO_ALLOC(oa);
	if (oa == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);
		if (clerq == NULL) {
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq)) {
				rc = PTR_ERR(clerq);
				goto out;
			}
			lock = oap->oap_ldlm_lock;
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, page_index(oap->oap_page), oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq != NULL);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);
	if (lock) {
		oa->o_handle = lock->l_remote_handle;
		oa->o_valid |= OBD_MD_FLHANDLE;
	}

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		goto out;
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
				  pga, &req, crattr->cra_capa, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		goto out;
	}

	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST). If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps. Sadly, there is no obvious
	 * way to do this in a single call. bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
			       oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	ptlrpcd_add_req(req);
	rc = 0;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (crattr != NULL) {
		capa_put(crattr->cra_capa);
		kfree(crattr);
	}

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		kfree(pga);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	return rc;
}

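/*
 * Stash @einfo->ei_cbdata in the lock's l_ast_data if it is still
 * unset; returns 1 when the lock's AST data matches the caller's data,
 * 0 when another user already claimed the lock.
 */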
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
					struct ldlm_enqueue_info *einfo)
{
	void *data = einfo->ei_cbdata;
	int set = 0;

	LASSERT(lock != NULL);
	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

	lock_res_and_lock(lock);
	spin_lock(&osc_ast_guard);

	if (lock->l_ast_data == NULL)
		lock->l_ast_data = data;
	if (lock->l_ast_data == data)
		set = 1;

	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(lock);

	return set;
}

static int osc_set_data_with_check(struct lustre_handle *lockh,
				   struct ldlm_enqueue_info *einfo)
{
	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
	int set = 0;

	if (lock != NULL) {
		set = osc_set_lock_data_with_check(lock, einfo);
		LDLM_LOCK_PUT(lock);
	} else
		CERROR("lockh %p, data %p - client evicted?\n",
		       lockh, einfo->ei_cbdata);
	return set;
}

/* find any ldlm lock of the inode in osc
 * return 0 if none was found
 *	  1 if one was found
 *	  < 0 on error */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
			   ldlm_iterator_t replace, void *data)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc = 0;

	ostid_build_res_name(&lsm->lsm_oi, &res_id);
	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
}

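/*
 * Finish an extent lock enqueue: decode the intent reply status, mark
 * the LVB ready on success (or on a non-AGL aborted intent), then hand
 * the final result to the caller's upcall.
 */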
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
			    obd_enqueue_update_f upcall, void *cookie,
			    __u64 *flags, int agl, int rc)
{
	int intent = *flags & LDLM_FL_HAS_INTENT;

	if (intent) {
		/* The request was created before ldlm_cli_enqueue call. */
		if (rc == ELDLM_LOCK_ABORTED) {
			struct ldlm_reply *rep;
			rep = req_capsule_server_get(&req->rq_pill,
						     &RMF_DLM_REP);

			LASSERT(rep != NULL);
			rep->lock_policy_res1 =
				ptlrpc_status_ntoh(rep->lock_policy_res1);
			if (rep->lock_policy_res1)
				rc = rep->lock_policy_res1;
		}
	}

	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
	    (rc == 0)) {
		*flags |= LDLM_FL_LVB_READY;
		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, rc);
	return rc;
}

static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle handle;
	__u32 mode;
	struct ost_lvb *lvb;
	__u32 lvb_len;
	__u64 *flags = aa->oa_flags;

	/* Make a local copy of a lock handle and a mode, because aa->oa_*
	 * might be freed anytime after lock upcall has been called. */
	lustre_handle_copy(&handle, aa->oa_lockh);
	mode = aa->oa_ei->ei_mode;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(&handle);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(&handle, mode);

	/* Let the CP AST grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
		lvb = NULL;
		lvb_len = 0;
	} else {
		lvb = aa->oa_lvb;
		lvb_len = sizeof(*aa->oa_lvb);
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
				   mode, flags, lvb, lvb_len, &handle, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
			      flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Release the lock for async request. */
	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
		/*
		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
		 * not already released by
		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
		 */
		ldlm_lock_decref(&handle, mode);

	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
		 aa->oa_lockh, req, aa);
	ldlm_lock_decref(&handle, mode);
	LDLM_LOCK_PUT(lock);
	return rc;
}

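/* Magic sentinel: callers pass PTLRPCD_SET as the request set to have
 * the enqueue driven by ptlrpcd instead of an explicit set. */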
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;

/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* For AGL, if enqueue RPC is sent but the lock is not
			 * granted, then skip processing this stripe.
			 * Return -ECANCELED to tell the caller. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		}

		if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* addref the lock only if not async requests and PW
			 * lock is matched whereas we asked for PR. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* I would like to be able to ASSERT here that
				 * rss <= kms, but I can't, for reasons which
				 * are explained in lov_enqueue() */
			}

			/* We already have a lock, and it's referenced.
			 *
			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
			 * AGL upcall may change it to CLS_HELD directly. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* For async requests, decref the lock. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		}

		ldlm_lock_decref(lockh, mode);
		LDLM_LOCK_PUT(matched);
	}

 no_match:
	if (intent) {
		LIST_HEAD(cancels);
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			struct osc_enqueue_args *aa;
			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb = lvb;
			aa->oa_lockh = lockh;
			aa->oa_agl = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}

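/*
 * Match an existing granted lock for the given extent without sending
 * an enqueue RPC; read requests may also be satisfied by a cached PW
 * lock, mirroring the matching policy of osc_enqueue_base().
 */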
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	ldlm_mode_t rc;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		return -EIO;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data != NULL) {
			if (!osc_set_data_with_check(lockh, data)) {
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				return 0;
			}
		}
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		return rc;
	}
	return rc;
}

int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	if (unlikely(mode == LCK_GROUP))
		ldlm_lock_decref_and_cancel(lockh, mode);
	else
		ldlm_lock_decref(lockh, mode);

	return 0;
}

static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;

	if (rc == -EBADR)
		/* The request has in fact never been sent
		 * due to issues at a higher level (LOV).
		 * Exit immediately since the caller is
		 * aware of the problem and takes care
		 * of the clean up */
		return rc;

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
		rc = 0;
		goto out;
	}

	if (rc != 0)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

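/*
 * Issue OST_STATFS asynchronously through a request set; the reply is
 * unpacked by osc_statfs_interpret() and forwarded to the caller via
 * oi_cb_up().
 */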
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for the reply,
		 * to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	return 0;
}

static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;

	/* The request might also come from lprocfs, so we need to sync
	 * this with client_disconnect_export() (bug 15684) */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		return -ENODEV;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for the reply,
		 * to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*osfs = *msfs;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;

	if (!lsm)
		return -ENODATA;

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count, (the header part is common to v1 and v3) */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		return -EFAULT;

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		return -EINVAL;

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
	if (lum.lmm_stripe_count > 0) {
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		lumk = kzalloc(lum_size, GFP_NOFS);
		if (!lumk)
			return -ENOMEM;

		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &(lumk->lmm_objects[0]);
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		kfree(lumk);

	return rc;
}

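/*
 * ioctl entry point for the OSC device: dispatches the OBD_IOC and
 * LL_IOC commands below; the try_module_get()/module_put() pair pins
 * the module while a command runs.
 */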
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("Can't get module. Is it alive?");
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_LOV_GET_CONFIG: {
		char *buf;
		struct lov_desc *desc;
		struct obd_uuid uuid;

		buf = NULL;
		len = 0;
		if (obd_ioctl_getdata(&buf, &len, uarg)) {
			err = -EINVAL;
			goto out;
		}

		data = (struct obd_ioctl_data *)buf;

		if (sizeof(*desc) > data->ioc_inllen1) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		if (data->ioc_inllen2 < sizeof(uuid)) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		desc = (struct lov_desc *)data->ioc_inlbuf1;
		desc->ld_tgt_count = 1;
		desc->ld_active_tgt_count = 1;
		desc->ld_default_stripe_count = 1;
		desc->ld_default_stripe_size = 0;
		desc->ld_default_stripe_offset = 0;
		desc->ld_pattern = 0;
		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

		err = copy_to_user(uarg, buf, len);
		if (err)
			err = -EFAULT;
		obd_ioctl_freedata(buf, len);
		goto out;
	}
	case LL_IOC_LOV_SETSTRIPE:
		err = obd_alloc_memmd(exp, karg);
		if (err > 0)
			err = 0;
		goto out;
	case LL_IOC_LOV_GETSTRIPE:
		err = osc_getstripe(karg, uarg);
		goto out;
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		if (err > 0)
			err = 0;
		goto out;
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		goto out;
	case OBD_IOC_POLL_QUOTACHECK:
		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
		goto out;
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		goto out;
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		err = -ENOTTY;
		goto out;
	}
out:
	module_put(THIS_MODULE);
	return err;
}

static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		u64 *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto out;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL) {
			rc = -EPROTO;
			goto out;
		}

		*((u64 *)val) = *reply;
	out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		ldlm_mode_t mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
					       fm_key->fiemap.fm_length +
					       PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, need to acquire one on the server */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL) {
			rc = -ENOMEM;
			goto drop_lock;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			goto drop_lock;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto fini_req;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL) {
			rc = -EPROTO;
			goto fini_req;
		}

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);
	char *tmp;
	int rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = (struct cl_client_cache *)val;
		atomic_inc(&cli->cl_cache->ccc_users);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

		nr = osc_lru_shrink(cli, min(nr, target));
		*(int *)val -= nr;
		return 0;
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

	/* We pass all other commands directly to OST. Since nobody calls osc
	 * methods directly and everybody is supposed to go through LOV, we
	 * assume lov checked invalid values for us.
	 * The only recognised values so far are evict_by_nid and mds_conn.
	 * Even if something bad goes through, we'd get a -EINVAL from OST
	 * anyway. */

	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
					&RQF_OST_SET_GRANT_INFO :
					&RQF_OBD_SET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
						     &RMF_OST_BODY :
						     &RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_brw_async_args *aa;
		struct obdo *oa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		OBDO_ALLOC(oa);
		if (!oa) {
			ptlrpc_req_finished(req);
			return -ENOMEM;
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else {
		ptlrpcd_add_req(req);
	}

	return 0;
}

static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
			 void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;

		client_obd_list_lock(&cli->cl_loi_list_lock);
		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
				  2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
		       data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	return 0;
}

static int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *      connect p1                     disconnect p2
	 *   ptlrpc_connect_import
	 *     ...............               class_manual_cleanup
	 *                                     osc_disconnect
	 *                                     del_shrink_grant
	 *   ptlrpc_connect_interrupt
	 *     init_grant_shrink
	 *   add this client to shrink list
	 *                                      cleanup_osc
	 * Bang! pinger trigger the shrink.
	 * So the osc should be disconnected from the shrink list, after we
	 * are sure the import has been destroyed. BUG18662
	 */
	if (obd->u.cli.cl_import == NULL)
		osc_del_shrink_grant(&obd->u.cli);
	return rc;
}

static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		cli = &obd->u.cli;
		client_obd_list_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env *env;
		int refcheck;

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			/* Reset grants */
			cli = &obd->u.cli;
			/* all pages go to failing rpcs due to the invalid
			 * import */
			osc_io_unplug(env, cli, NULL);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
			cl_env_put(env, &refcheck);
		} else
			rc = PTR_ERR(env);
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		/* See bug 7198 */
		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	return rc;
}

/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_for_recovery(struct ldlm_lock *lock)
{
	check_res_locked(lock->l_resource);

	/*
	 * Cancel all unused extent locks in granted mode LCK_PR or LCK_CR.
	 *
	 * XXX as a future improvement, we can also cancel unused write locks
	 * if they don't have dirty data and active mmaps.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    (lock->l_granted_mode == LCK_PR ||
	     lock->l_granted_mode == LCK_CR) &&
	    (osc_dlm_lock_pageref(lock) == 0))
		return 1;

	return 0;
}

static int brw_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL);
	return 0;
}

int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct client_obd *cli = &obd->u.cli;
	void *handler;
	int rc;
	int adding;
	int added;
	int req_count;

	rc = ptlrpcd_addref();
	if (rc)
		return rc;

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		goto out_ptlrpcd;

	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_client_setup;
	}
	cli->cl_writeback_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		goto out_ptlrpcd_work;

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	lprocfs_osc_init_vars(&lvars);
	if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);
	}

	/*
	 * We try to control the total number of requests with an upper limit
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);
	}

	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
	return rc;

out_ptlrpcd_work:
	ptlrpcd_destroy_work(handler);
out_client_setup:
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	return rc;
}

static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
	switch (stage) {
	case OBD_CLEANUP_EARLY: {
		struct obd_import *imp;
		imp = obd->u.cli.cl_import;
		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
		ptlrpc_deactivate_import(imp);
		spin_lock(&imp->imp_lock);
		imp->imp_pingable = 0;
		spin_unlock(&imp->imp_lock);
		break;
	}
	case OBD_CLEANUP_EXPORTS: {
		struct client_obd *cli = &obd->u.cli;
		/* LU-464
		 * for echo client, export may be on zombie list, wait for
		 * zombie thread to cull it, because cli.cl_import will be
		 * cleared in client_disconnect_export():
		 *   class_export_destroy() -> obd_cleanup() ->
		 *   echo_device_free() -> echo_client_cleanup() ->
		 *   obd_disconnect() -> osc_disconnect() ->
		 *   client_disconnect_export()
		 */
		obd_zombie_barrier();
		if (cli->cl_writeback_work) {
			ptlrpcd_destroy_work(cli->cl_writeback_work);
			cli->cl_writeback_work = NULL;
		}
		obd_cleanup_client_import(obd);
		ptlrpc_lprocfs_unregister_obd(obd);
		lprocfs_obd_cleanup(obd);
		break;
	}
	}
	return 0;
}

int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		atomic_dec(&cli->cl_cache->ccc_users);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	return rc;
}

int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	int rc = 0;

	lprocfs_osc_init_vars(&lvars);

	switch (lcfg->lcfg_command) {
	default:
		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
					      lcfg, obd);
		if (rc > 0)
			rc = 0;
		break;
	}

	return rc;
}

static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	return osc_process_config_base(obd, buf);
}

struct obd_ops osc_obd_ops = {
	.o_owner		= THIS_MODULE,
	.o_setup		= osc_setup,
	.o_precleanup		= osc_precleanup,
	.o_cleanup		= osc_cleanup,
	.o_add_conn		= client_import_add_conn,
	.o_del_conn		= client_import_del_conn,
	.o_connect		= client_connect_import,
	.o_reconnect		= osc_reconnect,
	.o_disconnect		= osc_disconnect,
	.o_statfs		= osc_statfs,
	.o_statfs_async		= osc_statfs_async,
	.o_packmd		= osc_packmd,
	.o_unpackmd		= osc_unpackmd,
	.o_create		= osc_create,
	.o_destroy		= osc_destroy,
	.o_getattr		= osc_getattr,
	.o_getattr_async	= osc_getattr_async,
	.o_setattr		= osc_setattr,
	.o_setattr_async	= osc_setattr_async,
	.o_find_cbdata		= osc_find_cbdata,
	.o_iocontrol		= osc_iocontrol,
	.o_get_info		= osc_get_info,
	.o_set_info_async	= osc_set_info_async,
	.o_import_event		= osc_import_event,
	.o_process_config	= osc_process_config,
	.o_quotactl		= osc_quotactl,
	.o_quotacheck		= osc_quotacheck,
};

extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

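/*
 * Module init: the request pool below is capped at osc_reqpool_mem_max
 * megabytes; reqsize is OST_MAXREQSIZE rounded up to a power of two, so
 * osc_reqpool_maxreqcount = (osc_reqpool_mem_max << 20) / reqsize.
 * For illustration only: with the default of 5 MB and a hypothetical
 * reqsize of 8 KiB that would allow (5 << 20) / 8192 = 640 pooled
 * requests; the real OST_MAXREQSIZE depends on the build.
 */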
static int __init osc_init(void)
{
	struct lprocfs_static_vars lvars = { NULL };
	unsigned int reqpool_size;
	unsigned int reqsize;
	int rc;

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		return rc;

	lprocfs_osc_init_vars(&lvars);

	rc = class_register_type(&osc_obd_ops, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc)
		goto out_kmem;

	spin_lock_init(&osc_ast_guard);
	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

	/* This is obviously too much memory, only prevent overflow here */
	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
		rc = -EINVAL;
		goto out_type;
	}

	reqpool_size = osc_reqpool_mem_max << 20;

	reqsize = 1;
	while (reqsize < OST_MAXREQSIZE)
		reqsize = reqsize << 1;

	/*
	 * We don't enlarge the request count in OSC pool according to
	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
	 * tried after normal allocation failed. So a small OSC pool won't
	 * cause much performance degradation in most of cases.
	 */
	osc_reqpool_maxreqcount = reqpool_size / reqsize;

	atomic_set(&osc_pool_req_count, 0);
	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
					  ptlrpc_add_rqs_to_pool);

	if (osc_rq_pool)
		return 0;

	rc = -ENOMEM;

out_type:
	class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
	lu_kmem_fini(osc_caches);
	return rc;
}

static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);