staging: lustre: remove RETURN macro
[deliverable/linux.git] / drivers / staging / lustre / lustre / lov / lov_request.c
CommitLineData
d7e09d03
PT
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37#define DEBUG_SUBSYSTEM S_LOV
38
39#include <linux/libcfs/libcfs.h>
40
41#include <obd_class.h>
42#include <obd_lov.h>
43#include <lustre/lustre_idl.h>
44
45#include "lov_internal.h"
46
47static void lov_init_set(struct lov_request_set *set)
48{
49 set->set_count = 0;
50 atomic_set(&set->set_completes, 0);
51 atomic_set(&set->set_success, 0);
52 atomic_set(&set->set_finish_checked, 0);
53 set->set_cookies = 0;
54 INIT_LIST_HEAD(&set->set_list);
55 atomic_set(&set->set_refcount, 1);
56 init_waitqueue_head(&set->set_waitq);
57 spin_lock_init(&set->set_lock);
58}
59
60void lov_finish_set(struct lov_request_set *set)
61{
62 struct list_head *pos, *n;
d7e09d03
PT
63
64 LASSERT(set);
65 list_for_each_safe(pos, n, &set->set_list) {
66 struct lov_request *req = list_entry(pos,
67 struct lov_request,
68 rq_link);
69 list_del_init(&req->rq_link);
70
71 if (req->rq_oi.oi_oa)
72 OBDO_FREE(req->rq_oi.oi_oa);
73 if (req->rq_oi.oi_md)
74 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
75 if (req->rq_oi.oi_osfs)
76 OBD_FREE(req->rq_oi.oi_osfs,
77 sizeof(*req->rq_oi.oi_osfs));
78 OBD_FREE(req, sizeof(*req));
79 }
80
81 if (set->set_pga) {
82 int len = set->set_oabufs * sizeof(*set->set_pga);
83 OBD_FREE_LARGE(set->set_pga, len);
84 }
85 if (set->set_lockh)
86 lov_llh_put(set->set_lockh);
87
88 OBD_FREE(set, sizeof(*set));
d7e09d03
PT
89}
90
91int lov_set_finished(struct lov_request_set *set, int idempotent)
92{
93 int completes = atomic_read(&set->set_completes);
94
95 CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
96
97 if (completes == set->set_count) {
98 if (idempotent)
99 return 1;
100 if (atomic_inc_return(&set->set_finish_checked) == 1)
101 return 1;
102 }
103 return 0;
104}
105
106void lov_update_set(struct lov_request_set *set,
107 struct lov_request *req, int rc)
108{
109 req->rq_complete = 1;
110 req->rq_rc = rc;
111
112 atomic_inc(&set->set_completes);
113 if (rc == 0)
114 atomic_inc(&set->set_success);
115
116 wake_up(&set->set_waitq);
117}
118
119int lov_update_common_set(struct lov_request_set *set,
120 struct lov_request *req, int rc)
121{
122 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
d7e09d03
PT
123
124 lov_update_set(set, req, rc);
125
126 /* grace error on inactive ost */
127 if (rc && !(lov->lov_tgts[req->rq_idx] &&
128 lov->lov_tgts[req->rq_idx]->ltd_active))
129 rc = 0;
130
131 /* FIXME in raid1 regime, should return 0 */
0a3bdb00 132 return rc;
d7e09d03
PT
133}
134
135void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
136{
137 list_add_tail(&req->rq_link, &set->set_list);
138 set->set_count++;
139 req->rq_rqset = set;
140}
141
142static int lov_check_set(struct lov_obd *lov, int idx)
143{
144 int rc = 0;
145 mutex_lock(&lov->lov_lock);
146
147 if (lov->lov_tgts[idx] == NULL ||
148 lov->lov_tgts[idx]->ltd_active ||
149 (lov->lov_tgts[idx]->ltd_exp != NULL &&
150 class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
151 rc = 1;
152
153 mutex_unlock(&lov->lov_lock);
154 return rc;
155}
156
157/* Check if the OSC connection exists and is active.
158 * If the OSC has not yet had a chance to connect to the OST the first time,
159 * wait once for it to connect instead of returning an error.
160 */
161int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
162{
163 wait_queue_head_t waitq;
164 struct l_wait_info lwi;
165 struct lov_tgt_desc *tgt;
166 int rc = 0;
167
168 mutex_lock(&lov->lov_lock);
169
170 tgt = lov->lov_tgts[ost_idx];
171
172 if (unlikely(tgt == NULL))
173 GOTO(out, rc = 0);
174
175 if (likely(tgt->ltd_active))
176 GOTO(out, rc = 1);
177
178 if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
179 GOTO(out, rc = 0);
180
181 mutex_unlock(&lov->lov_lock);
182
183 init_waitqueue_head(&waitq);
184 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
185 cfs_time_seconds(1), NULL, NULL);
186
187 rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
188 if (tgt != NULL && tgt->ltd_active)
189 return 1;
190
191 return 0;
192
193out:
194 mutex_unlock(&lov->lov_lock);
195 return rc;
196}
197
198extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
199 struct lov_oinfo *loi, int flags,
200 struct ost_lvb *lvb, __u32 mode, int rc);
201
202static int lov_update_enqueue_lov(struct obd_export *exp,
203 struct lustre_handle *lov_lockhp,
204 struct lov_oinfo *loi, int flags, int idx,
205 struct ost_id *oi, int rc)
206{
207 struct lov_obd *lov = &exp->exp_obd->u.lov;
208
209 if (rc != ELDLM_OK &&
210 !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
211 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
212 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
213 /* -EUSERS used by OST to report file contention */
214 if (rc != -EINTR && rc != -EUSERS)
215 CERROR("%s: enqueue objid "DOSTID" subobj"
216 DOSTID" on OST idx %d: rc %d\n",
217 exp->exp_obd->obd_name,
218 POSTID(oi), POSTID(&loi->loi_oi),
219 loi->loi_ost_idx, rc);
220 } else
221 rc = ELDLM_OK;
222 }
223 return rc;
224}
225
226int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
227{
228 struct lov_request_set *set = req->rq_rqset;
229 struct lustre_handle *lov_lockhp;
230 struct obd_info *oi = set->set_oi;
231 struct lov_oinfo *loi;
d7e09d03
PT
232
233 LASSERT(oi != NULL);
234
235 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
236 loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
237
238 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
239 * and that copy can be arbitrarily out of date.
240 *
241 * The LOV API is due for a serious rewriting anyways, and this
242 * can be addressed then. */
243
244 lov_stripe_lock(oi->oi_md);
245 osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
246 &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
247 if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
248 memset(lov_lockhp, 0, sizeof *lov_lockhp);
249 rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
250 req->rq_idx, &oi->oi_md->lsm_oi, rc);
251 lov_stripe_unlock(oi->oi_md);
252 lov_update_set(set, req, rc);
0a3bdb00 253 return rc;
d7e09d03
PT
254}
255
256/* The callback for osc_enqueue that updates lov info for every OSC request. */
257static int cb_update_enqueue(void *cookie, int rc)
258{
259 struct obd_info *oinfo = cookie;
260 struct ldlm_enqueue_info *einfo;
261 struct lov_request *lovreq;
262
263 lovreq = container_of(oinfo, struct lov_request, rq_oi);
264 einfo = lovreq->rq_rqset->set_ei;
265 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
266}
267
268static int enqueue_done(struct lov_request_set *set, __u32 mode)
269{
270 struct lov_request *req;
271 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
272 int completes = atomic_read(&set->set_completes);
273 int rc = 0;
d7e09d03
PT
274
275 /* enqueue/match success, just return */
276 if (completes && completes == atomic_read(&set->set_success))
0a3bdb00 277 return 0;
d7e09d03
PT
278
279 /* cancel enqueued/matched locks */
280 list_for_each_entry(req, &set->set_list, rq_link) {
281 struct lustre_handle *lov_lockhp;
282
283 if (!req->rq_complete || req->rq_rc)
284 continue;
285
286 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
287 LASSERT(lov_lockhp);
288 if (!lustre_handle_is_used(lov_lockhp))
289 continue;
290
291 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
292 req->rq_oi.oi_md, mode, lov_lockhp);
293 if (rc && lov->lov_tgts[req->rq_idx] &&
294 lov->lov_tgts[req->rq_idx]->ltd_active)
295 CERROR("%s: cancelling obdjid "DOSTID" on OST"
296 "idx %d error: rc = %d\n",
297 set->set_exp->exp_obd->obd_name,
298 POSTID(&req->rq_oi.oi_md->lsm_oi),
299 req->rq_idx, rc);
300 }
301 if (set->set_lockh)
302 lov_llh_put(set->set_lockh);
0a3bdb00 303 return rc;
d7e09d03
PT
304}
305
306int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
307 struct ptlrpc_request_set *rqset)
308{
309 int ret = 0;
d7e09d03
PT
310
311 if (set == NULL)
0a3bdb00 312 return 0;
d7e09d03
PT
313 LASSERT(set->set_exp);
314 /* Do enqueue_done only for sync requests and if any request
315 * succeeded. */
316 if (!rqset) {
317 if (rc)
318 atomic_set(&set->set_completes, 0);
319 ret = enqueue_done(set, mode);
320 } else if (set->set_lockh)
321 lov_llh_put(set->set_lockh);
322
323 lov_put_reqset(set);
324
0a3bdb00 325 return rc ? rc : ret;
d7e09d03
PT
326}
327
328static void lov_llh_addref(void *llhp)
329{
330 struct lov_lock_handles *llh = llhp;
331
332 atomic_inc(&llh->llh_refcount);
333 CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
334 atomic_read(&llh->llh_refcount));
335}
336
337static struct portals_handle_ops lov_handle_ops = {
338 .hop_addref = lov_llh_addref,
339 .hop_free = NULL,
340};
341
342static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
343{
344 struct lov_lock_handles *llh;
345
346 OBD_ALLOC(llh, sizeof *llh +
347 sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
348 if (llh == NULL)
349 return NULL;
350
351 atomic_set(&llh->llh_refcount, 2);
352 llh->llh_stripe_count = lsm->lsm_stripe_count;
353 INIT_LIST_HEAD(&llh->llh_handle.h_link);
354 class_handle_hash(&llh->llh_handle, &lov_handle_ops);
355
356 return llh;
357}
358
359int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
360 struct ldlm_enqueue_info *einfo,
361 struct lov_request_set **reqset)
362{
363 struct lov_obd *lov = &exp->exp_obd->u.lov;
364 struct lov_request_set *set;
365 int i, rc = 0;
d7e09d03
PT
366
367 OBD_ALLOC(set, sizeof(*set));
368 if (set == NULL)
0a3bdb00 369 return -ENOMEM;
d7e09d03
PT
370 lov_init_set(set);
371
372 set->set_exp = exp;
373 set->set_oi = oinfo;
374 set->set_ei = einfo;
375 set->set_lockh = lov_llh_new(oinfo->oi_md);
376 if (set->set_lockh == NULL)
377 GOTO(out_set, rc = -ENOMEM);
378 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
379
380 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
381 struct lov_oinfo *loi;
382 struct lov_request *req;
383 obd_off start, end;
384
385 loi = oinfo->oi_md->lsm_oinfo[i];
386 if (!lov_stripe_intersects(oinfo->oi_md, i,
387 oinfo->oi_policy.l_extent.start,
388 oinfo->oi_policy.l_extent.end,
389 &start, &end))
390 continue;
391
392 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
393 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
394 continue;
395 }
396
397 OBD_ALLOC(req, sizeof(*req));
398 if (req == NULL)
399 GOTO(out_set, rc = -ENOMEM);
400
401 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
402 sizeof(struct lov_oinfo *) +
403 sizeof(struct lov_oinfo);
404 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
405 if (req->rq_oi.oi_md == NULL) {
406 OBD_FREE(req, sizeof(*req));
407 GOTO(out_set, rc = -ENOMEM);
408 }
409 req->rq_oi.oi_md->lsm_oinfo[0] =
410 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
411 sizeof(struct lov_oinfo *);
412
413 /* Set lov request specific parameters. */
414 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
415 req->rq_oi.oi_cb_up = cb_update_enqueue;
416 req->rq_oi.oi_flags = oinfo->oi_flags;
417
418 LASSERT(req->rq_oi.oi_lockh);
419
420 req->rq_oi.oi_policy.l_extent.gid =
421 oinfo->oi_policy.l_extent.gid;
422 req->rq_oi.oi_policy.l_extent.start = start;
423 req->rq_oi.oi_policy.l_extent.end = end;
424
425 req->rq_idx = loi->loi_ost_idx;
426 req->rq_stripe = i;
427
428 /* XXX LOV STACKING: submd should be from the subobj */
429 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
430 req->rq_oi.oi_md->lsm_stripe_count = 0;
431 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
432 loi->loi_kms_valid;
433 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
434 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
435
436 lov_set_add_req(req, set);
437 }
438 if (!set->set_count)
439 GOTO(out_set, rc = -EIO);
440 *reqset = set;
0a3bdb00 441 return 0;
d7e09d03
PT
442out_set:
443 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
0a3bdb00 444 return rc;
d7e09d03
PT
445}
446
447int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
448{
449 int rc = 0;
d7e09d03
PT
450
451 if (set == NULL)
0a3bdb00 452 return 0;
d7e09d03
PT
453 LASSERT(set->set_exp);
454 rc = enqueue_done(set, mode);
455 if ((set->set_count == atomic_read(&set->set_success)) &&
456 (flags & LDLM_FL_TEST_LOCK))
457 lov_llh_put(set->set_lockh);
458
459 lov_put_reqset(set);
460
0a3bdb00 461 return rc;
d7e09d03
PT
462}
463
464int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
465 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
466 __u32 mode, struct lustre_handle *lockh,
467 struct lov_request_set **reqset)
468{
469 struct lov_obd *lov = &exp->exp_obd->u.lov;
470 struct lov_request_set *set;
471 int i, rc = 0;
d7e09d03
PT
472
473 OBD_ALLOC(set, sizeof(*set));
474 if (set == NULL)
0a3bdb00 475 return -ENOMEM;
d7e09d03
PT
476 lov_init_set(set);
477
478 set->set_exp = exp;
479 set->set_oi = oinfo;
480 set->set_oi->oi_md = lsm;
481 set->set_lockh = lov_llh_new(lsm);
482 if (set->set_lockh == NULL)
483 GOTO(out_set, rc = -ENOMEM);
484 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
485
486 for (i = 0; i < lsm->lsm_stripe_count; i++){
487 struct lov_oinfo *loi;
488 struct lov_request *req;
489 obd_off start, end;
490
491 loi = lsm->lsm_oinfo[i];
492 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
493 policy->l_extent.end, &start, &end))
494 continue;
495
496 /* FIXME raid1 should grace this error */
497 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
498 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
499 GOTO(out_set, rc = -EIO);
500 }
501
502 OBD_ALLOC(req, sizeof(*req));
503 if (req == NULL)
504 GOTO(out_set, rc = -ENOMEM);
505
506 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
507 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
508 if (req->rq_oi.oi_md == NULL) {
509 OBD_FREE(req, sizeof(*req));
510 GOTO(out_set, rc = -ENOMEM);
511 }
512
513 req->rq_oi.oi_policy.l_extent.start = start;
514 req->rq_oi.oi_policy.l_extent.end = end;
515 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
516
517 req->rq_idx = loi->loi_ost_idx;
518 req->rq_stripe = i;
519
520 /* XXX LOV STACKING: submd should be from the subobj */
521 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
522 req->rq_oi.oi_md->lsm_stripe_count = 0;
523
524 lov_set_add_req(req, set);
525 }
526 if (!set->set_count)
527 GOTO(out_set, rc = -EIO);
528 *reqset = set;
0a3bdb00 529 return rc;
d7e09d03
PT
530out_set:
531 lov_fini_match_set(set, mode, 0);
0a3bdb00 532 return rc;
d7e09d03
PT
533}
534
535int lov_fini_cancel_set(struct lov_request_set *set)
536{
537 int rc = 0;
d7e09d03
PT
538
539 if (set == NULL)
0a3bdb00 540 return 0;
d7e09d03
PT
541
542 LASSERT(set->set_exp);
543 if (set->set_lockh)
544 lov_llh_put(set->set_lockh);
545
546 lov_put_reqset(set);
547
0a3bdb00 548 return rc;
d7e09d03
PT
549}
550
551int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
552 struct lov_stripe_md *lsm, __u32 mode,
553 struct lustre_handle *lockh,
554 struct lov_request_set **reqset)
555{
556 struct lov_request_set *set;
557 int i, rc = 0;
d7e09d03
PT
558
559 OBD_ALLOC(set, sizeof(*set));
560 if (set == NULL)
0a3bdb00 561 return -ENOMEM;
d7e09d03
PT
562 lov_init_set(set);
563
564 set->set_exp = exp;
565 set->set_oi = oinfo;
566 set->set_oi->oi_md = lsm;
567 set->set_lockh = lov_handle2llh(lockh);
568 if (set->set_lockh == NULL) {
569 CERROR("LOV: invalid lov lock handle %p\n", lockh);
570 GOTO(out_set, rc = -EINVAL);
571 }
572 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
573
574 for (i = 0; i < lsm->lsm_stripe_count; i++){
575 struct lov_request *req;
576 struct lustre_handle *lov_lockhp;
577 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
578
579 lov_lockhp = set->set_lockh->llh_handles + i;
580 if (!lustre_handle_is_used(lov_lockhp)) {
581 CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
582 loi->loi_ost_idx, POSTID(&loi->loi_oi));
583 continue;
584 }
585
586 OBD_ALLOC(req, sizeof(*req));
587 if (req == NULL)
588 GOTO(out_set, rc = -ENOMEM);
589
590 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
591 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
592 if (req->rq_oi.oi_md == NULL) {
593 OBD_FREE(req, sizeof(*req));
594 GOTO(out_set, rc = -ENOMEM);
595 }
596
597 req->rq_idx = loi->loi_ost_idx;
598 req->rq_stripe = i;
599
600 /* XXX LOV STACKING: submd should be from the subobj */
601 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
602 req->rq_oi.oi_md->lsm_stripe_count = 0;
603
604 lov_set_add_req(req, set);
605 }
606 if (!set->set_count)
607 GOTO(out_set, rc = -EIO);
608 *reqset = set;
0a3bdb00 609 return rc;
d7e09d03
PT
610out_set:
611 lov_fini_cancel_set(set);
0a3bdb00 612 return rc;
d7e09d03
PT
613}
614static int common_attr_done(struct lov_request_set *set)
615{
616 struct list_head *pos;
617 struct lov_request *req;
618 struct obdo *tmp_oa;
619 int rc = 0, attrset = 0;
d7e09d03
PT
620
621 LASSERT(set->set_oi != NULL);
622
623 if (set->set_oi->oi_oa == NULL)
0a3bdb00 624 return 0;
d7e09d03
PT
625
626 if (!atomic_read(&set->set_success))
0a3bdb00 627 return -EIO;
d7e09d03
PT
628
629 OBDO_ALLOC(tmp_oa);
630 if (tmp_oa == NULL)
631 GOTO(out, rc = -ENOMEM);
632
633 list_for_each (pos, &set->set_list) {
634 req = list_entry(pos, struct lov_request, rq_link);
635
636 if (!req->rq_complete || req->rq_rc)
637 continue;
638 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
639 continue;
640 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
641 req->rq_oi.oi_oa->o_valid,
642 set->set_oi->oi_md, req->rq_stripe, &attrset);
643 }
644 if (!attrset) {
645 CERROR("No stripes had valid attrs\n");
646 rc = -EIO;
647 }
648 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
649 (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
650 /* When we take attributes of some epoch, we require all the
651 * ost to be active. */
652 CERROR("Not all the stripes had valid attrs\n");
653 GOTO(out, rc = -EIO);
654 }
655
656 tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
657 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
658out:
659 if (tmp_oa)
660 OBDO_FREE(tmp_oa);
0a3bdb00 661 return rc;
d7e09d03
PT
662
663}
664
665static int brw_done(struct lov_request_set *set)
666{
667 struct lov_stripe_md *lsm = set->set_oi->oi_md;
668 struct lov_oinfo *loi = NULL;
669 struct list_head *pos;
670 struct lov_request *req;
d7e09d03
PT
671
672 list_for_each (pos, &set->set_list) {
673 req = list_entry(pos, struct lov_request, rq_link);
674
675 if (!req->rq_complete || req->rq_rc)
676 continue;
677
678 loi = lsm->lsm_oinfo[req->rq_stripe];
679
680 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
681 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
682 }
683
0a3bdb00 684 return 0;
d7e09d03
PT
685}
686
687int lov_fini_brw_set(struct lov_request_set *set)
688{
689 int rc = 0;
d7e09d03
PT
690
691 if (set == NULL)
0a3bdb00 692 return 0;
d7e09d03
PT
693 LASSERT(set->set_exp);
694 if (atomic_read(&set->set_completes)) {
695 rc = brw_done(set);
696 /* FIXME update qos data here */
697 }
698 lov_put_reqset(set);
699
0a3bdb00 700 return rc;
d7e09d03
PT
701}
702
703int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
704 obd_count oa_bufs, struct brw_page *pga,
705 struct obd_trans_info *oti,
706 struct lov_request_set **reqset)
707{
708 struct {
709 obd_count index;
710 obd_count count;
711 obd_count off;
712 } *info = NULL;
713 struct lov_request_set *set;
714 struct lov_obd *lov = &exp->exp_obd->u.lov;
715 int rc = 0, i, shift;
d7e09d03
PT
716
717 OBD_ALLOC(set, sizeof(*set));
718 if (set == NULL)
0a3bdb00 719 return -ENOMEM;
d7e09d03
PT
720 lov_init_set(set);
721
722 set->set_exp = exp;
723 set->set_oti = oti;
724 set->set_oi = oinfo;
725 set->set_oabufs = oa_bufs;
726 OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
727 if (!set->set_pga)
728 GOTO(out, rc = -ENOMEM);
729
730 OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
731 if (!info)
732 GOTO(out, rc = -ENOMEM);
733
734 /* calculate the page count for each stripe */
735 for (i = 0; i < oa_bufs; i++) {
736 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
737 info[stripe].count++;
738 }
739
740 /* alloc and initialize lov request */
741 shift = 0;
742 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
743 struct lov_oinfo *loi = NULL;
744 struct lov_request *req;
745
746 if (info[i].count == 0)
747 continue;
748
749 loi = oinfo->oi_md->lsm_oinfo[i];
750 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
751 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
752 GOTO(out, rc = -EIO);
753 }
754
755 OBD_ALLOC(req, sizeof(*req));
756 if (req == NULL)
757 GOTO(out, rc = -ENOMEM);
758
759 OBDO_ALLOC(req->rq_oi.oi_oa);
760 if (req->rq_oi.oi_oa == NULL) {
761 OBD_FREE(req, sizeof(*req));
762 GOTO(out, rc = -ENOMEM);
763 }
764
765 if (oinfo->oi_oa) {
766 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
767 sizeof(*req->rq_oi.oi_oa));
768 }
769 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
770 req->rq_oi.oi_oa->o_stripe_idx = i;
771
772 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
773 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
774 if (req->rq_oi.oi_md == NULL) {
775 OBDO_FREE(req->rq_oi.oi_oa);
776 OBD_FREE(req, sizeof(*req));
777 GOTO(out, rc = -ENOMEM);
778 }
779
780 req->rq_idx = loi->loi_ost_idx;
781 req->rq_stripe = i;
782
783 /* XXX LOV STACKING */
784 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
785 req->rq_oabufs = info[i].count;
786 req->rq_pgaidx = shift;
787 shift += req->rq_oabufs;
788
789 /* remember the index for sort brw_page array */
790 info[i].index = req->rq_pgaidx;
791
792 req->rq_oi.oi_capa = oinfo->oi_capa;
793
794 lov_set_add_req(req, set);
795 }
796 if (!set->set_count)
797 GOTO(out, rc = -EIO);
798
799 /* rotate & sort the brw_page array */
800 for (i = 0; i < oa_bufs; i++) {
801 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
802
803 shift = info[stripe].index + info[stripe].off;
804 LASSERT(shift < oa_bufs);
805 set->set_pga[shift] = pga[i];
806 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
807 &set->set_pga[shift].off);
808 info[stripe].off++;
809 }
810out:
811 if (info)
812 OBD_FREE_LARGE(info,
813 sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
814
815 if (rc == 0)
816 *reqset = set;
817 else
818 lov_fini_brw_set(set);
819
0a3bdb00 820 return rc;
d7e09d03
PT
821}
822
823int lov_fini_getattr_set(struct lov_request_set *set)
824{
825 int rc = 0;
d7e09d03
PT
826
827 if (set == NULL)
0a3bdb00 828 return 0;
d7e09d03
PT
829 LASSERT(set->set_exp);
830 if (atomic_read(&set->set_completes))
831 rc = common_attr_done(set);
832
833 lov_put_reqset(set);
834
0a3bdb00 835 return rc;
d7e09d03
PT
836}
837
838/* The callback for osc_getattr_async that finilizes a request info when a
839 * response is received. */
840static int cb_getattr_update(void *cookie, int rc)
841{
842 struct obd_info *oinfo = cookie;
843 struct lov_request *lovreq;
844 lovreq = container_of(oinfo, struct lov_request, rq_oi);
845 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
846}
847
848int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
849 struct lov_request_set **reqset)
850{
851 struct lov_request_set *set;
852 struct lov_obd *lov = &exp->exp_obd->u.lov;
853 int rc = 0, i;
d7e09d03
PT
854
855 OBD_ALLOC(set, sizeof(*set));
856 if (set == NULL)
0a3bdb00 857 return -ENOMEM;
d7e09d03
PT
858 lov_init_set(set);
859
860 set->set_exp = exp;
861 set->set_oi = oinfo;
862
863 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
864 struct lov_oinfo *loi;
865 struct lov_request *req;
866
867 loi = oinfo->oi_md->lsm_oinfo[i];
868 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
869 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
870 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
871 /* SOM requires all the OSTs to be active. */
872 GOTO(out_set, rc = -EIO);
873 continue;
874 }
875
876 OBD_ALLOC(req, sizeof(*req));
877 if (req == NULL)
878 GOTO(out_set, rc = -ENOMEM);
879
880 req->rq_stripe = i;
881 req->rq_idx = loi->loi_ost_idx;
882
883 OBDO_ALLOC(req->rq_oi.oi_oa);
884 if (req->rq_oi.oi_oa == NULL) {
885 OBD_FREE(req, sizeof(*req));
886 GOTO(out_set, rc = -ENOMEM);
887 }
888 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
889 sizeof(*req->rq_oi.oi_oa));
890 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
891 req->rq_oi.oi_cb_up = cb_getattr_update;
892 req->rq_oi.oi_capa = oinfo->oi_capa;
893
894 lov_set_add_req(req, set);
895 }
896 if (!set->set_count)
897 GOTO(out_set, rc = -EIO);
898 *reqset = set;
0a3bdb00 899 return rc;
d7e09d03
PT
900out_set:
901 lov_fini_getattr_set(set);
0a3bdb00 902 return rc;
d7e09d03
PT
903}
904
905int lov_fini_destroy_set(struct lov_request_set *set)
906{
d7e09d03 907 if (set == NULL)
0a3bdb00 908 return 0;
d7e09d03
PT
909 LASSERT(set->set_exp);
910 if (atomic_read(&set->set_completes)) {
911 /* FIXME update qos data here */
912 }
913
914 lov_put_reqset(set);
915
0a3bdb00 916 return 0;
d7e09d03
PT
917}
918
919int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
920 struct obdo *src_oa, struct lov_stripe_md *lsm,
921 struct obd_trans_info *oti,
922 struct lov_request_set **reqset)
923{
924 struct lov_request_set *set;
925 struct lov_obd *lov = &exp->exp_obd->u.lov;
926 int rc = 0, i;
d7e09d03
PT
927
928 OBD_ALLOC(set, sizeof(*set));
929 if (set == NULL)
0a3bdb00 930 return -ENOMEM;
d7e09d03
PT
931 lov_init_set(set);
932
933 set->set_exp = exp;
934 set->set_oi = oinfo;
935 set->set_oi->oi_md = lsm;
936 set->set_oi->oi_oa = src_oa;
937 set->set_oti = oti;
938 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
939 set->set_cookies = oti->oti_logcookies;
940
941 for (i = 0; i < lsm->lsm_stripe_count; i++) {
942 struct lov_oinfo *loi;
943 struct lov_request *req;
944
945 loi = lsm->lsm_oinfo[i];
946 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
947 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
948 continue;
949 }
950
951 OBD_ALLOC(req, sizeof(*req));
952 if (req == NULL)
953 GOTO(out_set, rc = -ENOMEM);
954
955 req->rq_stripe = i;
956 req->rq_idx = loi->loi_ost_idx;
957
958 OBDO_ALLOC(req->rq_oi.oi_oa);
959 if (req->rq_oi.oi_oa == NULL) {
960 OBD_FREE(req, sizeof(*req));
961 GOTO(out_set, rc = -ENOMEM);
962 }
963 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
964 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
965 lov_set_add_req(req, set);
966 }
967 if (!set->set_count)
968 GOTO(out_set, rc = -EIO);
969 *reqset = set;
0a3bdb00 970 return rc;
d7e09d03
PT
971out_set:
972 lov_fini_destroy_set(set);
0a3bdb00 973 return rc;
d7e09d03
PT
974}
975
976int lov_fini_setattr_set(struct lov_request_set *set)
977{
978 int rc = 0;
d7e09d03
PT
979
980 if (set == NULL)
0a3bdb00 981 return 0;
d7e09d03
PT
982 LASSERT(set->set_exp);
983 if (atomic_read(&set->set_completes)) {
984 rc = common_attr_done(set);
985 /* FIXME update qos data here */
986 }
987
988 lov_put_reqset(set);
0a3bdb00 989 return rc;
d7e09d03
PT
990}
991
992int lov_update_setattr_set(struct lov_request_set *set,
993 struct lov_request *req, int rc)
994{
995 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
996 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
d7e09d03
PT
997
998 lov_update_set(set, req, rc);
999
1000 /* grace error on inactive ost */
1001 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1002 lov->lov_tgts[req->rq_idx]->ltd_active))
1003 rc = 0;
1004
1005 if (rc == 0) {
1006 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1007 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1008 req->rq_oi.oi_oa->o_ctime;
1009 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1010 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1011 req->rq_oi.oi_oa->o_mtime;
1012 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1013 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1014 req->rq_oi.oi_oa->o_atime;
1015 }
1016
0a3bdb00 1017 return rc;
d7e09d03
PT
1018}
1019
1020/* The callback for osc_setattr_async that finilizes a request info when a
1021 * response is received. */
1022static int cb_setattr_update(void *cookie, int rc)
1023{
1024 struct obd_info *oinfo = cookie;
1025 struct lov_request *lovreq;
1026 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1027 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1028}
1029
1030int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1031 struct obd_trans_info *oti,
1032 struct lov_request_set **reqset)
1033{
1034 struct lov_request_set *set;
1035 struct lov_obd *lov = &exp->exp_obd->u.lov;
1036 int rc = 0, i;
d7e09d03
PT
1037
1038 OBD_ALLOC(set, sizeof(*set));
1039 if (set == NULL)
0a3bdb00 1040 return -ENOMEM;
d7e09d03
PT
1041 lov_init_set(set);
1042
1043 set->set_exp = exp;
1044 set->set_oti = oti;
1045 set->set_oi = oinfo;
1046 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1047 set->set_cookies = oti->oti_logcookies;
1048
1049 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1050 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1051 struct lov_request *req;
1052
1053 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1054 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1055 continue;
1056 }
1057
1058 OBD_ALLOC(req, sizeof(*req));
1059 if (req == NULL)
1060 GOTO(out_set, rc = -ENOMEM);
1061 req->rq_stripe = i;
1062 req->rq_idx = loi->loi_ost_idx;
1063
1064 OBDO_ALLOC(req->rq_oi.oi_oa);
1065 if (req->rq_oi.oi_oa == NULL) {
1066 OBD_FREE(req, sizeof(*req));
1067 GOTO(out_set, rc = -ENOMEM);
1068 }
1069 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1070 sizeof(*req->rq_oi.oi_oa));
1071 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1072 req->rq_oi.oi_oa->o_stripe_idx = i;
1073 req->rq_oi.oi_cb_up = cb_setattr_update;
1074 req->rq_oi.oi_capa = oinfo->oi_capa;
1075
1076 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1077 int off = lov_stripe_offset(oinfo->oi_md,
1078 oinfo->oi_oa->o_size, i,
1079 &req->rq_oi.oi_oa->o_size);
1080
1081 if (off < 0 && req->rq_oi.oi_oa->o_size)
1082 req->rq_oi.oi_oa->o_size--;
1083
1084 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1085 i, req->rq_oi.oi_oa->o_size,
1086 oinfo->oi_oa->o_size);
1087 }
1088 lov_set_add_req(req, set);
1089 }
1090 if (!set->set_count)
1091 GOTO(out_set, rc = -EIO);
1092 *reqset = set;
0a3bdb00 1093 return rc;
d7e09d03
PT
1094out_set:
1095 lov_fini_setattr_set(set);
0a3bdb00 1096 return rc;
d7e09d03
PT
1097}
1098
1099int lov_fini_punch_set(struct lov_request_set *set)
1100{
1101 int rc = 0;
d7e09d03
PT
1102
1103 if (set == NULL)
0a3bdb00 1104 return 0;
d7e09d03
PT
1105 LASSERT(set->set_exp);
1106 if (atomic_read(&set->set_completes)) {
1107 rc = -EIO;
1108 /* FIXME update qos data here */
1109 if (atomic_read(&set->set_success))
1110 rc = common_attr_done(set);
1111 }
1112
1113 lov_put_reqset(set);
1114
0a3bdb00 1115 return rc;
d7e09d03
PT
1116}
1117
1118int lov_update_punch_set(struct lov_request_set *set,
1119 struct lov_request *req, int rc)
1120{
1121 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1122 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
d7e09d03
PT
1123
1124 lov_update_set(set, req, rc);
1125
1126 /* grace error on inactive ost */
1127 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1128 rc = 0;
1129
1130 if (rc == 0) {
1131 lov_stripe_lock(lsm);
1132 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1133 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1134 req->rq_oi.oi_oa->o_blocks;
1135 }
1136
1137 lov_stripe_unlock(lsm);
1138 }
1139
0a3bdb00 1140 return rc;
d7e09d03
PT
1141}
1142
1143/* The callback for osc_punch that finilizes a request info when a response
1144 * is received. */
1145static int cb_update_punch(void *cookie, int rc)
1146{
1147 struct obd_info *oinfo = cookie;
1148 struct lov_request *lovreq;
1149 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1150 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1151}
1152
1153int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1154 struct obd_trans_info *oti,
1155 struct lov_request_set **reqset)
1156{
1157 struct lov_request_set *set;
1158 struct lov_obd *lov = &exp->exp_obd->u.lov;
1159 int rc = 0, i;
d7e09d03
PT
1160
1161 OBD_ALLOC(set, sizeof(*set));
1162 if (set == NULL)
0a3bdb00 1163 return -ENOMEM;
d7e09d03
PT
1164 lov_init_set(set);
1165
1166 set->set_oi = oinfo;
1167 set->set_exp = exp;
1168
1169 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1170 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1171 struct lov_request *req;
1172 obd_off rs, re;
1173
1174 if (!lov_stripe_intersects(oinfo->oi_md, i,
1175 oinfo->oi_policy.l_extent.start,
1176 oinfo->oi_policy.l_extent.end,
1177 &rs, &re))
1178 continue;
1179
1180 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1181 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1182 GOTO(out_set, rc = -EIO);
1183 }
1184
1185 OBD_ALLOC(req, sizeof(*req));
1186 if (req == NULL)
1187 GOTO(out_set, rc = -ENOMEM);
1188 req->rq_stripe = i;
1189 req->rq_idx = loi->loi_ost_idx;
1190
1191 OBDO_ALLOC(req->rq_oi.oi_oa);
1192 if (req->rq_oi.oi_oa == NULL) {
1193 OBD_FREE(req, sizeof(*req));
1194 GOTO(out_set, rc = -ENOMEM);
1195 }
1196 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1197 sizeof(*req->rq_oi.oi_oa));
1198 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1199 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1200
1201 req->rq_oi.oi_oa->o_stripe_idx = i;
1202 req->rq_oi.oi_cb_up = cb_update_punch;
1203
1204 req->rq_oi.oi_policy.l_extent.start = rs;
1205 req->rq_oi.oi_policy.l_extent.end = re;
1206 req->rq_oi.oi_policy.l_extent.gid = -1;
1207
1208 req->rq_oi.oi_capa = oinfo->oi_capa;
1209
1210 lov_set_add_req(req, set);
1211 }
1212 if (!set->set_count)
1213 GOTO(out_set, rc = -EIO);
1214 *reqset = set;
0a3bdb00 1215 return rc;
d7e09d03
PT
1216out_set:
1217 lov_fini_punch_set(set);
0a3bdb00 1218 return rc;
d7e09d03
PT
1219}
1220
1221int lov_fini_sync_set(struct lov_request_set *set)
1222{
1223 int rc = 0;
d7e09d03
PT
1224
1225 if (set == NULL)
0a3bdb00 1226 return 0;
d7e09d03
PT
1227 LASSERT(set->set_exp);
1228 if (atomic_read(&set->set_completes)) {
1229 if (!atomic_read(&set->set_success))
1230 rc = -EIO;
1231 /* FIXME update qos data here */
1232 }
1233
1234 lov_put_reqset(set);
1235
0a3bdb00 1236 return rc;
d7e09d03
PT
1237}
1238
1239/* The callback for osc_sync that finilizes a request info when a
1240 * response is recieved. */
1241static int cb_sync_update(void *cookie, int rc)
1242{
1243 struct obd_info *oinfo = cookie;
1244 struct lov_request *lovreq;
1245
1246 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1247 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1248}
1249
1250int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1251 obd_off start, obd_off end,
1252 struct lov_request_set **reqset)
1253{
1254 struct lov_request_set *set;
1255 struct lov_obd *lov = &exp->exp_obd->u.lov;
1256 int rc = 0, i;
d7e09d03
PT
1257
1258 OBD_ALLOC_PTR(set);
1259 if (set == NULL)
0a3bdb00 1260 return -ENOMEM;
d7e09d03
PT
1261 lov_init_set(set);
1262
1263 set->set_exp = exp;
1264 set->set_oi = oinfo;
1265
1266 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1267 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1268 struct lov_request *req;
1269 obd_off rs, re;
1270
1271 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1272 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1273 continue;
1274 }
1275
1276 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1277 &re))
1278 continue;
1279
1280 OBD_ALLOC_PTR(req);
1281 if (req == NULL)
1282 GOTO(out_set, rc = -ENOMEM);
1283 req->rq_stripe = i;
1284 req->rq_idx = loi->loi_ost_idx;
1285
1286 OBDO_ALLOC(req->rq_oi.oi_oa);
1287 if (req->rq_oi.oi_oa == NULL) {
1288 OBD_FREE(req, sizeof(*req));
1289 GOTO(out_set, rc = -ENOMEM);
1290 }
1291 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1292 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1293 req->rq_oi.oi_oa->o_stripe_idx = i;
1294
1295 req->rq_oi.oi_policy.l_extent.start = rs;
1296 req->rq_oi.oi_policy.l_extent.end = re;
1297 req->rq_oi.oi_policy.l_extent.gid = -1;
1298 req->rq_oi.oi_cb_up = cb_sync_update;
1299
1300 lov_set_add_req(req, set);
1301 }
1302 if (!set->set_count)
1303 GOTO(out_set, rc = -EIO);
1304 *reqset = set;
0a3bdb00 1305 return rc;
d7e09d03
PT
1306out_set:
1307 lov_fini_sync_set(set);
0a3bdb00 1308 return rc;
d7e09d03
PT
1309}
1310
/* Largest value representable in a __u64; also used as the saturation
 * marker by LOV_SUM_MAX(). */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating add: tot += add, except that tot is pinned to LOV_U64_MAX when
 * the sum wraps around.  Both arguments are evaluated more than once, so
 * they must be free of side effects; tot must be an lvalue.
 */
