diff --git a/libcfs/include/libcfs/Makefile.am b/libcfs/include/libcfs/Makefile.am index cd61d27..a72c3fd 100644 --- a/libcfs/include/libcfs/Makefile.am +++ b/libcfs/include/libcfs/Makefile.am @@ -11,4 +11,4 @@ EXTRA_DIST := curproc.h libcfs_private.h libcfs.h list.h lltrace.h \ libcfs_debug.h libcfsutil.h libcfs_ioctl.h \ libcfs_pack.h libcfs_unpack.h libcfs_string.h \ libcfs_kernelcomm.h libcfs_workitem.h lucache.h \ - libcfs_fail.h params_tree.h + libcfs_fail.h params_tree.h libcfs_heap.h diff --git a/libcfs/include/libcfs/libcfs.h b/libcfs/include/libcfs/libcfs.h index 4fc0bea..31d587c 100644 --- a/libcfs/include/libcfs/libcfs.h +++ b/libcfs/include/libcfs/libcfs.h @@ -310,6 +310,7 @@ void cfs_get_random_bytes(void *buf, int size); #include #include #include +#include #include #include diff --git a/libcfs/include/libcfs/libcfs_heap.h b/libcfs/include/libcfs/libcfs_heap.h new file mode 100644 index 0000000..d4017f8 --- /dev/null +++ b/libcfs/include/libcfs/libcfs_heap.h @@ -0,0 +1,125 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * libcfs/include/libcfs/heap.h + * + * Author: Eric Barton + * Liang Zhen + */ + +#ifndef __LIBCFS_HEAP_H__ +#define __LIBCFS_HEAP_H__ + +typedef struct { + unsigned int chn_index; +} cfs_binheap_node_t; + +#define CBH_SHIFT 9 +#define CBH_SIZE (1 << CBH_SHIFT) /* # ptrs per level */ +#define CBH_MASK (CBH_SIZE - 1) +#define CBH_NOB (CBH_SIZE * sizeof(cfs_binheap_node_t *)) + +#define CBH_POISON 0xdeadbeef + +enum { + CBH_FLAG_ATOMIC_GROW = 1, +}; + +struct cfs_binheap; + +typedef struct { + int (*hop_enter)(struct cfs_binheap *h, + cfs_binheap_node_t *e); + void (*hop_exit)(struct cfs_binheap *h, + cfs_binheap_node_t *e); + int (*hop_compare)(cfs_binheap_node_t *a, + cfs_binheap_node_t *b); +} cfs_binheap_ops_t; + +typedef struct cfs_binheap { + /** Triple indirect */ + cfs_binheap_node_t ****cbh_elements3; + /** double indirect */ + cfs_binheap_node_t ***cbh_elements2; + /** single indirect */ + cfs_binheap_node_t **cbh_elements1; + /** # elements referenced */ + unsigned int cbh_nelements; + /** high water mark */ + unsigned int cbh_hwm; + /** user flags */ + unsigned int cbh_flags; + /** operations table */ + cfs_binheap_ops_t *cbh_ops; + /** private data */ + void *cbh_private; +} cfs_binheap_t; + +void cfs_binheap_destroy(cfs_binheap_t *h); +cfs_binheap_t *cfs_binheap_create(cfs_binheap_ops_t *ops, unsigned int flags, + unsigned 
count, void *arg); +cfs_binheap_node_t *cfs_binheap_find(cfs_binheap_t *h, unsigned int idx); +int cfs_binheap_insert(cfs_binheap_t *h, cfs_binheap_node_t *e); +void cfs_binheap_remove(cfs_binheap_t *h, cfs_binheap_node_t *e); + +static inline int +cfs_binheap_size(cfs_binheap_t *h) +{ + return h->cbh_nelements; +} + +static inline int +cfs_binheap_is_empty(cfs_binheap_t *h) +{ + return h->cbh_nelements == 0; +} + +static inline cfs_binheap_node_t * +cfs_binheap_root(cfs_binheap_t *h) +{ + return cfs_binheap_find(h, 0); +} + +static inline cfs_binheap_node_t * +cfs_binheap_remove_root(cfs_binheap_t *h) +{ + cfs_binheap_node_t *e = cfs_binheap_find(h, 0); + + if (e != NULL) + cfs_binheap_remove(h, e); + return e; +} + +#endif /* __LIBCFS_HEAP_H__ */ diff --git a/libcfs/include/libcfs/libcfs_private.h b/libcfs/include/libcfs/libcfs_private.h index 9e33e26..ee2d734 100644 --- a/libcfs/include/libcfs/libcfs_private.h +++ b/libcfs/include/libcfs/libcfs_private.h @@ -265,6 +265,7 @@ do { \ # else # define LIBCFS_ALLOC(ptr, size) do { (ptr) = calloc(1,size); } while (0) # endif +# define LIBCFS_ALLOC_ATOMIC(ptr, size) LIBCFS_ALLOC(ptr, size) # define LIBCFS_FREE(a, b) do { free(a); } while (0) void libcfs_debug_dumplog(void); diff --git a/libcfs/libcfs/Makefile.in b/libcfs/libcfs/Makefile.in index 24bc9fd..59c843f 100644 --- a/libcfs/libcfs/Makefile.in +++ b/libcfs/libcfs/Makefile.in @@ -26,7 +26,7 @@ endif libcfs-all-objs := debug.o fail.o nidstrings.o lwt.o module.o tracefile.o watchdog.o \ libcfs_string.o hash.o kernel_user_comm.o prng.o workitem.o \ - upcall_cache.o + upcall_cache.o heap.o libcfs-objs := $(libcfs-linux-objs) $(libcfs-all-objs) diff --git a/libcfs/libcfs/autoMakefile.am b/libcfs/libcfs/autoMakefile.am index 1f2b97f..bd598bf 100644 --- a/libcfs/libcfs/autoMakefile.am +++ b/libcfs/libcfs/autoMakefile.am @@ -44,7 +44,7 @@ if LIBLUSTRE noinst_LIBRARIES= libcfs.a libcfs_a_SOURCES= posix/posix-debug.c user-prim.c user-lock.c user-tcpip.c \ prng.c 
user-bitops.c user-mem.c hash.c kernel_user_comm.c \ - workitem.c fail.c + workitem.c fail.c heap.c libcfs_a_CPPFLAGS = $(LLCPPFLAGS) libcfs_a_CFLAGS = $(LLCFLAGS) endif @@ -70,7 +70,7 @@ nodist_libcfs_SOURCES := darwin/darwin-sync.c darwin/darwin-mem.c \ darwin/darwin-debug.c darwin/darwin-proc.c \ darwin/darwin-tracefile.c darwin/darwin-module.c \ posix/posix-debug.c module.c tracefile.c nidstrings.c watchdog.c \ - kernel_user_comm.c hash.c + kernel_user_comm.c hash.c heap.c libcfs_CFLAGS := $(EXTRA_KCFLAGS) libcfs_LDFLAGS := $(EXTRA_KLDFLAGS) diff --git a/libcfs/libcfs/heap.c b/libcfs/libcfs/heap.c new file mode 100644 index 0000000..dcc4ce2 --- /dev/null +++ b/libcfs/libcfs/heap.c @@ -0,0 +1,383 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. 
+ */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * libcfs/libcfs/heap.c + * + * Author: Eric Barton + * Liang Zhen + */ + +#define DEBUG_SUBSYSTEM S_LNET + +#include + +#define CBH_ALLOC(ptr, h) \ +do { \ + if ((h)->cbh_flags & CBH_FLAG_ATOMIC_GROW) \ + LIBCFS_ALLOC_ATOMIC((ptr), CBH_NOB); \ + else \ + LIBCFS_ALLOC((ptr), CBH_NOB); \ +} while (0) + +#define CBH_FREE(ptr) LIBCFS_FREE(ptr, CBH_NOB) + +static int +cfs_binheap_grow(cfs_binheap_t *h) +{ + cfs_binheap_node_t ***frag1 = NULL; + cfs_binheap_node_t **frag2; + int hwm = h->cbh_hwm; + + /* need a whole new chunk of pointers */ + LASSERT ((h->cbh_hwm & CBH_MASK) == 0); + + if (hwm == 0) { + /* first use of single indirect */ + CBH_ALLOC(h->cbh_elements1, h); + if (h->cbh_elements1 == NULL) + return -ENOMEM; + + goto out; + } + + hwm -= CBH_SIZE; + if (hwm < CBH_SIZE * CBH_SIZE) { + /* not filled double indirect */ + CBH_ALLOC(frag2, h); + if (frag2 == NULL) + return -ENOMEM; + + if (hwm == 0) { + /* first use of double indirect */ + CBH_ALLOC(h->cbh_elements2, h); + if (h->cbh_elements2 == NULL) { + CBH_FREE(frag2); + return -ENOMEM; + } + } + + h->cbh_elements2[hwm >> CBH_SHIFT] = frag2; + goto out; + } + + hwm -= CBH_SIZE * CBH_SIZE; +#if (CBH_SHIFT * 3 < 32) + if (hwm >= CBH_SIZE * CBH_SIZE * CBH_SIZE) { + /* filled triple indirect */ + return -ENOMEM; + } +#endif + CBH_ALLOC(frag2, h); + if (frag2 == NULL) + return -ENOMEM; + + if (((hwm >> CBH_SHIFT) & CBH_MASK) == 0) { + /* first use of this 2nd level index */ + CBH_ALLOC(frag1, h); + if (frag1 == NULL) { + CBH_FREE(frag2); + return -ENOMEM; + } + } + + if (hwm == 0) { + /* first use of triple indirect */ + CBH_ALLOC(h->cbh_elements3, h); + if (h->cbh_elements3 == NULL) { + CBH_FREE(frag2); + CBH_FREE(frag1); + return -ENOMEM; + } + } + + if (frag1 != NULL) { + LASSERT (h->cbh_elements3[hwm >> (2 * CBH_SHIFT)] == NULL); + h->cbh_elements3[hwm >> (2 * CBH_SHIFT)] = frag1; + } else { 
+ frag1 = h->cbh_elements3[hwm >> (2 * CBH_SHIFT)]; + LASSERT (frag1 != NULL); + } + + frag1[(hwm >> CBH_SHIFT) & CBH_MASK] = frag2; + + out: + h->cbh_hwm += CBH_SIZE; + return 0; +} + +cfs_binheap_t * +cfs_binheap_create(cfs_binheap_ops_t *ops, unsigned int flags, + unsigned count, void *arg) +{ + cfs_binheap_t *h; + + LIBCFS_ALLOC(h, sizeof(*h)); + if (h == NULL) + return NULL; + + h->cbh_ops = ops; + h->cbh_nelements = 0; + h->cbh_hwm = 0; + h->cbh_private = arg; + h->cbh_flags = flags & (~CBH_FLAG_ATOMIC_GROW); + + while (h->cbh_hwm < count) { /* preallocate */ + if (cfs_binheap_grow(h) != 0) { + cfs_binheap_destroy(h); + return NULL; + } + } + + h->cbh_flags |= flags & CBH_FLAG_ATOMIC_GROW; + + return h; +} +CFS_EXPORT_SYMBOL(cfs_binheap_create); + +void +cfs_binheap_destroy(cfs_binheap_t *h) +{ + int idx0; + int idx1; + int n = h->cbh_hwm; + + if (n > 0) { + CBH_FREE(h->cbh_elements1); + n-= CBH_SIZE; + } + + if (n > 0) { + for (idx0 = 0; idx0 < CBH_SIZE && n > 0; idx0++) { + CBH_FREE(h->cbh_elements2[idx0]); + n -= CBH_SIZE; + } + + CBH_FREE(h->cbh_elements2); + } + + if (n > 0) { + for (idx0 = 0; idx0 < CBH_SIZE && n > 0; idx0++) { + + for (idx1 = 0; idx1 < CBH_SIZE && n > 0; idx1++) { + CBH_FREE(h->cbh_elements3[idx0][idx1]); + n -= CBH_SIZE; + } + + CBH_FREE(h->cbh_elements3[idx0]); + } + + CBH_FREE(h->cbh_elements3); + } + + LIBCFS_FREE(h, sizeof(*h)); +} +CFS_EXPORT_SYMBOL(cfs_binheap_destroy); + +static cfs_binheap_node_t ** +cfs_binheap_pointer(cfs_binheap_t *h, unsigned int idx) +{ + if (idx < CBH_SIZE) + return &(h->cbh_elements1[idx]); + + idx -= CBH_SIZE; + if (idx < CBH_SIZE * CBH_SIZE) + return &(h->cbh_elements2[idx >> CBH_SHIFT][idx & CBH_MASK]); + + idx -= CBH_SIZE * CBH_SIZE; + return &(h->cbh_elements3[idx >> (2 * CBH_SHIFT)]\ + [(idx >> CBH_SHIFT) & CBH_MASK]\ + [idx & CBH_MASK]); +} + +cfs_binheap_node_t * +cfs_binheap_find(cfs_binheap_t *h, unsigned int idx) +{ + if (idx >= h->cbh_nelements) + return NULL; + + return 
*cfs_binheap_pointer(h, idx); +} +CFS_EXPORT_SYMBOL(cfs_binheap_find); + +static int +cfs_binheap_bubble(cfs_binheap_t *h, cfs_binheap_node_t *e) +{ + unsigned int cur_idx = e->chn_index; + cfs_binheap_node_t **cur_ptr; + unsigned int parent_idx; + cfs_binheap_node_t **parent_ptr; + int did_sth = 0; + + cur_ptr = cfs_binheap_pointer(h, cur_idx); + LASSERT(*cur_ptr == e); + + while (cur_idx > 0) { + parent_idx = (cur_idx - 1) >> 1; + + parent_ptr = cfs_binheap_pointer(h, parent_idx); + LASSERT((*parent_ptr)->chn_index == parent_idx); + + if (h->cbh_ops->hop_compare(*parent_ptr, e)) + break; + + (*parent_ptr)->chn_index = cur_idx; + *cur_ptr = *parent_ptr; + cur_ptr = parent_ptr; + cur_idx = parent_idx; + did_sth = 1; + } + + e->chn_index = cur_idx; + *cur_ptr = e; + + return did_sth; +} + +static int +cfs_binheap_sink(cfs_binheap_t *h, cfs_binheap_node_t *e) +{ + unsigned int n = h->cbh_nelements; + unsigned int child_idx; + cfs_binheap_node_t **child_ptr; + cfs_binheap_node_t *child; + unsigned int child2_idx; + cfs_binheap_node_t **child2_ptr; + cfs_binheap_node_t *child2; + unsigned int cur_idx; + cfs_binheap_node_t **cur_ptr; + int did_sth = 0; + + cur_idx = e->chn_index; + cur_ptr = cfs_binheap_pointer(h, cur_idx); + LASSERT(*cur_ptr == e); + + while (cur_idx < n) { + child_idx = (cur_idx << 1) + 1; + if (child_idx >= n) + break; + + child_ptr = cfs_binheap_pointer(h, child_idx); + child = *child_ptr; + + child2_idx = child_idx + 1; + if (child2_idx < n) { + child2_ptr = cfs_binheap_pointer(h, child2_idx); + child2 = *child2_ptr; + + if (h->cbh_ops->hop_compare(child2, child)) { + child_idx = child2_idx; + child_ptr = child2_ptr; + child = child2; + } + } + + LASSERT(child->chn_index == child_idx); + + if (h->cbh_ops->hop_compare(e, child)) + break; + + child->chn_index = cur_idx; + *cur_ptr = child; + cur_ptr = child_ptr; + cur_idx = child_idx; + did_sth = 1; + } + + e->chn_index = cur_idx; + *cur_ptr = e; + + return did_sth; +} + +int 
+cfs_binheap_insert(cfs_binheap_t *h, cfs_binheap_node_t *e) +{ + cfs_binheap_node_t **new_ptr; + unsigned int new_idx = h->cbh_nelements; + int rc; + + if (new_idx == h->cbh_hwm) { + rc = cfs_binheap_grow(h); + if (rc != 0) + return rc; + } + + if (h->cbh_ops->hop_enter) { + rc = h->cbh_ops->hop_enter(h, e); + if (rc != 0) + return rc; + } + + e->chn_index = new_idx; + new_ptr = cfs_binheap_pointer(h, new_idx); + h->cbh_nelements++; + *new_ptr = e; + + cfs_binheap_bubble(h, e); + + return 0; +} +CFS_EXPORT_SYMBOL(cfs_binheap_insert); + +void +cfs_binheap_remove(cfs_binheap_t *h, cfs_binheap_node_t *e) +{ + unsigned int n = h->cbh_nelements; + unsigned int cur_idx = e->chn_index; + cfs_binheap_node_t **cur_ptr; + cfs_binheap_node_t *last; + + LASSERT(cur_idx != CBH_POISON); + LASSERT(cur_idx < n); + + cur_ptr = cfs_binheap_pointer(h, cur_idx); + LASSERT(*cur_ptr == e); + + n--; + last = *cfs_binheap_pointer(h, n); + h->cbh_nelements = n; + if (last == e) + return; + + last->chn_index = cur_idx; + *cur_ptr = last; + if (!cfs_binheap_bubble(h, *cur_ptr)) + cfs_binheap_sink(h, *cur_ptr); + + e->chn_index = CBH_POISON; + if (h->cbh_ops->hop_exit) + h->cbh_ops->hop_exit(h, e); +} +CFS_EXPORT_SYMBOL(cfs_binheap_remove); diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index f8ad0c3..8cb651d 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -242,6 +242,8 @@ struct obd_export { * but couldn't because of active rpcs */ exp_abort_active_req:1; cfs_list_t exp_queued_rpc; /* RPC to be handled */ + nrs_crr_object_t exp_nrs_obj; + nrs_crr_object_t exp_nrs_obj_hp; /* also protected by exp_lock */ enum lustre_sec_part exp_sp_peer; struct sptlrpc_flavor exp_flvr; /* current */ diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index d9d52d0..86c5610 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -420,6 +420,155 @@ struct lu_context; struct lu_env; struct ldlm_lock; 
+struct ptlrpc_nrs_policy; + +/** + * NRS operations + */ +typedef struct ptlrpc_nrs_ops { + int (*op_nrs_init)(struct ptlrpc_nrs_policy *); + void (*op_nrs_fini)(struct ptlrpc_nrs_policy *); + void *(*op_nrs_target)(struct ptlrpc_nrs_policy *); + + int (*op_req_add)(struct ptlrpc_nrs_policy *, + struct ptlrpc_request *); + void (*op_req_del)(struct ptlrpc_nrs_policy *, + struct ptlrpc_request *); + struct ptlrpc_request *(*op_req_first)(struct ptlrpc_nrs_policy *); +} ptlrpc_nrs_ops_t; + +enum { + PTLRPC_NRS_FIFO = 1, + PTLRPC_NRS_CRR = 2, +}; + +#define PTLRPC_NRS_NAME_LEN 8 + +/* + * A few concepts for heap based NRS: + * - head : a list of polices, each service has two NRS head, + * one for regular request and another for HP request + * - policy : algorithm to sort request + * - target : each policy can have 1-N targets, i.e: + * each OST has a target, or all OSTs share one target + * - object : client (CRR) or object (ORR) on target + * - request: stub in a ptlrpc request, a object can have 1-N requests + */ + +/** + * NRS policy head: + * - have policy list + * - must have one and only one DEFAULT NRS policy + * - can have one ACTIVE NRS policy, default policy will be used if + * w/o active policy + */ +typedef struct ptlrpc_nrs_head { + cfs_list_t nh_policy_list; + long nh_req_count; +} ptlrpc_nrs_head_t; + +/** + * NRS policy descriptor + */ +typedef struct ptlrpc_nrs_policy { + cfs_list_t nrs_list; + unsigned nrs_active:1; + unsigned nrs_default:1; + unsigned nrs_type; + /** total number of requests */ + long nrs_req_count; + ptlrpc_nrs_ops_t *nrs_ops; + ptlrpc_nrs_head_t *nrs_head; + struct ptlrpc_service *nrs_svc; + void *nrs_private; + char nrs_name[PTLRPC_NRS_NAME_LEN]; +} ptlrpc_nrs_policy_t; + +typedef struct ptlrpc_nrs_target { +} ptlrpc_nrs_target_t; + +typedef struct ptlrpc_nrs_object { +} ptlrpc_nrs_object_t; + +typedef struct ptlrpc_nrs_request { + __u64 nr_key_major; + __u64 nr_key_minor; +} ptlrpc_nrs_request_t; + +/** + * private data 
structure for FIFO NRS + */ +typedef struct nrs_fifo_target { + ptlrpc_nrs_target_t ft_target; + cfs_list_t ft_list; + __u64 ft_sequence; +} nrs_fifo_target_t; + +typedef struct nrs_fifo_req { + ptlrpc_nrs_request_t fr_req; + cfs_list_t fr_list; +} nrs_fifo_req_t; + +/** + * private data structure for CRR binheap NRS + */ +typedef struct nrs_crr_target { + ptlrpc_nrs_target_t ct_target; + cfs_binheap_t *ct_binheap; + __u64 ct_round; + __u64 ct_sequence; +} nrs_crr_target_t; + +/** + * private data structure for ORR binheap NRS + */ +typedef struct nrs_orr_target { + ptlrpc_nrs_target_t ot_target; + cfs_binheap_t *ot_binheap; + __u64 ot_round; + __u64 ot_sequence; + unsigned ot_quantum; +} nrs_orr_target_t; + +/** + * target object for CRR binheap NRS which is embedded in client structure + */ +typedef struct nrs_crr_object { + ptlrpc_nrs_object_t co_object; + __u64 co_round; +} nrs_crr_object_t; + +/** + * target object for ORR binheap NRS which is embedded in object structure + */ +typedef struct nrs_orr_object { + ptlrpc_nrs_object_t oo_object; + __u64 oo_round; + /** reserved for object round-robin */ + __u64 oo_sequence; + /** reserved for object round-robin */ + unsigned oo_quantum; +} nrs_orr_object_t; + +/** + * embed it to ptlrpc request for CRR binheap NRS + */ +typedef struct nrs_crr_req { + ptlrpc_nrs_request_t cr_req; + cfs_binheap_node_t cr_node; +} nrs_crr_req_t; + +/** + * embed it to ptlrpc request for ORR binheap NRS + */ +typedef struct nrs_orr_req { + ptlrpc_nrs_request_t or_req; + cfs_binheap_node_t or_node; + /** more key */ + __u64 or_start; + /** more key */ + __u64 or_end; +} nrs_orr_req_t; /** * Basic request prioritization operations structure. 
@@ -472,6 +621,15 @@ struct ptlrpc_request { struct ptlrpc_hpreq_ops *rq_ops; /** history sequence # */ __u64 rq_history_seq; + /** NRS policy */ + ptlrpc_nrs_policy_t *rq_nrs_policy; + /** stub for NRS request */ + union { + ptlrpc_nrs_request_t pub; + nrs_fifo_req_t fifo; + nrs_crr_req_t crr; + nrs_orr_req_t orr; + } rq_nrs_u; /** the index of service's srv_at_array into which request is linked */ time_t rq_at_index; /** Result of request processing */ @@ -698,6 +856,54 @@ struct ptlrpc_request { struct req_capsule rq_pill; }; +static inline int +ptlrpc_nrs_policy_req_add(ptlrpc_nrs_policy_t *policy, + struct ptlrpc_request *req) +{ + int rc; + + req->rq_nrs_policy = policy; + rc = policy->nrs_ops->op_req_add(policy, req); + if (rc == 0) { + policy->nrs_req_count++; + policy->nrs_head->nh_req_count++; + } else { + req->rq_nrs_policy = NULL; + } + return rc; +} + +static inline void +ptlrpc_nrs_policy_req_del(ptlrpc_nrs_policy_t *policy, + struct ptlrpc_request *req) +{ + LASSERT(policy->nrs_req_count > 0); + LASSERT(policy->nrs_req_count <= policy->nrs_head->nh_req_count); + + policy->nrs_ops->op_req_del(policy, req); + policy->nrs_head->nh_req_count--; + policy->nrs_req_count--; + req->rq_nrs_policy = NULL; +} + +static inline struct ptlrpc_request * +ptlrpc_nrs_policy_req_first(ptlrpc_nrs_policy_t *policy) +{ + return policy->nrs_ops->op_req_first(policy); +} + +int ptlrpc_server_nrs_register(struct ptlrpc_service *svc, int type, + char *name, ptlrpc_nrs_ops_t *ops, int hp); +void ptlrpc_server_nrs_unregister(struct ptlrpc_service *svc, + int type, int hp); +int ptlrpc_server_nrs_state_set_nolock(struct ptlrpc_service *svc, + int type, int hp, int active); +int ptlrpc_server_nrs_state_set(struct ptlrpc_service *svc, + int type, int hp, int active); +int ptlrpc_server_nrs_state_get_nolock(struct ptlrpc_service *svc, + int type, int hp); +int ptlrpc_server_nrs_state_get(struct ptlrpc_service *svc, int type, int hp); + /** * Call completion handler for rpc if any, 
return it's status or original * rc if there was no handler defined for this request. @@ -1152,11 +1358,9 @@ struct ptlrpc_service { * sent to this portal */ cfs_spinlock_t srv_rq_lock __cfs_cacheline_aligned; - /** # reqs in either of the queues below */ - /** reqs waiting for service */ - cfs_list_t srv_request_queue; - /** high priority queue */ - cfs_list_t srv_request_hpq; + /* NRS head for regular requests */ + ptlrpc_nrs_head_t srv_req_nrs; + /** # incoming reqs */ int srv_n_queued_reqs; /** # reqs being served */ @@ -1165,6 +1369,8 @@ struct ptlrpc_service { int srv_n_active_hpreq; /** # hp requests handled */ int srv_hpreq_count; + /** NRS head for HP requests */ + ptlrpc_nrs_head_t srv_hpreq_nrs; /** AT stuff */ /** @{ */ diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index b42c41b..fe2b95d 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -369,6 +369,36 @@ ptlrpc_lprocfs_wr_threads_max(struct file *file, const char *buffer, return count; } +static int +ptlrpc_lprocfs_rd_nrs_crr(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct ptlrpc_service *svc = data; + int val; + + /* XXX */ + val = ptlrpc_server_nrs_state_get(svc, PTLRPC_NRS_CRR, 0); + return snprintf(page, count, "%d\n", val); +} + +static int +ptlrpc_lprocfs_wr_nrs_crr(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct ptlrpc_service *svc = data; + int val; + + lprocfs_write_helper(buffer, count, &val); + + /* XXX */ + cfs_spin_lock(&svc->srv_rq_lock); + ptlrpc_server_nrs_state_set_nolock(svc, PTLRPC_NRS_CRR, 0, !!val); + ptlrpc_server_nrs_state_set_nolock(svc, PTLRPC_NRS_CRR, 1, !!val); + cfs_spin_unlock(&svc->srv_rq_lock); + + return count; +} + struct ptlrpc_srh_iterator { __u64 srhi_seq; struct ptlrpc_request *srhi_req; @@ -646,6 +676,10 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry, {.name = "timeouts", .read_fptr = ptlrpc_lprocfs_rd_timeouts, 
.data = svc}, + {.name = "nrs_crr", + .read_fptr = ptlrpc_lprocfs_rd_nrs_crr, + .write_fptr = ptlrpc_lprocfs_wr_nrs_crr, + .data = svc}, {NULL} }; static struct file_operations req_history_fops = { diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 902cdfd..410ac0f 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -453,6 +453,511 @@ static void ptlrpc_at_timer(unsigned long castmeharder) cfs_waitq_signal(&svc->srv_waitq); } +static ptlrpc_nrs_head_t * +ptlrpc_svc_nrs_head(struct ptlrpc_service *svc, int hp) +{ + return hp ? &svc->srv_hpreq_nrs : &svc->srv_req_nrs; +} + +int +ptlrpc_server_nrs_register(struct ptlrpc_service *svc, int type, + char *name, ptlrpc_nrs_ops_t *ops, int hp) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + ptlrpc_nrs_policy_t *policy; + ptlrpc_nrs_policy_t *tmp; + int rc; + + /* should have default policy */ + OBD_ALLOC_PTR(policy); + if (policy == NULL) + return -ENOMEM; + + strncpy(policy->nrs_name, name, PTLRPC_NRS_NAME_LEN); + policy->nrs_svc = svc; + policy->nrs_type = type; + policy->nrs_ops = ops; + policy->nrs_head = nrs_head; + rc = policy->nrs_ops->op_nrs_init(policy); + if (rc != 0) { + OBD_FREE_PTR(policy); + return rc; + } + + cfs_spin_lock(&svc->srv_rq_lock); + cfs_list_for_each_entry(tmp, &nrs_head->nh_policy_list, nrs_list) { + if (tmp->nrs_type == policy->nrs_type) { + cfs_spin_unlock(&svc->srv_rq_lock); + CERROR("NRS %d (%s) has been registered\n", type, name); + policy->nrs_ops->op_nrs_fini(policy); + OBD_FREE_PTR(policy); + return -EEXIST; + } + } + + /* the first registered policy should be the default */ + policy->nrs_default = cfs_list_empty(&nrs_head->nh_policy_list); + cfs_list_add(&policy->nrs_list, &nrs_head->nh_policy_list); + cfs_spin_unlock(&svc->srv_rq_lock); + + return 0; +} + +static ptlrpc_nrs_policy_t * +ptlrpc_nrs_policy_find_nolock(ptlrpc_nrs_head_t *nrs_head, int type) +{ + ptlrpc_nrs_policy_t *tmp; + + cfs_list_for_each_entry(tmp, 
&nrs_head->nh_policy_list, nrs_list) { + if (type == tmp->nrs_type) + return tmp; + } + return NULL; +} + +void +ptlrpc_server_nrs_unregister(struct ptlrpc_service *svc, int type, int hp) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + ptlrpc_nrs_policy_t *policy = NULL; + + cfs_spin_lock(&svc->srv_rq_lock); + policy = ptlrpc_nrs_policy_find_nolock(nrs_head, type); + if (policy == NULL) { + cfs_spin_unlock(&svc->srv_rq_lock); + CERROR("Can't find NRS type %d\n", type); + return; + } + + LASSERT(ptlrpc_nrs_policy_req_first(policy) == NULL); + cfs_list_del(&policy->nrs_list); + /* can't unregister default policy unless the service is stopping */ + LASSERT(!policy->nrs_default || svc->srv_is_stopping); + + cfs_spin_unlock(&svc->srv_rq_lock); + + policy->nrs_ops->op_nrs_fini(policy); + LASSERT(policy->nrs_private == NULL); + OBD_FREE_PTR(policy); +} + +int +ptlrpc_server_nrs_state_set_nolock(struct ptlrpc_service *svc, + int type, int hp, int active) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + ptlrpc_nrs_policy_t *policy = NULL; + ptlrpc_nrs_policy_t *tmp = NULL; + + policy = ptlrpc_nrs_policy_find_nolock(nrs_head, type); + if (policy == NULL) { + CERROR("Can't find NRS type %d\n", type); + return -ENOENT; + } + + if (policy->nrs_default) { + CERROR("Can't change default NRS %d\n", type); + return -EPERM; + } + + policy->nrs_active = !!active; + if (!active) + return 0; + + cfs_list_for_each_entry(tmp, &nrs_head->nh_policy_list, nrs_list) { + if (tmp != policy && tmp->nrs_active) + tmp->nrs_active = 0; /* only allow one active policy */ + } + return 0; +} + +int +ptlrpc_server_nrs_state_set(struct ptlrpc_service *svc, + int type, int hp, int active) +{ + int rc; + + cfs_spin_lock(&svc->srv_rq_lock); + rc = ptlrpc_server_nrs_state_set_nolock(svc, type, hp, active); + cfs_spin_unlock(&svc->srv_rq_lock); + + return rc; +} + +int +ptlrpc_server_nrs_state_get_nolock(struct ptlrpc_service *svc, int type, int hp) +{ + 
ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + ptlrpc_nrs_policy_t *policy = NULL; + + policy = ptlrpc_nrs_policy_find_nolock(nrs_head, type); + if (policy == NULL) { + CERROR("Can't find NRS type %d\n", type); + return -ENOENT; + } + + return policy->nrs_default || policy->nrs_active; +} + +int +ptlrpc_server_nrs_state_get(struct ptlrpc_service *svc, int type, int hp) +{ + int rc; + + cfs_spin_lock(&svc->srv_rq_lock); + rc = ptlrpc_server_nrs_state_get_nolock(svc, type, hp); + cfs_spin_unlock(&svc->srv_rq_lock); + + return rc; +} + +static void +ptlrpc_server_req_add_nolock(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, req->rq_hp); + ptlrpc_nrs_policy_t *pl_act = NULL; + ptlrpc_nrs_policy_t *pl_def = NULL; + ptlrpc_nrs_policy_t *tmp; + int rc = -1; + + /* NB: must call with hold svc::srv_rq_lock */ + cfs_list_for_each_entry(tmp, &nrs_head->nh_policy_list, nrs_list) { + if (pl_def == NULL && tmp->nrs_default) + pl_def = tmp; + else if (pl_act == NULL && tmp->nrs_active) + pl_act = tmp; + + if (pl_act != NULL && pl_def != NULL) + break; + } + + if (pl_act != NULL) { + rc = ptlrpc_nrs_policy_req_add(pl_act, req); + if (rc == 0) + return; + } + + LASSERT(pl_def != NULL); + rc = ptlrpc_nrs_policy_req_add(pl_def, req); + LASSERT(rc == 0); /* default policy should never fail */ +} + +static int +ptlrpc_server_req_pending_nolock(struct ptlrpc_service *svc, int hp) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + + /* NB: can be called w/o any lock */ + return nrs_head->nh_req_count > 0; +}; + +static struct ptlrpc_request * +ptlrpc_server_req_first_nolock(struct ptlrpc_service *svc, int hp) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + ptlrpc_nrs_policy_t *policy; + struct ptlrpc_request *req; + + /* NB: must call with hold svc::srv_rq_lock */ + /* always try to drain requests from all NRS polices even they are + * inactive, because user can change 
policy status at runtime */ + cfs_list_for_each_entry(policy, &nrs_head->nh_policy_list, nrs_list) { + req = ptlrpc_nrs_policy_req_first(policy); + if (req != NULL) { + LASSERT(req->rq_nrs_policy == policy); + return req; + } + } + return NULL; +} + +static void +ptlrpc_server_req_del_nolock(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, req->rq_hp); + ptlrpc_nrs_policy_t *policy = req->rq_nrs_policy; + + /* NB: must call with hold svc::srv_rq_lock */ + LASSERT(policy != NULL); + + ptlrpc_nrs_policy_req_del(policy, req); + + /* any pending request on other polices? */ + if (nrs_head->nh_req_count > policy->nrs_req_count) { + /* move current policy to the end so we can round-robin + * over all polices and drain requests */ + cfs_list_del(&policy->nrs_list); + cfs_list_add_tail(&policy->nrs_list, &nrs_head->nh_policy_list); + } +} + +static int +nrs_fifo_init(ptlrpc_nrs_policy_t *policy) +{ + nrs_fifo_target_t *target; + + OBD_ALLOC_PTR(target); + if (target == NULL) + return -ENOMEM; + + CFS_INIT_LIST_HEAD(&target->ft_list); + policy->nrs_private = target; + return 0; +} + +static void +nrs_fifo_fini(ptlrpc_nrs_policy_t *policy) +{ + nrs_fifo_target_t *target = policy->nrs_private; + + LASSERT(target != NULL); + LASSERT(cfs_list_empty(&target->ft_list)); + + OBD_FREE_PTR(target); + policy->nrs_private = NULL; +} + +static void * +nrs_fifo_target(ptlrpc_nrs_policy_t *policy) +{ + return policy->nrs_private; +} + +static int +nrs_fifo_req_add(ptlrpc_nrs_policy_t *policy, + struct ptlrpc_request *req) +{ + nrs_fifo_target_t *target = policy->nrs_ops->op_nrs_target(policy); + nrs_fifo_req_t *fr = &req->rq_nrs_u.fifo; + + LASSERT(target != NULL); + /* they are for debug */ + fr->fr_req.nr_key_major = 0; + fr->fr_req.nr_key_minor = target->ft_sequence++; + cfs_list_add_tail(&fr->fr_list, &target->ft_list); + return 0; +} + +static void +nrs_fifo_req_del(ptlrpc_nrs_policy_t *policy, + struct 
ptlrpc_request *req) +{ + nrs_fifo_req_t *fr = &req->rq_nrs_u.fifo; + + LASSERT(!cfs_list_empty(&fr->fr_list)); + cfs_list_del_init(&fr->fr_list); +} + +static struct ptlrpc_request * +nrs_fifo_req_first(ptlrpc_nrs_policy_t *policy) +{ + nrs_fifo_target_t *target = policy->nrs_ops->op_nrs_target(policy); + + LASSERT(target != NULL); + return cfs_list_empty(&target->ft_list) ? NULL : + cfs_list_entry(target->ft_list.next, + struct ptlrpc_request, + rq_nrs_u.fifo.fr_list); +} + +ptlrpc_nrs_ops_t ptlrpc_nrs_fifo_ops = { + .op_nrs_init = nrs_fifo_init, + .op_nrs_fini = nrs_fifo_fini, + .op_nrs_target = nrs_fifo_target, + .op_req_add = nrs_fifo_req_add, + .op_req_del = nrs_fifo_req_del, + .op_req_first = nrs_fifo_req_first, +}; + +/* Heap comparator for CRR: returns 1 when e1 should be served before e2. + * Primary key is the round number (major), tie-broken by arrival + * sequence (minor), so lower round / earlier sequence wins. */ +static int +crr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2) +{ + nrs_crr_req_t *req1 = container_of(e1, nrs_crr_req_t, cr_node); + nrs_crr_req_t *req2 = container_of(e2, nrs_crr_req_t, cr_node); + + /* NB: must compare req1 against req2 here; comparing req1 with + * itself would never be true and would break heap ordering on + * the major (round) key. */ + if (req1->cr_req.nr_key_major < req2->cr_req.nr_key_major) + return 1; + else if (req1->cr_req.nr_key_major > req2->cr_req.nr_key_major) + return 0; + /* equal */ + if (req1->cr_req.nr_key_minor < req2->cr_req.nr_key_minor) + return 1; + else + return 0; +} + +cfs_binheap_ops_t nrs_crr_heap_ops = { + .hop_enter = NULL, + .hop_exit = NULL, + .hop_compare = crr_req_compare, +}; + +static nrs_crr_object_t * +nrs_crr_object_from_req(ptlrpc_nrs_policy_t *policy, + struct ptlrpc_request *req) +{ + if (req->rq_export == NULL) + return NULL; + + return req->rq_hp ? 
&req->rq_export->exp_nrs_obj_hp : + &req->rq_export->exp_nrs_obj; +} + +static int +nrs_crr_init(ptlrpc_nrs_policy_t *policy) +{ + nrs_crr_target_t *target; + + OBD_ALLOC_PTR(target); + if (target == NULL) + return -ENOMEM; + + target->ct_binheap = cfs_binheap_create(&nrs_crr_heap_ops, + CBH_FLAG_ATOMIC_GROW, + 4096, NULL); + if (target->ct_binheap == NULL) { + OBD_FREE_PTR(target); + return -ENOMEM; + } + policy->nrs_private = target; + return 0; +} + +static void +nrs_crr_fini(ptlrpc_nrs_policy_t *policy) +{ + nrs_crr_target_t *target = policy->nrs_private; + + LASSERT(target != NULL); + LASSERT(cfs_binheap_is_empty(target->ct_binheap)); + + cfs_binheap_destroy(target->ct_binheap); + OBD_FREE_PTR(target); + + policy->nrs_private = NULL; +} + +static void * +nrs_crr_target(ptlrpc_nrs_policy_t *policy) +{ + /* NB: only one target although we can have more */ + LASSERT(policy->nrs_private != NULL); + return policy->nrs_private; +} + +static int +nrs_crr_req_add(ptlrpc_nrs_policy_t *policy, + struct ptlrpc_request *req) +{ + nrs_crr_target_t *target = policy->nrs_ops->op_nrs_target(policy); + nrs_crr_object_t *obj = nrs_crr_object_from_req(policy, req); + nrs_crr_req_t *cr = &req->rq_nrs_u.crr; + int rc; + + /* NB: it's really for client round-robin so far, but we can + * change it a little and reuse it for object round-robin */ + LASSERT(target != NULL); + + if (obj == NULL) + return -EINVAL; + + if (obj->co_round < target->ct_round) + obj->co_round = target->ct_round; + + cr->cr_req.nr_key_major = obj->co_round; + cr->cr_req.nr_key_minor = target->ct_sequence; + rc = cfs_binheap_insert(target->ct_binheap, &cr->cr_node); + + if (rc == 0) { + target->ct_sequence++; + obj->co_round++; + } + return rc; +} + +static void +nrs_crr_req_del(ptlrpc_nrs_policy_t *policy, + struct ptlrpc_request *req) +{ + nrs_crr_target_t *target = policy->nrs_ops->op_nrs_target(policy); + nrs_crr_object_t *obj = nrs_crr_object_from_req(policy, req); + nrs_crr_req_t *cr = &req->rq_nrs_u.crr; 
+ cfs_binheap_node_t *node; + + LASSERT(target != NULL); + LASSERT(obj != NULL); + LASSERT(cr->cr_req.nr_key_major < obj->co_round); + + cfs_binheap_remove(target->ct_binheap, &cr->cr_node); + node = cfs_binheap_root(target->ct_binheap); + + if (node == NULL) { /* no more request */ + target->ct_round++; + } else { + cr = container_of(node, nrs_crr_req_t, cr_node); + if (target->ct_round < cr->cr_req.nr_key_major) + target->ct_round = cr->cr_req.nr_key_major; + } +} + +static struct ptlrpc_request * +nrs_crr_req_first(ptlrpc_nrs_policy_t *policy) +{ + nrs_crr_target_t *target = policy->nrs_ops->op_nrs_target(policy); + cfs_binheap_node_t *node = cfs_binheap_root(target->ct_binheap); + + return node == NULL ? NULL : + container_of(node, struct ptlrpc_request, rq_nrs_u.crr.cr_node); +} + +ptlrpc_nrs_ops_t ptlrpc_nrs_heap_ops = { + .op_nrs_init = nrs_crr_init, + .op_nrs_fini = nrs_crr_fini, + .op_nrs_target = nrs_crr_target, + .op_req_add = nrs_crr_req_add, + .op_req_del = nrs_crr_req_del, + .op_req_first = nrs_crr_req_first, +}; + +static void +ptlrpc_svc_nrs_init(struct ptlrpc_service *svc, int hp) +{ + ptlrpc_nrs_head_t *nrs_head; + + nrs_head = ptlrpc_svc_nrs_head(svc, hp); + CFS_INIT_LIST_HEAD(&nrs_head->nh_policy_list); + nrs_head->nh_req_count = 0; +} + +static int +ptlrpc_svc_nrs_setup(struct ptlrpc_service *svc, int hp) +{ + int rc; + + rc = ptlrpc_server_nrs_register(svc, PTLRPC_NRS_FIFO, "fifo", + &ptlrpc_nrs_fifo_ops, hp); + if (rc != 0) + return rc; + + return ptlrpc_server_nrs_register(svc, PTLRPC_NRS_CRR, "heap", + &ptlrpc_nrs_heap_ops, hp); +} + +static void +ptlrpc_svc_nrs_cleanup(struct ptlrpc_service *svc, int hp) +{ + ptlrpc_nrs_head_t *nrs_head = ptlrpc_svc_nrs_head(svc, hp); + ptlrpc_nrs_policy_t *policy; + + while (!cfs_list_empty(&nrs_head->nh_policy_list)) { + policy = cfs_list_entry(nrs_head->nh_policy_list.next, + ptlrpc_nrs_policy_t, nrs_list); + ptlrpc_server_nrs_unregister(svc, policy->nrs_type, hp); + } +} + /** * Initialize service on a 
given portal. * This includes starting serving threads , allocating and posting rqbds and @@ -527,8 +1032,9 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT (rc == 0); - CFS_INIT_LIST_HEAD(&service->srv_request_queue); - CFS_INIT_LIST_HEAD(&service->srv_request_hpq); + ptlrpc_svc_nrs_init(service, 0); + ptlrpc_svc_nrs_init(service, 1); + CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds); CFS_INIT_LIST_HEAD(&service->srv_active_rqbds); CFS_INIT_LIST_HEAD(&service->srv_history_rqbds); @@ -567,6 +1073,14 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, timeout is less than this, we'll be sending an early reply. */ at_init(&service->srv_at_estimate, 10, 0); + rc = ptlrpc_svc_nrs_setup(service, 0); + if (rc != 0) + GOTO(failed, NULL); + + rc = ptlrpc_svc_nrs_setup(service, 1); + if (rc != 0) + GOTO(failed, NULL); + cfs_spin_lock (&ptlrpc_all_services_lock); cfs_list_add (&service->srv_list, &ptlrpc_all_services); cfs_spin_unlock (&ptlrpc_all_services_lock); @@ -1255,9 +1769,11 @@ static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc, if (req->rq_hp == 0) { int opc = lustre_msg_get_opc(req->rq_reqmsg); + if (req->rq_nrs_policy != NULL) + ptlrpc_server_req_del_nolock(svc, req); /* Add to the high priority queue. */ - cfs_list_move_tail(&req->rq_list, &svc->srv_request_hpq); req->rq_hp = 1; + ptlrpc_server_req_add_nolock(svc, req); if (opc != OBD_PING) DEBUG_REQ(D_NET, req, "high priority req"); } @@ -1314,12 +1830,11 @@ static int ptlrpc_server_request_add(struct ptlrpc_service *svc, /* Before inserting the request into the queue, check if it is not * inserted yet, or even already handled -- it may happen due to * a racing ldlm_server_blocking_ast(). 
*/ - if (req->rq_phase == RQ_PHASE_NEW && cfs_list_empty(&req->rq_list)) { + if (req->rq_phase == RQ_PHASE_NEW && req->rq_nrs_policy == NULL) { if (rc) ptlrpc_hpreq_reorder_nolock(svc, req); else - cfs_list_add_tail(&req->rq_list, - &svc->srv_request_queue); + ptlrpc_server_req_add_nolock(svc, req); } cfs_spin_unlock(&svc->srv_rq_lock); @@ -1339,14 +1854,14 @@ static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force) if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1) return 0; - return cfs_list_empty(&svc->srv_request_queue) || + return !ptlrpc_server_req_pending_nolock(svc, 0) || svc->srv_hpreq_count < svc->srv_hpreq_ratio; } static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force) { return ptlrpc_server_allow_high(svc, force) && - !cfs_list_empty(&svc->srv_request_hpq); + ptlrpc_server_req_pending_nolock(svc, 1); } /** @@ -1377,7 +1892,7 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force) static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force) { return ptlrpc_server_allow_normal(svc, force) && - !cfs_list_empty(&svc->srv_request_queue); + ptlrpc_server_req_pending_nolock(svc, 0); } /** @@ -1407,16 +1922,13 @@ ptlrpc_server_request_get(struct ptlrpc_service *svc, int force) ENTRY; if (ptlrpc_server_high_pending(svc, force)) { - req = cfs_list_entry(svc->srv_request_hpq.next, - struct ptlrpc_request, rq_list); + req = ptlrpc_server_req_first_nolock(svc, 1); svc->srv_hpreq_count++; RETURN(req); - } if (ptlrpc_server_normal_pending(svc, force)) { - req = cfs_list_entry(svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); + req = ptlrpc_server_req_first_nolock(svc, 0); svc->srv_hpreq_count = 0; RETURN(req); } @@ -1630,7 +2142,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, } } - cfs_list_del_init(&request->rq_list); + ptlrpc_server_req_del_nolock(svc, request); svc->srv_n_active_reqs++; if (request->rq_hp) svc->srv_n_active_hpreq++; @@ -1695,14 
+2207,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, } CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc " - "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(), + "%s:%s+%d:%d:x"LPU64":%s:%d:"LPU64":"LPU64"\n", + cfs_curproc_comm(), (request->rq_export ? (char *)request->rq_export->exp_client_uuid.uuid : "0"), (request->rq_export ? cfs_atomic_read(&request->rq_export->exp_refcount) : -99), lustre_msg_get_status(request->rq_reqmsg), request->rq_xid, libcfs_id2str(request->rq_peer), - lustre_msg_get_opc(request->rq_reqmsg)); + lustre_msg_get_opc(request->rq_reqmsg), + request->rq_nrs_u.pub.nr_key_major, + request->rq_nrs_u.pub.nr_key_minor); if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING) CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val); @@ -1730,8 +2245,8 @@ put_conn: cfs_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); - CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc " - "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in " + CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc:epoch " + "%s:%s+%d:%d:x"LPU64":%s:%d:"LPU64":"LPU64" Request procesed in " "%ldus (%ldus total) trans "LPU64" rc %d/%d\n", cfs_curproc_comm(), (request->rq_export ? @@ -1742,6 +2257,8 @@ put_conn: request->rq_xid, libcfs_id2str(request->rq_peer), lustre_msg_get_opc(request->rq_reqmsg), + request->rq_nrs_u.pub.nr_key_major, + request->rq_nrs_u.pub.nr_key_minor, timediff, cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL), (request->rq_repmsg ? 
@@ -2695,7 +3212,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) struct ptlrpc_request *req; req = ptlrpc_server_request_get(service, 1); - cfs_list_del(&req->rq_list); + ptlrpc_server_req_del_nolock(service, req); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; ptlrpc_hpreq_fini(req); @@ -2706,6 +3223,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) LASSERT(service->srv_n_history_rqbds == 0); LASSERT(cfs_list_empty(&service->srv_active_rqbds)); + ptlrpc_svc_nrs_cleanup(service, 0); + ptlrpc_svc_nrs_cleanup(service, 1); + /* Now free all the request buffers since nothing references them * any more... */ while (!cfs_list_empty(&service->srv_idle_rqbds)) { @@ -2768,12 +3288,9 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc) } /* How long has the next entry been waiting? */ - if (cfs_list_empty(&svc->srv_request_queue)) - request = cfs_list_entry(svc->srv_request_hpq.next, - struct ptlrpc_request, rq_list); - else - request = cfs_list_entry(svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); + request = ptlrpc_server_req_first_nolock(svc, 1); + if (request == NULL) + request = ptlrpc_server_req_first_nolock(svc, 0); timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); cfs_spin_unlock(&svc->srv_rq_lock);