#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct paxreg_peer;

/* timer handle; on expiry 'private' is read back as the owning peer */
struct paxreg_timer {
	void *private;
};

/* a register value together with the timestamp it was written with */
struct paxreg_val {
	void *val;
	size_t val_len;
	int64_t ts;
};

enum paxreg_msg_type {
	msg_read,
	msg_read_ack,
	msg_write,
	msg_write_ack
};

/* wire message; 'type' selects the active member of 'u' */
struct paxreg_msg {
	enum paxreg_msg_type type;
	union {
		struct u_read_msg {
			int64_t local_ts;
			uint64_t guid;
		} read;
		struct u_read_ack {
			int64_t local_ts;
			uint64_t guid;
			struct paxreg_val *last_vis;
		} read_ack;
		struct paxreg_val *write;
		struct paxreg_val *write_ack;
	} u;
};

struct paxreg_peer {
	/*
	 * proposer role
	 */
	uint64_t guid;			/* unique id for proposer */
	int64_t local_ts;		/* localTS */
	int64_t curr_ts;		/* currTS during read op */
	struct paxreg_peer **acceptor;	/* also learners */
	int n_acceptors;
	struct paxreg_val **read_ack;	/* received READ-ACKs */
	int read_acked;
	int majority;			/* responses required
					 * to form consensus */
	uint64_t read_timeout;		/* READ response timeout */
	struct paxreg_timer *timer;	/* READ response timer */

	/*
	 * acceptor role
	 */
	struct paxreg_val *last_visible;	/* lastVisible */
	int64_t high_ts;			/* highestTS */

	/*
	 * learner role
	 */
	struct paxreg_val **write_ack;	/* received WRITE-ACKs */
	int write_acked;
	bool write_majority;		/* consensus reached? */

	/* backend-managed data */
	void *private;

	/* message output callback */
	bool (*msg_send)(struct paxreg_peer *, const struct paxreg_msg *);

	/* proposer read register completion */
	bool (*read_cb)(struct paxreg_peer *, struct paxreg_val *);

	/* learner write register total completion */
	bool (*write_cb)(struct paxreg_peer *, struct paxreg_val *);
};

extern void paxreg_set_timeout(struct paxreg_timer *, uint64_t,
			       void (*actor)(struct paxreg_timer *));

/*
 * handle incoming READ-ACK message (proposer role)
 *
 * Acks that do not carry this proposer's guid and the current read
 * timestamp are ignored.  Once at least 'majority' acks are collected,
 * read_cb is invoked with the collected lastVisible of highest timestamp,
 * re-stamped with currTS.
 *
 * Returns the result of read_cb when it is invoked, true otherwise.
 *
 * NOTE(review): read_cb fires again for every ack received after the
 * majority threshold, and read_ack[] is written without a capacity
 * check -- confirm both against the caller/allocator.
 */
static bool paxreg_msg_read_ack(struct paxreg_peer *self,
				struct u_read_ack *read_ack)
{
	struct paxreg_val v = { NULL, 0, 0 };
	int i;

	/*
	 * wait until READ-ACK(currTS, p, lastVisible) received
	 * from majority of acceptors
	 */
	if (read_ack->guid != self->guid ||
	    read_ack->local_ts != self->curr_ts)
		return true;

	/* add to list of READ-ACK received */
	self->read_ack[self->read_acked++] = read_ack->last_vis;

	/* nothing more to do until a majority is possible */
	if (self->read_acked < self->majority)
		return true;

	/* v = lastVisible with highest timestamp */
	for (i = 0; i < self->read_acked; i++) {
		const struct paxreg_val *cand = self->read_ack[i];

		/*
		 * an acceptor that has not accepted any write yet
		 * reports a NULL lastVisible; skip it
		 */
		if (cand != NULL && cand->ts > v.ts)
			v = *cand;
	}

	/* return (v, currTS) */
	v.ts = self->curr_ts;
	return self->read_cb(self, &v);
}

/*
 * handle incoming READ message (acceptor role)
 *
 * Returns true when the READ is stale and dropped, otherwise the result
 * of sending the READ-ACK to 'sender'.
 */
static bool paxreg_msg_read(struct paxreg_peer *self,
			    struct paxreg_peer *sender,
			    struct u_read_msg *read)
{
	struct paxreg_msg msg;

	/*
	 * if (ts > highestTS)
	 *	highestTS = ts
	 *	send(READ-ACK, ts, lastVisible)
	 */
	if (read->local_ts <= self->high_ts)
		return true;

	self->high_ts = read->local_ts;

	/* msg: READ-ACK(ts, lastVisible) */
	msg.type = msg_read_ack;
	msg.u.read_ack.local_ts = read->local_ts;
	msg.u.read_ack.guid = read->guid;
	msg.u.read_ack.last_vis = self->last_visible;

	/* send message to proposer */
	return self->msg_send(sender, &msg);
}

/*
 * handle incoming WRITE message (acceptor role)
 *
 * Returns false as soon as one WRITE-ACK send fails, true otherwise
 * (including when the WRITE is stale and dropped).
 */
