Diff: events/equeue/equeue.c
- Revision:
- 0:fb7af294d5d9
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/events/equeue/equeue.c Thu Nov 17 16:43:53 2016 +0000
@@ -0,0 +1,569 @@
+/*
+ * Flexible event queue for dispatching events
+ *
+ * Copyright (c) 2016 Christopher Haster
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "equeue/equeue.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+
+// calculate the relative-difference between absolute times while
+// correctly handling overflow conditions
+static inline int equeue_tickdiff(unsigned a, unsigned b) {
+ return (int)(unsigned)(a - b);
+}
+
+// calculate the relative-difference between absolute times, but
+// also clamp to zero, resulting in only non-zero values.
+static inline int equeue_clampdiff(unsigned a, unsigned b) {
+ int diff = equeue_tickdiff(a, b);
+ return ~(diff >> (8*sizeof(int)-1)) & diff;
+}
+
+// Increment the unique id in an event, hiding the event from cancel
+static inline void equeue_incid(equeue_t *q, struct equeue_event *e) {
+ e->id += 1;
+ if (!(e->id << q->npw2)) {
+ e->id = 1;
+ }
+}
+
+
+// equeue lifetime management
+int equeue_create(equeue_t *q, size_t size) {
+ // dynamically allocate the specified buffer
+ void *buffer = malloc(size);
+ if (!buffer) {
+ return -1;
+ }
+
+ int err = equeue_create_inplace(q, size, buffer);
+ q->allocated = buffer;
+ return err;
+}
+
+int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
+ // setup queue around provided buffer
+ q->buffer = buffer;
+ q->allocated = 0;
+
+ q->npw2 = 0;
+ for (unsigned s = size; s; s >>= 1) {
+ q->npw2++;
+ }
+
+ q->chunks = 0;
+ q->slab.size = size;
+ q->slab.data = buffer;
+
+ q->queue = 0;
+ q->tick = equeue_tick();
+ q->generation = 0;
+ q->breaks = 0;
+
+ q->background.active = false;
+ q->background.update = 0;
+ q->background.timer = 0;
+
+ // initialize platform resources
+ int err;
+ err = equeue_sema_create(&q->eventsema);
+ if (err < 0) {
+ return err;
+ }
+
+ err = equeue_mutex_create(&q->queuelock);
+ if (err < 0) {
+ return err;
+ }
+
+ err = equeue_mutex_create(&q->memlock);
+ if (err < 0) {
+ return err;
+ }
+
+ return 0;
+}
+
+void equeue_destroy(equeue_t *q) {
+ // call destructors on pending events
+ for (struct equeue_event *es = q->queue; es; es = es->next) {
+ for (struct equeue_event *e = q->queue; e; e = e->sibling) {
+ if (e->dtor) {
+ e->dtor(e + 1);
+ }
+ }
+ }
+
+ // notify background timer
+ if (q->background.update) {
+ q->background.update(q->background.timer, -1);
+ }
+
+ // clean up platform resources + memory
+ equeue_mutex_destroy(&q->memlock);
+ equeue_mutex_destroy(&q->queuelock);
+ equeue_sema_destroy(&q->eventsema);
+ free(q->allocated);
+}
+
+
+// equeue chunk allocation functions
+static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) {
+ // add event overhead
+ size += sizeof(struct equeue_event);
+ size = (size + sizeof(void*)-1) & ~(sizeof(void*)-1);
+
+ equeue_mutex_lock(&q->memlock);
+
+ // check if a good chunk is available
+ for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) {
+ if ((*p)->size >= size) {
+ struct equeue_event *e = *p;
+ if (e->sibling) {
+ *p = e->sibling;
+ (*p)->next = e->next;
+ } else {
+ *p = e->next;
+ }
+
+ equeue_mutex_unlock(&q->memlock);
+ return e;
+ }
+ }
+
+ // otherwise allocate a new chunk out of the slab
+ if (q->slab.size >= size) {
+ struct equeue_event *e = (struct equeue_event *)q->slab.data;
+ q->slab.data += size;
+ q->slab.size -= size;
+ e->size = size;
+ e->id = 1;
+
+ equeue_mutex_unlock(&q->memlock);
+ return e;
+ }
+
+ equeue_mutex_unlock(&q->memlock);
+ return 0;
+}
+
+static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) {
+ equeue_mutex_lock(&q->memlock);
+
+ // stick chunk into list of chunks
+ struct equeue_event **p = &q->chunks;
+ while (*p && (*p)->size < e->size) {
+ p = &(*p)->next;
+ }
+
+ if (*p && (*p)->size == e->size) {
+ e->sibling = *p;
+ e->next = (*p)->next;
+ } else {
+ e->sibling = 0;
+ e->next = *p;
+ }
+ *p = e;
+
+ equeue_mutex_unlock(&q->memlock);
+}
+
+void *equeue_alloc(equeue_t *q, size_t size) {
+ struct equeue_event *e = equeue_mem_alloc(q, size);
+ if (!e) {
+ return 0;
+ }
+
+ e->target = 0;
+ e->period = -1;
+ e->dtor = 0;
+
+ return e + 1;
+}
+
+void equeue_dealloc(equeue_t *q, void *p) {
+ struct equeue_event *e = (struct equeue_event*)p - 1;
+
+ if (e->dtor) {
+ e->dtor(e+1);
+ }
+
+ equeue_mem_dealloc(q, e);
+}
+
+
+// equeue scheduling functions
+static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick) {
+ // setup event and hash local id with buffer offset for unique id
+ int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
+ e->target = tick + equeue_clampdiff(e->target, tick);
+ e->generation = q->generation;
+
+ equeue_mutex_lock(&q->queuelock);
+
+ // find the event slot
+ struct equeue_event **p = &q->queue;
+ while (*p && equeue_tickdiff((*p)->target, e->target) < 0) {
+ p = &(*p)->next;
+ }
+
+ // insert at head in slot
+ if (*p && (*p)->target == e->target) {
+ e->next = (*p)->next;
+ if (e->next) {
+ e->next->ref = &e->next;
+ }
+
+ e->sibling = *p;
+ e->sibling->ref = &e->sibling;
+ } else {
+ e->next = *p;
+ if (e->next) {
+ e->next->ref = &e->next;
+ }
+
+ e->sibling = 0;
+ }
+
+ *p = e;
+ e->ref = p;
+
+ // notify background timer
+ if ((q->background.update && q->background.active) &&
+ (q->queue == e && !e->sibling)) {
+ q->background.update(q->background.timer,
+ equeue_clampdiff(e->target, tick));
+ }
+
+ equeue_mutex_unlock(&q->queuelock);
+
+ return id;
+}
+
+static struct equeue_event *equeue_unqueue(equeue_t *q, int id) {
+ // decode event from unique id and check that the local id matches
+ struct equeue_event *e = (struct equeue_event *)
+ &q->buffer[id & ((1 << q->npw2)-1)];
+
+ equeue_mutex_lock(&q->queuelock);
+ if (e->id != id >> q->npw2) {
+ equeue_mutex_unlock(&q->queuelock);
+ return 0;
+ }
+
+ // clear the event and check if already in-flight
+ e->cb = 0;
+ e->period = -1;
+
+ int diff = equeue_tickdiff(e->target, q->tick);
+ if (diff < 0 || (diff == 0 && e->generation != q->generation)) {
+ equeue_mutex_unlock(&q->queuelock);
+ return 0;
+ }
+
+ // disentangle from queue
+ if (e->sibling) {
+ e->sibling->next = e->next;
+ if (e->sibling->next) {
+ e->sibling->next->ref = &e->sibling->next;
+ }
+
+ *e->ref = e->sibling;
+ e->sibling->ref = e->ref;
+ } else {
+ *e->ref = e->next;
+ if (e->next) {
+ e->next->ref = e->ref;
+ }
+ }
+
+ equeue_incid(q, e);
+ equeue_mutex_unlock(&q->queuelock);
+
+ return e;
+}
+
+static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) {
+ equeue_mutex_lock(&q->queuelock);
+
+ // find all expired events and mark a new generation
+ q->generation += 1;
+ if (equeue_tickdiff(q->tick, target) <= 0) {
+ q->tick = target;
+ }
+
+ struct equeue_event *head = q->queue;
+ struct equeue_event **p = &head;
+ while (*p && equeue_tickdiff((*p)->target, target) <= 0) {
+ p = &(*p)->next;
+ }
+
+ q->queue = *p;
+ if (q->queue) {
+ q->queue->ref = &q->queue;
+ }
+
+ *p = 0;
+
+ equeue_mutex_unlock(&q->queuelock);
+
+ // reverse and flatten each slot to match insertion order
+ struct equeue_event **tail = &head;
+ struct equeue_event *ess = head;
+ while (ess) {
+ struct equeue_event *es = ess;
+ ess = es->next;
+
+ struct equeue_event *prev = 0;
+ for (struct equeue_event *e = es; e; e = e->sibling) {
+ e->next = prev;
+ prev = e;
+ }
+
+ *tail = prev;
+ tail = &es->next;
+ }
+
+ return head;
+}
+
+int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
+ struct equeue_event *e = (struct equeue_event*)p - 1;
+ unsigned tick = equeue_tick();
+ e->cb = cb;
+ e->target = tick + e->target;
+
+ int id = equeue_enqueue(q, e, tick);
+ equeue_sema_signal(&q->eventsema);
+ return id;
+}
+
+void equeue_cancel(equeue_t *q, int id) {
+ if (!id) {
+ return;
+ }
+
+ struct equeue_event *e = equeue_unqueue(q, id);
+ if (e) {
+ equeue_dealloc(q, e + 1);
+ }
+}
+
+void equeue_break(equeue_t *q) {
+ equeue_mutex_lock(&q->queuelock);
+ q->breaks++;
+ equeue_mutex_unlock(&q->queuelock);
+ equeue_sema_signal(&q->eventsema);
+}
+
+void equeue_dispatch(equeue_t *q, int ms) {
+ unsigned tick = equeue_tick();
+ unsigned timeout = tick + ms;
+ q->background.active = false;
+
+ while (1) {
+ // collect all the available events and next deadline
+ struct equeue_event *es = equeue_dequeue(q, tick);
+
+ // dispatch events
+ while (es) {
+ struct equeue_event *e = es;
+ es = e->next;
+
+ // actually dispatch the callbacks
+ void (*cb)(void *) = e->cb;
+ if (cb) {
+ cb(e + 1);
+ }
+
+ // reenqueue periodic events or deallocate
+ if (e->period >= 0) {
+ e->target += e->period;
+ equeue_enqueue(q, e, equeue_tick());
+ } else {
+ equeue_incid(q, e);
+ equeue_dealloc(q, e+1);
+ }
+ }
+
+ int deadline = -1;
+ tick = equeue_tick();
+
+ // check if we should stop dispatching soon
+ if (ms >= 0) {
+ deadline = equeue_tickdiff(timeout, tick);
+ if (deadline <= 0) {
+ // update background timer if necessary
+ if (q->background.update) {
+ equeue_mutex_lock(&q->queuelock);
+ if (q->background.update && q->queue) {
+ q->background.update(q->background.timer,
+ equeue_clampdiff(q->queue->target, tick));
+ }
+ q->background.active = true;
+ equeue_mutex_unlock(&q->queuelock);
+ }
+ return;
+ }
+ }
+
+ // find closest deadline
+ equeue_mutex_lock(&q->queuelock);
+ if (q->queue) {
+ int diff = equeue_clampdiff(q->queue->target, tick);
+ if ((unsigned)diff < (unsigned)deadline) {
+ deadline = diff;
+ }
+ }
+ equeue_mutex_unlock(&q->queuelock);
+
+ // wait for events
+ equeue_sema_wait(&q->eventsema, deadline);
+
+ // check if we were notified to break out of dispatch
+ if (q->breaks) {
+ equeue_mutex_lock(&q->queuelock);
+ if (q->breaks > 0) {
+ q->breaks--;
+ equeue_mutex_unlock(&q->queuelock);
+ return;
+ }
+ equeue_mutex_unlock(&q->queuelock);
+ }
+
+ // update tick for next iteration
+ tick = equeue_tick();
+ }
+}
+
+
+// event functions
+void equeue_event_delay(void *p, int ms) {
+ struct equeue_event *e = (struct equeue_event*)p - 1;
+ e->target = ms;
+}
+
+void equeue_event_period(void *p, int ms) {
+ struct equeue_event *e = (struct equeue_event*)p - 1;
+ e->period = ms;
+}
+
+void equeue_event_dtor(void *p, void (*dtor)(void *)) {
+ struct equeue_event *e = (struct equeue_event*)p - 1;
+ e->dtor = dtor;
+}
+
+
+// simple callbacks
+struct ecallback {
+ void (*cb)(void*);
+ void *data;
+};
+
+static void ecallback_dispatch(void *p) {
+ struct ecallback *e = (struct ecallback*)p;
+ e->cb(e->data);
+}
+
+int equeue_call(equeue_t *q, void (*cb)(void*), void *data) {
+ struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
+ if (!e) {
+ return 0;
+ }
+
+ e->cb = cb;
+ e->data = data;
+ return equeue_post(q, ecallback_dispatch, e);
+}
+
+int equeue_call_in(equeue_t *q, int ms, void (*cb)(void*), void *data) {
+ struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
+ if (!e) {
+ return 0;
+ }
+
+ equeue_event_delay(e, ms);
+ e->cb = cb;
+ e->data = data;
+ return equeue_post(q, ecallback_dispatch, e);
+}
+
+int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) {
+ struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
+ if (!e) {
+ return 0;
+ }
+
+ equeue_event_delay(e, ms);
+ equeue_event_period(e, ms);
+ e->cb = cb;
+ e->data = data;
+ return equeue_post(q, ecallback_dispatch, e);
+}
+
+
+// backgrounding
+void equeue_background(equeue_t *q,
+ void (*update)(void *timer, int ms), void *timer) {
+ equeue_mutex_lock(&q->queuelock);
+ if (q->background.update) {
+ q->background.update(q->background.timer, -1);
+ }
+
+ q->background.update = update;
+ q->background.timer = timer;
+
+ if (q->background.update && q->queue) {
+ q->background.update(q->background.timer,
+ equeue_clampdiff(q->queue->target, equeue_tick()));
+ }
+ q->background.active = true;
+ equeue_mutex_unlock(&q->queuelock);
+}
+
+struct equeue_chain_context {
+ equeue_t *q;
+ equeue_t *target;
+ int id;
+};
+
+static void equeue_chain_dispatch(void *p) {
+ equeue_dispatch((equeue_t *)p, 0);
+}
+
+static void equeue_chain_update(void *p, int ms) {
+ struct equeue_chain_context *c = (struct equeue_chain_context *)p;
+ equeue_cancel(c->target, c->id);
+
+ if (ms >= 0) {
+ c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q);
+ } else {
+ equeue_dealloc(c->target, c);
+ }
+}
+
+void equeue_chain(equeue_t *q, equeue_t *target) {
+ struct equeue_chain_context *c = equeue_alloc(q,
+ sizeof(struct equeue_chain_context));
+
+ c->q = q;
+ c->target = target;
+ c->id = 0;
+
+ equeue_background(q, equeue_chain_update, c);
+}