4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * cl code shared between vvp and liblustre (and other Lustre clients in the
39 * Author: Nikita Danilov <nikita.danilov@sun.com>
42 #define DEBUG_SUBSYSTEM S_LLITE
44 # include <linux/libcfs/libcfs.h>
45 # include <linux/fs.h>
46 # include <linux/sched.h>
47 # include <linux/mm.h>
48 # include <linux/quotaops.h>
49 # include <linux/highmem.h>
50 # include <linux/pagemap.h>
51 # include <linux/rbtree.h>
54 #include <obd_support.h>
55 #include <lustre_fid.h>
56 #include <lustre_lite.h>
57 #include <lustre_dlm.h>
58 #include <lustre_ver.h>
59 #include <lustre_mdc.h>
60 #include <cl_object.h>
64 #include "../llite/llite_internal.h"
66 const struct cl_req_operations ccc_req_ops
;
69 * ccc_ prefix stands for "Common Client Code".
72 static struct kmem_cache
*ccc_lock_kmem
;
73 static struct kmem_cache
*ccc_object_kmem
;
74 static struct kmem_cache
*ccc_thread_kmem
;
75 static struct kmem_cache
*ccc_session_kmem
;
76 static struct kmem_cache
*ccc_req_kmem
;
78 static struct lu_kmem_descr ccc_caches
[] = {
80 .ckd_cache
= &ccc_lock_kmem
,
81 .ckd_name
= "ccc_lock_kmem",
82 .ckd_size
= sizeof(struct ccc_lock
)
85 .ckd_cache
= &ccc_object_kmem
,
86 .ckd_name
= "ccc_object_kmem",
87 .ckd_size
= sizeof(struct ccc_object
)
90 .ckd_cache
= &ccc_thread_kmem
,
91 .ckd_name
= "ccc_thread_kmem",
92 .ckd_size
= sizeof(struct ccc_thread_info
),
95 .ckd_cache
= &ccc_session_kmem
,
96 .ckd_name
= "ccc_session_kmem",
97 .ckd_size
= sizeof(struct ccc_session
)
100 .ckd_cache
= &ccc_req_kmem
,
101 .ckd_name
= "ccc_req_kmem",
102 .ckd_size
= sizeof(struct ccc_req
)
/*****************************************************************************
 *
 * Vvp device and device type functions.
 *
 */
115 void *ccc_key_init(const struct lu_context
*ctx
,
116 struct lu_context_key
*key
)
118 struct ccc_thread_info
*info
;
120 OBD_SLAB_ALLOC_PTR_GFP(info
, ccc_thread_kmem
, __GFP_IO
);
122 info
= ERR_PTR(-ENOMEM
);
126 void ccc_key_fini(const struct lu_context
*ctx
,
127 struct lu_context_key
*key
, void *data
)
129 struct ccc_thread_info
*info
= data
;
130 OBD_SLAB_FREE_PTR(info
, ccc_thread_kmem
);
133 void *ccc_session_key_init(const struct lu_context
*ctx
,
134 struct lu_context_key
*key
)
136 struct ccc_session
*session
;
138 OBD_SLAB_ALLOC_PTR_GFP(session
, ccc_session_kmem
, __GFP_IO
);
140 session
= ERR_PTR(-ENOMEM
);
144 void ccc_session_key_fini(const struct lu_context
*ctx
,
145 struct lu_context_key
*key
, void *data
)
147 struct ccc_session
*session
= data
;
148 OBD_SLAB_FREE_PTR(session
, ccc_session_kmem
);
151 struct lu_context_key ccc_key
= {
152 .lct_tags
= LCT_CL_THREAD
,
153 .lct_init
= ccc_key_init
,
154 .lct_fini
= ccc_key_fini
157 struct lu_context_key ccc_session_key
= {
158 .lct_tags
= LCT_SESSION
,
159 .lct_init
= ccc_session_key_init
,
160 .lct_fini
= ccc_session_key_fini
/* type constructor/destructor: ccc_type_{init,fini,start,stop}(). */
/* LU_TYPE_INIT_FINI(ccc, &ccc_key, &ccc_session_key); */
167 int ccc_device_init(const struct lu_env
*env
, struct lu_device
*d
,
168 const char *name
, struct lu_device
*next
)
170 struct ccc_device
*vdv
;
174 vdv
->cdv_next
= lu2cl_dev(next
);
176 LASSERT(d
->ld_site
!= NULL
&& next
->ld_type
!= NULL
);
177 next
->ld_site
= d
->ld_site
;
178 rc
= next
->ld_type
->ldt_ops
->ldto_device_init(
179 env
, next
, next
->ld_type
->ldt_name
, NULL
);
182 lu_ref_add(&next
->ld_reference
, "lu-stack", &lu_site_init
);
187 struct lu_device
*ccc_device_fini(const struct lu_env
*env
,
190 return cl2lu_dev(lu2ccc_dev(d
)->cdv_next
);
193 struct lu_device
*ccc_device_alloc(const struct lu_env
*env
,
194 struct lu_device_type
*t
,
195 struct lustre_cfg
*cfg
,
196 const struct lu_device_operations
*luops
,
197 const struct cl_device_operations
*clops
)
199 struct ccc_device
*vdv
;
200 struct lu_device
*lud
;
201 struct cl_site
*site
;
206 return ERR_PTR(-ENOMEM
);
208 lud
= &vdv
->cdv_cl
.cd_lu_dev
;
209 cl_device_init(&vdv
->cdv_cl
, t
);
210 ccc2lu_dev(vdv
)->ld_ops
= luops
;
211 vdv
->cdv_cl
.cd_ops
= clops
;
215 rc
= cl_site_init(site
, &vdv
->cdv_cl
);
217 rc
= lu_site_init_finish(&site
->cs_lu
);
219 LASSERT(lud
->ld_site
== NULL
);
220 CERROR("Cannot init lu_site, rc %d.\n", rc
);
226 ccc_device_free(env
, lud
);
232 struct lu_device
*ccc_device_free(const struct lu_env
*env
,
235 struct ccc_device
*vdv
= lu2ccc_dev(d
);
236 struct cl_site
*site
= lu2cl_site(d
->ld_site
);
237 struct lu_device
*next
= cl2lu_dev(vdv
->cdv_next
);
239 if (d
->ld_site
!= NULL
) {
243 cl_device_fini(lu2cl_dev(d
));
248 int ccc_req_init(const struct lu_env
*env
, struct cl_device
*dev
,
254 OBD_SLAB_ALLOC_PTR_GFP(vrq
, ccc_req_kmem
, __GFP_IO
);
256 cl_req_slice_add(req
, &vrq
->crq_cl
, dev
, &ccc_req_ops
);
264 * An `emergency' environment used by ccc_inode_fini() when cl_env_get()
265 * fails. Access to this environment is serialized by ccc_inode_fini_guard
268 static struct lu_env
*ccc_inode_fini_env
= NULL
;
271 * A mutex serializing calls to slp_inode_fini() under extreme memory
272 * pressure, when environments cannot be allocated.
274 static DEFINE_MUTEX(ccc_inode_fini_guard
);
275 static int dummy_refcheck
;
277 int ccc_global_init(struct lu_device_type
*device_type
)
281 result
= lu_kmem_init(ccc_caches
);
285 result
= lu_device_type_init(device_type
);
289 ccc_inode_fini_env
= cl_env_alloc(&dummy_refcheck
,
290 LCT_REMEMBER
|LCT_NOREF
);
291 if (IS_ERR(ccc_inode_fini_env
)) {
292 result
= PTR_ERR(ccc_inode_fini_env
);
296 ccc_inode_fini_env
->le_ctx
.lc_cookie
= 0x4;
299 lu_device_type_fini(device_type
);
301 lu_kmem_fini(ccc_caches
);
305 void ccc_global_fini(struct lu_device_type
*device_type
)
307 if (ccc_inode_fini_env
!= NULL
) {
308 cl_env_put(ccc_inode_fini_env
, &dummy_refcheck
);
309 ccc_inode_fini_env
= NULL
;
311 lu_device_type_fini(device_type
);
312 lu_kmem_fini(ccc_caches
);
/*****************************************************************************
 *
 * Object operations.
 *
 */
321 struct lu_object
*ccc_object_alloc(const struct lu_env
*env
,
322 const struct lu_object_header
*unused
,
323 struct lu_device
*dev
,
324 const struct cl_object_operations
*clops
,
325 const struct lu_object_operations
*luops
)
327 struct ccc_object
*vob
;
328 struct lu_object
*obj
;
330 OBD_SLAB_ALLOC_PTR_GFP(vob
, ccc_object_kmem
, __GFP_IO
);
332 struct cl_object_header
*hdr
;
335 hdr
= &vob
->cob_header
;
336 cl_object_header_init(hdr
);
337 lu_object_init(obj
, &hdr
->coh_lu
, dev
);
338 lu_object_add_top(&hdr
->coh_lu
, obj
);
340 vob
->cob_cl
.co_ops
= clops
;
347 int ccc_object_init0(const struct lu_env
*env
,
348 struct ccc_object
*vob
,
349 const struct cl_object_conf
*conf
)
351 vob
->cob_inode
= conf
->coc_inode
;
352 vob
->cob_transient_pages
= 0;
353 cl_object_page_init(&vob
->cob_cl
, sizeof(struct ccc_page
));
357 int ccc_object_init(const struct lu_env
*env
, struct lu_object
*obj
,
358 const struct lu_object_conf
*conf
)
360 struct ccc_device
*dev
= lu2ccc_dev(obj
->lo_dev
);
361 struct ccc_object
*vob
= lu2ccc(obj
);
362 struct lu_object
*below
;
363 struct lu_device
*under
;
366 under
= &dev
->cdv_next
->cd_lu_dev
;
367 below
= under
->ld_ops
->ldo_object_alloc(env
, obj
->lo_header
, under
);
369 const struct cl_object_conf
*cconf
;
371 cconf
= lu2cl_conf(conf
);
372 INIT_LIST_HEAD(&vob
->cob_pending_list
);
373 lu_object_add(obj
, below
);
374 result
= ccc_object_init0(env
, vob
, cconf
);
380 void ccc_object_free(const struct lu_env
*env
, struct lu_object
*obj
)
382 struct ccc_object
*vob
= lu2ccc(obj
);
385 lu_object_header_fini(obj
->lo_header
);
386 OBD_SLAB_FREE_PTR(vob
, ccc_object_kmem
);
389 int ccc_lock_init(const struct lu_env
*env
,
390 struct cl_object
*obj
, struct cl_lock
*lock
,
391 const struct cl_io
*unused
,
392 const struct cl_lock_operations
*lkops
)
394 struct ccc_lock
*clk
;
397 CLOBINVRNT(env
, obj
, ccc_object_invariant(obj
));
399 OBD_SLAB_ALLOC_PTR_GFP(clk
, ccc_lock_kmem
, __GFP_IO
);
401 cl_lock_slice_add(lock
, &clk
->clk_cl
, obj
, lkops
);
/**
 * Attribute-set method of the ccc layer: nothing to do at this layer.
 *
 * \retval 0 always
 */
int ccc_attr_set(const struct lu_env *env, struct cl_object *obj,
		 const struct cl_attr *attr, unsigned valid)
{
	return 0;
}
414 int ccc_object_glimpse(const struct lu_env
*env
,
415 const struct cl_object
*obj
, struct ost_lvb
*lvb
)
417 struct inode
*inode
= ccc_object_inode(obj
);
419 lvb
->lvb_mtime
= cl_inode_mtime(inode
);
420 lvb
->lvb_atime
= cl_inode_atime(inode
);
421 lvb
->lvb_ctime
= cl_inode_ctime(inode
);
423 * LU-417: Add dirty pages block count lest i_blocks reports 0, some
424 * "cp" or "tar" on remote node may think it's a completely sparse file
427 if (lvb
->lvb_size
> 0 && lvb
->lvb_blocks
== 0)
428 lvb
->lvb_blocks
= dirty_cnt(inode
);
/**
 * Configuration-change method of the ccc layer; currently a no-op.
 *
 * \retval 0 always
 */
int ccc_conf_set(const struct lu_env *env, struct cl_object *obj,
		 const struct cl_object_conf *conf)
{
	/* TODO: destroy all pages attached to this object. */
	return 0;
}
/*
 * Takes the inode size lock and then the object attribute lock, in that
 * order. Must be paired with ccc_object_size_unlock().
 */
static void ccc_object_size_lock(struct cl_object *obj)
{
	struct inode *inode = ccc_object_inode(obj);

	cl_isize_lock(inode);
	cl_object_attr_lock(obj);
}
/*
 * Releases the locks taken by ccc_object_size_lock(), in reverse order:
 * the object attribute lock first, then the inode size lock.
 */
static void ccc_object_size_unlock(struct cl_object *obj)
{
	struct inode *inode = ccc_object_inode(obj);

	cl_object_attr_unlock(obj);
	cl_isize_unlock(inode);
}
/*****************************************************************************
 *
 * Page operations.
 *
 */
/**
 * Returns the VM page stored in the ccc page slice \a slice.
 */
struct page *ccc_page_vmpage(const struct lu_env *env,
			     const struct cl_page_slice *slice)
{
	return cl2vm_page(slice);
}
469 int ccc_page_is_under_lock(const struct lu_env
*env
,
470 const struct cl_page_slice
*slice
,
473 struct ccc_io
*cio
= ccc_env_io(env
);
474 struct cl_lock_descr
*desc
= &ccc_env_info(env
)->cti_descr
;
475 struct cl_page
*page
= slice
->cpl_page
;
479 if (io
->ci_type
== CIT_READ
|| io
->ci_type
== CIT_WRITE
||
480 io
->ci_type
== CIT_FAULT
) {
481 if (cio
->cui_fd
->fd_flags
& LL_FILE_GROUP_LOCKED
)
484 desc
->cld_start
= page
->cp_index
;
485 desc
->cld_end
= page
->cp_index
;
486 desc
->cld_obj
= page
->cp_obj
;
487 desc
->cld_mode
= CLM_READ
;
488 result
= cl_queue_match(&io
->ci_lockset
.cls_done
,
/**
 * Page method that must never be reached: hitting it is treated as a bug.
 */
int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice)
{
	/*
	 * Cached read?
	 */
	LBUG();
	return 0;
}
/**
 * Consistency hook for transient pages; intentionally empty.
 */
void ccc_transient_page_verify(const struct cl_page *page)
{
}
509 int ccc_transient_page_own(const struct lu_env
*env
,
510 const struct cl_page_slice
*slice
,
511 struct cl_io
*unused
,
514 ccc_transient_page_verify(slice
->cpl_page
);
518 void ccc_transient_page_assume(const struct lu_env
*env
,
519 const struct cl_page_slice
*slice
,
520 struct cl_io
*unused
)
522 ccc_transient_page_verify(slice
->cpl_page
);
525 void ccc_transient_page_unassume(const struct lu_env
*env
,
526 const struct cl_page_slice
*slice
,
527 struct cl_io
*unused
)
529 ccc_transient_page_verify(slice
->cpl_page
);
532 void ccc_transient_page_disown(const struct lu_env
*env
,
533 const struct cl_page_slice
*slice
,
534 struct cl_io
*unused
)
536 ccc_transient_page_verify(slice
->cpl_page
);
539 void ccc_transient_page_discard(const struct lu_env
*env
,
540 const struct cl_page_slice
*slice
,
541 struct cl_io
*unused
)
543 struct cl_page
*page
= slice
->cpl_page
;
545 ccc_transient_page_verify(slice
->cpl_page
);
548 * For transient pages, remove it from the radix tree.
550 cl_page_delete(env
, page
);
/**
 * Transfer preparation method for transient pages.
 *
 * \retval 0 always
 */
int ccc_transient_page_prep(const struct lu_env *env,
			    const struct cl_page_slice *slice,
			    struct cl_io *unused)
{
	/* transient page should always be sent. */
	return 0;
}
/*****************************************************************************
 *
 * Lock operations.
 *
 */
567 void ccc_lock_delete(const struct lu_env
*env
,
568 const struct cl_lock_slice
*slice
)
570 CLOBINVRNT(env
, slice
->cls_obj
, ccc_object_invariant(slice
->cls_obj
));
573 void ccc_lock_fini(const struct lu_env
*env
, struct cl_lock_slice
*slice
)
575 struct ccc_lock
*clk
= cl2ccc_lock(slice
);
576 OBD_SLAB_FREE_PTR(clk
, ccc_lock_kmem
);
579 int ccc_lock_enqueue(const struct lu_env
*env
,
580 const struct cl_lock_slice
*slice
,
581 struct cl_io
*unused
, __u32 enqflags
)
583 CLOBINVRNT(env
, slice
->cls_obj
, ccc_object_invariant(slice
->cls_obj
));
587 int ccc_lock_unuse(const struct lu_env
*env
, const struct cl_lock_slice
*slice
)
589 CLOBINVRNT(env
, slice
->cls_obj
, ccc_object_invariant(slice
->cls_obj
));
593 int ccc_lock_wait(const struct lu_env
*env
, const struct cl_lock_slice
*slice
)
595 CLOBINVRNT(env
, slice
->cls_obj
, ccc_object_invariant(slice
->cls_obj
));
600 * Implementation of cl_lock_operations::clo_fits_into() methods for ccc
601 * layer. This function is executed every time io finds an existing lock in
602 * the lock cache while creating new lock. This function has to decide whether
603 * cached lock "fits" into io.
605 * \param slice lock to be checked
606 * \param io IO that wants a lock.
608 * \see lov_lock_fits_into().
610 int ccc_lock_fits_into(const struct lu_env
*env
,
611 const struct cl_lock_slice
*slice
,
612 const struct cl_lock_descr
*need
,
613 const struct cl_io
*io
)
615 const struct cl_lock
*lock
= slice
->cls_lock
;
616 const struct cl_lock_descr
*descr
= &lock
->cll_descr
;
617 const struct ccc_io
*cio
= ccc_env_io(env
);
621 * Work around DLM peculiarity: it assumes that glimpse
622 * (LDLM_FL_HAS_INTENT) lock is always LCK_PR, and returns reads lock
623 * when asked for LCK_PW lock with LDLM_FL_HAS_INTENT flag set. Make
624 * sure that glimpse doesn't get CLM_WRITE top-lock, so that it
625 * doesn't enqueue CLM_WRITE sub-locks.
627 if (cio
->cui_glimpse
)
628 result
= descr
->cld_mode
!= CLM_WRITE
;
631 * Also, don't match incomplete write locks for read, otherwise read
632 * would enqueue missing sub-locks in the write mode.
634 else if (need
->cld_mode
!= descr
->cld_mode
)
635 result
= lock
->cll_state
>= CLS_ENQUEUED
;
642 * Implements cl_lock_operations::clo_state() method for ccc layer, invoked
643 * whenever lock state changes. Transfers object attributes, that might be
644 * updated as a result of lock acquiring into inode.
646 void ccc_lock_state(const struct lu_env
*env
,
647 const struct cl_lock_slice
*slice
,
648 enum cl_lock_state state
)
650 struct cl_lock
*lock
= slice
->cls_lock
;
653 * Refresh inode attributes when the lock is moving into CLS_HELD
654 * state, and only when this is a result of real enqueue, rather than
655 * of finding lock in the cache.
657 if (state
== CLS_HELD
&& lock
->cll_state
< CLS_HELD
) {
658 struct cl_object
*obj
;
661 obj
= slice
->cls_obj
;
662 inode
= ccc_object_inode(obj
);
664 /* vmtruncate() sets the i_size
665 * under both a DLM lock and the
666 * ll_inode_size_lock(). If we don't get the
667 * ll_inode_size_lock() here we can match the DLM lock and
668 * reset i_size. generic_file_write can then trust the
669 * stale i_size when doing appending writes and effectively
670 * cancel the result of the truncate. Getting the
671 * ll_inode_size_lock() after the enqueue maintains the DLM
672 * -> ll_inode_size_lock() acquiring order. */
673 if (lock
->cll_descr
.cld_start
== 0 &&
674 lock
->cll_descr
.cld_end
== CL_PAGE_EOF
)
675 cl_merge_lvb(env
, inode
);
/*****************************************************************************
 *
 * io operations.
 *
 */
685 void ccc_io_fini(const struct lu_env
*env
, const struct cl_io_slice
*ios
)
687 struct cl_io
*io
= ios
->cis_io
;
689 CLOBINVRNT(env
, io
->ci_obj
, ccc_object_invariant(io
->ci_obj
));
692 int ccc_io_one_lock_index(const struct lu_env
*env
, struct cl_io
*io
,
693 __u32 enqflags
, enum cl_lock_mode mode
,
694 pgoff_t start
, pgoff_t end
)
696 struct ccc_io
*cio
= ccc_env_io(env
);
697 struct cl_lock_descr
*descr
= &cio
->cui_link
.cill_descr
;
698 struct cl_object
*obj
= io
->ci_obj
;
700 CLOBINVRNT(env
, obj
, ccc_object_invariant(obj
));
702 CDEBUG(D_VFSTRACE
, "lock: %d [%lu, %lu]\n", mode
, start
, end
);
704 memset(&cio
->cui_link
, 0, sizeof(cio
->cui_link
));
706 if (cio
->cui_fd
&& (cio
->cui_fd
->fd_flags
& LL_FILE_GROUP_LOCKED
)) {
707 descr
->cld_mode
= CLM_GROUP
;
708 descr
->cld_gid
= cio
->cui_fd
->fd_grouplock
.cg_gid
;
710 descr
->cld_mode
= mode
;
712 descr
->cld_obj
= obj
;
713 descr
->cld_start
= start
;
714 descr
->cld_end
= end
;
715 descr
->cld_enq_flags
= enqflags
;
717 cl_io_lock_add(env
, io
, &cio
->cui_link
);
721 void ccc_io_update_iov(const struct lu_env
*env
,
722 struct ccc_io
*cio
, struct cl_io
*io
)
725 size_t size
= io
->u
.ci_rw
.crw_count
;
727 cio
->cui_iov_olen
= 0;
728 if (!cl_is_normalio(env
, io
) || cio
->cui_tot_nrsegs
== 0)
731 for (i
= 0; i
< cio
->cui_tot_nrsegs
; i
++) {
732 struct iovec
*iv
= &cio
->cui_iov
[i
];
734 if (iv
->iov_len
< size
)
737 if (iv
->iov_len
> size
) {
738 cio
->cui_iov_olen
= iv
->iov_len
;
745 cio
->cui_nrsegs
= i
+ 1;
746 LASSERTF(cio
->cui_tot_nrsegs
>= cio
->cui_nrsegs
,
747 "tot_nrsegs: %lu, nrsegs: %lu\n",
748 cio
->cui_tot_nrsegs
, cio
->cui_nrsegs
);
751 int ccc_io_one_lock(const struct lu_env
*env
, struct cl_io
*io
,
752 __u32 enqflags
, enum cl_lock_mode mode
,
753 loff_t start
, loff_t end
)
755 struct cl_object
*obj
= io
->ci_obj
;
756 return ccc_io_one_lock_index(env
, io
, enqflags
, mode
,
757 cl_index(obj
, start
), cl_index(obj
, end
));
760 void ccc_io_end(const struct lu_env
*env
, const struct cl_io_slice
*ios
)
762 CLOBINVRNT(env
, ios
->cis_io
->ci_obj
,
763 ccc_object_invariant(ios
->cis_io
->ci_obj
));
766 void ccc_io_advance(const struct lu_env
*env
,
767 const struct cl_io_slice
*ios
,
770 struct ccc_io
*cio
= cl2ccc_io(env
, ios
);
771 struct cl_io
*io
= ios
->cis_io
;
772 struct cl_object
*obj
= ios
->cis_io
->ci_obj
;
774 CLOBINVRNT(env
, obj
, ccc_object_invariant(obj
));
776 if (!cl_is_normalio(env
, io
))
779 LASSERT(cio
->cui_tot_nrsegs
>= cio
->cui_nrsegs
);
780 LASSERT(cio
->cui_tot_count
>= nob
);
782 cio
->cui_iov
+= cio
->cui_nrsegs
;
783 cio
->cui_tot_nrsegs
-= cio
->cui_nrsegs
;
784 cio
->cui_tot_count
-= nob
;
787 if (cio
->cui_iov_olen
> 0) {
791 cio
->cui_tot_nrsegs
++;
792 iv
= &cio
->cui_iov
[0];
793 if (io
->ci_continue
) {
794 iv
->iov_base
+= iv
->iov_len
;
795 LASSERT(cio
->cui_iov_olen
> iv
->iov_len
);
796 iv
->iov_len
= cio
->cui_iov_olen
- iv
->iov_len
;
798 /* restore the iov_len, in case of restart io. */
799 iv
->iov_len
= cio
->cui_iov_olen
;
801 cio
->cui_iov_olen
= 0;
806 * Helper function that if necessary adjusts file size (inode->i_size), when
807 * position at the offset \a pos is accessed. File size can be arbitrary stale
808 * on a Lustre client, but client at least knows KMS. If accessed area is
809 * inside [0, KMS], set file size to KMS, otherwise glimpse file size.
811 * Locking: cl_isize_lock is used to serialize changes to inode size and to
812 * protect consistency between inode size and cl_object
813 * attributes. cl_object_size_lock() protects consistency between cl_attr's of
814 * top-object and sub-objects.
816 int ccc_prep_size(const struct lu_env
*env
, struct cl_object
*obj
,
817 struct cl_io
*io
, loff_t start
, size_t count
, int *exceed
)
819 struct cl_attr
*attr
= ccc_env_thread_attr(env
);
820 struct inode
*inode
= ccc_object_inode(obj
);
821 loff_t pos
= start
+ count
- 1;
826 * Consistency guarantees: following possibilities exist for the
827 * relation between region being accessed and real file size at this
830 * (A): the region is completely inside of the file;
832 * (B-x): x bytes of region are inside of the file, the rest is
835 * (C): the region is completely outside of the file.
837 * This classification is stable under DLM lock already acquired by
838 * the caller, because to change the class, other client has to take
839 * DLM lock conflicting with our lock. Also, any updates to ->i_size
840 * by other threads on this client are serialized by
841 * ll_inode_size_lock(). This guarantees that short reads are handled
842 * correctly in the face of concurrent writes and truncates.
844 ccc_object_size_lock(obj
);
845 result
= cl_object_attr_get(env
, obj
, attr
);
850 * A glimpse is necessary to determine whether we
851 * return a short read (B) or some zeroes at the end
854 ccc_object_size_unlock(obj
);
855 result
= cl_glimpse_lock(env
, io
, inode
, obj
, 0);
856 if (result
== 0 && exceed
!= NULL
) {
857 /* If objective page index exceed end-of-file
858 * page index, return directly. Do not expect
859 * kernel will check such case correctly.
860 * linux-2.6.18-128.1.1 miss to do that.
862 loff_t size
= cl_isize_read(inode
);
863 unsigned long cur_index
= start
>> PAGE_CACHE_SHIFT
;
865 if ((size
== 0 && cur_index
!= 0) ||
866 (((size
- 1) >> PAGE_CACHE_SHIFT
) < cur_index
))
872 * region is within kms and, hence, within real file
873 * size (A). We need to increase i_size to cover the
874 * read region so that generic_file_read() will do its
875 * job, but that doesn't mean the kms size is
876 * _correct_, it is only the _minimum_ size. If
877 * someone does a stat they will get the correct size
878 * which will always be >= the kms value here.
881 if (cl_isize_read(inode
) < kms
) {
882 cl_isize_write_nolock(inode
, kms
);
884 DFID
" updating i_size "LPU64
"\n",
885 PFID(lu_object_fid(&obj
->co_lu
)),
886 (__u64
)cl_isize_read(inode
));
891 ccc_object_size_unlock(obj
);
/*****************************************************************************
 *
 * Transfer operations.
 *
 */
901 void ccc_req_completion(const struct lu_env
*env
,
902 const struct cl_req_slice
*slice
, int ioret
)
907 cl_stats_tally(slice
->crs_dev
, slice
->crs_req
->crq_type
, ioret
);
909 vrq
= cl2ccc_req(slice
);
910 OBD_SLAB_FREE_PTR(vrq
, ccc_req_kmem
);
914 * Implementation of struct cl_req_operations::cro_attr_set() for ccc
915 * layer. ccc is responsible for
933 void ccc_req_attr_set(const struct lu_env
*env
,
934 const struct cl_req_slice
*slice
,
935 const struct cl_object
*obj
,
936 struct cl_req_attr
*attr
, obd_valid flags
)
940 obd_flag valid_flags
;
943 inode
= ccc_object_inode(obj
);
944 valid_flags
= OBD_MD_FLTYPE
;
946 if ((flags
& OBD_MD_FLOSSCAPA
) != 0) {
947 LASSERT(attr
->cra_capa
== NULL
);
948 attr
->cra_capa
= cl_capa_lookup(inode
,
949 slice
->crs_req
->crq_type
);
952 if (slice
->crs_req
->crq_type
== CRT_WRITE
) {
953 if (flags
& OBD_MD_FLEPOCH
) {
954 oa
->o_valid
|= OBD_MD_FLEPOCH
;
955 oa
->o_ioepoch
= cl_i2info(inode
)->lli_ioepoch
;
956 valid_flags
|= OBD_MD_FLMTIME
| OBD_MD_FLCTIME
|
957 OBD_MD_FLUID
| OBD_MD_FLGID
;
960 obdo_from_inode(oa
, inode
, valid_flags
& flags
);
961 obdo_set_parent_fid(oa
, &cl_i2info(inode
)->lli_fid
);
962 memcpy(attr
->cra_jobid
, cl_i2info(inode
)->lli_jobid
,
963 JOBSTATS_JOBID_SIZE
);
966 const struct cl_req_operations ccc_req_ops
= {
967 .cro_attr_set
= ccc_req_attr_set
,
968 .cro_completion
= ccc_req_completion
971 int cl_setattr_ost(struct inode
*inode
, const struct iattr
*attr
,
972 struct obd_capa
*capa
)
979 env
= cl_env_get(&refcheck
);
983 io
= ccc_env_thread_io(env
);
984 io
->ci_obj
= cl_i2info(inode
)->lli_clob
;
986 io
->u
.ci_setattr
.sa_attr
.lvb_atime
= LTIME_S(attr
->ia_atime
);
987 io
->u
.ci_setattr
.sa_attr
.lvb_mtime
= LTIME_S(attr
->ia_mtime
);
988 io
->u
.ci_setattr
.sa_attr
.lvb_ctime
= LTIME_S(attr
->ia_ctime
);
989 io
->u
.ci_setattr
.sa_attr
.lvb_size
= attr
->ia_size
;
990 io
->u
.ci_setattr
.sa_valid
= attr
->ia_valid
;
991 io
->u
.ci_setattr
.sa_capa
= capa
;
994 if (cl_io_init(env
, io
, CIT_SETATTR
, io
->ci_obj
) == 0) {
995 struct ccc_io
*cio
= ccc_env_io(env
);
997 if (attr
->ia_valid
& ATTR_FILE
)
998 /* populate the file descriptor for ftruncate to honor
999 * group lock - see LU-787 */
1000 cio
->cui_fd
= cl_iattr2fd(inode
, attr
);
1002 result
= cl_io_loop(env
, io
);
1004 result
= io
->ci_result
;
1006 cl_io_fini(env
, io
);
1007 if (unlikely(io
->ci_need_restart
))
1009 /* HSM import case: file is released, cannot be restored
1010 * no need to fail except if restore registration failed
1012 if (result
== -ENODATA
&& io
->ci_restore_needed
&&
1013 io
->ci_result
!= -ENODATA
)
1015 cl_env_put(env
, &refcheck
);
/*****************************************************************************
 *
 * Type conversions.
 *
 */
1025 struct lu_device
*ccc2lu_dev(struct ccc_device
*vdv
)
1027 return &vdv
->cdv_cl
.cd_lu_dev
;
1030 struct ccc_device
*lu2ccc_dev(const struct lu_device
*d
)
1032 return container_of0(d
, struct ccc_device
, cdv_cl
.cd_lu_dev
);
1035 struct ccc_device
*cl2ccc_dev(const struct cl_device
*d
)
1037 return container_of0(d
, struct ccc_device
, cdv_cl
);
1040 struct lu_object
*ccc2lu(struct ccc_object
*vob
)
1042 return &vob
->cob_cl
.co_lu
;
1045 struct ccc_object
*lu2ccc(const struct lu_object
*obj
)
1047 return container_of0(obj
, struct ccc_object
, cob_cl
.co_lu
);
1050 struct ccc_object
*cl2ccc(const struct cl_object
*obj
)
1052 return container_of0(obj
, struct ccc_object
, cob_cl
);
1055 struct ccc_lock
*cl2ccc_lock(const struct cl_lock_slice
*slice
)
1057 return container_of(slice
, struct ccc_lock
, clk_cl
);
1060 struct ccc_io
*cl2ccc_io(const struct lu_env
*env
,
1061 const struct cl_io_slice
*slice
)
1065 cio
= container_of(slice
, struct ccc_io
, cui_cl
);
1066 LASSERT(cio
== ccc_env_io(env
));
1070 struct ccc_req
*cl2ccc_req(const struct cl_req_slice
*slice
)
1072 return container_of0(slice
, struct ccc_req
, crq_cl
);
1075 struct page
*cl2vm_page(const struct cl_page_slice
*slice
)
1077 return cl2ccc_page(slice
)->cpg_page
;
/*****************************************************************************
 *
 * Accessors.
 *
 */
1085 int ccc_object_invariant(const struct cl_object
*obj
)
1087 struct inode
*inode
= ccc_object_inode(obj
);
1088 struct cl_inode_info
*lli
= cl_i2info(inode
);
1090 return (S_ISREG(cl_inode_mode(inode
)) ||
1091 /* i_mode of unlinked inode is zeroed. */
1092 cl_inode_mode(inode
) == 0) && lli
->lli_clob
== obj
;
1095 struct inode
*ccc_object_inode(const struct cl_object
*obj
)
1097 return cl2ccc(obj
)->cob_inode
;
1101 * Returns a pointer to cl_page associated with \a vmpage, without acquiring
1102 * additional reference to the resulting page. This is an unsafe version of
1103 * cl_vmpage_page() that can only be used under vmpage lock.
1105 struct cl_page
*ccc_vmpage_page_transient(struct page
*vmpage
)
1107 KLASSERT(PageLocked(vmpage
));
1108 return (struct cl_page
*)vmpage
->private;
1112 * Initialize or update CLIO structures for regular files when new
1113 * meta-data arrives from the server.
1115 * \param inode regular file inode
1116 * \param md new file metadata from MDS
1117 * - allocates cl_object if necessary,
1118 * - updated layout, if object was already here.
1120 int cl_file_inode_init(struct inode
*inode
, struct lustre_md
*md
)
1123 struct cl_inode_info
*lli
;
1124 struct cl_object
*clob
;
1125 struct lu_site
*site
;
1127 struct cl_object_conf conf
= {
1136 LASSERT(md
->body
->valid
& OBD_MD_FLID
);
1137 LASSERT(S_ISREG(cl_inode_mode(inode
)));
1139 env
= cl_env_get(&refcheck
);
1141 return PTR_ERR(env
);
1143 site
= cl_i2sbi(inode
)->ll_site
;
1144 lli
= cl_i2info(inode
);
1145 fid
= &lli
->lli_fid
;
1146 LASSERT(fid_is_sane(fid
));
1148 if (lli
->lli_clob
== NULL
) {
1149 /* clob is slave of inode, empty lli_clob means for new inode,
1150 * there is no clob in cache with the given fid, so it is
1151 * unnecessary to perform lookup-alloc-lookup-insert, just
1152 * alloc and insert directly. */
1153 LASSERT(inode
->i_state
& I_NEW
);
1154 conf
.coc_lu
.loc_flags
= LOC_F_NEW
;
1155 clob
= cl_object_find(env
, lu2cl_dev(site
->ls_top_dev
),
1157 if (!IS_ERR(clob
)) {
1159 * No locking is necessary, as new inode is
1160 * locked by I_NEW bit.
1162 lli
->lli_clob
= clob
;
1163 lli
->lli_has_smd
= lsm_has_objects(md
->lsm
);
1164 lu_object_ref_add(&clob
->co_lu
, "inode", inode
);
1166 result
= PTR_ERR(clob
);
1168 result
= cl_conf_set(env
, lli
->lli_clob
, &conf
);
1171 cl_env_put(env
, &refcheck
);
1174 CERROR("Failure to initialize cl object "DFID
": %d\n",
1180 * Wait for others drop their references of the object at first, then we drop
1181 * the last one, which will lead to the object be destroyed immediately.
1182 * Must be called after cl_object_kill() against this object.
1184 * The reason we want to do this is: destroying top object will wait for sub
1185 * objects being destroyed first, so we can't let bottom layer (e.g. from ASTs)
1186 * to initiate top object destroying which may deadlock. See bz22520.
1188 static void cl_object_put_last(struct lu_env
*env
, struct cl_object
*obj
)
1190 struct lu_object_header
*header
= obj
->co_lu
.lo_header
;
1191 wait_queue_t waiter
;
1193 if (unlikely(atomic_read(&header
->loh_ref
) != 1)) {
1194 struct lu_site
*site
= obj
->co_lu
.lo_dev
->ld_site
;
1195 struct lu_site_bkt_data
*bkt
;
1197 bkt
= lu_site_bkt_from_fid(site
, &header
->loh_fid
);
1199 init_waitqueue_entry_current(&waiter
);
1200 add_wait_queue(&bkt
->lsb_marche_funebre
, &waiter
);
1203 set_current_state(TASK_UNINTERRUPTIBLE
);
1204 if (atomic_read(&header
->loh_ref
) == 1)
1206 waitq_wait(&waiter
, TASK_UNINTERRUPTIBLE
);
1209 set_current_state(TASK_RUNNING
);
1210 remove_wait_queue(&bkt
->lsb_marche_funebre
, &waiter
);
1213 cl_object_put(env
, obj
);
1216 void cl_inode_fini(struct inode
*inode
)
1219 struct cl_inode_info
*lli
= cl_i2info(inode
);
1220 struct cl_object
*clob
= lli
->lli_clob
;
1227 cookie
= cl_env_reenter();
1228 env
= cl_env_get(&refcheck
);
1229 emergency
= IS_ERR(env
);
1231 mutex_lock(&ccc_inode_fini_guard
);
1232 LASSERT(ccc_inode_fini_env
!= NULL
);
1233 cl_env_implant(ccc_inode_fini_env
, &refcheck
);
1234 env
= ccc_inode_fini_env
;
1237 * cl_object cache is a slave to inode cache (which, in turn
1238 * is a slave to dentry cache), don't keep cl_object in memory
1239 * when its master is evicted.
1241 cl_object_kill(env
, clob
);
1242 lu_object_ref_del(&clob
->co_lu
, "inode", inode
);
1243 cl_object_put_last(env
, clob
);
1244 lli
->lli_clob
= NULL
;
1246 cl_env_unplant(ccc_inode_fini_env
, &refcheck
);
1247 mutex_unlock(&ccc_inode_fini_guard
);
1249 cl_env_put(env
, &refcheck
);
1250 cl_env_reexit(cookie
);
1255 * return IF_* type for given lu_dirent entry.
1256 * IF_* flag shld be converted to particular OS file type in
1257 * platform llite module.
1259 __u16
ll_dirent_type_get(struct lu_dirent
*ent
)
1262 struct luda_type
*lt
;
1265 if (le32_to_cpu(ent
->lde_attrs
) & LUDA_TYPE
) {
1266 const unsigned align
= sizeof(struct luda_type
) - 1;
1268 len
= le16_to_cpu(ent
->lde_namelen
);
1269 len
= (len
+ align
) & ~align
;
1270 lt
= (void *)ent
->lde_name
+ len
;
1271 type
= IFTODT(le16_to_cpu(lt
->lt_type
));
1277 * build inode number from passed @fid */
1278 __u64
cl_fid_build_ino(const struct lu_fid
*fid
, int api32
)
1280 if (BITS_PER_LONG
== 32 || api32
)
1281 return fid_flatten32(fid
);
1283 return fid_flatten(fid
);
1287 * build inode generation from passed @fid. If our FID overflows the 32-bit
1288 * inode number then return a non-zero generation to distinguish them. */
1289 __u32
cl_fid_build_gen(const struct lu_fid
*fid
)
1293 if (fid_is_igif(fid
)) {
1294 gen
= lu_igif_gen(fid
);
1298 gen
= (fid_flatten(fid
) >> 32);
1302 /* lsm is unreliable after hsm implementation as layout can be changed at
1303 * any time. This is only to support old, non-clio-ized interfaces. It will
1304 * cause deadlock if clio operations are called with this extra layout refcount
1305 * because in case the layout changed during the IO, ll_layout_refresh() will
1306 * have to wait for the refcount to become zero to destroy the older layout.
1308 * Notice that the lsm returned by this function may not be valid unless called
1309 * inside layout lock - MDS_INODELOCK_LAYOUT. */
1310 struct lov_stripe_md
*ccc_inode_lsm_get(struct inode
*inode
)
1312 return lov_lsm_get(cl_i2info(inode
)->lli_clob
);
1315 void inline ccc_inode_lsm_put(struct inode
*inode
, struct lov_stripe_md
*lsm
)
1317 lov_lsm_put(cl_i2info(inode
)->lli_clob
, lsm
);