4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_LOV
39 #include "../../include/linux/libcfs/libcfs.h"
41 #include "../include/obd_class.h"
42 #include "../include/lustre/lustre_idl.h"
43 #include "lov_internal.h"
45 static void lov_init_set(struct lov_request_set
*set
)
48 atomic_set(&set
->set_completes
, 0);
49 atomic_set(&set
->set_success
, 0);
50 atomic_set(&set
->set_finish_checked
, 0);
51 set
->set_cookies
= NULL
;
52 INIT_LIST_HEAD(&set
->set_list
);
53 atomic_set(&set
->set_refcount
, 1);
54 init_waitqueue_head(&set
->set_waitq
);
55 spin_lock_init(&set
->set_lock
);
58 void lov_finish_set(struct lov_request_set
*set
)
60 struct list_head
*pos
, *n
;
63 list_for_each_safe(pos
, n
, &set
->set_list
) {
64 struct lov_request
*req
= list_entry(pos
,
67 list_del_init(&req
->rq_link
);
70 OBDO_FREE(req
->rq_oi
.oi_oa
);
72 OBD_FREE_LARGE(req
->rq_oi
.oi_md
, req
->rq_buflen
);
73 kfree(req
->rq_oi
.oi_osfs
);
79 int lov_set_finished(struct lov_request_set
*set
, int idempotent
)
81 int completes
= atomic_read(&set
->set_completes
);
83 CDEBUG(D_INFO
, "check set %d/%d\n", completes
, set
->set_count
);
85 if (completes
== set
->set_count
) {
88 if (atomic_inc_return(&set
->set_finish_checked
) == 1)
94 void lov_update_set(struct lov_request_set
*set
,
95 struct lov_request
*req
, int rc
)
100 atomic_inc(&set
->set_completes
);
102 atomic_inc(&set
->set_success
);
104 wake_up(&set
->set_waitq
);
107 int lov_update_common_set(struct lov_request_set
*set
,
108 struct lov_request
*req
, int rc
)
110 struct lov_obd
*lov
= &set
->set_exp
->exp_obd
->u
.lov
;
112 lov_update_set(set
, req
, rc
);
114 /* grace error on inactive ost */
115 if (rc
&& !(lov
->lov_tgts
[req
->rq_idx
] &&
116 lov
->lov_tgts
[req
->rq_idx
]->ltd_active
))
119 /* FIXME in raid1 regime, should return 0 */
123 void lov_set_add_req(struct lov_request
*req
, struct lov_request_set
*set
)
125 list_add_tail(&req
->rq_link
, &set
->set_list
);
130 static int lov_check_set(struct lov_obd
*lov
, int idx
)
133 struct lov_tgt_desc
*tgt
;
135 mutex_lock(&lov
->lov_lock
);
136 tgt
= lov
->lov_tgts
[idx
];
137 rc
= !tgt
|| tgt
->ltd_active
||
139 class_exp2cliimp(tgt
->ltd_exp
)->imp_connect_tried
);
140 mutex_unlock(&lov
->lov_lock
);
145 /* Check if the OSC connection exists and is active.
146 * If the OSC has not yet had a chance to connect to the OST the first time,
147 * wait once for it to connect instead of returning an error.
149 int lov_check_and_wait_active(struct lov_obd
*lov
, int ost_idx
)
151 wait_queue_head_t waitq
;
152 struct l_wait_info lwi
;
153 struct lov_tgt_desc
*tgt
;
156 mutex_lock(&lov
->lov_lock
);
158 tgt
= lov
->lov_tgts
[ost_idx
];
160 if (unlikely(tgt
== NULL
)) {
165 if (likely(tgt
->ltd_active
)) {
170 if (tgt
->ltd_exp
&& class_exp2cliimp(tgt
->ltd_exp
)->imp_connect_tried
) {
175 mutex_unlock(&lov
->lov_lock
);
177 init_waitqueue_head(&waitq
);
178 lwi
= LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout
),
179 cfs_time_seconds(1), NULL
, NULL
);
181 rc
= l_wait_event(waitq
, lov_check_set(lov
, ost_idx
), &lwi
);
182 if (tgt
!= NULL
&& tgt
->ltd_active
)
188 mutex_unlock(&lov
->lov_lock
);
192 static int common_attr_done(struct lov_request_set
*set
)
194 struct list_head
*pos
;
195 struct lov_request
*req
;
197 int rc
= 0, attrset
= 0;
199 LASSERT(set
->set_oi
!= NULL
);
201 if (set
->set_oi
->oi_oa
== NULL
)
204 if (!atomic_read(&set
->set_success
))
208 if (tmp_oa
== NULL
) {
213 list_for_each(pos
, &set
->set_list
) {
214 req
= list_entry(pos
, struct lov_request
, rq_link
);
216 if (!req
->rq_complete
|| req
->rq_rc
)
218 if (req
->rq_oi
.oi_oa
->o_valid
== 0) /* inactive stripe */
220 lov_merge_attrs(tmp_oa
, req
->rq_oi
.oi_oa
,
221 req
->rq_oi
.oi_oa
->o_valid
,
222 set
->set_oi
->oi_md
, req
->rq_stripe
, &attrset
);
225 CERROR("No stripes had valid attrs\n");
228 if ((set
->set_oi
->oi_oa
->o_valid
& OBD_MD_FLEPOCH
) &&
229 (set
->set_oi
->oi_md
->lsm_stripe_count
!= attrset
)) {
230 /* When we take attributes of some epoch, we require all the
231 * ost to be active. */
232 CERROR("Not all the stripes had valid attrs\n");
237 tmp_oa
->o_oi
= set
->set_oi
->oi_oa
->o_oi
;
238 memcpy(set
->set_oi
->oi_oa
, tmp_oa
, sizeof(*set
->set_oi
->oi_oa
));
246 int lov_fini_getattr_set(struct lov_request_set
*set
)
252 LASSERT(set
->set_exp
);
253 if (atomic_read(&set
->set_completes
))
254 rc
= common_attr_done(set
);
261 /* The callback for osc_getattr_async that finalizes a request info when a
262 * response is received. */
263 static int cb_getattr_update(void *cookie
, int rc
)
265 struct obd_info
*oinfo
= cookie
;
266 struct lov_request
*lovreq
;
268 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
269 return lov_update_common_set(lovreq
->rq_rqset
, lovreq
, rc
);
272 int lov_prep_getattr_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
273 struct lov_request_set
**reqset
)
275 struct lov_request_set
*set
;
276 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
279 set
= kzalloc(sizeof(*set
), GFP_NOFS
);
287 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
288 struct lov_oinfo
*loi
;
289 struct lov_request
*req
;
291 loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
292 if (lov_oinfo_is_dummy(loi
))
295 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
296 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
297 if (oinfo
->oi_oa
->o_valid
& OBD_MD_FLEPOCH
) {
298 /* SOM requires all the OSTs to be active. */
305 req
= kzalloc(sizeof(*req
), GFP_NOFS
);
312 req
->rq_idx
= loi
->loi_ost_idx
;
314 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
315 if (req
->rq_oi
.oi_oa
== NULL
) {
320 memcpy(req
->rq_oi
.oi_oa
, oinfo
->oi_oa
,
321 sizeof(*req
->rq_oi
.oi_oa
));
322 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
323 req
->rq_oi
.oi_cb_up
= cb_getattr_update
;
324 req
->rq_oi
.oi_capa
= oinfo
->oi_capa
;
326 lov_set_add_req(req
, set
);
328 if (!set
->set_count
) {
335 lov_fini_getattr_set(set
);
339 int lov_fini_destroy_set(struct lov_request_set
*set
)
343 LASSERT(set
->set_exp
);
344 if (atomic_read(&set
->set_completes
)) {
345 /* FIXME update qos data here */
353 int lov_prep_destroy_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
354 struct obdo
*src_oa
, struct lov_stripe_md
*lsm
,
355 struct obd_trans_info
*oti
,
356 struct lov_request_set
**reqset
)
358 struct lov_request_set
*set
;
359 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
362 set
= kzalloc(sizeof(*set
), GFP_NOFS
);
369 set
->set_oi
->oi_md
= lsm
;
370 set
->set_oi
->oi_oa
= src_oa
;
372 if (oti
!= NULL
&& src_oa
->o_valid
& OBD_MD_FLCOOKIE
)
373 set
->set_cookies
= oti
->oti_logcookies
;
375 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
376 struct lov_oinfo
*loi
;
377 struct lov_request
*req
;
379 loi
= lsm
->lsm_oinfo
[i
];
380 if (lov_oinfo_is_dummy(loi
))
383 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
384 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
388 req
= kzalloc(sizeof(*req
), GFP_NOFS
);
395 req
->rq_idx
= loi
->loi_ost_idx
;
397 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
398 if (req
->rq_oi
.oi_oa
== NULL
) {
403 memcpy(req
->rq_oi
.oi_oa
, src_oa
, sizeof(*req
->rq_oi
.oi_oa
));
404 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
405 lov_set_add_req(req
, set
);
407 if (!set
->set_count
) {
414 lov_fini_destroy_set(set
);
418 int lov_fini_setattr_set(struct lov_request_set
*set
)
424 LASSERT(set
->set_exp
);
425 if (atomic_read(&set
->set_completes
)) {
426 rc
= common_attr_done(set
);
427 /* FIXME update qos data here */
434 int lov_update_setattr_set(struct lov_request_set
*set
,
435 struct lov_request
*req
, int rc
)
437 struct lov_obd
*lov
= &req
->rq_rqset
->set_exp
->exp_obd
->u
.lov
;
438 struct lov_stripe_md
*lsm
= req
->rq_rqset
->set_oi
->oi_md
;
440 lov_update_set(set
, req
, rc
);
442 /* grace error on inactive ost */
443 if (rc
&& !(lov
->lov_tgts
[req
->rq_idx
] &&
444 lov
->lov_tgts
[req
->rq_idx
]->ltd_active
))
448 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLCTIME
)
449 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_ctime
=
450 req
->rq_oi
.oi_oa
->o_ctime
;
451 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLMTIME
)
452 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_mtime
=
453 req
->rq_oi
.oi_oa
->o_mtime
;
454 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLATIME
)
455 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_atime
=
456 req
->rq_oi
.oi_oa
->o_atime
;
462 /* The callback for osc_setattr_async that finalizes a request info when a
463 * response is received. */
464 static int cb_setattr_update(void *cookie
, int rc
)
466 struct obd_info
*oinfo
= cookie
;
467 struct lov_request
*lovreq
;
469 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
470 return lov_update_setattr_set(lovreq
->rq_rqset
, lovreq
, rc
);
473 int lov_prep_setattr_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
474 struct obd_trans_info
*oti
,
475 struct lov_request_set
**reqset
)
477 struct lov_request_set
*set
;
478 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
481 set
= kzalloc(sizeof(*set
), GFP_NOFS
);
489 if (oti
!= NULL
&& oinfo
->oi_oa
->o_valid
& OBD_MD_FLCOOKIE
)
490 set
->set_cookies
= oti
->oti_logcookies
;
492 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
493 struct lov_oinfo
*loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
494 struct lov_request
*req
;
496 if (lov_oinfo_is_dummy(loi
))
499 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
500 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
504 req
= kzalloc(sizeof(*req
), GFP_NOFS
);
510 req
->rq_idx
= loi
->loi_ost_idx
;
512 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
513 if (req
->rq_oi
.oi_oa
== NULL
) {
518 memcpy(req
->rq_oi
.oi_oa
, oinfo
->oi_oa
,
519 sizeof(*req
->rq_oi
.oi_oa
));
520 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
521 req
->rq_oi
.oi_oa
->o_stripe_idx
= i
;
522 req
->rq_oi
.oi_cb_up
= cb_setattr_update
;
523 req
->rq_oi
.oi_capa
= oinfo
->oi_capa
;
525 if (oinfo
->oi_oa
->o_valid
& OBD_MD_FLSIZE
) {
526 int off
= lov_stripe_offset(oinfo
->oi_md
,
527 oinfo
->oi_oa
->o_size
, i
,
528 &req
->rq_oi
.oi_oa
->o_size
);
530 if (off
< 0 && req
->rq_oi
.oi_oa
->o_size
)
531 req
->rq_oi
.oi_oa
->o_size
--;
533 CDEBUG(D_INODE
, "stripe %d has size %llu/%llu\n",
534 i
, req
->rq_oi
.oi_oa
->o_size
,
535 oinfo
->oi_oa
->o_size
);
537 lov_set_add_req(req
, set
);
539 if (!set
->set_count
) {
546 lov_fini_setattr_set(set
);
550 #define LOV_U64_MAX ((__u64)~0ULL)
551 #define LOV_SUM_MAX(tot, add) \
553 if ((tot) + (add) < (tot)) \
554 (tot) = LOV_U64_MAX; \
559 int lov_fini_statfs(struct obd_device
*obd
, struct obd_statfs
*osfs
,
563 __u32 expected_stripes
= lov_get_stripecnt(&obd
->u
.lov
,
565 if (osfs
->os_files
!= LOV_U64_MAX
)
566 lov_do_div64(osfs
->os_files
, expected_stripes
);
567 if (osfs
->os_ffree
!= LOV_U64_MAX
)
568 lov_do_div64(osfs
->os_ffree
, expected_stripes
);
570 spin_lock(&obd
->obd_osfs_lock
);
571 memcpy(&obd
->obd_osfs
, osfs
, sizeof(*osfs
));
572 obd
->obd_osfs_age
= cfs_time_current_64();
573 spin_unlock(&obd
->obd_osfs_lock
);
580 int lov_fini_statfs_set(struct lov_request_set
*set
)
587 if (atomic_read(&set
->set_completes
)) {
588 rc
= lov_fini_statfs(set
->set_obd
, set
->set_oi
->oi_osfs
,
589 atomic_read(&set
->set_success
));
595 void lov_update_statfs(struct obd_statfs
*osfs
, struct obd_statfs
*lov_sfs
,
598 int shift
= 0, quit
= 0;
602 memcpy(osfs
, lov_sfs
, sizeof(*lov_sfs
));
604 if (osfs
->os_bsize
!= lov_sfs
->os_bsize
) {
605 /* assume all block sizes are always powers of 2 */
606 /* get the bits difference */
607 tmp
= osfs
->os_bsize
| lov_sfs
->os_bsize
;
608 for (shift
= 0; shift
<= 64; ++shift
) {
620 if (osfs
->os_bsize
< lov_sfs
->os_bsize
) {
621 osfs
->os_bsize
= lov_sfs
->os_bsize
;
623 osfs
->os_bfree
>>= shift
;
624 osfs
->os_bavail
>>= shift
;
625 osfs
->os_blocks
>>= shift
;
626 } else if (shift
!= 0) {
627 lov_sfs
->os_bfree
>>= shift
;
628 lov_sfs
->os_bavail
>>= shift
;
629 lov_sfs
->os_blocks
>>= shift
;
631 osfs
->os_bfree
+= lov_sfs
->os_bfree
;
632 osfs
->os_bavail
+= lov_sfs
->os_bavail
;
633 osfs
->os_blocks
+= lov_sfs
->os_blocks
;
634 /* XXX not sure about this one - depends on policy.
635 * - could be minimum if we always stripe on all OBDs
636 * (but that would be wrong for any other policy,
637 * if one of the OBDs has no more objects left)
638 * - could be sum if we stripe whole objects
639 * - could be average, just to give a nice number
641 * To give a "reasonable" (if not wholly accurate)
642 * number, we divide the total number of free objects
643 * by expected stripe count (watch out for overflow).
645 LOV_SUM_MAX(osfs
->os_files
, lov_sfs
->os_files
);
646 LOV_SUM_MAX(osfs
->os_ffree
, lov_sfs
->os_ffree
);
650 /* The callback for osc_statfs_async that finalizes a request info when a
651 * response is received. */
652 static int cb_statfs_update(void *cookie
, int rc
)
654 struct obd_info
*oinfo
= cookie
;
655 struct lov_request
*lovreq
;
656 struct lov_request_set
*set
;
657 struct obd_statfs
*osfs
, *lov_sfs
;
659 struct lov_tgt_desc
*tgt
;
660 struct obd_device
*lovobd
, *tgtobd
;
663 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
664 set
= lovreq
->rq_rqset
;
665 lovobd
= set
->set_obd
;
666 lov
= &lovobd
->u
.lov
;
667 osfs
= set
->set_oi
->oi_osfs
;
668 lov_sfs
= oinfo
->oi_osfs
;
669 success
= atomic_read(&set
->set_success
);
670 /* XXX: the same is done in lov_update_common_set, however
671 lovset->set_exp is not initialized. */
672 lov_update_set(set
, lovreq
, rc
);
677 tgt
= lov
->lov_tgts
[lovreq
->rq_idx
];
678 if (!tgt
|| !tgt
->ltd_active
)
681 tgtobd
= class_exp2obd(tgt
->ltd_exp
);
682 spin_lock(&tgtobd
->obd_osfs_lock
);
683 memcpy(&tgtobd
->obd_osfs
, lov_sfs
, sizeof(*lov_sfs
));
684 if ((oinfo
->oi_flags
& OBD_STATFS_FROM_CACHE
) == 0)
685 tgtobd
->obd_osfs_age
= cfs_time_current_64();
686 spin_unlock(&tgtobd
->obd_osfs_lock
);
689 lov_update_statfs(osfs
, lov_sfs
, success
);
693 if (set
->set_oi
->oi_flags
& OBD_STATFS_PTLRPCD
&&
694 lov_set_finished(set
, 0)) {
695 lov_statfs_interpret(NULL
, set
, set
->set_count
!=
696 atomic_read(&set
->set_success
));
702 int lov_prep_statfs_set(struct obd_device
*obd
, struct obd_info
*oinfo
,
703 struct lov_request_set
**reqset
)
705 struct lov_request_set
*set
;
706 struct lov_obd
*lov
= &obd
->u
.lov
;
709 set
= kzalloc(sizeof(*set
), GFP_NOFS
);
717 /* We only get block data from the OBD */
718 for (i
= 0; i
< lov
->desc
.ld_tgt_count
; i
++) {
719 struct lov_request
*req
;
721 if (lov
->lov_tgts
[i
] == NULL
||
722 (!lov_check_and_wait_active(lov
, i
) &&
723 (oinfo
->oi_flags
& OBD_STATFS_NODELAY
))) {
724 CDEBUG(D_HA
, "lov idx %d inactive\n", i
);
728 /* skip targets that have been explicitly disabled by the
730 if (!lov
->lov_tgts
[i
]->ltd_exp
) {
731 CDEBUG(D_HA
, "lov idx %d administratively disabled\n", i
);
735 req
= kzalloc(sizeof(*req
), GFP_NOFS
);
741 req
->rq_oi
.oi_osfs
= kzalloc(sizeof(*req
->rq_oi
.oi_osfs
),
743 if (req
->rq_oi
.oi_osfs
== NULL
) {
750 req
->rq_oi
.oi_cb_up
= cb_statfs_update
;
751 req
->rq_oi
.oi_flags
= oinfo
->oi_flags
;
753 lov_set_add_req(req
, set
);
755 if (!set
->set_count
) {
762 lov_fini_statfs_set(set
);
This page took 0.048095 seconds and 5 git commands to generate.