static bool paxreg_msg_write(struct paxreg_peer *self, struct paxreg_val *v)
{
	struct paxreg_msg msg;
	int i;

	/*
	 * if (ts >= highestTS)
	 *	highestTS = ts
	 *	lastVisible = (v, ts)
	 *	send(WRITE-ACK, v, ts) to learners
	 */
	if (v->ts < self->high_ts)
		return true;

	self->high_ts = v->ts;
	self->last_visible = v;

	/* msg: WRITE-ACK(v, ts) */
	msg.type = msg_write_ack;
	msg.u.write_ack = v;

	/* send message to all learners (==acceptors in this case) */
	for (i = 0; i < self->n_acceptors; i++) {
		if (!self->msg_send(self->acceptor[i], &msg))
			return false;
	}

	return true;
}

/*
 * handle incoming WRITE-ACK message (learner role)
 *
 * Records the ack and, once at least 'majority' acks are held and no
 * conclusion was reached yet, counts the acks equal to 'v' (same
 * timestamp, same length, same bytes).  When those form a majority the
 * write is complete and write_cb is invoked.
 *
 * Returns the result of write_cb when it is invoked, true otherwise.
 *
 * NOTE(review): write_ack[] is written without a capacity check --
 * confirm its allocated size against the caller.
 */
static bool paxreg_msg_write_ack(struct paxreg_peer *self,
				 struct paxreg_val *v)
{
	int i, match = 0;

	/* add value to list of received acks */
	self->write_ack[self->write_acked++] = v;

	/* return immediately, if majority impossible at this point */
	if (self->write_acked < self->majority)
		return true;

	/* return immediately, if we've already reached a conclusion */
	if (self->write_majority)
		return true;

	/* count number of WRITE-ACKs with values equivalent to 'v' */
	for (i = 0; i < self->write_acked; i++) {
		const struct paxreg_val *ack = self->write_ack[i];

		if (ack->ts != v->ts)
			continue;

		/* lengths must be compared, never assigned */
		if (ack->val_len != v->val_len)
			continue;

		/* zero-length values are equal without touching 'val' */
		if (v->val_len != 0 && memcmp(ack->val, v->val, v->val_len))
			continue;

		match++;
	}

	/* if matches are majority, write is complete */
	if (match >= self->majority) {
		self->write_majority = true;
		return self->write_cb(self, v);
	}

	return true;
}

/*
 * handle incoming message
 *
 * Dispatches 'msg' to the handler for its type and returns that
 * handler's result; returns false for an unknown message type.
 */
bool paxreg_rx_msg(struct paxreg_peer *self, struct paxreg_peer *sender,
		   struct paxreg_msg *msg)
{
	switch (msg->type) {
	/* READ(currTS, p) */
	case msg_read:
		return paxreg_msg_read(self, sender, &msg->u.read);

	/* READ-ACK(currTS, p, lastVisible) */
	case msg_read_ack:
		return paxreg_msg_read_ack(self, &msg->u.read_ack);

	/* WRITE(v, ts) */
	case msg_write:
		return paxreg_msg_write(self, msg->u.write);

	/* WRITE-ACK(v, ts) */
	case msg_write_ack:
		return paxreg_msg_write_ack(self, msg->u.write_ack);
	}

	return false;
}

/*
 * paxos register write
 *
 * Clears this peer's WRITE-ACK collection state and sends WRITE(v, ts)
 * to every acceptor.  Returns false as soon as one send fails, true
 * otherwise.
 */
bool paxreg_write(struct paxreg_peer *self, struct paxreg_val *val)
{
	struct paxreg_msg msg;
	int i;

	/*
	 * reset write-ack collection state; the ack count is cleared as
	 * well so write_ack[] is refilled from index 0 for each write
	 */
	self->write_acked = 0;
	self->write_majority = false;

	/* msg: WRITE(v, ts) */
	msg.type = msg_write;
	msg.u.write = val;

	/* send message to all acceptors */
	for (i = 0; i < self->n_acceptors; i++) {
		if (!self->msg_send(self->acceptor[i], &msg))
			return false;
	}

	return true;
}

/* READ timer expiry: report failure to the proposer's read callback */
static void paxreg_read_timeout(struct paxreg_timer *timer)
{
	struct paxreg_peer *self = timer->private;

	/* call read callback with NULL to indicate an error */
	self->read_cb(self, NULL);
}

/*
 * paxos register read
 *
 * Takes the current local timestamp as currTS, advances the local
 * timestamp, sends READ(currTS, p) to every acceptor and then arms the
 * READ response timer.  Returns false as soon as one send fails (the
 * timer is not armed in that case), true otherwise.
 */
bool paxreg_read(struct paxreg_peer *self)
{
	struct paxreg_msg msg;
	int i;

	/* increment local timestamp */
	self->curr_ts = self->local_ts;
	self->local_ts++;

	/* clear read-ack'd list */
	self->read_acked = 0;

	/* msg: READ (currTS, p) */
	msg.type = msg_read;
	msg.u.read.local_ts = self->curr_ts;
	msg.u.read.guid = self->guid;

	/* send message to all acceptors */
	for (i = 0; i < self->n_acceptors; i++) {
		if (!self->msg_send(self->acceptor[i], &msg))
			return false;
	}

	/* begin timeout period */
	paxreg_set_timeout(self->timer, self->read_timeout,
			   paxreg_read_timeout);

	return true;
}