staging/lustre/ptlrpc: Translate between host and network errnos
[deliverable/linux.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
43
44 #include <lustre_acl.h>
45 #include <obd_class.h>
46 #include <lustre_dlm.h>
47 /* fid_res_name_eq() */
48 #include <lustre_fid.h>
49 #include <lprocfs_status.h>
50 #include "mdc_internal.h"
51
52 struct mdc_getattr_args {
53 struct obd_export *ga_exp;
54 struct md_enqueue_info *ga_minfo;
55 struct ldlm_enqueue_info *ga_einfo;
56 };
57
58 int it_disposition(struct lookup_intent *it, int flag)
59 {
60 return it->d.lustre.it_disposition & flag;
61 }
62 EXPORT_SYMBOL(it_disposition);
63
64 void it_set_disposition(struct lookup_intent *it, int flag)
65 {
66 it->d.lustre.it_disposition |= flag;
67 }
68 EXPORT_SYMBOL(it_set_disposition);
69
70 void it_clear_disposition(struct lookup_intent *it, int flag)
71 {
72 it->d.lustre.it_disposition &= ~flag;
73 }
74 EXPORT_SYMBOL(it_clear_disposition);
75
76 int it_open_error(int phase, struct lookup_intent *it)
77 {
78 if (it_disposition(it, DISP_OPEN_OPEN)) {
79 if (phase >= DISP_OPEN_OPEN)
80 return it->d.lustre.it_status;
81 else
82 return 0;
83 }
84
85 if (it_disposition(it, DISP_OPEN_CREATE)) {
86 if (phase >= DISP_OPEN_CREATE)
87 return it->d.lustre.it_status;
88 else
89 return 0;
90 }
91
92 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
93 if (phase >= DISP_LOOKUP_EXECD)
94 return it->d.lustre.it_status;
95 else
96 return 0;
97 }
98
99 if (it_disposition(it, DISP_IT_EXECD)) {
100 if (phase >= DISP_IT_EXECD)
101 return it->d.lustre.it_status;
102 else
103 return 0;
104 }
105 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
106 it->d.lustre.it_status);
107 LBUG();
108 return 0;
109 }
110 EXPORT_SYMBOL(it_open_error);
111
112 /* this must be called on a lockh that is known to have a referenced lock */
113 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
114 __u64 *bits)
115 {
116 struct ldlm_lock *lock;
117 struct inode *new_inode = data;
118 ENTRY;
119
120 if(bits)
121 *bits = 0;
122
123 if (!*lockh)
124 RETURN(0);
125
126 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
127
128 LASSERT(lock != NULL);
129 lock_res_and_lock(lock);
130 if (lock->l_resource->lr_lvb_inode &&
131 lock->l_resource->lr_lvb_inode != data) {
132 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
133 LASSERTF(old_inode->i_state & I_FREEING,
134 "Found existing inode %p/%lu/%u state %lu in lock: "
135 "setting data to %p/%lu/%u\n", old_inode,
136 old_inode->i_ino, old_inode->i_generation,
137 old_inode->i_state,
138 new_inode, new_inode->i_ino, new_inode->i_generation);
139 }
140 lock->l_resource->lr_lvb_inode = new_inode;
141 if (bits)
142 *bits = lock->l_policy_data.l_inodebits.bits;
143
144 unlock_res_and_lock(lock);
145 LDLM_LOCK_PUT(lock);
146
147 RETURN(0);
148 }
149
150 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
151 const struct lu_fid *fid, ldlm_type_t type,
152 ldlm_policy_data_t *policy, ldlm_mode_t mode,
153 struct lustre_handle *lockh)
154 {
155 struct ldlm_res_id res_id;
156 ldlm_mode_t rc;
157 ENTRY;
158
159 fid_build_reg_res_name(fid, &res_id);
160 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
161 &res_id, type, policy, mode, lockh, 0);
162 RETURN(rc);
163 }
164
165 int mdc_cancel_unused(struct obd_export *exp,
166 const struct lu_fid *fid,
167 ldlm_policy_data_t *policy,
168 ldlm_mode_t mode,
169 ldlm_cancel_flags_t flags,
170 void *opaque)
171 {
172 struct ldlm_res_id res_id;
173 struct obd_device *obd = class_exp2obd(exp);
174 int rc;
175
176 ENTRY;
177
178 fid_build_reg_res_name(fid, &res_id);
179 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
180 policy, mode, flags, opaque);
181 RETURN(rc);
182 }
183
184 int mdc_null_inode(struct obd_export *exp,
185 const struct lu_fid *fid)
186 {
187 struct ldlm_res_id res_id;
188 struct ldlm_resource *res;
189 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
190 ENTRY;
191
192 LASSERTF(ns != NULL, "no namespace passed\n");
193
194 fid_build_reg_res_name(fid, &res_id);
195
196 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
197 if(res == NULL)
198 RETURN(0);
199
200 lock_res(res);
201 res->lr_lvb_inode = NULL;
202 unlock_res(res);
203
204 ldlm_resource_putref(res);
205 RETURN(0);
206 }
207
208 /* find any ldlm lock of the inode in mdc
209 * return 0 not find
210 * 1 find one
211 * < 0 error */
212 int mdc_find_cbdata(struct obd_export *exp,
213 const struct lu_fid *fid,
214 ldlm_iterator_t it, void *data)
215 {
216 struct ldlm_res_id res_id;
217 int rc = 0;
218 ENTRY;
219
220 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
221 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
222 it, data);
223 if (rc == LDLM_ITER_STOP)
224 RETURN(1);
225 else if (rc == LDLM_ITER_CONTINUE)
226 RETURN(0);
227 RETURN(rc);
228 }
229
230 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
231 {
232 /* Don't hold error requests for replay. */
233 if (req->rq_replay) {
234 spin_lock(&req->rq_lock);
235 req->rq_replay = 0;
236 spin_unlock(&req->rq_lock);
237 }
238 if (rc && req->rq_transno != 0) {
239 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240 LBUG();
241 }
242 }
243
244 /* Save a large LOV EA into the request buffer so that it is available
245 * for replay. We don't do this in the initial request because the
246 * original request doesn't need this buffer (at most it sends just the
247 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
248 * buffer and may also be difficult to allocate and save a very large
249 * request buffer for each open. (bug 5707)
250 *
251 * OOM here may cause recovery failure if lmm is needed (only for the
252 * original open if the MDS crashed just when this client also OOM'd)
253 * but this is incredibly unlikely, and questionable whether the client
254 * could do MDS recovery under OOM anyways... */
255 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
256 struct mdt_body *body)
257 {
258 int rc;
259
260 /* FIXME: remove this explicit offset. */
261 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
262 body->eadatasize);
263 if (rc) {
264 CERROR("Can't enlarge segment %d size to %d\n",
265 DLM_INTENT_REC_OFF + 4, body->eadatasize);
266 body->valid &= ~OBD_MD_FLEASIZE;
267 body->eadatasize = 0;
268 }
269 }
270
271 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
272 struct lookup_intent *it,
273 struct md_op_data *op_data,
274 void *lmm, int lmmsize,
275 void *cb_data)
276 {
277 struct ptlrpc_request *req;
278 struct obd_device *obddev = class_exp2obd(exp);
279 struct ldlm_intent *lit;
280 LIST_HEAD(cancels);
281 int count = 0;
282 int mode;
283 int rc;
284 ENTRY;
285
286 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288 /* XXX: openlock is not cancelled for cross-refs. */
289 /* If inode is known, cancel conflicting OPEN locks. */
290 if (fid_is_sane(&op_data->op_fid2)) {
291 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
292 mode = LCK_CW;
293 #ifdef FMODE_EXEC
294 else if (it->it_flags & FMODE_EXEC)
295 mode = LCK_PR;
296 #endif
297 else
298 mode = LCK_CR;
299 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300 &cancels, mode,
301 MDS_INODELOCK_OPEN);
302 }
303
304 /* If CREATE, cancel parent's UPDATE lock. */
305 if (it->it_op & IT_CREAT)
306 mode = LCK_EX;
307 else
308 mode = LCK_CR;
309 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
310 &cancels, mode,
311 MDS_INODELOCK_UPDATE);
312
313 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
314 &RQF_LDLM_INTENT_OPEN);
315 if (req == NULL) {
316 ldlm_lock_list_put(&cancels, l_bl_ast, count);
317 RETURN(ERR_PTR(-ENOMEM));
318 }
319
320 /* parent capability */
321 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
322 /* child capability, reserve the size according to parent capa, it will
323 * be filled after we get the reply */
324 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
325
326 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
327 op_data->op_namelen + 1);
328 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
329 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
330
331 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
332 if (rc) {
333 ptlrpc_request_free(req);
334 return NULL;
335 }
336
337 spin_lock(&req->rq_lock);
338 req->rq_replay = req->rq_import->imp_replayable;
339 spin_unlock(&req->rq_lock);
340
341 /* pack the intent */
342 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
343 lit->opc = (__u64)it->it_op;
344
345 /* pack the intended request */
346 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
347 lmmsize);
348
349 /* for remote client, fetch remote perm for current user */
350 if (client_is_remote(exp))
351 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
352 sizeof(struct mdt_remote_perm));
353 ptlrpc_request_set_replen(req);
354 return req;
355 }
356
357 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
358 struct lookup_intent *it,
359 struct md_op_data *op_data)
360 {
361 struct ptlrpc_request *req;
362 struct obd_device *obddev = class_exp2obd(exp);
363 struct ldlm_intent *lit;
364 int rc;
365 ENTRY;
366
367 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
368 &RQF_LDLM_INTENT_UNLINK);
369 if (req == NULL)
370 RETURN(ERR_PTR(-ENOMEM));
371
372 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
373 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
374 op_data->op_namelen + 1);
375
376 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
377 if (rc) {
378 ptlrpc_request_free(req);
379 RETURN(ERR_PTR(rc));
380 }
381
382 /* pack the intent */
383 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
384 lit->opc = (__u64)it->it_op;
385
386 /* pack the intended request */
387 mdc_unlink_pack(req, op_data);
388
389 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
390 obddev->u.cli.cl_max_mds_easize);
391 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
392 obddev->u.cli.cl_max_mds_cookiesize);
393 ptlrpc_request_set_replen(req);
394 RETURN(req);
395 }
396
397 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
398 struct lookup_intent *it,
399 struct md_op_data *op_data)
400 {
401 struct ptlrpc_request *req;
402 struct obd_device *obddev = class_exp2obd(exp);
403 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
404 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
405 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
406 (client_is_remote(exp) ?
407 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
408 struct ldlm_intent *lit;
409 int rc;
410 ENTRY;
411
412 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
413 &RQF_LDLM_INTENT_GETATTR);
414 if (req == NULL)
415 RETURN(ERR_PTR(-ENOMEM));
416
417 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
418 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
419 op_data->op_namelen + 1);
420
421 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
422 if (rc) {
423 ptlrpc_request_free(req);
424 RETURN(ERR_PTR(rc));
425 }
426
427 /* pack the intent */
428 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
429 lit->opc = (__u64)it->it_op;
430
431 /* pack the intended request */
432 mdc_getattr_pack(req, valid, it->it_flags, op_data,
433 obddev->u.cli.cl_max_mds_easize);
434
435 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
436 obddev->u.cli.cl_max_mds_easize);
437 if (client_is_remote(exp))
438 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
439 sizeof(struct mdt_remote_perm));
440 ptlrpc_request_set_replen(req);
441 RETURN(req);
442 }
443
444 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
445 struct lookup_intent *it,
446 struct md_op_data *unused)
447 {
448 struct obd_device *obd = class_exp2obd(exp);
449 struct ptlrpc_request *req;
450 struct ldlm_intent *lit;
451 struct layout_intent *layout;
452 int rc;
453 ENTRY;
454
455 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
456 &RQF_LDLM_INTENT_LAYOUT);
457 if (req == NULL)
458 RETURN(ERR_PTR(-ENOMEM));
459
460 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
461 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
462 if (rc) {
463 ptlrpc_request_free(req);
464 RETURN(ERR_PTR(rc));
465 }
466
467 /* pack the intent */
468 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
469 lit->opc = (__u64)it->it_op;
470
471 /* pack the layout intent request */
472 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
473 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
474 * set for replication */
475 layout->li_opc = LAYOUT_INTENT_ACCESS;
476
477 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
478 obd->u.cli.cl_max_mds_easize);
479 ptlrpc_request_set_replen(req);
480 RETURN(req);
481 }
482
483 static struct ptlrpc_request *
484 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
485 {
486 struct ptlrpc_request *req;
487 int rc;
488 ENTRY;
489
490 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
491 if (req == NULL)
492 RETURN(ERR_PTR(-ENOMEM));
493
494 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
495 if (rc) {
496 ptlrpc_request_free(req);
497 RETURN(ERR_PTR(rc));
498 }
499
500 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
501 ptlrpc_request_set_replen(req);
502 RETURN(req);
503 }
504
505 static int mdc_finish_enqueue(struct obd_export *exp,
506 struct ptlrpc_request *req,
507 struct ldlm_enqueue_info *einfo,
508 struct lookup_intent *it,
509 struct lustre_handle *lockh,
510 int rc)
511 {
512 struct req_capsule *pill = &req->rq_pill;
513 struct ldlm_request *lockreq;
514 struct ldlm_reply *lockrep;
515 struct lustre_intent_data *intent = &it->d.lustre;
516 struct ldlm_lock *lock;
517 void *lvb_data = NULL;
518 int lvb_len = 0;
519 ENTRY;
520
521 LASSERT(rc >= 0);
522 /* Similarly, if we're going to replay this request, we don't want to
523 * actually get a lock, just perform the intent. */
524 if (req->rq_transno || req->rq_replay) {
525 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
526 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
527 }
528
529 if (rc == ELDLM_LOCK_ABORTED) {
530 einfo->ei_mode = 0;
531 memset(lockh, 0, sizeof(*lockh));
532 rc = 0;
533 } else { /* rc = 0 */
534 lock = ldlm_handle2lock(lockh);
535 LASSERT(lock != NULL);
536
537 /* If the server gave us back a different lock mode, we should
538 * fix up our variables. */
539 if (lock->l_req_mode != einfo->ei_mode) {
540 ldlm_lock_addref(lockh, lock->l_req_mode);
541 ldlm_lock_decref(lockh, einfo->ei_mode);
542 einfo->ei_mode = lock->l_req_mode;
543 }
544 LDLM_LOCK_PUT(lock);
545 }
546
547 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
548 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
549
550 intent->it_disposition = (int)lockrep->lock_policy_res1;
551 intent->it_status = (int)lockrep->lock_policy_res2;
552 intent->it_lock_mode = einfo->ei_mode;
553 intent->it_lock_handle = lockh->cookie;
554 intent->it_data = req;
555
556 /* Technically speaking rq_transno must already be zero if
557 * it_status is in error, so the check is a bit redundant */
558 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
559 mdc_clear_replay_flag(req, intent->it_status);
560
561 /* If we're doing an IT_OPEN which did not result in an actual
562 * successful open, then we need to remove the bit which saves
563 * this request for unconditional replay.
564 *
565 * It's important that we do this first! Otherwise we might exit the
566 * function without doing so, and try to replay a failed create
567 * (bug 3440) */
568 if (it->it_op & IT_OPEN && req->rq_replay &&
569 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
570 mdc_clear_replay_flag(req, intent->it_status);
571
572 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
573 it->it_op, intent->it_disposition, intent->it_status);
574
575 /* We know what to expect, so we do any byte flipping required here */
576 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
577 struct mdt_body *body;
578
579 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
580 if (body == NULL) {
581 CERROR ("Can't swab mdt_body\n");
582 RETURN (-EPROTO);
583 }
584
585 if (it_disposition(it, DISP_OPEN_OPEN) &&
586 !it_open_error(DISP_OPEN_OPEN, it)) {
587 /*
588 * If this is a successful OPEN request, we need to set
589 * replay handler and data early, so that if replay
590 * happens immediately after swabbing below, new reply
591 * is swabbed by that handler correctly.
592 */
593 mdc_set_open_replay_data(NULL, NULL, req);
594 }
595
596 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
597 void *eadata;
598
599 mdc_update_max_ea_from_body(exp, body);
600
601 /*
602 * The eadata is opaque; just check that it is there.
603 * Eventually, obd_unpackmd() will check the contents.
604 */
605 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
606 body->eadatasize);
607 if (eadata == NULL)
608 RETURN(-EPROTO);
609
610 /* save lvb data and length in case this is for layout
611 * lock */
612 lvb_data = eadata;
613 lvb_len = body->eadatasize;
614
615 /*
616 * We save the reply LOV EA in case we have to replay a
617 * create for recovery. If we didn't allocate a large
618 * enough request buffer above we need to reallocate it
619 * here to hold the actual LOV EA.
620 *
621 * To not save LOV EA if request is not going to replay
622 * (for example error one).
623 */
624 if ((it->it_op & IT_OPEN) && req->rq_replay) {
625 void *lmm;
626 if (req_capsule_get_size(pill, &RMF_EADATA,
627 RCL_CLIENT) <
628 body->eadatasize)
629 mdc_realloc_openmsg(req, body);
630 else
631 req_capsule_shrink(pill, &RMF_EADATA,
632 body->eadatasize,
633 RCL_CLIENT);
634
635 req_capsule_set_size(pill, &RMF_EADATA,
636 RCL_CLIENT,
637 body->eadatasize);
638
639 lmm = req_capsule_client_get(pill, &RMF_EADATA);
640 if (lmm)
641 memcpy(lmm, eadata, body->eadatasize);
642 }
643 }
644
645 if (body->valid & OBD_MD_FLRMTPERM) {
646 struct mdt_remote_perm *perm;
647
648 LASSERT(client_is_remote(exp));
649 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
650 lustre_swab_mdt_remote_perm);
651 if (perm == NULL)
652 RETURN(-EPROTO);
653 }
654 if (body->valid & OBD_MD_FLMDSCAPA) {
655 struct lustre_capa *capa, *p;
656
657 capa = req_capsule_server_get(pill, &RMF_CAPA1);
658 if (capa == NULL)
659 RETURN(-EPROTO);
660
661 if (it->it_op & IT_OPEN) {
662 /* client fid capa will be checked in replay */
663 p = req_capsule_client_get(pill, &RMF_CAPA2);
664 LASSERT(p);
665 *p = *capa;
666 }
667 }
668 if (body->valid & OBD_MD_FLOSSCAPA) {
669 struct lustre_capa *capa;
670
671 capa = req_capsule_server_get(pill, &RMF_CAPA2);
672 if (capa == NULL)
673 RETURN(-EPROTO);
674 }
675 } else if (it->it_op & IT_LAYOUT) {
676 /* maybe the lock was granted right away and layout
677 * is packed into RMF_DLM_LVB of req */
678 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
679 if (lvb_len > 0) {
680 lvb_data = req_capsule_server_sized_get(pill,
681 &RMF_DLM_LVB, lvb_len);
682 if (lvb_data == NULL)
683 RETURN(-EPROTO);
684 }
685 }
686
687 /* fill in stripe data for layout lock */
688 lock = ldlm_handle2lock(lockh);
689 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
690 void *lmm;
691
692 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
693 ldlm_it2str(it->it_op), lvb_len);
694
695 OBD_ALLOC_LARGE(lmm, lvb_len);
696 if (lmm == NULL) {
697 LDLM_LOCK_PUT(lock);
698 RETURN(-ENOMEM);
699 }
700 memcpy(lmm, lvb_data, lvb_len);
701
702 /* install lvb_data */
703 lock_res_and_lock(lock);
704 if (lock->l_lvb_data == NULL) {
705 lock->l_lvb_data = lmm;
706 lock->l_lvb_len = lvb_len;
707 lmm = NULL;
708 }
709 unlock_res_and_lock(lock);
710 if (lmm != NULL)
711 OBD_FREE_LARGE(lmm, lvb_len);
712 }
713 if (lock != NULL)
714 LDLM_LOCK_PUT(lock);
715
716 RETURN(rc);
717 }
718
719 /* We always reserve enough space in the reply packet for a stripe MD, because
720 * we don't know in advance the file type. */
721 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
722 struct lookup_intent *it, struct md_op_data *op_data,
723 struct lustre_handle *lockh, void *lmm, int lmmsize,
724 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
725 {
726 struct obd_device *obddev = class_exp2obd(exp);
727 struct ptlrpc_request *req = NULL;
728 __u64 flags, saved_flags = extra_lock_flags;
729 int rc;
730 struct ldlm_res_id res_id;
731 static const ldlm_policy_data_t lookup_policy =
732 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
733 static const ldlm_policy_data_t update_policy =
734 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
735 static const ldlm_policy_data_t layout_policy =
736 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
737 ldlm_policy_data_t const *policy = &lookup_policy;
738 int generation, resends = 0;
739 struct ldlm_reply *lockrep;
740 enum lvb_type lvb_type = 0;
741 ENTRY;
742
743 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
744 einfo->ei_type);
745
746 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
747
748 if (it) {
749 saved_flags |= LDLM_FL_HAS_INTENT;
750 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
751 policy = &update_policy;
752 else if (it->it_op & IT_LAYOUT)
753 policy = &layout_policy;
754 }
755
756 LASSERT(reqp == NULL);
757
758 generation = obddev->u.cli.cl_import->imp_generation;
759 resend:
760 flags = saved_flags;
761 if (!it) {
762 /* The only way right now is FLOCK, in this case we hide flock
763 policy as lmm, but lmmsize is 0 */
764 LASSERT(lmm && lmmsize == 0);
765 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
766 einfo->ei_type);
767 policy = (ldlm_policy_data_t *)lmm;
768 res_id.name[3] = LDLM_FLOCK;
769 } else if (it->it_op & IT_OPEN) {
770 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
771 einfo->ei_cbdata);
772 policy = &update_policy;
773 einfo->ei_cbdata = NULL;
774 lmm = NULL;
775 } else if (it->it_op & IT_UNLINK) {
776 req = mdc_intent_unlink_pack(exp, it, op_data);
777 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
778 req = mdc_intent_getattr_pack(exp, it, op_data);
779 } else if (it->it_op & IT_READDIR) {
780 req = mdc_enqueue_pack(exp, 0);
781 } else if (it->it_op & IT_LAYOUT) {
782 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
783 RETURN(-EOPNOTSUPP);
784
785 req = mdc_intent_layout_pack(exp, it, op_data);
786 lvb_type = LVB_T_LAYOUT;
787 } else {
788 LBUG();
789 RETURN(-EINVAL);
790 }
791
792 if (IS_ERR(req))
793 RETURN(PTR_ERR(req));
794
795 if (req != NULL && it && it->it_op & IT_CREAT)
796 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
797 * retry logic */
798 req->rq_no_retry_einprogress = 1;
799
800 if (resends) {
801 req->rq_generation_set = 1;
802 req->rq_import_generation = generation;
803 req->rq_sent = cfs_time_current_sec() + resends;
804 }
805
806 /* It is important to obtain rpc_lock first (if applicable), so that
807 * threads that are serialised with rpc_lock are not polluting our
808 * rpcs in flight counter. We do not do flock request limiting, though*/
809 if (it) {
810 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
811 rc = mdc_enter_request(&obddev->u.cli);
812 if (rc != 0) {
813 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
814 mdc_clear_replay_flag(req, 0);
815 ptlrpc_req_finished(req);
816 RETURN(rc);
817 }
818 }
819
820 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
821 0, lvb_type, lockh, 0);
822 if (!it) {
823 /* For flock requests we immediatelly return without further
824 delay and let caller deal with the rest, since rest of
825 this function metadata processing makes no sense for flock
826 requests anyway */
827 RETURN(rc);
828 }
829
830 mdc_exit_request(&obddev->u.cli);
831 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
832
833 if (rc < 0) {
834 CERROR("ldlm_cli_enqueue: %d\n", rc);
835 mdc_clear_replay_flag(req, rc);
836 ptlrpc_req_finished(req);
837 RETURN(rc);
838 }
839
840 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
841 LASSERT(lockrep != NULL);
842
843 lockrep->lock_policy_res2 =
844 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
845
846 /* Retry the create infinitely when we get -EINPROGRESS from
847 * server. This is required by the new quota design. */
848 if (it && it->it_op & IT_CREAT &&
849 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
850 mdc_clear_replay_flag(req, rc);
851 ptlrpc_req_finished(req);
852 resends++;
853
854 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
855 obddev->obd_name, resends, it->it_op,
856 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
857
858 if (generation == obddev->u.cli.cl_import->imp_generation) {
859 goto resend;
860 } else {
861 CDEBUG(D_HA, "resend cross eviction\n");
862 RETURN(-EIO);
863 }
864 }
865
866 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
867 if (rc < 0) {
868 if (lustre_handle_is_used(lockh)) {
869 ldlm_lock_decref(lockh, einfo->ei_mode);
870 memset(lockh, 0, sizeof(*lockh));
871 }
872 ptlrpc_req_finished(req);
873 }
874 RETURN(rc);
875 }
876
877 static int mdc_finish_intent_lock(struct obd_export *exp,
878 struct ptlrpc_request *request,
879 struct md_op_data *op_data,
880 struct lookup_intent *it,
881 struct lustre_handle *lockh)
882 {
883 struct lustre_handle old_lock;
884 struct mdt_body *mdt_body;
885 struct ldlm_lock *lock;
886 int rc;
887
888
889 LASSERT(request != NULL);
890 LASSERT(request != LP_POISON);
891 LASSERT(request->rq_repmsg != LP_POISON);
892
893 if (!it_disposition(it, DISP_IT_EXECD)) {
894 /* The server failed before it even started executing the
895 * intent, i.e. because it couldn't unpack the request. */
896 LASSERT(it->d.lustre.it_status != 0);
897 RETURN(it->d.lustre.it_status);
898 }
899 rc = it_open_error(DISP_IT_EXECD, it);
900 if (rc)
901 RETURN(rc);
902
903 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
904 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
905
906 /* If we were revalidating a fid/name pair, mark the intent in
907 * case we fail and get called again from lookup */
908 if (fid_is_sane(&op_data->op_fid2) &&
909 it->it_create_mode & M_CHECK_STALE &&
910 it->it_op != IT_GETATTR) {
911 it_set_disposition(it, DISP_ENQ_COMPLETE);
912
913 /* Also: did we find the same inode? */
914 /* sever can return one of two fids:
915 * op_fid2 - new allocated fid - if file is created.
916 * op_fid3 - existent fid - if file only open.
917 * op_fid3 is saved in lmv_intent_open */
918 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
919 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
920 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
921 "\n", PFID(&op_data->op_fid2),
922 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
923 RETURN(-ESTALE);
924 }
925 }
926
927 rc = it_open_error(DISP_LOOKUP_EXECD, it);
928 if (rc)
929 RETURN(rc);
930
931 /* keep requests around for the multiple phases of the call
932 * this shows the DISP_XX must guarantee we make it into the call
933 */
934 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
935 it_disposition(it, DISP_OPEN_CREATE) &&
936 !it_open_error(DISP_OPEN_CREATE, it)) {
937 it_set_disposition(it, DISP_ENQ_CREATE_REF);
938 ptlrpc_request_addref(request); /* balanced in ll_create_node */
939 }
940 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
941 it_disposition(it, DISP_OPEN_OPEN) &&
942 !it_open_error(DISP_OPEN_OPEN, it)) {
943 it_set_disposition(it, DISP_ENQ_OPEN_REF);
944 ptlrpc_request_addref(request); /* balanced in ll_file_open */
945 /* BUG 11546 - eviction in the middle of open rpc processing */
946 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
947 }
948
949 if (it->it_op & IT_CREAT) {
950 /* XXX this belongs in ll_create_it */
951 } else if (it->it_op == IT_OPEN) {
952 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
953 } else {
954 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
955 }
956
957 /* If we already have a matching lock, then cancel the new
958 * one. We have to set the data here instead of in
959 * mdc_enqueue, because we need to use the child's inode as
960 * the l_ast_data to match, and that's not available until
961 * intent_finish has performed the iget().) */
962 lock = ldlm_handle2lock(lockh);
963 if (lock) {
964 ldlm_policy_data_t policy = lock->l_policy_data;
965 LDLM_DEBUG(lock, "matching against this");
966
967 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
968 &lock->l_resource->lr_name),
969 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
970 (unsigned long)lock->l_resource->lr_name.name[0],
971 (unsigned long)lock->l_resource->lr_name.name[1],
972 (unsigned long)lock->l_resource->lr_name.name[2],
973 (unsigned long)fid_seq(&mdt_body->fid1),
974 (unsigned long)fid_oid(&mdt_body->fid1),
975 (unsigned long)fid_ver(&mdt_body->fid1));
976 LDLM_LOCK_PUT(lock);
977
978 memcpy(&old_lock, lockh, sizeof(*lockh));
979 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
980 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
981 ldlm_lock_decref_and_cancel(lockh,
982 it->d.lustre.it_lock_mode);
983 memcpy(lockh, &old_lock, sizeof(old_lock));
984 it->d.lustre.it_lock_handle = lockh->cookie;
985 }
986 }
987 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
988 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
989 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
990 RETURN(rc);
991 }
992
993 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
994 struct lu_fid *fid, __u64 *bits)
995 {
996 /* We could just return 1 immediately, but since we should only
997 * be called in revalidate_it if we already have a lock, let's
998 * verify that. */
999 struct ldlm_res_id res_id;
1000 struct lustre_handle lockh;
1001 ldlm_policy_data_t policy;
1002 ldlm_mode_t mode;
1003 ENTRY;
1004
1005 if (it->d.lustre.it_lock_handle) {
1006 lockh.cookie = it->d.lustre.it_lock_handle;
1007 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1008 } else {
1009 fid_build_reg_res_name(fid, &res_id);
1010 switch (it->it_op) {
1011 case IT_GETATTR:
1012 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1013 break;
1014 case IT_LAYOUT:
1015 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1016 break;
1017 default:
1018 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1019 break;
1020 }
1021 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1022 LDLM_FL_BLOCK_GRANTED, &res_id,
1023 LDLM_IBITS, &policy,
1024 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1025 }
1026
1027 if (mode) {
1028 it->d.lustre.it_lock_handle = lockh.cookie;
1029 it->d.lustre.it_lock_mode = mode;
1030 } else {
1031 it->d.lustre.it_lock_handle = 0;
1032 it->d.lustre.it_lock_mode = 0;
1033 }
1034
1035 RETURN(!!mode);
1036 }
1037
1038 /*
1039 * This long block is all about fixing up the lock and request state
1040 * so that it is correct as of the moment _before_ the operation was
1041 * applied; that way, the VFS will think that everything is normal and
1042 * call Lustre's regular VFS methods.
1043 *
1044 * If we're performing a creation, that means that unless the creation
1045 * failed with EEXIST, we should fake up a negative dentry.
1046 *
1047 * For everything else, we want to lookup to succeed.
1048 *
1049 * One additional note: if CREATE or OPEN succeeded, we add an extra
1050 * reference to the request because we need to keep it around until
1051 * ll_create/ll_open gets called.
1052 *
1053 * The server will return to us, in it_disposition, an indication of
1054 * exactly what d.lustre.it_status refers to.
1055 *
1056 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1057 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1058 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1059 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1060 * was successful.
1061 *
1062 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1063 * child lookup.
1064 */
1065 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1066 void *lmm, int lmmsize, struct lookup_intent *it,
1067 int lookup_flags, struct ptlrpc_request **reqp,
1068 ldlm_blocking_callback cb_blocking,
1069 __u64 extra_lock_flags)
1070 {
1071 struct lustre_handle lockh;
1072 int rc = 0;
1073 ENTRY;
1074 LASSERT(it);
1075
1076 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1077 ", intent: %s flags %#o\n", op_data->op_namelen,
1078 op_data->op_name, PFID(&op_data->op_fid2),
1079 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1080 it->it_flags);
1081
1082 lockh.cookie = 0;
1083 if (fid_is_sane(&op_data->op_fid2) &&
1084 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1085 /* We could just return 1 immediately, but since we should only
1086 * be called in revalidate_it if we already have a lock, let's
1087 * verify that. */
1088 it->d.lustre.it_lock_handle = 0;
1089 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1090 /* Only return failure if it was not GETATTR by cfid
1091 (from inode_revalidate) */
1092 if (rc || op_data->op_namelen != 0)
1093 RETURN(rc);
1094 }
1095
1096 /* lookup_it may be called only after revalidate_it has run, because
1097 * revalidate_it cannot return errors, only zero. Returning zero causes
1098 * this call to lookup, which *can* return an error.
1099 *
1100 * We only want to execute the request associated with the intent one
1101 * time, however, so don't send the request again. Instead, skip past
1102 * this and use the request from revalidate. In this case, revalidate
1103 * never dropped its reference, so the refcounts are all OK */
1104 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1105 struct ldlm_enqueue_info einfo =
1106 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1107 ldlm_completion_ast, NULL, NULL, NULL };
1108
1109 /* For case if upper layer did not alloc fid, do it now. */
1110 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1111 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1112 if (rc < 0) {
1113 CERROR("Can't alloc new fid, rc %d\n", rc);
1114 RETURN(rc);
1115 }
1116 }
1117 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1118 lmm, lmmsize, NULL, extra_lock_flags);
1119 if (rc < 0)
1120 RETURN(rc);
1121 } else if (!fid_is_sane(&op_data->op_fid2) ||
1122 !(it->it_create_mode & M_CHECK_STALE)) {
1123 /* DISP_ENQ_COMPLETE set means there is extra reference on
1124 * request referenced from this intent, saved for subsequent
1125 * lookup. This path is executed when we proceed to this
1126 * lookup, so we clear DISP_ENQ_COMPLETE */
1127 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1128 }
1129 *reqp = it->d.lustre.it_data;
1130 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1131 RETURN(rc);
1132 }
1133
1134 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1135 struct ptlrpc_request *req,
1136 void *args, int rc)
1137 {
1138 struct mdc_getattr_args *ga = args;
1139 struct obd_export *exp = ga->ga_exp;
1140 struct md_enqueue_info *minfo = ga->ga_minfo;
1141 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1142 struct lookup_intent *it;
1143 struct lustre_handle *lockh;
1144 struct obd_device *obddev;
1145 struct ldlm_reply *lockrep;
1146 __u64 flags = LDLM_FL_HAS_INTENT;
1147 ENTRY;
1148
1149 it = &minfo->mi_it;
1150 lockh = &minfo->mi_lockh;
1151
1152 obddev = class_exp2obd(exp);
1153
1154 mdc_exit_request(&obddev->u.cli);
1155 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1156 rc = -ETIMEDOUT;
1157
1158 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1159 &flags, NULL, 0, lockh, rc);
1160 if (rc < 0) {
1161 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1162 mdc_clear_replay_flag(req, rc);
1163 GOTO(out, rc);
1164 }
1165
1166 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1167 LASSERT(lockrep != NULL);
1168
1169 lockrep->lock_policy_res2 =
1170 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1171
1172 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1173 if (rc)
1174 GOTO(out, rc);
1175
1176 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1177 EXIT;
1178
1179 out:
1180 OBD_FREE_PTR(einfo);
1181 minfo->mi_cb(req, minfo, rc);
1182 return 0;
1183 }
1184
1185 int mdc_intent_getattr_async(struct obd_export *exp,
1186 struct md_enqueue_info *minfo,
1187 struct ldlm_enqueue_info *einfo)
1188 {
1189 struct md_op_data *op_data = &minfo->mi_data;
1190 struct lookup_intent *it = &minfo->mi_it;
1191 struct ptlrpc_request *req;
1192 struct mdc_getattr_args *ga;
1193 struct obd_device *obddev = class_exp2obd(exp);
1194 struct ldlm_res_id res_id;
1195 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1196 * for statahead currently. Consider CMD in future, such two bits
1197 * maybe managed by different MDS, should be adjusted then. */
1198 ldlm_policy_data_t policy = {
1199 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1200 MDS_INODELOCK_UPDATE }
1201 };
1202 int rc = 0;
1203 __u64 flags = LDLM_FL_HAS_INTENT;
1204 ENTRY;
1205
1206 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1207 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1208 ldlm_it2str(it->it_op), it->it_flags);
1209
1210 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1211 req = mdc_intent_getattr_pack(exp, it, op_data);
1212 if (!req)
1213 RETURN(-ENOMEM);
1214
1215 rc = mdc_enter_request(&obddev->u.cli);
1216 if (rc != 0) {
1217 ptlrpc_req_finished(req);
1218 RETURN(rc);
1219 }
1220
1221 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1222 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1223 if (rc < 0) {
1224 mdc_exit_request(&obddev->u.cli);
1225 ptlrpc_req_finished(req);
1226 RETURN(rc);
1227 }
1228
1229 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1230 ga = ptlrpc_req_async_args(req);
1231 ga->ga_exp = exp;
1232 ga->ga_minfo = minfo;
1233 ga->ga_einfo = einfo;
1234
1235 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1236 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1237
1238 RETURN(0);
1239 }
This page took 0.055611 seconds and 6 git commands to generate.