drivers/staging/lustre/lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "../include/obd.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

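/* Shared request pool for BRW write requests (allocated from in
 * osc_brw_prep_request()); the pool's total memory is capped by
 * osc_reqpool_mem_max below. */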
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
        struct obdo *aa_oa;
        int aa_requested_nob;
        int aa_nio_count;
        u32 aa_page_count;
        int aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd *aa_cli;
        struct list_head aa_oaps;
        struct list_head aa_exts;
        struct obd_capa *aa_ocapa;
        struct cl_req *aa_clerq;
};

struct osc_async_args {
        struct obd_info *aa_oi;
};

struct osc_setattr_args {
        struct obdo *sa_oa;
        obd_enqueue_update_f sa_upcall;
        void *sa_cookie;
};

struct osc_fsync_args {
        struct obd_info *fa_oi;
        obd_enqueue_update_f fa_upcall;
        void *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export *oa_exp;
        __u64 *oa_flags;
        obd_enqueue_update_f oa_upcall;
        void *oa_cookie;
        struct ost_lvb *oa_lvb;
        struct lustre_handle *oa_lockh;
        struct ldlm_enqueue_info *oa_ei;
        unsigned int oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;

        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                return lmm_size;

        if (*lmmp != NULL && lsm == NULL) {
                kfree(*lmmp);
                *lmmp = NULL;
                return 0;
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (*lmmp == NULL) {
                *lmmp = kzalloc(lmm_size, GFP_NOFS);
                if (!*lmmp)
                        return -ENOMEM;
        }

        if (lsm)
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        return -EINVAL;
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        return -EINVAL;
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                return lsm_size;

        if (*lsmp != NULL && lmm == NULL) {
                kfree((*lsmp)->lsm_oinfo[0]);
                kfree(*lsmp);
                *lsmp = NULL;
                return 0;
        }

        if (*lsmp == NULL) {
                *lsmp = kzalloc(lsm_size, GFP_NOFS);
                if (unlikely(*lsmp == NULL))
                        return -ENOMEM;
                (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
                                                GFP_NOFS);
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        kfree(*lsmp);
                        return -ENOMEM;
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        return lsm_size;
}

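/* Copy the capability @capa (if any) into the request capsule and mark its
 * presence in the request body. */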
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

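/* Completion handler for async getattr: unpack the reply obdo and hand the
 * result to the caller's oi_cb_up callback. */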
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        return 0;
}

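/* Synchronous getattr: send an OST_GETATTR and wait for the reply. */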
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

out:
        ptlrpc_req_finished(req);
        return rc;
}

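/* Completion handler shared by the async setattr and punch paths: unpack
 * the reply obdo and invoke the caller's upcall. */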
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        return rc;
}

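/* Pack and send an OST_SETATTR. With a NULL @rqset the request is
 * fire-and-forget via ptlrpcd; otherwise it is queued on the set with
 * osc_setattr_interpret() as the reply handler. */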
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        return 0;
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        return rc;
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL) {
                rc = -ENOMEM;
                goto out;
        }

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                goto out;
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out_req;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out_req;
        }

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}

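/* Send an OST_PUNCH to truncate object data on the OST; the reply is
 * handled by osc_setattr_interpret() and reported through @upcall. */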
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

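/* Completion handler for OST_SYNC: copy the reply obdo back to the caller
 * and run the upcall. */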
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;

        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                goto out;
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

/* Find and cancel, locally, the locks matched by @mode in the resource
 * named by @oa. Found locks are added to the @cancels list. Returns the
 * number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC is not supported at
         * all, in which we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                return 0;

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                return 0;

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        return count;
}

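/* A destroy RPC finished: drop the in-flight count and wake anyone
 * throttled in osc_can_send_destroy(). */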
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

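/* Try to reserve a slot for a destroy RPC, keeping at most
 * cl_max_rpcs_in_flight destroys outstanding. Returns 1 if the caller may
 * send, 0 if it must wait on cl_destroy_waitq. */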
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                return osc_real_create(exp, oa, ea, oti);
        }

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                return osc_real_create(exp, oa, ea, oti);

        /* we should not get here anymore */
        LBUG();

        return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * next reconnects to the MDS, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        LIST_HEAD(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                return -EINVAL;
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                return -ENOMEM;
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying an unlink orphan (a request sent
         * from MDT to OST), it should not block here, because the process
         * might be triggered by ptlrpcd, and it is not good to block a
         * ptlrpcd thread (b=16006). */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        return 0;
}

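/* Fill in the dirty-page and grant accounting that is piggy-backed on the
 * obdo of outgoing requests (BRWs and grant-shrink). */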
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key, u32 vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                goto out;
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                return 0;
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        body = kzalloc(sizeof(*body), GFP_NOFS);
        if (!body)
                return -ENOMEM;

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        kfree(body);
        return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        unsigned long time = cfs_time_current();
        unsigned long next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;

                osc_update_next_shrink(client);
        }
        return 0;
}

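/* Timeout-list callback: walk the registered clients and shrink the grant
 * of any that is holding more than it needs. */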
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list,
                            cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we
         * have been evicted, it's the new avail_grant amount, and cl_dirty
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

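/* Validate the per-niobuf return codes and the total transfer size in a
 * BRW write reply. */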
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           u32 page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

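/* Two brw_pages can share one niobuf only if their flags match and they
 * are contiguous in file offset. */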
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC |
                                  OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

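/* Compute the bulk checksum over the first @nob bytes of @pga with the
 * hash algorithm selected by @cksum_type; OBD_FAIL hooks can deliberately
 * corrupt the data (reads) or the checksum (writes) for testing. */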
static u32 osc_checksum_bulk(int nob, u32 pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        int err;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                CDEBUG(D_PAGE,
                       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
                       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
                       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
                       page_private(pga[i]->pg),
                       (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a redo. */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

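/* Build a BRW RPC: pack the body, ioobj and niobufs, attach the bulk
 * descriptor, optionally checksum the pages, and stash the async args for
 * the interpret callback. Write requests are drawn from osc_rq_pool. */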
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                return -ENOMEM; /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                return -EINVAL; /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                return -ENOMEM;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL) {
                rc = -ENOMEM;
                goto out;
        }
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility (sending "0"), and
         * also so that the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        return 0;

out:
        ptlrpc_req_finished(req);
        return rc;
}

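/* The server reported a write checksum mismatch: recompute the checksum
 * locally to work out where the corruption happened. Returns 1 (resend)
 * with a diagnosis logged, or 0 if the checksum is in fact confirmed. */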
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                u32 page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
               client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                return rc;
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                return -EPROTO;
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                return rc;

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        return -EPROTO;
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        return -EAGAIN;

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        return -EAGAIN;

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                goto out;
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0) {
                rc = -EAGAIN;
                goto out;
        }

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                return -EPROTO;
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                return -EPROTO;
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                char *via;
                char *router;
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        return rc;
}

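/* Rebuild and requeue a BRW that failed with a recoverable error. The new
 * request takes over the pga, oaps and async args of the old one. */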
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                return rc;

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                return -EINTR;
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = get_seconds() + new_req->rq_timeout;
        else
                new_req->rq_sent = get_seconds() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problems if we're going to support
         * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and waiting for all of them to finish. We should inherit the
         * request set from the old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        return 0;
}

/*
 * Ugh, we want disk allocation on the target to happen in offset order. We
 * will follow Sedgewick's advice and stick to the dead simple shellsort;
 * it'll do fine for our small page arrays and doesn't require allocation.
 * It's an insertion sort that swaps elements that are strides apart,
 * shrinking the stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        if (num == 1)
                return;
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}

static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
        LASSERT(ppga != NULL);
        kfree(ppga);
}

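/* Reply handler for BRW RPCs: finish or redo the request, push the
 * returned attributes into the cl_object, release extents and pages, and
 * update the read/write in-flight counters. */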
1764 static int brw_interpret(const struct lu_env *env,
1765 struct ptlrpc_request *req, void *data, int rc)
1766 {
1767 struct osc_brw_async_args *aa = data;
1768 struct osc_extent *ext;
1769 struct osc_extent *tmp;
1770 struct cl_object *obj = NULL;
1771 struct client_obd *cli = aa->aa_cli;
1772
1773 rc = osc_brw_fini_request(req, rc);
1774 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1775 /* When server return -EINPROGRESS, client should always retry
1776 * regardless of the number of times the bulk was resent already. */
1777 if (osc_recoverable_error(rc)) {
1778 if (req->rq_import_generation !=
1779 req->rq_import->imp_generation) {
1780 CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1781 req->rq_import->imp_obd->obd_name,
1782 POSTID(&aa->aa_oa->o_oi), rc);
1783 } else if (rc == -EINPROGRESS ||
1784 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1785 rc = osc_brw_redo_request(req, aa, rc);
1786 } else {
1787 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1788 req->rq_import->imp_obd->obd_name,
1789 POSTID(&aa->aa_oa->o_oi), rc);
1790 }
1791
1792 if (rc == 0)
1793 return 0;
1794 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1795 rc = -EIO;
1796 }
1797
1798 if (aa->aa_ocapa) {
1799 capa_put(aa->aa_ocapa);
1800 aa->aa_ocapa = NULL;
1801 }
1802
1803 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1804 if (obj == NULL && rc == 0) {
1805 obj = osc2cl(ext->oe_obj);
1806 cl_object_get(obj);
1807 }
1808
1809 list_del_init(&ext->oe_link);
1810 osc_extent_finish(env, ext, 1, rc);
1811 }
1812 LASSERT(list_empty(&aa->aa_exts));
1813 LASSERT(list_empty(&aa->aa_oaps));
1814
1815 if (obj != NULL) {
1816 struct obdo *oa = aa->aa_oa;
1817 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1818 unsigned long valid = 0;
1819
1820 LASSERT(rc == 0);
1821 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1822 attr->cat_blocks = oa->o_blocks;
1823 valid |= CAT_BLOCKS;
1824 }
1825 if (oa->o_valid & OBD_MD_FLMTIME) {
1826 attr->cat_mtime = oa->o_mtime;
1827 valid |= CAT_MTIME;
1828 }
1829 if (oa->o_valid & OBD_MD_FLATIME) {
1830 attr->cat_atime = oa->o_atime;
1831 valid |= CAT_ATIME;
1832 }
1833 if (oa->o_valid & OBD_MD_FLCTIME) {
1834 attr->cat_ctime = oa->o_ctime;
1835 valid |= CAT_CTIME;
1836 }
1837 if (valid != 0) {
1838 cl_object_attr_lock(obj);
1839 cl_object_attr_set(env, obj, attr, valid);
1840 cl_object_attr_unlock(obj);
1841 }
1842 cl_object_put(env, obj);
1843 }
1844 OBDO_FREE(aa->aa_oa);
1845
1846 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1847 req->rq_bulk->bd_nob_transferred);
1848 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1849 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1850
1851 client_obd_list_lock(&cli->cl_loi_list_lock);
1852 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1853 * is called so we know whether to go to sync BRWs or wait for more
1854 * RPCs to complete */
1855 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1856 cli->cl_w_in_flight--;
1857 else
1858 cli->cl_r_in_flight--;
1859 osc_wake_cache_waiters(cli);
1860 client_obd_list_unlock(&cli->cl_loi_list_lock);
1861
1862 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1863 return rc;
1864 }
1865
1866 /**
1867 * Build an RPC from the list of extents @ext_list. The caller must ensure
1868 * that the total number of pages in this list does NOT exceed max pages per RPC.
1869 * Extents in the list must be in OES_RPC state.
1870 */
1871 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1872 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1873 {
1874 struct ptlrpc_request *req = NULL;
1875 struct osc_extent *ext;
1876 struct brw_page **pga = NULL;
1877 struct osc_brw_async_args *aa = NULL;
1878 struct obdo *oa = NULL;
1879 struct osc_async_page *oap;
1880 struct osc_async_page *tmp;
1881 struct cl_req *clerq = NULL;
1882 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1883 struct ldlm_lock *lock = NULL;
1884 struct cl_req_attr *crattr = NULL;
1885 u64 starting_offset = OBD_OBJECT_EOF;
1886 u64 ending_offset = 0;
1887 int mpflag = 0;
1888 int mem_tight = 0;
1889 int page_count = 0;
1890 int i;
1891 int rc;
1892 struct ost_body *body;
1893 LIST_HEAD(rpc_list);
1894
1895 LASSERT(!list_empty(ext_list));
1896
1897 /* add pages into rpc_list to build BRW rpc */
1898 list_for_each_entry(ext, ext_list, oe_link) {
1899 LASSERT(ext->oe_state == OES_RPC);
1900 mem_tight |= ext->oe_memalloc;
1901 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1902 ++page_count;
1903 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1904 if (starting_offset > oap->oap_obj_off)
1905 starting_offset = oap->oap_obj_off;
1906 else
1907 LASSERT(oap->oap_page_off == 0);
1908 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1909 ending_offset = oap->oap_obj_off +
1910 oap->oap_count;
1911 else
1912 LASSERT(oap->oap_page_off + oap->oap_count ==
1913 PAGE_CACHE_SIZE);
1914 }
1915 }
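	/* Illustration (hypothetical offsets): for three contiguous 4K pages
	 * at object offsets 0x0000, 0x1000 and 0x2000, each with
	 * oap_count == 0x1000, starting_offset ends up as 0x0000 and
	 * ending_offset as 0x3000. The LASSERTs above enforce that only the
	 * first page of the RPC may start at a non-zero in-page offset and
	 * only the last may end before the page boundary. */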
1916
1917 if (mem_tight)
1918 mpflag = cfs_memory_pressure_get_and_set();
1919
1920 crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
1921 if (!crattr) {
1922 rc = -ENOMEM;
1923 goto out;
1924 }
1925
1926 pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1927 if (pga == NULL) {
1928 rc = -ENOMEM;
1929 goto out;
1930 }
1931
1932 OBDO_ALLOC(oa);
1933 if (oa == NULL) {
1934 rc = -ENOMEM;
1935 goto out;
1936 }
1937
1938 i = 0;
1939 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1940 struct cl_page *page = oap2cl_page(oap);
1941 if (clerq == NULL) {
1942 clerq = cl_req_alloc(env, page, crt,
1943 1 /* only 1-object rpcs for now */);
1944 if (IS_ERR(clerq)) {
1945 rc = PTR_ERR(clerq);
1946 goto out;
1947 }
1948 lock = oap->oap_ldlm_lock;
1949 }
1950 if (mem_tight)
1951 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1952 pga[i] = &oap->oap_brw_page;
1953 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1954 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1955 pga[i]->pg, page_index(oap->oap_page), oap,
1956 pga[i]->flag);
1957 i++;
1958 cl_req_page_add(env, clerq, page);
1959 }
1960
1961 /* always get the data for the obdo for the rpc */
1962 LASSERT(clerq != NULL);
1963 crattr->cra_oa = oa;
1964 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1965 if (lock) {
1966 oa->o_handle = lock->l_remote_handle;
1967 oa->o_valid |= OBD_MD_FLHANDLE;
1968 }
1969
1970 rc = cl_req_prep(env, clerq);
1971 if (rc != 0) {
1972 CERROR("cl_req_prep failed: %d\n", rc);
1973 goto out;
1974 }
1975
1976 sort_brw_pages(pga, page_count);
1977 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1978 pga, &req, crattr->cra_capa, 1, 0);
1979 if (rc != 0) {
1980 CERROR("prep_req failed: %d\n", rc);
1981 goto out;
1982 }
1983
1984 req->rq_interpret_reply = brw_interpret;
1985
1986 if (mem_tight != 0)
1987 req->rq_memalloc = 1;
1988
1989 /* Need to update the timestamps after the request is built in case
1990 * we race with setattr (locally or in the queue at the OST). If the
1991 * OST gets a later setattr before an earlier BRW (as determined by
1992 * the request xid), the OST will not use the BRW timestamps. Sadly,
1993 * there is no obvious way to do this in a single call. bug 10150 */
1994 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1995 crattr->cra_oa = &body->oa;
1996 cl_req_attr_set(env, clerq, crattr,
1997 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1998
1999 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2000
2001 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2002 aa = ptlrpc_req_async_args(req);
2003 INIT_LIST_HEAD(&aa->aa_oaps);
2004 list_splice_init(&rpc_list, &aa->aa_oaps);
2005 INIT_LIST_HEAD(&aa->aa_exts);
2006 list_splice_init(ext_list, &aa->aa_exts);
2007 aa->aa_clerq = clerq;
2008
2009 /* Queued sync pages can be torn down while the pages
2010 * were between the pending list and the RPC. */
2011 tmp = NULL;
2012 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2013 /* only one oap gets a request reference */
2014 if (tmp == NULL)
2015 tmp = oap;
2016 if (oap->oap_interrupted && !req->rq_intr) {
2017 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2018 oap, req);
2019 ptlrpc_mark_interrupted(req);
2020 }
2021 }
2022 if (tmp != NULL)
2023 tmp->oap_request = ptlrpc_request_addref(req);
2024
2025 client_obd_list_lock(&cli->cl_loi_list_lock);
2026 starting_offset >>= PAGE_CACHE_SHIFT;
2027 if (cmd == OBD_BRW_READ) {
2028 cli->cl_r_in_flight++;
2029 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2030 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2031 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2032 starting_offset + 1);
2033 } else {
2034 cli->cl_w_in_flight++;
2035 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2036 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2037 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2038 starting_offset + 1);
2039 }
2040 client_obd_list_unlock(&cli->cl_loi_list_lock);
2041
2042 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2043 page_count, aa, cli->cl_r_in_flight,
2044 cli->cl_w_in_flight);
2045
2046 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2047 * see which CPU/NUMA node the majority of pages were allocated
2048 * on, and try to assign the async RPC to the CPU core
2049 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2050 *
2051 * But on the other hand, we expect that multiple ptlrpcd
2052 * threads and the initial write sponsor can run in parallel,
2053 * especially when data checksum is enabled, which is a CPU-bound
2054 * operation that a single ptlrpcd thread cannot process in time.
2055 * So more ptlrpcd threads sharing BRW load
2056 * (with PDL_POLICY_ROUND) seems better.
2057 */
2058 ptlrpcd_add_req(req, pol, -1);
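	/* A hypothetical NUMA-aware variant (sketch only, not what this
	 * driver does) would pass a preferred CPU index instead of -1:
	 *
	 *	ptlrpcd_add_req(req, PDL_POLICY_PREFERRED, preferred_cpu);
	 *
	 * with preferred_cpu derived from where the bulk pages live. */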
2059 rc = 0;
2060
2061 out:
2062 if (mem_tight != 0)
2063 cfs_memory_pressure_restore(mpflag);
2064
2065 if (crattr != NULL) {
2066 capa_put(crattr->cra_capa);
2067 kfree(crattr);
2068 }
2069
2070 if (rc != 0) {
2071 LASSERT(req == NULL);
2072
2073 if (oa)
2074 OBDO_FREE(oa);
2075 kfree(pga);
2076 /* This should happen rarely and is pretty bad; it makes the
2077 * pending list not follow the dirty order. */
2078 while (!list_empty(ext_list)) {
2079 ext = list_entry(ext_list->next, struct osc_extent,
2080 oe_link);
2081 list_del_init(&ext->oe_link);
2082 osc_extent_finish(env, ext, 0, rc);
2083 }
2084 if (clerq && !IS_ERR(clerq))
2085 cl_req_completion(env, clerq, rc);
2086 }
2087 return rc;
2088 }
2089
2090 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2091 struct ldlm_enqueue_info *einfo)
2092 {
2093 void *data = einfo->ei_cbdata;
2094 int set = 0;
2095
2096 LASSERT(lock != NULL);
2097 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2098 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2099 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2100 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2101
2102 lock_res_and_lock(lock);
2103 spin_lock(&osc_ast_guard);
2104
2105 if (lock->l_ast_data == NULL)
2106 lock->l_ast_data = data;
2107 if (lock->l_ast_data == data)
2108 set = 1;
2109
2110 spin_unlock(&osc_ast_guard);
2111 unlock_res_and_lock(lock);
2112
2113 return set;
2114 }
2115
2116 static int osc_set_data_with_check(struct lustre_handle *lockh,
2117 struct ldlm_enqueue_info *einfo)
2118 {
2119 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2120 int set = 0;
2121
2122 if (lock != NULL) {
2123 set = osc_set_lock_data_with_check(lock, einfo);
2124 LDLM_LOCK_PUT(lock);
2125 } else
2126 CERROR("lockh %p, data %p - client evicted?\n",
2127 lockh, einfo->ei_cbdata);
2128 return set;
2129 }
2130
2131 /* Find any ldlm lock of the inode in the osc.
2132 * Return: 0 if no lock is found,
2133 *         1 if one is found,
2134 *         < 0 on error. */
2135 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2136 ldlm_iterator_t replace, void *data)
2137 {
2138 struct ldlm_res_id res_id;
2139 struct obd_device *obd = class_exp2obd(exp);
2140 int rc = 0;
2141
2142 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2143 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2144 if (rc == LDLM_ITER_STOP)
2145 return 1;
2146 if (rc == LDLM_ITER_CONTINUE)
2147 return 0;
2148 return rc;
2149 }
2150
2151 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2152 obd_enqueue_update_f upcall, void *cookie,
2153 __u64 *flags, int agl, int rc)
2154 {
2155 int intent = *flags & LDLM_FL_HAS_INTENT;
2156
2157 if (intent) {
2158 /* The request was created before ldlm_cli_enqueue call. */
2159 if (rc == ELDLM_LOCK_ABORTED) {
2160 struct ldlm_reply *rep;
2161 rep = req_capsule_server_get(&req->rq_pill,
2162 &RMF_DLM_REP);
2163
2164 LASSERT(rep != NULL);
2165 rep->lock_policy_res1 =
2166 ptlrpc_status_ntoh(rep->lock_policy_res1);
2167 if (rep->lock_policy_res1)
2168 rc = rep->lock_policy_res1;
2169 }
2170 }
2171
2172 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2173 (rc == 0)) {
2174 *flags |= LDLM_FL_LVB_READY;
2175 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2176 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2177 }
2178
2179 /* Call the update callback. */
2180 rc = (*upcall)(cookie, rc);
2181 return rc;
2182 }
2183
2184 static int osc_enqueue_interpret(const struct lu_env *env,
2185 struct ptlrpc_request *req,
2186 struct osc_enqueue_args *aa, int rc)
2187 {
2188 struct ldlm_lock *lock;
2189 struct lustre_handle handle;
2190 __u32 mode;
2191 struct ost_lvb *lvb;
2192 __u32 lvb_len;
2193 __u64 *flags = aa->oa_flags;
2194
2195 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2196 * might be freed anytime after lock upcall has been called. */
2197 lustre_handle_copy(&handle, aa->oa_lockh);
2198 mode = aa->oa_ei->ei_mode;
2199
2200 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2201 * be valid. */
2202 lock = ldlm_handle2lock(&handle);
2203
2204 /* Take an additional reference so that a blocking AST that
2205 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2206 * to arrive after an upcall has been executed by
2207 * osc_enqueue_fini(). */
2208 ldlm_lock_addref(&handle, mode);
2209
2210 /* Let the CP AST grant the lock first. */
2211 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2212
2213 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2214 lvb = NULL;
2215 lvb_len = 0;
2216 } else {
2217 lvb = aa->oa_lvb;
2218 lvb_len = sizeof(*aa->oa_lvb);
2219 }
2220
2221 /* Complete obtaining the lock procedure. */
2222 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2223 mode, flags, lvb, lvb_len, &handle, rc);
2224 /* Complete osc stuff. */
2225 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2226 flags, aa->oa_agl, rc);
2227
2228 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2229
2230 /* Release the lock for async request. */
2231 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2232 /*
2233 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2234 * not already released by
2235 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2236 */
2237 ldlm_lock_decref(&handle, mode);
2238
2239 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2240 aa->oa_lockh, req, aa);
2241 ldlm_lock_decref(&handle, mode);
2242 LDLM_LOCK_PUT(lock);
2243 return rc;
2244 }
2245
2246 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2247
2248 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2249 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2250 * other synchronous requests, however keeping some locks and trying to obtain
2251 * others may take a considerable amount of time in the case of an OST
2252 * failure; and when other sync requests do not get the released lock from
2253 * a client, the client is excluded from the cluster -- such scenarios make
2254 * life difficult, so release locks just after they are obtained. */
2255 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2256 __u64 *flags, ldlm_policy_data_t *policy,
2257 struct ost_lvb *lvb, int kms_valid,
2258 obd_enqueue_update_f upcall, void *cookie,
2259 struct ldlm_enqueue_info *einfo,
2260 struct lustre_handle *lockh,
2261 struct ptlrpc_request_set *rqset, int async, int agl)
2262 {
2263 struct obd_device *obd = exp->exp_obd;
2264 struct ptlrpc_request *req = NULL;
2265 int intent = *flags & LDLM_FL_HAS_INTENT;
2266 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2267 ldlm_mode_t mode;
2268 int rc;
2269
2270 /* Filesystem lock extents are extended to page boundaries so that
2271 * dealing with the page cache is a little smoother. */
2272 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2273 policy->l_extent.end |= ~CFS_PAGE_MASK;
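	/* Worked example (assuming 4K pages, so ~CFS_PAGE_MASK == 0xfff):
	 * a start of 0x1234 is rounded down to 0x1000 by subtracting the
	 * in-page remainder (0x1234 & 0xfff == 0x234), and an end of 0x5678
	 * is rounded up to 0x5fff by ORing in the low bits. */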
2274
2275 /*
2276 * kms is not valid when either object is completely fresh (so that no
2277 * locks are cached), or object was evicted. In the latter case cached
2278 * lock cannot be used, because it would prime inode state with
2279 * potentially stale LVB.
2280 */
2281 if (!kms_valid)
2282 goto no_match;
2283
2284 /* Next, search for already existing extent locks that will cover us */
2285 /* If we're trying to read, we also search for an existing PW lock. The
2286 * VFS and page cache already protect us locally, so lots of readers/
2287 * writers can share a single PW lock.
2288 *
2289 * There are problems with conversion deadlocks, so instead of
2290 * converting a read lock to a write lock, we'll just enqueue a new
2291 * one.
2292 *
2293 * At some point we should cancel the read lock instead of making them
2294 * send us a blocking callback, but there are problems with canceling
2295 * locks out from other users right now, too. */
2296 mode = einfo->ei_mode;
2297 if (einfo->ei_mode == LCK_PR)
2298 mode |= LCK_PW;
2299 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2300 einfo->ei_type, policy, mode, lockh, 0);
2301 if (mode) {
2302 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2303
2304 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2305 /* For AGL, if the enqueue RPC is sent but the lock is not
2306 * granted, then skip processing this stripe.
2307 * Return -ECANCELED to tell the caller. */
2308 ldlm_lock_decref(lockh, mode);
2309 LDLM_LOCK_PUT(matched);
2310 return -ECANCELED;
2311 }
2312
2313 if (osc_set_lock_data_with_check(matched, einfo)) {
2314 *flags |= LDLM_FL_LVB_READY;
2315 /* addref the lock only if not async requests and PW
2316 * lock is matched whereas we asked for PR. */
2317 if (!rqset && einfo->ei_mode != mode)
2318 ldlm_lock_addref(lockh, LCK_PR);
2319 if (intent) {
2320 /* I would like to be able to ASSERT here that
2321 * rss <= kms, but I can't, for reasons which
2322 * are explained in lov_enqueue() */
2323 }
2324
2325 /* We already have a lock, and it's referenced.
2326 *
2327 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2328 * AGL upcall may change it to CLS_HELD directly. */
2329 (*upcall)(cookie, ELDLM_OK);
2330
2331 if (einfo->ei_mode != mode)
2332 ldlm_lock_decref(lockh, LCK_PW);
2333 else if (rqset)
2334 /* For async requests, decref the lock. */
2335 ldlm_lock_decref(lockh, einfo->ei_mode);
2336 LDLM_LOCK_PUT(matched);
2337 return ELDLM_OK;
2338 }
2339
2340 ldlm_lock_decref(lockh, mode);
2341 LDLM_LOCK_PUT(matched);
2342 }
2343
2344 no_match:
2345 if (intent) {
2346 LIST_HEAD(cancels);
2347 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2348 &RQF_LDLM_ENQUEUE_LVB);
2349 if (req == NULL)
2350 return -ENOMEM;
2351
2352 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2353 if (rc) {
2354 ptlrpc_request_free(req);
2355 return rc;
2356 }
2357
2358 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2359 sizeof(*lvb));
2360 ptlrpc_request_set_replen(req);
2361 }
2362
2363 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2364 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2365
2366 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2367 sizeof(*lvb), LVB_T_OST, lockh, async);
2368 if (rqset) {
2369 if (!rc) {
2370 struct osc_enqueue_args *aa;
2371 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2372 aa = ptlrpc_req_async_args(req);
2373 aa->oa_ei = einfo;
2374 aa->oa_exp = exp;
2375 aa->oa_flags = flags;
2376 aa->oa_upcall = upcall;
2377 aa->oa_cookie = cookie;
2378 aa->oa_lvb = lvb;
2379 aa->oa_lockh = lockh;
2380 aa->oa_agl = !!agl;
2381
2382 req->rq_interpret_reply =
2383 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2384 if (rqset == PTLRPCD_SET)
2385 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2386 else
2387 ptlrpc_set_add_req(rqset, req);
2388 } else if (intent) {
2389 ptlrpc_req_finished(req);
2390 }
2391 return rc;
2392 }
2393
2394 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2395 if (intent)
2396 ptlrpc_req_finished(req);
2397
2398 return rc;
2399 }
2400
2401 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2402 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2403 __u64 *flags, void *data, struct lustre_handle *lockh,
2404 int unref)
2405 {
2406 struct obd_device *obd = exp->exp_obd;
2407 __u64 lflags = *flags;
2408 ldlm_mode_t rc;
2409
2410 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2411 return -EIO;
2412
2413 /* Filesystem lock extents are extended to page boundaries so that
2414 * dealing with the page cache is a little smoother */
2415 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2416 policy->l_extent.end |= ~CFS_PAGE_MASK;
2417
2418 /* Next, search for already existing extent locks that will cover us */
2419 /* If we're trying to read, we also search for an existing PW lock. The
2420 * VFS and page cache already protect us locally, so lots of readers/
2421 * writers can share a single PW lock. */
2422 rc = mode;
2423 if (mode == LCK_PR)
2424 rc |= LCK_PW;
2425 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2426 res_id, type, policy, rc, lockh, unref);
2427 if (rc) {
2428 if (data != NULL) {
2429 if (!osc_set_data_with_check(lockh, data)) {
2430 if (!(lflags & LDLM_FL_TEST_LOCK))
2431 ldlm_lock_decref(lockh, rc);
2432 return 0;
2433 }
2434 }
2435 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2436 ldlm_lock_addref(lockh, LCK_PR);
2437 ldlm_lock_decref(lockh, LCK_PW);
2438 }
2439 return rc;
2440 }
2441 return rc;
2442 }
2443
2444 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2445 {
2446 if (unlikely(mode == LCK_GROUP))
2447 ldlm_lock_decref_and_cancel(lockh, mode);
2448 else
2449 ldlm_lock_decref(lockh, mode);
2450
2451 return 0;
2452 }
2453
2454 static int osc_statfs_interpret(const struct lu_env *env,
2455 struct ptlrpc_request *req,
2456 struct osc_async_args *aa, int rc)
2457 {
2458 struct obd_statfs *msfs;
2459
2460 if (rc == -EBADR)
2461 /* The request has in fact never been sent
2462 * due to issues at a higher level (LOV).
2463 * Exit immediately since the caller is
2464 * aware of the problem and takes care
2465 * of the cleanup. */
2466 return rc;
2467
2468 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2469 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2470 rc = 0;
2471 goto out;
2472 }
2473
2474 if (rc != 0)
2475 goto out;
2476
2477 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2478 if (msfs == NULL) {
2479 rc = -EPROTO;
2480 goto out;
2481 }
2482
2483 *aa->aa_oi->oi_osfs = *msfs;
2484 out:
2485 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2486 return rc;
2487 }
2488
2489 static int osc_statfs_async(struct obd_export *exp,
2490 struct obd_info *oinfo, __u64 max_age,
2491 struct ptlrpc_request_set *rqset)
2492 {
2493 struct obd_device *obd = class_exp2obd(exp);
2494 struct ptlrpc_request *req;
2495 struct osc_async_args *aa;
2496 int rc;
2497
2498 /* We could possibly pass max_age in the request (as an absolute
2499 * timestamp or a "seconds.usec ago") so the target can avoid doing
2500 * extra calls into the filesystem if that isn't necessary (e.g.
2501 * during mount that would help a bit). Having relative timestamps
2502 * is not so great if request processing is slow, while absolute
2503 * timestamps are not ideal because they need time synchronization. */
2504 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2505 if (req == NULL)
2506 return -ENOMEM;
2507
2508 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2509 if (rc) {
2510 ptlrpc_request_free(req);
2511 return rc;
2512 }
2513 ptlrpc_request_set_replen(req);
2514 req->rq_request_portal = OST_CREATE_PORTAL;
2515 ptlrpc_at_set_req_timeout(req);
2516
2517 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2518 /* procfs requests should not wait for stat results, to avoid deadlock */
2519 req->rq_no_resend = 1;
2520 req->rq_no_delay = 1;
2521 }
2522
2523 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2524 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2525 aa = ptlrpc_req_async_args(req);
2526 aa->aa_oi = oinfo;
2527
2528 ptlrpc_set_add_req(rqset, req);
2529 return 0;
2530 }
2531
2532 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2533 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2534 {
2535 struct obd_device *obd = class_exp2obd(exp);
2536 struct obd_statfs *msfs;
2537 struct ptlrpc_request *req;
2538 struct obd_import *imp = NULL;
2539 int rc;
2540
2541 /* Since the request might also come from lprocfs, we need to
2542 * sync this with client_disconnect_export (bug 15684). */
2543 down_read(&obd->u.cli.cl_sem);
2544 if (obd->u.cli.cl_import)
2545 imp = class_import_get(obd->u.cli.cl_import);
2546 up_read(&obd->u.cli.cl_sem);
2547 if (!imp)
2548 return -ENODEV;
2549
2550 /* We could possibly pass max_age in the request (as an absolute
2551 * timestamp or a "seconds.usec ago") so the target can avoid doing
2552 * extra calls into the filesystem if that isn't necessary (e.g.
2553 * during mount that would help a bit). Having relative timestamps
2554 * is not so great if request processing is slow, while absolute
2555 * timestamps are not ideal because they need time synchronization. */
2556 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2557
2558 class_import_put(imp);
2559
2560 if (req == NULL)
2561 return -ENOMEM;
2562
2563 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2564 if (rc) {
2565 ptlrpc_request_free(req);
2566 return rc;
2567 }
2568 ptlrpc_request_set_replen(req);
2569 req->rq_request_portal = OST_CREATE_PORTAL;
2570 ptlrpc_at_set_req_timeout(req);
2571
2572 if (flags & OBD_STATFS_NODELAY) {
2573 /* procfs requests should not wait for stat results, to avoid deadlock */
2574 req->rq_no_resend = 1;
2575 req->rq_no_delay = 1;
2576 }
2577
2578 rc = ptlrpc_queue_wait(req);
2579 if (rc)
2580 goto out;
2581
2582 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2583 if (msfs == NULL) {
2584 rc = -EPROTO;
2585 goto out;
2586 }
2587
2588 *osfs = *msfs;
2589
2590 out:
2591 ptlrpc_req_finished(req);
2592 return rc;
2593 }
2594
2595 /* Retrieve object striping information.
2596 *
2597 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2598 * the maximum number of OST indices which will fit in the user buffer.
2599 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2600 */
2601 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2602 {
2603 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2604 struct lov_user_md_v3 lum, *lumk;
2605 struct lov_user_ost_data_v1 *lmm_objects;
2606 int rc = 0, lum_size;
2607
2608 if (!lsm)
2609 return -ENODATA;
2610
2611 /* we only need the header part from user space to get lmm_magic and
2612 * lmm_stripe_count, (the header part is common to v1 and v3) */
2613 lum_size = sizeof(struct lov_user_md_v1);
2614 if (copy_from_user(&lum, lump, lum_size))
2615 return -EFAULT;
2616
2617 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2618 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2619 return -EINVAL;
2620
2621 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2622 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2623 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2624 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2625
2626 /* we can use lov_mds_md_size() to compute lum_size
2627 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2628 if (lum.lmm_stripe_count > 0) {
2629 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2630 lumk = kzalloc(lum_size, GFP_NOFS);
2631 if (!lumk)
2632 return -ENOMEM;
2633
2634 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2635 lmm_objects =
2636 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2637 else
2638 lmm_objects = &(lumk->lmm_objects[0]);
2639 lmm_objects->l_ost_oi = lsm->lsm_oi;
2640 } else {
2641 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2642 lumk = &lum;
2643 }
2644
2645 lumk->lmm_oi = lsm->lsm_oi;
2646 lumk->lmm_stripe_count = 1;
2647
2648 if (copy_to_user(lump, lumk, lum_size))
2649 rc = -EFAULT;
2650
2651 if (lumk != &lum)
2652 kfree(lumk);
2653
2654 return rc;
2655 }
2656
2657
2658 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2659 void *karg, void *uarg)
2660 {
2661 struct obd_device *obd = exp->exp_obd;
2662 struct obd_ioctl_data *data = karg;
2663 int err = 0;
2664
2665 if (!try_module_get(THIS_MODULE)) {
2666 CERROR("Can't get module. Is it alive?");
2667 return -EINVAL;
2668 }
2669 switch (cmd) {
2670 case OBD_IOC_LOV_GET_CONFIG: {
2671 char *buf;
2672 struct lov_desc *desc;
2673 struct obd_uuid uuid;
2674
2675 buf = NULL;
2676 len = 0;
2677 if (obd_ioctl_getdata(&buf, &len, uarg)) {
2678 err = -EINVAL;
2679 goto out;
2680 }
2681
2682 data = (struct obd_ioctl_data *)buf;
2683
2684 if (sizeof(*desc) > data->ioc_inllen1) {
2685 obd_ioctl_freedata(buf, len);
2686 err = -EINVAL;
2687 goto out;
2688 }
2689
2690 if (data->ioc_inllen2 < sizeof(uuid)) {
2691 obd_ioctl_freedata(buf, len);
2692 err = -EINVAL;
2693 goto out;
2694 }
2695
2696 desc = (struct lov_desc *)data->ioc_inlbuf1;
2697 desc->ld_tgt_count = 1;
2698 desc->ld_active_tgt_count = 1;
2699 desc->ld_default_stripe_count = 1;
2700 desc->ld_default_stripe_size = 0;
2701 desc->ld_default_stripe_offset = 0;
2702 desc->ld_pattern = 0;
2703 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2704
2705 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2706
2707 err = copy_to_user(uarg, buf, len);
2708 if (err)
2709 err = -EFAULT;
2710 obd_ioctl_freedata(buf, len);
2711 goto out;
2712 }
2713 case LL_IOC_LOV_SETSTRIPE:
2714 err = obd_alloc_memmd(exp, karg);
2715 if (err > 0)
2716 err = 0;
2717 goto out;
2718 case LL_IOC_LOV_GETSTRIPE:
2719 err = osc_getstripe(karg, uarg);
2720 goto out;
2721 case OBD_IOC_CLIENT_RECOVER:
2722 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2723 data->ioc_inlbuf1, 0);
2724 if (err > 0)
2725 err = 0;
2726 goto out;
2727 case IOC_OSC_SET_ACTIVE:
2728 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2729 data->ioc_offset);
2730 goto out;
2731 case OBD_IOC_POLL_QUOTACHECK:
2732 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2733 goto out;
2734 case OBD_IOC_PING_TARGET:
2735 err = ptlrpc_obd_ping(obd);
2736 goto out;
2737 default:
2738 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2739 cmd, current_comm());
2740 err = -ENOTTY;
2741 goto out;
2742 }
2743 out:
2744 module_put(THIS_MODULE);
2745 return err;
2746 }
2747
2748 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2749 u32 keylen, void *key, __u32 *vallen, void *val,
2750 struct lov_stripe_md *lsm)
2751 {
2752 if (!vallen || !val)
2753 return -EFAULT;
2754
2755 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2756 __u32 *stripe = val;
2757 *vallen = sizeof(*stripe);
2758 *stripe = 0;
2759 return 0;
2760 } else if (KEY_IS(KEY_LAST_ID)) {
2761 struct ptlrpc_request *req;
2762 u64 *reply;
2763 char *tmp;
2764 int rc;
2765
2766 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2767 &RQF_OST_GET_INFO_LAST_ID);
2768 if (req == NULL)
2769 return -ENOMEM;
2770
2771 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2772 RCL_CLIENT, keylen);
2773 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2774 if (rc) {
2775 ptlrpc_request_free(req);
2776 return rc;
2777 }
2778
2779 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2780 memcpy(tmp, key, keylen);
2781
2782 req->rq_no_delay = req->rq_no_resend = 1;
2783 ptlrpc_request_set_replen(req);
2784 rc = ptlrpc_queue_wait(req);
2785 if (rc)
2786 goto out;
2787
2788 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2789 if (reply == NULL) {
2790 rc = -EPROTO;
2791 goto out;
2792 }
2793
2794 *((u64 *)val) = *reply;
2795 out:
2796 ptlrpc_req_finished(req);
2797 return rc;
2798 } else if (KEY_IS(KEY_FIEMAP)) {
2799 struct ll_fiemap_info_key *fm_key =
2800 (struct ll_fiemap_info_key *)key;
2801 struct ldlm_res_id res_id;
2802 ldlm_policy_data_t policy;
2803 struct lustre_handle lockh;
2804 ldlm_mode_t mode = 0;
2805 struct ptlrpc_request *req;
2806 struct ll_user_fiemap *reply;
2807 char *tmp;
2808 int rc;
2809
2810 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2811 goto skip_locking;
2812
2813 policy.l_extent.start = fm_key->fiemap.fm_start &
2814 CFS_PAGE_MASK;
2815
2816 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2817 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2818 policy.l_extent.end = OBD_OBJECT_EOF;
2819 else
2820 policy.l_extent.end = (fm_key->fiemap.fm_start +
2821 fm_key->fiemap.fm_length +
2822 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2823
2824 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2825 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2826 LDLM_FL_BLOCK_GRANTED |
2827 LDLM_FL_LVB_READY,
2828 &res_id, LDLM_EXTENT, &policy,
2829 LCK_PR | LCK_PW, &lockh, 0);
2830 if (mode) { /* lock is cached on client */
2831 if (mode != LCK_PR) {
2832 ldlm_lock_addref(&lockh, LCK_PR);
2833 ldlm_lock_decref(&lockh, LCK_PW);
2834 }
2835 } else { /* no cached lock, need to acquire the lock on the server side */
2836 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2837 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2838 }
2839
2840 skip_locking:
2841 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2842 &RQF_OST_GET_INFO_FIEMAP);
2843 if (req == NULL) {
2844 rc = -ENOMEM;
2845 goto drop_lock;
2846 }
2847
2848 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2849 RCL_CLIENT, keylen);
2850 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2851 RCL_CLIENT, *vallen);
2852 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2853 RCL_SERVER, *vallen);
2854
2855 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2856 if (rc) {
2857 ptlrpc_request_free(req);
2858 goto drop_lock;
2859 }
2860
2861 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2862 memcpy(tmp, key, keylen);
2863 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2864 memcpy(tmp, val, *vallen);
2865
2866 ptlrpc_request_set_replen(req);
2867 rc = ptlrpc_queue_wait(req);
2868 if (rc)
2869 goto fini_req;
2870
2871 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2872 if (reply == NULL) {
2873 rc = -EPROTO;
2874 goto fini_req;
2875 }
2876
2877 memcpy(val, reply, *vallen);
2878 fini_req:
2879 ptlrpc_req_finished(req);
2880 drop_lock:
2881 if (mode)
2882 ldlm_lock_decref(&lockh, LCK_PR);
2883 return rc;
2884 }
2885
2886 return -EINVAL;
2887 }
2888
2889 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2890 u32 keylen, void *key, u32 vallen,
2891 void *val, struct ptlrpc_request_set *set)
2892 {
2893 struct ptlrpc_request *req;
2894 struct obd_device *obd = exp->exp_obd;
2895 struct obd_import *imp = class_exp2cliimp(exp);
2896 char *tmp;
2897 int rc;
2898
2899 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2900
2901 if (KEY_IS(KEY_CHECKSUM)) {
2902 if (vallen != sizeof(int))
2903 return -EINVAL;
2904 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2905 return 0;
2906 }
2907
2908 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2909 sptlrpc_conf_client_adapt(obd);
2910 return 0;
2911 }
2912
2913 if (KEY_IS(KEY_FLUSH_CTX)) {
2914 sptlrpc_import_flush_my_ctx(imp);
2915 return 0;
2916 }
2917
2918 if (KEY_IS(KEY_CACHE_SET)) {
2919 struct client_obd *cli = &obd->u.cli;
2920
2921 LASSERT(cli->cl_cache == NULL); /* only once */
2922 cli->cl_cache = (struct cl_client_cache *)val;
2923 atomic_inc(&cli->cl_cache->ccc_users);
2924 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2925
2926 /* add this osc into entity list */
2927 LASSERT(list_empty(&cli->cl_lru_osc));
2928 spin_lock(&cli->cl_cache->ccc_lru_lock);
2929 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2930 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2931
2932 return 0;
2933 }
2934
2935 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2936 struct client_obd *cli = &obd->u.cli;
2937 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2938 int target = *(int *)val;
2939
2940 nr = osc_lru_shrink(cli, min(nr, target));
2941 *(int *)val -= nr;
2942 return 0;
2943 }
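	/* Worked example (hypothetical numbers): with 1000 pages in the LRU
	 * list and *val == 400, nr starts as 500 and osc_lru_shrink() is
	 * asked for min(500, 400) == 400 pages; if it actually frees 350,
	 * *val is decremented to 50, presumably letting the caller request
	 * the remainder from other OSCs. */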
2944
2945 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2946 return -EINVAL;
2947
2948 /* We pass all other commands directly to OST. Since nobody calls osc
2949 * methods directly and everybody is supposed to go through LOV, we
2950 * assume LOV checked invalid values for us.
2951 * The only recognised values so far are evict_by_nid and mds_conn.
2952 * Even if something bad goes through, we'd get a -EINVAL from OST
2953 * anyway. */
2954
2955 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2956 &RQF_OST_SET_GRANT_INFO :
2957 &RQF_OBD_SET_INFO);
2958 if (req == NULL)
2959 return -ENOMEM;
2960
2961 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2962 RCL_CLIENT, keylen);
2963 if (!KEY_IS(KEY_GRANT_SHRINK))
2964 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2965 RCL_CLIENT, vallen);
2966 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2967 if (rc) {
2968 ptlrpc_request_free(req);
2969 return rc;
2970 }
2971
2972 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2973 memcpy(tmp, key, keylen);
2974 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2975 &RMF_OST_BODY :
2976 &RMF_SETINFO_VAL);
2977 memcpy(tmp, val, vallen);
2978
2979 if (KEY_IS(KEY_GRANT_SHRINK)) {
2980 struct osc_brw_async_args *aa;
2981 struct obdo *oa;
2982
2983 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2984 aa = ptlrpc_req_async_args(req);
2985 OBDO_ALLOC(oa);
2986 if (!oa) {
2987 ptlrpc_req_finished(req);
2988 return -ENOMEM;
2989 }
2990 *oa = ((struct ost_body *)val)->oa;
2991 aa->aa_oa = oa;
2992 req->rq_interpret_reply = osc_shrink_grant_interpret;
2993 }
2994
2995 ptlrpc_request_set_replen(req);
2996 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2997 LASSERT(set != NULL);
2998 ptlrpc_set_add_req(set, req);
2999 ptlrpc_check_set(NULL, set);
3000 } else
3001 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3002
3003 return 0;
3004 }
3005
3006 static int osc_reconnect(const struct lu_env *env,
3007 struct obd_export *exp, struct obd_device *obd,
3008 struct obd_uuid *cluuid,
3009 struct obd_connect_data *data,
3010 void *localdata)
3011 {
3012 struct client_obd *cli = &obd->u.cli;
3013
3014 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3015 long lost_grant;
3016
3017 client_obd_list_lock(&cli->cl_loi_list_lock);
3018 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3019 2 * cli_brw_size(obd);
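		/* The GCC "a ?: b" extension above means "a ? a : b": a
		 * client still holding grant (say 1 MiB avail + 512 KiB
		 * dirty, illustrative numbers) asks for that sum back,
		 * while a completely grant-less client falls back to
		 * 2 * cli_brw_size(obd). */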
3020 lost_grant = cli->cl_lost_grant;
3021 cli->cl_lost_grant = 0;
3022 client_obd_list_unlock(&cli->cl_loi_list_lock);
3023
3024 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3025 data->ocd_connect_flags,
3026 data->ocd_version, data->ocd_grant, lost_grant);
3027 }
3028
3029 return 0;
3030 }
3031
3032 static int osc_disconnect(struct obd_export *exp)
3033 {
3034 struct obd_device *obd = class_exp2obd(exp);
3035 int rc;
3036
3037 rc = client_disconnect_export(exp);
3038 /**
3039 * Initially we put del_shrink_grant before disconnect_export, but it
3040 * causes the following problem if setup (connect) and cleanup
3041 * (disconnect) are tangled together.
3042 *      connect p1                     disconnect p2
3043 *   ptlrpc_connect_import
3044 *     ...............                class_manual_cleanup
3045 *                                      osc_disconnect
3046 *                                      del_shrink_grant
3047 *   ptlrpc_connect_interrupt
3048 *     init_grant_shrink
3049 *       add this client to shrink list
3050 *                                      cleanup_osc
3051 * Bang! the pinger triggers the shrink.
3052 * So the osc should be disconnected from the shrink list only after we
3053 * are sure the import has been destroyed. BUG18662
3054 */
3055 if (obd->u.cli.cl_import == NULL)
3056 osc_del_shrink_grant(&obd->u.cli);
3057 return rc;
3058 }
3059
3060 static int osc_import_event(struct obd_device *obd,
3061 struct obd_import *imp,
3062 enum obd_import_event event)
3063 {
3064 struct client_obd *cli;
3065 int rc = 0;
3066
3067 LASSERT(imp->imp_obd == obd);
3068
3069 switch (event) {
3070 case IMP_EVENT_DISCON: {
3071 cli = &obd->u.cli;
3072 client_obd_list_lock(&cli->cl_loi_list_lock);
3073 cli->cl_avail_grant = 0;
3074 cli->cl_lost_grant = 0;
3075 client_obd_list_unlock(&cli->cl_loi_list_lock);
3076 break;
3077 }
3078 case IMP_EVENT_INACTIVE: {
3079 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3080 break;
3081 }
3082 case IMP_EVENT_INVALIDATE: {
3083 struct ldlm_namespace *ns = obd->obd_namespace;
3084 struct lu_env *env;
3085 int refcheck;
3086
3087 env = cl_env_get(&refcheck);
3088 if (!IS_ERR(env)) {
3089 /* Reset grants */
3090 cli = &obd->u.cli;
3091 /* all pages go to failing rpcs due to the invalid
3092 * import */
3093 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3094
3095 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3096 cl_env_put(env, &refcheck);
3097 } else
3098 rc = PTR_ERR(env);
3099 break;
3100 }
3101 case IMP_EVENT_ACTIVE: {
3102 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3103 break;
3104 }
3105 case IMP_EVENT_OCD: {
3106 struct obd_connect_data *ocd = &imp->imp_connect_data;
3107
3108 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3109 osc_init_grant(&obd->u.cli, ocd);
3110
3111 /* See bug 7198 */
3112 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3113 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3114
3115 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3116 break;
3117 }
3118 case IMP_EVENT_DEACTIVATE: {
3119 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3120 break;
3121 }
3122 case IMP_EVENT_ACTIVATE: {
3123 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3124 break;
3125 }
3126 default:
3127 CERROR("Unknown import event %d\n", event);
3128 LBUG();
3129 }
3130 return rc;
3131 }
3132
3133 /**
3134 * Determine whether the lock can be canceled before replaying the lock
3135 * during recovery, see bug16774 for detailed information.
3136 *
3137 * \retval zero the lock can't be canceled
3138 * \retval other ok to cancel
3139 */
3140 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3141 {
3142 check_res_locked(lock->l_resource);
3143
3144 /*
3145 * Cancel all unused extent locks in granted mode LCK_PR or LCK_CR.
3146 *
3147 * XXX as a future improvement, we can also cancel unused write lock
3148 * if it doesn't have dirty data and active mmaps.
3149 */
3150 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3151 (lock->l_granted_mode == LCK_PR ||
3152 lock->l_granted_mode == LCK_CR) &&
3153 (osc_dlm_lock_pageref(lock) == 0))
3154 return 1;
3155
3156 return 0;
3157 }
3158
3159 static int brw_queue_work(const struct lu_env *env, void *data)
3160 {
3161 struct client_obd *cli = data;
3162
3163 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3164
3165 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3166 return 0;
3167 }
3168
3169 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3170 {
3171 struct lprocfs_static_vars lvars = { NULL };
3172 struct client_obd *cli = &obd->u.cli;
3173 void *handler;
3174 int rc;
3175 int adding;
3176 int added;
3177 int req_count;
3178
3179 rc = ptlrpcd_addref();
3180 if (rc)
3181 return rc;
3182
3183 rc = client_obd_setup(obd, lcfg);
3184 if (rc)
3185 goto out_ptlrpcd;
3186
3187 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3188 if (IS_ERR(handler)) {
3189 rc = PTR_ERR(handler);
3190 goto out_client_setup;
3191 }
3192 cli->cl_writeback_work = handler;
3193
3194 rc = osc_quota_setup(obd);
3195 if (rc)
3196 goto out_ptlrpcd_work;
3197
3198 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3199 lprocfs_osc_init_vars(&lvars);
3200 if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3201 lproc_osc_attach_seqstat(obd);
3202 sptlrpc_lprocfs_cliobd_attach(obd);
3203 ptlrpc_lprocfs_register_obd(obd);
3204 }
3205
3206 /*
3207 * We try to control the total number of requests with an upper limit,
3208 * osc_reqpool_maxreqcount. There might be some race that causes
3209 * over-limit allocation, but it is fine.
3210 */
3211 req_count = atomic_read(&osc_pool_req_count);
3212 if (req_count < osc_reqpool_maxreqcount) {
3213 adding = cli->cl_max_rpcs_in_flight + 2;
3214 if (req_count + adding > osc_reqpool_maxreqcount)
3215 adding = osc_reqpool_maxreqcount - req_count;
3216
3217 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3218 atomic_add(added, &osc_pool_req_count);
3219 }
3220
3221 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3222 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3223 return rc;
3224
3225 out_ptlrpcd_work:
3226 ptlrpcd_destroy_work(handler);
3227 out_client_setup:
3228 client_obd_cleanup(obd);
3229 out_ptlrpcd:
3230 ptlrpcd_decref();
3231 return rc;
3232 }
3233
3234 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3235 {
3236 switch (stage) {
3237 case OBD_CLEANUP_EARLY: {
3238 struct obd_import *imp;
3239 imp = obd->u.cli.cl_import;
3240 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3241 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3242 ptlrpc_deactivate_import(imp);
3243 spin_lock(&imp->imp_lock);
3244 imp->imp_pingable = 0;
3245 spin_unlock(&imp->imp_lock);
3246 break;
3247 }
3248 case OBD_CLEANUP_EXPORTS: {
3249 struct client_obd *cli = &obd->u.cli;
3250 /* LU-464
3251 * for echo client, export may be on zombie list, wait for
3252 * zombie thread to cull it, because cli.cl_import will be
3253 * cleared in client_disconnect_export():
3254 * class_export_destroy() -> obd_cleanup() ->
3255 * echo_device_free() -> echo_client_cleanup() ->
3256 * obd_disconnect() -> osc_disconnect() ->
3257 * client_disconnect_export()
3258 */
3259 obd_zombie_barrier();
3260 if (cli->cl_writeback_work) {
3261 ptlrpcd_destroy_work(cli->cl_writeback_work);
3262 cli->cl_writeback_work = NULL;
3263 }
3264 obd_cleanup_client_import(obd);
3265 ptlrpc_lprocfs_unregister_obd(obd);
3266 lprocfs_obd_cleanup(obd);
3267 break;
3268 }
3269 }
3270 return 0;
3271 }
3272
3273 int osc_cleanup(struct obd_device *obd)
3274 {
3275 struct client_obd *cli = &obd->u.cli;
3276 int rc;
3277
3278 /* lru cleanup */
3279 if (cli->cl_cache != NULL) {
3280 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3281 spin_lock(&cli->cl_cache->ccc_lru_lock);
3282 list_del_init(&cli->cl_lru_osc);
3283 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3284 cli->cl_lru_left = NULL;
3285 atomic_dec(&cli->cl_cache->ccc_users);
3286 cli->cl_cache = NULL;
3287 }
3288
3289 /* free memory of osc quota cache */
3290 osc_quota_cleanup(obd);
3291
3292 rc = client_obd_cleanup(obd);
3293
3294 ptlrpcd_decref();
3295 return rc;
3296 }
3297
3298 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3299 {
3300 struct lprocfs_static_vars lvars = { NULL };
3301 int rc = 0;
3302
3303 lprocfs_osc_init_vars(&lvars);
3304
3305 switch (lcfg->lcfg_command) {
3306 default:
3307 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3308 lcfg, obd);
3309 if (rc > 0)
3310 rc = 0;
3311 break;
3312 }
3313
3314 return rc;
3315 }
3316
3317 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3318 {
3319 return osc_process_config_base(obd, buf);
3320 }
3321
3322 struct obd_ops osc_obd_ops = {
3323 .o_owner = THIS_MODULE,
3324 .o_setup = osc_setup,
3325 .o_precleanup = osc_precleanup,
3326 .o_cleanup = osc_cleanup,
3327 .o_add_conn = client_import_add_conn,
3328 .o_del_conn = client_import_del_conn,
3329 .o_connect = client_connect_import,
3330 .o_reconnect = osc_reconnect,
3331 .o_disconnect = osc_disconnect,
3332 .o_statfs = osc_statfs,
3333 .o_statfs_async = osc_statfs_async,
3334 .o_packmd = osc_packmd,
3335 .o_unpackmd = osc_unpackmd,
3336 .o_create = osc_create,
3337 .o_destroy = osc_destroy,
3338 .o_getattr = osc_getattr,
3339 .o_getattr_async = osc_getattr_async,
3340 .o_setattr = osc_setattr,
3341 .o_setattr_async = osc_setattr_async,
3342 .o_find_cbdata = osc_find_cbdata,
3343 .o_iocontrol = osc_iocontrol,
3344 .o_get_info = osc_get_info,
3345 .o_set_info_async = osc_set_info_async,
3346 .o_import_event = osc_import_event,
3347 .o_process_config = osc_process_config,
3348 .o_quotactl = osc_quotactl,
3349 .o_quotacheck = osc_quotacheck,
3350 };
3351
3352 extern struct lu_kmem_descr osc_caches[];
3353 extern spinlock_t osc_ast_guard;
3354 extern struct lock_class_key osc_ast_guard_class;
3355
3356 static int __init osc_init(void)
3357 {
3358 struct lprocfs_static_vars lvars = { NULL };
3359 unsigned int reqpool_size;
3360 unsigned int reqsize;
3361 int rc;
3362
3363 /* Print the address of _any_ initialized kernel symbol from this
3364 * module, to allow debugging with a gdb that doesn't support data
3365 * symbols from modules. */
3366 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3367
3368 rc = lu_kmem_init(osc_caches);
3369 if (rc)
3370 return rc;
3371
3372 lprocfs_osc_init_vars(&lvars);
3373
3374 rc = class_register_type(&osc_obd_ops, NULL,
3375 LUSTRE_OSC_NAME, &osc_device_type);
3376 if (rc)
3377 goto out_kmem;
3378
3379 spin_lock_init(&osc_ast_guard);
3380 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3381
3382 /* This is obviously too much memory; we only prevent overflow here. */
3383 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
3384 rc = -EINVAL;
3385 goto out_type;
3386 }
3387
3388 reqpool_size = osc_reqpool_mem_max << 20;
3389
3390 reqsize = 1;
3391 while (reqsize < OST_MAXREQSIZE)
3392 reqsize = reqsize << 1;
3393
3394 /*
3395 * We don't enlarge the request count in the OSC pool according to
3396 * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3397 * after a normal allocation has failed, so a small OSC pool won't
3398 * cause much performance degradation in most cases.
3399 */
3400 osc_reqpool_maxreqcount = reqpool_size / reqsize;
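	/*
	 * Sizing example (hypothetical OST_MAXREQSIZE): reqsize is the
	 * smallest power of two >= OST_MAXREQSIZE, so if OST_MAXREQSIZE
	 * were 5 KiB, reqsize would round up to 8 KiB and the default
	 * 5 MiB pool (osc_reqpool_mem_max == 5) would hold 640 requests.
	 */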
3401
3402 atomic_set(&osc_pool_req_count, 0);
3403 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
3404 ptlrpc_add_rqs_to_pool);
3405
3406 if (osc_rq_pool)
3407 return 0;
3408
3409 rc = -ENOMEM;
3410
3411 out_type:
3412 class_unregister_type(LUSTRE_OSC_NAME);
3413 out_kmem:
3414 lu_kmem_fini(osc_caches);
3415 return rc;
3416 }
3417
3418 static void /*__exit*/ osc_exit(void)
3419 {
3420 class_unregister_type(LUSTRE_OSC_NAME);
3421 lu_kmem_fini(osc_caches);
3422 ptlrpc_free_rq_pool(osc_rq_pool);
3423 }
3424
3425 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3426 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3427 MODULE_LICENSE("GPL");
3428 MODULE_VERSION(LUSTRE_VERSION_STRING);
3429
3430 module_init(osc_init);
3431 module_exit(osc_exit);