author	Michael Ellerman <mpe@ellerman.id.au>	2017-07-31 20:20:29 +1000
committer	Michael Ellerman <mpe@ellerman.id.au>	2017-07-31 20:20:29 +1000
commit	bb272221e9db79f13d454e1f3fb6b05013be985e (patch)
tree	36f4acc50e3fabac71fadd34c720c0a6011db470 /net/ceph
parent	253fd51e2f533552ae35a0c661705da6c4842c1b (diff)
parent	5771a8c08880cdca3bfb4a3fc6d309d6bba20877 (diff)
Merge tag 'v4.13-rc1' into fixes
The fixes branch is based on a random pre-rc1 commit, because some fixes needed to go in before rc1 was released. However, we now need to fix code that went in after that point but before rc1, so merge rc1 to get that code into fixes so we can fix it!
Diffstat (limited to 'net/ceph')
-rw-r--r--	net/ceph/ceph_common.c	7
-rw-r--r--	net/ceph/crush/crush.c	3
-rw-r--r--	net/ceph/crush/mapper.c	81
-rw-r--r--	net/ceph/debugfs.c	112
-rw-r--r--	net/ceph/messenger.c	10
-rw-r--r--	net/ceph/mon_client.c	8
-rw-r--r--	net/ceph/osd_client.c	905
-rw-r--r--	net/ceph/osdmap.c	840
8 files changed, 1662 insertions(+), 304 deletions(-)
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 47e94b560ba0..5c036d2f401e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -85,6 +85,7 @@ const char *ceph_msg_type_name(int type)
case CEPH_MSG_OSD_OP: return "osd_op";
case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
+ case CEPH_MSG_OSD_BACKOFF: return "osd_backoff";
default: return "unknown";
}
}
@@ -598,7 +599,11 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
{
struct ceph_client *client;
struct ceph_entity_addr *myaddr = NULL;
- int err = -ENOMEM;
+ int err;
+
+ err = wait_for_random_bytes();
+ if (err < 0)
+ return ERR_PTR(err);
client = kzalloc(sizeof(*client), GFP_KERNEL);
if (client == NULL)
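The change above means ceph_create_client() can now fail before allocating anything, propagating the wait_for_random_bytes() error via ERR_PTR() instead of hard-coding -ENOMEM. A minimal sketch of the caller side, assuming the usual kernel ERR_PTR conventions (not part of this patch):

	struct ceph_client *client;

	client = ceph_create_client(opt, private);
	if (IS_ERR(client))
		return PTR_ERR(client);	/* e.g. -ERESTARTSYS from the wait, or -ENOMEM */
	/* ... use client ... */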
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 5bf94c04f645..4b428f46a8ca 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -1,6 +1,7 @@
#ifdef __KERNEL__
# include <linux/slab.h>
# include <linux/crush/crush.h>
+void clear_choose_args(struct crush_map *c);
#else
# include "crush_compat.h"
# include "crush.h"
@@ -127,6 +128,8 @@ void crush_destroy(struct crush_map *map)
#ifndef __KERNEL__
kfree(map->choose_tries);
+#else
+ clear_choose_args(map);
#endif
kfree(map);
}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index b5cd8c21bfdf..746b145bfd11 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -302,19 +302,42 @@ static __u64 crush_ln(unsigned int xin)
*
*/
+static __u32 *get_choose_arg_weights(const struct crush_bucket_straw2 *bucket,
+ const struct crush_choose_arg *arg,
+ int position)
+{
+ if (!arg || !arg->weight_set || arg->weight_set_size == 0)
+ return bucket->item_weights;
+
+ if (position >= arg->weight_set_size)
+ position = arg->weight_set_size - 1;
+ return arg->weight_set[position].weights;
+}
+
+static __s32 *get_choose_arg_ids(const struct crush_bucket_straw2 *bucket,
+ const struct crush_choose_arg *arg)
+{
+ if (!arg || !arg->ids)
+ return bucket->h.items;
+
+ return arg->ids;
+}
+
static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
- int x, int r)
+ int x, int r,
+ const struct crush_choose_arg *arg,
+ int position)
{
unsigned int i, high = 0;
unsigned int u;
- unsigned int w;
__s64 ln, draw, high_draw = 0;
+ __u32 *weights = get_choose_arg_weights(bucket, arg, position);
+ __s32 *ids = get_choose_arg_ids(bucket, arg);
for (i = 0; i < bucket->h.size; i++) {
- w = bucket->item_weights[i];
- if (w) {
- u = crush_hash32_3(bucket->h.hash, x,
- bucket->h.items[i], r);
+ dprintk("weight 0x%x item %d\n", weights[i], ids[i]);
+ if (weights[i]) {
+ u = crush_hash32_3(bucket->h.hash, x, ids[i], r);
u &= 0xffff;
/*
@@ -335,7 +358,7 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
* weight means a larger (less negative) value
* for draw.
*/
- draw = div64_s64(ln, w);
+ draw = div64_s64(ln, weights[i]);
} else {
draw = S64_MIN;
}
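For orientation: bucket_straw2_choose() now draws ln(u)/w per item, where u comes from hashing (x, id, r) and w is either the bucket's own weight or the choose_args override selected above; the largest (least negative) draw wins, which makes each item win with probability proportional to its weight. A minimal userspace sketch of that selection, with a floating-point stand-in for the kernel's fixed-point crush_ln() and a toy hash in place of crush_hash32_3():

	#include <math.h>
	#include <stdint.h>
	#include <stdio.h>

	/* Toy hash standing in for crush_hash32_3(); not the CRUSH hash. */
	static uint32_t toy_hash(uint32_t x, uint32_t id, uint32_t r)
	{
		uint32_t h = x * 2654435761u ^ id * 2246822519u ^ r * 3266489917u;
		h ^= h >> 16;
		return h;
	}

	/* Pick the index with the highest ln(u)/w draw, as straw2 does. */
	static int straw2_pick(const uint32_t *weights, const int32_t *ids,
			       int n, uint32_t x, uint32_t r)
	{
		double high_draw = -INFINITY;
		int i, high = -1;

		for (i = 0; i < n; i++) {
			if (!weights[i])
				continue;	/* zero weight never wins */
			/* u in (0, 1]; the kernel uses the low 16 hash bits */
			double u = ((toy_hash(x, ids[i], r) & 0xffff) + 1) / 65536.0;
			double draw = log(u) / weights[i];

			if (draw > high_draw) {
				high_draw = draw;
				high = i;
			}
		}
		return high;
	}

	int main(void)
	{
		uint32_t weights[] = { 0x10000, 0x20000, 0x10000 }; /* 16.16 style */
		int32_t ids[] = { 0, 1, 2 };
		int hits[3] = { 0 };

		for (uint32_t x = 0; x < 100000; x++)
			hits[straw2_pick(weights, ids, 3, x, 0)]++;
		/* item 1 has twice the weight, so expect roughly 2x the hits */
		printf("%d %d %d\n", hits[0], hits[1], hits[2]);
		return 0;
	}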
@@ -352,7 +375,9 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
static int crush_bucket_choose(const struct crush_bucket *in,
struct crush_work_bucket *work,
- int x, int r)
+ int x, int r,
+ const struct crush_choose_arg *arg,
+ int position)
{
dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
BUG_ON(in->size == 0);
@@ -374,7 +399,7 @@ static int crush_bucket_choose(const struct crush_bucket *in,
case CRUSH_BUCKET_STRAW2:
return bucket_straw2_choose(
(const struct crush_bucket_straw2 *)in,
- x, r);
+ x, r, arg, position);
default:
dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
return in->items[0];
@@ -436,7 +461,8 @@ static int crush_choose_firstn(const struct crush_map *map,
unsigned int vary_r,
unsigned int stable,
int *out2,
- int parent_r)
+ int parent_r,
+ const struct crush_choose_arg *choose_args)
{
int rep;
unsigned int ftotal, flocal;
@@ -486,7 +512,10 @@ static int crush_choose_firstn(const struct crush_map *map,
else
item = crush_bucket_choose(
in, work->work[-1-in->id],
- x, r);
+ x, r,
+ (choose_args ?
+ &choose_args[-1-in->id] : 0),
+ outpos);
if (item >= map->max_devices) {
dprintk(" bad item %d\n", item);
skip_rep = 1;
@@ -543,7 +572,8 @@ static int crush_choose_firstn(const struct crush_map *map,
vary_r,
stable,
NULL,
- sub_r) <= outpos)
+ sub_r,
+ choose_args) <= outpos)
/* didn't get leaf */
reject = 1;
} else {
@@ -620,7 +650,8 @@ static void crush_choose_indep(const struct crush_map *map,
unsigned int recurse_tries,
int recurse_to_leaf,
int *out2,
- int parent_r)
+ int parent_r,
+ const struct crush_choose_arg *choose_args)
{
const struct crush_bucket *in = bucket;
int endpos = outpos + left;
@@ -692,7 +723,10 @@ static void crush_choose_indep(const struct crush_map *map,
item = crush_bucket_choose(
in, work->work[-1-in->id],
- x, r);
+ x, r,
+ (choose_args ?
+ &choose_args[-1-in->id] : 0),
+ outpos);
if (item >= map->max_devices) {
dprintk(" bad item %d\n", item);
out[rep] = CRUSH_ITEM_NONE;
@@ -746,7 +780,8 @@ static void crush_choose_indep(const struct crush_map *map,
x, 1, numrep, 0,
out2, rep,
recurse_tries, 0,
- 0, NULL, r);
+ 0, NULL, r,
+ choose_args);
if (out2[rep] == CRUSH_ITEM_NONE) {
/* placed nothing; no leaf */
break;
@@ -823,7 +858,7 @@ void crush_init_workspace(const struct crush_map *map, void *v)
* set the pointer first and then reserve the space for it to
* point to by incrementing the point.
*/
- v += sizeof(struct crush_work *);
+ v += sizeof(struct crush_work);
w->work = v;
v += map->max_buckets * sizeof(struct crush_work_bucket *);
for (b = 0; b < map->max_buckets; ++b) {
@@ -854,11 +889,12 @@ void crush_init_workspace(const struct crush_map *map, void *v)
* @weight: weight vector (for map leaves)
* @weight_max: size of weight vector
* @cwin: pointer to at least crush_work_size() bytes of memory
+ * @choose_args: weights and ids for each known bucket
*/
int crush_do_rule(const struct crush_map *map,
int ruleno, int x, int *result, int result_max,
const __u32 *weight, int weight_max,
- void *cwin)
+ void *cwin, const struct crush_choose_arg *choose_args)
{
int result_len;
struct crush_work *cw = cwin;
@@ -968,11 +1004,6 @@ int crush_do_rule(const struct crush_map *map,
for (i = 0; i < wsize; i++) {
int bno;
- /*
- * see CRUSH_N, CRUSH_N_MINUS macros.
- * basically, numrep <= 0 means relative to
- * the provided result_max
- */
numrep = curstep->arg1;
if (numrep <= 0) {
numrep += result_max;
@@ -1013,7 +1044,8 @@ int crush_do_rule(const struct crush_map *map,
vary_r,
stable,
c+osize,
- 0);
+ 0,
+ choose_args);
} else {
out_size = ((numrep < (result_max-osize)) ?
numrep : (result_max-osize));
@@ -1030,7 +1062,8 @@ int crush_do_rule(const struct crush_map *map,
choose_leaf_tries : 1,
recurse_to_leaf,
c+osize,
- 0);
+ 0,
+ choose_args);
osize += out_size;
}
}
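Taken together, the mapper changes thread an optional choose_args array from crush_do_rule() down to the straw2 draw. A hedged fragment showing the new call shape (map, workspace and weight setup elided; choose_args may be NULL, in which case get_choose_arg_weights()/get_choose_arg_ids() fall back to the bucket's own item_weights and items):

	int result[8];	/* result_max = 8, illustrative */
	int len;

	len = crush_do_rule(map, ruleno, x, result, 8,
			    osd_weights, num_osds, workspace, choose_args);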
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 71ba13927b3d..fa5233e0d01c 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -77,7 +77,7 @@ static int osdmap_show(struct seq_file *s, void *p)
}
for (i = 0; i < map->max_osd; i++) {
struct ceph_entity_addr *addr = &map->osd_addr[i];
- int state = map->osd_state[i];
+ u32 state = map->osd_state[i];
char sb[64];
seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
@@ -104,6 +104,29 @@ static int osdmap_show(struct seq_file *s, void *p)
seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
pg->pgid.seed, pg->primary_temp.osd);
}
+ for (n = rb_first(&map->pg_upmap); n; n = rb_next(n)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(n, struct ceph_pg_mapping, node);
+
+ seq_printf(s, "pg_upmap %llu.%x [", pg->pgid.pool,
+ pg->pgid.seed);
+ for (i = 0; i < pg->pg_upmap.len; i++)
+ seq_printf(s, "%s%d", (i == 0 ? "" : ","),
+ pg->pg_upmap.osds[i]);
+ seq_printf(s, "]\n");
+ }
+ for (n = rb_first(&map->pg_upmap_items); n; n = rb_next(n)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(n, struct ceph_pg_mapping, node);
+
+ seq_printf(s, "pg_upmap_items %llu.%x [", pg->pgid.pool,
+ pg->pgid.seed);
+ for (i = 0; i < pg->pg_upmap_items.len; i++)
+ seq_printf(s, "%s%d->%d", (i == 0 ? "" : ","),
+ pg->pg_upmap_items.from_to[i][0],
+ pg->pg_upmap_items.from_to[i][1]);
+ seq_printf(s, "]\n");
+ }
up_read(&osdc->lock);
return 0;
@@ -147,17 +170,26 @@ static int monc_show(struct seq_file *s, void *p)
return 0;
}
+static void dump_spgid(struct seq_file *s, const struct ceph_spg *spgid)
+{
+ seq_printf(s, "%llu.%x", spgid->pgid.pool, spgid->pgid.seed);
+ if (spgid->shard != CEPH_SPG_NOSHARD)
+ seq_printf(s, "s%d", spgid->shard);
+}
+
static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
{
int i;
- seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
+ seq_printf(s, "osd%d\t%llu.%x\t", t->osd, t->pgid.pool, t->pgid.seed);
+ dump_spgid(s, &t->spgid);
+ seq_puts(s, "\t[");
for (i = 0; i < t->up.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
- seq_printf(s, "]/%d\t", t->acting.primary);
+ seq_printf(s, "]/%d\te%u\t", t->acting.primary, t->epoch);
if (t->target_oloc.pool_ns) {
seq_printf(s, "%*pE/%*pE\t0x%x",
(int)t->target_oloc.pool_ns->len,
@@ -234,6 +266,73 @@ static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
mutex_unlock(&osd->lock);
}
+static void dump_snapid(struct seq_file *s, u64 snapid)
+{
+ if (snapid == CEPH_NOSNAP)
+ seq_puts(s, "head");
+ else if (snapid == CEPH_SNAPDIR)
+ seq_puts(s, "snapdir");
+ else
+ seq_printf(s, "%llx", snapid);
+}
+
+static void dump_name_escaped(struct seq_file *s, unsigned char *name,
+ size_t len)
+{
+ size_t i;
+
+ for (i = 0; i < len; i++) {
+ if (name[i] == '%' || name[i] == ':' || name[i] == '/' ||
+ name[i] < 32 || name[i] >= 127) {
+ seq_printf(s, "%%%02x", name[i]);
+ } else {
+ seq_putc(s, name[i]);
+ }
+ }
+}
+
+static void dump_hoid(struct seq_file *s, const struct ceph_hobject_id *hoid)
+{
+ if (hoid->snapid == 0 && hoid->hash == 0 && !hoid->is_max &&
+ hoid->pool == S64_MIN) {
+ seq_puts(s, "MIN");
+ return;
+ }
+ if (hoid->is_max) {
+ seq_puts(s, "MAX");
+ return;
+ }
+ seq_printf(s, "%lld:%08x:", hoid->pool, hoid->hash_reverse_bits);
+ dump_name_escaped(s, hoid->nspace, hoid->nspace_len);
+ seq_putc(s, ':');
+ dump_name_escaped(s, hoid->key, hoid->key_len);
+ seq_putc(s, ':');
+ dump_name_escaped(s, hoid->oid, hoid->oid_len);
+ seq_putc(s, ':');
+ dump_snapid(s, hoid->snapid);
+}
+
+static void dump_backoffs(struct seq_file *s, struct ceph_osd *osd)
+{
+ struct rb_node *n;
+
+ mutex_lock(&osd->lock);
+ for (n = rb_first(&osd->o_backoffs_by_id); n; n = rb_next(n)) {
+ struct ceph_osd_backoff *backoff =
+ rb_entry(n, struct ceph_osd_backoff, id_node);
+
+ seq_printf(s, "osd%d\t", osd->o_osd);
+ dump_spgid(s, &backoff->spgid);
+ seq_printf(s, "\t%llu\t", backoff->id);
+ dump_hoid(s, backoff->begin);
+ seq_putc(s, '\t');
+ dump_hoid(s, backoff->end);
+ seq_putc(s, '\n');
+ }
+
+ mutex_unlock(&osd->lock);
+}
+
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
@@ -259,6 +358,13 @@ static int osdc_show(struct seq_file *s, void *pp)
}
dump_linger_requests(s, &osdc->homeless_osd);
+ seq_puts(s, "BACKOFFS\n");
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+ dump_backoffs(s, osd);
+ }
+
up_read(&osdc->lock);
return 0;
}
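In the debugfs additions above, dump_hoid() joins fields with ':' and dump_name_escaped() keeps that delimiter unambiguous by percent-escaping '%', ':', '/' and non-printable bytes. A standalone sketch of the same escape rule, with printf() in place of seq_printf():

	#include <stddef.h>
	#include <stdio.h>

	/* Same escape set as dump_name_escaped(): '%', ':', '/', <32, >=127. */
	static void print_name_escaped(const unsigned char *name, size_t len)
	{
		for (size_t i = 0; i < len; i++) {
			if (name[i] == '%' || name[i] == ':' || name[i] == '/' ||
			    name[i] < 32 || name[i] >= 127)
				printf("%%%02x", name[i]);
			else
				putchar(name[i]);
		}
	}

	int main(void)
	{
		const unsigned char oid[] = "rbd_data.1234:0/7";

		print_name_escaped(oid, sizeof(oid) - 1);
		putchar('\n');	/* prints rbd_data.1234%3a0%2f7 */
		return 0;
	}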
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 588a91930051..0c31035bbfee 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1288,13 +1288,16 @@ static void prepare_write_message(struct ceph_connection *con)
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
- WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
+
+ if (con->ops->reencode_message)
+ con->ops->reencode_message(m);
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
m->data_length);
- BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
+ WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
+ WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
/* tag + hdr + front + middle */
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
@@ -2033,8 +2036,7 @@ static int process_connect(struct ceph_connection *con)
{
u64 sup_feat = from_msgr(con->msgr)->supported_features;
u64 req_feat = from_msgr(con->msgr)->required_features;
- u64 server_feat = ceph_sanitize_features(
- le64_to_cpu(con->in_reply.features));
+ u64 server_feat = le64_to_cpu(con->in_reply.features);
int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 250f11f78609..875675765531 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -6,6 +6,7 @@
#include <linux/random.h>
#include <linux/sched.h>
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
@@ -297,6 +298,10 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
mutex_lock(&monc->mutex);
if (monc->sub_renew_sent) {
+ /*
+ * This is only needed for legacy (infernalis or older)
+ * MONs -- see delayed_work().
+ */
monc->sub_renew_after = monc->sub_renew_sent +
(seconds >> 1) * HZ - 1;
dout("%s sent %lu duration %d renew after %lu\n", __func__,
@@ -955,7 +960,8 @@ static void delayed_work(struct work_struct *work)
__validate_auth(monc);
}
- if (is_auth) {
+ if (is_auth &&
+ !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
unsigned long now = jiffies;
dout("%s renew subs? now %lu renew after %lu\n",
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 924f07c36ddb..86a9737d8e3f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -12,6 +12,7 @@
#include <linux/bio.h>
#endif
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
@@ -49,6 +50,7 @@ static void link_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq);
+static void clear_backoffs(struct ceph_osd *osd);
#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
@@ -373,6 +375,7 @@ static void target_copy(struct ceph_osd_request_target *dest,
ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
dest->pgid = src->pgid; /* struct */
+ dest->spgid = src->spgid; /* struct */
dest->pg_num = src->pg_num;
dest->pg_num_mask = src->pg_num_mask;
ceph_osds_copy(&dest->acting, &src->acting);
@@ -384,6 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
dest->flags = src->flags;
dest->paused = src->paused;
+ dest->epoch = src->epoch;
+ dest->last_force_resend = src->last_force_resend;
+
dest->osd = src->osd;
}
@@ -537,7 +543,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
-static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
+static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}
@@ -552,17 +558,21 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
/* create request message */
- msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
- msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
+ msg_size = CEPH_ENCODING_START_BLK_LEN +
+ CEPH_PGID_ENCODING_LEN + 1; /* spgid */
+ msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
+ msg_size += CEPH_ENCODING_START_BLK_LEN +
+ sizeof(struct ceph_osd_reqid); /* reqid */
+ msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
+ msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
msg_size += CEPH_ENCODING_START_BLK_LEN +
ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
- msg_size += 1 + 8 + 4 + 4; /* pgid */
msg_size += 4 + req->r_base_oid.name_len; /* oid */
msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
msg_size += 8; /* snapid */
msg_size += 8; /* snap_seq */
msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
- msg_size += 4; /* retry_attempt */
+ msg_size += 4 + 8; /* retry_attempt, features */
if (req->r_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -1010,6 +1020,8 @@ static void osd_init(struct ceph_osd *osd)
RB_CLEAR_NODE(&osd->o_node);
osd->o_requests = RB_ROOT;
osd->o_linger_requests = RB_ROOT;
+ osd->o_backoff_mappings = RB_ROOT;
+ osd->o_backoffs_by_id = RB_ROOT;
INIT_LIST_HEAD(&osd->o_osd_lru);
INIT_LIST_HEAD(&osd->o_keepalive_item);
osd->o_incarnation = 1;
@@ -1021,6 +1033,8 @@ static void osd_cleanup(struct ceph_osd *osd)
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
@@ -1141,6 +1155,7 @@ static void close_osd(struct ceph_osd *osd)
unlink_linger(osd, lreq);
link_linger(&osdc->homeless_osd, lreq);
}
+ clear_backoffs(osd);
__remove_osd_from_lru(osd);
erase_osd(&osdc->osds, osd);
@@ -1297,7 +1312,7 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
__pool_full(pi);
- WARN_ON(pi->id != t->base_oloc.pool);
+ WARN_ON(pi->id != t->target_oloc.pool);
return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
(osdc->osdmap->epoch < osdc->epoch_barrier);
@@ -1311,19 +1326,21 @@ enum calc_target_result {
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
struct ceph_osd_request_target *t,
- u32 *last_force_resend,
+ struct ceph_connection *con,
bool any_change)
{
struct ceph_pg_pool_info *pi;
struct ceph_pg pgid, last_pgid;
struct ceph_osds up, acting;
bool force_resend = false;
- bool need_check_tiering = false;
- bool need_resend = false;
+ bool unpaused = false;
+ bool legacy_change;
+ bool split = false;
bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
enum calc_target_result ct_res;
int ret;
+ t->epoch = osdc->osdmap->epoch;
pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
if (!pi) {
t->osd = CEPH_HOMELESS_OSD;
@@ -1332,33 +1349,33 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
}
if (osdc->osdmap->epoch == pi->last_force_request_resend) {
- if (last_force_resend &&
- *last_force_resend < pi->last_force_request_resend) {
- *last_force_resend = pi->last_force_request_resend;
+ if (t->last_force_resend < pi->last_force_request_resend) {
+ t->last_force_resend = pi->last_force_request_resend;
force_resend = true;
- } else if (!last_force_resend) {
+ } else if (t->last_force_resend == 0) {
force_resend = true;
}
}
- if (ceph_oid_empty(&t->target_oid) || force_resend) {
- ceph_oid_copy(&t->target_oid, &t->base_oid);
- need_check_tiering = true;
- }
- if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
- ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
- need_check_tiering = true;
- }
- if (need_check_tiering &&
- (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+ /* apply tiering */
+ ceph_oid_copy(&t->target_oid, &t->base_oid);
+ ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
+ if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
t->target_oloc.pool = pi->read_tier;
if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
t->target_oloc.pool = pi->write_tier;
+
+ pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
+ if (!pi) {
+ t->osd = CEPH_HOMELESS_OSD;
+ ct_res = CALC_TARGET_POOL_DNE;
+ goto out;
+ }
}
- ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
- &t->target_oloc, &pgid);
+ ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
+ &pgid);
if (ret) {
WARN_ON(ret != -ENOENT);
t->osd = CEPH_HOMELESS_OSD;
@@ -1368,7 +1385,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
last_pgid.pool = pgid.pool;
last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
- ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
+ ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
if (any_change &&
ceph_is_new_interval(&t->acting,
&acting,
@@ -1387,13 +1404,16 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
if (t->paused && !target_should_be_paused(osdc, t, pi)) {
t->paused = false;
- need_resend = true;
+ unpaused = true;
}
+ legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
+ ceph_osds_changed(&t->acting, &acting, any_change);
+ if (t->pg_num)
+ split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
- if (ceph_pg_compare(&t->pgid, &pgid) ||
- ceph_osds_changed(&t->acting, &acting, any_change) ||
- force_resend) {
+ if (legacy_change || force_resend || split) {
t->pgid = pgid; /* struct */
+ ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
ceph_osds_copy(&t->acting, &acting);
ceph_osds_copy(&t->up, &up);
t->size = pi->size;
@@ -1403,15 +1423,342 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
t->sort_bitwise = sort_bitwise;
t->osd = acting.primary;
- need_resend = true;
}
- ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION;
+ if (unpaused || legacy_change || force_resend ||
+ (split && con && CEPH_HAVE_FEATURE(con->peer_features,
+ RESEND_ON_SPLIT)))
+ ct_res = CALC_TARGET_NEED_RESEND;
+ else
+ ct_res = CALC_TARGET_NO_ACTION;
+
out:
dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
return ct_res;
}
+static struct ceph_spg_mapping *alloc_spg_mapping(void)
+{
+ struct ceph_spg_mapping *spg;
+
+ spg = kmalloc(sizeof(*spg), GFP_NOIO);
+ if (!spg)
+ return NULL;
+
+ RB_CLEAR_NODE(&spg->node);
+ spg->backoffs = RB_ROOT;
+ return spg;
+}
+
+static void free_spg_mapping(struct ceph_spg_mapping *spg)
+{
+ WARN_ON(!RB_EMPTY_NODE(&spg->node));
+ WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
+
+ kfree(spg);
+}
+
+/*
+ * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
+ * ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is
+ * defined only within a specific spgid; it does not pass anything to
+ * children on split, or to another primary.
+ */
+DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
+ RB_BYPTR, const struct ceph_spg *, node)
+
+static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
+{
+ return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
+}
+
+static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
+ void **pkey, size_t *pkey_len)
+{
+ if (hoid->key_len) {
+ *pkey = hoid->key;
+ *pkey_len = hoid->key_len;
+ } else {
+ *pkey = hoid->oid;
+ *pkey_len = hoid->oid_len;
+ }
+}
+
+static int compare_names(const void *name1, size_t name1_len,
+ const void *name2, size_t name2_len)
+{
+ int ret;
+
+ ret = memcmp(name1, name2, min(name1_len, name2_len));
+ if (!ret) {
+ if (name1_len < name2_len)
+ ret = -1;
+ else if (name1_len > name2_len)
+ ret = 1;
+ }
+ return ret;
+}
+
+static int hoid_compare(const struct ceph_hobject_id *lhs,
+ const struct ceph_hobject_id *rhs)
+{
+ void *effective_key1, *effective_key2;
+ size_t effective_key1_len, effective_key2_len;
+ int ret;
+
+ if (lhs->is_max < rhs->is_max)
+ return -1;
+ if (lhs->is_max > rhs->is_max)
+ return 1;
+
+ if (lhs->pool < rhs->pool)
+ return -1;
+ if (lhs->pool > rhs->pool)
+ return 1;
+
+ if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
+ return -1;
+ if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
+ return 1;
+
+ ret = compare_names(lhs->nspace, lhs->nspace_len,
+ rhs->nspace, rhs->nspace_len);
+ if (ret)
+ return ret;
+
+ hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
+ hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
+ ret = compare_names(effective_key1, effective_key1_len,
+ effective_key2, effective_key2_len);
+ if (ret)
+ return ret;
+
+ ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
+ if (ret)
+ return ret;
+
+ if (lhs->snapid < rhs->snapid)
+ return -1;
+ if (lhs->snapid > rhs->snapid)
+ return 1;
+
+ return 0;
+}
+
+/*
+ * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
+ * compat stuff here.
+ *
+ * Assumes @hoid is zero-initialized.
+ */
+static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
+{
+ u8 struct_v;
+ u32 struct_len;
+ int ret;
+
+ ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
+ &struct_len);
+ if (ret)
+ return ret;
+
+ if (struct_v < 4) {
+ pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
+ goto e_inval;
+ }
+
+ hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
+ GFP_NOIO);
+ if (IS_ERR(hoid->key)) {
+ ret = PTR_ERR(hoid->key);
+ hoid->key = NULL;
+ return ret;
+ }
+
+ hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
+ GFP_NOIO);
+ if (IS_ERR(hoid->oid)) {
+ ret = PTR_ERR(hoid->oid);
+ hoid->oid = NULL;
+ return ret;
+ }
+
+ ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
+ ceph_decode_32_safe(p, end, hoid->hash, e_inval);
+ ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
+
+ hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
+ GFP_NOIO);
+ if (IS_ERR(hoid->nspace)) {
+ ret = PTR_ERR(hoid->nspace);
+ hoid->nspace = NULL;
+ return ret;
+ }
+
+ ceph_decode_64_safe(p, end, hoid->pool, e_inval);
+
+ ceph_hoid_build_hash_cache(hoid);
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
+{
+ return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
+ 4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
+}
+
+static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
+{
+ ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
+ ceph_encode_string(p, end, hoid->key, hoid->key_len);
+ ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
+ ceph_encode_64(p, hoid->snapid);
+ ceph_encode_32(p, hoid->hash);
+ ceph_encode_8(p, hoid->is_max);
+ ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
+ ceph_encode_64(p, hoid->pool);
+}
+
+static void free_hoid(struct ceph_hobject_id *hoid)
+{
+ if (hoid) {
+ kfree(hoid->key);
+ kfree(hoid->oid);
+ kfree(hoid->nspace);
+ kfree(hoid);
+ }
+}
+
+static struct ceph_osd_backoff *alloc_backoff(void)
+{
+ struct ceph_osd_backoff *backoff;
+
+ backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
+ if (!backoff)
+ return NULL;
+
+ RB_CLEAR_NODE(&backoff->spg_node);
+ RB_CLEAR_NODE(&backoff->id_node);
+ return backoff;
+}
+
+static void free_backoff(struct ceph_osd_backoff *backoff)
+{
+ WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
+ WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
+
+ free_hoid(backoff->begin);
+ free_hoid(backoff->end);
+ kfree(backoff);
+}
+
+/*
+ * Within a specific spgid, backoffs are managed by ->begin hoid.
+ */
+DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
+ RB_BYVAL, spg_node);
+
+static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
+ const struct ceph_hobject_id *hoid)
+{
+ struct rb_node *n = root->rb_node;
+
+ while (n) {
+ struct ceph_osd_backoff *cur =
+ rb_entry(n, struct ceph_osd_backoff, spg_node);
+ int cmp;
+
+ cmp = hoid_compare(hoid, cur->begin);
+ if (cmp < 0) {
+ n = n->rb_left;
+ } else if (cmp > 0) {
+ if (hoid_compare(hoid, cur->end) < 0)
+ return cur;
+
+ n = n->rb_right;
+ } else {
+ return cur;
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Each backoff has a unique id within its OSD session.
+ */
+DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
+
+static void clear_backoffs(struct ceph_osd *osd)
+{
+ while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
+ struct ceph_spg_mapping *spg =
+ rb_entry(rb_first(&osd->o_backoff_mappings),
+ struct ceph_spg_mapping, node);
+
+ while (!RB_EMPTY_ROOT(&spg->backoffs)) {
+ struct ceph_osd_backoff *backoff =
+ rb_entry(rb_first(&spg->backoffs),
+ struct ceph_osd_backoff, spg_node);
+
+ erase_backoff(&spg->backoffs, backoff);
+ erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+ free_backoff(backoff);
+ }
+ erase_spg_mapping(&osd->o_backoff_mappings, spg);
+ free_spg_mapping(spg);
+ }
+}
+
+/*
+ * Set up a temporary, non-owning view into @t.
+ */
+static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
+ const struct ceph_osd_request_target *t)
+{
+ hoid->key = NULL;
+ hoid->key_len = 0;
+ hoid->oid = t->target_oid.name;
+ hoid->oid_len = t->target_oid.name_len;
+ hoid->snapid = CEPH_NOSNAP;
+ hoid->hash = t->pgid.seed;
+ hoid->is_max = false;
+ if (t->target_oloc.pool_ns) {
+ hoid->nspace = t->target_oloc.pool_ns->str;
+ hoid->nspace_len = t->target_oloc.pool_ns->len;
+ } else {
+ hoid->nspace = NULL;
+ hoid->nspace_len = 0;
+ }
+ hoid->pool = t->target_oloc.pool;
+ ceph_hoid_build_hash_cache(hoid);
+}
+
+static bool should_plug_request(struct ceph_osd_request *req)
+{
+ struct ceph_osd *osd = req->r_osd;
+ struct ceph_spg_mapping *spg;
+ struct ceph_osd_backoff *backoff;
+ struct ceph_hobject_id hoid;
+
+ spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
+ if (!spg)
+ return false;
+
+ hoid_fill_from_target(&hoid, &req->r_t);
+ backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
+ if (!backoff)
+ return false;
+
+ dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
+ __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
+ backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
+ return true;
+}
+
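should_plug_request() and lookup_containing_backoff() above depend on backoffs within one spgid never overlapping: walking the tree keyed by ->begin, any node whose begin sorts before the target only needs its own ->end checked before descending right. A standalone sketch of the same containment search over a sorted array, with binary search standing in for the rbtree walk:

	#include <stdio.h>

	struct range {
		int begin;	/* inclusive */
		int end;	/* exclusive */
	};

	/*
	 * Find the range containing key, assuming ranges are sorted by
	 * begin and do not overlap -- the invariant that
	 * lookup_containing_backoff() relies on.
	 */
	static const struct range *lookup_containing(const struct range *r,
						     int n, int key)
	{
		int lo = 0, hi = n - 1;

		while (lo <= hi) {
			int mid = lo + (hi - lo) / 2;

			if (key < r[mid].begin) {
				hi = mid - 1;
			} else {
				if (key < r[mid].end)
					return &r[mid];	/* begin <= key < end */
				lo = mid + 1;
			}
		}
		return NULL;
	}

	int main(void)
	{
		const struct range backoffs[] = { { 10, 20 }, { 30, 35 }, { 50, 90 } };
		const struct range *hit = lookup_containing(backoffs, 3, 33);

		if (hit)
			printf("[%d, %d)\n", hit->begin, hit->end);	/* [30, 35) */
		else
			printf("none\n");
		return 0;
	}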
static void setup_request_data(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
@@ -1483,7 +1830,37 @@ static void setup_request_data(struct ceph_osd_request *req,
WARN_ON(data_len != msg->data_length);
}
-static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void encode_pgid(void **p, const struct ceph_pg *pgid)
+{
+ ceph_encode_8(p, 1);
+ ceph_encode_64(p, pgid->pool);
+ ceph_encode_32(p, pgid->seed);
+ ceph_encode_32(p, -1); /* preferred */
+}
+
+static void encode_spgid(void **p, const struct ceph_spg *spgid)
+{
+ ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
+ encode_pgid(p, &spgid->pgid);
+ ceph_encode_8(p, spgid->shard);
+}
+
+static void encode_oloc(void **p, void *end,
+ const struct ceph_object_locator *oloc)
+{
+ ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
+ ceph_encode_64(p, oloc->pool);
+ ceph_encode_32(p, -1); /* preferred */
+ ceph_encode_32(p, 0); /* key len */
+ if (oloc->pool_ns)
+ ceph_encode_string(p, end, oloc->pool_ns->str,
+ oloc->pool_ns->len);
+ else
+ ceph_encode_32(p, 0);
+}
+
+static void encode_request_partial(struct ceph_osd_request *req,
+ struct ceph_msg *msg)
{
void *p = msg->front.iov_base;
void *const end = p + msg->front_alloc_len;
@@ -1500,38 +1877,27 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
setup_request_data(req, msg);
- ceph_encode_32(&p, 1); /* client_inc, always 1 */
+ encode_spgid(&p, &req->r_t.spgid); /* actual spg */
+ ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
ceph_encode_32(&p, req->r_flags);
- ceph_encode_timespec(p, &req->r_mtime);
- p += sizeof(struct ceph_timespec);
- /* reassert_version */
- memset(p, 0, sizeof(struct ceph_eversion));
- p += sizeof(struct ceph_eversion);
-
- /* oloc */
- ceph_start_encoding(&p, 5, 4,
- ceph_oloc_encoding_size(&req->r_t.target_oloc));
- ceph_encode_64(&p, req->r_t.target_oloc.pool);
- ceph_encode_32(&p, -1); /* preferred */
- ceph_encode_32(&p, 0); /* key len */
- if (req->r_t.target_oloc.pool_ns)
- ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
- req->r_t.target_oloc.pool_ns->len);
- else
- ceph_encode_32(&p, 0);
+ /* reqid */
+ ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
+ memset(p, 0, sizeof(struct ceph_osd_reqid));
+ p += sizeof(struct ceph_osd_reqid);
- /* pgid */
- ceph_encode_8(&p, 1);
- ceph_encode_64(&p, req->r_t.pgid.pool);
- ceph_encode_32(&p, req->r_t.pgid.seed);
- ceph_encode_32(&p, -1); /* preferred */
+ /* trace */
+ memset(p, 0, sizeof(struct ceph_blkin_trace_info));
+ p += sizeof(struct ceph_blkin_trace_info);
+
+ ceph_encode_32(&p, 0); /* client_inc, always 0 */
+ ceph_encode_timespec(p, &req->r_mtime);
+ p += sizeof(struct ceph_timespec);
- /* oid */
- ceph_encode_32(&p, req->r_t.target_oid.name_len);
- memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
- p += req->r_t.target_oid.name_len;
+ encode_oloc(&p, end, &req->r_t.target_oloc);
+ ceph_encode_string(&p, end, req->r_t.target_oid.name,
+ req->r_t.target_oid.name_len);
/* ops, can imply data */
ceph_encode_16(&p, req->r_num_ops);
@@ -1552,11 +1918,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
}
ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
+ BUG_ON(p != end - 8); /* space for features */
- BUG_ON(p > end);
- msg->front.iov_len = p - msg->front.iov_base;
- msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
+ /* front_len is finalized in encode_request_finish() */
msg->hdr.data_len = cpu_to_le32(data_len);
/*
* The header "data_off" is a hint to the receiver allowing it
@@ -1565,9 +1930,99 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
*/
msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
- dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__,
- req, req->r_t.target_oid.name, req->r_t.target_oid.name_len,
- msg->front.iov_len, data_len);
+ dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
+ req->r_t.target_oid.name, req->r_t.target_oid.name_len);
+}
+
+static void encode_request_finish(struct ceph_msg *msg)
+{
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front_alloc_len;
+
+ if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
+ /* luminous OSD -- encode features and be done */
+ p = end - 8;
+ ceph_encode_64(&p, msg->con->peer_features);
+ } else {
+ struct {
+ char spgid[CEPH_ENCODING_START_BLK_LEN +
+ CEPH_PGID_ENCODING_LEN + 1];
+ __le32 hash;
+ __le32 epoch;
+ __le32 flags;
+ char reqid[CEPH_ENCODING_START_BLK_LEN +
+ sizeof(struct ceph_osd_reqid)];
+ char trace[sizeof(struct ceph_blkin_trace_info)];
+ __le32 client_inc;
+ struct ceph_timespec mtime;
+ } __packed head;
+ struct ceph_pg pgid;
+ void *oloc, *oid, *tail;
+ int oloc_len, oid_len, tail_len;
+ int len;
+
+ /*
+ * Pre-luminous OSD -- reencode v8 into v4 using @head
+ * as a temporary buffer. Encode the raw PG; the rest
+ * is just a matter of moving oloc, oid and tail blobs
+ * around.
+ */
+ memcpy(&head, p, sizeof(head));
+ p += sizeof(head);
+
+ oloc = p;
+ p += CEPH_ENCODING_START_BLK_LEN;
+ pgid.pool = ceph_decode_64(&p);
+ p += 4 + 4; /* preferred, key len */
+ len = ceph_decode_32(&p);
+ p += len; /* nspace */
+ oloc_len = p - oloc;
+
+ oid = p;
+ len = ceph_decode_32(&p);
+ p += len;
+ oid_len = p - oid;
+
+ tail = p;
+ tail_len = (end - p) - 8;
+
+ p = msg->front.iov_base;
+ ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
+ ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
+ ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
+ ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
+
+ /* reassert_version */
+ memset(p, 0, sizeof(struct ceph_eversion));
+ p += sizeof(struct ceph_eversion);
+
+ BUG_ON(p >= oloc);
+ memmove(p, oloc, oloc_len);
+ p += oloc_len;
+
+ pgid.seed = le32_to_cpu(head.hash);
+ encode_pgid(&p, &pgid); /* raw pg */
+
+ BUG_ON(p >= oid);
+ memmove(p, oid, oid_len);
+ p += oid_len;
+
+ /* tail -- ops, snapid, snapc, retry_attempt */
+ BUG_ON(p >= tail);
+ memmove(p, tail, tail_len);
+ p += tail_len;
+
+ msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
+ }
+
+ BUG_ON(p > end);
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+
+ dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
+ le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
+ le16_to_cpu(msg->hdr.version));
}
/*
@@ -1580,6 +2035,10 @@ static void send_request(struct ceph_osd_request *req)
verify_osd_locked(osd);
WARN_ON(osd->o_osd != req->r_t.osd);
+ /* backoff? */
+ if (should_plug_request(req))
+ return;
+
/*
* We may have a previously queued request message hanging
* around. Cancel it to avoid corrupting the msgr.
@@ -1593,11 +2052,13 @@ static void send_request(struct ceph_osd_request *req)
else
WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
- encode_request(req, req->r_request);
+ encode_request_partial(req, req->r_request);
- dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
+ dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
__func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
- req->r_t.osd, req->r_flags, req->r_attempts);
+ req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
+ req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
+ req->r_attempts);
req->r_t.paused = false;
req->r_stamp = jiffies;
@@ -1645,7 +2106,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
again:
- ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+ ct_res = calc_target(osdc, &req->r_t, NULL, false);
if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
goto promote;
@@ -1737,13 +2198,12 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
static void finish_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
- struct ceph_osd *osd = req->r_osd;
- verify_osd_locked(osd);
+ WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
- WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
- unlink_request(osd, req);
+ if (req->r_osd)
+ unlink_request(req->r_osd, req);
atomic_dec(&osdc->num_requests);
/*
@@ -2441,7 +2901,7 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
struct ceph_osd_client *osdc = lreq->osdc;
struct ceph_osd *osd;
- calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
+ calc_target(osdc, &lreq->t, NULL, false);
osd = lookup_create_osd(osdc, lreq->t.osd, true);
link_linger(osd, lreq);
@@ -3059,7 +3519,7 @@ recalc_linger_target(struct ceph_osd_linger_request *lreq)
struct ceph_osd_client *osdc = lreq->osdc;
enum calc_target_result ct_res;
- ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
+ ct_res = calc_target(osdc, &lreq->t, NULL, true);
if (ct_res == CALC_TARGET_NEED_RESEND) {
struct ceph_osd *osd;
@@ -3117,6 +3577,7 @@ static void scan_requests(struct ceph_osd *osd,
list_add_tail(&lreq->scan_item, need_resend_linger);
break;
case CALC_TARGET_POOL_DNE:
+ list_del_init(&lreq->scan_item);
check_linger_pool_dne(lreq);
break;
}
@@ -3130,8 +3591,8 @@ static void scan_requests(struct ceph_osd *osd,
n = rb_next(n); /* unlink_request(), check_pool_dne() */
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
- ct_res = calc_target(osdc, &req->r_t,
- &req->r_last_force_resend, false);
+ ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
+ false);
switch (ct_res) {
case CALC_TARGET_NO_ACTION:
force_resend_writes = cleared_full ||
@@ -3229,8 +3690,25 @@ static void kick_requests(struct ceph_osd_client *osdc,
struct list_head *need_resend_linger)
{
struct ceph_osd_linger_request *lreq, *nlreq;
+ enum calc_target_result ct_res;
struct rb_node *n;
+ /* make sure need_resend targets reflect latest map */
+ for (n = rb_first(need_resend); n; ) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ n = rb_next(n);
+
+ if (req->r_t.epoch < osdc->osdmap->epoch) {
+ ct_res = calc_target(osdc, &req->r_t, NULL, false);
+ if (ct_res == CALC_TARGET_POOL_DNE) {
+ erase_request(need_resend, req);
+ check_pool_dne(req);
+ }
+ }
+ }
+
for (n = rb_first(need_resend); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
@@ -3239,8 +3717,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
n = rb_next(n);
erase_request(need_resend, req); /* before link_request() */
- WARN_ON(req->r_osd);
- calc_target(osdc, &req->r_t, NULL, false);
osd = lookup_create_osd(osdc, req->r_t.osd, true);
link_request(osd, req);
if (!req->r_linger) {
@@ -3383,6 +3859,8 @@ static void kick_osd_requests(struct ceph_osd *osd)
{
struct rb_node *n;
+ clear_backoffs(osd);
+
for (n = rb_first(&osd->o_requests); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
@@ -3428,6 +3906,261 @@ out_unlock:
up_write(&osdc->lock);
}
+struct MOSDBackoff {
+ struct ceph_spg spgid;
+ u32 map_epoch;
+ u8 op;
+ u64 id;
+ struct ceph_hobject_id *begin;
+ struct ceph_hobject_id *end;
+};
+
+static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
+{
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front.iov_len;
+ u8 struct_v;
+ u32 struct_len;
+ int ret;
+
+ ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
+ if (ret)
+ return ret;
+
+ ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
+ if (ret)
+ return ret;
+
+ ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
+ ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
+ ceph_decode_8_safe(&p, end, m->op, e_inval);
+ ceph_decode_64_safe(&p, end, m->id, e_inval);
+
+ m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
+ if (!m->begin)
+ return -ENOMEM;
+
+ ret = decode_hoid(&p, end, m->begin);
+ if (ret) {
+ free_hoid(m->begin);
+ return ret;
+ }
+
+ m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
+ if (!m->end) {
+ free_hoid(m->begin);
+ return -ENOMEM;
+ }
+
+ ret = decode_hoid(&p, end, m->end);
+ if (ret) {
+ free_hoid(m->begin);
+ free_hoid(m->end);
+ return ret;
+ }
+
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static struct ceph_msg *create_backoff_message(
+ const struct ceph_osd_backoff *backoff,
+ u32 map_epoch)
+{
+ struct ceph_msg *msg;
+ void *p, *end;
+ int msg_size;
+
+ msg_size = CEPH_ENCODING_START_BLK_LEN +
+ CEPH_PGID_ENCODING_LEN + 1; /* spgid */
+ msg_size += 4 + 1 + 8; /* map_epoch, op, id */
+ msg_size += CEPH_ENCODING_START_BLK_LEN +
+ hoid_encoding_size(backoff->begin);
+ msg_size += CEPH_ENCODING_START_BLK_LEN +
+ hoid_encoding_size(backoff->end);
+
+ msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
+ if (!msg)
+ return NULL;
+
+ p = msg->front.iov_base;
+ end = p + msg->front_alloc_len;
+
+ encode_spgid(&p, &backoff->spgid);
+ ceph_encode_32(&p, map_epoch);
+ ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
+ ceph_encode_64(&p, backoff->id);
+ encode_hoid(&p, end, backoff->begin);
+ encode_hoid(&p, end, backoff->end);
+ BUG_ON(p != end);
+
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+
+ return msg;
+}
+
+static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
+{
+ struct ceph_spg_mapping *spg;
+ struct ceph_osd_backoff *backoff;
+ struct ceph_msg *msg;
+
+ dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
+ m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
+
+ spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
+ if (!spg) {
+ spg = alloc_spg_mapping();
+ if (!spg) {
+ pr_err("%s failed to allocate spg\n", __func__);
+ return;
+ }
+ spg->spgid = m->spgid; /* struct */
+ insert_spg_mapping(&osd->o_backoff_mappings, spg);
+ }
+
+ backoff = alloc_backoff();
+ if (!backoff) {
+ pr_err("%s failed to allocate backoff\n", __func__);
+ return;
+ }
+ backoff->spgid = m->spgid; /* struct */
+ backoff->id = m->id;
+ backoff->begin = m->begin;
+ m->begin = NULL; /* backoff now owns this */
+ backoff->end = m->end;
+ m->end = NULL; /* ditto */
+
+ insert_backoff(&spg->backoffs, backoff);
+ insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+
+ /*
+ * Ack with original backoff's epoch so that the OSD can
+ * discard this if there was a PG split.
+ */
+ msg = create_backoff_message(backoff, m->map_epoch);
+ if (!msg) {
+ pr_err("%s failed to allocate msg\n", __func__);
+ return;
+ }
+ ceph_con_send(&osd->o_con, msg);
+}
+
+static bool target_contained_by(const struct ceph_osd_request_target *t,
+ const struct ceph_hobject_id *begin,
+ const struct ceph_hobject_id *end)
+{
+ struct ceph_hobject_id hoid;
+ int cmp;
+
+ hoid_fill_from_target(&hoid, t);
+ cmp = hoid_compare(&hoid, begin);
+ return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
+}
+
+static void handle_backoff_unblock(struct ceph_osd *osd,
+ const struct MOSDBackoff *m)
+{
+ struct ceph_spg_mapping *spg;
+ struct ceph_osd_backoff *backoff;
+ struct rb_node *n;
+
+ dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
+ m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
+
+ backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
+ if (!backoff) {
+ pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
+ __func__, osd->o_osd, m->spgid.pgid.pool,
+ m->spgid.pgid.seed, m->spgid.shard, m->id);
+ return;
+ }
+
+ if (hoid_compare(backoff->begin, m->begin) &&
+ hoid_compare(backoff->end, m->end)) {
+ pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
+ __func__, osd->o_osd, m->spgid.pgid.pool,
+ m->spgid.pgid.seed, m->spgid.shard, m->id);
+ /* unblock it anyway... */
+ }
+
+ spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
+ BUG_ON(!spg);
+
+ erase_backoff(&spg->backoffs, backoff);
+ erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+ free_backoff(backoff);
+
+ if (RB_EMPTY_ROOT(&spg->backoffs)) {
+ erase_spg_mapping(&osd->o_backoff_mappings, spg);
+ free_spg_mapping(spg);
+ }
+
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
+ /*
+ * Match against @m, not @backoff -- the PG may
+ * have split on the OSD.
+ */
+ if (target_contained_by(&req->r_t, m->begin, m->end)) {
+ /*
+ * If no other installed backoff applies,
+ * resend.
+ */
+ send_request(req);
+ }
+ }
+ }
+}
+
+static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
+{
+ struct ceph_osd_client *osdc = osd->o_osdc;
+ struct MOSDBackoff m;
+ int ret;
+
+ down_read(&osdc->lock);
+ if (!osd_registered(osd)) {
+ dout("%s osd%d unknown\n", __func__, osd->o_osd);
+ up_read(&osdc->lock);
+ return;
+ }
+ WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+ mutex_lock(&osd->lock);
+ ret = decode_MOSDBackoff(msg, &m);
+ if (ret) {
+ pr_err("failed to decode MOSDBackoff: %d\n", ret);
+ ceph_msg_dump(msg);
+ goto out_unlock;
+ }
+
+ switch (m.op) {
+ case CEPH_OSD_BACKOFF_OP_BLOCK:
+ handle_backoff_block(osd, &m);
+ break;
+ case CEPH_OSD_BACKOFF_OP_UNBLOCK:
+ handle_backoff_unblock(osd, &m);
+ break;
+ default:
+ pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
+ }
+
+ free_hoid(m.begin);
+ free_hoid(m.end);
+
+out_unlock:
+ mutex_unlock(&osd->lock);
+ up_read(&osdc->lock);
+}
+
/*
* Process osd watch notifications
*/
@@ -4365,6 +5098,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
case CEPH_MSG_OSD_OPREPLY:
handle_reply(osd, msg);
break;
+ case CEPH_MSG_OSD_BACKOFF:
+ handle_backoff(osd, msg);
+ break;
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(osdc, msg);
break;
@@ -4487,6 +5223,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
*skip = 0;
switch (type) {
case CEPH_MSG_OSD_MAP:
+ case CEPH_MSG_OSD_BACKOFF:
case CEPH_MSG_WATCH_NOTIFY:
return alloc_msg_with_page_vector(hdr);
case CEPH_MSG_OSD_OPREPLY:
@@ -4571,6 +5308,11 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&osdc->client->monc);
}
+static void osd_reencode_message(struct ceph_msg *msg)
+{
+ encode_request_finish(msg);
+}
+
static int osd_sign_message(struct ceph_msg *msg)
{
struct ceph_osd *o = msg->con->private;
@@ -4595,6 +5337,7 @@ static const struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
+ .reencode_message = osd_reencode_message,
.sign_message = osd_sign_message,
.check_message_signature = osd_check_message_signature,
.fault = osd_fault,
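hoid_compare() in the osd_client.c changes above builds a composite ordering: is_max, pool, bitwise key, then three memcmp-then-length name comparisons, then snapid. The name rule (compare_names()) is worth a standalone look, since unlike strcmp() it treats embedded NUL bytes as ordinary data and breaks prefix ties by length:

	#include <stdio.h>
	#include <string.h>

	/* Common-prefix memcmp(), shorter-string-first tiebreak,
	 * as in compare_names() above. */
	static int compare_names(const void *n1, size_t l1,
				 const void *n2, size_t l2)
	{
		int ret = memcmp(n1, n2, l1 < l2 ? l1 : l2);

		if (!ret)
			ret = (l1 < l2) ? -1 : (l1 > l2) ? 1 : 0;
		return ret;
	}

	int main(void)
	{
		/* "foo" sorts before "foobar": equal prefix, shorter first */
		printf("%d\n", compare_names("foo", 3, "foobar", 6));	/* -1 */
		/* embedded NUL participates like any other byte */
		printf("%d\n", compare_names("a\0b", 3, "a\0c", 3));	/* <0 */
		return 0;
	}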
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 55e3a477f92d..864789c5974e 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -11,7 +11,7 @@
#include <linux/crush/hash.h>
#include <linux/crush/mapper.h>
-char *ceph_osdmap_state_str(char *str, int len, int state)
+char *ceph_osdmap_state_str(char *str, int len, u32 state)
{
if (!len)
return str;
@@ -138,19 +138,175 @@ bad:
return -EINVAL;
}
-static int skip_name_map(void **p, void *end)
+static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
- int len;
- ceph_decode_32_safe(p, end, len ,bad);
- while (len--) {
- int strlen;
- *p += sizeof(u32);
- ceph_decode_32_safe(p, end, strlen, bad);
- *p += strlen;
+ struct crush_choose_arg_map *arg_map;
+
+ arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
+ if (!arg_map)
+ return NULL;
+
+ RB_CLEAR_NODE(&arg_map->node);
+ return arg_map;
}
- return 0;
-bad:
- return -EINVAL;
+
+static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
+{
+ if (arg_map) {
+ int i, j;
+
+ WARN_ON(!RB_EMPTY_NODE(&arg_map->node));
+
+ for (i = 0; i < arg_map->size; i++) {
+ struct crush_choose_arg *arg = &arg_map->args[i];
+
+ for (j = 0; j < arg->weight_set_size; j++)
+ kfree(arg->weight_set[j].weights);
+ kfree(arg->weight_set);
+ kfree(arg->ids);
+ }
+ kfree(arg_map->args);
+ kfree(arg_map);
+ }
+}
+
+DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
+ node);
+
+void clear_choose_args(struct crush_map *c)
+{
+ while (!RB_EMPTY_ROOT(&c->choose_args)) {
+ struct crush_choose_arg_map *arg_map =
+ rb_entry(rb_first(&c->choose_args),
+ struct crush_choose_arg_map, node);
+
+ erase_choose_arg_map(&c->choose_args, arg_map);
+ free_choose_arg_map(arg_map);
+ }
+}
+
+static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
+{
+ u32 *a = NULL;
+ u32 len;
+ int ret;
+
+ ceph_decode_32_safe(p, end, len, e_inval);
+ if (len) {
+ u32 i;
+
+ a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
+ if (!a) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ ceph_decode_need(p, end, len * sizeof(u32), e_inval);
+ for (i = 0; i < len; i++)
+ a[i] = ceph_decode_32(p);
+ }
+
+ *plen = len;
+ return a;
+
+e_inval:
+ ret = -EINVAL;
+fail:
+ kfree(a);
+ return ERR_PTR(ret);
+}
+
+/*
+ * Assumes @arg is zero-initialized.
+ */
+static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
+{
+ int ret;
+
+ ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
+ if (arg->weight_set_size) {
+ u32 i;
+
+ arg->weight_set = kmalloc_array(arg->weight_set_size,
+ sizeof(*arg->weight_set),
+ GFP_NOIO);
+ if (!arg->weight_set)
+ return -ENOMEM;
+
+ for (i = 0; i < arg->weight_set_size; i++) {
+ struct crush_weight_set *w = &arg->weight_set[i];
+
+ w->weights = decode_array_32_alloc(p, end, &w->size);
+ if (IS_ERR(w->weights)) {
+ ret = PTR_ERR(w->weights);
+ w->weights = NULL;
+ return ret;
+ }
+ }
+ }
+
+ arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
+ if (IS_ERR(arg->ids)) {
+ ret = PTR_ERR(arg->ids);
+ arg->ids = NULL;
+ return ret;
+ }
+
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static int decode_choose_args(void **p, void *end, struct crush_map *c)
+{
+ struct crush_choose_arg_map *arg_map = NULL;
+ u32 num_choose_arg_maps, num_buckets;
+ int ret;
+
+ ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
+ while (num_choose_arg_maps--) {
+ arg_map = alloc_choose_arg_map();
+ if (!arg_map) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ ceph_decode_64_safe(p, end, arg_map->choose_args_index,
+ e_inval);
+ arg_map->size = c->max_buckets;
+ arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
+ GFP_NOIO);
+ if (!arg_map->args) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ ceph_decode_32_safe(p, end, num_buckets, e_inval);
+ while (num_buckets--) {
+ struct crush_choose_arg *arg;
+ u32 bucket_index;
+
+ ceph_decode_32_safe(p, end, bucket_index, e_inval);
+ if (bucket_index >= arg_map->size)
+ goto e_inval;
+
+ arg = &arg_map->args[bucket_index];
+ ret = decode_choose_arg(p, end, arg);
+ if (ret)
+ goto fail;
+ }
+
+ insert_choose_arg_map(&c->choose_args, arg_map);
+ }
+
+ return 0;
+
+e_inval:
+ ret = -EINVAL;
+fail:
+ free_choose_arg_map(arg_map);
+ return ret;
}
static void crush_finalize(struct crush_map *c)
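decode_array_32_alloc() above captures the recurring libceph decode pattern: read a u32 length, verify that length * sizeof(u32) fits in the remaining buffer (ceph_decode_need()), then pull elements one by one. A standalone sketch of the same pattern, assuming a little-endian host and plain malloc()/NULL-on-error in place of the kernel's GFP_NOIO allocation and ERR_PTR() returns (the kernel version also returns a NULL array for an empty list; this sketch allocates unconditionally so NULL always means failure):

	#include <stdint.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <string.h>

	/* Little-endian u32 read, standing in for ceph_decode_32(). */
	static uint32_t get_le32(const uint8_t **p)
	{
		uint32_t v;

		memcpy(&v, *p, 4);	/* assumes a little-endian host */
		*p += 4;
		return v;
	}

	/* Decode "u32 len, then len u32s", as decode_array_32_alloc() does. */
	static uint32_t *decode_array_32(const uint8_t **p, const uint8_t *end,
					 uint32_t *plen)
	{
		uint32_t *a, len, i;

		if (end - *p < 4)
			return NULL;
		len = get_le32(p);
		/* bounds-check before allocating: len elements must fit */
		if (len > (uint64_t)(end - *p) / 4)
			return NULL;
		a = malloc((len ? len : 1) * sizeof(*a));
		if (!a)
			return NULL;
		for (i = 0; i < len; i++)
			a[i] = get_le32(p);
		*plen = len;
		return a;
	}

	int main(void)
	{
		const uint8_t buf[] = { 2,0,0,0, 7,0,0,0, 9,0,0,0 }; /* len=2: {7,9} */
		const uint8_t *p = buf;
		uint32_t n, *a = decode_array_32(&p, buf + sizeof(buf), &n);

		if (a) {
			printf("%u: %u %u\n", n, a[0], a[1]);
			free(a);
		}
		return 0;
	}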
@@ -187,7 +343,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
void **p = &pbyval;
void *start = pbyval;
u32 magic;
- u32 num_name_maps;
dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
@@ -195,6 +350,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
if (c == NULL)
return ERR_PTR(-ENOMEM);
+ c->choose_args = RB_ROOT;
+
/* set tunables to default values */
c->choose_local_tries = 2;
c->choose_local_fallback_tries = 5;
@@ -353,12 +510,9 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
}
}
- /* ignore trailing name maps. */
- for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
- err = skip_name_map(p, end);
- if (err < 0)
- goto done;
- }
+ ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
+ ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+ ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
/* tunables */
ceph_decode_need(p, end, 3*sizeof(u32), done);
@@ -391,6 +545,21 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
dout("crush decode tunable chooseleaf_stable = %d\n",
c->chooseleaf_stable);
+ if (*p != end) {
+ /* class_map */
+ ceph_decode_skip_map(p, end, 32, 32, bad);
+ /* class_name */
+ ceph_decode_skip_map(p, end, 32, string, bad);
+ /* class_bucket */
+ ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
+ }
+
+ if (*p != end) {
+ err = decode_choose_args(p, end, c);
+ if (err)
+ goto bad;
+ }
+
done:
crush_finalize(c);
dout("crush_decode success\n");
@@ -418,75 +587,49 @@ int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
return 0;
}
-/*
- * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
- * to a set of osds) and primary_temp (explicit primary setting)
- */
-static int __insert_pg_mapping(struct ceph_pg_mapping *new,
- struct rb_root *root)
+int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
{
- struct rb_node **p = &root->rb_node;
- struct rb_node *parent = NULL;
- struct ceph_pg_mapping *pg = NULL;
- int c;
+ int ret;
- dout("__insert_pg_mapping %llx %p\n", *(u64 *)&new->pgid, new);
- while (*p) {
- parent = *p;
- pg = rb_entry(parent, struct ceph_pg_mapping, node);
- c = ceph_pg_compare(&new->pgid, &pg->pgid);
- if (c < 0)
- p = &(*p)->rb_left;
- else if (c > 0)
- p = &(*p)->rb_right;
- else
- return -EEXIST;
- }
+ ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
+ if (ret)
+ return ret;
+
+ if (lhs->shard < rhs->shard)
+ return -1;
+ if (lhs->shard > rhs->shard)
+ return 1;
- rb_link_node(&new->node, parent, p);
- rb_insert_color(&new->node, root);
return 0;
}
-static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
- struct ceph_pg pgid)
+static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
{
- struct rb_node *n = root->rb_node;
struct ceph_pg_mapping *pg;
- int c;
- while (n) {
- pg = rb_entry(n, struct ceph_pg_mapping, node);
- c = ceph_pg_compare(&pgid, &pg->pgid);
- if (c < 0) {
- n = n->rb_left;
- } else if (c > 0) {
- n = n->rb_right;
- } else {
- dout("__lookup_pg_mapping %lld.%x got %p\n",
- pgid.pool, pgid.seed, pg);
- return pg;
- }
- }
- return NULL;
+ pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
+ if (!pg)
+ return NULL;
+
+ RB_CLEAR_NODE(&pg->node);
+ return pg;
}
-static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid)
+static void free_pg_mapping(struct ceph_pg_mapping *pg)
{
- struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid);
+ WARN_ON(!RB_EMPTY_NODE(&pg->node));
- if (pg) {
- dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
- pg);
- rb_erase(&pg->node, root);
- kfree(pg);
- return 0;
- }
- dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
- return -ENOENT;
+ kfree(pg);
}
/*
+ * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
+ * to a set of osds) and primary_temp (explicit primary setting)
+ */
+DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
+ RB_BYPTR, const struct ceph_pg *, node)
+
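+/*
+ * Sketch, not part of this patch: DEFINE_RB_FUNCS2() generates the
+ * lookup_pg_mapping()/insert_pg_mapping()/erase_pg_mapping() helpers
+ * used throughout this file, keyed by ceph_pg_compare() on &pg->pgid.
+ * The erase helper is assumed to look roughly like this -- pairing
+ * rb_erase() with RB_CLEAR_NODE() is what keeps free_pg_mapping()'s
+ * RB_EMPTY_NODE() warning quiet:
+ *
+ *	static void erase_pg_mapping(struct rb_root *root,
+ *				     struct ceph_pg_mapping *pg)
+ *	{
+ *		rb_erase(&pg->node, root);
+ *		RB_CLEAR_NODE(&pg->node);  // mark the node unlinked again
+ *	}
+ */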
+/*
* rbtree of pg pool info
*/
static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
@@ -682,11 +825,48 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
*p += len;
}
+ /*
+ * last_force_op_resend_preluminous, will be overridden if the
+ * map was encoded with RESEND_ON_SPLIT
+ */
if (ev >= 15)
pi->last_force_request_resend = ceph_decode_32(p);
else
pi->last_force_request_resend = 0;
+ if (ev >= 16)
+ *p += 4; /* skip min_read_recency_for_promote */
+
+ if (ev >= 17)
+ *p += 8; /* skip expected_num_objects */
+
+ if (ev >= 19)
+ *p += 4; /* skip cache_target_dirty_high_ratio_micro */
+
+ if (ev >= 20)
+ *p += 4; /* skip min_write_recency_for_promote */
+
+ if (ev >= 21)
+ *p += 1; /* skip use_gmt_hitset */
+
+ if (ev >= 22)
+ *p += 1; /* skip fast_read */
+
+ if (ev >= 23) {
+ *p += 4; /* skip hit_set_grade_decay_rate */
+ *p += 4; /* skip hit_set_search_last_n */
+ }
+
+ if (ev >= 24) {
+ /* skip opts */
+ *p += 1 + 1; /* versions */
+ len = ceph_decode_32(p);
+ *p += len;
+ }
+
+ if (ev >= 25)
+ pi->last_force_request_resend = ceph_decode_32(p);
+
/* ignore the rest */
*p = pool_end;
@@ -743,6 +923,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
map->pool_max = -1;
map->pg_temp = RB_ROOT;
map->primary_temp = RB_ROOT;
+ map->pg_upmap = RB_ROOT;
+ map->pg_upmap_items = RB_ROOT;
mutex_init(&map->crush_workspace_mutex);
return map;
@@ -757,14 +939,28 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->pg_temp),
struct ceph_pg_mapping, node);
- rb_erase(&pg->node, &map->pg_temp);
- kfree(pg);
+ erase_pg_mapping(&map->pg_temp, pg);
+ free_pg_mapping(pg);
}
while (!RB_EMPTY_ROOT(&map->primary_temp)) {
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->primary_temp),
struct ceph_pg_mapping, node);
- rb_erase(&pg->node, &map->primary_temp);
+ erase_pg_mapping(&map->primary_temp, pg);
+ free_pg_mapping(pg);
+ }
+ while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(rb_first(&map->pg_upmap),
+ struct ceph_pg_mapping, node);
+ erase_pg_mapping(&map->pg_upmap, pg);
+ free_pg_mapping(pg);
+ }
+ while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(rb_first(&map->pg_upmap_items),
+ struct ceph_pg_mapping, node);
+ erase_pg_mapping(&map->pg_upmap_items, pg);
- kfree(pg);
+ free_pg_mapping(pg);
}
while (!RB_EMPTY_ROOT(&map->pg_pools)) {
@@ -788,7 +984,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
*/
static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
{
- u8 *state;
+ u32 *state;
u32 *weight;
struct ceph_entity_addr *addr;
int i;
@@ -964,47 +1160,40 @@ static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
return __decode_pools(p, end, map, true);
}
-static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
- bool incremental)
+typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
+
+static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
+ decode_mapping_fn_t fn, bool incremental)
{
u32 n;
+ WARN_ON(!incremental && !fn);
+
ceph_decode_32_safe(p, end, n, e_inval);
while (n--) {
+ struct ceph_pg_mapping *pg;
struct ceph_pg pgid;
- u32 len, i;
int ret;
ret = ceph_decode_pgid(p, end, &pgid);
if (ret)
return ret;
- ceph_decode_32_safe(p, end, len, e_inval);
-
- ret = __remove_pg_mapping(&map->pg_temp, pgid);
- BUG_ON(!incremental && ret != -ENOENT);
-
- if (!incremental || len > 0) {
- struct ceph_pg_mapping *pg;
-
- ceph_decode_need(p, end, len*sizeof(u32), e_inval);
-
- if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
- return -EINVAL;
-
- pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
- if (!pg)
- return -ENOMEM;
+ pg = lookup_pg_mapping(mapping_root, &pgid);
+ if (pg) {
+ WARN_ON(!incremental);
+ erase_pg_mapping(mapping_root, pg);
+ free_pg_mapping(pg);
+ }
- pg->pgid = pgid;
- pg->pg_temp.len = len;
- for (i = 0; i < len; i++)
- pg->pg_temp.osds[i] = ceph_decode_32(p);
+ if (fn) {
+ pg = fn(p, end, incremental);
+ if (IS_ERR(pg))
+ return PTR_ERR(pg);
- ret = __insert_pg_mapping(pg, &map->pg_temp);
- if (ret) {
- kfree(pg);
- return ret;
+ if (pg) {
+ pg->pgid = pgid; /* struct */
+ insert_pg_mapping(mapping_root, pg);
}
}
}
@@ -1015,69 +1204,77 @@ e_inval:
return -EINVAL;
}
+static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
+ bool incremental)
+{
+ struct ceph_pg_mapping *pg;
+ u32 len, i;
+
+ ceph_decode_32_safe(p, end, len, e_inval);
+ if (len == 0 && incremental)
+ return NULL; /* new_pg_temp: [] to remove */
+ if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
+ return ERR_PTR(-EINVAL);
+
+ ceph_decode_need(p, end, len * sizeof(u32), e_inval);
+ pg = alloc_pg_mapping(len * sizeof(u32));
+ if (!pg)
+ return ERR_PTR(-ENOMEM);
+
+ pg->pg_temp.len = len;
+ for (i = 0; i < len; i++)
+ pg->pg_temp.osds[i] = ceph_decode_32(p);
+
+ return pg;
+
+e_inval:
+ return ERR_PTR(-EINVAL);
+}
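+
+/*
+ * Sketch, not part of this patch: the decode_mapping_fn_t contract
+ * that __decode_pg_temp() above implements is three-way:
+ *
+ *   return ERR_PTR(-E...)  decoding failed, decode_pg_mapping() aborts
+ *   return NULL            incremental removal marker (nothing to
+ *                          insert; any old entry was already erased)
+ *   return pg              decode_pg_mapping() fills in pg->pgid and
+ *                          inserts pg into the mapping rbtree
+ */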
+
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
- return __decode_pg_temp(p, end, map, false);
+ return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
+ false);
}
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
- return __decode_pg_temp(p, end, map, true);
+ return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
+ true);
}
-static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
- bool incremental)
+static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
+ bool incremental)
{
- u32 n;
-
- ceph_decode_32_safe(p, end, n, e_inval);
- while (n--) {
- struct ceph_pg pgid;
- u32 osd;
- int ret;
-
- ret = ceph_decode_pgid(p, end, &pgid);
- if (ret)
- return ret;
-
- ceph_decode_32_safe(p, end, osd, e_inval);
-
- ret = __remove_pg_mapping(&map->primary_temp, pgid);
- BUG_ON(!incremental && ret != -ENOENT);
-
- if (!incremental || osd != (u32)-1) {
- struct ceph_pg_mapping *pg;
-
- pg = kzalloc(sizeof(*pg), GFP_NOFS);
- if (!pg)
- return -ENOMEM;
+ struct ceph_pg_mapping *pg;
+ u32 osd;
- pg->pgid = pgid;
- pg->primary_temp.osd = osd;
+ ceph_decode_32_safe(p, end, osd, e_inval);
+ if (osd == (u32)-1 && incremental)
+ return NULL; /* new_primary_temp: -1 to remove */
- ret = __insert_pg_mapping(pg, &map->primary_temp);
- if (ret) {
- kfree(pg);
- return ret;
- }
- }
- }
+ pg = alloc_pg_mapping(0);
+ if (!pg)
+ return ERR_PTR(-ENOMEM);
- return 0;
+ pg->primary_temp.osd = osd;
+ return pg;
e_inval:
- return -EINVAL;
+ return ERR_PTR(-EINVAL);
}
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
- return __decode_primary_temp(p, end, map, false);
+ return decode_pg_mapping(p, end, &map->primary_temp,
+ __decode_primary_temp, false);
}
static int decode_new_primary_temp(void **p, void *end,
struct ceph_osdmap *map)
{
- return __decode_primary_temp(p, end, map, true);
+ return decode_pg_mapping(p, end, &map->primary_temp,
+ __decode_primary_temp, true);
}
u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
@@ -1168,6 +1365,75 @@ e_inval:
return -EINVAL;
}
+static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
+ bool __unused)
+{
+ return __decode_pg_temp(p, end, false);
+}
+
+static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
+ false);
+}
+
+static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
+ true);
+}
+
+static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
+}
+
+static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
+ bool __unused)
+{
+ struct ceph_pg_mapping *pg;
+ u32 len, i;
+
+ ceph_decode_32_safe(p, end, len, e_inval);
+ if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
+ return ERR_PTR(-EINVAL);
+
+ ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
+ pg = alloc_pg_mapping(2 * len * sizeof(u32));
+ if (!pg)
+ return ERR_PTR(-ENOMEM);
+
+ pg->pg_upmap_items.len = len;
+ for (i = 0; i < len; i++) {
+ pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
+ pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
+ }
+
+ return pg;
+
+e_inval:
+ return ERR_PTR(-EINVAL);
+}
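+
+/*
+ * Worked example (illustrative values): len = 2 with pairs
+ * [[3,10], [7,4]] decodes to from_to = {{3,10},{7,4}}, i.e. "for this
+ * PG, use osd.10 where CRUSH chose osd.3 and osd.4 where it chose
+ * osd.7".  apply_upmap() below consumes this.
+ */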
+
+static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap_items,
+ __decode_pg_upmap_items, false);
+}
+
+static int decode_new_pg_upmap_items(void **p, void *end,
+ struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap_items,
+ __decode_pg_upmap_items, true);
+}
+
+static int decode_old_pg_upmap_items(void **p, void *end,
+ struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
+}
+
/*
* decode a full map.
*/
@@ -1218,13 +1484,21 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
/* osd_state, osd_weight, osd_addrs->client_addr */
ceph_decode_need(p, end, 3*sizeof(u32) +
- map->max_osd*(1 + sizeof(*map->osd_weight) +
+ map->max_osd*((struct_v >= 5 ? sizeof(u32) :
+ sizeof(u8)) +
+ sizeof(*map->osd_weight) +
sizeof(*map->osd_addr)), e_inval);
if (ceph_decode_32(p) != map->max_osd)
goto e_inval;
- ceph_decode_copy(p, map->osd_state, map->max_osd);
+ if (struct_v >= 5) {
+ for (i = 0; i < map->max_osd; i++)
+ map->osd_state[i] = ceph_decode_32(p);
+ } else {
+ for (i = 0; i < map->max_osd; i++)
+ map->osd_state[i] = ceph_decode_8(p);
+ }
if (ceph_decode_32(p) != map->max_osd)
goto e_inval;
@@ -1257,9 +1531,7 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
if (err)
goto bad;
} else {
- /* XXX can this happen? */
- kfree(map->osd_primary_affinity);
- map->osd_primary_affinity = NULL;
+ WARN_ON(map->osd_primary_affinity);
}
/* crush */
@@ -1268,6 +1540,26 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
if (err)
goto bad;
+ *p += len;
+ if (struct_v >= 3) {
+ /* erasure_code_profiles */
+ ceph_decode_skip_map_of_map(p, end, string, string, string,
+ bad);
+ }
+
+ if (struct_v >= 4) {
+ err = decode_pg_upmap(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_pg_upmap_items(p, end, map);
+ if (err)
+ goto bad;
+ } else {
+ WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
+ WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
+ }
+
/* ignore the rest */
*p = end;
@@ -1314,7 +1606,7 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
* new_up_client: { osd=6, addr=... } # set osd_state and addr
* new_state: { osd=6, xorstate=EXISTS } # clear osd_state
*/
-static int decode_new_up_state_weight(void **p, void *end,
+static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
struct ceph_osdmap *map)
{
void *new_up_client;
@@ -1330,7 +1622,7 @@ static int decode_new_up_state_weight(void **p, void *end,
new_state = *p;
ceph_decode_32_safe(p, end, len, e_inval);
- len *= sizeof(u32) + sizeof(u8);
+ len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
ceph_decode_need(p, end, len, e_inval);
*p += len;
@@ -1366,11 +1658,14 @@ static int decode_new_up_state_weight(void **p, void *end,
len = ceph_decode_32(p);
while (len--) {
s32 osd;
- u8 xorstate;
+ u32 xorstate;
int ret;
osd = ceph_decode_32(p);
- xorstate = ceph_decode_8(p);
+ if (struct_v >= 5)
+ xorstate = ceph_decode_32(p);
+ else
+ xorstate = ceph_decode_8(p);
if (xorstate == 0)
xorstate = CEPH_OSD_UP;
BUG_ON(osd >= map->max_osd);
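/*
 * Illustrative, user-space model of the xorstate update, not part of
 * this patch; the flag values below are stand-ins for CEPH_OSD_EXISTS
 * and CEPH_OSD_UP:
 */
#include <assert.h>
#include <stdint.h>

#define OSD_EXISTS 0x1	/* hypothetical stand-in */
#define OSD_UP     0x2	/* hypothetical stand-in */

int main(void)
{
	uint32_t state = OSD_EXISTS | OSD_UP;	/* osd is up */
	uint32_t xorstate = 0;			/* 0 on the wire */

	if (xorstate == 0)		/* mirrors the check above */
		xorstate = OSD_UP;
	state ^= xorstate;
	assert(state == OSD_EXISTS);	/* marked down, still exists */
	return 0;
}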
@@ -1504,7 +1799,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
}
/* new_up_client, new_state, new_weight */
- err = decode_new_up_state_weight(p, end, map);
+ err = decode_new_up_state_weight(p, end, struct_v, map);
if (err)
goto bad;
@@ -1527,6 +1822,32 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
goto bad;
}
+ if (struct_v >= 3) {
+ /* new_erasure_code_profiles */
+ ceph_decode_skip_map_of_map(p, end, string, string, string,
+ bad);
+ /* old_erasure_code_profiles */
+ ceph_decode_skip_set(p, end, string, bad);
+ }
+
+ if (struct_v >= 4) {
+ err = decode_new_pg_upmap(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_old_pg_upmap(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_new_pg_upmap_items(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_old_pg_upmap_items(p, end, map);
+ if (err)
+ goto bad;
+ }
+
/* ignore the rest */
*p = end;
@@ -1547,12 +1868,13 @@ bad:
void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
- WARN_ON(!ceph_oloc_empty(dest));
- WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
+ ceph_oloc_destroy(dest);
dest->pool = src->pool;
if (src->pool_ns)
dest->pool_ns = ceph_get_string(src->pool_ns);
+ else
+ dest->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_oloc_copy);
@@ -1565,14 +1887,15 @@ EXPORT_SYMBOL(ceph_oloc_destroy);
void ceph_oid_copy(struct ceph_object_id *dest,
const struct ceph_object_id *src)
{
- WARN_ON(!ceph_oid_empty(dest));
+ ceph_oid_destroy(dest);
if (src->name != src->inline_name) {
/* very rare, see ceph_object_id definition */
dest->name = kmalloc(src->name_len + 1,
GFP_NOIO | __GFP_NOFAIL);
+ } else {
+ dest->name = dest->inline_name;
}
-
memcpy(dest->name, src->name, src->name_len + 1);
dest->name_len = src->name_len;
}
@@ -1714,9 +2037,8 @@ void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
dest->primary = src->primary;
}
-static bool is_split(const struct ceph_pg *pgid,
- u32 old_pg_num,
- u32 new_pg_num)
+bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
+ u32 new_pg_num)
{
int old_bits = calc_bits_of(old_pg_num);
int old_mask = (1 << old_bits) - 1;
@@ -1761,7 +2083,7 @@ bool ceph_is_new_interval(const struct ceph_osds *old_acting,
!osds_equal(old_up, new_up) ||
old_size != new_size ||
old_min_size != new_min_size ||
- is_split(pgid, old_pg_num, new_pg_num) ||
+ ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
old_sort_bitwise != new_sort_bitwise;
}
@@ -1885,16 +2207,12 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
* Should only be called with target_oid and target_oloc (as opposed to
* base_oid and base_oloc), since tiering isn't taken into account.
*/
-int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
- struct ceph_object_id *oid,
- struct ceph_object_locator *oloc,
- struct ceph_pg *raw_pgid)
+int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
+ const struct ceph_object_id *oid,
+ const struct ceph_object_locator *oloc,
+ struct ceph_pg *raw_pgid)
{
- struct ceph_pg_pool_info *pi;
-
- pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
- if (!pi)
- return -ENOENT;
+ WARN_ON(pi->id != oloc->pool);
if (!oloc->pool_ns) {
raw_pgid->pool = oloc->pool;
@@ -1926,6 +2244,20 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
}
return 0;
}
+
+int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
+ const struct ceph_object_id *oid,
+ const struct ceph_object_locator *oloc,
+ struct ceph_pg *raw_pgid)
+{
+ struct ceph_pg_pool_info *pi;
+
+ pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
+ if (!pi)
+ return -ENOENT;
+
+ return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
+}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
/*
@@ -1970,23 +2302,57 @@ static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
int *result, int result_max,
- const __u32 *weight, int weight_max)
+ const __u32 *weight, int weight_max,
+ u64 choose_args_index)
{
+ struct crush_choose_arg_map *arg_map;
int r;
BUG_ON(result_max > CEPH_PG_MAX_SIZE);
+ arg_map = lookup_choose_arg_map(&map->crush->choose_args,
+ choose_args_index);
+
mutex_lock(&map->crush_workspace_mutex);
r = crush_do_rule(map->crush, ruleno, x, result, result_max,
- weight, weight_max, map->crush_workspace);
+ weight, weight_max, map->crush_workspace,
+ arg_map ? arg_map->args : NULL);
mutex_unlock(&map->crush_workspace_mutex);
return r;
}
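/*
 * Sketch, not part of this patch: pg_to_raw_osds() below passes pi->id
 * as choose_args_index, so each pool can carry its own per-bucket
 * weight overrides.  When lookup_choose_arg_map() finds nothing,
 * crush_do_rule() is handed NULL and falls back to the buckets'
 * built-in weights.
 */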
+static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
+ struct ceph_pg_pool_info *pi,
+ struct ceph_osds *set)
+{
+ int i;
+
+ if (ceph_can_shift_osds(pi)) {
+ int removed = 0;
+
+ /* shift left */
+ for (i = 0; i < set->size; i++) {
+ if (!ceph_osd_exists(osdmap, set->osds[i])) {
+ removed++;
+ continue;
+ }
+ if (removed)
+ set->osds[i - removed] = set->osds[i];
+ }
+ set->size -= removed;
+ } else {
+ /* set dne devices to NONE */
+ for (i = 0; i < set->size; i++) {
+ if (!ceph_osd_exists(osdmap, set->osds[i]))
+ set->osds[i] = CRUSH_ITEM_NONE;
+ }
+ }
+}
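+
+/*
+ * Illustrative, user-space model of remove_nonexistent_osds(), not
+ * part of this patch.  Replicated pools can shift survivors left;
+ * erasure coded pools must keep positions and punch a NONE hole
+ * instead.  ITEM_NONE and exists() below are stand-ins.
+ */
+#include <assert.h>
+
+#define ITEM_NONE (-1)		/* stand-in for CRUSH_ITEM_NONE */
+
+static int exists(int osd)
+{
+	return osd != 2;	/* pretend osd.2 was removed from the map */
+}
+
+int main(void)
+{
+	int repl[3] = { 1, 2, 3 }, ec[3] = { 1, 2, 3 };
+	int i, removed = 0, size = 3;
+
+	/* replicated: shift left, size shrinks */
+	for (i = 0; i < size; i++) {
+		if (!exists(repl[i])) {
+			removed++;
+			continue;
+		}
+		if (removed)
+			repl[i - removed] = repl[i];
+	}
+	size -= removed;
+	assert(size == 2 && repl[0] == 1 && repl[1] == 3);
+
+	/* erasure coded: keep positions, punch holes */
+	for (i = 0; i < 3; i++)
+		if (!exists(ec[i]))
+			ec[i] = ITEM_NONE;
+	assert(ec[1] == ITEM_NONE);
+	return 0;
+}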
+
/*
- * Calculate raw set (CRUSH output) for given PG. The result may
- * contain nonexistent OSDs. ->primary is undefined for a raw set.
+ * Calculate raw set (CRUSH output) for given PG and filter out
+ * nonexistent OSDs. ->primary is undefined for a raw set.
*
* Placement seed (CRUSH input) is returned through @ppps.
*/
@@ -2020,7 +2386,7 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
}
len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
- osdmap->osd_weight, osdmap->max_osd);
+ osdmap->osd_weight, osdmap->max_osd, pi->id);
if (len < 0) {
pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
len, ruleno, pi->id, pi->crush_ruleset, pi->type,
@@ -2029,6 +2395,70 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
}
raw->size = len;
+ remove_nonexistent_osds(osdmap, pi, raw);
+}
+
+/* apply pg_upmap[_items] mappings */
+static void apply_upmap(struct ceph_osdmap *osdmap,
+ const struct ceph_pg *pgid,
+ struct ceph_osds *raw)
+{
+ struct ceph_pg_mapping *pg;
+ int i, j;
+
+ pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
+ if (pg) {
+ /* make sure targets aren't marked out */
+ for (i = 0; i < pg->pg_upmap.len; i++) {
+ int osd = pg->pg_upmap.osds[i];
+
+ if (osd != CRUSH_ITEM_NONE &&
+ osd < osdmap->max_osd &&
+ osdmap->osd_weight[osd] == 0) {
+ /* reject/ignore explicit mapping */
+ return;
+ }
+ }
+ for (i = 0; i < pg->pg_upmap.len; i++)
+ raw->osds[i] = pg->pg_upmap.osds[i];
+ raw->size = pg->pg_upmap.len;
+ return;
+ }
+
+ pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
+ if (pg) {
+ /*
+ * Note: this approach does not allow a bidirectional swap,
+ * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
+ */
+ for (i = 0; i < pg->pg_upmap_items.len; i++) {
+ int from = pg->pg_upmap_items.from_to[i][0];
+ int to = pg->pg_upmap_items.from_to[i][1];
+ int pos = -1;
+ bool exists = false;
+
+ /* make sure replacement doesn't already appear */
+ for (j = 0; j < raw->size; j++) {
+ int osd = raw->osds[j];
+
+ if (osd == to) {
+ exists = true;
+ break;
+ }
+ /* ignore mapping if target is marked out */
+ if (osd == from && pos < 0 &&
+ !(to != CRUSH_ITEM_NONE &&
+ to < osdmap->max_osd &&
+ osdmap->osd_weight[to] == 0)) {
+ pos = j;
+ }
+ }
+ if (!exists && pos >= 0) {
+ raw->osds[pos] = to;
+ return;
+ }
+ }
+ }
}
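/*
 * Worked example for apply_upmap() (illustrative values): with
 * pg_upmap_items = [[3,10]] and a raw set [1,3,5], osd.3 is replaced
 * in place, giving [1,10,5].  In this version a single applied pair
 * ends processing (the early return above); a pair whose target
 * already appears in the set, or whose target is marked out
 * (weight 0), is skipped.  A full pg_upmap entry, by contrast,
 * replaces the whole raw set, but only if none of its targets are
 * marked out.
 */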
/*
@@ -2151,18 +2581,16 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap,
*/
static void get_temp_osds(struct ceph_osdmap *osdmap,
struct ceph_pg_pool_info *pi,
- const struct ceph_pg *raw_pgid,
+ const struct ceph_pg *pgid,
struct ceph_osds *temp)
{
- struct ceph_pg pgid;
struct ceph_pg_mapping *pg;
int i;
- raw_pg_to_pg(pi, raw_pgid, &pgid);
ceph_osds_init(temp);
/* pg_temp? */
- pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
+ pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
if (pg) {
for (i = 0; i < pg->pg_temp.len; i++) {
if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
@@ -2185,7 +2613,7 @@ static void get_temp_osds(struct ceph_osdmap *osdmap,
}
/* primary_temp? */
- pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
+ pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
if (pg)
temp->primary = pg->primary_temp.osd;
}
@@ -2198,43 +2626,75 @@ static void get_temp_osds(struct ceph_osdmap *osdmap,
* resend a request.
*/
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
+ struct ceph_pg_pool_info *pi,
const struct ceph_pg *raw_pgid,
struct ceph_osds *up,
struct ceph_osds *acting)
{
- struct ceph_pg_pool_info *pi;
+ struct ceph_pg pgid;
u32 pps;
- pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
- if (!pi) {
- ceph_osds_init(up);
- ceph_osds_init(acting);
- goto out;
- }
+ WARN_ON(pi->id != raw_pgid->pool);
+ raw_pg_to_pg(pi, raw_pgid, &pgid);
pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
+ apply_upmap(osdmap, &pgid, up);
raw_to_up_osds(osdmap, pi, up);
apply_primary_affinity(osdmap, pi, pps, up);
- get_temp_osds(osdmap, pi, raw_pgid, acting);
+ get_temp_osds(osdmap, pi, &pgid, acting);
if (!acting->size) {
memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
acting->size = up->size;
if (acting->primary == -1)
acting->primary = up->primary;
}
-out:
WARN_ON(!osds_valid(up) || !osds_valid(acting));
}
+bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
+ struct ceph_pg_pool_info *pi,
+ const struct ceph_pg *raw_pgid,
+ struct ceph_spg *spgid)
+{
+ struct ceph_pg pgid;
+ struct ceph_osds up, acting;
+ int i;
+
+ WARN_ON(pi->id != raw_pgid->pool);
+ raw_pg_to_pg(pi, raw_pgid, &pgid);
+
+ if (ceph_can_shift_osds(pi)) {
+ spgid->pgid = pgid; /* struct */
+ spgid->shard = CEPH_SPG_NOSHARD;
+ return true;
+ }
+
+ ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
+ for (i = 0; i < acting.size; i++) {
+ if (acting.osds[i] == acting.primary) {
+ spgid->pgid = pgid; /* struct */
+ spgid->shard = i;
+ return true;
+ }
+ }
+
+ return false;
+}
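+
+/*
+ * Worked example (illustrative values): for an erasure coded pool with
+ * acting = [6, 3, 9] and acting.primary = 3, the loop above yields
+ * spgid->shard = 1 -- the shard is the primary's position in the
+ * acting set.  Replicated pools can shift OSDs around (see
+ * ceph_can_shift_osds()), so positions aren't stable and the shard is
+ * always CEPH_SPG_NOSHARD.
+ */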
+
/*
* Return acting primary for given PG, or -1 if none.
*/
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
const struct ceph_pg *raw_pgid)
{
+ struct ceph_pg_pool_info *pi;
struct ceph_osds up, acting;
- ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting);
+ pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
+ if (!pi)
+ return -1;
+
+ ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);