#define LOV_SUM_MAX(tot, add)						\
	do {								\
		if ((tot) + (add) >= (tot))				\
			(tot) += (add);					\
		else							\
			(tot) = LOV_U64_MAX;				\
	} while (0)
1319
1320int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1321{
d7e09d03
PT
1322 if (success) {
1323 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1324 LOV_MAGIC, 0);
1325 if (osfs->os_files != LOV_U64_MAX)
1326 lov_do_div64(osfs->os_files, expected_stripes);
1327 if (osfs->os_ffree != LOV_U64_MAX)
1328 lov_do_div64(osfs->os_ffree, expected_stripes);
1329
1330 spin_lock(&obd->obd_osfs_lock);
1331 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1332 obd->obd_osfs_age = cfs_time_current_64();
1333 spin_unlock(&obd->obd_osfs_lock);
0a3bdb00 1334 return 0;
d7e09d03
PT
1335 }
1336
0a3bdb00 1337 return -EIO;
d7e09d03
PT
1338}
1339
1340int lov_fini_statfs_set(struct lov_request_set *set)
1341{
1342 int rc = 0;
d7e09d03
PT
1343
1344 if (set == NULL)
0a3bdb00 1345 return 0;
d7e09d03
PT
1346
1347 if (atomic_read(&set->set_completes)) {
1348 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1349 atomic_read(&set->set_success));
1350 }
1351 lov_put_reqset(set);
0a3bdb00 1352 return rc;
d7e09d03
PT
1353}
1354
1355void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1356 int success)
1357{
1358 int shift = 0, quit = 0;
1359 __u64 tmp;
1360
1361 if (success == 0) {
1362 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1363 } else {
1364 if (osfs->os_bsize != lov_sfs->os_bsize) {
1365 /* assume all block sizes are always powers of 2 */
1366 /* get the bits difference */
1367 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1368 for (shift = 0; shift <= 64; ++shift) {
1369 if (tmp & 1) {
1370 if (quit)
1371 break;
1372 else
1373 quit = 1;
1374 shift = 0;
1375 }
1376 tmp >>= 1;
1377 }
1378 }
1379
1380 if (osfs->os_bsize < lov_sfs->os_bsize) {
1381 osfs->os_bsize = lov_sfs->os_bsize;
1382
1383 osfs->os_bfree >>= shift;
1384 osfs->os_bavail >>= shift;
1385 osfs->os_blocks >>= shift;
1386 } else if (shift != 0) {
1387 lov_sfs->os_bfree >>= shift;
1388 lov_sfs->os_bavail >>= shift;
1389 lov_sfs->os_blocks >>= shift;
1390 }
1391 osfs->os_bfree += lov_sfs->os_bfree;
1392 osfs->os_bavail += lov_sfs->os_bavail;
1393 osfs->os_blocks += lov_sfs->os_blocks;
1394 /* XXX not sure about this one - depends on policy.
1395 * - could be minimum if we always stripe on all OBDs
1396 * (but that would be wrong for any other policy,
1397 * if one of the OBDs has no more objects left)
1398 * - could be sum if we stripe whole objects
1399 * - could be average, just to give a nice number
1400 *
1401 * To give a "reasonable" (if not wholly accurate)
1402 * number, we divide the total number of free objects
1403 * by expected stripe count (watch out for overflow).
1404 */
1405 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1406 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1407 }
1408}
1409
1410/* The callback for osc_statfs_async that finilizes a request info when a
1411 * response is received. */
1412static int cb_statfs_update(void *cookie, int rc)
1413{
1414 struct obd_info *oinfo = cookie;
1415 struct lov_request *lovreq;
1416 struct lov_request_set *set;
1417 struct obd_statfs *osfs, *lov_sfs;
1418 struct lov_obd *lov;
1419 struct lov_tgt_desc *tgt;
1420 struct obd_device *lovobd, *tgtobd;
1421 int success;
d7e09d03
PT
1422
1423 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1424 set = lovreq->rq_rqset;
1425 lovobd = set->set_obd;
1426 lov = &lovobd->u.lov;
1427 osfs = set->set_oi->oi_osfs;
1428 lov_sfs = oinfo->oi_osfs;
1429 success = atomic_read(&set->set_success);
1430 /* XXX: the same is done in lov_update_common_set, however
1431 lovset->set_exp is not initialized. */
1432 lov_update_set(set, lovreq, rc);
1433 if (rc)
1434 GOTO(out, rc);
1435
1436 obd_getref(lovobd);
1437 tgt = lov->lov_tgts[lovreq->rq_idx];
1438 if (!tgt || !tgt->ltd_active)
1439 GOTO(out_update, rc);
1440
1441 tgtobd = class_exp2obd(tgt->ltd_exp);
1442 spin_lock(&tgtobd->obd_osfs_lock);
1443 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1444 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1445 tgtobd->obd_osfs_age = cfs_time_current_64();
1446 spin_unlock(&tgtobd->obd_osfs_lock);
1447
1448out_update:
1449 lov_update_statfs(osfs, lov_sfs, success);
1450 obd_putref(lovobd);
1451
1452out:
1453 if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1454 lov_set_finished(set, 0)) {
1455 lov_statfs_interpret(NULL, set, set->set_count !=
1456 atomic_read(&set->set_success));
1457 }
1458
0a3bdb00 1459 return 0;
d7e09d03
PT
1460}
1461
1462int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1463 struct lov_request_set **reqset)
1464{
1465 struct lov_request_set *set;
1466 struct lov_obd *lov = &obd->u.lov;
1467 int rc = 0, i;
d7e09d03
PT
1468
1469 OBD_ALLOC(set, sizeof(*set));
1470 if (set == NULL)
0a3bdb00 1471 return -ENOMEM;
d7e09d03
PT
1472 lov_init_set(set);
1473
1474 set->set_obd = obd;
1475 set->set_oi = oinfo;
1476
1477 /* We only get block data from the OBD */
1478 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1479 struct lov_request *req;
1480
1481 if (lov->lov_tgts[i] == NULL ||
1482 (!lov_check_and_wait_active(lov, i) &&
1483 (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1484 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1485 continue;
1486 }
1487
1488 /* skip targets that have been explicitely disabled by the
1489 * administrator */
1490 if (!lov->lov_tgts[i]->ltd_exp) {
1491 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1492 continue;
1493 }
1494
1495 OBD_ALLOC(req, sizeof(*req));
1496 if (req == NULL)
1497 GOTO(out_set, rc = -ENOMEM);
1498
1499 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1500 if (req->rq_oi.oi_osfs == NULL) {
1501 OBD_FREE(req, sizeof(*req));
1502 GOTO(out_set, rc = -ENOMEM);
1503 }
1504
1505 req->rq_idx = i;
1506 req->rq_oi.oi_cb_up = cb_statfs_update;
1507 req->rq_oi.oi_flags = oinfo->oi_flags;
1508
1509 lov_set_add_req(req, set);
1510 }
1511 if (!set->set_count)
1512 GOTO(out_set, rc = -EIO);
1513 *reqset = set;
0a3bdb00 1514 return rc;
d7e09d03
PT
1515out_set:
1516 lov_fini_statfs_set(set);
0a3bdb00 1517 return rc;
d7e09d03 1518}
This page took 0.140225 seconds and 5 git commands to generate.