d06129e8d96596bd1891e3d8079450ac5cae7b28
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 #include <linux/sched.h>
40 #include <linux/highmem.h>
41 #include <linux/pagemap.h>
43 #define DEBUG_SUBSYSTEM S_LLITE
45 #include "../include/obd_support.h"
46 #include "../include/lustre_lite.h"
47 #include "../include/lustre_dlm.h"
48 #include "llite_internal.h"
50 #define SA_OMITTED_ENTRY_MAX 8ULL
53 /** negative values are for error cases */
54 SA_ENTRY_INIT
= 0, /** init entry */
55 SA_ENTRY_SUCC
= 1, /** stat succeed */
56 SA_ENTRY_INVA
= 2, /** invalid entry */
57 SA_ENTRY_DEST
= 3, /** entry to be destroyed */
61 /* link into sai->sai_entries */
62 struct list_head se_link
;
63 /* link into sai->sai_entries_{received,stated} */
64 struct list_head se_list
;
65 /* link into sai hash table locally */
66 struct list_head se_hash
;
67 /* entry reference count */
69 /* entry index in the sai */
71 /* low layer ldlm lock handle */
75 /* entry size, contains name */
77 /* pointer to async getattr enqueue info */
78 struct md_enqueue_info
*se_minfo
;
79 /* pointer to the async getattr request */
80 struct ptlrpc_request
*se_req
;
81 /* pointer to the target inode */
82 struct inode
*se_inode
;
87 static unsigned int sai_generation
;
88 static DEFINE_SPINLOCK(sai_generation_lock
);
91 * The entry only can be released by the caller, it is necessary to hold lock.
93 static inline int ll_sa_entry_stated(struct ll_sa_entry
*entry
)
96 return (entry
->se_stat
!= SA_ENTRY_INIT
);
99 static inline int ll_sa_entry_hash(int val
)
101 return val
& LL_SA_CACHE_MASK
;
105 * Insert entry to hash SA table.
108 ll_sa_entry_enhash(struct ll_statahead_info
*sai
, struct ll_sa_entry
*entry
)
110 int i
= ll_sa_entry_hash(entry
->se_qstr
.hash
);
112 spin_lock(&sai
->sai_cache_lock
[i
]);
113 list_add_tail(&entry
->se_hash
, &sai
->sai_cache
[i
]);
114 spin_unlock(&sai
->sai_cache_lock
[i
]);
118 * Remove entry from SA table.
121 ll_sa_entry_unhash(struct ll_statahead_info
*sai
, struct ll_sa_entry
*entry
)
123 int i
= ll_sa_entry_hash(entry
->se_qstr
.hash
);
125 spin_lock(&sai
->sai_cache_lock
[i
]);
126 list_del_init(&entry
->se_hash
);
127 spin_unlock(&sai
->sai_cache_lock
[i
]);
130 static inline int agl_should_run(struct ll_statahead_info
*sai
,
133 return (inode
&& S_ISREG(inode
->i_mode
) && sai
->sai_agl_valid
);
136 static inline int sa_sent_full(struct ll_statahead_info
*sai
)
138 return atomic_read(&sai
->sai_cache_count
) >= sai
->sai_max
;
141 static inline int sa_received_empty(struct ll_statahead_info
*sai
)
143 return list_empty(&sai
->sai_entries_received
);
146 static inline int agl_list_empty(struct ll_statahead_info
*sai
)
148 return list_empty(&sai
->sai_entries_agl
);
152 * (1) hit ratio less than 80%
154 * (2) consecutive miss more than 8
155 * then means low hit.
157 static inline int sa_low_hit(struct ll_statahead_info
*sai
)
159 return ((sai
->sai_hit
> 7 && sai
->sai_hit
< 4 * sai
->sai_miss
) ||
160 (sai
->sai_consecutive_miss
> 8));
164 * If the given index lags behind the statahead window by more than
165 * SA_OMITTED_ENTRY_MAX, then it is old.
167 static inline int is_omitted_entry(struct ll_statahead_info
*sai
, __u64 index
)
169 return ((__u64
)sai
->sai_max
+ index
+ SA_OMITTED_ENTRY_MAX
<
174 * Insert it into sai_entries tail when init.
176 static struct ll_sa_entry
*
177 ll_sa_entry_alloc(struct ll_statahead_info
*sai
, __u64 index
,
178 const char *name
, int len
)
180 struct ll_inode_info
*lli
;
181 struct ll_sa_entry
*entry
;
185 entry_size
= sizeof(struct ll_sa_entry
) + (len
& ~3) + 4;
186 entry
= kzalloc(entry_size
, GFP_NOFS
);
187 if (unlikely(!entry
))
188 return ERR_PTR(-ENOMEM
);
190 CDEBUG(D_READA
, "alloc sa entry %.*s(%p) index %llu\n",
191 len
, name
, entry
, index
);
193 entry
->se_index
= index
;
196 * Statahead entry reference rules:
198 * 1) When statahead entry is initialized, its reference is set as 2.
199 * One reference is used by the directory scanner. When the scanner
200 * searches the statahead cache for the given name, it can perform
201 * lockless hash lookup (only the scanner can remove entry from hash
202 * list), and once found, it needn't to call "atomic_inc()" for the
203 * entry reference. So the performance is improved. After using the
204 * statahead entry, the scanner will call "atomic_dec()" to drop the
205 * reference held when initialization. If it is the last reference,
206 * the statahead entry will be freed.
208 * 2) All other threads, including statahead thread and ptlrpcd thread,
209 * when they process the statahead entry, the reference for target
210 * should be held to guarantee the entry will not be released by the
211 * directory scanner. After processing the entry, these threads will
212 * drop the entry reference. If it is the last reference, the entry
215 * The second reference when initializes the statahead entry is used
216 * by the statahead thread, following the rule 2).
218 atomic_set(&entry
->se_refcount
, 2);
219 entry
->se_stat
= SA_ENTRY_INIT
;
220 entry
->se_size
= entry_size
;
221 dname
= (char *)entry
+ sizeof(struct ll_sa_entry
);
222 memcpy(dname
, name
, len
);
224 entry
->se_qstr
.hash
= full_name_hash(name
, len
);
225 entry
->se_qstr
.len
= len
;
226 entry
->se_qstr
.name
= dname
;
228 lli
= ll_i2info(sai
->sai_inode
);
229 spin_lock(&lli
->lli_sa_lock
);
230 list_add_tail(&entry
->se_link
, &sai
->sai_entries
);
231 INIT_LIST_HEAD(&entry
->se_list
);
232 ll_sa_entry_enhash(sai
, entry
);
233 spin_unlock(&lli
->lli_sa_lock
);
235 atomic_inc(&sai
->sai_cache_count
);
241 * Used by the directory scanner to search entry with name.
243 * Only the caller can remove the entry from hash, so it is unnecessary to hold
244 * hash lock. It is caller's duty to release the init refcount on the entry, so
245 * it is also unnecessary to increase refcount on the entry.
247 static struct ll_sa_entry
*
248 ll_sa_entry_get_byname(struct ll_statahead_info
*sai
, const struct qstr
*qstr
)
250 struct ll_sa_entry
*entry
;
251 int i
= ll_sa_entry_hash(qstr
->hash
);
253 list_for_each_entry(entry
, &sai
->sai_cache
[i
], se_hash
) {
254 if (entry
->se_qstr
.hash
== qstr
->hash
&&
255 entry
->se_qstr
.len
== qstr
->len
&&
256 memcmp(entry
->se_qstr
.name
, qstr
->name
, qstr
->len
) == 0)
263 * Used by the async getattr request callback to find entry with index.
265 * Inside lli_sa_lock to prevent others to change the list during the search.
266 * It needs to increase entry refcount before returning to guarantee that the
267 * entry cannot be freed by others.
269 static struct ll_sa_entry
*
270 ll_sa_entry_get_byindex(struct ll_statahead_info
*sai
, __u64 index
)
272 struct ll_sa_entry
*entry
;
274 list_for_each_entry(entry
, &sai
->sai_entries
, se_link
) {
275 if (entry
->se_index
== index
) {
276 LASSERT(atomic_read(&entry
->se_refcount
) > 0);
277 atomic_inc(&entry
->se_refcount
);
280 if (entry
->se_index
> index
)
286 static void ll_sa_entry_cleanup(struct ll_statahead_info
*sai
,
287 struct ll_sa_entry
*entry
)
289 struct md_enqueue_info
*minfo
= entry
->se_minfo
;
290 struct ptlrpc_request
*req
= entry
->se_req
;
293 entry
->se_minfo
= NULL
;
294 ll_intent_release(&minfo
->mi_it
);
300 entry
->se_req
= NULL
;
301 ptlrpc_req_finished(req
);
305 static void ll_sa_entry_put(struct ll_statahead_info
*sai
,
306 struct ll_sa_entry
*entry
)
308 if (atomic_dec_and_test(&entry
->se_refcount
)) {
309 CDEBUG(D_READA
, "free sa entry %.*s(%p) index %llu\n",
310 entry
->se_qstr
.len
, entry
->se_qstr
.name
, entry
,
313 LASSERT(list_empty(&entry
->se_link
));
314 LASSERT(list_empty(&entry
->se_list
));
315 LASSERT(list_empty(&entry
->se_hash
));
317 ll_sa_entry_cleanup(sai
, entry
);
318 iput(entry
->se_inode
);
321 atomic_dec(&sai
->sai_cache_count
);
326 do_sa_entry_fini(struct ll_statahead_info
*sai
, struct ll_sa_entry
*entry
)
328 struct ll_inode_info
*lli
= ll_i2info(sai
->sai_inode
);
330 LASSERT(!list_empty(&entry
->se_hash
));
331 LASSERT(!list_empty(&entry
->se_link
));
333 ll_sa_entry_unhash(sai
, entry
);
335 spin_lock(&lli
->lli_sa_lock
);
336 entry
->se_stat
= SA_ENTRY_DEST
;
337 list_del_init(&entry
->se_link
);
338 if (likely(!list_empty(&entry
->se_list
)))
339 list_del_init(&entry
->se_list
);
340 spin_unlock(&lli
->lli_sa_lock
);
342 ll_sa_entry_put(sai
, entry
);
346 * Delete it from sai_entries_stated list when fini.
349 ll_sa_entry_fini(struct ll_statahead_info
*sai
, struct ll_sa_entry
*entry
)
351 struct ll_sa_entry
*pos
, *next
;
354 do_sa_entry_fini(sai
, entry
);
356 /* drop old entry, only 'scanner' process does this, no need to lock */
357 list_for_each_entry_safe(pos
, next
, &sai
->sai_entries
, se_link
) {
358 if (!is_omitted_entry(sai
, pos
->se_index
))
360 do_sa_entry_fini(sai
, pos
);
365 * Inside lli_sa_lock.
368 do_sa_entry_to_stated(struct ll_statahead_info
*sai
,
369 struct ll_sa_entry
*entry
, enum se_stat stat
)
371 struct ll_sa_entry
*se
;
372 struct list_head
*pos
= &sai
->sai_entries_stated
;
374 if (!list_empty(&entry
->se_list
))
375 list_del_init(&entry
->se_list
);
377 list_for_each_entry_reverse(se
, &sai
->sai_entries_stated
, se_list
) {
378 if (se
->se_index
< entry
->se_index
) {
384 list_add(&entry
->se_list
, pos
);
385 entry
->se_stat
= stat
;
389 * Move entry to sai_entries_stated and sort with the index.
390 * \retval 1 -- entry to be destroyed.
391 * \retval 0 -- entry is inserted into stated list.
394 ll_sa_entry_to_stated(struct ll_statahead_info
*sai
,
395 struct ll_sa_entry
*entry
, enum se_stat stat
)
397 struct ll_inode_info
*lli
= ll_i2info(sai
->sai_inode
);
400 ll_sa_entry_cleanup(sai
, entry
);
402 spin_lock(&lli
->lli_sa_lock
);
403 if (likely(entry
->se_stat
!= SA_ENTRY_DEST
)) {
404 do_sa_entry_to_stated(sai
, entry
, stat
);
407 spin_unlock(&lli
->lli_sa_lock
);
413 * Insert inode into the list of sai_entries_agl.
415 static void ll_agl_add(struct ll_statahead_info
*sai
,
416 struct inode
*inode
, int index
)
418 struct ll_inode_info
*child
= ll_i2info(inode
);
419 struct ll_inode_info
*parent
= ll_i2info(sai
->sai_inode
);
422 spin_lock(&child
->lli_agl_lock
);
423 if (child
->lli_agl_index
== 0) {
424 child
->lli_agl_index
= index
;
425 spin_unlock(&child
->lli_agl_lock
);
427 LASSERT(list_empty(&child
->lli_agl_list
));
430 spin_lock(&parent
->lli_agl_lock
);
431 if (list_empty(&sai
->sai_entries_agl
))
433 list_add_tail(&child
->lli_agl_list
, &sai
->sai_entries_agl
);
434 spin_unlock(&parent
->lli_agl_lock
);
436 spin_unlock(&child
->lli_agl_lock
);
440 wake_up(&sai
->sai_agl_thread
.t_ctl_waitq
);
443 static struct ll_statahead_info
*ll_sai_alloc(void)
445 struct ll_statahead_info
*sai
;
448 sai
= kzalloc(sizeof(*sai
), GFP_NOFS
);
452 atomic_set(&sai
->sai_refcount
, 1);
454 spin_lock(&sai_generation_lock
);
455 sai
->sai_generation
= ++sai_generation
;
456 if (unlikely(sai_generation
== 0))
457 sai
->sai_generation
= ++sai_generation
;
458 spin_unlock(&sai_generation_lock
);
460 sai
->sai_max
= LL_SA_RPC_MIN
;
462 init_waitqueue_head(&sai
->sai_waitq
);
463 init_waitqueue_head(&sai
->sai_thread
.t_ctl_waitq
);
464 init_waitqueue_head(&sai
->sai_agl_thread
.t_ctl_waitq
);
466 INIT_LIST_HEAD(&sai
->sai_entries
);
467 INIT_LIST_HEAD(&sai
->sai_entries_received
);
468 INIT_LIST_HEAD(&sai
->sai_entries_stated
);
469 INIT_LIST_HEAD(&sai
->sai_entries_agl
);
471 for (i
= 0; i
< LL_SA_CACHE_SIZE
; i
++) {
472 INIT_LIST_HEAD(&sai
->sai_cache
[i
]);
473 spin_lock_init(&sai
->sai_cache_lock
[i
]);
475 atomic_set(&sai
->sai_cache_count
, 0);
480 static inline struct ll_statahead_info
*
481 ll_sai_get(struct ll_statahead_info
*sai
)
483 atomic_inc(&sai
->sai_refcount
);
487 static void ll_sai_put(struct ll_statahead_info
*sai
)
489 struct inode
*inode
= sai
->sai_inode
;
490 struct ll_inode_info
*lli
= ll_i2info(inode
);
492 if (atomic_dec_and_lock(&sai
->sai_refcount
, &lli
->lli_sa_lock
)) {
493 struct ll_sa_entry
*entry
, *next
;
495 if (unlikely(atomic_read(&sai
->sai_refcount
) > 0)) {
496 /* It is race case, the interpret callback just hold
499 spin_unlock(&lli
->lli_sa_lock
);
503 LASSERT(!lli
->lli_opendir_key
);
504 LASSERT(thread_is_stopped(&sai
->sai_thread
));
505 LASSERT(thread_is_stopped(&sai
->sai_agl_thread
));
508 lli
->lli_opendir_pid
= 0;
509 spin_unlock(&lli
->lli_sa_lock
);
511 if (sai
->sai_sent
> sai
->sai_replied
)
512 CDEBUG(D_READA
, "statahead for dir "DFID
513 " does not finish: [sent:%llu] [replied:%llu]\n",
515 sai
->sai_sent
, sai
->sai_replied
);
517 list_for_each_entry_safe(entry
, next
, &sai
->sai_entries
,
519 do_sa_entry_fini(sai
, entry
);
521 LASSERT(list_empty(&sai
->sai_entries
));
522 LASSERT(list_empty(&sai
->sai_entries_received
));
523 LASSERT(list_empty(&sai
->sai_entries_stated
));
525 LASSERT(atomic_read(&sai
->sai_cache_count
) == 0);
526 LASSERT(list_empty(&sai
->sai_entries_agl
));
533 /* Do NOT forget to drop inode refcount when into sai_entries_agl. */
534 static void ll_agl_trigger(struct inode
*inode
, struct ll_statahead_info
*sai
)
536 struct ll_inode_info
*lli
= ll_i2info(inode
);
537 __u64 index
= lli
->lli_agl_index
;
540 LASSERT(list_empty(&lli
->lli_agl_list
));
542 /* AGL maybe fall behind statahead with one entry */
543 if (is_omitted_entry(sai
, index
+ 1)) {
544 lli
->lli_agl_index
= 0;
549 /* Someone is in glimpse (sync or async), do nothing. */
550 rc
= down_write_trylock(&lli
->lli_glimpse_sem
);
552 lli
->lli_agl_index
= 0;
558 * Someone triggered glimpse within 1 sec before.
559 * 1) The former glimpse succeeded with glimpse lock granted by OST, and
560 * if the lock is still cached on client, AGL needs to do nothing. If
561 * it is cancelled by other client, AGL maybe cannot obtain new lock
562 * for no glimpse callback triggered by AGL.
563 * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
564 * Under such case, it is quite possible that the OST will not grant
565 * glimpse lock for AGL also.
566 * 3) The former glimpse failed, compared with other two cases, it is
567 * relative rare. AGL can ignore such case, and it will not muchly
568 * affect the performance.
570 if (lli
->lli_glimpse_time
!= 0 &&
571 time_before(cfs_time_shift(-1), lli
->lli_glimpse_time
)) {
572 up_write(&lli
->lli_glimpse_sem
);
573 lli
->lli_agl_index
= 0;
578 CDEBUG(D_READA
, "Handling (init) async glimpse: inode = "
579 DFID
", idx = %llu\n", PFID(&lli
->lli_fid
), index
);
582 lli
->lli_agl_index
= 0;
583 lli
->lli_glimpse_time
= cfs_time_current();
584 up_write(&lli
->lli_glimpse_sem
);
586 CDEBUG(D_READA
, "Handled (init) async glimpse: inode= "
587 DFID
", idx = %llu, rc = %d\n",
588 PFID(&lli
->lli_fid
), index
, rc
);
593 static void ll_post_statahead(struct ll_statahead_info
*sai
)
595 struct inode
*dir
= sai
->sai_inode
;
597 struct ll_inode_info
*lli
= ll_i2info(dir
);
598 struct ll_sa_entry
*entry
;
599 struct md_enqueue_info
*minfo
;
600 struct lookup_intent
*it
;
601 struct ptlrpc_request
*req
;
602 struct mdt_body
*body
;
605 spin_lock(&lli
->lli_sa_lock
);
606 if (unlikely(list_empty(&sai
->sai_entries_received
))) {
607 spin_unlock(&lli
->lli_sa_lock
);
610 entry
= list_entry(sai
->sai_entries_received
.next
,
611 struct ll_sa_entry
, se_list
);
612 atomic_inc(&entry
->se_refcount
);
613 list_del_init(&entry
->se_list
);
614 spin_unlock(&lli
->lli_sa_lock
);
616 LASSERT(entry
->se_handle
!= 0);
618 minfo
= entry
->se_minfo
;
621 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_MDT_BODY
);
627 child
= entry
->se_inode
;
632 LASSERT(fid_is_zero(&minfo
->mi_data
.op_fid2
));
634 /* XXX: No fid in reply, this is probably cross-ref case.
635 * SA can't handle it yet.
637 if (body
->valid
& OBD_MD_MDS
) {
645 /* unlinked and re-created with the same name */
646 if (unlikely(!lu_fid_eq(&minfo
->mi_data
.op_fid2
, &body
->fid1
))) {
647 entry
->se_inode
= NULL
;
653 it
->d
.lustre
.it_lock_handle
= entry
->se_handle
;
654 rc
= md_revalidate_lock(ll_i2mdexp(dir
), it
, ll_inode2fid(dir
), NULL
);
660 rc
= ll_prep_inode(&child
, req
, dir
->i_sb
, it
);
664 CDEBUG(D_DLMTRACE
, "%s: setting l_data to inode "DFID
"%p\n",
665 ll_get_fsname(child
->i_sb
, NULL
, 0),
666 PFID(ll_inode2fid(child
)), child
);
667 ll_set_lock_data(ll_i2sbi(dir
)->ll_md_exp
, child
, it
, NULL
);
669 entry
->se_inode
= child
;
671 if (agl_should_run(sai
, child
))
672 ll_agl_add(sai
, child
, entry
->se_index
);
675 /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock
676 * reference count by calling "ll_intent_drop_lock()" in spite of the
677 * above operations failed or not. Do not worry about calling
678 * "ll_intent_drop_lock()" more than once.
680 rc
= ll_sa_entry_to_stated(sai
, entry
,
681 rc
< 0 ? SA_ENTRY_INVA
: SA_ENTRY_SUCC
);
682 if (rc
== 0 && entry
->se_index
== sai
->sai_index_wait
)
683 wake_up(&sai
->sai_waitq
);
684 ll_sa_entry_put(sai
, entry
);
687 static int ll_statahead_interpret(struct ptlrpc_request
*req
,
688 struct md_enqueue_info
*minfo
, int rc
)
690 struct lookup_intent
*it
= &minfo
->mi_it
;
691 struct inode
*dir
= minfo
->mi_dir
;
692 struct ll_inode_info
*lli
= ll_i2info(dir
);
693 struct ll_statahead_info
*sai
= NULL
;
694 struct ll_sa_entry
*entry
;
698 if (it_disposition(it
, DISP_LOOKUP_NEG
))
702 /* release ibits lock ASAP to avoid deadlock when statahead
703 * thread enqueues lock on parent in readdir and another
704 * process enqueues lock on child with parent lock held, eg.
707 handle
= it
->d
.lustre
.it_lock_handle
;
708 ll_intent_drop_lock(it
);
711 spin_lock(&lli
->lli_sa_lock
);
713 if (unlikely(!lli
->lli_sai
||
714 lli
->lli_sai
->sai_generation
!= minfo
->mi_generation
)) {
715 spin_unlock(&lli
->lli_sa_lock
);
719 sai
= ll_sai_get(lli
->lli_sai
);
720 if (unlikely(!thread_is_running(&sai
->sai_thread
))) {
722 spin_unlock(&lli
->lli_sa_lock
);
727 entry
= ll_sa_entry_get_byindex(sai
, minfo
->mi_cbdata
);
730 spin_unlock(&lli
->lli_sa_lock
);
736 do_sa_entry_to_stated(sai
, entry
, SA_ENTRY_INVA
);
737 wakeup
= (entry
->se_index
== sai
->sai_index_wait
);
739 entry
->se_minfo
= minfo
;
740 entry
->se_req
= ptlrpc_request_addref(req
);
741 /* Release the async ibits lock ASAP to avoid deadlock
742 * when statahead thread tries to enqueue lock on parent
743 * for readpage and other tries to enqueue lock on child
744 * with parent's lock held, for example: unlink.
746 entry
->se_handle
= handle
;
747 wakeup
= list_empty(&sai
->sai_entries_received
);
748 list_add_tail(&entry
->se_list
,
749 &sai
->sai_entries_received
);
752 spin_unlock(&lli
->lli_sa_lock
);
754 ll_sa_entry_put(sai
, entry
);
756 wake_up(&sai
->sai_thread
.t_ctl_waitq
);
761 ll_intent_release(it
);
770 static void sa_args_fini(struct md_enqueue_info
*minfo
,
771 struct ldlm_enqueue_info
*einfo
)
773 LASSERT(minfo
&& einfo
);
780 * prepare arguments for async stat RPC.
782 static int sa_args_init(struct inode
*dir
, struct inode
*child
,
783 struct ll_sa_entry
*entry
, struct md_enqueue_info
**pmi
,
784 struct ldlm_enqueue_info
**pei
)
786 const struct qstr
*qstr
= &entry
->se_qstr
;
787 struct ll_inode_info
*lli
= ll_i2info(dir
);
788 struct md_enqueue_info
*minfo
;
789 struct ldlm_enqueue_info
*einfo
;
790 struct md_op_data
*op_data
;
792 einfo
= kzalloc(sizeof(*einfo
), GFP_NOFS
);
796 minfo
= kzalloc(sizeof(*minfo
), GFP_NOFS
);
802 op_data
= ll_prep_md_op_data(&minfo
->mi_data
, dir
, child
, qstr
->name
,
803 qstr
->len
, 0, LUSTRE_OPC_ANY
, NULL
);
804 if (IS_ERR(op_data
)) {
807 return PTR_ERR(op_data
);
810 minfo
->mi_it
.it_op
= IT_GETATTR
;
811 minfo
->mi_dir
= igrab(dir
);
812 minfo
->mi_cb
= ll_statahead_interpret
;
813 minfo
->mi_generation
= lli
->lli_sai
->sai_generation
;
814 minfo
->mi_cbdata
= entry
->se_index
;
816 einfo
->ei_type
= LDLM_IBITS
;
817 einfo
->ei_mode
= it_to_lock_mode(&minfo
->mi_it
);
818 einfo
->ei_cb_bl
= ll_md_blocking_ast
;
819 einfo
->ei_cb_cp
= ldlm_completion_ast
;
820 einfo
->ei_cb_gl
= NULL
;
821 einfo
->ei_cbdata
= NULL
;
829 static int do_sa_lookup(struct inode
*dir
, struct ll_sa_entry
*entry
)
831 struct md_enqueue_info
*minfo
;
832 struct ldlm_enqueue_info
*einfo
;
835 rc
= sa_args_init(dir
, NULL
, entry
, &minfo
, &einfo
);
839 rc
= md_intent_getattr_async(ll_i2mdexp(dir
), minfo
, einfo
);
841 sa_args_fini(minfo
, einfo
);
847 * similar to ll_revalidate_it().
848 * \retval 1 -- dentry valid
849 * \retval 0 -- will send stat-ahead request
850 * \retval others -- prepare stat-ahead request failed
852 static int do_sa_revalidate(struct inode
*dir
, struct ll_sa_entry
*entry
,
853 struct dentry
*dentry
)
855 struct inode
*inode
= d_inode(dentry
);
856 struct lookup_intent it
= { .it_op
= IT_GETATTR
,
857 .d
.lustre
.it_lock_handle
= 0 };
858 struct md_enqueue_info
*minfo
;
859 struct ldlm_enqueue_info
*einfo
;
862 if (unlikely(!inode
))
865 if (d_mountpoint(dentry
))
868 entry
->se_inode
= igrab(inode
);
869 rc
= md_revalidate_lock(ll_i2mdexp(dir
), &it
, ll_inode2fid(inode
),
872 entry
->se_handle
= it
.d
.lustre
.it_lock_handle
;
873 ll_intent_release(&it
);
877 rc
= sa_args_init(dir
, inode
, entry
, &minfo
, &einfo
);
879 entry
->se_inode
= NULL
;
884 rc
= md_intent_getattr_async(ll_i2mdexp(dir
), minfo
, einfo
);
886 entry
->se_inode
= NULL
;
888 sa_args_fini(minfo
, einfo
);
894 static void ll_statahead_one(struct dentry
*parent
, const char *entry_name
,
897 struct inode
*dir
= d_inode(parent
);
898 struct ll_inode_info
*lli
= ll_i2info(dir
);
899 struct ll_statahead_info
*sai
= lli
->lli_sai
;
900 struct dentry
*dentry
= NULL
;
901 struct ll_sa_entry
*entry
;
905 entry
= ll_sa_entry_alloc(sai
, sai
->sai_index
, entry_name
,
910 dentry
= d_lookup(parent
, &entry
->se_qstr
);
912 rc
= do_sa_lookup(dir
, entry
);
914 rc
= do_sa_revalidate(dir
, entry
, dentry
);
915 if (rc
== 1 && agl_should_run(sai
, d_inode(dentry
)))
916 ll_agl_add(sai
, d_inode(dentry
), entry
->se_index
);
922 rc1
= ll_sa_entry_to_stated(sai
, entry
,
923 rc
< 0 ? SA_ENTRY_INVA
: SA_ENTRY_SUCC
);
924 if (rc1
== 0 && entry
->se_index
== sai
->sai_index_wait
)
925 wake_up(&sai
->sai_waitq
);
931 /* drop one refcount on entry by ll_sa_entry_alloc */
932 ll_sa_entry_put(sai
, entry
);
935 static int ll_agl_thread(void *arg
)
937 struct dentry
*parent
= arg
;
938 struct inode
*dir
= d_inode(parent
);
939 struct ll_inode_info
*plli
= ll_i2info(dir
);
940 struct ll_inode_info
*clli
;
941 struct ll_sb_info
*sbi
= ll_i2sbi(dir
);
942 struct ll_statahead_info
*sai
= ll_sai_get(plli
->lli_sai
);
943 struct ptlrpc_thread
*thread
= &sai
->sai_agl_thread
;
944 struct l_wait_info lwi
= { 0 };
946 thread
->t_pid
= current_pid();
947 CDEBUG(D_READA
, "agl thread started: sai %p, parent %pd\n",
950 atomic_inc(&sbi
->ll_agl_total
);
951 spin_lock(&plli
->lli_agl_lock
);
952 sai
->sai_agl_valid
= 1;
953 if (thread_is_init(thread
))
954 /* If someone else has changed the thread state
955 * (e.g. already changed to SVC_STOPPING), we can't just
956 * blindly overwrite that setting.
958 thread_set_flags(thread
, SVC_RUNNING
);
959 spin_unlock(&plli
->lli_agl_lock
);
960 wake_up(&thread
->t_ctl_waitq
);
963 l_wait_event(thread
->t_ctl_waitq
,
964 !list_empty(&sai
->sai_entries_agl
) ||
965 !thread_is_running(thread
),
968 if (!thread_is_running(thread
))
971 spin_lock(&plli
->lli_agl_lock
);
972 /* The statahead thread maybe help to process AGL entries,
973 * so check whether list empty again.
975 if (!list_empty(&sai
->sai_entries_agl
)) {
976 clli
= list_entry(sai
->sai_entries_agl
.next
,
977 struct ll_inode_info
, lli_agl_list
);
978 list_del_init(&clli
->lli_agl_list
);
979 spin_unlock(&plli
->lli_agl_lock
);
980 ll_agl_trigger(&clli
->lli_vfs_inode
, sai
);
982 spin_unlock(&plli
->lli_agl_lock
);
986 spin_lock(&plli
->lli_agl_lock
);
987 sai
->sai_agl_valid
= 0;
988 while (!list_empty(&sai
->sai_entries_agl
)) {
989 clli
= list_entry(sai
->sai_entries_agl
.next
,
990 struct ll_inode_info
, lli_agl_list
);
991 list_del_init(&clli
->lli_agl_list
);
992 spin_unlock(&plli
->lli_agl_lock
);
993 clli
->lli_agl_index
= 0;
994 iput(&clli
->lli_vfs_inode
);
995 spin_lock(&plli
->lli_agl_lock
);
997 thread_set_flags(thread
, SVC_STOPPED
);
998 spin_unlock(&plli
->lli_agl_lock
);
999 wake_up(&thread
->t_ctl_waitq
);
1001 CDEBUG(D_READA
, "agl thread stopped: sai %p, parent %pd\n",
1006 static void ll_start_agl(struct dentry
*parent
, struct ll_statahead_info
*sai
)
1008 struct ptlrpc_thread
*thread
= &sai
->sai_agl_thread
;
1009 struct l_wait_info lwi
= { 0 };
1010 struct ll_inode_info
*plli
;
1011 struct task_struct
*task
;
1013 CDEBUG(D_READA
, "start agl thread: sai %p, parent %pd\n",
1016 plli
= ll_i2info(d_inode(parent
));
1017 task
= kthread_run(ll_agl_thread
, parent
, "ll_agl_%u",
1018 plli
->lli_opendir_pid
);
1020 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task
));
1021 thread_set_flags(thread
, SVC_STOPPED
);
1025 l_wait_event(thread
->t_ctl_waitq
,
1026 thread_is_running(thread
) || thread_is_stopped(thread
),
1030 static int ll_statahead_thread(void *arg
)
1032 struct dentry
*parent
= arg
;
1033 struct inode
*dir
= d_inode(parent
);
1034 struct ll_inode_info
*plli
= ll_i2info(dir
);
1035 struct ll_inode_info
*clli
;
1036 struct ll_sb_info
*sbi
= ll_i2sbi(dir
);
1037 struct ll_statahead_info
*sai
= ll_sai_get(plli
->lli_sai
);
1038 struct ptlrpc_thread
*thread
= &sai
->sai_thread
;
1039 struct ptlrpc_thread
*agl_thread
= &sai
->sai_agl_thread
;
1044 struct ll_dir_chain chain
;
1045 struct l_wait_info lwi
= { 0 };
1047 thread
->t_pid
= current_pid();
1048 CDEBUG(D_READA
, "statahead thread starting: sai %p, parent %pd\n",
1051 if (sbi
->ll_flags
& LL_SBI_AGL_ENABLED
)
1052 ll_start_agl(parent
, sai
);
1054 atomic_inc(&sbi
->ll_sa_total
);
1055 spin_lock(&plli
->lli_sa_lock
);
1056 if (thread_is_init(thread
))
1057 /* If someone else has changed the thread state
1058 * (e.g. already changed to SVC_STOPPING), we can't just
1059 * blindly overwrite that setting.
1061 thread_set_flags(thread
, SVC_RUNNING
);
1062 spin_unlock(&plli
->lli_sa_lock
);
1063 wake_up(&thread
->t_ctl_waitq
);
1065 ll_dir_chain_init(&chain
);
1066 page
= ll_get_dir_page(dir
, pos
, &chain
);
1069 struct lu_dirpage
*dp
;
1070 struct lu_dirent
*ent
;
1074 CDEBUG(D_READA
, "error reading dir "DFID
" at %llu/%llu: [rc %d] [parent %u]\n",
1075 PFID(ll_inode2fid(dir
)), pos
, sai
->sai_index
,
1076 rc
, plli
->lli_opendir_pid
);
1080 dp
= page_address(page
);
1081 for (ent
= lu_dirent_start(dp
); ent
;
1082 ent
= lu_dirent_next(ent
)) {
1087 hash
= le64_to_cpu(ent
->lde_hash
);
1088 if (unlikely(hash
< pos
))
1090 * Skip until we find target hash value.
1094 namelen
= le16_to_cpu(ent
->lde_namelen
);
1095 if (unlikely(namelen
== 0))
1097 * Skip dummy record.
1101 name
= ent
->lde_name
;
1102 if (name
[0] == '.') {
1108 } else if (name
[1] == '.' && namelen
== 2) {
1113 } else if (!sai
->sai_ls_all
) {
1115 * skip hidden files.
1117 sai
->sai_skip_hidden
++;
1123 * don't stat-ahead first entry.
1125 if (unlikely(++first
== 1))
1129 l_wait_event(thread
->t_ctl_waitq
,
1130 !sa_sent_full(sai
) ||
1131 !list_empty(&sai
->sai_entries_received
) ||
1132 !list_empty(&sai
->sai_entries_agl
) ||
1133 !thread_is_running(thread
),
1137 while (!list_empty(&sai
->sai_entries_received
))
1138 ll_post_statahead(sai
);
1140 if (unlikely(!thread_is_running(thread
))) {
1141 ll_release_page(page
, 0);
1146 /* If no window for metadata statahead, but there are
1147 * some AGL entries to be triggered, then try to help
1148 * to process the AGL entries.
1150 if (sa_sent_full(sai
)) {
1151 spin_lock(&plli
->lli_agl_lock
);
1152 while (!list_empty(&sai
->sai_entries_agl
)) {
1153 clli
= list_entry(sai
->sai_entries_agl
.next
,
1154 struct ll_inode_info
, lli_agl_list
);
1155 list_del_init(&clli
->lli_agl_list
);
1156 spin_unlock(&plli
->lli_agl_lock
);
1157 ll_agl_trigger(&clli
->lli_vfs_inode
,
1160 if (!list_empty(&sai
->sai_entries_received
))
1164 !thread_is_running(thread
))) {
1165 ll_release_page(page
, 0);
1170 if (!sa_sent_full(sai
))
1173 spin_lock(&plli
->lli_agl_lock
);
1175 spin_unlock(&plli
->lli_agl_lock
);
1181 ll_statahead_one(parent
, name
, namelen
);
1183 pos
= le64_to_cpu(dp
->ldp_hash_end
);
1184 if (pos
== MDS_DIR_END_OFF
) {
1186 * End of directory reached.
1188 ll_release_page(page
, 0);
1190 l_wait_event(thread
->t_ctl_waitq
,
1191 !list_empty(&sai
->sai_entries_received
) ||
1192 sai
->sai_sent
== sai
->sai_replied
||
1193 !thread_is_running(thread
),
1196 while (!list_empty(&sai
->sai_entries_received
))
1197 ll_post_statahead(sai
);
1199 if (unlikely(!thread_is_running(thread
))) {
1204 if (sai
->sai_sent
== sai
->sai_replied
&&
1205 list_empty(&sai
->sai_entries_received
))
1209 spin_lock(&plli
->lli_agl_lock
);
1210 while (!list_empty(&sai
->sai_entries_agl
) &&
1211 thread_is_running(thread
)) {
1212 clli
= list_entry(sai
->sai_entries_agl
.next
,
1213 struct ll_inode_info
, lli_agl_list
);
1214 list_del_init(&clli
->lli_agl_list
);
1215 spin_unlock(&plli
->lli_agl_lock
);
1216 ll_agl_trigger(&clli
->lli_vfs_inode
, sai
);
1217 spin_lock(&plli
->lli_agl_lock
);
1219 spin_unlock(&plli
->lli_agl_lock
);
1225 * chain is exhausted.
1226 * Normal case: continue to the next page.
1228 ll_release_page(page
, le32_to_cpu(dp
->ldp_flags
) &
1230 page
= ll_get_dir_page(dir
, pos
, &chain
);
1232 LASSERT(le32_to_cpu(dp
->ldp_flags
) & LDF_COLLIDE
);
1233 ll_release_page(page
, 1);
1235 * go into overflow page.
1241 if (sai
->sai_agl_valid
) {
1242 spin_lock(&plli
->lli_agl_lock
);
1243 thread_set_flags(agl_thread
, SVC_STOPPING
);
1244 spin_unlock(&plli
->lli_agl_lock
);
1245 wake_up(&agl_thread
->t_ctl_waitq
);
1247 CDEBUG(D_READA
, "stop agl thread: sai %p pid %u\n",
1248 sai
, (unsigned int)agl_thread
->t_pid
);
1249 l_wait_event(agl_thread
->t_ctl_waitq
,
1250 thread_is_stopped(agl_thread
),
1253 /* Set agl_thread flags anyway. */
1254 thread_set_flags(&sai
->sai_agl_thread
, SVC_STOPPED
);
1256 ll_dir_chain_fini(&chain
);
1257 spin_lock(&plli
->lli_sa_lock
);
1258 if (!list_empty(&sai
->sai_entries_received
)) {
1259 thread_set_flags(thread
, SVC_STOPPING
);
1260 spin_unlock(&plli
->lli_sa_lock
);
1262 /* To release the resources held by received entries. */
1263 while (!list_empty(&sai
->sai_entries_received
))
1264 ll_post_statahead(sai
);
1266 spin_lock(&plli
->lli_sa_lock
);
1268 thread_set_flags(thread
, SVC_STOPPED
);
1269 spin_unlock(&plli
->lli_sa_lock
);
1270 wake_up(&sai
->sai_waitq
);
1271 wake_up(&thread
->t_ctl_waitq
);
1274 CDEBUG(D_READA
, "statahead thread stopped: sai %p, parent %pd\n",
1280 * called in ll_file_release().
1282 void ll_stop_statahead(struct inode
*dir
, void *key
)
1284 struct ll_inode_info
*lli
= ll_i2info(dir
);
1289 spin_lock(&lli
->lli_sa_lock
);
1290 if (lli
->lli_opendir_key
!= key
|| lli
->lli_opendir_pid
== 0) {
1291 spin_unlock(&lli
->lli_sa_lock
);
1295 lli
->lli_opendir_key
= NULL
;
1298 struct l_wait_info lwi
= { 0 };
1299 struct ptlrpc_thread
*thread
= &lli
->lli_sai
->sai_thread
;
1301 if (!thread_is_stopped(thread
)) {
1302 thread_set_flags(thread
, SVC_STOPPING
);
1303 spin_unlock(&lli
->lli_sa_lock
);
1304 wake_up(&thread
->t_ctl_waitq
);
1306 CDEBUG(D_READA
, "stop statahead thread: sai %p pid %u\n",
1307 lli
->lli_sai
, (unsigned int)thread
->t_pid
);
1308 l_wait_event(thread
->t_ctl_waitq
,
1309 thread_is_stopped(thread
),
1312 spin_unlock(&lli
->lli_sa_lock
);
1316 * Put the ref which was held when first statahead_enter.
1317 * It maybe not the last ref for some statahead requests
1320 ll_sai_put(lli
->lli_sai
);
1322 lli
->lli_opendir_pid
= 0;
1323 spin_unlock(&lli
->lli_sa_lock
);
1329 * not first dirent, or is "."
1331 LS_NONE_FIRST_DE
= 0,
1333 * the first non-hidden dirent
1337 * the first hidden dirent, that is "."
1342 static int is_first_dirent(struct inode
*dir
, struct dentry
*dentry
)
1344 struct ll_dir_chain chain
;
1345 const struct qstr
*target
= &dentry
->d_name
;
1349 int rc
= LS_NONE_FIRST_DE
;
1351 ll_dir_chain_init(&chain
);
1352 page
= ll_get_dir_page(dir
, pos
, &chain
);
1355 struct lu_dirpage
*dp
;
1356 struct lu_dirent
*ent
;
1359 struct ll_inode_info
*lli
= ll_i2info(dir
);
1362 CERROR("error reading dir "DFID
" at %llu: [rc %d] [parent %u]\n",
1363 PFID(ll_inode2fid(dir
)), pos
,
1364 rc
, lli
->lli_opendir_pid
);
1368 dp
= page_address(page
);
1369 for (ent
= lu_dirent_start(dp
); ent
;
1370 ent
= lu_dirent_next(ent
)) {
1375 hash
= le64_to_cpu(ent
->lde_hash
);
1376 /* The ll_get_dir_page() can return any page containing
1377 * the given hash which may be not the start hash.
1379 if (unlikely(hash
< pos
))
1382 namelen
= le16_to_cpu(ent
->lde_namelen
);
1383 if (unlikely(namelen
== 0))
1385 * skip dummy record.
1389 name
= ent
->lde_name
;
1390 if (name
[0] == '.') {
1396 else if (name
[1] == '.' && namelen
== 2)
1407 if (dot_de
&& target
->name
[0] != '.') {
1408 CDEBUG(D_READA
, "%.*s skip hidden file %.*s\n",
1409 target
->len
, target
->name
,
1414 if (target
->len
!= namelen
||
1415 memcmp(target
->name
, name
, namelen
) != 0)
1416 rc
= LS_NONE_FIRST_DE
;
1420 rc
= LS_FIRST_DOT_DE
;
1422 ll_release_page(page
, 0);
1425 pos
= le64_to_cpu(dp
->ldp_hash_end
);
1426 if (pos
== MDS_DIR_END_OFF
) {
1428 * End of directory reached.
1430 ll_release_page(page
, 0);
1434 * chain is exhausted
1435 * Normal case: continue to the next page.
1437 ll_release_page(page
, le32_to_cpu(dp
->ldp_flags
) &
1439 page
= ll_get_dir_page(dir
, pos
, &chain
);
1442 * go into overflow page.
1444 LASSERT(le32_to_cpu(dp
->ldp_flags
) & LDF_COLLIDE
);
1445 ll_release_page(page
, 1);
1450 ll_dir_chain_fini(&chain
);
1455 ll_sai_unplug(struct ll_statahead_info
*sai
, struct ll_sa_entry
*entry
)
1457 struct ptlrpc_thread
*thread
= &sai
->sai_thread
;
1458 struct ll_sb_info
*sbi
= ll_i2sbi(sai
->sai_inode
);
1461 if (entry
&& entry
->se_stat
== SA_ENTRY_SUCC
)
1466 ll_sa_entry_fini(sai
, entry
);
1469 sai
->sai_consecutive_miss
= 0;
1470 sai
->sai_max
= min(2 * sai
->sai_max
, sbi
->ll_sa_max
);
1472 struct ll_inode_info
*lli
= ll_i2info(sai
->sai_inode
);
1475 sai
->sai_consecutive_miss
++;
1476 if (sa_low_hit(sai
) && thread_is_running(thread
)) {
1477 atomic_inc(&sbi
->ll_sa_wrong
);
1478 CDEBUG(D_READA
, "Statahead for dir " DFID
" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread\n",
1479 PFID(&lli
->lli_fid
), sai
->sai_hit
,
1480 sai
->sai_miss
, sai
->sai_sent
,
1482 spin_lock(&lli
->lli_sa_lock
);
1483 if (!thread_is_stopped(thread
))
1484 thread_set_flags(thread
, SVC_STOPPING
);
1485 spin_unlock(&lli
->lli_sa_lock
);
1489 if (!thread_is_stopped(thread
))
1490 wake_up(&thread
->t_ctl_waitq
);
1494 * Start statahead thread if this is the first dir entry.
1495 * Otherwise if a thread is started already, wait it until it is ahead of me.
1496 * \retval 1 -- find entry with lock in cache, the caller needs to do
1498 * \retval 0 -- find entry in cache, but without lock, the caller needs
1500 * \retval others -- the caller need to process as non-statahead.
1502 int do_statahead_enter(struct inode
*dir
, struct dentry
**dentryp
,
1505 struct ll_inode_info
*lli
= ll_i2info(dir
);
1506 struct ll_statahead_info
*sai
= lli
->lli_sai
;
1507 struct dentry
*parent
;
1508 struct ll_sa_entry
*entry
;
1509 struct ptlrpc_thread
*thread
;
1510 struct l_wait_info lwi
= { 0 };
1511 struct task_struct
*task
;
1513 struct ll_inode_info
*plli
;
1515 LASSERT(lli
->lli_opendir_pid
== current_pid());
1518 thread
= &sai
->sai_thread
;
1519 if (unlikely(thread_is_stopped(thread
) &&
1520 list_empty(&sai
->sai_entries_stated
))) {
1521 /* to release resource */
1522 ll_stop_statahead(dir
, lli
->lli_opendir_key
);
1526 if ((*dentryp
)->d_name
.name
[0] == '.') {
1527 if (sai
->sai_ls_all
||
1528 sai
->sai_miss_hidden
>= sai
->sai_skip_hidden
) {
1530 * Hidden dentry is the first one, or statahead
1531 * thread does not skip so many hidden dentries
1532 * before "sai_ls_all" enabled as below.
1535 if (!sai
->sai_ls_all
)
1537 * It maybe because hidden dentry is not
1538 * the first one, "sai_ls_all" was not
1539 * set, then "ls -al" missed. Enable
1540 * "sai_ls_all" for such case.
1542 sai
->sai_ls_all
= 1;
1545 * Such "getattr" has been skipped before
1546 * "sai_ls_all" enabled as above.
1548 sai
->sai_miss_hidden
++;
1553 entry
= ll_sa_entry_get_byname(sai
, &(*dentryp
)->d_name
);
1554 if (!entry
|| only_unplug
) {
1555 ll_sai_unplug(sai
, entry
);
1556 return entry
? 1 : -EAGAIN
;
1559 if (!ll_sa_entry_stated(entry
)) {
1560 sai
->sai_index_wait
= entry
->se_index
;
1561 lwi
= LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL
,
1562 LWI_ON_SIGNAL_NOOP
, NULL
);
1563 rc
= l_wait_event(sai
->sai_waitq
,
1564 ll_sa_entry_stated(entry
) ||
1565 thread_is_stopped(thread
),
1568 ll_sai_unplug(sai
, entry
);
1573 if (entry
->se_stat
== SA_ENTRY_SUCC
&& entry
->se_inode
) {
1574 struct inode
*inode
= entry
->se_inode
;
1575 struct lookup_intent it
= { .it_op
= IT_GETATTR
,
1576 .d
.lustre
.it_lock_handle
=
1580 rc
= md_revalidate_lock(ll_i2mdexp(dir
), &it
,
1581 ll_inode2fid(inode
), &bits
);
1583 if (!d_inode(*dentryp
)) {
1584 struct dentry
*alias
;
1586 alias
= ll_splice_alias(inode
,
1588 if (IS_ERR(alias
)) {
1589 ll_sai_unplug(sai
, entry
);
1590 return PTR_ERR(alias
);
1593 } else if (d_inode(*dentryp
) != inode
) {
1594 /* revalidate, but inode is recreated */
1595 CDEBUG(D_READA
, "%s: stale dentry %pd inode "DFID
", statahead inode "DFID
"\n",
1596 ll_get_fsname(d_inode(*dentryp
)->i_sb
, NULL
, 0),
1598 PFID(ll_inode2fid(d_inode(*dentryp
))),
1599 PFID(ll_inode2fid(inode
)));
1600 ll_sai_unplug(sai
, entry
);
1605 entry
->se_inode
= NULL
;
1607 if ((bits
& MDS_INODELOCK_LOOKUP
) &&
1608 d_lustre_invalid(*dentryp
))
1609 d_lustre_revalidate(*dentryp
);
1610 ll_intent_release(&it
);
1614 ll_sai_unplug(sai
, entry
);
1618 /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1619 rc
= is_first_dirent(dir
, *dentryp
);
1620 if (rc
== LS_NONE_FIRST_DE
) {
1621 /* It is not "ls -{a}l" operation, no need statahead for it. */
1626 sai
= ll_sai_alloc();
1632 sai
->sai_ls_all
= (rc
== LS_FIRST_DOT_DE
);
1633 sai
->sai_inode
= igrab(dir
);
1634 if (unlikely(!sai
->sai_inode
)) {
1635 CWARN("Do not start stat ahead on dying inode "DFID
"\n",
1636 PFID(&lli
->lli_fid
));
1641 /* get parent reference count here, and put it in ll_statahead_thread */
1642 parent
= dget((*dentryp
)->d_parent
);
1643 if (unlikely(sai
->sai_inode
!= d_inode(parent
))) {
1644 struct ll_inode_info
*nlli
= ll_i2info(d_inode(parent
));
1646 CWARN("Race condition, someone changed %pd just now: old parent "DFID
", new parent "DFID
"\n",
1648 PFID(&lli
->lli_fid
), PFID(&nlli
->lli_fid
));
1650 iput(sai
->sai_inode
);
1655 CDEBUG(D_READA
, "start statahead thread: sai %p, parent %pd\n",
1658 /* The sai buffer already has one reference taken at allocation time,
1659 * but as soon as we expose the sai by attaching it to the lli that
1660 * default reference can be dropped by another thread calling
1661 * ll_stop_statahead. We need to take a local reference to protect
1662 * the sai buffer while we intend to access it.
1667 plli
= ll_i2info(d_inode(parent
));
1668 task
= kthread_run(ll_statahead_thread
, parent
, "ll_sa_%u",
1669 plli
->lli_opendir_pid
);
1670 thread
= &sai
->sai_thread
;
1673 CERROR("can't start ll_sa thread, rc: %d\n", rc
);
1675 lli
->lli_opendir_key
= NULL
;
1676 thread_set_flags(thread
, SVC_STOPPED
);
1677 thread_set_flags(&sai
->sai_agl_thread
, SVC_STOPPED
);
1678 /* Drop both our own local reference and the default
1679 * reference from allocation time.
1683 LASSERT(!lli
->lli_sai
);
1687 l_wait_event(thread
->t_ctl_waitq
,
1688 thread_is_running(thread
) || thread_is_stopped(thread
),
1693 * We don't stat-ahead for the first dirent since we are already in
1700 spin_lock(&lli
->lli_sa_lock
);
1701 lli
->lli_opendir_key
= NULL
;
1702 lli
->lli_opendir_pid
= 0;
1703 spin_unlock(&lli
->lli_sa_lock
);
This page took 0.067581 seconds and 5 git commands to generate.