staging/lustre/ptlrpc: make ptlrpcd threads cpt-aware
[deliverable/linux.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
CommitLineData
d7e09d03
PT
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37#define DEBUG_SUBSYSTEM S_MDC
38
39# include <linux/module.h>
d7e09d03 40
00d65ec8 41#include "../include/lustre_intent.h"
05932307
GKH
42#include "../include/obd.h"
43#include "../include/obd_class.h"
44#include "../include/lustre_dlm.h"
45#include "../include/lustre_fid.h" /* fid_res_name_eq() */
46#include "../include/lustre_mdc.h"
47#include "../include/lustre_net.h"
48#include "../include/lustre_req_layout.h"
d7e09d03
PT
49#include "mdc_internal.h"
50
/* State carried across an asynchronous getattr intent enqueue so the
 * interpret/completion callback can finish the operation.  Holds the
 * export the RPC was sent on plus the caller's enqueue bookkeeping.
 * NOTE(review): exact callback usage is outside this chunk — confirm
 * against mdc_intent_getattr_async()'s interpret handler. */
struct mdc_getattr_args {
	struct obd_export *ga_exp;
	struct md_enqueue_info *ga_minfo;
	struct ldlm_enqueue_info *ga_einfo;
};
56
57int it_disposition(struct lookup_intent *it, int flag)
58{
59 return it->d.lustre.it_disposition & flag;
60}
61EXPORT_SYMBOL(it_disposition);
62
63void it_set_disposition(struct lookup_intent *it, int flag)
64{
65 it->d.lustre.it_disposition |= flag;
66}
67EXPORT_SYMBOL(it_set_disposition);
68
69void it_clear_disposition(struct lookup_intent *it, int flag)
70{
71 it->d.lustre.it_disposition &= ~flag;
72}
73EXPORT_SYMBOL(it_clear_disposition);
74
75int it_open_error(int phase, struct lookup_intent *it)
76{
d3a8a4e2
JX
77 if (it_disposition(it, DISP_OPEN_LEASE)) {
78 if (phase >= DISP_OPEN_LEASE)
79 return it->d.lustre.it_status;
80 else
81 return 0;
82 }
d7e09d03
PT
83 if (it_disposition(it, DISP_OPEN_OPEN)) {
84 if (phase >= DISP_OPEN_OPEN)
85 return it->d.lustre.it_status;
86 else
87 return 0;
88 }
89
90 if (it_disposition(it, DISP_OPEN_CREATE)) {
91 if (phase >= DISP_OPEN_CREATE)
92 return it->d.lustre.it_status;
93 else
94 return 0;
95 }
96
97 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 if (phase >= DISP_LOOKUP_EXECD)
99 return it->d.lustre.it_status;
100 else
101 return 0;
102 }
103
104 if (it_disposition(it, DISP_IT_EXECD)) {
105 if (phase >= DISP_IT_EXECD)
106 return it->d.lustre.it_status;
107 else
108 return 0;
109 }
110 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 it->d.lustre.it_status);
112 LBUG();
113 return 0;
114}
115EXPORT_SYMBOL(it_open_error);
116
/* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode) to the resource of the lock behind @lockh and
 * optionally report the lock's inodebits via @bits.
 *
 * Any previously attached inode must already be on its way out
 * (I_FREEING) — otherwise two live inodes would claim the same
 * resource, which the LASSERTF below treats as fatal.
 * Returns 0; a zero handle is silently accepted as a no-op.
 */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
		      __u64 *bits)
{
	struct ldlm_lock *lock;
	struct inode *new_inode = data;

	if (bits)
		*bits = 0;

	/* Zero handle: nothing to attach to. */
	if (!*lockh)
		return 0;

	lock = ldlm_handle2lock((struct lustre_handle *)lockh);

	LASSERT(lock != NULL);
	lock_res_and_lock(lock);
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;

		/* Replacing a different inode is only legal while the old
		 * one is being freed. */
		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}
	lock->l_resource->lr_lvb_inode = new_inode;
	/* Read the inodebits under the same res lock that guards them. */
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;

	unlock_res_and_lock(lock);
	LDLM_LOCK_PUT(lock);

	return 0;
}
153
154ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
158{
159 struct ldlm_res_id res_id;
160 ldlm_mode_t rc;
d7e09d03
PT
161
162 fid_build_reg_res_name(fid, &res_id);
6caea2f9
AL
163 /* LU-4405: Clear bits not supported by server */
164 policy->l_inodebits.bits &= exp_connect_ibits(exp);
d7e09d03
PT
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
0a3bdb00 167 return rc;
d7e09d03
PT
168}
169
170int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
173 ldlm_mode_t mode,
174 ldlm_cancel_flags_t flags,
175 void *opaque)
176{
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
179 int rc;
180
d7e09d03
PT
181 fid_build_reg_res_name(fid, &res_id);
182 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183 policy, mode, flags, opaque);
0a3bdb00 184 return rc;
d7e09d03
PT
185}
186
187int mdc_null_inode(struct obd_export *exp,
188 const struct lu_fid *fid)
189{
190 struct ldlm_res_id res_id;
191 struct ldlm_resource *res;
192 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
d7e09d03
PT
193
194 LASSERTF(ns != NULL, "no namespace passed\n");
195
196 fid_build_reg_res_name(fid, &res_id);
197
198 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
88005c5f 199 if (res == NULL)
0a3bdb00 200 return 0;
d7e09d03
PT
201
202 lock_res(res);
203 res->lr_lvb_inode = NULL;
204 unlock_res(res);
205
206 ldlm_resource_putref(res);
0a3bdb00 207 return 0;
d7e09d03
PT
208}
209
210/* find any ldlm lock of the inode in mdc
211 * return 0 not find
212 * 1 find one
213 * < 0 error */
214int mdc_find_cbdata(struct obd_export *exp,
215 const struct lu_fid *fid,
216 ldlm_iterator_t it, void *data)
217{
218 struct ldlm_res_id res_id;
219 int rc = 0;
d7e09d03 220
c35e01ff 221 fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
d7e09d03
PT
222 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223 it, data);
224 if (rc == LDLM_ITER_STOP)
0a3bdb00 225 return 1;
d7e09d03 226 else if (rc == LDLM_ITER_CONTINUE)
0a3bdb00
GKH
227 return 0;
228 return rc;
d7e09d03
PT
229}
230
231static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232{
233 /* Don't hold error requests for replay. */
234 if (req->rq_replay) {
235 spin_lock(&req->rq_lock);
236 req->rq_replay = 0;
237 spin_unlock(&req->rq_lock);
238 }
239 if (rc && req->rq_transno != 0) {
240 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241 LBUG();
242 }
243}
244
/* Save a large LOV EA into the request buffer so that it is available
 * for replay. We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
 * buffer and may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and questionable whether the client
 * could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
				struct mdt_body *body)
{
	int rc;

	/* FIXME: remove this explicit offset. */
	rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
					body->eadatasize);
	if (rc) {
		/* Degrade gracefully on failure: strip the EA from the
		 * reply body instead of failing the whole open. */
		CERROR("Can't enlarge segment %d size to %d\n",
		       DLM_INTENT_REC_OFF + 4, body->eadatasize);
		body->valid &= ~OBD_MD_FLEASIZE;
		body->eadatasize = 0;
	}
}
271
/*
 * Build an LDLM_INTENT_OPEN request for @it/@op_data, piggy-backing
 * cancellation of locks that conflict with the open:
 *  - conflicting OPEN locks on the child (op_fid2), mode chosen from
 *    the open flags (lease/write/exec/read);
 *  - the parent's (op_fid1) UPDATE lock when creating.
 * Returns the prepared request or an ERR_PTR.  @cb_data is currently
 * unused here — presumably kept for signature symmetry; confirm against
 * callers before removing.
 */
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
						   struct lookup_intent *it,
						   struct md_op_data *op_data,
						   void *lmm, int lmmsize,
						   void *cb_data)
{
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_intent *lit;
	LIST_HEAD(cancels);
	int count = 0;
	int mode;
	int rc;

	/* Opens always act on regular files. */
	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
				mode = LCK_EX;
			else
				mode = LCK_PR;
		} else {
			if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
				mode = LCK_CW;
			else if (it->it_flags & __FMODE_EXEC)
				mode = LCK_PR;
			else
				mode = LCK_CR;
		}
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		mode = LCK_EX;
	else
		mode = LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (req == NULL) {
		/* Release the cancel list we collected above. */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	/* parent capability */
	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
	/* child capability, reserve the size according to parent capa, it will
	 * be filled after we get the reply */
	mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* Opens are replayed unconditionally on a replayable import;
	 * mdc_finish_enqueue() clears this again for failed opens. */
	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	/* for remote client, fetch remote perm for current user */
	if (client_is_remote(exp))
		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
				     sizeof(struct mdt_remote_perm));
	ptlrpc_request_set_replen(req);
	return req;
}
361
7fc1f831
AP
362static struct ptlrpc_request *
363mdc_intent_getxattr_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
366{
367 struct ptlrpc_request *req;
368 struct ldlm_intent *lit;
369 int rc, count = 0, maxdata;
370 LIST_HEAD(cancels);
371
372
373
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375 &RQF_LDLM_INTENT_GETXATTR);
376 if (req == NULL)
377 return ERR_PTR(-ENOMEM);
378
379 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
380
7fc1f831
AP
381 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
382 if (rc) {
383 ptlrpc_request_free(req);
384 return ERR_PTR(rc);
385 }
386
387 /* pack the intent */
388 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389 lit->opc = IT_GETXATTR;
390
391 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
392
393 /* pack the intended request */
394 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395 op_data->op_valid, maxdata, -1, 0);
396
397 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398 RCL_SERVER, maxdata);
399
400 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401 RCL_SERVER, maxdata);
402
403 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404 RCL_SERVER, maxdata);
405
406 ptlrpc_request_set_replen(req);
407
408 return req;
409}
410
d7e09d03
PT
411static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412 struct lookup_intent *it,
413 struct md_op_data *op_data)
414{
415 struct ptlrpc_request *req;
416 struct obd_device *obddev = class_exp2obd(exp);
417 struct ldlm_intent *lit;
418 int rc;
d7e09d03
PT
419
420 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
421 &RQF_LDLM_INTENT_UNLINK);
422 if (req == NULL)
0a3bdb00 423 return ERR_PTR(-ENOMEM);
d7e09d03
PT
424
425 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
426 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
427 op_data->op_namelen + 1);
428
429 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
430 if (rc) {
431 ptlrpc_request_free(req);
0a3bdb00 432 return ERR_PTR(rc);
d7e09d03
PT
433 }
434
435 /* pack the intent */
436 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
437 lit->opc = (__u64)it->it_op;
438
439 /* pack the intended request */
440 mdc_unlink_pack(req, op_data);
441
442 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
44779340 443 obddev->u.cli.cl_default_mds_easize);
d7e09d03 444 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
44779340 445 obddev->u.cli.cl_default_mds_cookiesize);
d7e09d03 446 ptlrpc_request_set_replen(req);
0a3bdb00 447 return req;
d7e09d03
PT
448}
449
450static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
e5e663ae
SM
451 struct lookup_intent *it,
452 struct md_op_data *op_data)
d7e09d03
PT
453{
454 struct ptlrpc_request *req;
455 struct obd_device *obddev = class_exp2obd(exp);
21aef7d9 456 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
d7e09d03
PT
457 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
458 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
459 (client_is_remote(exp) ?
460 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
461 struct ldlm_intent *lit;
462 int rc;
2c580836 463 int easize;
d7e09d03
PT
464
465 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
466 &RQF_LDLM_INTENT_GETATTR);
467 if (req == NULL)
0a3bdb00 468 return ERR_PTR(-ENOMEM);
d7e09d03
PT
469
470 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
471 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
472 op_data->op_namelen + 1);
473
474 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
475 if (rc) {
476 ptlrpc_request_free(req);
0a3bdb00 477 return ERR_PTR(rc);
d7e09d03
PT
478 }
479
480 /* pack the intent */
481 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
482 lit->opc = (__u64)it->it_op;
483
2c580836 484 if (obddev->u.cli.cl_default_mds_easize > 0)
485 easize = obddev->u.cli.cl_default_mds_easize;
486 else
487 easize = obddev->u.cli.cl_max_mds_easize;
488
d7e09d03 489 /* pack the intended request */
2c580836 490 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
d7e09d03 491
2c580836 492 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
d7e09d03
PT
493 if (client_is_remote(exp))
494 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
495 sizeof(struct mdt_remote_perm));
496 ptlrpc_request_set_replen(req);
0a3bdb00 497 return req;
d7e09d03
PT
498}
499
500static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
501 struct lookup_intent *it,
502 struct md_op_data *unused)
503{
504 struct obd_device *obd = class_exp2obd(exp);
505 struct ptlrpc_request *req;
506 struct ldlm_intent *lit;
507 struct layout_intent *layout;
508 int rc;
d7e09d03
PT
509
510 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
511 &RQF_LDLM_INTENT_LAYOUT);
512 if (req == NULL)
0a3bdb00 513 return ERR_PTR(-ENOMEM);
d7e09d03
PT
514
515 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
516 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
517 if (rc) {
518 ptlrpc_request_free(req);
0a3bdb00 519 return ERR_PTR(rc);
d7e09d03
PT
520 }
521
522 /* pack the intent */
523 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
524 lit->opc = (__u64)it->it_op;
525
526 /* pack the layout intent request */
527 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
528 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
529 * set for replication */
530 layout->li_opc = LAYOUT_INTENT_ACCESS;
531
532 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
44779340 533 obd->u.cli.cl_default_mds_easize);
d7e09d03 534 ptlrpc_request_set_replen(req);
0a3bdb00 535 return req;
d7e09d03
PT
536}
537
538static struct ptlrpc_request *
539mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
540{
541 struct ptlrpc_request *req;
542 int rc;
d7e09d03
PT
543
544 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
545 if (req == NULL)
0a3bdb00 546 return ERR_PTR(-ENOMEM);
d7e09d03
PT
547
548 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
549 if (rc) {
550 ptlrpc_request_free(req);
0a3bdb00 551 return ERR_PTR(rc);
d7e09d03
PT
552 }
553
554 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555 ptlrpc_request_set_replen(req);
0a3bdb00 556 return req;
d7e09d03
PT
557}
558
/*
 * Post-process an intent enqueue reply: fix up the granted lock (mode
 * change or abort), copy the server's disposition/status into @it,
 * drop the replay flag for failed opens, validate/copy the reply
 * buffers (EA, remote perm, capabilities) and, for layout locks,
 * install the returned layout as the lock's LVB.
 *
 * @rc is the (non-negative) result from ldlm_cli_enqueue(); returns
 * @rc, or a negative errno if the reply is malformed (-EPROTO) or the
 * layout buffer cannot be allocated (-ENOMEM).
 */
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
			      int rc)
{
	struct req_capsule *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply *lockrep;
	struct lustre_intent_data *intent = &it->d.lustre;
	struct ldlm_lock *lock;
	void *lvb_data = NULL;
	int lvb_len = 0;

	LASSERT(rc >= 0);
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent. */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		/* Server executed the intent but granted no lock. */
		einfo->ei_mode = 0;
		memset(lockh, 0, sizeof(*lockh));
		rc = 0;
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);
		LASSERT(lock != NULL);

		/* If the server gave us back a different lock mode, we should
		 * fix up our variables. */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
		}
		LDLM_LOCK_PUT(lock);
	}

	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */

	/* Publish the server's verdict to the intent. */
	intent->it_disposition = (int)lockrep->lock_policy_res1;
	intent->it_status = (int)lockrep->lock_policy_res2;
	intent->it_lock_mode = einfo->ei_mode;
	intent->it_lock_handle = lockh->cookie;
	intent->it_data = req;

	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant */
	if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, intent->it_status);

	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 *
	 * It's important that we do this first!  Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 * (bug 3440) */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
		mdc_clear_replay_flag(req, intent->it_status);

	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, intent->it_disposition, intent->it_status);

	/* We know what to expect, so we do any byte flipping required here */
	if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
		struct mdt_body *body;

		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		if (body == NULL) {
			CERROR("Can't swab mdt_body\n");
			return -EPROTO;
		}

		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		}

		if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
			void *eadata;

			mdc_update_max_ea_from_body(exp, body);

			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->eadatasize);
			if (eadata == NULL)
				return -EPROTO;

			/* save lvb data and length in case this is for layout
			 * lock */
			lvb_data = eadata;
			lvb_len = body->eadatasize;

			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery.  If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 *
			 * To not save LOV EA if request is not going to replay
			 * (for example error one).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				void *lmm;

				if (req_capsule_get_size(pill, &RMF_EADATA,
							 RCL_CLIENT) <
				    body->eadatasize)
					mdc_realloc_openmsg(req, body);
				else
					req_capsule_shrink(pill, &RMF_EADATA,
							   body->eadatasize,
							   RCL_CLIENT);

				req_capsule_set_size(pill, &RMF_EADATA,
						     RCL_CLIENT,
						     body->eadatasize);

				/* lmm may be NULL if mdc_realloc_openmsg()
				 * failed and stripped the EA from the body. */
				lmm = req_capsule_client_get(pill, &RMF_EADATA);
				if (lmm)
					memcpy(lmm, eadata, body->eadatasize);
			}
		}

		if (body->valid & OBD_MD_FLRMTPERM) {
			struct mdt_remote_perm *perm;

			LASSERT(client_is_remote(exp));
			perm = req_capsule_server_swab_get(pill, &RMF_ACL,
						lustre_swab_mdt_remote_perm);
			if (perm == NULL)
				return -EPROTO;
		}
		if (body->valid & OBD_MD_FLMDSCAPA) {
			struct lustre_capa *capa, *p;

			capa = req_capsule_server_get(pill, &RMF_CAPA1);
			if (capa == NULL)
				return -EPROTO;

			if (it->it_op & IT_OPEN) {
				/* client fid capa will be checked in replay */
				p = req_capsule_client_get(pill, &RMF_CAPA2);
				LASSERT(p);
				*p = *capa;
			}
		}
		if (body->valid & OBD_MD_FLOSSCAPA) {
			struct lustre_capa *capa;

			capa = req_capsule_server_get(pill, &RMF_CAPA2);
			if (capa == NULL)
				return -EPROTO;
		}
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
		if (lvb_len > 0) {
			lvb_data = req_capsule_server_sized_get(pill,
							&RMF_DLM_LVB, lvb_len);
			if (lvb_data == NULL)
				return -EPROTO;
		}
	}

	/* fill in stripe data for layout lock */
	lock = ldlm_handle2lock(lockh);
	if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
		void *lmm;

		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
			   ldlm_it2str(it->it_op), lvb_len);

		lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
		if (lmm == NULL) {
			LDLM_LOCK_PUT(lock);
			return -ENOMEM;
		}
		memcpy(lmm, lvb_data, lvb_len);

		/* install lvb_data; only the first installer wins, a
		 * concurrent thread may have set it already. */
		lock_res_and_lock(lock);
		if (lock->l_lvb_data == NULL) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
			lmm = NULL;
		}
		unlock_res_and_lock(lock);
		if (lmm != NULL)
			kvfree(lmm);
	}
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);

	return rc;
}
773
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
/*
 * Enqueue an MDC lock, optionally carrying an intent (@it).  Without
 * an intent this is the flock path: @lmm then smuggles the flock
 * policy and the function returns straight after ldlm_cli_enqueue().
 * With an intent, the matching *_pack() helper builds the request,
 * rpc-in-flight limits are honoured, and the reply is digested by
 * mdc_finish_enqueue().  IT_CREAT retries forever on server
 * -EINPROGRESS (quota design) until the import generation changes.
 * @reqp must be NULL — presumably an unused async-out parameter kept
 * for interface compatibility; confirm against md_enqueue callers.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, void *lmm, int lmmsize,
		struct ptlrpc_request **reqp, u64 extra_lock_flags)
{
	static const ldlm_policy_data_t lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const ldlm_policy_data_t update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const ldlm_policy_data_t layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const ldlm_policy_data_t getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	ldlm_policy_data_t const *policy = &lookup_policy;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	/* Choose the inodebits policy from the intent operation. */
	if (it) {
		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
	}

	LASSERT(reqp == NULL);

	/* Remember the import generation so a resend after eviction
	 * can be detected and aborted below. */
	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK, in this case we hide flock
		   policy as lmm, but lmmsize is 0 */
		LASSERT(lmm && lmmsize == 0);
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		policy = lmm;
		res_id.name[3] = LDLM_FLOCK;
		req = NULL;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
					   einfo->ei_cbdata);
		policy = &update_policy;
		einfo->ei_cbdata = NULL;
		lmm = NULL;
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	if (IS_ERR(req))
		return PTR_ERR(req);

	if (req != NULL && it && it->it_op & IT_CREAT)
		/* ask ptlrpc not to resend on EINPROGRESS since we have our own
		 * retry logic */
		req->rq_no_retry_einprogress = 1;

	if (resends) {
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		/* back off linearly with the resend count */
		req->rq_sent = get_seconds() + resends;
	}

	/* It is important to obtain rpc_lock first (if applicable), so that
	 * threads that are serialised with rpc_lock are not polluting our
	 * rpcs in flight counter. We do not do flock request limiting, though*/
	if (it) {
		mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
		rc = mdc_enter_request(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we immediately return without further
		   delay and let caller deal with the rest, since rest of
		   this function metadata processing makes no sense for flock
		   requests anyway. But in case of problem during comms with
		   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
		   can not rely on caller and this mainly for F_UNLCKs
		   (explicits or automatically generated by Kernel to clean
		   current FLocks upon exit) that can't be trashed */
		if ((rc == -EINTR) || (rc == -ETIMEDOUT))
			goto resend;
		return rc;
	}

	mdc_exit_request(&obddev->u.cli);
	mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

	if (rc < 0) {
		CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
			     "%s: ldlm_cli_enqueue failed: rc = %d\n",
			     obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	/* intent status travels in network byte order */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/* Retry the create infinitely when we get -EINPROGRESS from
	 * server. This is required by the new quota design. */
	if (it && it->it_op & IT_CREAT &&
	    (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			/* import was evicted meanwhile; give up */
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		/* Undo the lock reference and scrub the intent so the
		 * caller cannot see stale lock state. */
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->d.lustre.it_lock_handle = 0;
		it->d.lustre.it_lock_mode = 0;
		it->d.lustre.it_data = NULL;
	}

	return rc;
}
954
955static int mdc_finish_intent_lock(struct obd_export *exp,
956 struct ptlrpc_request *request,
957 struct md_op_data *op_data,
958 struct lookup_intent *it,
959 struct lustre_handle *lockh)
960{
961 struct lustre_handle old_lock;
962 struct mdt_body *mdt_body;
963 struct ldlm_lock *lock;
964 int rc;
d7e09d03
PT
965
966 LASSERT(request != NULL);
967 LASSERT(request != LP_POISON);
968 LASSERT(request->rq_repmsg != LP_POISON);
969
970 if (!it_disposition(it, DISP_IT_EXECD)) {
971 /* The server failed before it even started executing the
972 * intent, i.e. because it couldn't unpack the request. */
973 LASSERT(it->d.lustre.it_status != 0);
0a3bdb00 974 return it->d.lustre.it_status;
d7e09d03
PT
975 }
976 rc = it_open_error(DISP_IT_EXECD, it);
977 if (rc)
0a3bdb00 978 return rc;
d7e09d03
PT
979
980 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
981 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
982
983 /* If we were revalidating a fid/name pair, mark the intent in
984 * case we fail and get called again from lookup */
985 if (fid_is_sane(&op_data->op_fid2) &&
986 it->it_create_mode & M_CHECK_STALE &&
987 it->it_op != IT_GETATTR) {
d7e09d03
PT
988
989 /* Also: did we find the same inode? */
990 /* sever can return one of two fids:
991 * op_fid2 - new allocated fid - if file is created.
992 * op_fid3 - existent fid - if file only open.
993 * op_fid3 is saved in lmv_intent_open */
994 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
995 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
996 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
997 "\n", PFID(&op_data->op_fid2),
998 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
0a3bdb00 999 return -ESTALE;
d7e09d03
PT
1000 }
1001 }
1002
1003 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1004 if (rc)
0a3bdb00 1005 return rc;
d7e09d03
PT
1006
1007 /* keep requests around for the multiple phases of the call
1008 * this shows the DISP_XX must guarantee we make it into the call
1009 */
1010 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1011 it_disposition(it, DISP_OPEN_CREATE) &&
1012 !it_open_error(DISP_OPEN_CREATE, it)) {
1013 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1014 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1015 }
1016 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1017 it_disposition(it, DISP_OPEN_OPEN) &&
1018 !it_open_error(DISP_OPEN_OPEN, it)) {
1019 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1020 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1021 /* BUG 11546 - eviction in the middle of open rpc processing */
1022 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1023 }
1024
1025 if (it->it_op & IT_CREAT) {
1026 /* XXX this belongs in ll_create_it */
1027 } else if (it->it_op == IT_OPEN) {
1028 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1029 } else {
1030 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1031 }
1032
1033 /* If we already have a matching lock, then cancel the new
1034 * one. We have to set the data here instead of in
1035 * mdc_enqueue, because we need to use the child's inode as
1036 * the l_ast_data to match, and that's not available until
1037 * intent_finish has performed the iget().) */
1038 lock = ldlm_handle2lock(lockh);
1039 if (lock) {
1040 ldlm_policy_data_t policy = lock->l_policy_data;
7436d070 1041
d7e09d03
PT
1042 LDLM_DEBUG(lock, "matching against this");
1043
1044 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1045 &lock->l_resource->lr_name),
6d95e048
AD
1046 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1047 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
d7e09d03
PT
1048 LDLM_LOCK_PUT(lock);
1049
1050 memcpy(&old_lock, lockh, sizeof(*lockh));
1051 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
e5e663ae
SM
1052 LDLM_IBITS, &policy, LCK_NL,
1053 &old_lock, 0)) {
d7e09d03
PT
1054 ldlm_lock_decref_and_cancel(lockh,
1055 it->d.lustre.it_lock_mode);
1056 memcpy(lockh, &old_lock, sizeof(old_lock));
1057 it->d.lustre.it_lock_handle = lockh->cookie;
1058 }
1059 }
301af906
SM
1060 CDEBUG(D_DENTRY,
1061 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
d7e09d03
PT
1062 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1063 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
0a3bdb00 1064 return rc;
d7e09d03
PT
1065}
1066
1067int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1068 struct lu_fid *fid, __u64 *bits)
1069{
1070 /* We could just return 1 immediately, but since we should only
1071 * be called in revalidate_it if we already have a lock, let's
1072 * verify that. */
1073 struct ldlm_res_id res_id;
1074 struct lustre_handle lockh;
1075 ldlm_policy_data_t policy;
1076 ldlm_mode_t mode;
d7e09d03
PT
1077
1078 if (it->d.lustre.it_lock_handle) {
1079 lockh.cookie = it->d.lustre.it_lock_handle;
1080 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1081 } else {
1082 fid_build_reg_res_name(fid, &res_id);
1083 switch (it->it_op) {
1084 case IT_GETATTR:
bf08ee0d
OD
1085 /* File attributes are held under multiple bits:
1086 * nlink is under lookup lock, size and times are
1087 * under UPDATE lock and recently we've also got
1088 * a separate permissions lock for owner/group/acl that
1089 * were protected by lookup lock before.
1090 * Getattr must provide all of that information,
1091 * so we need to ensure we have all of those locks.
1092 * Unfortunately, if the bits are split across multiple
1093 * locks, there's no easy way to match all of them here,
1094 * so an extra RPC would be performed to fetch all
1095 * of those bits at once for now. */
fe4c58af 1096 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1097 * but for old MDTs (< 2.4), permission is covered
1098 * by LOOKUP lock, so it needs to match all bits here.*/
bf08ee0d
OD
1099 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1100 MDS_INODELOCK_LOOKUP |
1101 MDS_INODELOCK_PERM;
d7e09d03
PT
1102 break;
1103 case IT_LAYOUT:
1104 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1105 break;
1106 default:
1107 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1108 break;
1109 }
bf08ee0d 1110
6caea2f9 1111 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
d7e09d03 1112 LDLM_IBITS, &policy,
6caea2f9
AL
1113 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1114 &lockh);
d7e09d03
PT
1115 }
1116
1117 if (mode) {
1118 it->d.lustre.it_lock_handle = lockh.cookie;
1119 it->d.lustre.it_lock_mode = mode;
1120 } else {
1121 it->d.lustre.it_lock_handle = 0;
1122 it->d.lustre.it_lock_mode = 0;
1123 }
1124
0a3bdb00 1125 return !!mode;
d7e09d03
PT
1126}
1127
1128/*
1129 * This long block is all about fixing up the lock and request state
1130 * so that it is correct as of the moment _before_ the operation was
1131 * applied; that way, the VFS will think that everything is normal and
1132 * call Lustre's regular VFS methods.
1133 *
1134 * If we're performing a creation, that means that unless the creation
1135 * failed with EEXIST, we should fake up a negative dentry.
1136 *
1137 * For everything else, we want to lookup to succeed.
1138 *
1139 * One additional note: if CREATE or OPEN succeeded, we add an extra
1140 * reference to the request because we need to keep it around until
1141 * ll_create/ll_open gets called.
1142 *
1143 * The server will return to us, in it_disposition, an indication of
1144 * exactly what d.lustre.it_status refers to.
1145 *
1146 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 1147 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1148 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1149 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1150 * was successful.
1151 *
1152 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1153 * child lookup.
1154 */
1155int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1156 void *lmm, int lmmsize, struct lookup_intent *it,
1157 int lookup_flags, struct ptlrpc_request **reqp,
1158 ldlm_blocking_callback cb_blocking,
1159 __u64 extra_lock_flags)
1160{
f236f69b
LS
1161 struct ldlm_enqueue_info einfo = {
1162 .ei_type = LDLM_IBITS,
1163 .ei_mode = it_to_lock_mode(it),
1164 .ei_cb_bl = cb_blocking,
1165 .ei_cb_cp = ldlm_completion_ast,
1166 };
d7e09d03
PT
1167 struct lustre_handle lockh;
1168 int rc = 0;
29aaf496 1169
d7e09d03
PT
1170 LASSERT(it);
1171
1172 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
d3a8a4e2
JX
1173 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1174 op_data->op_name, PFID(&op_data->op_fid2),
1175 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1176 it->it_flags);
d7e09d03
PT
1177
1178 lockh.cookie = 0;
1179 if (fid_is_sane(&op_data->op_fid2) &&
1180 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1181 /* We could just return 1 immediately, but since we should only
1182 * be called in revalidate_it if we already have a lock, let's
1183 * verify that. */
1184 it->d.lustre.it_lock_handle = 0;
1185 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1186 /* Only return failure if it was not GETATTR by cfid
1187 (from inode_revalidate) */
1188 if (rc || op_data->op_namelen != 0)
0a3bdb00 1189 return rc;
d7e09d03
PT
1190 }
1191
f236f69b
LS
1192 /* For case if upper layer did not alloc fid, do it now. */
1193 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1194 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1195 if (rc < 0) {
1196 CERROR("Can't alloc new fid, rc %d\n", rc);
0a3bdb00 1197 return rc;
f236f69b 1198 }
d7e09d03 1199 }
f236f69b
LS
1200 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1201 extra_lock_flags);
1202 if (rc < 0)
1203 return rc;
1204
d7e09d03
PT
1205 *reqp = it->d.lustre.it_data;
1206 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
0a3bdb00 1207 return rc;
d7e09d03
PT
1208}
1209
1210static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1211 struct ptlrpc_request *req,
1212 void *args, int rc)
1213{
1214 struct mdc_getattr_args *ga = args;
1215 struct obd_export *exp = ga->ga_exp;
1216 struct md_enqueue_info *minfo = ga->ga_minfo;
1217 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1218 struct lookup_intent *it;
1219 struct lustre_handle *lockh;
1220 struct obd_device *obddev;
2d58de78 1221 struct ldlm_reply *lockrep;
d7e09d03 1222 __u64 flags = LDLM_FL_HAS_INTENT;
d7e09d03
PT
1223
1224 it = &minfo->mi_it;
1225 lockh = &minfo->mi_lockh;
1226
1227 obddev = class_exp2obd(exp);
1228
1229 mdc_exit_request(&obddev->u.cli);
1230 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1231 rc = -ETIMEDOUT;
1232
1233 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1234 &flags, NULL, 0, lockh, rc);
1235 if (rc < 0) {
1236 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1237 mdc_clear_replay_flag(req, rc);
d5fdc207 1238 goto out;
d7e09d03
PT
1239 }
1240
2d58de78
LW
1241 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1242 LASSERT(lockrep != NULL);
1243
1244 lockrep->lock_policy_res2 =
1245 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1246
d7e09d03
PT
1247 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1248 if (rc)
d5fdc207 1249 goto out;
d7e09d03
PT
1250
1251 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
d7e09d03
PT
1252
1253out:
7b81779d 1254 kfree(einfo);
d7e09d03
PT
1255 minfo->mi_cb(req, minfo, rc);
1256 return 0;
1257}
1258
1259int mdc_intent_getattr_async(struct obd_export *exp,
1260 struct md_enqueue_info *minfo,
1261 struct ldlm_enqueue_info *einfo)
1262{
1263 struct md_op_data *op_data = &minfo->mi_data;
1264 struct lookup_intent *it = &minfo->mi_it;
1265 struct ptlrpc_request *req;
1266 struct mdc_getattr_args *ga;
1267 struct obd_device *obddev = class_exp2obd(exp);
1268 struct ldlm_res_id res_id;
1269 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1270 * for statahead currently. Consider CMD in future, such two bits
1271 * maybe managed by different MDS, should be adjusted then. */
1272 ldlm_policy_data_t policy = {
1273 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1274 MDS_INODELOCK_UPDATE }
1275 };
1276 int rc = 0;
1277 __u64 flags = LDLM_FL_HAS_INTENT;
d7e09d03 1278
d3a8a4e2
JX
1279 CDEBUG(D_DLMTRACE,
1280 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1281 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1282 ldlm_it2str(it->it_op), it->it_flags);
d7e09d03
PT
1283
1284 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1285 req = mdc_intent_getattr_pack(exp, it, op_data);
add882a8
JH
1286 if (IS_ERR(req))
1287 return PTR_ERR(req);
d7e09d03
PT
1288
1289 rc = mdc_enter_request(&obddev->u.cli);
1290 if (rc != 0) {
1291 ptlrpc_req_finished(req);
0a3bdb00 1292 return rc;
d7e09d03
PT
1293 }
1294
1295 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1296 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1297 if (rc < 0) {
1298 mdc_exit_request(&obddev->u.cli);
1299 ptlrpc_req_finished(req);
0a3bdb00 1300 return rc;
d7e09d03
PT
1301 }
1302
1303 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1304 ga = ptlrpc_req_async_args(req);
1305 ga->ga_exp = exp;
1306 ga->ga_minfo = minfo;
1307 ga->ga_einfo = einfo;
1308
1309 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
c5c4c6fa 1310 ptlrpcd_add_req(req);
d7e09d03 1311
0a3bdb00 1312 return 0;
d7e09d03 1313}
This page took 0.746578 seconds and 5 git commands to generate.