Single directory performance is critical for HPC workloads. In a typical
use case an application creates a separate output file for each node and
task in a job. As nodes and tasks increase, hundreds of thousands of files
may be created in a single directory within a short window of time.
Today, both filename lookup and file system modifying operations (such as
create and unlink) are protected with a single lock for an entire ldiskfs
directory. The PDO project removes this bottleneck by introducing a
parallel locking mechanism for ldiskfs directories. This work enables
multiple application threads to look up, create and unlink entries in the
same directory in parallel.

This patch contains:
 - pdirops support for ldiskfs
 - integration with osd-ldiskfs

A short usage sketch of the new locking API is appended after the patch.

Index: linux-4.18.0-423.el8/fs/ext4/Makefile
===================================================================
--- linux-4.18.0-423.el8.orig/fs/ext4/Makefile
+++ linux-4.18.0-423.el8/fs/ext4/Makefile
@@ -7,6 +7,7 @@ obj-$(CONFIG_EXT4_FS) += ext4.o
 
 ext4-y	:= balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
 		extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
+		htree_lock.o \
 		indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
 		mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
 		super.o symlink.o sysfs.o xattr.o xattr_trusted.o xattr_user.o
Index: linux-4.18.0-423.el8/fs/ext4/ext4.h
===================================================================
--- linux-4.18.0-423.el8.orig/fs/ext4/ext4.h
+++ linux-4.18.0-423.el8/fs/ext4/ext4.h
@@ -29,6 +29,7 @@
 #include <linux/timer.h>
 #include <linux/version.h>
 #include <linux/wait.h>
+#include <linux/htree_lock.h>
 #include <linux/sched/signal.h>
 #include <linux/blockgroup_lock.h>
 #include <linux/percpu_counter.h>
@@ -966,6 +967,9 @@ struct ext4_inode_info {
 	__u32	i_dtime;
 	ext4_fsblk_t	i_file_acl;
 
+	/* following fields for parallel directory operations -bzzz */
+	struct semaphore i_append_sem;
+
 	/*
	 * i_block_group is the number of the block group which contains
	 * this file's inode.  Constant across the lifetime of the inode,
@@ -2217,6 +2221,72 @@ struct dx_hash_info
  */
 #define HASH_NB_ALWAYS		1
 
+/* assume name-hash is protected by upper layer */
+#define EXT4_HTREE_LOCK_HASH	0
+
+enum ext4_pdo_lk_types {
+#if EXT4_HTREE_LOCK_HASH
+	EXT4_LK_HASH,
+#endif
+	EXT4_LK_DX,		/* index block */
+	EXT4_LK_DE,		/* directory entry block */
+	EXT4_LK_SPIN,		/* spinlock */
+	EXT4_LK_MAX,
+};
+
+/* read-only bit */
+#define EXT4_LB_RO(b)	(1 << (b))
+/* read + write, high bits for writer */
+#define EXT4_LB_RW(b)	((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
+
+enum ext4_pdo_lock_bits {
+	/* DX lock bits */
+	EXT4_LB_DX_RO		= EXT4_LB_RO(EXT4_LK_DX),
+	EXT4_LB_DX		= EXT4_LB_RW(EXT4_LK_DX),
+	/* DE lock bits */
+	EXT4_LB_DE_RO		= EXT4_LB_RO(EXT4_LK_DE),
+	EXT4_LB_DE		= EXT4_LB_RW(EXT4_LK_DE),
+	/* DX spinlock bits */
+	EXT4_LB_SPIN_RO		= EXT4_LB_RO(EXT4_LK_SPIN),
+	EXT4_LB_SPIN		= EXT4_LB_RW(EXT4_LK_SPIN),
+	/* accurate searching */
+	EXT4_LB_EXACT		= EXT4_LB_RO(EXT4_LK_MAX << 1),
+};
+
+enum ext4_pdo_lock_opc {
+	/* external */
+	EXT4_HLOCK_READDIR	= (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
+	EXT4_HLOCK_LOOKUP	= (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_DEL		= (EXT4_LB_DE | EXT4_LB_SPIN_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_ADD		= (EXT4_LB_DE | EXT4_LB_SPIN_RO),
+
+	/* internal */
+	EXT4_HLOCK_LOOKUP_SAFE	= (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_DEL_SAFE	= (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
+	EXT4_HLOCK_SPLIT	= (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
+};
+
+extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
+#define ext4_htree_lock_head_free(lhead)	htree_lock_head_free(lhead)
+
+extern struct htree_lock *ext4_htree_lock_alloc(void);
+#define ext4_htree_lock_free(lck)	htree_lock_free(lck)
+
+extern void ext4_htree_lock(struct htree_lock *lck,
+			    struct htree_lock_head *lhead,
+			    struct inode *dir, unsigned flags);
+#define ext4_htree_unlock(lck)	htree_unlock(lck)
+
+extern struct buffer_head *__ext4_find_entry(struct inode *dir,
+					const struct qstr *d_name,
+					struct ext4_dir_entry_2 **res_dir,
+					int *inlined, struct htree_lock *lck);
+extern int __ext4_add_entry(handle_t *handle, struct dentry *dentry,
+			    struct inode *inode, struct htree_lock *lck);
+
 struct ext4_filename {
 	const struct qstr *usr_fname;
 	struct fscrypt_str disk_name;
@@ -2519,12 +2589,20 @@ void ext4_insert_dentry(struct inode *in
 			struct ext4_filename *fname, void *data);
 static inline void ext4_update_dx_flag(struct inode *inode)
 {
+	/* Disable it for ldiskfs, because going from a DX directory to
+	 * a non-DX directory while it is in use will completely break
+	 * the htree-locking.
+	 * If we really want to support this operation in the future,
+	 * we need to exclusively lock the directory here, which will
+	 * increase the complexity of the code. */
+#if 0
 	if (!ext4_has_feature_dir_index(inode->i_sb) &&
 	    ext4_test_inode_flag(inode, EXT4_INODE_INDEX)) {
 		/* ext4_iget() should have caught this... */
 		WARN_ON_ONCE(ext4_has_feature_metadata_csum(inode->i_sb));
 		ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
 	}
+#endif
 }
 static const unsigned char ext4_filetype_table[] = {
 	DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
Index: linux-4.18.0-423.el8/fs/ext4/namei.c
===================================================================
--- linux-4.18.0-423.el8.orig/fs/ext4/namei.c
+++ linux-4.18.0-423.el8/fs/ext4/namei.c
@@ -55,6 +55,7 @@ struct buffer_head *ext4_append(handle_t
 {
 	struct ext4_map_blocks map;
 	struct buffer_head *bh;
+	struct ext4_inode_info *ei = EXT4_I(inode);
 	int err;
 
 	if (unlikely(EXT4_SB(inode->i_sb)->s_max_dir_size_kb &&
@@ -62,6 +63,10 @@ struct buffer_head *ext4_append(handle_t
 		     EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
 		return ERR_PTR(-ENOSPC);
 
+	/* with parallel dir operations all appends
+	 * have to be serialized -bzzz */
+	down(&ei->i_append_sem);
+
 	*block = inode->i_size >> inode->i_sb->s_blocksize_bits;
 	map.m_lblk = *block;
 	map.m_len = 1;
@@ -72,18 +77,24 @@ struct buffer_head *ext4_append(handle_t
 	 * directory.
 	 */
 	err = ext4_map_blocks(NULL, inode, &map, 0);
-	if (err < 0)
+	if (err < 0) {
+		up(&ei->i_append_sem);
 		return ERR_PTR(err);
+	}
 	if (err) {
+		up(&ei->i_append_sem);
 		EXT4_ERROR_INODE(inode, "Logical block already allocated");
 		return ERR_PTR(-EFSCORRUPTED);
 	}
 
 	bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
-	if (IS_ERR(bh))
+	if (IS_ERR(bh)) {
+		up(&ei->i_append_sem);
 		return bh;
+	}
 	inode->i_size += inode->i_sb->s_blocksize;
 	EXT4_I(inode)->i_disksize = inode->i_size;
+	up(&ei->i_append_sem);
 	BUFFER_TRACE(bh, "get_write_access");
 	err = ext4_journal_get_write_access(handle, bh);
 	if (err) {
@@ -288,7 +299,8 @@ static unsigned dx_node_limit(struct ino
 static struct dx_frame *dx_probe(struct ext4_filename *fname,
 				 struct inode *dir,
 				 struct dx_hash_info *hinfo,
-				 struct dx_frame *frame);
+				 struct dx_frame *frame,
+				 struct htree_lock *lck);
 static void dx_release(struct dx_frame *frames);
 static int dx_make_map(struct inode *dir, struct buffer_head *bh,
 		       struct dx_hash_info *hinfo,
@@ -302,12 +314,13 @@ static void dx_insert_block(struct dx_fr
 static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 				 struct dx_frame *frame,
 				 struct dx_frame *frames,
-				 __u32 *start_hash);
+				 __u32 *start_hash, struct htree_lock *lck);
 static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 		struct ext4_filename *fname,
-		struct ext4_dir_entry_2 **res_dir);
+		struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
 static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
-			     struct inode *dir, struct inode *inode);
+			     struct inode *dir, struct inode *inode,
+			     struct htree_lock *lck);
 
 /* checksumming functions */
 void initialize_dirent_tail(struct ext4_dir_entry_tail *t,
@@ -771,6 +784,227 @@ struct stats dx_show_entries(struct dx_h
 }
 #endif /* DX_DEBUG */
 
+/* private data for htree_lock */
+struct ext4_dir_lock_data {
+	unsigned		ld_flags;  /* bits-map for lock types */
+	unsigned		ld_count;  /* # entries of the last DX block */
+	struct dx_entry		ld_at_entry; /* copy of leaf dx_entry */
+	struct dx_entry		*ld_at;	   /* position of leaf dx_entry */
+};
+
+#define ext4_htree_lock_data(l)	((struct ext4_dir_lock_data *)(l)->lk_private)
+#define ext4_find_entry(dir, name, dirent, inline) \
+			__ext4_find_entry(dir, name, dirent, inline, NULL)
+#define ext4_add_entry(handle, dentry, inode) \
+			__ext4_add_entry(handle, dentry, inode, NULL)
+
+/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
+#define EXT4_HTREE_NODE_CHANGED	(0xcafeULL << 32)
+
+static void ext4_htree_event_cb(void *target, void *event)
+{
+	u64 *block = (u64 *)target;
+
+	if (*block == dx_get_block((struct dx_entry *)event))
+		*block = EXT4_HTREE_NODE_CHANGED;
+}
+
+struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
+{
+	struct htree_lock_head *lhead;
+
+	lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
+	if (lhead != NULL) {
+		htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
+					ext4_htree_event_cb);
+	}
+	return lhead;
+}
+EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
+
+struct htree_lock *ext4_htree_lock_alloc(void)
+{
+	return htree_lock_alloc(EXT4_LK_MAX,
+				sizeof(struct ext4_dir_lock_data));
+}
+EXPORT_SYMBOL(ext4_htree_lock_alloc);
+
+static htree_lock_mode_t ext4_htree_mode(unsigned flags)
+{
+	switch (flags) {
+	default: /* 0 or unknown flags require EX lock */
+		return HTREE_LOCK_EX;
+	case EXT4_HLOCK_READDIR:
+		return HTREE_LOCK_PR;
+	case EXT4_HLOCK_LOOKUP:
+		return HTREE_LOCK_CR;
+	case EXT4_HLOCK_DEL:
+	case EXT4_HLOCK_ADD:
+		return HTREE_LOCK_CW;
+	}
+}
+
+/* return PR for read-only operations, otherwise return EX */
+static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
+{
+	int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
+
+	/* 0 requires EX lock */
+	return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
+}
+
+static int ext4_htree_safe_locked(struct htree_lock *lck)
+{
+	int writer;
+
+	if (lck == NULL || lck->lk_mode == HTREE_LOCK_EX)
+		return 1;
+
+	writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
+		 EXT4_LB_DE;
+	if (writer) /* all readers & writers are excluded? */
+		return lck->lk_mode == HTREE_LOCK_EX;
+
+	/* all writers are excluded? */
+	return lck->lk_mode == HTREE_LOCK_PR ||
+	       lck->lk_mode == HTREE_LOCK_PW ||
+	       lck->lk_mode == HTREE_LOCK_EX;
+}
+
+/* relock htree_lock with EX mode if it's a change operation, otherwise
+ * relock it with PR mode.  It's a no-op if PDO is disabled. */
+static void ext4_htree_safe_relock(struct htree_lock *lck)
+{
+	if (!ext4_htree_safe_locked(lck)) {
+		unsigned flags = ext4_htree_lock_data(lck)->ld_flags;
+
+		htree_change_lock(lck, ext4_htree_safe_mode(flags));
+	}
+}
+
+void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
+		     struct inode *dir, unsigned flags)
+{
+	htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
+					      ext4_htree_safe_mode(flags);
+
+	ext4_htree_lock_data(lck)->ld_flags = flags;
+	htree_lock(lck, lhead, mode);
+	if (!is_dx(dir))
+		ext4_htree_safe_relock(lck); /* make sure it's safe locked */
+}
+EXPORT_SYMBOL(ext4_htree_lock);
+
+static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
+				unsigned lmask, int wait, void *ev)
+{
+	u32 key = (at == NULL) ? 0 : dx_get_block(at);
+	u32 mode;
+
+	/* NOOP if htree is well protected or caller doesn't require the lock */
+	if (ext4_htree_safe_locked(lck) ||
+	    !(ext4_htree_lock_data(lck)->ld_flags & lmask))
+		return 1;
+
+	mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
+		HTREE_LOCK_PW : HTREE_LOCK_PR;
+	while (1) {
+		if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
+			return 1;
+		if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
+			return 0;
+		cpu_relax(); /* spin until granted */
+	}
+}
+
+static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
+{
+	return ext4_htree_safe_locked(lck) ||
+	       htree_node_is_granted(lck, ffz(~lmask));
+}
+
+static void ext4_htree_node_unlock(struct htree_lock *lck,
+				   unsigned lmask, void *buf)
+{
+	/* NB: it's safe to call multiple times, even if it's not locked */
+	if (!ext4_htree_safe_locked(lck) &&
+	    htree_node_is_granted(lck, ffz(~lmask)))
+		htree_node_unlock(lck, ffz(~lmask), buf);
+}
+
+#define ext4_htree_dx_lock(lck, key)		\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
+#define ext4_htree_dx_lock_try(lck, key)	\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
+#define ext4_htree_dx_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
+#define ext4_htree_dx_locked(lck)		\
+	ext4_htree_node_locked(lck, EXT4_LB_DX)
+
+static void ext4_htree_dx_need_lock(struct htree_lock *lck)
+{
+	struct ext4_dir_lock_data *ld;
+
+	if (ext4_htree_safe_locked(lck))
+		return;
+
+	ld = ext4_htree_lock_data(lck);
+	switch (ld->ld_flags) {
+	default:
+		return;
+	case EXT4_HLOCK_LOOKUP:
+		ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
+		return;
+	case EXT4_HLOCK_DEL:
+		ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
+		return;
+	case EXT4_HLOCK_ADD:
+		ld->ld_flags = EXT4_HLOCK_SPLIT;
+		return;
+	}
+}
+
+#define ext4_htree_de_lock(lck, key)		\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
+#define ext4_htree_de_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
+
+#define ext4_htree_spin_lock(lck, key, event)	\
+	ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
+#define ext4_htree_spin_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
+#define ext4_htree_spin_unlock_listen(lck, p)	\
+	ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
+
+static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
+{
+	if (!ext4_htree_safe_locked(lck) &&
+	    htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
+		htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
+}
+
+enum {
+	DX_HASH_COL_IGNORE,	/* ignore collision while probing frames */
+	DX_HASH_COL_YES,	/* there is collision and it does matter */
+	DX_HASH_COL_NO,		/* there is no collision */
+};
+
+static int dx_probe_hash_collision(struct htree_lock *lck,
+				   struct dx_entry *entries,
+				   struct dx_entry *at, u32 hash)
+{
+	if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
+		return DX_HASH_COL_IGNORE; /* don't care about collision */
+
+	} else if (at == entries + dx_get_count(entries) - 1) {
+		return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
+
+	} else { /* hash collision? */
+		return ((dx_get_hash(at + 1) & ~1) == hash) ?
+		       DX_HASH_COL_YES : DX_HASH_COL_NO;
+	}
+}
+
 /*
  * Probe for a directory leaf block to search.
  *
@@ -782,10 +1016,11 @@ struct stats dx_show_entries(struct dx_h
  */
 static struct dx_frame *
 dx_probe(struct ext4_filename *fname, struct inode *dir,
-	 struct dx_hash_info *hinfo, struct dx_frame *frame_in)
+	 struct dx_hash_info *hinfo, struct dx_frame *frame_in,
+	 struct htree_lock *lck)
 {
 	unsigned count, indirect, level, i;
-	struct dx_entry *at, *entries, *p, *q, *m;
+	struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
 	struct dx_root_info *info;
 	struct dx_frame *frame = frame_in;
 	struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
@@ -851,8 +1086,15 @@ dx_probe(struct ext4_filename *fname, st
 	level = 0;
 	blocks[0] = 0;
 	while (1) {
+		if (indirect == 0) { /* the last index level */
+			/* NB: ext4_htree_dx_lock() could be noop if
+			 * DX-lock flag is not set for current operation */
+			ext4_htree_dx_lock(lck, dx);
+			ext4_htree_spin_lock(lck, dx, NULL);
+		}
 		count = dx_get_count(entries);
-		if (!count || count > dx_get_limit(entries)) {
+		if (count == 0 || count > dx_get_limit(entries)) {
+			ext4_htree_spin_unlock(lck); /* release spin */
 			ext4_warning_inode(dir,
 					   "dx entry: count %u beyond limit %u",
 					   count, dx_get_limit(entries));
@@ -901,8 +1143,70 @@ dx_probe(struct ext4_filename *fname, st
 				goto fail;
 			}
 		}
-		if (++level > indirect)
+
+		if (indirect == 0) { /* the last index level */
+			struct ext4_dir_lock_data *ld;
+			u64 myblock;
+
+			/* By default we only lock DE-block, however, we will
+			 * also lock the last level DX-block if:
+			 * a) there is hash collision
+			 *    we will set DX-lock flag (a few lines below)
+			 *    and retry to lock DX-block
+			 *    see detail in dx_probe_hash_collision()
+			 * b) it's a retry from splitting
+			 *    we need to lock the last level DX-block so nobody
+			 *    else can split any leaf blocks under the same
+			 *    DX-block, see detail in ext4_dx_add_entry()
+			 */
+			if (ext4_htree_dx_locked(lck)) {
+				/* DX-block is locked, just lock DE-block
+				 * and return */
+				ext4_htree_spin_unlock(lck);
+				if (!ext4_htree_safe_locked(lck))
+					ext4_htree_de_lock(lck, frame->at);
+				return frame;
+			}
+			/* it's pdirop and no DX lock */
+			if (dx_probe_hash_collision(lck, entries, at, hash) ==
+			    DX_HASH_COL_YES) {
+				/* found hash collision, set DX-lock flag
+				 * and retry to obtain DX-lock */
+				ext4_htree_spin_unlock(lck);
+				ext4_htree_dx_need_lock(lck);
+				continue;
+			}
+			ld = ext4_htree_lock_data(lck);
+			/* because we don't lock DX, @at can't be trusted
+			 * after the spinlock is released, so save a copy */
+			ld->ld_at = at;
+			ld->ld_at_entry = *at;
+			ld->ld_count = dx_get_count(entries);
+
+			frame->at = &ld->ld_at_entry;
+			myblock = dx_get_block(at);
+
+			/* NB: ordering locking */
+			ext4_htree_spin_unlock_listen(lck, &myblock);
+			/* other thread can split this DE-block because:
+			 * a) I don't have lock for the DE-block yet
+			 * b) I released spinlock on DX-block
+			 * if it happened I can detect it by listening
+			 * splitting event on this DE-block */
+			ext4_htree_de_lock(lck, frame->at);
+			ext4_htree_spin_stop_listen(lck);
+
+			if (myblock == EXT4_HTREE_NODE_CHANGED) {
+				/* someone split this DE-block before
+				 * I locked it, I need to retry and lock
+				 * valid DE-block */
+				ext4_htree_de_unlock(lck);
+				continue;
+			}
 			return frame;
+		}
+		dx = at;
+		indirect--;
 		blocks[level] = block;
 		frame++;
 		frame->bh = ext4_read_dirblock(dir, block, INDEX);
@@ -973,7 +1277,7 @@ static void dx_release(struct dx_frame *
 static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 				 struct dx_frame *frame,
 				 struct dx_frame *frames,
-				 __u32 *start_hash)
+				 __u32 *start_hash, struct htree_lock *lck)
 {
 	struct dx_frame *p;
 	struct buffer_head *bh;
@@ -988,12 +1292,22 @@ static int ext4_htree_next_block(struct
 	 * this loop, num_frames indicates the number of interior
 	 * nodes need to be read.
 	 */
+	ext4_htree_de_unlock(lck);
 	while (1) {
-		if (++(p->at) < p->entries + dx_get_count(p->entries))
-			break;
+		if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
+			/* num_frames > 0 :
+			 *   DX block
+			 * ext4_htree_dx_locked:
+			 *   frame->at is a reliable pointer from dx_probe,
+			 *   otherwise dx_probe already found no collision */
+			if (++(p->at) < p->entries + dx_get_count(p->entries))
+				break;
+		}
 		if (p == frames)
 			return 0;
 		num_frames++;
+		if (num_frames == 1)
+			ext4_htree_dx_unlock(lck);
 		p--;
 	}
 
@@ -1016,6 +1330,13 @@ static int ext4_htree_next_block(struct
 	 * block so no check is necessary
 	 */
 	while (num_frames--) {
+		if (num_frames == 0) {
+			/* it's not always necessary, we just don't want to
+			 * detect hash collision again */
+			ext4_htree_dx_need_lock(lck);
+			ext4_htree_dx_lock(lck, p->at);
+		}
+
 		bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
 		if (IS_ERR(bh))
 			return PTR_ERR(bh);
@@ -1024,6 +1345,7 @@ static int ext4_htree_next_block(struct
 		p->bh = bh;
 		p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
 	}
+	ext4_htree_de_lock(lck, p->at);
 	return 1;
 }
 
@@ -1171,10 +1493,10 @@ int ext4_htree_fill_tree(struct file *di
 	}
 	hinfo.hash = start_hash;
 	hinfo.minor_hash = 0;
-	frame = dx_probe(NULL, dir, &hinfo, frames);
+	/* assume it's PR locked */
+	frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
 	if (IS_ERR(frame))
 		return PTR_ERR(frame);
-
 	/* Add '.' and '..' from the htree header */
 	if (!start_hash && !start_minor_hash) {
 		de = (struct ext4_dir_entry_2 *) frames[0].bh->b_data;
@@ -1214,7 +1536,7 @@ int ext4_htree_fill_tree(struct file *di
 		count += ret;
 		hashval = ~0;
 		ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
-					    frame, frames, &hashval);
+					    frame, frames, &hashval, NULL);
 		*next_hash = hashval;
 		if (ret < 0) {
 			err = ret;
@@ -1413,10 +1735,10 @@ static int is_dx_internal_node(struct in
 * The returned buffer_head has ->b_count elevated.  The caller is expected
 * to brelse() it when appropriate.
 */
-static struct buffer_head * ext4_find_entry (struct inode *dir,
+struct buffer_head *__ext4_find_entry(struct inode *dir,
 					const struct qstr *d_name,
 					struct ext4_dir_entry_2 **res_dir,
-					int *inlined)
+					int *inlined, struct htree_lock *lck)
 {
 	struct super_block *sb;
 	struct buffer_head *bh_use[NAMEI_RA_SIZE];
@@ -1465,7 +1787,7 @@ static struct buffer_head * ext4_find_en
 		goto restart;
 	}
 	if (is_dx(dir)) {
-		ret = ext4_dx_find_entry(dir, &fname, res_dir);
+		ret = ext4_dx_find_entry(dir, &fname, res_dir, lck);
 		/*
 		 * On success, or if the error was file not found,
 		 * return.  Otherwise, fall back to doing a search the
@@ -1475,6 +1797,7 @@ static struct buffer_head * ext4_find_en
 			goto cleanup_and_exit;
 		dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
 			       "falling back\n"));
+		ext4_htree_safe_relock(lck);
 		ret = NULL;
 	}
 	nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
@@ -1566,10 +1889,12 @@ cleanup_and_exit:
 	ext4_fname_free_filename(&fname);
 	return ret;
 }
+EXPORT_SYMBOL(__ext4_find_entry);
 
 static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 			struct ext4_filename *fname,
-			struct ext4_dir_entry_2 **res_dir)
+			struct ext4_dir_entry_2 **res_dir,
+			struct htree_lock *lck)
 {
 	struct super_block * sb = dir->i_sb;
 	struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
@@ -1580,7 +1905,7 @@ static struct buffer_head * ext4_dx_find
 #ifdef CONFIG_EXT4_FS_ENCRYPTION
 	*res_dir = NULL;
 #endif
-	frame = dx_probe(fname, dir, NULL, frames);
+	frame = dx_probe(fname, dir, NULL, frames, lck);
 	if (IS_ERR(frame))
 		return (struct buffer_head *) frame;
 	do {
@@ -1602,7 +1927,7 @@ static struct buffer_head * ext4_dx_find
 
 		/* Check to see if we should continue to search */
 		retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
-					       frames, NULL);
+					       frames, NULL, lck);
 		if (retval < 0) {
 			ext4_warning_inode(dir,
 				"error %d reading directory index block",
@@ -1777,8 +2102,9 @@ static struct ext4_dir_entry_2* dx_pack_
 * Returns pointer to de in block into which the new entry will be inserted.
 */
 static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
-			struct buffer_head **bh,struct dx_frame *frame,
-			struct dx_hash_info *hinfo)
+			struct buffer_head **bh, struct dx_frame *frames,
+			struct dx_frame *frame, struct dx_hash_info *hinfo,
+			struct htree_lock *lck)
 {
 	unsigned blocksize = dir->i_sb->s_blocksize;
 	unsigned continued;
@@ -1854,8 +2180,14 @@ static struct ext4_dir_entry_2 *do_split
 					hash2, split, count-split));
 
 	/* Fancy dance to stay within two buffers */
-	de2 = dx_move_dirents(data1, data2, map + split, count - split,
-			      blocksize);
+	if (hinfo->hash < hash2) {
+		de2 = dx_move_dirents(data1, data2, map + split,
+				      count - split, blocksize);
+	} else {
+		/* make sure we will add entry to the same block which
+		 * we have already locked */
+		de2 = dx_move_dirents(data1, data2, map, split, blocksize);
+	}
 	de = dx_pack_dirents(data1, blocksize);
 	de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
 					   (char *) de,
@@ -1876,12 +2208,21 @@ static struct ext4_dir_entry_2 *do_split
 	dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
 			blocksize, 1));
 
-	/* Which block gets the new entry? */
-	if (hinfo->hash >= hash2) {
-		swap(*bh, bh2);
-		de = de2;
+	ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
+			     frame->at); /* notify block is being split */
+	if (hinfo->hash < hash2) {
+		dx_insert_block(frame, hash2 + continued, newblock);
+
+	} else {
+		/* switch block number */
+		dx_insert_block(frame, hash2 + continued,
+				dx_get_block(frame->at));
+		dx_set_block(frame->at, newblock);
+		(frame->at)++;
 	}
-	dx_insert_block(frame, hash2 + continued, newblock);
+	ext4_htree_spin_unlock(lck);
+	ext4_htree_dx_unlock(lck);
+
 	err = ext4_handle_dirty_dirent_node(handle, dir, bh2);
 	if (err)
 		goto journal_error;
@@ -2155,7 +2496,7 @@ static int make_indexed_dir(handle_t *ha
 	if (retval)
 		goto out_frames;
 
-	de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
+	de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
 	if (IS_ERR(de)) {
 		retval = PTR_ERR(de);
 		goto out_frames;
@@ -2265,8 +2606,8 @@ out:
 * may not sleep between calling this and putting something into
 * the entry, as someone else might have used it while you slept.
 */
-static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
-			  struct inode *inode)
+int __ext4_add_entry(handle_t *handle, struct dentry *dentry,
+		     struct inode *inode, struct htree_lock *lck)
 {
 	struct inode *dir = d_inode(dentry->d_parent);
 	struct buffer_head *bh = NULL;
@@ -2307,9 +2648,10 @@ static int ext4_add_entry(handle_t *hand
 		if (dentry->d_name.len == 2 &&
 		    memcmp(dentry->d_name.name, "..", 2) == 0)
 			return ext4_update_dotdot(handle, dentry, inode);
-		retval = ext4_dx_add_entry(handle, &fname, dir, inode);
+		retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
 		if (!retval || (retval != ERR_BAD_DX_DIR))
 			goto out;
+		ext4_htree_safe_relock(lck);
 		/* Can we just ignore htree data? */
 		if (ext4_has_metadata_csum(sb)) {
 			EXT4_ERROR_INODE(dir,
@@ -2372,12 +2714,14 @@ out:
 		ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
 	return retval;
 }
+EXPORT_SYMBOL(__ext4_add_entry);
 
 /*
 * Returns 0 for success, or a negative error value
 */
 static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
-			     struct inode *dir, struct inode *inode)
+			     struct inode *dir, struct inode *inode,
+			     struct htree_lock *lck)
 {
 	struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
 	struct dx_entry *entries, *at;
@@ -2389,7 +2733,7 @@ static int ext4_dx_add_entry(handle_t *h
 
 again:
 	restart = 0;
-	frame = dx_probe(fname, dir, NULL, frames);
+	frame = dx_probe(fname, dir, NULL, frames, lck);
 	if (IS_ERR(frame))
 		return PTR_ERR(frame);
 	entries = frame->entries;
@@ -2424,6 +2768,11 @@ again:
 		struct dx_node *node2;
 		struct buffer_head *bh2;
 
+		if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
+			ext4_htree_safe_relock(lck);
+			restart = 1;
+			goto cleanup;
+		}
 		while (frame > frames) {
 			if (dx_get_count((frame - 1)->entries) <
 			    dx_get_limit((frame - 1)->entries)) {
@@ -2525,8 +2874,32 @@ again:
 			restart = 1;
 			goto journal_error;
 		}
+	} else if (!ext4_htree_dx_locked(lck)) {
+		struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
+
+		/* not well protected, require DX lock */
+		ext4_htree_dx_need_lock(lck);
+		at = frame > frames ? (frame - 1)->at : NULL;
+
+		/* NB: no risk of deadlock because it's just a try.
+		 *
+		 * NB: we check ld_count twice, the first time before
+		 * taking the DX lock, the second time while holding it.
+		 *
+		 * NB: We never free directory blocks so far, which
+		 * means the value returned by dx_get_count() should equal
+		 * ld->ld_count if nobody split any DE-block under @at,
+		 * and ld->ld_at still points to a valid dx_entry. */
+		if ((ld->ld_count != dx_get_count(entries)) ||
+		    !ext4_htree_dx_lock_try(lck, at) ||
+		    (ld->ld_count != dx_get_count(entries))) {
+			restart = 1;
+			goto cleanup;
+		}
+		/* OK, I've got DX lock and nothing changed */
+		frame->at = ld->ld_at;
 	}
-	de = do_split(handle, dir, &bh, frame, &fname->hinfo);
+	de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
 	if (IS_ERR(de)) {
 		err = PTR_ERR(de);
 		goto cleanup;
@@ -2537,6 +2910,8 @@ again:
 journal_error:
 	ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
 cleanup:
+	ext4_htree_dx_unlock(lck);
+	ext4_htree_de_unlock(lck);
 	brelse(bh);
 	dx_release(frames);
 	/* @restart is true means htree-path has been changed, we need to
Index: linux-4.18.0-423.el8/fs/ext4/super.c
===================================================================
--- linux-4.18.0-423.el8.orig/fs/ext4/super.c
+++ linux-4.18.0-423.el8/fs/ext4/super.c
@@ -1136,6 +1136,7 @@ static struct inode *ext4_alloc_inode(st
 	inode_set_iversion(&ei->vfs_inode, 1);
 	spin_lock_init(&ei->i_raw_lock);
+	sema_init(&ei->i_append_sem, 1);
 	INIT_LIST_HEAD(&ei->i_prealloc_list);
 	spin_lock_init(&ei->i_prealloc_lock);
 	ext4_es_init_tree(&ei->i_es_tree);
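
For reviewers, here is a minimal usage sketch of the locking API exported
above. It is not part of the patch: the function name pdo_lookup_sketch,
the hash-table size of 16 bits and the error handling are illustrative
assumptions only. In osd-ldiskfs the htree_lock_head would be allocated
once per directory object and cached for the object's lifetime, with each
service thread keeping its own htree_lock.

/* Illustrative sketch only -- not part of this patch.  Kernel context;
 * assumes the pdirops-extended ext4.h above is in scope. */
static int pdo_lookup_sketch(struct inode *dir, const struct qstr *name)
{
	struct htree_lock_head *lhead;
	struct htree_lock *lck;
	struct ext4_dir_entry_2 *de;
	struct buffer_head *bh;
	int rc = 0;

	/* one lock head per directory; 16 hash bits is an assumed value */
	lhead = ext4_htree_lock_head_alloc(16);
	if (lhead == NULL)
		return -ENOMEM;

	/* one htree_lock per thread or per operation */
	lck = ext4_htree_lock_alloc();
	if (lck == NULL) {
		ext4_htree_lock_head_free(lhead);
		return -ENOMEM;
	}

	/* EXT4_HLOCK_LOOKUP maps to HTREE_LOCK_CR, so concurrent lookups,
	 * creates and unlinks may run in this directory; only the DE/DX
	 * blocks this thread touches are locked inside __ext4_find_entry() */
	ext4_htree_lock(lck, lhead, dir, EXT4_HLOCK_LOOKUP);
	bh = __ext4_find_entry(dir, name, &de, NULL, lck);
	if (IS_ERR(bh))
		rc = PTR_ERR(bh);
	else
		brelse(bh); /* brelse(NULL) is safe if nothing was found */
	ext4_htree_unlock(lck);

	ext4_htree_lock_free(lck);
	ext4_htree_lock_head_free(lhead);
	return rc;
}

Creates and unlinks follow the same pattern with EXT4_HLOCK_ADD around
__ext4_add_entry() and EXT4_HLOCK_DEL around the lookup that precedes the
removal. Passing a NULL htree_lock preserves the old single-lock behaviour;
the ext4_find_entry()/ext4_add_entry() wrapper macros in namei.c do exactly
that.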