For high read loads with large read sizes, we are seeing Ganesha's
memory consumption grow without bound. Normally the memory consumption
subsides when the read load drops, but if the read load keeps
increasing, or even just continues at the same rate, OOM is eventually
triggered.
This is with tirpc 1.8.0, and a quick look suggests that the problem
might also be present in the latest code.
High memory consumption happens when the rate of incoming read
requests from the connected clients is higher than the rate at which
the read replies can be sent over the wire. Because sending replies
back is slower, completed read RPC requests keep accumulating in the
queue (on one of the pools from ioq_ifqh), while the worker thread,
after adding the just-completed request to the queue, goes back to
accepting and executing more requests. Each queued entry pins memory
equal to the length of the read request. For example, with 200
clients, a read RPC length of 1MB, and 20 completed read requests per
client sitting in the queues waiting to be sent over the network,
Ganesha ends up consuming more than 4GB of memory.
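
As a quick sanity check on that number (throwaway arithmetic only,
not part of the patch):

	#include <stdint.h>

	/* 200 clients * 20 queued replies * 1MiB per reply */
	uint64_t pinned = 200ULL * 20 * (1ULL << 20);
	/* = 4194304000 bytes, i.e. ~4.2GB pinned in the send queues */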
To mitigate this issue, we keep count of the memory allocated by the
read RPCs, and if it grows beyond a configurable threshold we block
the worker threads on the poolq_head's qmutex, so that they cannot
queue requests (and hence cannot accept new ones) until the memory
consumption falls back into an acceptable range. This might not be
ideal, as it can penalise clients that are not doing read RPCs, but it
does the job and prevents Ganesha from taking the system down with OOM
(we have panic-on-OOM enabled on our systems).
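
With the patch below, the limit would be set from the application
(e.g. Ganesha at startup) through the new tirpc_control() knob, along
these lines (a minimal sketch; the 1GiB value is only an example, and
the header path may differ depending on how ntirpc is installed):

	#include <stdio.h>
	#include <stdint.h>
	#include <rpc/types.h>	/* tirpc_control(), TIRPC_SET_UV_MEM_LIMIT */

	static void set_uv_mem_limit(void)
	{
		/* Cap the memory pinned by queued read replies at 1GiB;
		 * 0 (the default) keeps the old unbounded behaviour. */
		uint64_t limit = 1ULL << 30;

		if (!tirpc_control(TIRPC_SET_UV_MEM_LIMIT, &limit))
			fprintf(stderr, "TIRPC_SET_UV_MEM_LIMIT not supported\n");
	}

Since max_uv_memory defaults to 0 and every new code path is gated on
it being non-zero, existing callers are unaffected unless they opt in.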
Do the issue and the patch (based on 1.8.0) below make sense?
diff --git a/ntirpc/rpc/types.h b/ntirpc/rpc/types.h
index 60db0f6..a5852e0 100644
--- a/ntirpc/rpc/types.h
+++ b/ntirpc/rpc/types.h
@@ -115,6 +115,7 @@ typedef int32_t rpc_inline_t;
#define TIRPC_SET_DEBUG_FLAGS 3
#define TIRPC_GET_OTHER_FLAGS 4
#define TIRPC_SET_OTHER_FLAGS 5
+#define TIRPC_SET_UV_MEM_LIMIT 6
/*
* Debug flags support
@@ -182,6 +183,7 @@ typedef struct tirpc_pkg_params {
mem_2_size_t aligned_;
mem_2_size_t calloc_;
mem_p_size_t realloc_;
+ uint64_t max_uv_memory;
} tirpc_pkg_params;
extern tirpc_pkg_params __ntirpc_pkg_params;
diff --git a/ntirpc/rpc/xdr_ioq.h b/ntirpc/rpc/xdr_ioq.h
index def27bb..2293ea9 100644
--- a/ntirpc/rpc/xdr_ioq.h
+++ b/ntirpc/rpc/xdr_ioq.h
@@ -87,6 +87,7 @@ struct xdr_ioq {
struct poolq_head *ioq_pool;
struct xdr_ioq_uv_head ioq_uv; /* header/vectors */
+ uint64_t ioquv_totalsize;
uint64_t id;
struct svc_rpc_fwd *resp;
diff --git a/src/rpc_generic.c b/src/rpc_generic.c
index 0b237f3..a7505d4 100644
--- a/src/rpc_generic.c
+++ b/src/rpc_generic.c
@@ -137,6 +137,7 @@ tirpc_pkg_params __ntirpc_pkg_params = {
tirpc_aligned,
tirpc_calloc,
tirpc_realloc,
+ 0,
};
bool
@@ -161,6 +162,9 @@ tirpc_control(const u_int rq, void *in)
case TIRPC_SET_OTHER_FLAGS:
__ntirpc_pkg_params.other_flags = *(int *)in;
break;
+ case TIRPC_SET_UV_MEM_LIMIT:
+ __ntirpc_pkg_params.max_uv_memory = *(uint64_t *)in;
+ break;
default:
return (false);
}
diff --git a/src/svc_ioq.c b/src/svc_ioq.c
index bba809a..04a0095 100644
--- a/src/svc_ioq.c
+++ b/src/svc_ioq.c
@@ -86,6 +86,7 @@
*/
static int num_send_queues; /* must be a power of 2 */
static struct poolq_head *ioq_ifqh;
+static uint64_t uv_total_mem = 0;
static inline int
svc_ioq_mask(int fd)
@@ -271,9 +272,19 @@ static void
svc_ioq_write(SVCXPRT *xprt, struct xdr_ioq *xioq, struct poolq_head *ifph)
{
struct poolq_entry *have;
-
+ bool hold_lock = false; /* Hold lock during socket send */
+ uint64_t total_mem = 0;
+
+ if (__ntirpc_pkg_params.max_uv_memory) {
+ total_mem = atomic_fetch_uint64_t(&uv_total_mem);
+ if (total_mem > __ntirpc_pkg_params.max_uv_memory) {
+ mutex_lock(&ifph->qmutex);
+ hold_lock = true;
+ }
+ }
for (;;) {
int rc = 0;
+ size_t ioquv_totalsize = xioq->ioquv_totalsize;
/* do i/o unlocked */
if (svc_work_pool.params.thrd_max
@@ -297,13 +308,31 @@ svc_ioq_write(SVCXPRT *xprt, struct xdr_ioq *xioq, struct poolq_head *ifph)
XDR_DESTROY(xioq->xdrs);
- mutex_lock(&ifph->qmutex);
+ if (__ntirpc_pkg_params.max_uv_memory) {
+ total_mem = atomic_sub_uint64_t(&uv_total_mem,
+ ioquv_totalsize);
+ if (total_mem > __ntirpc_pkg_params.max_uv_memory &&
+ !hold_lock) {
+ hold_lock = true;
+ mutex_lock(&ifph->qmutex);
+ } else if (hold_lock &&
+ (total_mem <
+ __ntirpc_pkg_params.max_uv_memory*2/3)) {
+ /* Time to release lock */
+ hold_lock = false;
+ mutex_unlock(&ifph->qmutex);
+ }
+ }
+
+ if (!hold_lock)
+ mutex_lock(&ifph->qmutex);
if (--(ifph->qcount) == 0)
break;
have = TAILQ_FIRST(&ifph->qh);
TAILQ_REMOVE(&ifph->qh, have, q);
- mutex_unlock(&ifph->qmutex);
+ if (!hold_lock)
+ mutex_unlock(&ifph->qmutex);
xioq = _IOQ(have);
xprt = (SVCXPRT *)xioq->xdrs[0].x_lib[1];
@@ -327,6 +356,10 @@ svc_ioq_write_now(SVCXPRT *xprt, struct xdr_ioq *xioq)
struct poolq_head *ifph = &ioq_ifqh[svc_ioq_mask(xprt->xp_fd)];
SVC_REF(xprt, SVC_REF_FLAG_NONE);
+ if (__ntirpc_pkg_params.max_uv_memory && xioq->ioquv_totalsize) {
+ (void)atomic_add_uint64_t(&uv_total_mem,
+ xioq->ioquv_totalsize);
+ }
mutex_lock(&ifph->qmutex);
if ((ifph->qcount)++ > 0) {
diff --git a/src/xdr_ioq.c b/src/xdr_ioq.c
index 84b423f..946be86 100644
--- a/src/xdr_ioq.c
+++ b/src/xdr_ioq.c
@@ -379,6 +379,7 @@ xdr_ioq_uv_append(struct xdr_ioq *xioq, u_int ioq_flags)
base = mem_alloc(xioq->ioq_uv.max_bsize);
memcpy(base, uv->v.vio_head, len);
mem_free(uv->v.vio_base, size);
+ xioq->ioquv_totalsize += (xioq->ioq_uv.max_bsize - size);
uv->v.vio_base =
uv->v.vio_head = base + 0;
uv->v.vio_tail = base + len;
@@ -389,6 +390,7 @@ xdr_ioq_uv_append(struct xdr_ioq *xioq, u_int ioq_flags)
}
uv = xdr_ioq_uv_create(xioq->ioq_uv.min_bsize, UIO_FLAG_FREE);
(xioq->ioq_uv.uvqh.qcount)++;
+ xioq->ioquv_totalsize += xioq->ioq_uv.min_bsize;
TAILQ_INSERT_TAIL(&xioq->ioq_uv.uvqh.qh, &uv->uvq, q);
} else {
/* XXX empty buffer slot (not supported for now) */
--
2.18.0