From: Jeff Layton <jlayton(a)redhat.com>
Add a new support library for coordinating a grace period between
multiple nodes. The basic idea is to use a shared RADOS object to
mediate requests for a grace period.
Typically, each node will perform a read operation on the database
and (then possibly) a write operation. In order to ensure atomicity,
we assert that the object version has not changed between the read
and write ops. If it has, then we start over again from scratch.
Change-Id: Idfff61f8d0092fd9095dc14390dfb434f0ad6696
Signed-off-by: Jeff Layton <jlayton(a)redhat.com>
---
src/SAL/recovery/recovery_rados_kv.c | 5 +-
src/include/rados_grace.h | 85 ++++
src/support/CMakeLists.txt | 4 +
src/support/rados_grace.c | 675 +++++++++++++++++++++++++++
4 files changed, 766 insertions(+), 3 deletions(-)
create mode 100644 src/include/rados_grace.h
create mode 100644 src/support/rados_grace.c
diff --git a/src/SAL/recovery/recovery_rados_kv.c b/src/SAL/recovery/recovery_rados_kv.c
index daa6a9c8f6c7..34fba4bd34ac 100644
--- a/src/SAL/recovery/recovery_rados_kv.c
+++ b/src/SAL/recovery/recovery_rados_kv.c
@@ -12,10 +12,9 @@
#include "fsal.h"
#include "netdb.h"
#include <rados/librados.h>
+#include <rados_grace.h>
#include "recovery_rados.h"
-#define DEFAULT_POOL "nfs-ganesha"
-
#define MAX_ITEMS 1024 /* relaxed */
static rados_t clnt;
@@ -30,7 +29,7 @@ static struct config_item rados_kv_params[] = {
rados_kv_parameter, ceph_conf),
CONF_ITEM_STR("userid", 1, MAXPATHLEN, NULL,
rados_kv_parameter, userid),
- CONF_ITEM_STR("pool", 1, MAXPATHLEN, DEFAULT_POOL,
+ CONF_ITEM_STR("pool", 1, MAXPATHLEN, DEFAULT_RADOS_GRACE_POOL,
rados_kv_parameter, pool),
CONFIG_EOL
};
diff --git a/src/include/rados_grace.h b/src/include/rados_grace.h
new file mode 100644
index 000000000000..97bd8a14107b
--- /dev/null
+++ b/src/include/rados_grace.h
@@ -0,0 +1,85 @@
+#ifndef _RADOS_GRACE_H
+#define _RADOS_GRACE_H
+
+#include <stdbool.h>
+
+#define DEFAULT_RADOS_GRACE_POOL "nfs-ganesha"
+#define DEFAULT_RADOS_GRACE_OID "grace"
+
+/*
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ *
+ * ---------------------------------------
+ */
+int rados_grace_create(rados_ioctx_t io_ctx, const char *oid);
+int rados_grace_dump(rados_ioctx_t io_ctx, const char *oid);
+int rados_grace_epochs(rados_ioctx_t io_ctx, const char *oid,
+ uint64_t *cur, uint64_t *rec);
+int rados_grace_enforcing_toggle(rados_ioctx_t io_ctx, const char *oid,
+ int nodes, const char * const *nodeids, uint64_t *pcur,
+ uint64_t *prec, bool start);
+int rados_grace_enforcing_check(rados_ioctx_t io_ctx, const char *oid);
+int rados_grace_join_bulk(rados_ioctx_t io_ctx, const char *oid, int nodes,
+ const char * const *nodeids, uint64_t *pcur, uint64_t *prec,
+ bool start);
+int rados_grace_lift_bulk(rados_ioctx_t io_ctx, const char *oid, int nodes,
+ const char * const *nodeids, uint64_t *pcur, uint64_t *prec,
+ bool remove);
+
+static inline int
+rados_grace_enforcing_on(rados_ioctx_t io_ctx, const char *oid,
+ const char *nodeid, uint64_t *pcur, uint64_t *prec)
+{
+ const char *nodeids[1];
+
+ nodeids[0] = nodeid;
+ return rados_grace_enforcing_toggle(io_ctx, oid, 1, nodeids, pcur,
+ prec, true);
+}
+
+static inline int
+rados_grace_enforcing_off(rados_ioctx_t io_ctx, const char *oid,
+ const char *nodeid, uint64_t *pcur, uint64_t *prec)
+{
+ const char *nodeids[1];
+
+ nodeids[0] = nodeid;
+ return rados_grace_enforcing_toggle(io_ctx, oid, 1, nodeids, pcur,
+ prec, false);
+}
+
+static inline int
+rados_grace_join(rados_ioctx_t io_ctx, const char *oid, const char *nodeid,
+ uint64_t *pcur, uint64_t *prec, bool start)
+{
+ const char *nodeids[1];
+
+ nodeids[0] = nodeid;
+ return rados_grace_join_bulk(io_ctx, oid, 1, nodeids, pcur, prec,
+ start);
+}
+
+static inline int
+rados_grace_lift(rados_ioctx_t io_ctx, const char *oid, const char *nodeid,
+ uint64_t *pcur, uint64_t *prec)
+{
+ const char *nodeids[1];
+
+ nodeids[0] = nodeid;
+ return rados_grace_lift_bulk(io_ctx, oid, 1, nodeids, pcur, prec,
+ false);
+}
+#endif /* _RADOS_GRACE_H */
diff --git a/src/support/CMakeLists.txt b/src/support/CMakeLists.txt
index 42dc7ab7b589..380f01d3d83b 100644
--- a/src/support/CMakeLists.txt
+++ b/src/support/CMakeLists.txt
@@ -41,6 +41,10 @@ set(netgroup_cache_SRCS
add_library(netgroup_cache STATIC ${netgroup_cache_SRCS})
add_sanitizers(netgroup_cache)
+if (USE_RADOS_RECOV)
+ add_library(rados_grace STATIC rados_grace.c)
+endif(USE_RADOS_RECOV)
+
########### next target ###############
SET(support_STAT_SRCS
diff --git a/src/support/rados_grace.c b/src/support/rados_grace.c
new file mode 100644
index 000000000000..e715f872e3ab
--- /dev/null
+++ b/src/support/rados_grace.c
@@ -0,0 +1,675 @@
+/*
+ * vim:noexpandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright 2018 Red Hat, Inc. and/or its affiliates.
+ * Author: Jeff Layton <jlayton(a)redhat.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * The rados_grace database is a rados object with a well-known name that
+ * with which all cluster nodes can interact to coordinate grace-period
+ * enforcement.
+ *
+ * It consists of two parts:
+ *
+ * 1) 2 uint64_t epoch values (stored LE) that indicate the serial number of
+ * the current grace period (C) and the serial number of the grace period that
+ * from which recovery is currently allowed (R). These are stored as object
+ * data.
+ *
+ * 2) An omap containing a key value pair for each cluster node. The key is
+ * the hostname of the node running ganesha, and the value is a byte with a
+ * set of flags.
+ *
+ * Consider a single server epoch (E) of an individual NFS server to be the
+ * period between reboots. That consists of an initial grace period and
+ * a regular operation period. An epoch value of 0 is never valid.
+ *
+ * The first value (C) indicates the current server epoch. The client recovery
+ * db should be tagged with this value on creation, or when updating the db
+ * after the grace period has been fully lifted.
+ *
+ * The second uint64_t value in the data tells the NFS server from what
+ * recovery db it is allowed to reclaim. A value of 0 in this field means that
+ * we are out of the cluster-wide grace period and that no recovery is allowed.
+ *
+ * The omap contains a key for each host in the cluster. Typically, nodes join
+ * the cluster by setting their omap key. The value of the omap is a single
+ * byte that contains a set of flags that indicates their current need for a
+ * grace period and whether they are locally enforcing one.
+ *
+ * The grace period handling engine will update and store the flags, and it
+ * can be queried to determine whether other nodes may need a grace period or
+ * are enforcing.
+ */
+#include "config.h"
+#include <stdio.h>
+#include <stdint.h>
+#include <endian.h>
+#include <rados/librados.h>
+#include <errno.h>
+#include <getopt.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <stdbool.h>
+
+/* Each cluster node needs a slot here */
+#define MAX_ITEMS 1024
+
+/* Flags for the omap value flags field */
+
+/* Does this node currently require a grace period? */
+#define RADOS_GRACE_NEED_GRACE 0x1
+
+/* Is this node currently enforcing its grace period locally? */
+#define RADOS_GRACE_ENFORCING 0x2
+
+static void rados_grace_notify(rados_ioctx_t io_ctx, const char *oid)
+{
+ static char *buf;
+ static size_t len;
+
+ /* FIXME: we don't really want or need this to be synchronous */
+ rados_notify2(io_ctx, oid, "", 0, 3000, &buf, &len);
+ rados_buffer_free(buf);
+}
+
+int
+rados_grace_create(rados_ioctx_t io_ctx, const char *oid)
+{
+ int ret;
+ rados_write_op_t op = NULL;
+ uint64_t cur = htole64(1); // starting epoch
+ uint64_t rec = htole64(0); // no recovery yet
+ char buf[sizeof(uint64_t) * 2];
+
+ /*
+ * 2 uint64_t's
+ *
+ * The first denotes the current epoch serial number, the epoch serial
+ * number under which new recovery records should be created.
+ *
+ * The second number denotes the epoch from which clients are allowed
+ * to reclaim.
+ *
+ * An epoch of zero is never allowed, so if rec=0, then the grace
+ * period is no longer in effect, and can't be joined.
+ */
+ memcpy(buf, (char *)&cur, sizeof(cur));
+ memcpy(buf + sizeof(cur), (char *)&rec, sizeof(rec));
+
+ op = rados_create_write_op();
+ /* Create the object */
+ rados_write_op_create(op, LIBRADOS_CREATE_EXCLUSIVE, NULL);
+
+ /* Set serial numbers if we created the object */
+ rados_write_op_write_full(op, buf, sizeof(buf));
+
+ ret = rados_write_op_operate(op, io_ctx, oid, NULL, 0);
+ rados_release_write_op(op);
+ return ret;
+}
+
+int
+rados_grace_dump(rados_ioctx_t io_ctx, const char *oid)
+{
+ int ret;
+ rados_omap_iter_t iter;
+ rados_read_op_t op;
+ char *key_out = NULL;
+ char *val_out = NULL;
+ unsigned char more = '\0';
+ size_t len_out = 0;
+ char buf[sizeof(uint64_t) * 2];
+ uint64_t cur, rec;
+
+ op = rados_create_read_op();
+ rados_read_op_read(op, 0, sizeof(buf), buf, &len_out, NULL);
+ rados_read_op_omap_get_vals2(op, "", "", MAX_ITEMS, &iter,
&more, NULL);
+ ret = rados_read_op_operate(op, io_ctx, oid, 0);
+ if (ret < 0) {
+ printf("%s: ret=%d", __func__, ret);
+ goto out;
+ }
+
+ if (len_out != sizeof(buf)) {
+ ret = -ENOTRECOVERABLE;
+ goto out;
+ }
+
+ if (more) {
+ ret = -ENOTRECOVERABLE;
+ goto out;
+ }
+
+ cur = le64toh(*(uint64_t *)buf);
+ rec = le64toh(*(uint64_t *)(buf + sizeof(uint64_t)));
+ printf("cur=%lu rec=%lu\n", cur, rec);
+ printf("======================================================\n");
+ for (;;) {
+ char need = ' ', enforcing = ' ';
+
+ rados_omap_get_next(iter, &key_out, &val_out, &len_out);
+ if (key_out == NULL || val_out == NULL)
+ break;
+ if (*val_out & RADOS_GRACE_NEED_GRACE)
+ need = 'N';
+ if (*val_out & RADOS_GRACE_ENFORCING)
+ enforcing = 'E';
+ printf("%s\t%c%c\n", key_out, need, enforcing);
+ }
+ rados_omap_get_end(iter);
+out:
+ rados_release_read_op(op);
+ return ret;
+}
+
+int
+rados_grace_epochs(rados_ioctx_t io_ctx, const char *oid,
+ uint64_t *cur, uint64_t *rec)
+{
+ int ret;
+ rados_read_op_t op;
+ size_t len_out = 0;
+ char buf[sizeof(uint64_t) * 2];
+
+ op = rados_create_read_op();
+ rados_read_op_read(op, 0, sizeof(buf), buf, &len_out, NULL);
+ ret = rados_read_op_operate(op, io_ctx, oid, 0);
+ if (ret < 0)
+ goto out;
+
+ ret = -ENOTRECOVERABLE;
+ if (len_out != sizeof(buf))
+ goto out;
+
+ *cur = le64toh(*(uint64_t *)buf);
+ *rec = le64toh(*(uint64_t *)(buf + sizeof(uint64_t)));
+ ret = 0;
+out:
+ rados_release_read_op(op);
+ return ret;
+}
+
+int
+rados_grace_enforcing_toggle(rados_ioctx_t io_ctx, const char *oid, int nodes,
+ const char * const *nodeids, uint64_t *pcur,
+ uint64_t *prec, bool enable)
+{
+ int i, ret;
+ char *flags = NULL;
+ char **vals = NULL;
+ size_t *lens = NULL;
+ uint64_t cur, rec, ver;
+
+ /* allocate an array of flag bytes and set ENFORCING on all of them */
+ flags = calloc(nodes, 1);
+ if (!flags) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* pointers to each val byte */
+ vals = calloc(nodes, sizeof(char *));
+ if (!vals) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* lengths */
+ lens = calloc(nodes, sizeof(size_t));
+ if (!lens) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* each val is one flag byte */
+ for (i = 0; i < nodes; ++i) {
+ vals[i] = &flags[i];
+ lens[i] = 1;
+ }
+
+ do {
+ rados_write_op_t wop;
+ rados_read_op_t rop;
+ rados_omap_iter_t iter;
+ size_t len = 0;
+ unsigned char more = 0;
+ char buf[sizeof(uint64_t) * 2];
+
+ /* read epoch blob */
+ rop = rados_create_read_op();
+ rados_read_op_read(rop, 0, sizeof(buf), buf, &len, NULL);
+ rados_read_op_omap_get_vals2(rop, "", "", MAX_ITEMS, &iter,
+ &more, NULL);
+ ret = rados_read_op_operate(rop, io_ctx, oid, 0);
+ if (ret < 0) {
+ rados_release_read_op(rop);
+ break;
+ }
+
+ if (more || (len != sizeof(buf))) {
+ ret = -ENOTRECOVERABLE;
+ rados_release_read_op(rop);
+ break;
+ }
+
+ ver = rados_get_last_version(io_ctx);
+
+ /*
+ * Walk the returned kv pairs and flip on any existing flags
+ * in the matching nodeid (if there is one)
+ */
+ for (;;) {
+ char *key, *val;
+
+ rados_omap_get_next(iter, &key, &val, &len);
+ if (!key)
+ break;
+ for (i = 0; i < nodes; ++i) {
+ if (strcmp(key, nodeids[i]))
+ continue;
+ flags[i] = *val;
+ if (enable)
+ flags[i] |= RADOS_GRACE_ENFORCING;
+ else
+ flags[i] &= ~RADOS_GRACE_ENFORCING;
+ break;
+ }
+ }
+ rados_omap_get_end(iter);
+ rados_release_read_op(rop);
+
+ /* Get old epoch numbers and version */
+ cur = le64toh(*(uint64_t *)buf);
+ rec = le64toh(*(uint64_t *)(buf + sizeof(uint64_t)));
+
+ /* Attempt to update object */
+ wop = rados_create_write_op();
+
+ /* Ensure that nothing has changed */
+ rados_write_op_assert_version(wop, ver);
+
+ /* Set omap values to given ones */
+ rados_write_op_omap_set(wop, nodeids,
+ (const char * const*)vals, lens, nodes);
+
+ ret = rados_write_op_operate(wop, io_ctx, oid, NULL, 0);
+ rados_release_write_op(wop);
+ if (ret >= 0)
+ rados_grace_notify(io_ctx, oid);
+ } while (ret == -ERANGE);
+
+ if (!ret) {
+ *pcur = cur;
+ *prec = rec;
+ }
+out:
+ free(lens);
+ free(flags);
+ free(vals);
+ return ret;
+}
+
+int
+rados_grace_enforcing_check(rados_ioctx_t io_ctx, const char *oid)
+{
+ int ret;
+ rados_read_op_t rop;
+ rados_omap_iter_t iter;
+ unsigned char more = 0;
+
+ rop = rados_create_read_op();
+ rados_read_op_omap_get_vals2(rop, "", "", MAX_ITEMS, &iter,
+ &more, NULL);
+ ret = rados_read_op_operate(rop, io_ctx, oid, 0);
+ if (ret < 0) {
+ rados_release_read_op(rop);
+ goto out;
+ }
+
+ if (more) {
+ ret = -ENOTRECOVERABLE;
+ rados_release_read_op(rop);
+ goto out;
+ }
+
+ ret = 0;
+ for (;;) {
+ char *key, *val;
+ size_t len = 0;
+
+ rados_omap_get_next(iter, &key, &val, &len);
+ if (!key)
+ break;
+ /* If anyone isn't enforcing, then return an err */
+ if (!(*val & RADOS_GRACE_ENFORCING)) {
+ ret = -EL2NSYNC;
+ break;
+ }
+ }
+ rados_omap_get_end(iter);
+ rados_release_read_op(rop);
+out:
+ return ret;
+}
+
+int
+rados_grace_join_bulk(rados_ioctx_t io_ctx, const char *oid, int nodes,
+ const char * const *nodeids, uint64_t *pcur,
+ uint64_t *prec, bool start)
+{
+ int i, ret;
+ char *flags = NULL;
+ char **vals = NULL;
+ size_t *lens = NULL;
+ uint64_t cur, rec, ver;
+
+ /* flag bytes */
+ flags = malloc(nodes);
+ if (!flags) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* pointers to each val byte */
+ vals = calloc(nodes, sizeof(char *));
+ if (!vals) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* lengths */
+ lens = calloc(nodes, sizeof(size_t));
+ if (!lens) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* each val is one flag byte */
+ for (i = 0; i < nodes; ++i) {
+ vals[i] = &flags[i];
+ lens[i] = 1;
+ }
+
+ do {
+ rados_write_op_t wop;
+ rados_read_op_t rop;
+ rados_omap_iter_t iter;
+ size_t len = 0;
+ unsigned char more = 0;
+ char buf[sizeof(uint64_t) * 2];
+
+ /* read epoch blob */
+ rop = rados_create_read_op();
+ rados_read_op_read(rop, 0, sizeof(buf), buf, &len, NULL);
+ rados_read_op_omap_get_vals2(rop, "", "", MAX_ITEMS, &iter,
+ &more, NULL);
+ ret = rados_read_op_operate(rop, io_ctx, oid, 0);
+ if (ret < 0) {
+ rados_release_read_op(rop);
+ break;
+ }
+
+ if (more || (len != sizeof(buf))) {
+ ret = -ENOTRECOVERABLE;
+ rados_release_read_op(rop);
+ break;
+ }
+
+ ver = rados_get_last_version(io_ctx);
+
+ /*
+ * Walk the returned kv pairs and flip on any existing flags
+ * in the matching nodeid (if there is one)
+ */
+ memset(flags, RADOS_GRACE_NEED_GRACE|RADOS_GRACE_ENFORCING,
+ nodes);
+ for (;;) {
+ char *key, *val;
+
+ rados_omap_get_next(iter, &key, &val, &len);
+ if (!key)
+ break;
+ for (i = 0; i < nodes; ++i) {
+ if (!strcmp(key, nodeids[i])) {
+ flags[i] |= *val;
+ break;
+ }
+ }
+ }
+ rados_omap_get_end(iter);
+ rados_release_read_op(rop);
+
+ /* Get old epoch numbers and version */
+ cur = le64toh(*(uint64_t *)buf);
+ rec = le64toh(*(uint64_t *)(buf + sizeof(uint64_t)));
+
+ /* Only start a new grace period if start bool is set */
+ /* FIXME: do we need this with real membership? */
+ if (rec == 0 && !start)
+ break;
+
+ /* Attempt to update object */
+ wop = rados_create_write_op();
+
+ /* Ensure that nothing has changed */
+ rados_write_op_assert_version(wop, ver);
+
+ /* Update the object data iff rec == 0 */
+ if (rec == 0) {
+ uint64_t tc, tr;
+
+ rec = cur;
+ ++cur;
+ tc = htole64(cur);
+ tr = htole64(rec);
+ memcpy(buf, (char *)&tc, sizeof(tc));
+ memcpy(buf + sizeof(tc), (char *)&tr, sizeof(tr));
+ rados_write_op_write_full(wop, buf, sizeof(buf));
+ }
+
+ /* Set omap values to given ones */
+ rados_write_op_omap_set(wop, nodeids,
+ (const char * const*)vals, lens, nodes);
+
+ ret = rados_write_op_operate(wop, io_ctx, oid, NULL, 0);
+ rados_release_write_op(wop);
+ if (ret >= 0)
+ rados_grace_notify(io_ctx, oid);
+ } while (ret == -ERANGE);
+
+ if (!ret) {
+ *pcur = cur;
+ *prec = rec;
+ }
+out:
+ free(lens);
+ free(flags);
+ free(vals);
+ return ret;
+}
+
+int
+rados_grace_lift_bulk(rados_ioctx_t io_ctx, const char *oid, int nodes,
+ const char * const *nodeids, uint64_t *pcur,
+ uint64_t *prec, bool remove)
+{
+ int ret;
+ char *flags = NULL;
+ char **vals = NULL;
+ size_t *lens = NULL;
+ const char **keys = NULL;
+ uint64_t cur, rec;
+
+ keys = calloc(nodes, sizeof(char *));
+ if (!keys) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* We don't need these arrays if we're just removing the keys */
+ if (!remove) {
+ flags = calloc(nodes, 1);
+ if (!flags) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* pointers to each val byte */
+ vals = calloc(nodes, sizeof(char *));
+ if (!vals) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ lens = calloc(nodes, sizeof(size_t));
+ if (!lens) {
+ ret = -ENOMEM;
+ goto out;
+ }
+ }
+
+ do {
+ int i, k, need;
+ rados_write_op_t wop;
+ rados_read_op_t rop;
+ rados_omap_iter_t iter;
+ char *key, *val;
+ size_t len;
+ char buf[sizeof(uint64_t) * 2];
+ unsigned char more = '\0';
+ uint64_t ver;
+
+ /* read epoch blob and omap keys */
+ rop = rados_create_read_op();
+ rados_read_op_read(rop, 0, sizeof(buf), buf, &len, NULL);
+ rados_read_op_omap_get_vals2(rop, "", "", MAX_ITEMS, &iter,
+ &more, NULL);
+ ret = rados_read_op_operate(rop, io_ctx, oid, 0);
+ if (ret < 0) {
+ rados_release_read_op(rop);
+ break;
+ }
+
+ if (more) {
+ ret = -ENOTRECOVERABLE;
+ rados_release_read_op(rop);
+ break;
+ }
+
+ if (len != sizeof(buf)) {
+ ret = -ENOTRECOVERABLE;
+ rados_release_read_op(rop);
+ break;
+ }
+
+ /* Get old epoch numbers and version */
+ ver = rados_get_last_version(io_ctx);
+ cur = le64toh(*(uint64_t *)buf);
+ rec = le64toh(*(uint64_t *)(buf + sizeof(uint64_t)));
+
+ /*
+ * Walk omap keys, see if it's in nodeids array. Add any that
+ * are and have NEED_GRACE set to the "keys" array along with
+ * the the flags field with the NEED_GRACE flag cleared.
+ *
+ * If the remove flag is set, then just remove them from the
+ * omap and don't bother with changing the flags.
+ */
+ need = 0;
+ k = 0;
+ for (;;) {
+ ret = rados_omap_get_next(iter, &key, &val, &len);
+ if (!key)
+ break;
+
+ if (*val & RADOS_GRACE_NEED_GRACE)
+ ++need;
+ else if (!remove)
+ continue;
+
+ for (i = 0; i < nodes; ++i) {
+ if (strcmp(key, nodeids[i]))
+ continue;
+ keys[k] = nodeids[i];
+ /*
+ * For the removal case, we just need the
+ * keys.
+ */
+ if (!remove) {
+ flags[k] =
+ *val & ~RADOS_GRACE_NEED_GRACE;
+ vals[k] = &flags[k];
+ lens[k] = 1;
+ }
+ ++k;
+ break;
+ }
+ };
+ rados_omap_get_end(iter);
+ rados_release_read_op(rop);
+
+ /* No matching keys? Nothing to do. */
+ if (k == 0)
+ break;
+
+ /* Attempt to update object */
+ wop = rados_create_write_op();
+
+ /* Ensure that nothing has changed */
+ rados_write_op_assert_version(wop, ver);
+
+ /* Set or remove any keys we matched earlier */
+ if (remove)
+ rados_write_op_omap_rm_keys(wop, keys, k);
+ else
+ rados_write_op_omap_set(wop, keys,
+ (const char * const *)vals,
+ lens, k);
+
+ /*
+ * If number of omap records we're setting or removing is the
+ * same as the number of hosts that have NEED_GRACE set, then
+ * fully lift the grace period.
+ */
+ if (need == k) {
+ uint64_t tc, tr;
+
+ rec = 0;
+ tr = htole64(rec);
+ tc = htole64(cur);
+ memcpy(buf, (char *)&tc, sizeof(tc));
+ memcpy(buf + sizeof(tc), (char *)&tr, sizeof(tr));
+ rados_write_op_write_full(wop, buf, sizeof(buf));
+ }
+
+ ret = rados_write_op_operate(wop, io_ctx, oid, NULL, 0);
+ rados_release_write_op(wop);
+ if (ret >= 0)
+ rados_grace_notify(io_ctx, oid);
+ } while (ret == -ERANGE);
+ if (!ret) {
+ *pcur = cur;
+ *prec = rec;
+ }
+out:
+ free(vals);
+ free(lens);
+ free(flags);
+ free(keys);
+ return ret;
+}
--
2.17.0