diff --git lustre/llite/dcache.c lustre/llite/dcache.c index 70059ec..ac4fcb8 100644 --- lustre/llite/dcache.c +++ lustre/llite/dcache.c @@ -58,10 +58,8 @@ static void ll_release(struct dentry *de) ENTRY; LASSERT(de != NULL); lld = ll_d2d(de); - if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */ - EXIT; - return; - } + if (lld == NULL) /* NFS copies the de->d_op methods (bug 4655) */ + RETURN_EXIT; #ifndef HAVE_VFS_INTENT_PATCHES if (lld->lld_it) { ll_intent_release(lld->lld_it); @@ -87,15 +85,15 @@ int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name) struct dentry *dchild; ENTRY; - /* XXX: d_name must be in-dentry structure */ - dchild = container_of(d_name, struct dentry, d_name); /* ugh */ - if (d_name->len != name->len) RETURN(1); if (memcmp(d_name->name, name->name, name->len)) RETURN(1); + /* XXX: d_name must be in-dentry structure */ + dchild = container_of(d_name, struct dentry, d_name); /* ugh */ + CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n", name->len, name->name, dchild, d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID, @@ -146,6 +144,7 @@ int find_cbdata(struct inode *inode) /* should NOT be called with the dcache lock, see fs/dcache.c */ static int ll_ddelete(struct dentry *de) { + struct ll_dentry_data *lld; ENTRY; LASSERT(de); @@ -155,6 +154,19 @@ static int ll_ddelete(struct dentry *de) d_unhashed(de) ? "" : "hashed,", list_empty(&de->d_subdirs) ? 
"" : "subdirs"); + lld = ll_d2d(de); + if (lld != NULL && lld->lld_rwsem_pid != 0) { + struct inode *inode = lld->lld_rwsem_inode; + struct ll_inode_info *lli; + + LASSERT(inode != NULL); + lli = ll_i2info(inode); + lld->lld_rwsem_pid = 0; + up_read(&lli->lli_sa_rwsem); + lld->lld_rwsem_inode = NULL; + iput(inode); + } + /* if not ldlm lock for this inode, set i_nlink to 0 so that * this inode can be recycled later b=20433 */ LASSERT(atomic_read(&de->d_count) == 0); @@ -167,7 +179,7 @@ static int ll_ddelete(struct dentry *de) RETURN(0); } -void ll_set_dd(struct dentry *de) +static int ll_set_dd(struct dentry *de) { ENTRY; LASSERT(de != NULL); @@ -187,10 +199,32 @@ void ll_set_dd(struct dentry *de) else OBD_FREE_PTR(lld); unlock_dentry(de); + } else { + RETURN(-ENOMEM); } } - EXIT; + RETURN(0); +} + +int ll_dops_init(struct dentry *de, int block) +{ + struct ll_dentry_data *lld = ll_d2d(de); + int rc = 0; + + if (lld == NULL && block != 0) { + rc = ll_set_dd(de); + if (rc) + return rc; + + lld = ll_d2d(de); + } + + if (lld != NULL) + lld->lld_sa_generation = 0; + + de->d_op = &ll_d_ops; + return rc; } void ll_intent_drop_lock(struct lookup_intent *it) @@ -389,8 +423,8 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, int first = -1, rc; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name, - LL_IT2STR(it)); + CDEBUG(D_VFSTRACE, "VFS revalidate: name=(%p)%.*s, intent=%s\n", + de, de->d_name.len, de->d_name.name, LL_IT2STR(it)); if (de->d_inode == NULL) { /* We can only use negative dentries if this is stat or lookup, @@ -479,7 +513,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, } } - if (it->it_op == IT_GETATTR) + if (it->it_op == IT_GETATTR && !(lookup_flags & LOOKUP_CREATE)) first = ll_statahead_enter(parent, &de, 0); do_lock: @@ -488,7 +522,7 @@ do_lock: rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags, &req, ll_mdc_blocking_ast, 0); it->it_create_mode &= ~M_CHECK_STALE; - if (it->it_op == IT_GETATTR && 
!first) + if (!first) /* If there are too many locks on client-side, then some * locks taken by statahead maybe dropped automatically * before the real "revalidate" using them. */ @@ -560,12 +594,14 @@ revalidate_finish: "inode %p refc %d\n", de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode, atomic_read(&de->d_count)); - ll_lookup_finish_locks(it, de); #ifdef DCACHE_LUSTRE_INVALID - lock_dentry(de); - de->d_flags &= ~DCACHE_LUSTRE_INVALID; - unlock_dentry(de); + if (de->d_flags & DCACHE_LUSTRE_INVALID) { + lock_dentry(de); + de->d_flags &= ~DCACHE_LUSTRE_INVALID; + unlock_dentry(de); + } #endif + ll_lookup_finish_locks(it, de); } RETURN(rc); /* This part is here to combat evil-evil race in real_lookup on 2.6 kernels. @@ -614,7 +650,8 @@ out_sa: * For rc == 1 case, should not return directly to prevent losing * statahead windows; for rc == 0 case, the "lookup" will be done later. */ - if (it && it->it_op == IT_GETATTR && rc == 1) { + if (it && it->it_op == IT_GETATTR && !(lookup_flags & LOOKUP_CREATE) && + rc == 1) { first = ll_statahead_enter(parent, &de, 0); if (!first) ll_statahead_exit(parent, de, 1); @@ -642,14 +679,12 @@ out_sa: * issue is moot. 
*/ if (flag == 1 && (++ldd->lld_mnt_count) > 1) { unlock_kernel(); - EXIT; - return; + RETURN_EXIT; } if (flag == 0 && (++ldd->lld_cwd_count) > 1) { unlock_kernel(); - EXIT; - return; + RETURN_EXIT; } unlock_kernel(); @@ -667,8 +702,7 @@ out_sa: unlock_kernel(); } - EXIT; - return; + RETURN_EXIT; } /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag) @@ -689,8 +723,7 @@ out_sa: if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) { /* the "pin" failed */ unlock_kernel(); - EXIT; - return; + RETURN_EXIT; } if (flag) @@ -699,26 +732,37 @@ out_sa: count = --ldd->lld_cwd_count; unlock_kernel(); - if (count != 0) { - EXIT; - return; - } + if (count != 0) + RETURN_EXIT; rc = obd_unpin(sbi->ll_mdc_exp, &handle, flag); - EXIT; - return; + RETURN_EXIT; } #ifdef HAVE_VFS_INTENT_PATCHES int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd) { int rc; - ENTRY; + ENTRY; + + if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST)) { + struct inode *dir = dentry->d_parent->d_inode; + int sa = -1; + + if (nd->flags & LOOKUP_CREATE) { + sa = ll_sa_rwsem_setup(dir, dentry); + if (unlikely(sa)) + RETURN(0); + } - if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST)) rc = ll_revalidate_it(dentry, nd->flags, &nd->intent); - else + + if (sa == 0 && + (rc == 0 || IS_ERR(ERR_PTR(rc)) || dentry->d_inode != NULL)) + ll_sa_rwsem_cleanup(dir, dentry); + } else { rc = ll_revalidate_it(dentry, 0, NULL); + } RETURN(rc); } @@ -729,17 +773,28 @@ int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd) ENTRY; if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) { + struct inode *dir = dentry->d_parent->d_inode; + int sa = -1; struct lookup_intent *it; + it = ll_convert_intent(&nd->intent.open, nd->flags); if (IS_ERR(it)) RETURN(0); - if (it->it_op == (IT_OPEN|IT_CREAT)) - if (nd->intent.open.flags & O_EXCL) { - CDEBUG(D_VFSTRACE, - "create O_EXCL, returning 0\n"); - rc = 0; + + if (it->it_op == (IT_OPEN|IT_CREAT) && + 
nd->intent.open.flags & O_EXCL) { + CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n"); + rc = 0; + goto out_it; + } + + if (nd->flags & LOOKUP_CREATE) { + sa = ll_sa_rwsem_setup(dir, dentry); + if (unlikely(sa)) { + rc = sa; goto out_it; } + } rc = ll_revalidate_it(dentry, nd->flags, it); @@ -790,6 +845,10 @@ out_it: ll_intent_release(it); OBD_FREE(it, sizeof(*it)); } + + if (sa == 0 && + (rc == 0 || IS_ERR(ERR_PTR(rc)) || dentry->d_inode != NULL)) + ll_sa_rwsem_cleanup(dir, dentry); } else { rc = ll_revalidate_it(dentry, 0, NULL); } diff --git lustre/llite/dir.c lustre/llite/dir.c index 7a40543..dd44fb7 100644 --- lustre/llite/dir.c +++ lustre/llite/dir.c @@ -207,8 +207,9 @@ static void ll_dir_check_page(struct inode *dir, struct page *page) SetPageChecked(page); } -struct page *ll_get_dir_page(struct inode *dir, unsigned long n) +struct page *ll_get_dir_page(struct inode *dir, unsigned long n, int lock) { + struct ll_inode_info *lli = ll_i2info(dir); struct ldlm_res_id res_id; struct lustre_handle lockh; struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp); @@ -217,6 +218,9 @@ struct page *ll_get_dir_page(struct inode *dir, unsigned long n) ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} }; int rc; + if (lock) + down_write(&lli->lli_sa_rwsem); + fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id); rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); @@ -237,7 +241,7 @@ struct page *ll_get_dir_page(struct inode *dir, unsigned long n) ptlrpc_req_finished(request); if (rc < 0) { CERROR("lock enqueue: rc: %d\n", rc); - return ERR_PTR(rc); + GOTO(out, page = ERR_PTR(rc)); } } ldlm_lock_dump_handle(D_OTHER, &lockh); @@ -258,6 +262,9 @@ struct page *ll_get_dir_page(struct inode *dir, unsigned long n) out_unlock: ldlm_lock_decref(&lockh, LCK_CR); +out: + if (lock) + up_write(&lli->lli_sa_rwsem); return page; fail: @@ -373,7 +380,7 @@ static int ll_readdir_18(struct file 
*filp, void *dirent, filldir_t filldir) "size %llu\n", CFS_PAGE_SIZE, inode->i_ino, inode->i_generation, idx, npages, i_size_read(inode)); - page = ll_get_dir_page(inode, idx); + page = ll_get_dir_page(inode, idx, 0); /* size might have been updated by mdc_readpage */ npages = dir_pages(inode); diff --git lustre/llite/file.c lustre/llite/file.c index 882bfaa..3ff9536 100644 --- lustre/llite/file.c +++ lustre/llite/file.c @@ -430,14 +430,14 @@ int ll_file_open(struct inode *inode, struct file *file) RETURN(-ENOMEM); if (S_ISDIR(inode->i_mode)) { - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) { LASSERT(lli->lli_sai == NULL); lli->lli_opendir_key = fd; lli->lli_opendir_pid = cfs_curproc_pid(); opendir_set = 1; } - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); } if (inode->i_sb->s_root == file->f_dentry) { diff --git lustre/llite/llite_internal.h lustre/llite/llite_internal.h index 70ab767..92fdd74 100644 --- lustre/llite/llite_internal.h +++ lustre/llite/llite_internal.h @@ -113,6 +113,8 @@ struct ll_dentry_data { struct lookup_intent *lld_it; #endif unsigned int lld_sa_generation; + pid_t lld_rwsem_pid; + struct inode *lld_rwsem_inode; }; #define ll_d2d(de) ((struct ll_dentry_data*)((de)->d_fsdata)) @@ -172,11 +174,10 @@ struct ll_inode_info { struct ll_fid f16; } lli_fid; - /* metadata stat-ahead */ - /* - * "opendir_pid" is the token when lookup/revalid -- I am the owner of - * dir statahead. - */ + /* metadata statahead */ + /* protect statahead stuff: lli_opendir_pid, lli_opendir_key, lli_sai, + * lli_sa_dentry, and so on. */ + spinlock_t lli_sa_lock; pid_t lli_opendir_pid; /* * since parent-child threads can share the same @file struct, @@ -184,6 +185,7 @@ struct ll_inode_info { * before child -- it is me should cleanup the dir readahead. 
*/ void *lli_opendir_key; struct ll_statahead_info *lli_sai; + struct rw_semaphore lli_sa_rwsem; struct rw_semaphore lli_truncate_rwsem; /* the most recent attributes from mds, it is used for timestamps * only so far */ @@ -393,17 +395,12 @@ struct ll_sb_info { dev_t ll_sdev_orig; /* save s_dev before assign for * clustred nfs */ - /* metadata stat-ahead */ + /* metadata statahead */ unsigned int ll_sa_max; /* max statahead RPCs */ - unsigned int ll_sa_wrong; /* statahead thread stopped for - * low hit ratio */ - unsigned int ll_sa_total; /* statahead thread started + atomic_t ll_sa_total; /* statahead thread started * count */ - unsigned long long ll_sa_blocked; /* ls count waiting for - * statahead */ - unsigned long long ll_sa_cached; /* ls count got in cache */ - unsigned long long ll_sa_hit; /* hit count */ - unsigned long long ll_sa_miss; /* miss count */ + atomic_t ll_sa_wrong; /* statahead thread stopped for + * low hit ratio */ }; #define LL_DEFAULT_MAX_RW_CHUNK (32 * 1024 * 1024) @@ -668,7 +665,7 @@ static void lprocfs_llite_init_vars(struct lprocfs_static_vars *lvars) extern struct file_operations ll_dir_operations; extern struct inode_operations ll_dir_inode_operations; -struct page *ll_get_dir_page(struct inode *dir, unsigned long n); +struct page *ll_get_dir_page(struct inode *dir, unsigned long n, int lock); static inline unsigned ll_dir_rec_len(unsigned name_len) { @@ -719,7 +716,6 @@ int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new, void *data, int flag); int lookup_it_finish(struct ptlrpc_request *request, int offset, struct lookup_intent *it, void *data); -void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry); /* llite/rw.c */ int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); @@ -801,11 +797,13 @@ int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap, /** * protect race ll_find_aliases vs ll_revalidate_it vs ll_unhash_aliases */ +int 
ll_sa_rwsem_setup(struct inode *dir, struct dentry *dchild); +void ll_sa_rwsem_cleanup(struct inode *dir, struct dentry *dchild); +int ll_dops_init(struct dentry *de, int block); extern spinlock_t ll_lookup_lock; extern struct dentry_operations ll_d_ops; void ll_intent_drop_lock(struct lookup_intent *); void ll_intent_release(struct lookup_intent *); -extern void ll_set_dd(struct dentry *de); int ll_drop_dentry(struct dentry *dentry); void ll_unhash_aliases(struct inode *); void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft); @@ -1067,40 +1065,32 @@ struct ll_statahead_info { int do_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup); void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result); -void ll_stop_statahead(struct inode *inode, void *key); +void ll_stop_statahead(struct inode *dir, void *key); static inline void ll_statahead_mark(struct inode *dir, struct dentry *dentry) { - struct ll_inode_info *lli; - struct ll_dentry_data *ldd = ll_d2d(dentry); + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_dentry_data *ldd; - /* dentry has been move to other directory, no need mark */ - if (unlikely(dir != dentry->d_parent->d_inode)) - return; - - lli = ll_i2info(dir); /* not the same process, don't mark */ - if (lli->lli_opendir_pid != cfs_curproc_pid()) + if (unlikely(lli->lli_opendir_pid != cfs_curproc_pid())) return; - spin_lock(&lli->lli_lock); - if (likely(lli->lli_sai != NULL && ldd != NULL)) + ldd = ll_d2d(dentry); + spin_lock(&lli->lli_sa_lock); + if (likely(lli->lli_opendir_pid == cfs_curproc_pid() && + lli->lli_sai != NULL && ldd != NULL)) ldd->lld_sa_generation = lli->lli_sai->sai_generation; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); } static inline int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) { - struct ll_inode_info *lli; - struct ll_sb_info *sbi; - struct ll_dentry_data *ldd = ll_d2d(*dentryp); - - if (unlikely(dir == NULL)) - 
return -EAGAIN; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_info *lli; - sbi = ll_i2sbi(dir); /* temporarily disable dir stat ahead in interoperability mode */ if (sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID) return -ENOTSUPP; @@ -1113,43 +1103,9 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) if (lli->lli_opendir_pid != cfs_curproc_pid()) return -EAGAIN; - /* - * When "ls" a dentry, the system trigger more than once "revalidate" or - * "lookup", for "getattr", for "getxattr", and maybe for others. - * Under patchless client mode, the operation intent is not accurate, - * it maybe misguide the statahead thread. For example: - * The "revalidate" call for "getattr" and "getxattr" of a dentry maybe - * have the same operation intent -- "IT_GETATTR". - * In fact, one dentry should has only one chance to interact with the - * statahead thread, otherwise the statahead windows will be confused. - * The solution is as following: - * Assign "lld_sa_generation" with "sai_generation" when a dentry - * "IT_GETATTR" for the first time, and the subsequent "IT_GETATTR" - * will bypass interacting with statahead thread for checking: - * "lld_sa_generation == lli_sai->sai_generation" - */ - if (ldd && lli->lli_sai && - ldd->lld_sa_generation == lli->lli_sai->sai_generation) - return -EAGAIN; - return do_statahead_enter(dir, dentryp, lookup); } -static void inline ll_dops_init(struct dentry *de, int block) -{ - struct ll_dentry_data *lld = ll_d2d(de); - - if (lld == NULL && block != 0) { - ll_set_dd(de); - lld = ll_d2d(de); - } - - if (lld != NULL) - lld->lld_sa_generation = 0; - - de->d_op = &ll_d_ops; -} - /* llite ioctl register support rountine */ #ifdef __KERNEL__ enum llioc_iter { diff --git lustre/llite/llite_lib.c lustre/llite/llite_lib.c index d37155a..006a8ec 100644 --- lustre/llite/llite_lib.c +++ lustre/llite/llite_lib.c @@ -206,6 +206,8 @@ static struct ll_sb_info *ll_init_sbi(void) /* metadata statahead is 
enabled by default */ sbi->ll_sa_max = LL_SA_RPC_DEF; + atomic_set(&sbi->ll_sa_total, 0); + atomic_set(&sbi->ll_sa_wrong, 0); RETURN(sbi); @@ -818,6 +820,8 @@ void ll_lli_init(struct ll_inode_info *lli) INIT_LIST_HEAD(&lli->lli_pending_write_llaps); #endif init_rwsem(&lli->lli_truncate_rwsem); + spin_lock_init(&lli->lli_sa_lock); + init_rwsem(&lli->lli_sa_rwsem); } /* COMPAT_146 */ @@ -2144,10 +2148,8 @@ void ll_umount_begin(struct super_block *sb) ENTRY; #ifdef HAVE_UMOUNTBEGIN_VFSMOUNT - if (!(flags & MNT_FORCE)) { - EXIT; - return; - } + if (!(flags & MNT_FORCE)) + RETURN_EXIT; #endif /* Tell the MGC we got umount -f */ @@ -2160,8 +2162,7 @@ void ll_umount_begin(struct super_block *sb) if (obd == NULL) { CERROR("Invalid MDC connection handle "LPX64"\n", sbi->ll_mdc_exp->exp_handle.h_cookie); - EXIT; - return; + RETURN_EXIT; } obd->obd_force = 1; obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_mdc_exp, sizeof ioc_data, @@ -2171,8 +2172,7 @@ void ll_umount_begin(struct super_block *sb) if (obd == NULL) { CERROR("Invalid LOV connection handle "LPX64"\n", sbi->ll_osc_exp->exp_handle.h_cookie); - EXIT; - return; + RETURN_EXIT; } obd->obd_force = 1; diff --git lustre/llite/lproc_llite.c lustre/llite/lproc_llite.c index 15b514a..15ebfdc 100644 --- lustre/llite/lproc_llite.c +++ lustre/llite/lproc_llite.c @@ -633,18 +633,10 @@ static int ll_rd_statahead_stats(char *page, char **start, off_t off, struct ll_sb_info *sbi = ll_s2sbi(sb); return snprintf(page, count, - "statahead wrong: %u\n" "statahead total: %u\n" - "ls blocked: %llu\n" - "ls cached: %llu\n" - "hit count: %llu\n" - "miss count: %llu\n", - sbi->ll_sa_wrong, - sbi->ll_sa_total, - sbi->ll_sa_blocked, - sbi->ll_sa_cached, - sbi->ll_sa_hit, - sbi->ll_sa_miss); + "statahead wrong: %u\n", + atomic_read(&sbi->ll_sa_total), + atomic_read(&sbi->ll_sa_wrong)); } static int ll_rd_lazystatfs(char *page, char **start, off_t off, diff --git lustre/llite/namei.c lustre/llite/namei.c index 28a404b..fcd0d70 100644 --- 
lustre/llite/namei.c +++ lustre/llite/namei.c @@ -50,6 +50,56 @@ #include #include "llite_internal.h" +int ll_sa_rwsem_setup(struct inode *dir, struct dentry *dchild) +{ + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_info *lli; + struct ll_dentry_data *lld; + int rc; + + ENTRY; + /* temporarily disable dir stat ahead in interoperability mode */ + if (sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID) + RETURN(0); + + rc = ll_dops_init(dchild, 1); + if (unlikely(rc)) + RETURN(rc); + + lli = ll_i2info(dir); + lld = ll_d2d(dchild); + LASSERT(lld->lld_rwsem_pid == 0); + LASSERT(lld->lld_rwsem_inode == NULL); + lld->lld_rwsem_inode = igrab(dir); + down_read(&lli->lli_sa_rwsem); + lld->lld_rwsem_pid = cfs_curproc_pid(); + RETURN(0); +} + +void ll_sa_rwsem_cleanup(struct inode *dir, struct dentry *dchild) +{ + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_info *lli; + struct ll_dentry_data *lld; + + ENTRY; + /* temporarily disable dir stat ahead in interoperability mode */ + if (sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID) + RETURN_EXIT; + + lld = ll_d2d(dchild); + if (lld == NULL || lld->lld_rwsem_pid == 0) + RETURN_EXIT; + + LASSERT(lld->lld_rwsem_inode == dir); + lld->lld_rwsem_pid = 0; + lli = ll_i2info(dir); + up_read(&lli->lli_sa_rwsem); + lld->lld_rwsem_inode = NULL; + iput(dir); + EXIT; +} + /* methods */ int ll_unlock(__u32 mode, struct lustre_handle *lockh) @@ -169,7 +219,7 @@ static int fid_set_inode(struct inode *inode, void *opaque) } struct inode *ll_iget(struct super_block *sb, ino_t hash, - struct lustre_md *md) + struct lustre_md *md) { struct inode *inode; ENTRY; @@ -489,7 +539,9 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) return last_discon; } + lock_dentry(de); de->d_flags |= DCACHE_LUSTRE_INVALID; + unlock_dentry(de); ll_d_add(de, inode); spin_unlock(&dcache_lock); @@ -539,17 +591,13 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset, struct ll_dentry_data *lld = 
ll_d2d(*de); /* just make sure the ll_dentry_data is ready */ - if (unlikely(lld == NULL)) { - ll_set_dd(*de); - lld = ll_d2d(*de); - if (likely(lld != NULL)) - lld->lld_sa_generation = 0; - } + if (unlikely(lld == NULL)) + ll_dops_init(*de, 1); } /* we have lookup look - unhide dentry */ if (bits & MDS_INODELOCK_LOOKUP) { lock_dentry(*de); - (*de)->d_flags &= ~(DCACHE_LUSTRE_INVALID); + (*de)->d_flags &= ~DCACHE_LUSTRE_INVALID; unlock_dentry(*de); } } else { @@ -593,8 +641,9 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, int rc, first = 0; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%lu/%u(%p),intent=%s\n", - dentry->d_name.len, dentry->d_name.name, parent->i_ino, + CDEBUG(D_VFSTRACE, "VFS lookup: name=(%p)%.*s, dir=%lu/%u(%p), " + "intent=%s\n", + dentry, dentry->d_name.len, dentry->d_name.name, parent->i_ino, parent->i_generation, parent, LL_IT2STR(it)); if (d_mountpoint(dentry)) @@ -611,7 +660,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, RETURN(ERR_PTR(rc)); } - if (it->it_op == IT_GETATTR) { + if (it->it_op == IT_GETATTR && !(lookup_flags & LOOKUP_CREATE)) { first = ll_statahead_enter(parent, &dentry, 1); if (first >= 0) { ll_statahead_exit(parent, dentry, first); @@ -669,10 +718,30 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, struct dentry *de; ENTRY; - if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST)) + if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST)) { + int sa = -1; + + if (nd->flags & LOOKUP_CREATE) { + sa = ll_sa_rwsem_setup(parent, dentry); + if (unlikely(sa)) + RETURN(ERR_PTR(sa)); + } + de = ll_lookup_it(parent, dentry, &nd->intent, nd->flags); - else + + if (sa == 0) { + if (IS_ERR(de) || + (de == NULL && dentry->d_inode != NULL)) { + ll_sa_rwsem_cleanup(parent, dentry); + } else if (de != NULL) { + LASSERT(de != dentry); + LASSERT(de->d_inode != NULL); + ll_sa_rwsem_cleanup(parent, 
dentry); + } + } + } else { de = ll_lookup_it(parent, dentry, NULL, 0); + } RETURN(de); } @@ -714,7 +783,9 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, ENTRY; if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) { - struct lookup_intent *it; + struct lookup_intent *it = NULL; + int sa = -1; + struct dentry *save = dentry; #if defined(HAVE_FILE_IN_STRUCT_INTENT) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17)) /* Did we came here from failed revalidate just to propagate @@ -723,6 +794,11 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, if (IS_ERR(nd->intent.open.file)) RETURN((struct dentry *)nd->intent.open.file); #endif + if (nd->flags & LOOKUP_CREATE) { + sa = ll_sa_rwsem_setup(parent, save); + if (unlikely(sa)) + RETURN(ERR_PTR(sa)); + } if (ll_d2d(dentry) && ll_d2d(dentry)->lld_it) { it = ll_d2d(dentry)->lld_it; @@ -732,11 +808,11 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, /* We are sure this is new dentry, so we need to create our private data and set the dentry ops */ ll_dops_init(dentry, 1); - RETURN(NULL); + GOTO(out_sa, de = NULL); } it = ll_convert_intent(&nd->intent.open, nd->flags); if (IS_ERR(it)) - RETURN((struct dentry *)it); + GOTO(out_sa, de = (struct dentry *)it); } de = ll_lookup_it(parent, dentry, it, nd->flags); @@ -793,6 +869,18 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, ll_intent_release(it); OBD_FREE(it, sizeof(*it)); } + +out_sa: + if (sa == 0) { + if (IS_ERR(de) || + (de == NULL && save->d_inode != NULL)) { + ll_sa_rwsem_cleanup(parent, save); + } else if (de != NULL) { + LASSERT(de != save); + LASSERT(de->d_inode != NULL); + ll_sa_rwsem_cleanup(parent, save); + } + } } else { de = ll_lookup_it(parent, dentry, NULL, 0); } @@ -865,15 +953,14 @@ static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode, rc = it_open_error(DISP_OPEN_CREATE, it); if (rc) - RETURN(rc); + 
GOTO(out, rc); mdc_store_inode_generation(request, DLM_INTENT_REC_OFF, DLM_REPLY_REC_OFF); inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, NULL, 0, mode, 0, it); - if (IS_ERR(inode)) { - RETURN(PTR_ERR(inode)); - } + if (IS_ERR(inode)) + GOTO(out, rc = PTR_ERR(inode)); d_instantiate(dentry, inode); /* Negative dentry may be unhashed if parent does not have UPDATE lock, @@ -883,7 +970,10 @@ static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode, if (d_unhashed(dentry)) d_rehash_cond(dentry, 0); spin_unlock(&dcache_lock); - RETURN(0); + EXIT; +out: + ll_sa_rwsem_cleanup(dir, dentry); + return rc; } static void ll_update_times(struct ptlrpc_request *request, int offset, @@ -940,11 +1030,12 @@ static int ll_new_node(struct inode *dir, struct qstr *name, d_drop(dchild); d_instantiate(dchild, inode); - EXIT; } + EXIT; err_exit: ptlrpc_req_finished(request); - + if (dchild) + ll_sa_rwsem_cleanup(dir, dchild); return err; } @@ -952,7 +1043,7 @@ err_exit: static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode, unsigned rdev, struct dentry *dchild) { - int err; + int err, cleanup = 1; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%lu/%u(%p) mode %o dev %x\n", @@ -970,6 +1061,7 @@ static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode, case S_IFIFO: case S_IFSOCK: err = ll_new_node(dir, name, NULL, mode, rdev, dchild); + cleanup = 0; break; case S_IFDIR: err = -EPERM; @@ -977,6 +1069,9 @@ static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode, default: err = -EINVAL; } + + if (cleanup && dchild) + ll_sa_rwsem_cleanup(dir, dchild); RETURN(err); } @@ -994,6 +1089,7 @@ static int ll_create_nd(struct inode *dir, struct dentry *dentry, int mode, stru /* Was there an error? Propagate it! 
*/ if (it->d.lustre.it_status) { rc = it->d.lustre.it_status; + ll_sa_rwsem_cleanup(dir, dentry); goto out; } @@ -1062,15 +1158,17 @@ static int ll_link_generic(struct inode *src, struct inode *dir, if (err) GOTO(out, err); - if (dchild) { + if (dchild) d_drop(dchild); - } ll_update_times(request, REPLY_REC_OFF, dir); EXIT; out: ptlrpc_req_finished(request); - RETURN(err); + if (dchild) + ll_sa_rwsem_cleanup(dir, dchild); + + return err; } static int ll_mkdir_generic(struct inode *dir, struct qstr *name, int mode, @@ -1095,7 +1193,7 @@ static void ll_get_child_fid(struct inode * dir, struct qstr *name, struct ll_fid *fid) { struct dentry *parent, *child; - + parent = list_entry(dir->i_dentry.next, struct dentry, d_alias); child = d_lookup(parent, name); if (child) { diff --git lustre/llite/rw.c lustre/llite/rw.c index acd9264..8c66167 100644 --- lustre/llite/rw.c +++ lustre/llite/rw.c @@ -219,10 +219,8 @@ void ll_truncate(struct inode *inode) i_size_read(inode)); ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1); - if (lli->lli_size_sem_owner != current) { - EXIT; - return; - } + if (lli->lli_size_sem_owner != current) + RETURN_EXIT; if (!lli->lli_smd) { CDEBUG(D_INODE, "truncate on inode %lu with no objects\n", @@ -285,8 +283,7 @@ void ll_truncate(struct inode *inode) else ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1); - EXIT; - return; + RETURN_EXIT; out_unlock: ll_inode_size_unlock(inode, 0); @@ -1234,16 +1231,14 @@ static void __ll_put_llap(struct page *page) exp = ll_i2obdexp(inode); if (exp == NULL) { CERROR("page %p ind %lu gave null export\n", page, page->index); - EXIT; - return; + RETURN_EXIT; } llap = llap_from_page(page, LLAP_ORIGIN_REMOVEPAGE); if (IS_ERR(llap)) { CERROR("page %p ind %lu couldn't find llap: %ld\n", page, page->index, PTR_ERR(llap)); - EXIT; - return; + RETURN_EXIT; } //llap_write_complete(inode, llap); @@ -1282,10 +1277,8 @@ void ll_removepage(struct page *page) /* sync pages or failed read pages can leave pages 
in the page * cache that don't have our data associated with them anymore */ - if (page_private(page) == 0) { - EXIT; - return; - } + if (page_private(page) == 0) + RETURN_EXIT; ll_ra_accounting(llap, page->mapping); LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n"); diff --git lustre/llite/statahead.c lustre/llite/statahead.c index b415dc9..c2b9f5a 100644 --- lustre/llite/statahead.c +++ lustre/llite/statahead.c @@ -67,7 +67,7 @@ static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED; /** * Check whether first entry was stated already or not. - * No need to hold lli_lock, for: + * No need to hold lli_sa_lock, for: * (1) it is me that remove entry from the list * (2) the statahead thread only add new entry to the list */ @@ -197,19 +197,18 @@ static void ll_sai_put(struct ll_statahead_info *sai) if (atomic_dec_and_test(&sai->sai_refcount)) { struct ll_sai_entry *entry, *next; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (unlikely(atomic_read(&sai->sai_refcount) > 0)) { /* It is race case, the interpret callback just hold * a reference count */ - spin_unlock(&lli->lli_lock); - EXIT; - return; + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; } LASSERT(lli->lli_opendir_key == NULL); lli->lli_sai = NULL; lli->lli_opendir_pid = 0; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); LASSERT(sa_is_stopped(sai)); @@ -259,9 +258,9 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index) entry->se_index = index; entry->se_stat = SA_ENTRY_UNSTATED; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); list_add_tail(&entry->se_list, &sai->sai_entries_sent); - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); RETURN(entry); } @@ -270,30 +269,31 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index) * delete it from sai_entries_stated head when fini, it need not * to process entry's member. 
*/ -static void ll_sai_entry_fini(struct ll_statahead_info *sai) +static int ll_sai_entry_fini(struct ll_statahead_info *sai) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); struct ll_sai_entry *entry; + int rc = 0; ENTRY; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); sai->sai_index_next++; if (likely(!list_empty(&sai->sai_entries_stated))) { entry = list_entry(sai->sai_entries_stated.next, struct ll_sai_entry, se_list); if (entry->se_index < sai->sai_index_next) { list_del(&entry->se_list); + rc = entry->se_stat; OBD_FREE_PTR(entry); } - } else - LASSERT(sa_is_stopped(sai)); - spin_unlock(&lli->lli_lock); + } + spin_unlock(&lli->lli_sa_lock); - EXIT; + RETURN(rc); } /** - * inside lli_lock. + * inside lli_sa_lock. * \retval NULL : can not find the entry in sai_entries_sent with the index * \retval entry: find the entry in sai_entries_sent with the index */ @@ -311,15 +311,16 @@ ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat, entry->se_req = ptlrpc_request_addref(req); entry->se_minfo = minfo; RETURN(entry); - } else if (entry->se_index > index) + } else if (entry->se_index > index) { RETURN(NULL); + } } } RETURN(NULL); } /** - * inside lli_lock. + * inside lli_sa_lock. * Move entry to sai_entries_received and * insert it into sai_entries_received tail. 
*/ @@ -344,12 +345,12 @@ ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry ll_sai_entry_cleanup(entry, 0); - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (!list_empty(&entry->se_list)) list_del_init(&entry->se_list); if (unlikely(entry->se_index < sai->sai_index_next)) { - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); OBD_FREE_PTR(entry); RETURN(0); } @@ -357,7 +358,7 @@ ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { if (se->se_index < entry->se_index) { list_add(&entry->se_list, &se->se_list); - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); RETURN(1); } } @@ -366,7 +367,7 @@ ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry * I am the first entry. */ list_add(&entry->se_list, &sai->sai_entries_stated); - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); RETURN(1); } @@ -384,12 +385,12 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) int rc = 0; ENTRY; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); LASSERT(!sa_received_empty(sai)); entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry, se_list); list_del_init(&entry->se_list); - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); if (unlikely(entry->se_index < sai->sai_index_next)) { CWARN("Found stale entry: [index %u] [next %u]\n", @@ -464,6 +465,10 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) EXIT; out: + /* The "ll_sai_entry_to_stated()" will drop related ldlm ibits lock + * reference count with ll_intent_drop_lock() called in spite of the + * above operations failed or not. Do not worry about calling + * "ll_intent_drop_lock()" more than once. 
*/ if (likely(ll_sai_entry_to_stated(sai, entry))) cfs_waitq_signal(&sai->sai_waitq); return rc; @@ -485,10 +490,11 @@ static int ll_statahead_interpret(struct obd_export *exp, CDEBUG(D_READA, "interpret statahead %.*s rc %d\n", dentry->d_name.len, dentry->d_name.name, rc); - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); + /* stale entry */ if (unlikely(lli->lli_sai == NULL || lli->lli_sai->sai_generation != minfo->mi_generation)) { - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); ll_intent_release(it); dput(dentry); iput(dir); @@ -498,19 +504,19 @@ static int ll_statahead_interpret(struct obd_export *exp, sai = ll_sai_get(lli->lli_sai); entry = ll_sai_entry_set(sai, (unsigned int)(long)minfo->mi_cbdata, - rc ? SA_ENTRY_UNSTATED : - SA_ENTRY_STATED, req, minfo); + rc < 0 ? rc : SA_ENTRY_STATED, req, + minfo); LASSERT(entry != NULL); if (likely(sa_is_running(sai))) { ll_sai_entry_to_received(sai, entry); sai->sai_replied++; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); } else { if (!list_empty(&entry->se_list)) list_del_init(&entry->se_list); sai->sai_replied++; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); ll_sai_entry_cleanup(entry, 1); } ll_sai_put(sai); @@ -582,8 +588,13 @@ static int do_sa_lookup(struct inode *dir, struct dentry *dentry) rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL, dentry->d_name.name, dentry->d_name.len, 0, NULL); - if (rc == 0) + if (rc == 0) { + CDEBUG(D_READA, "statahead lookup: name=(%p)%.*s, " + "dir=%lu/%u(%p)\n", + dentry, dentry->d_name.len, dentry->d_name.name, + dir->i_ino, dir->i_generation, dir); rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo); + } if (rc) sa_args_fini(minfo, einfo); @@ -607,13 +618,13 @@ static int do_sa_revalidate(struct inode *dir, struct dentry *dentry) int rc; ENTRY; - if (inode == NULL) + if (unlikely(inode == NULL)) RETURN(1); if (d_mountpoint(dentry)) RETURN(1); - if 
(dentry == dentry->d_sb->s_root) + if (unlikely(dentry == dentry->d_sb->s_root)) RETURN(1); ll_inode2fid(&fid, inode); @@ -631,8 +642,11 @@ static int do_sa_revalidate(struct inode *dir, struct dentry *dentry) rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, inode, dentry->d_name.name, dentry->d_name.len, 0, NULL); - if (rc == 0) + if (rc == 0) { + CDEBUG(D_READA, "statahead revalidate: name=(%p)%.*s\n", + dentry, dentry->d_name.len, dentry->d_name.name); rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo); + } if (rc) sa_args_fini(minfo, einfo); @@ -692,7 +706,7 @@ out: if (rc) { CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n", se, se->se_index, se->se_stat, rc); - se->se_stat = rc; + se->se_stat = rc < 0 ? rc : SA_ENTRY_STATED; if (ll_sai_entry_to_stated(sai, se)) cfs_waitq_signal(&sai->sai_waitq); } else { @@ -722,10 +736,10 @@ static int ll_statahead_thread(void *arg) cfs_daemonize(pname); } - sbi->ll_sa_total++; - spin_lock(&lli->lli_lock); + atomic_inc(&sbi->ll_sa_total); + spin_lock(&lli->lli_sa_lock); thread->t_flags = SVC_RUNNING; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); @@ -758,7 +772,7 @@ static int ll_statahead_thread(void *arg) } } - page = ll_get_dir_page(dir, index); + page = ll_get_dir_page(dir, index, 1); if (IS_ERR(page)) { rc = PTR_ERR(page); CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n", @@ -833,9 +847,9 @@ keep_de: EXIT; out: - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); thread->t_flags = SVC_STOPPED; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&sai->sai_waitq); cfs_waitq_signal(&thread->t_ctl_waitq); ll_sai_put(sai); @@ -848,17 +862,18 @@ out: /** * called in ll_file_release(). 
*/ -void ll_stop_statahead(struct inode *inode, void *key) +void ll_stop_statahead(struct inode *dir, void *key) { - struct ll_inode_info *lli = ll_i2info(inode); + struct ll_inode_info *lli = ll_i2info(dir); + ENTRY; if (unlikely(key == NULL)) - return; + RETURN_EXIT; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { - spin_unlock(&lli->lli_lock); - return; + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; } lli->lli_opendir_key = NULL; @@ -869,7 +884,7 @@ void ll_stop_statahead(struct inode *inode, void *key) if (!sa_is_stopped(lli->lli_sai)) { thread->t_flags = SVC_STOPPING; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); CDEBUG(D_READA, "stopping statahead thread, pid %d\n", @@ -878,7 +893,7 @@ void ll_stop_statahead(struct inode *inode, void *key) sa_is_stopped(lli->lli_sai), &lwi); } else { - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); } /* @@ -889,8 +904,9 @@ void ll_stop_statahead(struct inode *inode, void *key) ll_sai_put(lli->lli_sai); } else { lli->lli_opendir_pid = 0; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); } + EXIT; } enum { @@ -929,7 +945,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) break; } - page = ll_get_dir_page(dir, index); + page = ll_get_dir_page(dir, index, 1); if (IS_ERR(page)) { rc = PTR_ERR(page); CERROR("error reading dir %lu/%u page %lu: rc %d\n", @@ -997,7 +1013,7 @@ static int trigger_statahead(struct inode *dir, struct dentry **dentryp) { struct ll_inode_info *lli = ll_i2info(dir); struct l_wait_info lwi = { 0 }; - struct ll_statahead_info *sai; + struct ll_statahead_info *sai = NULL; struct dentry *parent; int rc; ENTRY; @@ -1017,7 +1033,6 @@ static int trigger_statahead(struct inode *dir, struct dentry **dentryp) if (unlikely(sai->sai_inode == NULL)) { CWARN("Do not start stat ahead on dying inode %lu/%u.\n", dir->i_ino, 
dir->i_generation); - OBD_FREE_PTR(sai); GOTO(out, rc = -ESTALE); } @@ -1031,8 +1046,7 @@ static int trigger_statahead(struct inode *dir, struct dentry **dentryp) PFID(ll_inode_lu_fid(parent->d_inode))); dput(parent); iput(sai->sai_inode); - OBD_FREE_PTR(sai); - RETURN(-EAGAIN); + GOTO(out, rc = -EAGAIN); } lli->lli_sai = sai; @@ -1055,10 +1069,12 @@ static int trigger_statahead(struct inode *dir, struct dentry **dentryp) RETURN(-EEXIST); out: - spin_lock(&lli->lli_lock); + if (sai) + OBD_FREE_PTR(sai); + spin_lock(&lli->lli_sa_lock); lli->lli_opendir_key = NULL; lli->lli_opendir_pid = 0; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); return rc; } @@ -1075,25 +1091,44 @@ out: int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) { struct ll_inode_info *lli = ll_i2info(dir); + struct ll_dentry_data *ldd; struct ll_statahead_info *sai; - struct ll_sb_info *sbi; - int rc = 0; + int rc; ENTRY; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (unlikely(lli->lli_opendir_pid != cfs_curproc_pid())) { - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); RETURN(-EAGAIN); } - if (likely(lli->lli_sai)) { - sai = ll_sai_get(lli->lli_sai); - spin_unlock(&lli->lli_lock); - } else { - spin_unlock(&lli->lli_lock); + if (unlikely(lli->lli_sai == NULL)) { + spin_unlock(&lli->lli_sa_lock); RETURN(trigger_statahead(dir, dentryp)); } + sai = ll_sai_get(lli->lli_sai); + spin_unlock(&lli->lli_sa_lock); + + /* + * When "ls -l" a dentry, the system trigger more than once "revalidate" + * or "lookup", for "getattr", for "getxattr", and maybe for others. + * Under patchless client mode, the operation intent is not accurate, + * it maybe misguide the statahead thread. For example: + * The "revalidate" call for "getattr" and "getxattr" of a dentry maybe + * have the same operation intent -- "IT_GETATTR". 
+ * In fact, one dentry should have only one chance to interact with the + * statahead thread, otherwise the statahead window will be confused. + * The solution is as follows: + * Assign "sai_generation" to "lld_sa_generation" when a dentry sees + * "IT_GETATTR" for the first time, and the subsequent "IT_GETATTR" + * will bypass interacting with the statahead thread by checking: + * "lld_sa_generation == lli_sai->sai_generation" + */ + ldd = ll_d2d(*dentryp); + if (ldd && ldd->lld_sa_generation == sai->sai_generation) + GOTO(out, rc = -EAGAIN); + if (unlikely(sa_is_stopped(sai) && list_empty(&sai->sai_entries_stated))) GOTO(out, rc = -EBADFD); @@ -1119,19 +1154,18 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) } } - sbi = ll_i2sbi(dir); - if (ll_sai_entry_stated(sai)) { - sbi->ll_sa_cached++; - } else { + if (!ll_sai_entry_stated(sai)) { struct l_wait_info lwi =LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - sbi->ll_sa_blocked++; /* thread started already, avoid double-stat. 
*/ rc = l_wait_event(sai->sai_waitq, ll_sai_entry_stated(sai) || sa_is_stopped(sai), &lwi); + if (unlikely(rc < 0)) + GOTO(out, rc); } + rc = 0; if (lookup) { struct dentry *result; @@ -1161,46 +1195,52 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result) struct ll_statahead_info *sai; struct ll_sb_info *sbi; struct ll_dentry_data *ldd = ll_d2d(dentry); + int rc; ENTRY; - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (unlikely(lli->lli_opendir_pid != cfs_curproc_pid())) { - spin_unlock(&lli->lli_lock); - EXIT; - return; - } else { - sai = ll_sai_get(lli->lli_sai); - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; } - sbi = ll_i2sbi(dir); - if (result >= 1) { - sbi->ll_sa_hit++; + sai = ll_sai_get(lli->lli_sai); + spin_unlock(&lli->lli_sa_lock); + + sbi = ll_i2sbi(dir); + rc = ll_sai_entry_fini(sai); + /* rc == -ENOENT means such dentry was removed just between statahead + * readdir and pre-fetched, count it as hit. + * + * result == -ENOENT has two meanings: + * 1. such dentry was removed just between statahead pre-fetched and + * main process stat such dentry. + * 2. main process stat non-exist dentry. + * We can not distinguish such two cases, just count them as miss. 
*/ + if (result >= 1 || unlikely(rc == -ENOENT)) { sai->sai_hit++; sai->sai_consecutive_miss = 0; sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); } else { - sbi->ll_sa_miss++; sai->sai_miss++; sai->sai_consecutive_miss++; if (sa_low_hit(sai) && sa_is_running(sai)) { - sbi->ll_sa_wrong++; + atomic_inc(&sbi->ll_sa_wrong); CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio " "too low: hit/miss %u/%u, sent/replied %u/%u, " "stopping statahead thread: pid %d\n", PFID(ll_inode_lu_fid(dir)), sai->sai_hit, sai->sai_miss, sai->sai_sent, sai->sai_replied, cfs_curproc_pid()); - spin_lock(&lli->lli_lock); + spin_lock(&lli->lli_sa_lock); if (!sa_is_stopped(sai)) sai->sai_thread.t_flags = SVC_STOPPING; - spin_unlock(&lli->lli_lock); + spin_unlock(&lli->lli_sa_lock); } } if (!sa_is_stopped(sai)) cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); - ll_sai_entry_fini(sai); if (likely(ldd != NULL)) ldd->lld_sa_generation = sai->sai_generation;