b0d0e2a1744ef8e8a738a9af3395517679111ed1
[deliverable/linux.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40
41 #include <linux/lustre_intent.h>
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_dlm.h>
45 #include <lustre_fid.h> /* fid_res_name_eq() */
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include "mdc_internal.h"
50
51 struct mdc_getattr_args {
52 struct obd_export *ga_exp;
53 struct md_enqueue_info *ga_minfo;
54 struct ldlm_enqueue_info *ga_einfo;
55 };
56
57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59 return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62
63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65 it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68
69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71 it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74
75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77 if (it_disposition(it, DISP_OPEN_LEASE)) {
78 if (phase >= DISP_OPEN_LEASE)
79 return it->d.lustre.it_status;
80 else
81 return 0;
82 }
83 if (it_disposition(it, DISP_OPEN_OPEN)) {
84 if (phase >= DISP_OPEN_OPEN)
85 return it->d.lustre.it_status;
86 else
87 return 0;
88 }
89
90 if (it_disposition(it, DISP_OPEN_CREATE)) {
91 if (phase >= DISP_OPEN_CREATE)
92 return it->d.lustre.it_status;
93 else
94 return 0;
95 }
96
97 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 if (phase >= DISP_LOOKUP_EXECD)
99 return it->d.lustre.it_status;
100 else
101 return 0;
102 }
103
104 if (it_disposition(it, DISP_IT_EXECD)) {
105 if (phase >= DISP_IT_EXECD)
106 return it->d.lustre.it_status;
107 else
108 return 0;
109 }
110 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 it->d.lustre.it_status);
112 LBUG();
113 return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116
117 /* this must be called on a lockh that is known to have a referenced lock */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
119 __u64 *bits)
120 {
121 struct ldlm_lock *lock;
122 struct inode *new_inode = data;
123
124 if(bits)
125 *bits = 0;
126
127 if (!*lockh)
128 return 0;
129
130 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132 LASSERT(lock != NULL);
133 lock_res_and_lock(lock);
134 if (lock->l_resource->lr_lvb_inode &&
135 lock->l_resource->lr_lvb_inode != data) {
136 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
141 old_inode->i_state,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
143 }
144 lock->l_resource->lr_lvb_inode = new_inode;
145 if (bits)
146 *bits = lock->l_policy_data.l_inodebits.bits;
147
148 unlock_res_and_lock(lock);
149 LDLM_LOCK_PUT(lock);
150
151 return 0;
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
158 {
159 struct ldlm_res_id res_id;
160 ldlm_mode_t rc;
161
162 fid_build_reg_res_name(fid, &res_id);
163 /* LU-4405: Clear bits not supported by server */
164 policy->l_inodebits.bits &= exp_connect_ibits(exp);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
167 return rc;
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
173 ldlm_mode_t mode,
174 ldlm_cancel_flags_t flags,
175 void *opaque)
176 {
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
179 int rc;
180
181 fid_build_reg_res_name(fid, &res_id);
182 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183 policy, mode, flags, opaque);
184 return rc;
185 }
186
187 int mdc_null_inode(struct obd_export *exp,
188 const struct lu_fid *fid)
189 {
190 struct ldlm_res_id res_id;
191 struct ldlm_resource *res;
192 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193
194 LASSERTF(ns != NULL, "no namespace passed\n");
195
196 fid_build_reg_res_name(fid, &res_id);
197
198 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199 if(res == NULL)
200 return 0;
201
202 lock_res(res);
203 res->lr_lvb_inode = NULL;
204 unlock_res(res);
205
206 ldlm_resource_putref(res);
207 return 0;
208 }
209
210 /* find any ldlm lock of the inode in mdc
211 * return 0 not find
212 * 1 find one
213 * < 0 error */
214 int mdc_find_cbdata(struct obd_export *exp,
215 const struct lu_fid *fid,
216 ldlm_iterator_t it, void *data)
217 {
218 struct ldlm_res_id res_id;
219 int rc = 0;
220
221 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223 it, data);
224 if (rc == LDLM_ITER_STOP)
225 return 1;
226 else if (rc == LDLM_ITER_CONTINUE)
227 return 0;
228 return rc;
229 }
230
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233 /* Don't hold error requests for replay. */
234 if (req->rq_replay) {
235 spin_lock(&req->rq_lock);
236 req->rq_replay = 0;
237 spin_unlock(&req->rq_lock);
238 }
239 if (rc && req->rq_transno != 0) {
240 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241 LBUG();
242 }
243 }
244
245 /* Save a large LOV EA into the request buffer so that it is available
246 * for replay. We don't do this in the initial request because the
247 * original request doesn't need this buffer (at most it sends just the
248 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249 * buffer and may also be difficult to allocate and save a very large
250 * request buffer for each open. (bug 5707)
251 *
252 * OOM here may cause recovery failure if lmm is needed (only for the
253 * original open if the MDS crashed just when this client also OOM'd)
254 * but this is incredibly unlikely, and questionable whether the client
255 * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257 struct mdt_body *body)
258 {
259 int rc;
260
261 /* FIXME: remove this explicit offset. */
262 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263 body->eadatasize);
264 if (rc) {
265 CERROR("Can't enlarge segment %d size to %d\n",
266 DLM_INTENT_REC_OFF + 4, body->eadatasize);
267 body->valid &= ~OBD_MD_FLEASIZE;
268 body->eadatasize = 0;
269 }
270 }
271
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273 struct lookup_intent *it,
274 struct md_op_data *op_data,
275 void *lmm, int lmmsize,
276 void *cb_data)
277 {
278 struct ptlrpc_request *req;
279 struct obd_device *obddev = class_exp2obd(exp);
280 struct ldlm_intent *lit;
281 LIST_HEAD(cancels);
282 int count = 0;
283 int mode;
284 int rc;
285
286 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288 /* XXX: openlock is not cancelled for cross-refs. */
289 /* If inode is known, cancel conflicting OPEN locks. */
290 if (fid_is_sane(&op_data->op_fid2)) {
291 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292 if (it->it_flags & FMODE_WRITE)
293 mode = LCK_EX;
294 else
295 mode = LCK_PR;
296 } else {
297 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298 mode = LCK_CW;
299 #ifdef FMODE_EXEC
300 else if (it->it_flags & FMODE_EXEC)
301 mode = LCK_PR;
302 #endif
303 else
304 mode = LCK_CR;
305 }
306 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
307 &cancels, mode,
308 MDS_INODELOCK_OPEN);
309 }
310
311 /* If CREATE, cancel parent's UPDATE lock. */
312 if (it->it_op & IT_CREAT)
313 mode = LCK_EX;
314 else
315 mode = LCK_CR;
316 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
317 &cancels, mode,
318 MDS_INODELOCK_UPDATE);
319
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
321 &RQF_LDLM_INTENT_OPEN);
322 if (req == NULL) {
323 ldlm_lock_list_put(&cancels, l_bl_ast, count);
324 return ERR_PTR(-ENOMEM);
325 }
326
327 /* parent capability */
328 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
329 /* child capability, reserve the size according to parent capa, it will
330 * be filled after we get the reply */
331 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
332
333 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
334 op_data->op_namelen + 1);
335 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
336 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
337
338 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
339 if (rc < 0) {
340 ptlrpc_request_free(req);
341 return ERR_PTR(rc);
342 }
343
344 spin_lock(&req->rq_lock);
345 req->rq_replay = req->rq_import->imp_replayable;
346 spin_unlock(&req->rq_lock);
347
348 /* pack the intent */
349 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
350 lit->opc = (__u64)it->it_op;
351
352 /* pack the intended request */
353 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
354 lmmsize);
355
356 /* for remote client, fetch remote perm for current user */
357 if (client_is_remote(exp))
358 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359 sizeof(struct mdt_remote_perm));
360 ptlrpc_request_set_replen(req);
361 return req;
362 }
363
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366 struct lookup_intent *it,
367 struct md_op_data *op_data)
368 {
369 struct ptlrpc_request *req;
370 struct ldlm_intent *lit;
371 int rc, count = 0, maxdata;
372 LIST_HEAD(cancels);
373
374
375
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377 &RQF_LDLM_INTENT_GETXATTR);
378 if (req == NULL)
379 return ERR_PTR(-ENOMEM);
380
381 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
382
383 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
384 if (rc) {
385 ptlrpc_request_free(req);
386 return ERR_PTR(rc);
387 }
388
389 /* pack the intent */
390 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391 lit->opc = IT_GETXATTR;
392
393 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
394
395 /* pack the intended request */
396 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397 op_data->op_valid, maxdata, -1, 0);
398
399 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400 RCL_SERVER, maxdata);
401
402 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403 RCL_SERVER, maxdata);
404
405 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406 RCL_SERVER, maxdata);
407
408 ptlrpc_request_set_replen(req);
409
410 return req;
411 }
412
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414 struct lookup_intent *it,
415 struct md_op_data *op_data)
416 {
417 struct ptlrpc_request *req;
418 struct obd_device *obddev = class_exp2obd(exp);
419 struct ldlm_intent *lit;
420 int rc;
421
422 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
423 &RQF_LDLM_INTENT_UNLINK);
424 if (req == NULL)
425 return ERR_PTR(-ENOMEM);
426
427 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
428 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
429 op_data->op_namelen + 1);
430
431 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
432 if (rc) {
433 ptlrpc_request_free(req);
434 return ERR_PTR(rc);
435 }
436
437 /* pack the intent */
438 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439 lit->opc = (__u64)it->it_op;
440
441 /* pack the intended request */
442 mdc_unlink_pack(req, op_data);
443
444 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
445 obddev->u.cli.cl_max_mds_easize);
446 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
447 obddev->u.cli.cl_max_mds_cookiesize);
448 ptlrpc_request_set_replen(req);
449 return req;
450 }
451
452 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
453 struct lookup_intent *it,
454 struct md_op_data *op_data)
455 {
456 struct ptlrpc_request *req;
457 struct obd_device *obddev = class_exp2obd(exp);
458 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
461 (client_is_remote(exp) ?
462 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
463 struct ldlm_intent *lit;
464 int rc;
465
466 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467 &RQF_LDLM_INTENT_GETATTR);
468 if (req == NULL)
469 return ERR_PTR(-ENOMEM);
470
471 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
472 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
473 op_data->op_namelen + 1);
474
475 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
476 if (rc) {
477 ptlrpc_request_free(req);
478 return ERR_PTR(rc);
479 }
480
481 /* pack the intent */
482 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
483 lit->opc = (__u64)it->it_op;
484
485 /* pack the intended request */
486 mdc_getattr_pack(req, valid, it->it_flags, op_data,
487 obddev->u.cli.cl_max_mds_easize);
488
489 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
490 obddev->u.cli.cl_max_mds_easize);
491 if (client_is_remote(exp))
492 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
493 sizeof(struct mdt_remote_perm));
494 ptlrpc_request_set_replen(req);
495 return req;
496 }
497
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499 struct lookup_intent *it,
500 struct md_op_data *unused)
501 {
502 struct obd_device *obd = class_exp2obd(exp);
503 struct ptlrpc_request *req;
504 struct ldlm_intent *lit;
505 struct layout_intent *layout;
506 int rc;
507
508 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
509 &RQF_LDLM_INTENT_LAYOUT);
510 if (req == NULL)
511 return ERR_PTR(-ENOMEM);
512
513 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
514 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
515 if (rc) {
516 ptlrpc_request_free(req);
517 return ERR_PTR(rc);
518 }
519
520 /* pack the intent */
521 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
522 lit->opc = (__u64)it->it_op;
523
524 /* pack the layout intent request */
525 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
526 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
527 * set for replication */
528 layout->li_opc = LAYOUT_INTENT_ACCESS;
529
530 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
531 obd->u.cli.cl_max_mds_easize);
532 ptlrpc_request_set_replen(req);
533 return req;
534 }
535
536 static struct ptlrpc_request *
537 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
538 {
539 struct ptlrpc_request *req;
540 int rc;
541
542 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
543 if (req == NULL)
544 return ERR_PTR(-ENOMEM);
545
546 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
547 if (rc) {
548 ptlrpc_request_free(req);
549 return ERR_PTR(rc);
550 }
551
552 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
553 ptlrpc_request_set_replen(req);
554 return req;
555 }
556
557 static int mdc_finish_enqueue(struct obd_export *exp,
558 struct ptlrpc_request *req,
559 struct ldlm_enqueue_info *einfo,
560 struct lookup_intent *it,
561 struct lustre_handle *lockh,
562 int rc)
563 {
564 struct req_capsule *pill = &req->rq_pill;
565 struct ldlm_request *lockreq;
566 struct ldlm_reply *lockrep;
567 struct lustre_intent_data *intent = &it->d.lustre;
568 struct ldlm_lock *lock;
569 void *lvb_data = NULL;
570 int lvb_len = 0;
571
572 LASSERT(rc >= 0);
573 /* Similarly, if we're going to replay this request, we don't want to
574 * actually get a lock, just perform the intent. */
575 if (req->rq_transno || req->rq_replay) {
576 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
577 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
578 }
579
580 if (rc == ELDLM_LOCK_ABORTED) {
581 einfo->ei_mode = 0;
582 memset(lockh, 0, sizeof(*lockh));
583 rc = 0;
584 } else { /* rc = 0 */
585 lock = ldlm_handle2lock(lockh);
586 LASSERT(lock != NULL);
587
588 /* If the server gave us back a different lock mode, we should
589 * fix up our variables. */
590 if (lock->l_req_mode != einfo->ei_mode) {
591 ldlm_lock_addref(lockh, lock->l_req_mode);
592 ldlm_lock_decref(lockh, einfo->ei_mode);
593 einfo->ei_mode = lock->l_req_mode;
594 }
595 LDLM_LOCK_PUT(lock);
596 }
597
598 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
599 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
600
601 intent->it_disposition = (int)lockrep->lock_policy_res1;
602 intent->it_status = (int)lockrep->lock_policy_res2;
603 intent->it_lock_mode = einfo->ei_mode;
604 intent->it_lock_handle = lockh->cookie;
605 intent->it_data = req;
606
607 /* Technically speaking rq_transno must already be zero if
608 * it_status is in error, so the check is a bit redundant */
609 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
610 mdc_clear_replay_flag(req, intent->it_status);
611
612 /* If we're doing an IT_OPEN which did not result in an actual
613 * successful open, then we need to remove the bit which saves
614 * this request for unconditional replay.
615 *
616 * It's important that we do this first! Otherwise we might exit the
617 * function without doing so, and try to replay a failed create
618 * (bug 3440) */
619 if (it->it_op & IT_OPEN && req->rq_replay &&
620 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
621 mdc_clear_replay_flag(req, intent->it_status);
622
623 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
624 it->it_op, intent->it_disposition, intent->it_status);
625
626 /* We know what to expect, so we do any byte flipping required here */
627 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
628 struct mdt_body *body;
629
630 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
631 if (body == NULL) {
632 CERROR ("Can't swab mdt_body\n");
633 return -EPROTO;
634 }
635
636 if (it_disposition(it, DISP_OPEN_OPEN) &&
637 !it_open_error(DISP_OPEN_OPEN, it)) {
638 /*
639 * If this is a successful OPEN request, we need to set
640 * replay handler and data early, so that if replay
641 * happens immediately after swabbing below, new reply
642 * is swabbed by that handler correctly.
643 */
644 mdc_set_open_replay_data(NULL, NULL, it);
645 }
646
647 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
648 void *eadata;
649
650 mdc_update_max_ea_from_body(exp, body);
651
652 /*
653 * The eadata is opaque; just check that it is there.
654 * Eventually, obd_unpackmd() will check the contents.
655 */
656 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
657 body->eadatasize);
658 if (eadata == NULL)
659 return -EPROTO;
660
661 /* save lvb data and length in case this is for layout
662 * lock */
663 lvb_data = eadata;
664 lvb_len = body->eadatasize;
665
666 /*
667 * We save the reply LOV EA in case we have to replay a
668 * create for recovery. If we didn't allocate a large
669 * enough request buffer above we need to reallocate it
670 * here to hold the actual LOV EA.
671 *
672 * To not save LOV EA if request is not going to replay
673 * (for example error one).
674 */
675 if ((it->it_op & IT_OPEN) && req->rq_replay) {
676 void *lmm;
677 if (req_capsule_get_size(pill, &RMF_EADATA,
678 RCL_CLIENT) <
679 body->eadatasize)
680 mdc_realloc_openmsg(req, body);
681 else
682 req_capsule_shrink(pill, &RMF_EADATA,
683 body->eadatasize,
684 RCL_CLIENT);
685
686 req_capsule_set_size(pill, &RMF_EADATA,
687 RCL_CLIENT,
688 body->eadatasize);
689
690 lmm = req_capsule_client_get(pill, &RMF_EADATA);
691 if (lmm)
692 memcpy(lmm, eadata, body->eadatasize);
693 }
694 }
695
696 if (body->valid & OBD_MD_FLRMTPERM) {
697 struct mdt_remote_perm *perm;
698
699 LASSERT(client_is_remote(exp));
700 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
701 lustre_swab_mdt_remote_perm);
702 if (perm == NULL)
703 return -EPROTO;
704 }
705 if (body->valid & OBD_MD_FLMDSCAPA) {
706 struct lustre_capa *capa, *p;
707
708 capa = req_capsule_server_get(pill, &RMF_CAPA1);
709 if (capa == NULL)
710 return -EPROTO;
711
712 if (it->it_op & IT_OPEN) {
713 /* client fid capa will be checked in replay */
714 p = req_capsule_client_get(pill, &RMF_CAPA2);
715 LASSERT(p);
716 *p = *capa;
717 }
718 }
719 if (body->valid & OBD_MD_FLOSSCAPA) {
720 struct lustre_capa *capa;
721
722 capa = req_capsule_server_get(pill, &RMF_CAPA2);
723 if (capa == NULL)
724 return -EPROTO;
725 }
726 } else if (it->it_op & IT_LAYOUT) {
727 /* maybe the lock was granted right away and layout
728 * is packed into RMF_DLM_LVB of req */
729 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
730 if (lvb_len > 0) {
731 lvb_data = req_capsule_server_sized_get(pill,
732 &RMF_DLM_LVB, lvb_len);
733 if (lvb_data == NULL)
734 return -EPROTO;
735 }
736 }
737
738 /* fill in stripe data for layout lock */
739 lock = ldlm_handle2lock(lockh);
740 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
741 void *lmm;
742
743 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
744 ldlm_it2str(it->it_op), lvb_len);
745
746 OBD_ALLOC_LARGE(lmm, lvb_len);
747 if (lmm == NULL) {
748 LDLM_LOCK_PUT(lock);
749 return -ENOMEM;
750 }
751 memcpy(lmm, lvb_data, lvb_len);
752
753 /* install lvb_data */
754 lock_res_and_lock(lock);
755 if (lock->l_lvb_data == NULL) {
756 lock->l_lvb_type = LVB_T_LAYOUT;
757 lock->l_lvb_data = lmm;
758 lock->l_lvb_len = lvb_len;
759 lmm = NULL;
760 }
761 unlock_res_and_lock(lock);
762 if (lmm != NULL)
763 OBD_FREE_LARGE(lmm, lvb_len);
764 }
765 if (lock != NULL)
766 LDLM_LOCK_PUT(lock);
767
768 return rc;
769 }
770
771 /* We always reserve enough space in the reply packet for a stripe MD, because
772 * we don't know in advance the file type. */
773 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
774 struct lookup_intent *it, struct md_op_data *op_data,
775 struct lustre_handle *lockh, void *lmm, int lmmsize,
776 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
777 {
778 struct obd_device *obddev = class_exp2obd(exp);
779 struct ptlrpc_request *req = NULL;
780 __u64 flags, saved_flags = extra_lock_flags;
781 int rc;
782 struct ldlm_res_id res_id;
783 static const ldlm_policy_data_t lookup_policy =
784 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
785 static const ldlm_policy_data_t update_policy =
786 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
787 static const ldlm_policy_data_t layout_policy =
788 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
789 static const ldlm_policy_data_t getxattr_policy = {
790 .l_inodebits = { MDS_INODELOCK_XATTR } };
791 ldlm_policy_data_t const *policy = &lookup_policy;
792 int generation, resends = 0;
793 struct ldlm_reply *lockrep;
794 enum lvb_type lvb_type = 0;
795
796 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
797 einfo->ei_type);
798
799 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
800
801 if (it) {
802 saved_flags |= LDLM_FL_HAS_INTENT;
803 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
804 policy = &update_policy;
805 else if (it->it_op & IT_LAYOUT)
806 policy = &layout_policy;
807 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
808 policy = &getxattr_policy;
809 }
810
811 LASSERT(reqp == NULL);
812
813 generation = obddev->u.cli.cl_import->imp_generation;
814 resend:
815 flags = saved_flags;
816 if (!it) {
817 /* The only way right now is FLOCK, in this case we hide flock
818 policy as lmm, but lmmsize is 0 */
819 LASSERT(lmm && lmmsize == 0);
820 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
821 einfo->ei_type);
822 policy = (ldlm_policy_data_t *)lmm;
823 res_id.name[3] = LDLM_FLOCK;
824 } else if (it->it_op & IT_OPEN) {
825 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
826 einfo->ei_cbdata);
827 policy = &update_policy;
828 einfo->ei_cbdata = NULL;
829 lmm = NULL;
830 } else if (it->it_op & IT_UNLINK) {
831 req = mdc_intent_unlink_pack(exp, it, op_data);
832 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
833 req = mdc_intent_getattr_pack(exp, it, op_data);
834 } else if (it->it_op & IT_READDIR) {
835 req = mdc_enqueue_pack(exp, 0);
836 } else if (it->it_op & IT_LAYOUT) {
837 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
838 return -EOPNOTSUPP;
839 req = mdc_intent_layout_pack(exp, it, op_data);
840 lvb_type = LVB_T_LAYOUT;
841 } else if (it->it_op & IT_GETXATTR) {
842 req = mdc_intent_getxattr_pack(exp, it, op_data);
843 } else {
844 LBUG();
845 return -EINVAL;
846 }
847
848 if (IS_ERR(req))
849 return PTR_ERR(req);
850
851 if (req != NULL && it && it->it_op & IT_CREAT)
852 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
853 * retry logic */
854 req->rq_no_retry_einprogress = 1;
855
856 if (resends) {
857 req->rq_generation_set = 1;
858 req->rq_import_generation = generation;
859 req->rq_sent = cfs_time_current_sec() + resends;
860 }
861
862 /* It is important to obtain rpc_lock first (if applicable), so that
863 * threads that are serialised with rpc_lock are not polluting our
864 * rpcs in flight counter. We do not do flock request limiting, though*/
865 if (it) {
866 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
867 rc = mdc_enter_request(&obddev->u.cli);
868 if (rc != 0) {
869 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
870 mdc_clear_replay_flag(req, 0);
871 ptlrpc_req_finished(req);
872 return rc;
873 }
874 }
875
876 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
877 0, lvb_type, lockh, 0);
878 if (!it) {
879 /* For flock requests we immediatelly return without further
880 delay and let caller deal with the rest, since rest of
881 this function metadata processing makes no sense for flock
882 requests anyway. But in case of problem during comms with
883 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
884 can not rely on caller and this mainly for F_UNLCKs
885 (explicits or automatically generated by Kernel to clean
886 current FLocks upon exit) that can't be trashed */
887 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
888 goto resend;
889 return rc;
890 }
891
892 mdc_exit_request(&obddev->u.cli);
893 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
894
895 if (rc < 0) {
896 CERROR("ldlm_cli_enqueue: %d\n", rc);
897 mdc_clear_replay_flag(req, rc);
898 ptlrpc_req_finished(req);
899 return rc;
900 }
901
902 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
903 LASSERT(lockrep != NULL);
904
905 lockrep->lock_policy_res2 =
906 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
907
908 /* Retry the create infinitely when we get -EINPROGRESS from
909 * server. This is required by the new quota design. */
910 if (it && it->it_op & IT_CREAT &&
911 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
912 mdc_clear_replay_flag(req, rc);
913 ptlrpc_req_finished(req);
914 resends++;
915
916 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
917 obddev->obd_name, resends, it->it_op,
918 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
919
920 if (generation == obddev->u.cli.cl_import->imp_generation) {
921 goto resend;
922 } else {
923 CDEBUG(D_HA, "resend cross eviction\n");
924 return -EIO;
925 }
926 }
927
928 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
929 if (rc < 0) {
930 if (lustre_handle_is_used(lockh)) {
931 ldlm_lock_decref(lockh, einfo->ei_mode);
932 memset(lockh, 0, sizeof(*lockh));
933 }
934 ptlrpc_req_finished(req);
935 }
936 return rc;
937 }
938
939 static int mdc_finish_intent_lock(struct obd_export *exp,
940 struct ptlrpc_request *request,
941 struct md_op_data *op_data,
942 struct lookup_intent *it,
943 struct lustre_handle *lockh)
944 {
945 struct lustre_handle old_lock;
946 struct mdt_body *mdt_body;
947 struct ldlm_lock *lock;
948 int rc;
949
950 LASSERT(request != NULL);
951 LASSERT(request != LP_POISON);
952 LASSERT(request->rq_repmsg != LP_POISON);
953
954 if (!it_disposition(it, DISP_IT_EXECD)) {
955 /* The server failed before it even started executing the
956 * intent, i.e. because it couldn't unpack the request. */
957 LASSERT(it->d.lustre.it_status != 0);
958 return it->d.lustre.it_status;
959 }
960 rc = it_open_error(DISP_IT_EXECD, it);
961 if (rc)
962 return rc;
963
964 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
965 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
966
967 /* If we were revalidating a fid/name pair, mark the intent in
968 * case we fail and get called again from lookup */
969 if (fid_is_sane(&op_data->op_fid2) &&
970 it->it_create_mode & M_CHECK_STALE &&
971 it->it_op != IT_GETATTR) {
972
973 /* Also: did we find the same inode? */
974 /* sever can return one of two fids:
975 * op_fid2 - new allocated fid - if file is created.
976 * op_fid3 - existent fid - if file only open.
977 * op_fid3 is saved in lmv_intent_open */
978 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
979 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
980 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
981 "\n", PFID(&op_data->op_fid2),
982 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
983 return -ESTALE;
984 }
985 }
986
987 rc = it_open_error(DISP_LOOKUP_EXECD, it);
988 if (rc)
989 return rc;
990
991 /* keep requests around for the multiple phases of the call
992 * this shows the DISP_XX must guarantee we make it into the call
993 */
994 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
995 it_disposition(it, DISP_OPEN_CREATE) &&
996 !it_open_error(DISP_OPEN_CREATE, it)) {
997 it_set_disposition(it, DISP_ENQ_CREATE_REF);
998 ptlrpc_request_addref(request); /* balanced in ll_create_node */
999 }
1000 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1001 it_disposition(it, DISP_OPEN_OPEN) &&
1002 !it_open_error(DISP_OPEN_OPEN, it)) {
1003 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1004 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1005 /* BUG 11546 - eviction in the middle of open rpc processing */
1006 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1007 }
1008
1009 if (it->it_op & IT_CREAT) {
1010 /* XXX this belongs in ll_create_it */
1011 } else if (it->it_op == IT_OPEN) {
1012 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1013 } else {
1014 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1015 }
1016
1017 /* If we already have a matching lock, then cancel the new
1018 * one. We have to set the data here instead of in
1019 * mdc_enqueue, because we need to use the child's inode as
1020 * the l_ast_data to match, and that's not available until
1021 * intent_finish has performed the iget().) */
1022 lock = ldlm_handle2lock(lockh);
1023 if (lock) {
1024 ldlm_policy_data_t policy = lock->l_policy_data;
1025 LDLM_DEBUG(lock, "matching against this");
1026
1027 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1028 &lock->l_resource->lr_name),
1029 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1030 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1031 LDLM_LOCK_PUT(lock);
1032
1033 memcpy(&old_lock, lockh, sizeof(*lockh));
1034 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1035 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1036 ldlm_lock_decref_and_cancel(lockh,
1037 it->d.lustre.it_lock_mode);
1038 memcpy(lockh, &old_lock, sizeof(old_lock));
1039 it->d.lustre.it_lock_handle = lockh->cookie;
1040 }
1041 }
1042 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1043 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1044 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1045 return rc;
1046 }
1047
1048 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1049 struct lu_fid *fid, __u64 *bits)
1050 {
1051 /* We could just return 1 immediately, but since we should only
1052 * be called in revalidate_it if we already have a lock, let's
1053 * verify that. */
1054 struct ldlm_res_id res_id;
1055 struct lustre_handle lockh;
1056 ldlm_policy_data_t policy;
1057 ldlm_mode_t mode;
1058
1059 if (it->d.lustre.it_lock_handle) {
1060 lockh.cookie = it->d.lustre.it_lock_handle;
1061 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1062 } else {
1063 fid_build_reg_res_name(fid, &res_id);
1064 switch (it->it_op) {
1065 case IT_GETATTR:
1066 /* File attributes are held under multiple bits:
1067 * nlink is under lookup lock, size and times are
1068 * under UPDATE lock and recently we've also got
1069 * a separate permissions lock for owner/group/acl that
1070 * were protected by lookup lock before.
1071 * Getattr must provide all of that information,
1072 * so we need to ensure we have all of those locks.
1073 * Unfortunately, if the bits are split across multiple
1074 * locks, there's no easy way to match all of them here,
1075 * so an extra RPC would be performed to fetch all
1076 * of those bits at once for now. */
1077 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1078 * but for old MDTs (< 2.4), permission is covered
1079 * by LOOKUP lock, so it needs to match all bits here.*/
1080 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1081 MDS_INODELOCK_LOOKUP |
1082 MDS_INODELOCK_PERM;
1083 break;
1084 case IT_LAYOUT:
1085 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1086 break;
1087 default:
1088 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1089 break;
1090 }
1091
1092 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1093 LDLM_IBITS, &policy,
1094 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1095 &lockh);
1096 }
1097
1098 if (mode) {
1099 it->d.lustre.it_lock_handle = lockh.cookie;
1100 it->d.lustre.it_lock_mode = mode;
1101 } else {
1102 it->d.lustre.it_lock_handle = 0;
1103 it->d.lustre.it_lock_mode = 0;
1104 }
1105
1106 return !!mode;
1107 }
1108
1109 /*
1110 * This long block is all about fixing up the lock and request state
1111 * so that it is correct as of the moment _before_ the operation was
1112 * applied; that way, the VFS will think that everything is normal and
1113 * call Lustre's regular VFS methods.
1114 *
1115 * If we're performing a creation, that means that unless the creation
1116 * failed with EEXIST, we should fake up a negative dentry.
1117 *
1118 * For everything else, we want to lookup to succeed.
1119 *
1120 * One additional note: if CREATE or OPEN succeeded, we add an extra
1121 * reference to the request because we need to keep it around until
1122 * ll_create/ll_open gets called.
1123 *
1124 * The server will return to us, in it_disposition, an indication of
1125 * exactly what d.lustre.it_status refers to.
1126 *
1127 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1128 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1129 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1130 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1131 * was successful.
1132 *
1133 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1134 * child lookup.
1135 */
1136 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1137 void *lmm, int lmmsize, struct lookup_intent *it,
1138 int lookup_flags, struct ptlrpc_request **reqp,
1139 ldlm_blocking_callback cb_blocking,
1140 __u64 extra_lock_flags)
1141 {
1142 struct ldlm_enqueue_info einfo = {
1143 .ei_type = LDLM_IBITS,
1144 .ei_mode = it_to_lock_mode(it),
1145 .ei_cb_bl = cb_blocking,
1146 .ei_cb_cp = ldlm_completion_ast,
1147 };
1148 struct lustre_handle lockh;
1149 int rc = 0;
1150
1151 LASSERT(it);
1152
1153 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1154 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1155 op_data->op_name, PFID(&op_data->op_fid2),
1156 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1157 it->it_flags);
1158
1159 lockh.cookie = 0;
1160 if (fid_is_sane(&op_data->op_fid2) &&
1161 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1162 /* We could just return 1 immediately, but since we should only
1163 * be called in revalidate_it if we already have a lock, let's
1164 * verify that. */
1165 it->d.lustre.it_lock_handle = 0;
1166 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1167 /* Only return failure if it was not GETATTR by cfid
1168 (from inode_revalidate) */
1169 if (rc || op_data->op_namelen != 0)
1170 return rc;
1171 }
1172
1173 /* For case if upper layer did not alloc fid, do it now. */
1174 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1175 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1176 if (rc < 0) {
1177 CERROR("Can't alloc new fid, rc %d\n", rc);
1178 return rc;
1179 }
1180 }
1181 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1182 extra_lock_flags);
1183 if (rc < 0)
1184 return rc;
1185
1186 *reqp = it->d.lustre.it_data;
1187 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1188 return rc;
1189 }
1190
1191 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1192 struct ptlrpc_request *req,
1193 void *args, int rc)
1194 {
1195 struct mdc_getattr_args *ga = args;
1196 struct obd_export *exp = ga->ga_exp;
1197 struct md_enqueue_info *minfo = ga->ga_minfo;
1198 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1199 struct lookup_intent *it;
1200 struct lustre_handle *lockh;
1201 struct obd_device *obddev;
1202 struct ldlm_reply *lockrep;
1203 __u64 flags = LDLM_FL_HAS_INTENT;
1204
1205 it = &minfo->mi_it;
1206 lockh = &minfo->mi_lockh;
1207
1208 obddev = class_exp2obd(exp);
1209
1210 mdc_exit_request(&obddev->u.cli);
1211 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1212 rc = -ETIMEDOUT;
1213
1214 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1215 &flags, NULL, 0, lockh, rc);
1216 if (rc < 0) {
1217 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1218 mdc_clear_replay_flag(req, rc);
1219 GOTO(out, rc);
1220 }
1221
1222 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1223 LASSERT(lockrep != NULL);
1224
1225 lockrep->lock_policy_res2 =
1226 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1227
1228 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1229 if (rc)
1230 GOTO(out, rc);
1231
1232 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1233
1234 out:
1235 OBD_FREE_PTR(einfo);
1236 minfo->mi_cb(req, minfo, rc);
1237 return 0;
1238 }
1239
1240 int mdc_intent_getattr_async(struct obd_export *exp,
1241 struct md_enqueue_info *minfo,
1242 struct ldlm_enqueue_info *einfo)
1243 {
1244 struct md_op_data *op_data = &minfo->mi_data;
1245 struct lookup_intent *it = &minfo->mi_it;
1246 struct ptlrpc_request *req;
1247 struct mdc_getattr_args *ga;
1248 struct obd_device *obddev = class_exp2obd(exp);
1249 struct ldlm_res_id res_id;
1250 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1251 * for statahead currently. Consider CMD in future, such two bits
1252 * maybe managed by different MDS, should be adjusted then. */
1253 ldlm_policy_data_t policy = {
1254 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1255 MDS_INODELOCK_UPDATE }
1256 };
1257 int rc = 0;
1258 __u64 flags = LDLM_FL_HAS_INTENT;
1259
1260 CDEBUG(D_DLMTRACE,
1261 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1262 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1263 ldlm_it2str(it->it_op), it->it_flags);
1264
1265 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1266 req = mdc_intent_getattr_pack(exp, it, op_data);
1267 if (IS_ERR(req))
1268 return PTR_ERR(req);
1269
1270 rc = mdc_enter_request(&obddev->u.cli);
1271 if (rc != 0) {
1272 ptlrpc_req_finished(req);
1273 return rc;
1274 }
1275
1276 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1277 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1278 if (rc < 0) {
1279 mdc_exit_request(&obddev->u.cli);
1280 ptlrpc_req_finished(req);
1281 return rc;
1282 }
1283
1284 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1285 ga = ptlrpc_req_async_args(req);
1286 ga->ga_exp = exp;
1287 ga->ga_minfo = minfo;
1288 ga->ga_einfo = einfo;
1289
1290 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1291 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1292
1293 return 0;
1294 }
This page took 0.055809 seconds and 4 git commands to generate.