4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Implementation of cl_io for LOV layer.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
39 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
42 #define DEBUG_SUBSYSTEM S_LOV
44 #include "lov_cl_internal.h"
50 static inline void lov_sub_enter(struct lov_io_sub
*sub
)
54 static inline void lov_sub_exit(struct lov_io_sub
*sub
)
59 static void lov_io_sub_fini(const struct lu_env
*env
, struct lov_io
*lio
,
60 struct lov_io_sub
*sub
)
62 if (sub
->sub_io
!= NULL
) {
63 if (sub
->sub_io_initialized
) {
65 cl_io_fini(sub
->sub_env
, sub
->sub_io
);
67 sub
->sub_io_initialized
= 0;
68 lio
->lis_active_subios
--;
70 if (sub
->sub_stripe
== lio
->lis_single_subio_index
)
71 lio
->lis_single_subio_index
= -1;
72 else if (!sub
->sub_borrowed
)
73 OBD_FREE_PTR(sub
->sub_io
);
76 if (sub
->sub_env
!= NULL
&& !IS_ERR(sub
->sub_env
)) {
77 if (!sub
->sub_borrowed
)
78 cl_env_put(sub
->sub_env
, &sub
->sub_refcheck
);
83 static void lov_io_sub_inherit(struct cl_io
*io
, struct lov_io
*lio
,
84 int stripe
, loff_t start
, loff_t end
)
86 struct lov_stripe_md
*lsm
= lio
->lis_object
->lo_lsm
;
87 struct cl_io
*parent
= lio
->lis_cl
.cis_io
;
89 switch (io
->ci_type
) {
91 io
->u
.ci_setattr
.sa_attr
= parent
->u
.ci_setattr
.sa_attr
;
92 io
->u
.ci_setattr
.sa_valid
= parent
->u
.ci_setattr
.sa_valid
;
93 io
->u
.ci_setattr
.sa_capa
= parent
->u
.ci_setattr
.sa_capa
;
94 if (cl_io_is_trunc(io
)) {
95 loff_t new_size
= parent
->u
.ci_setattr
.sa_attr
.lvb_size
;
97 new_size
= lov_size_to_stripe(lsm
, new_size
, stripe
);
98 io
->u
.ci_setattr
.sa_attr
.lvb_size
= new_size
;
103 struct cl_object
*obj
= parent
->ci_obj
;
104 loff_t off
= cl_offset(obj
, parent
->u
.ci_fault
.ft_index
);
106 io
->u
.ci_fault
= parent
->u
.ci_fault
;
107 off
= lov_size_to_stripe(lsm
, off
, stripe
);
108 io
->u
.ci_fault
.ft_index
= cl_index(obj
, off
);
112 io
->u
.ci_fsync
.fi_start
= start
;
113 io
->u
.ci_fsync
.fi_end
= end
;
114 io
->u
.ci_fsync
.fi_capa
= parent
->u
.ci_fsync
.fi_capa
;
115 io
->u
.ci_fsync
.fi_fid
= parent
->u
.ci_fsync
.fi_fid
;
116 io
->u
.ci_fsync
.fi_mode
= parent
->u
.ci_fsync
.fi_mode
;
121 io
->u
.ci_wr
.wr_sync
= cl_io_is_sync_write(parent
);
122 if (cl_io_is_append(parent
)) {
123 io
->u
.ci_wr
.wr_append
= 1;
125 io
->u
.ci_rw
.crw_pos
= start
;
126 io
->u
.ci_rw
.crw_count
= end
- start
;
135 static int lov_io_sub_init(const struct lu_env
*env
, struct lov_io
*lio
,
136 struct lov_io_sub
*sub
)
138 struct lov_object
*lov
= lio
->lis_object
;
139 struct lov_device
*ld
= lu2lov_dev(lov2cl(lov
)->co_lu
.lo_dev
);
140 struct cl_io
*sub_io
;
141 struct cl_object
*sub_obj
;
142 struct cl_io
*io
= lio
->lis_cl
.cis_io
;
144 int stripe
= sub
->sub_stripe
;
147 LASSERT(sub
->sub_io
== NULL
);
148 LASSERT(sub
->sub_env
== NULL
);
149 LASSERT(sub
->sub_stripe
< lio
->lis_stripe_count
);
152 sub
->sub_io_initialized
= 0;
153 sub
->sub_borrowed
= 0;
155 if (lio
->lis_mem_frozen
) {
156 LASSERT(mutex_is_locked(&ld
->ld_mutex
));
157 sub
->sub_io
= &ld
->ld_emrg
[stripe
]->emrg_subio
;
158 sub
->sub_env
= ld
->ld_emrg
[stripe
]->emrg_env
;
159 sub
->sub_borrowed
= 1;
163 /* obtain new environment */
164 cookie
= cl_env_reenter();
165 sub
->sub_env
= cl_env_get(&sub
->sub_refcheck
);
166 cl_env_reexit(cookie
);
167 if (IS_ERR(sub
->sub_env
))
168 result
= PTR_ERR(sub
->sub_env
);
172 * First sub-io. Use ->lis_single_subio to
173 * avoid dynamic allocation.
175 if (lio
->lis_active_subios
== 0) {
176 sub
->sub_io
= &lio
->lis_single_subio
;
177 lio
->lis_single_subio_index
= stripe
;
179 OBD_ALLOC_PTR(sub
->sub_io
);
180 if (sub
->sub_io
== NULL
)
187 sub_obj
= lovsub2cl(lov_r0(lov
)->lo_sub
[stripe
]);
188 sub_io
= sub
->sub_io
;
190 sub_io
->ci_obj
= sub_obj
;
191 sub_io
->ci_result
= 0;
193 sub_io
->ci_parent
= io
;
194 sub_io
->ci_lockreq
= io
->ci_lockreq
;
195 sub_io
->ci_type
= io
->ci_type
;
196 sub_io
->ci_no_srvlock
= io
->ci_no_srvlock
;
197 sub_io
->ci_noatime
= io
->ci_noatime
;
200 result
= cl_io_sub_init(sub
->sub_env
, sub_io
,
201 io
->ci_type
, sub_obj
);
204 lio
->lis_active_subios
++;
205 sub
->sub_io_initialized
= 1;
210 lov_io_sub_fini(env
, lio
, sub
);
214 struct lov_io_sub
*lov_sub_get(const struct lu_env
*env
,
215 struct lov_io
*lio
, int stripe
)
218 struct lov_io_sub
*sub
= &lio
->lis_subs
[stripe
];
220 LASSERT(stripe
< lio
->lis_stripe_count
);
222 if (!sub
->sub_io_initialized
) {
223 sub
->sub_stripe
= stripe
;
224 rc
= lov_io_sub_init(env
, lio
, sub
);
/*
 * Release a sub-io obtained by lov_sub_get().
 * NOTE(review): body lost in extraction; reconstructed as the matching
 * lov_sub_exit() call — confirm against upstream.
 */
void lov_sub_put(struct lov_io_sub *sub)
{
	lov_sub_exit(sub);
}
239 /*****************************************************************************
245 static int lov_page_stripe(const struct cl_page
*page
)
247 struct lovsub_object
*subobj
;
250 lu_object_locate(page
->cp_child
->cp_obj
->co_lu
.lo_header
,
251 &lovsub_device_type
));
252 LASSERT(subobj
!= NULL
);
253 return subobj
->lso_index
;
256 struct lov_io_sub
*lov_page_subio(const struct lu_env
*env
, struct lov_io
*lio
,
257 const struct cl_page_slice
*slice
)
259 struct lov_stripe_md
*lsm
= lio
->lis_object
->lo_lsm
;
260 struct cl_page
*page
= slice
->cpl_page
;
263 LASSERT(lio
->lis_cl
.cis_io
!= NULL
);
264 LASSERT(cl2lov(slice
->cpl_obj
) == lio
->lis_object
);
265 LASSERT(lsm
!= NULL
);
266 LASSERT(lio
->lis_nr_subios
> 0);
268 stripe
= lov_page_stripe(page
);
269 return lov_sub_get(env
, lio
, stripe
);
273 static int lov_io_subio_init(const struct lu_env
*env
, struct lov_io
*lio
,
276 struct lov_stripe_md
*lsm
= lio
->lis_object
->lo_lsm
;
279 LASSERT(lio
->lis_object
!= NULL
);
282 * Need to be optimized, we can't afford to allocate a piece of memory
283 * when writing a page. -jay
285 OBD_ALLOC_LARGE(lio
->lis_subs
,
286 lsm
->lsm_stripe_count
* sizeof(lio
->lis_subs
[0]));
287 if (lio
->lis_subs
!= NULL
) {
288 lio
->lis_nr_subios
= lio
->lis_stripe_count
;
289 lio
->lis_single_subio_index
= -1;
290 lio
->lis_active_subios
= 0;
297 static void lov_io_slice_init(struct lov_io
*lio
,
298 struct lov_object
*obj
, struct cl_io
*io
)
301 lio
->lis_object
= obj
;
303 LASSERT(obj
->lo_lsm
!= NULL
);
304 lio
->lis_stripe_count
= obj
->lo_lsm
->lsm_stripe_count
;
306 switch (io
->ci_type
) {
309 lio
->lis_pos
= io
->u
.ci_rw
.crw_pos
;
310 lio
->lis_endpos
= io
->u
.ci_rw
.crw_pos
+ io
->u
.ci_rw
.crw_count
;
311 lio
->lis_io_endpos
= lio
->lis_endpos
;
312 if (cl_io_is_append(io
)) {
313 LASSERT(io
->ci_type
== CIT_WRITE
);
315 lio
->lis_endpos
= OBD_OBJECT_EOF
;
320 if (cl_io_is_trunc(io
))
321 lio
->lis_pos
= io
->u
.ci_setattr
.sa_attr
.lvb_size
;
324 lio
->lis_endpos
= OBD_OBJECT_EOF
;
328 pgoff_t index
= io
->u
.ci_fault
.ft_index
;
329 lio
->lis_pos
= cl_offset(io
->ci_obj
, index
);
330 lio
->lis_endpos
= cl_offset(io
->ci_obj
, index
+ 1);
335 lio
->lis_pos
= io
->u
.ci_fsync
.fi_start
;
336 lio
->lis_endpos
= io
->u
.ci_fsync
.fi_end
;
342 lio
->lis_endpos
= OBD_OBJECT_EOF
;
350 static void lov_io_fini(const struct lu_env
*env
, const struct cl_io_slice
*ios
)
352 struct lov_io
*lio
= cl2lov_io(env
, ios
);
353 struct lov_object
*lov
= cl2lov(ios
->cis_obj
);
356 if (lio
->lis_subs
!= NULL
) {
357 for (i
= 0; i
< lio
->lis_nr_subios
; i
++)
358 lov_io_sub_fini(env
, lio
, &lio
->lis_subs
[i
]);
359 OBD_FREE_LARGE(lio
->lis_subs
,
360 lio
->lis_nr_subios
* sizeof(lio
->lis_subs
[0]));
361 lio
->lis_nr_subios
= 0;
364 LASSERT(atomic_read(&lov
->lo_active_ios
) > 0);
365 if (atomic_dec_and_test(&lov
->lo_active_ios
))
366 wake_up_all(&lov
->lo_waitq
);
369 static obd_off
lov_offset_mod(obd_off val
, int delta
)
371 if (val
!= OBD_OBJECT_EOF
)
376 static int lov_io_iter_init(const struct lu_env
*env
,
377 const struct cl_io_slice
*ios
)
379 struct lov_io
*lio
= cl2lov_io(env
, ios
);
380 struct lov_stripe_md
*lsm
= lio
->lis_object
->lo_lsm
;
381 struct lov_io_sub
*sub
;
388 endpos
= lov_offset_mod(lio
->lis_endpos
, -1);
389 for (stripe
= 0; stripe
< lio
->lis_stripe_count
; stripe
++) {
390 if (!lov_stripe_intersects(lsm
, stripe
, lio
->lis_pos
,
391 endpos
, &start
, &end
))
394 end
= lov_offset_mod(end
, +1);
395 sub
= lov_sub_get(env
, lio
, stripe
);
397 lov_io_sub_inherit(sub
->sub_io
, lio
, stripe
,
399 rc
= cl_io_iter_init(sub
->sub_env
, sub
->sub_io
);
401 CDEBUG(D_VFSTRACE
, "shrink: %d ["LPU64
", "LPU64
")\n",
407 list_add_tail(&sub
->sub_linkage
, &lio
->lis_active
);
414 static int lov_io_rw_iter_init(const struct lu_env
*env
,
415 const struct cl_io_slice
*ios
)
417 struct lov_io
*lio
= cl2lov_io(env
, ios
);
418 struct cl_io
*io
= ios
->cis_io
;
419 struct lov_stripe_md
*lsm
= lio
->lis_object
->lo_lsm
;
420 __u64 start
= io
->u
.ci_rw
.crw_pos
;
422 unsigned long ssize
= lsm
->lsm_stripe_size
;
424 LASSERT(io
->ci_type
== CIT_READ
|| io
->ci_type
== CIT_WRITE
);
426 /* fast path for common case. */
427 if (lio
->lis_nr_subios
!= 1 && !cl_io_is_append(io
)) {
429 lov_do_div64(start
, ssize
);
430 next
= (start
+ 1) * ssize
;
431 if (next
<= start
* ssize
)
434 io
->ci_continue
= next
< lio
->lis_io_endpos
;
435 io
->u
.ci_rw
.crw_count
= min_t(loff_t
, lio
->lis_io_endpos
,
436 next
) - io
->u
.ci_rw
.crw_pos
;
437 lio
->lis_pos
= io
->u
.ci_rw
.crw_pos
;
438 lio
->lis_endpos
= io
->u
.ci_rw
.crw_pos
+ io
->u
.ci_rw
.crw_count
;
439 CDEBUG(D_VFSTRACE
, "stripe: "LPU64
" chunk: ["LPU64
", "LPU64
") "
440 LPU64
"\n", (__u64
)start
, lio
->lis_pos
, lio
->lis_endpos
,
441 (__u64
)lio
->lis_io_endpos
);
444 * XXX The following call should be optimized: we know, that
445 * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
447 return lov_io_iter_init(env
, ios
);
450 static int lov_io_call(const struct lu_env
*env
, struct lov_io
*lio
,
451 int (*iofunc
)(const struct lu_env
*, struct cl_io
*))
453 struct cl_io
*parent
= lio
->lis_cl
.cis_io
;
454 struct lov_io_sub
*sub
;
457 list_for_each_entry(sub
, &lio
->lis_active
, sub_linkage
) {
459 rc
= iofunc(sub
->sub_env
, sub
->sub_io
);
464 if (parent
->ci_result
== 0)
465 parent
->ci_result
= sub
->sub_io
->ci_result
;
470 static int lov_io_lock(const struct lu_env
*env
, const struct cl_io_slice
*ios
)
472 return lov_io_call(env
, cl2lov_io(env
, ios
), cl_io_lock
);
475 static int lov_io_start(const struct lu_env
*env
, const struct cl_io_slice
*ios
)
477 return lov_io_call(env
, cl2lov_io(env
, ios
), cl_io_start
);
480 static int lov_io_end_wrapper(const struct lu_env
*env
, struct cl_io
*io
)
483 * It's possible that lov_io_start() wasn't called against this
484 * sub-io, either because previous sub-io failed, or upper layer
487 if (io
->ci_state
== CIS_IO_GOING
)
490 io
->ci_state
= CIS_IO_FINISHED
;
/* Adapt cl_io_iter_fini() (void) to the int signature lov_io_call() wants. */
static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
{
	cl_io_iter_fini(env, io);
	return 0;
}
/* Adapt cl_io_unlock() (void) to the int signature lov_io_call() wants. */
static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
{
	cl_io_unlock(env, io);
	return 0;
}
506 static void lov_io_end(const struct lu_env
*env
, const struct cl_io_slice
*ios
)
510 rc
= lov_io_call(env
, cl2lov_io(env
, ios
), lov_io_end_wrapper
);
514 static void lov_io_iter_fini(const struct lu_env
*env
,
515 const struct cl_io_slice
*ios
)
517 struct lov_io
*lio
= cl2lov_io(env
, ios
);
520 rc
= lov_io_call(env
, lio
, lov_io_iter_fini_wrapper
);
522 while (!list_empty(&lio
->lis_active
))
523 list_del_init(lio
->lis_active
.next
);
526 static void lov_io_unlock(const struct lu_env
*env
,
527 const struct cl_io_slice
*ios
)
531 rc
= lov_io_call(env
, cl2lov_io(env
, ios
), lov_io_unlock_wrapper
);
536 static struct cl_page_list
*lov_io_submit_qin(struct lov_device
*ld
,
537 struct cl_page_list
*qin
,
540 return alloc
? &qin
[idx
] : &ld
->ld_emrg
[idx
]->emrg_page_list
;
544 * lov implementation of cl_operations::cio_submit() method. It takes a list
545 * of pages in \a queue, splits it into per-stripe sub-lists, invokes
546 * cl_io_submit() on underlying devices to submit sub-lists, and then splices
549 * Major complication of this function is a need to handle memory cleansing:
550 * cl_io_submit() is called to write out pages as a part of VM memory
551 * reclamation, and hence it may not fail due to memory shortages (system
552 * dead-locks otherwise). To deal with this, some resources (sub-lists,
553 * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
554 * not-memory cleansing context), and in case of memory shortage, these
555 * pre-allocated resources are used by lov_io_submit() under
556 * lov_device::ld_mutex mutex.
558 static int lov_io_submit(const struct lu_env
*env
,
559 const struct cl_io_slice
*ios
,
560 enum cl_req_type crt
, struct cl_2queue
*queue
)
562 struct lov_io
*lio
= cl2lov_io(env
, ios
);
563 struct lov_object
*obj
= lio
->lis_object
;
564 struct lov_device
*ld
= lu2lov_dev(lov2cl(obj
)->co_lu
.lo_dev
);
565 struct cl_page_list
*qin
= &queue
->c2_qin
;
566 struct cl_2queue
*cl2q
= &lov_env_info(env
)->lti_cl2q
;
567 struct cl_page_list
*stripes_qin
= NULL
;
568 struct cl_page
*page
;
572 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
576 !(current
->flags
& PF_MEMALLOC
);
578 if (lio
->lis_active_subios
== 1) {
579 int idx
= lio
->lis_single_subio_index
;
580 struct lov_io_sub
*sub
;
582 LASSERT(idx
< lio
->lis_nr_subios
);
583 sub
= lov_sub_get(env
, lio
, idx
);
584 LASSERT(!IS_ERR(sub
));
585 LASSERT(sub
->sub_io
== &lio
->lis_single_subio
);
586 rc
= cl_io_submit_rw(sub
->sub_env
, sub
->sub_io
,
592 LASSERT(lio
->lis_subs
!= NULL
);
594 OBD_ALLOC_LARGE(stripes_qin
,
595 sizeof(*stripes_qin
) * lio
->lis_nr_subios
);
596 if (stripes_qin
== NULL
)
599 for (stripe
= 0; stripe
< lio
->lis_nr_subios
; stripe
++)
600 cl_page_list_init(&stripes_qin
[stripe
]);
603 * If we get here, it means pageout & swap doesn't help.
604 * In order to not make things worse, even don't try to
605 * allocate the memory with __GFP_NOWARN. -jay
607 mutex_lock(&ld
->ld_mutex
);
608 lio
->lis_mem_frozen
= 1;
611 cl_2queue_init(cl2q
);
612 cl_page_list_for_each_safe(page
, tmp
, qin
) {
613 stripe
= lov_page_stripe(page
);
614 cl_page_list_move(QIN(stripe
), qin
, page
);
617 for (stripe
= 0; stripe
< lio
->lis_nr_subios
; stripe
++) {
618 struct lov_io_sub
*sub
;
619 struct cl_page_list
*sub_qin
= QIN(stripe
);
621 if (list_empty(&sub_qin
->pl_pages
))
624 cl_page_list_splice(sub_qin
, &cl2q
->c2_qin
);
625 sub
= lov_sub_get(env
, lio
, stripe
);
627 rc
= cl_io_submit_rw(sub
->sub_env
, sub
->sub_io
,
632 cl_page_list_splice(&cl2q
->c2_qin
, &queue
->c2_qin
);
633 cl_page_list_splice(&cl2q
->c2_qout
, &queue
->c2_qout
);
638 for (stripe
= 0; stripe
< lio
->lis_nr_subios
; stripe
++) {
639 struct cl_page_list
*sub_qin
= QIN(stripe
);
641 if (list_empty(&sub_qin
->pl_pages
))
644 cl_page_list_splice(sub_qin
, qin
);
648 OBD_FREE_LARGE(stripes_qin
,
649 sizeof(*stripes_qin
) * lio
->lis_nr_subios
);
653 for (i
= 0; i
< lio
->lis_nr_subios
; i
++) {
654 struct cl_io
*cio
= lio
->lis_subs
[i
].sub_io
;
656 if (cio
&& cio
== &ld
->ld_emrg
[i
]->emrg_subio
)
657 lov_io_sub_fini(env
, lio
, &lio
->lis_subs
[i
]);
659 lio
->lis_mem_frozen
= 0;
660 mutex_unlock(&ld
->ld_mutex
);
667 static int lov_io_prepare_write(const struct lu_env
*env
,
668 const struct cl_io_slice
*ios
,
669 const struct cl_page_slice
*slice
,
670 unsigned from
, unsigned to
)
672 struct lov_io
*lio
= cl2lov_io(env
, ios
);
673 struct cl_page
*sub_page
= lov_sub_page(slice
);
674 struct lov_io_sub
*sub
;
677 sub
= lov_page_subio(env
, lio
, slice
);
679 result
= cl_io_prepare_write(sub
->sub_env
, sub
->sub_io
,
683 result
= PTR_ERR(sub
);
687 static int lov_io_commit_write(const struct lu_env
*env
,
688 const struct cl_io_slice
*ios
,
689 const struct cl_page_slice
*slice
,
690 unsigned from
, unsigned to
)
692 struct lov_io
*lio
= cl2lov_io(env
, ios
);
693 struct cl_page
*sub_page
= lov_sub_page(slice
);
694 struct lov_io_sub
*sub
;
697 sub
= lov_page_subio(env
, lio
, slice
);
699 result
= cl_io_commit_write(sub
->sub_env
, sub
->sub_io
,
703 result
= PTR_ERR(sub
);
707 static int lov_io_fault_start(const struct lu_env
*env
,
708 const struct cl_io_slice
*ios
)
710 struct cl_fault_io
*fio
;
712 struct lov_io_sub
*sub
;
714 fio
= &ios
->cis_io
->u
.ci_fault
;
715 lio
= cl2lov_io(env
, ios
);
716 sub
= lov_sub_get(env
, lio
, lov_page_stripe(fio
->ft_page
));
717 sub
->sub_io
->u
.ci_fault
.ft_nob
= fio
->ft_nob
;
719 return lov_io_start(env
, ios
);
722 static void lov_io_fsync_end(const struct lu_env
*env
,
723 const struct cl_io_slice
*ios
)
725 struct lov_io
*lio
= cl2lov_io(env
, ios
);
726 struct lov_io_sub
*sub
;
727 unsigned int *written
= &ios
->cis_io
->u
.ci_fsync
.fi_nr_written
;
730 list_for_each_entry(sub
, &lio
->lis_active
, sub_linkage
) {
731 struct cl_io
*subio
= sub
->sub_io
;
734 lov_io_end_wrapper(sub
->sub_env
, subio
);
737 if (subio
->ci_result
== 0)
738 *written
+= subio
->u
.ci_fsync
.fi_nr_written
;
742 static const struct cl_io_operations lov_io_ops
= {
745 .cio_fini
= lov_io_fini
,
746 .cio_iter_init
= lov_io_rw_iter_init
,
747 .cio_iter_fini
= lov_io_iter_fini
,
748 .cio_lock
= lov_io_lock
,
749 .cio_unlock
= lov_io_unlock
,
750 .cio_start
= lov_io_start
,
751 .cio_end
= lov_io_end
754 .cio_fini
= lov_io_fini
,
755 .cio_iter_init
= lov_io_rw_iter_init
,
756 .cio_iter_fini
= lov_io_iter_fini
,
757 .cio_lock
= lov_io_lock
,
758 .cio_unlock
= lov_io_unlock
,
759 .cio_start
= lov_io_start
,
760 .cio_end
= lov_io_end
763 .cio_fini
= lov_io_fini
,
764 .cio_iter_init
= lov_io_iter_init
,
765 .cio_iter_fini
= lov_io_iter_fini
,
766 .cio_lock
= lov_io_lock
,
767 .cio_unlock
= lov_io_unlock
,
768 .cio_start
= lov_io_start
,
769 .cio_end
= lov_io_end
772 .cio_fini
= lov_io_fini
,
773 .cio_iter_init
= lov_io_iter_init
,
774 .cio_iter_fini
= lov_io_iter_fini
,
775 .cio_lock
= lov_io_lock
,
776 .cio_unlock
= lov_io_unlock
,
777 .cio_start
= lov_io_fault_start
,
778 .cio_end
= lov_io_end
781 .cio_fini
= lov_io_fini
,
782 .cio_iter_init
= lov_io_iter_init
,
783 .cio_iter_fini
= lov_io_iter_fini
,
784 .cio_lock
= lov_io_lock
,
785 .cio_unlock
= lov_io_unlock
,
786 .cio_start
= lov_io_start
,
787 .cio_end
= lov_io_fsync_end
790 .cio_fini
= lov_io_fini
795 .cio_submit
= lov_io_submit
798 .cio_submit
= lov_io_submit
801 .cio_prepare_write
= lov_io_prepare_write
,
802 .cio_commit_write
= lov_io_commit_write
805 /*****************************************************************************
807 * Empty lov io operations.
811 static void lov_empty_io_fini(const struct lu_env
*env
,
812 const struct cl_io_slice
*ios
)
814 struct lov_object
*lov
= cl2lov(ios
->cis_obj
);
816 if (atomic_dec_and_test(&lov
->lo_active_ios
))
817 wake_up_all(&lov
->lo_waitq
);
/*
 * Methods that must never be reached on a stripe-less file; hitting one is
 * a logic error.
 * NOTE(review): body lost in extraction; reconstructed as LBUG() — confirm
 * against upstream.
 */
static void lov_empty_impossible(const struct lu_env *env,
				 struct cl_io_slice *ios)
{
	LBUG();
}

#define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
829 * An io operation vector for files without stripes.
831 static const struct cl_io_operations lov_empty_io_ops
= {
834 .cio_fini
= lov_empty_io_fini
,
836 .cio_iter_init
= LOV_EMPTY_IMPOSSIBLE
,
837 .cio_lock
= LOV_EMPTY_IMPOSSIBLE
,
838 .cio_start
= LOV_EMPTY_IMPOSSIBLE
,
839 .cio_end
= LOV_EMPTY_IMPOSSIBLE
843 .cio_fini
= lov_empty_io_fini
,
844 .cio_iter_init
= LOV_EMPTY_IMPOSSIBLE
,
845 .cio_lock
= LOV_EMPTY_IMPOSSIBLE
,
846 .cio_start
= LOV_EMPTY_IMPOSSIBLE
,
847 .cio_end
= LOV_EMPTY_IMPOSSIBLE
850 .cio_fini
= lov_empty_io_fini
,
851 .cio_iter_init
= LOV_EMPTY_IMPOSSIBLE
,
852 .cio_lock
= LOV_EMPTY_IMPOSSIBLE
,
853 .cio_start
= LOV_EMPTY_IMPOSSIBLE
,
854 .cio_end
= LOV_EMPTY_IMPOSSIBLE
857 .cio_fini
= lov_empty_io_fini
,
858 .cio_iter_init
= LOV_EMPTY_IMPOSSIBLE
,
859 .cio_lock
= LOV_EMPTY_IMPOSSIBLE
,
860 .cio_start
= LOV_EMPTY_IMPOSSIBLE
,
861 .cio_end
= LOV_EMPTY_IMPOSSIBLE
864 .cio_fini
= lov_empty_io_fini
867 .cio_fini
= lov_empty_io_fini
872 .cio_submit
= LOV_EMPTY_IMPOSSIBLE
875 .cio_submit
= LOV_EMPTY_IMPOSSIBLE
878 .cio_commit_write
= LOV_EMPTY_IMPOSSIBLE
881 int lov_io_init_raid0(const struct lu_env
*env
, struct cl_object
*obj
,
884 struct lov_io
*lio
= lov_env_io(env
);
885 struct lov_object
*lov
= cl2lov(obj
);
887 INIT_LIST_HEAD(&lio
->lis_active
);
888 lov_io_slice_init(lio
, lov
, io
);
889 if (io
->ci_result
== 0) {
890 io
->ci_result
= lov_io_subio_init(env
, lio
, io
);
891 if (io
->ci_result
== 0) {
892 cl_io_slice_add(io
, &lio
->lis_cl
, obj
, &lov_io_ops
);
893 atomic_inc(&lov
->lo_active_ios
);
896 return io
->ci_result
;
899 int lov_io_init_empty(const struct lu_env
*env
, struct cl_object
*obj
,
902 struct lov_object
*lov
= cl2lov(obj
);
903 struct lov_io
*lio
= lov_env_io(env
);
906 lio
->lis_object
= lov
;
907 switch (io
->ci_type
) {
923 CERROR("Page fault on a file without stripes: "DFID
"\n",
924 PFID(lu_object_fid(&obj
->co_lu
)));
928 cl_io_slice_add(io
, &lio
->lis_cl
, obj
, &lov_empty_io_ops
);
929 atomic_inc(&lov
->lo_active_ios
);
932 io
->ci_result
= result
< 0 ? result
: 0;
936 int lov_io_init_released(const struct lu_env
*env
, struct cl_object
*obj
,
939 struct lov_object
*lov
= cl2lov(obj
);
940 struct lov_io
*lio
= lov_env_io(env
);
943 LASSERT(lov
->lo_lsm
!= NULL
);
944 lio
->lis_object
= lov
;
946 switch (io
->ci_type
) {
948 LASSERTF(0, "invalid type %d\n", io
->ci_type
);
954 /* the truncate to 0 is managed by MDT:
955 * - in open, for open O_TRUNC
956 * - in setattr, for truncate
958 /* the truncate is for size > 0 so triggers a restore */
959 if (cl_io_is_trunc(io
))
960 io
->ci_restore_needed
= 1;
966 io
->ci_restore_needed
= 1;
971 cl_io_slice_add(io
, &lio
->lis_cl
, obj
, &lov_empty_io_ops
);
972 atomic_inc(&lov
->lo_active_ios
);
975 io
->ci_result
= result
< 0 ? result
: 0;