staging: delete non-required instances of include <linux/init.h>
[deliverable/linux.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
CommitLineData
d7e09d03
PT
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37#define DEBUG_SUBSYSTEM S_MDC
38
39# include <linux/module.h>
40# include <linux/pagemap.h>
41# include <linux/miscdevice.h>
d7e09d03
PT
42
43#include <lustre_acl.h>
44#include <obd_class.h>
45#include <lustre_dlm.h>
46/* fid_res_name_eq() */
47#include <lustre_fid.h>
48#include <lprocfs_status.h>
49#include "mdc_internal.h"
50
/* Context carried through an asynchronous getattr enqueue so the
 * interpret callback can finish the operation. */
struct mdc_getattr_args {
	struct obd_export	 *ga_exp;	/* export the RPC was sent on */
	struct md_enqueue_info	 *ga_minfo;	/* caller's enqueue descriptor */
	struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue parameters */
};
56
57int it_disposition(struct lookup_intent *it, int flag)
58{
59 return it->d.lustre.it_disposition & flag;
60}
61EXPORT_SYMBOL(it_disposition);
62
63void it_set_disposition(struct lookup_intent *it, int flag)
64{
65 it->d.lustre.it_disposition |= flag;
66}
67EXPORT_SYMBOL(it_set_disposition);
68
69void it_clear_disposition(struct lookup_intent *it, int flag)
70{
71 it->d.lustre.it_disposition &= ~flag;
72}
73EXPORT_SYMBOL(it_clear_disposition);
74
75int it_open_error(int phase, struct lookup_intent *it)
76{
d3a8a4e2
JX
77 if (it_disposition(it, DISP_OPEN_LEASE)) {
78 if (phase >= DISP_OPEN_LEASE)
79 return it->d.lustre.it_status;
80 else
81 return 0;
82 }
d7e09d03
PT
83 if (it_disposition(it, DISP_OPEN_OPEN)) {
84 if (phase >= DISP_OPEN_OPEN)
85 return it->d.lustre.it_status;
86 else
87 return 0;
88 }
89
90 if (it_disposition(it, DISP_OPEN_CREATE)) {
91 if (phase >= DISP_OPEN_CREATE)
92 return it->d.lustre.it_status;
93 else
94 return 0;
95 }
96
97 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 if (phase >= DISP_LOOKUP_EXECD)
99 return it->d.lustre.it_status;
100 else
101 return 0;
102 }
103
104 if (it_disposition(it, DISP_IT_EXECD)) {
105 if (phase >= DISP_IT_EXECD)
106 return it->d.lustre.it_status;
107 else
108 return 0;
109 }
110 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 it->d.lustre.it_status);
112 LBUG();
113 return 0;
114}
115EXPORT_SYMBOL(it_open_error);
116
117/* this must be called on a lockh that is known to have a referenced lock */
118int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
119 __u64 *bits)
120{
121 struct ldlm_lock *lock;
122 struct inode *new_inode = data;
d7e09d03
PT
123
124 if(bits)
125 *bits = 0;
126
127 if (!*lockh)
0a3bdb00 128 return 0;
d7e09d03
PT
129
130 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132 LASSERT(lock != NULL);
133 lock_res_and_lock(lock);
134 if (lock->l_resource->lr_lvb_inode &&
135 lock->l_resource->lr_lvb_inode != data) {
136 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
141 old_inode->i_state,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
143 }
144 lock->l_resource->lr_lvb_inode = new_inode;
145 if (bits)
146 *bits = lock->l_policy_data.l_inodebits.bits;
147
148 unlock_res_and_lock(lock);
149 LDLM_LOCK_PUT(lock);
150
0a3bdb00 151 return 0;
d7e09d03
PT
152}
153
154ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
158{
159 struct ldlm_res_id res_id;
160 ldlm_mode_t rc;
d7e09d03
PT
161
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
164 &res_id, type, policy, mode, lockh, 0);
0a3bdb00 165 return rc;
d7e09d03
PT
166}
167
168int mdc_cancel_unused(struct obd_export *exp,
169 const struct lu_fid *fid,
170 ldlm_policy_data_t *policy,
171 ldlm_mode_t mode,
172 ldlm_cancel_flags_t flags,
173 void *opaque)
174{
175 struct ldlm_res_id res_id;
176 struct obd_device *obd = class_exp2obd(exp);
177 int rc;
178
d7e09d03
PT
179 fid_build_reg_res_name(fid, &res_id);
180 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
181 policy, mode, flags, opaque);
0a3bdb00 182 return rc;
d7e09d03
PT
183}
184
185int mdc_null_inode(struct obd_export *exp,
186 const struct lu_fid *fid)
187{
188 struct ldlm_res_id res_id;
189 struct ldlm_resource *res;
190 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
d7e09d03
PT
191
192 LASSERTF(ns != NULL, "no namespace passed\n");
193
194 fid_build_reg_res_name(fid, &res_id);
195
196 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
197 if(res == NULL)
0a3bdb00 198 return 0;
d7e09d03
PT
199
200 lock_res(res);
201 res->lr_lvb_inode = NULL;
202 unlock_res(res);
203
204 ldlm_resource_putref(res);
0a3bdb00 205 return 0;
d7e09d03
PT
206}
207
208/* find any ldlm lock of the inode in mdc
209 * return 0 not find
210 * 1 find one
211 * < 0 error */
212int mdc_find_cbdata(struct obd_export *exp,
213 const struct lu_fid *fid,
214 ldlm_iterator_t it, void *data)
215{
216 struct ldlm_res_id res_id;
217 int rc = 0;
d7e09d03
PT
218
219 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
220 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
221 it, data);
222 if (rc == LDLM_ITER_STOP)
0a3bdb00 223 return 1;
d7e09d03 224 else if (rc == LDLM_ITER_CONTINUE)
0a3bdb00
GKH
225 return 0;
226 return rc;
d7e09d03
PT
227}
228
229static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
230{
231 /* Don't hold error requests for replay. */
232 if (req->rq_replay) {
233 spin_lock(&req->rq_lock);
234 req->rq_replay = 0;
235 spin_unlock(&req->rq_lock);
236 }
237 if (rc && req->rq_transno != 0) {
238 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 LBUG();
240 }
241}
242
243/* Save a large LOV EA into the request buffer so that it is available
244 * for replay. We don't do this in the initial request because the
245 * original request doesn't need this buffer (at most it sends just the
246 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
247 * buffer and may also be difficult to allocate and save a very large
248 * request buffer for each open. (bug 5707)
249 *
250 * OOM here may cause recovery failure if lmm is needed (only for the
251 * original open if the MDS crashed just when this client also OOM'd)
252 * but this is incredibly unlikely, and questionable whether the client
253 * could do MDS recovery under OOM anyways... */
254static void mdc_realloc_openmsg(struct ptlrpc_request *req,
255 struct mdt_body *body)
256{
257 int rc;
258
259 /* FIXME: remove this explicit offset. */
260 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
261 body->eadatasize);
262 if (rc) {
263 CERROR("Can't enlarge segment %d size to %d\n",
264 DLM_INTENT_REC_OFF + 4, body->eadatasize);
265 body->valid &= ~OBD_MD_FLEASIZE;
266 body->eadatasize = 0;
267 }
268}
269
270static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
271 struct lookup_intent *it,
272 struct md_op_data *op_data,
273 void *lmm, int lmmsize,
274 void *cb_data)
275{
276 struct ptlrpc_request *req;
277 struct obd_device *obddev = class_exp2obd(exp);
278 struct ldlm_intent *lit;
279 LIST_HEAD(cancels);
280 int count = 0;
281 int mode;
282 int rc;
d7e09d03
PT
283
284 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
285
286 /* XXX: openlock is not cancelled for cross-refs. */
287 /* If inode is known, cancel conflicting OPEN locks. */
288 if (fid_is_sane(&op_data->op_fid2)) {
d3a8a4e2
JX
289 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
290 if (it->it_flags & FMODE_WRITE)
291 mode = LCK_EX;
292 else
293 mode = LCK_PR;
294 } else {
295 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
296 mode = LCK_CW;
d7e09d03 297#ifdef FMODE_EXEC
d3a8a4e2
JX
298 else if (it->it_flags & FMODE_EXEC)
299 mode = LCK_PR;
d7e09d03 300#endif
d3a8a4e2
JX
301 else
302 mode = LCK_CR;
303 }
d7e09d03
PT
304 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
305 &cancels, mode,
306 MDS_INODELOCK_OPEN);
307 }
308
309 /* If CREATE, cancel parent's UPDATE lock. */
310 if (it->it_op & IT_CREAT)
311 mode = LCK_EX;
312 else
313 mode = LCK_CR;
314 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
315 &cancels, mode,
316 MDS_INODELOCK_UPDATE);
317
318 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
319 &RQF_LDLM_INTENT_OPEN);
320 if (req == NULL) {
321 ldlm_lock_list_put(&cancels, l_bl_ast, count);
0a3bdb00 322 return ERR_PTR(-ENOMEM);
d7e09d03
PT
323 }
324
325 /* parent capability */
326 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
327 /* child capability, reserve the size according to parent capa, it will
328 * be filled after we get the reply */
329 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
330
331 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
332 op_data->op_namelen + 1);
333 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
334 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
335
336 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
337 if (rc) {
338 ptlrpc_request_free(req);
339 return NULL;
340 }
341
342 spin_lock(&req->rq_lock);
343 req->rq_replay = req->rq_import->imp_replayable;
344 spin_unlock(&req->rq_lock);
345
346 /* pack the intent */
347 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348 lit->opc = (__u64)it->it_op;
349
350 /* pack the intended request */
351 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
352 lmmsize);
353
354 /* for remote client, fetch remote perm for current user */
355 if (client_is_remote(exp))
356 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357 sizeof(struct mdt_remote_perm));
358 ptlrpc_request_set_replen(req);
359 return req;
360}
361
7fc1f831
AP
362static struct ptlrpc_request *
363mdc_intent_getxattr_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
366{
367 struct ptlrpc_request *req;
368 struct ldlm_intent *lit;
369 int rc, count = 0, maxdata;
370 LIST_HEAD(cancels);
371
372
373
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375 &RQF_LDLM_INTENT_GETXATTR);
376 if (req == NULL)
377 return ERR_PTR(-ENOMEM);
378
379 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
380
381 if (it->it_op == IT_SETXATTR)
382 /* If we want to upgrade to LCK_PW, let's cancel LCK_PR
383 * locks now. This avoids unnecessary ASTs. */
384 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
385 &cancels, LCK_PW,
386 MDS_INODELOCK_XATTR);
387
388 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
389 if (rc) {
390 ptlrpc_request_free(req);
391 return ERR_PTR(rc);
392 }
393
394 /* pack the intent */
395 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
396 lit->opc = IT_GETXATTR;
397
398 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
399
400 /* pack the intended request */
401 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
402 op_data->op_valid, maxdata, -1, 0);
403
404 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
405 RCL_SERVER, maxdata);
406
407 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
408 RCL_SERVER, maxdata);
409
410 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
411 RCL_SERVER, maxdata);
412
413 ptlrpc_request_set_replen(req);
414
415 return req;
416}
417
d7e09d03
PT
418static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
419 struct lookup_intent *it,
420 struct md_op_data *op_data)
421{
422 struct ptlrpc_request *req;
423 struct obd_device *obddev = class_exp2obd(exp);
424 struct ldlm_intent *lit;
425 int rc;
d7e09d03
PT
426
427 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
428 &RQF_LDLM_INTENT_UNLINK);
429 if (req == NULL)
0a3bdb00 430 return ERR_PTR(-ENOMEM);
d7e09d03
PT
431
432 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
433 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
434 op_data->op_namelen + 1);
435
436 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
437 if (rc) {
438 ptlrpc_request_free(req);
0a3bdb00 439 return ERR_PTR(rc);
d7e09d03
PT
440 }
441
442 /* pack the intent */
443 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
444 lit->opc = (__u64)it->it_op;
445
446 /* pack the intended request */
447 mdc_unlink_pack(req, op_data);
448
449 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
450 obddev->u.cli.cl_max_mds_easize);
451 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
452 obddev->u.cli.cl_max_mds_cookiesize);
453 ptlrpc_request_set_replen(req);
0a3bdb00 454 return req;
d7e09d03
PT
455}
456
457static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
458 struct lookup_intent *it,
459 struct md_op_data *op_data)
460{
461 struct ptlrpc_request *req;
462 struct obd_device *obddev = class_exp2obd(exp);
463 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
464 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
465 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
466 (client_is_remote(exp) ?
467 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
468 struct ldlm_intent *lit;
469 int rc;
d7e09d03
PT
470
471 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
472 &RQF_LDLM_INTENT_GETATTR);
473 if (req == NULL)
0a3bdb00 474 return ERR_PTR(-ENOMEM);
d7e09d03
PT
475
476 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
477 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
478 op_data->op_namelen + 1);
479
480 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
481 if (rc) {
482 ptlrpc_request_free(req);
0a3bdb00 483 return ERR_PTR(rc);
d7e09d03
PT
484 }
485
486 /* pack the intent */
487 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
488 lit->opc = (__u64)it->it_op;
489
490 /* pack the intended request */
491 mdc_getattr_pack(req, valid, it->it_flags, op_data,
492 obddev->u.cli.cl_max_mds_easize);
493
494 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
495 obddev->u.cli.cl_max_mds_easize);
496 if (client_is_remote(exp))
497 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
498 sizeof(struct mdt_remote_perm));
499 ptlrpc_request_set_replen(req);
0a3bdb00 500 return req;
d7e09d03
PT
501}
502
503static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
504 struct lookup_intent *it,
505 struct md_op_data *unused)
506{
507 struct obd_device *obd = class_exp2obd(exp);
508 struct ptlrpc_request *req;
509 struct ldlm_intent *lit;
510 struct layout_intent *layout;
511 int rc;
d7e09d03
PT
512
513 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
514 &RQF_LDLM_INTENT_LAYOUT);
515 if (req == NULL)
0a3bdb00 516 return ERR_PTR(-ENOMEM);
d7e09d03
PT
517
518 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
519 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
520 if (rc) {
521 ptlrpc_request_free(req);
0a3bdb00 522 return ERR_PTR(rc);
d7e09d03
PT
523 }
524
525 /* pack the intent */
526 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
527 lit->opc = (__u64)it->it_op;
528
529 /* pack the layout intent request */
530 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
531 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
532 * set for replication */
533 layout->li_opc = LAYOUT_INTENT_ACCESS;
534
535 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
536 obd->u.cli.cl_max_mds_easize);
537 ptlrpc_request_set_replen(req);
0a3bdb00 538 return req;
d7e09d03
PT
539}
540
541static struct ptlrpc_request *
542mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
543{
544 struct ptlrpc_request *req;
545 int rc;
d7e09d03
PT
546
547 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
548 if (req == NULL)
0a3bdb00 549 return ERR_PTR(-ENOMEM);
d7e09d03
PT
550
551 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
552 if (rc) {
553 ptlrpc_request_free(req);
0a3bdb00 554 return ERR_PTR(rc);
d7e09d03
PT
555 }
556
557 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
558 ptlrpc_request_set_replen(req);
0a3bdb00 559 return req;
d7e09d03
PT
560}
561
562static int mdc_finish_enqueue(struct obd_export *exp,
563 struct ptlrpc_request *req,
564 struct ldlm_enqueue_info *einfo,
565 struct lookup_intent *it,
566 struct lustre_handle *lockh,
567 int rc)
568{
569 struct req_capsule *pill = &req->rq_pill;
570 struct ldlm_request *lockreq;
571 struct ldlm_reply *lockrep;
572 struct lustre_intent_data *intent = &it->d.lustre;
573 struct ldlm_lock *lock;
574 void *lvb_data = NULL;
575 int lvb_len = 0;
d7e09d03
PT
576
577 LASSERT(rc >= 0);
578 /* Similarly, if we're going to replay this request, we don't want to
579 * actually get a lock, just perform the intent. */
580 if (req->rq_transno || req->rq_replay) {
581 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
582 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
583 }
584
585 if (rc == ELDLM_LOCK_ABORTED) {
586 einfo->ei_mode = 0;
587 memset(lockh, 0, sizeof(*lockh));
588 rc = 0;
589 } else { /* rc = 0 */
590 lock = ldlm_handle2lock(lockh);
591 LASSERT(lock != NULL);
592
593 /* If the server gave us back a different lock mode, we should
594 * fix up our variables. */
595 if (lock->l_req_mode != einfo->ei_mode) {
596 ldlm_lock_addref(lockh, lock->l_req_mode);
597 ldlm_lock_decref(lockh, einfo->ei_mode);
598 einfo->ei_mode = lock->l_req_mode;
599 }
600 LDLM_LOCK_PUT(lock);
601 }
602
603 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
604 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
605
606 intent->it_disposition = (int)lockrep->lock_policy_res1;
607 intent->it_status = (int)lockrep->lock_policy_res2;
608 intent->it_lock_mode = einfo->ei_mode;
609 intent->it_lock_handle = lockh->cookie;
610 intent->it_data = req;
611
612 /* Technically speaking rq_transno must already be zero if
613 * it_status is in error, so the check is a bit redundant */
614 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
615 mdc_clear_replay_flag(req, intent->it_status);
616
617 /* If we're doing an IT_OPEN which did not result in an actual
618 * successful open, then we need to remove the bit which saves
619 * this request for unconditional replay.
620 *
621 * It's important that we do this first! Otherwise we might exit the
622 * function without doing so, and try to replay a failed create
623 * (bug 3440) */
624 if (it->it_op & IT_OPEN && req->rq_replay &&
625 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
626 mdc_clear_replay_flag(req, intent->it_status);
627
628 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
629 it->it_op, intent->it_disposition, intent->it_status);
630
631 /* We know what to expect, so we do any byte flipping required here */
632 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
633 struct mdt_body *body;
634
635 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
636 if (body == NULL) {
637 CERROR ("Can't swab mdt_body\n");
0a3bdb00 638 return -EPROTO;
d7e09d03
PT
639 }
640
641 if (it_disposition(it, DISP_OPEN_OPEN) &&
642 !it_open_error(DISP_OPEN_OPEN, it)) {
643 /*
644 * If this is a successful OPEN request, we need to set
645 * replay handler and data early, so that if replay
646 * happens immediately after swabbing below, new reply
647 * is swabbed by that handler correctly.
648 */
649 mdc_set_open_replay_data(NULL, NULL, req);
650 }
651
652 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
653 void *eadata;
654
655 mdc_update_max_ea_from_body(exp, body);
656
657 /*
658 * The eadata is opaque; just check that it is there.
659 * Eventually, obd_unpackmd() will check the contents.
660 */
661 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
662 body->eadatasize);
663 if (eadata == NULL)
0a3bdb00 664 return -EPROTO;
d7e09d03
PT
665
666 /* save lvb data and length in case this is for layout
667 * lock */
668 lvb_data = eadata;
669 lvb_len = body->eadatasize;
670
671 /*
672 * We save the reply LOV EA in case we have to replay a
673 * create for recovery. If we didn't allocate a large
674 * enough request buffer above we need to reallocate it
675 * here to hold the actual LOV EA.
676 *
677 * To not save LOV EA if request is not going to replay
678 * (for example error one).
679 */
680 if ((it->it_op & IT_OPEN) && req->rq_replay) {
681 void *lmm;
682 if (req_capsule_get_size(pill, &RMF_EADATA,
683 RCL_CLIENT) <
684 body->eadatasize)
685 mdc_realloc_openmsg(req, body);
686 else
687 req_capsule_shrink(pill, &RMF_EADATA,
688 body->eadatasize,
689 RCL_CLIENT);
690
691 req_capsule_set_size(pill, &RMF_EADATA,
692 RCL_CLIENT,
693 body->eadatasize);
694
695 lmm = req_capsule_client_get(pill, &RMF_EADATA);
696 if (lmm)
697 memcpy(lmm, eadata, body->eadatasize);
698 }
699 }
700
701 if (body->valid & OBD_MD_FLRMTPERM) {
702 struct mdt_remote_perm *perm;
703
704 LASSERT(client_is_remote(exp));
705 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
706 lustre_swab_mdt_remote_perm);
707 if (perm == NULL)
0a3bdb00 708 return -EPROTO;
d7e09d03
PT
709 }
710 if (body->valid & OBD_MD_FLMDSCAPA) {
711 struct lustre_capa *capa, *p;
712
713 capa = req_capsule_server_get(pill, &RMF_CAPA1);
714 if (capa == NULL)
0a3bdb00 715 return -EPROTO;
d7e09d03
PT
716
717 if (it->it_op & IT_OPEN) {
718 /* client fid capa will be checked in replay */
719 p = req_capsule_client_get(pill, &RMF_CAPA2);
720 LASSERT(p);
721 *p = *capa;
722 }
723 }
724 if (body->valid & OBD_MD_FLOSSCAPA) {
725 struct lustre_capa *capa;
726
727 capa = req_capsule_server_get(pill, &RMF_CAPA2);
728 if (capa == NULL)
0a3bdb00 729 return -EPROTO;
d7e09d03
PT
730 }
731 } else if (it->it_op & IT_LAYOUT) {
732 /* maybe the lock was granted right away and layout
733 * is packed into RMF_DLM_LVB of req */
734 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
735 if (lvb_len > 0) {
736 lvb_data = req_capsule_server_sized_get(pill,
737 &RMF_DLM_LVB, lvb_len);
738 if (lvb_data == NULL)
0a3bdb00 739 return -EPROTO;
d7e09d03
PT
740 }
741 }
742
743 /* fill in stripe data for layout lock */
744 lock = ldlm_handle2lock(lockh);
745 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
746 void *lmm;
747
748 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
749 ldlm_it2str(it->it_op), lvb_len);
750
751 OBD_ALLOC_LARGE(lmm, lvb_len);
752 if (lmm == NULL) {
753 LDLM_LOCK_PUT(lock);
0a3bdb00 754 return -ENOMEM;
d7e09d03
PT
755 }
756 memcpy(lmm, lvb_data, lvb_len);
757
758 /* install lvb_data */
759 lock_res_and_lock(lock);
760 if (lock->l_lvb_data == NULL) {
761 lock->l_lvb_data = lmm;
762 lock->l_lvb_len = lvb_len;
763 lmm = NULL;
764 }
765 unlock_res_and_lock(lock);
766 if (lmm != NULL)
767 OBD_FREE_LARGE(lmm, lvb_len);
768 }
769 if (lock != NULL)
770 LDLM_LOCK_PUT(lock);
771
0a3bdb00 772 return rc;
d7e09d03
PT
773}
774
775/* We always reserve enough space in the reply packet for a stripe MD, because
776 * we don't know in advance the file type. */
777int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
778 struct lookup_intent *it, struct md_op_data *op_data,
779 struct lustre_handle *lockh, void *lmm, int lmmsize,
780 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
781{
782 struct obd_device *obddev = class_exp2obd(exp);
783 struct ptlrpc_request *req = NULL;
784 __u64 flags, saved_flags = extra_lock_flags;
785 int rc;
786 struct ldlm_res_id res_id;
787 static const ldlm_policy_data_t lookup_policy =
788 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
789 static const ldlm_policy_data_t update_policy =
790 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
791 static const ldlm_policy_data_t layout_policy =
792 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
7fc1f831
AP
793 static const ldlm_policy_data_t getxattr_policy = {
794 .l_inodebits = { MDS_INODELOCK_XATTR } };
d7e09d03
PT
795 ldlm_policy_data_t const *policy = &lookup_policy;
796 int generation, resends = 0;
797 struct ldlm_reply *lockrep;
798 enum lvb_type lvb_type = 0;
d7e09d03
PT
799
800 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
801 einfo->ei_type);
802
803 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
804
805 if (it) {
806 saved_flags |= LDLM_FL_HAS_INTENT;
807 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
808 policy = &update_policy;
809 else if (it->it_op & IT_LAYOUT)
810 policy = &layout_policy;
7fc1f831
AP
811 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812 policy = &getxattr_policy;
d7e09d03
PT
813 }
814
815 LASSERT(reqp == NULL);
816
817 generation = obddev->u.cli.cl_import->imp_generation;
818resend:
819 flags = saved_flags;
820 if (!it) {
821 /* The only way right now is FLOCK, in this case we hide flock
822 policy as lmm, but lmmsize is 0 */
823 LASSERT(lmm && lmmsize == 0);
824 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
825 einfo->ei_type);
826 policy = (ldlm_policy_data_t *)lmm;
827 res_id.name[3] = LDLM_FLOCK;
828 } else if (it->it_op & IT_OPEN) {
829 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
830 einfo->ei_cbdata);
831 policy = &update_policy;
832 einfo->ei_cbdata = NULL;
833 lmm = NULL;
834 } else if (it->it_op & IT_UNLINK) {
835 req = mdc_intent_unlink_pack(exp, it, op_data);
836 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
837 req = mdc_intent_getattr_pack(exp, it, op_data);
838 } else if (it->it_op & IT_READDIR) {
839 req = mdc_enqueue_pack(exp, 0);
840 } else if (it->it_op & IT_LAYOUT) {
841 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
0a3bdb00 842 return -EOPNOTSUPP;
d7e09d03
PT
843 req = mdc_intent_layout_pack(exp, it, op_data);
844 lvb_type = LVB_T_LAYOUT;
7fc1f831
AP
845 } else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) {
846 req = mdc_intent_getxattr_pack(exp, it, op_data);
d7e09d03
PT
847 } else {
848 LBUG();
0a3bdb00 849 return -EINVAL;
d7e09d03
PT
850 }
851
852 if (IS_ERR(req))
0a3bdb00 853 return PTR_ERR(req);
d7e09d03
PT
854
855 if (req != NULL && it && it->it_op & IT_CREAT)
856 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
857 * retry logic */
858 req->rq_no_retry_einprogress = 1;
859
860 if (resends) {
861 req->rq_generation_set = 1;
862 req->rq_import_generation = generation;
863 req->rq_sent = cfs_time_current_sec() + resends;
864 }
865
866 /* It is important to obtain rpc_lock first (if applicable), so that
867 * threads that are serialised with rpc_lock are not polluting our
868 * rpcs in flight counter. We do not do flock request limiting, though*/
869 if (it) {
870 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
871 rc = mdc_enter_request(&obddev->u.cli);
872 if (rc != 0) {
873 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
874 mdc_clear_replay_flag(req, 0);
875 ptlrpc_req_finished(req);
0a3bdb00 876 return rc;
d7e09d03
PT
877 }
878 }
879
880 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
881 0, lvb_type, lockh, 0);
882 if (!it) {
883 /* For flock requests we immediatelly return without further
884 delay and let caller deal with the rest, since rest of
885 this function metadata processing makes no sense for flock
cd6b328c
BF
886 requests anyway. But in case of problem during comms with
887 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
888 can not rely on caller and this mainly for F_UNLCKs
889 (explicits or automatically generated by Kernel to clean
890 current FLocks upon exit) that can't be trashed */
891 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
892 goto resend;
0a3bdb00 893 return rc;
d7e09d03
PT
894 }
895
896 mdc_exit_request(&obddev->u.cli);
897 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
898
899 if (rc < 0) {
900 CERROR("ldlm_cli_enqueue: %d\n", rc);
901 mdc_clear_replay_flag(req, rc);
902 ptlrpc_req_finished(req);
0a3bdb00 903 return rc;
d7e09d03
PT
904 }
905
906 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
907 LASSERT(lockrep != NULL);
908
2d58de78
LW
909 lockrep->lock_policy_res2 =
910 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
911
d7e09d03
PT
912 /* Retry the create infinitely when we get -EINPROGRESS from
913 * server. This is required by the new quota design. */
914 if (it && it->it_op & IT_CREAT &&
915 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
916 mdc_clear_replay_flag(req, rc);
917 ptlrpc_req_finished(req);
918 resends++;
919
920 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
921 obddev->obd_name, resends, it->it_op,
922 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
923
924 if (generation == obddev->u.cli.cl_import->imp_generation) {
925 goto resend;
926 } else {
927 CDEBUG(D_HA, "resend cross eviction\n");
0a3bdb00 928 return -EIO;
d7e09d03
PT
929 }
930 }
931
932 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
933 if (rc < 0) {
934 if (lustre_handle_is_used(lockh)) {
935 ldlm_lock_decref(lockh, einfo->ei_mode);
936 memset(lockh, 0, sizeof(*lockh));
937 }
938 ptlrpc_req_finished(req);
939 }
0a3bdb00 940 return rc;
d7e09d03
PT
941}
942
943static int mdc_finish_intent_lock(struct obd_export *exp,
944 struct ptlrpc_request *request,
945 struct md_op_data *op_data,
946 struct lookup_intent *it,
947 struct lustre_handle *lockh)
948{
949 struct lustre_handle old_lock;
950 struct mdt_body *mdt_body;
951 struct ldlm_lock *lock;
952 int rc;
d7e09d03
PT
953
954 LASSERT(request != NULL);
955 LASSERT(request != LP_POISON);
956 LASSERT(request->rq_repmsg != LP_POISON);
957
958 if (!it_disposition(it, DISP_IT_EXECD)) {
959 /* The server failed before it even started executing the
960 * intent, i.e. because it couldn't unpack the request. */
961 LASSERT(it->d.lustre.it_status != 0);
0a3bdb00 962 return it->d.lustre.it_status;
d7e09d03
PT
963 }
964 rc = it_open_error(DISP_IT_EXECD, it);
965 if (rc)
0a3bdb00 966 return rc;
d7e09d03
PT
967
968 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
969 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
970
971 /* If we were revalidating a fid/name pair, mark the intent in
972 * case we fail and get called again from lookup */
973 if (fid_is_sane(&op_data->op_fid2) &&
974 it->it_create_mode & M_CHECK_STALE &&
975 it->it_op != IT_GETATTR) {
976 it_set_disposition(it, DISP_ENQ_COMPLETE);
977
978 /* Also: did we find the same inode? */
979 /* sever can return one of two fids:
980 * op_fid2 - new allocated fid - if file is created.
981 * op_fid3 - existent fid - if file only open.
982 * op_fid3 is saved in lmv_intent_open */
983 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
984 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
985 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
986 "\n", PFID(&op_data->op_fid2),
987 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
0a3bdb00 988 return -ESTALE;
d7e09d03
PT
989 }
990 }
991
992 rc = it_open_error(DISP_LOOKUP_EXECD, it);
993 if (rc)
0a3bdb00 994 return rc;
d7e09d03
PT
995
996 /* keep requests around for the multiple phases of the call
997 * this shows the DISP_XX must guarantee we make it into the call
998 */
999 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1000 it_disposition(it, DISP_OPEN_CREATE) &&
1001 !it_open_error(DISP_OPEN_CREATE, it)) {
1002 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1003 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1004 }
1005 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1006 it_disposition(it, DISP_OPEN_OPEN) &&
1007 !it_open_error(DISP_OPEN_OPEN, it)) {
1008 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1009 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1010 /* BUG 11546 - eviction in the middle of open rpc processing */
1011 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1012 }
1013
1014 if (it->it_op & IT_CREAT) {
1015 /* XXX this belongs in ll_create_it */
1016 } else if (it->it_op == IT_OPEN) {
1017 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1018 } else {
1019 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1020 }
1021
1022 /* If we already have a matching lock, then cancel the new
1023 * one. We have to set the data here instead of in
1024 * mdc_enqueue, because we need to use the child's inode as
1025 * the l_ast_data to match, and that's not available until
1026 * intent_finish has performed the iget().) */
1027 lock = ldlm_handle2lock(lockh);
1028 if (lock) {
1029 ldlm_policy_data_t policy = lock->l_policy_data;
1030 LDLM_DEBUG(lock, "matching against this");
1031
1032 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1033 &lock->l_resource->lr_name),
6d95e048
AD
1034 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1035 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
d7e09d03
PT
1036 LDLM_LOCK_PUT(lock);
1037
1038 memcpy(&old_lock, lockh, sizeof(*lockh));
1039 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1040 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1041 ldlm_lock_decref_and_cancel(lockh,
1042 it->d.lustre.it_lock_mode);
1043 memcpy(lockh, &old_lock, sizeof(old_lock));
1044 it->d.lustre.it_lock_handle = lockh->cookie;
1045 }
1046 }
1047 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1048 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1049 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
0a3bdb00 1050 return rc;
d7e09d03
PT
1051}
1052
1053int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1054 struct lu_fid *fid, __u64 *bits)
1055{
1056 /* We could just return 1 immediately, but since we should only
1057 * be called in revalidate_it if we already have a lock, let's
1058 * verify that. */
1059 struct ldlm_res_id res_id;
1060 struct lustre_handle lockh;
1061 ldlm_policy_data_t policy;
1062 ldlm_mode_t mode;
d7e09d03
PT
1063
1064 if (it->d.lustre.it_lock_handle) {
1065 lockh.cookie = it->d.lustre.it_lock_handle;
1066 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1067 } else {
1068 fid_build_reg_res_name(fid, &res_id);
1069 switch (it->it_op) {
1070 case IT_GETATTR:
1071 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1072 break;
1073 case IT_LAYOUT:
1074 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1075 break;
1076 default:
1077 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1078 break;
1079 }
1080 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1081 LDLM_FL_BLOCK_GRANTED, &res_id,
1082 LDLM_IBITS, &policy,
1083 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1084 }
1085
1086 if (mode) {
1087 it->d.lustre.it_lock_handle = lockh.cookie;
1088 it->d.lustre.it_lock_mode = mode;
1089 } else {
1090 it->d.lustre.it_lock_handle = 0;
1091 it->d.lustre.it_lock_mode = 0;
1092 }
1093
0a3bdb00 1094 return !!mode;
d7e09d03
PT
1095}
1096
/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want to lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    void *lmm, int lmmsize, struct lookup_intent *it,
		    int lookup_flags, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking,
		    __u64 extra_lock_flags)
{
	struct lustre_handle lockh;
	int rc = 0;

	LASSERT(it);

	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
	       ", intent: %s flags %#Lo\n", op_data->op_namelen,
	       op_data->op_name, PFID(&op_data->op_fid2),
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
	       it->it_flags);

	lockh.cookie = 0;
	/* A sane child fid on a LOOKUP/GETATTR intent means this is a
	 * revalidation - try to match an existing lock first. */
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 * verify that. */
		it->d.lustre.it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		   (from inode_revalidate) */
		if (rc || op_data->op_namelen != 0)
			return rc;
	}

	/* lookup_it may be called only after revalidate_it has run, because
	 * revalidate_it cannot return errors, only zero.  Returning zero causes
	 * this call to lookup, which *can* return an error.
	 *
	 * We only want to execute the request associated with the intent one
	 * time, however, so don't send the request again.  Instead, skip past
	 * this and use the request from revalidate.  In this case, revalidate
	 * never dropped its reference, so the refcounts are all OK */
	if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
		struct ldlm_enqueue_info einfo = {
			.ei_type	= LDLM_IBITS,
			.ei_mode	= it_to_lock_mode(it),
			.ei_cb_bl	= cb_blocking,
			.ei_cb_cp	= ldlm_completion_ast,
		};

		/* For case if upper layer did not alloc fid, do it now. */
		if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
			rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
			if (rc < 0) {
				CERROR("Can't alloc new fid, rc %d\n", rc);
				return rc;
			}
		}
		rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
				 lmm, lmmsize, NULL, extra_lock_flags);
		if (rc < 0)
			return rc;
	} else if (!fid_is_sane(&op_data->op_fid2) ||
		   !(it->it_create_mode & M_CHECK_STALE)) {
		/* DISP_ENQ_COMPLETE set means there is extra reference on
		 * request referenced from this intent, saved for subsequent
		 * lookup.  This path is executed when we proceed to this
		 * lookup, so we clear DISP_ENQ_COMPLETE */
		it_clear_disposition(it, DISP_ENQ_COMPLETE);
	}
	*reqp = it->d.lustre.it_data;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
	return rc;
}
1195
/*
 * ptlrpcd interpret callback for the asynchronous getattr enqueue
 * started by mdc_intent_getattr_async().  Finishes the DLM enqueue,
 * fixes up the intent/lock state, and then hands the result to the
 * caller's mi_cb callback.
 *
 * Always returns 0; any error (including @rc passed in by ptlrpc) is
 * reported to the caller through minfo->mi_cb.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
					      void *args, int rc)
{
	struct mdc_getattr_args  *ga = args;
	struct obd_export	*exp = ga->ga_exp;
	struct md_enqueue_info   *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = ga->ga_einfo;
	struct lookup_intent     *it;
	struct lustre_handle     *lockh;
	struct obd_device	*obddev;
	struct ldlm_reply	 *lockrep;
	__u64		     flags = LDLM_FL_HAS_INTENT;

	it    = &minfo->mi_it;
	lockh = &minfo->mi_lockh;

	obddev = class_exp2obd(exp);

	/* Release the rpc-in-flight slot taken in mdc_intent_getattr_async. */
	mdc_exit_request(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
		rc = -ETIMEDOUT;

	/* Note: the incoming rc (possibly forced to -ETIMEDOUT above) is fed
	 * into ldlm_cli_enqueue_fini, which returns the final status. */
	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
	if (rc < 0) {
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
		GOTO(out, rc);
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	/* Convert the server's intent status to host-endian errno. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc)
		GOTO(out, rc);

	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);

out:
	OBD_FREE_PTR(einfo);
	minfo->mi_cb(req, minfo, rc);
	return 0;
}
1244
/*
 * Launch an asynchronous intent-getattr enqueue for statahead.  The
 * request is queued to ptlrpcd; its reply is processed by
 * mdc_intent_getattr_async_interpret(), which in turn invokes
 * minfo->mi_cb.
 *
 * Returns 0 if the request was successfully queued, negative errno
 * otherwise (request and rpc slot are cleaned up on every error path).
 */
int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo,
			     struct ldlm_enqueue_info *einfo)
{
	struct md_op_data       *op_data = &minfo->mi_data;
	struct lookup_intent    *it = &minfo->mi_it;
	struct ptlrpc_request   *req;
	struct mdc_getattr_args *ga;
	struct obd_device       *obddev = class_exp2obd(exp);
	struct ldlm_res_id       res_id;
	/*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
	 *     for statahead currently. Consider CMD in future, such two bits
	 *     maybe managed by different MDS, should be adjusted then. */
	ldlm_policy_data_t       policy = {
					.l_inodebits = { MDS_INODELOCK_LOOKUP |
							 MDS_INODELOCK_UPDATE }
				 };
	int		      rc = 0;
	__u64		    flags = LDLM_FL_HAS_INTENT;

	CDEBUG(D_DLMTRACE,
	       "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
	       op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
	       ldlm_it2str(it->it_op), it->it_flags);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	req = mdc_intent_getattr_pack(exp, it, op_data);
	if (!req)
		return -ENOMEM;

	/* Take an rpc-in-flight slot; released in the interpret callback. */
	rc = mdc_enter_request(&obddev->u.cli);
	if (rc != 0) {
		ptlrpc_req_finished(req);
		return rc;
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
			      0, LVB_T_NONE, &minfo->mi_lockh, 1);
	if (rc < 0) {
		/* Undo the slot and the request reference taken above. */
		mdc_exit_request(&obddev->u.cli);
		ptlrpc_req_finished(req);
		return rc;
	}

	/* Stash callback context in the request's async-args area. */
	CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_exp = exp;
	ga->ga_minfo = minfo;
	ga->ga_einfo = einfo;

	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

	return 0;
}