Embed: (wiki syntax)

« Back to documentation index

equeue.c — source file listing

equeue.c

00001 /*
00002  * Flexible event queue for dispatching events
00003  *
00004  * Copyright (c) 2016 Christopher Haster
00005  *
00006  * Licensed under the Apache License, Version 2.0 (the "License");
00007  * you may not use this file except in compliance with the License.
00008  * You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 #include "equeue/equeue.h"
00019 
00020 #include <stdlib.h>
00021 #include <string.h>
00022 
00023 
00024 // calculate the relative-difference between absolute times while
00025 // correctly handling overflow conditions
// Compute the signed distance between two absolute tick counts.
// The subtraction is performed in unsigned arithmetic so that counter
// wraparound is handled correctly before reinterpreting as signed.
static inline int equeue_tickdiff(unsigned a, unsigned b) {
    unsigned delta = a - b;
    return (int)delta;
}
00029 
00030 // calculate the relative-difference between absolute times, but
00031 // also clamp to zero, resulting in only non-zero values.
// calculate the relative-difference between absolute times, but
// also clamp to zero, resulting in only non-negative values
static inline int equeue_clampdiff(unsigned a, unsigned b) {
    // same wraparound-safe subtraction as equeue_tickdiff
    int diff = (int)(a - b);
    // a negative difference means "a" is already in the past; clamp to 0.
    // The previous bit-twiddle (~(diff >> 31) & diff) relied on
    // implementation-defined right shift of a negative int; the
    // conditional below is portable and otherwise equivalent.
    return diff > 0 ? diff : 0;
}
00036 
00037 // Increment the unique id in an event, hiding the event from cancel
// Increment the unique id in an event, hiding the event from cancel.
// Only the bits of e->id above npw2 survive in the packed event id
// (see equeue_enqueue), so wrap the counter back to 1 once those bits
// are exhausted — id 0 is reserved (equeue_cancel ignores id == 0).
static inline void equeue_incid(equeue_t *q, struct equeue_event *e) {
    e->id += 1;
    if ((e->id << q->npw2) == 0) {
        e->id = 1;
    }
}
00044 
00045 
00046 // equeue lifetime management
00047 int equeue_create(equeue_t *q, size_t size) {
00048     // dynamically allocate the specified buffer
00049     void *buffer = malloc(size);
00050     if (!buffer) {
00051         return -1;
00052     }
00053 
00054     int err = equeue_create_inplace(q, size, buffer);
00055     q->allocated = buffer;
00056     return err;
00057 }
00058 
00059 int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
00060     // setup queue around provided buffer
00061     q->buffer = buffer;
00062     q->allocated = 0;
00063 
00064     q->npw2 = 0;
00065     for (unsigned s = size; s; s >>= 1) {
00066         q->npw2++;
00067     }
00068 
00069     q->chunks = 0;
00070     q->slab.size = size;
00071     q->slab.data = buffer;
00072 
00073     q->queue = 0;
00074     q->tick = equeue_tick();
00075     q->generation = 0;
00076     q->break_requested = false;
00077 
00078     q->background.active = false;
00079     q->background.update = 0;
00080     q->background.timer = 0;
00081 
00082     // initialize platform resources
00083     int err;
00084     err = equeue_sema_create(&q->eventsema);
00085     if (err < 0) {
00086         return err;
00087     }
00088 
00089     err = equeue_mutex_create(&q->queuelock);
00090     if (err < 0) {
00091         return err;
00092     }
00093 
00094     err = equeue_mutex_create(&q->memlock);
00095     if (err < 0) {
00096         return err;
00097     }
00098 
00099     return 0;
00100 }
00101 
00102 void equeue_destroy(equeue_t *q) {
00103     // call destructors on pending events
00104     for (struct equeue_event *es = q->queue; es; es = es->next) {
00105         for (struct equeue_event *e = q->queue; e; e = e->sibling) {
00106             if (e->dtor) {
00107                 e->dtor(e + 1);
00108             }
00109         }
00110     }
00111 
00112     // notify background timer
00113     if (q->background.update) {
00114         q->background.update(q->background.timer, -1);
00115     }
00116 
00117     // clean up platform resources + memory
00118     equeue_mutex_destroy(&q->memlock);
00119     equeue_mutex_destroy(&q->queuelock);
00120     equeue_sema_destroy(&q->eventsema);
00121     free(q->allocated);
00122 }
00123 
00124 
00125 // equeue chunk allocation functions
// equeue chunk allocation functions

// Allocate a chunk large enough for `size` user bytes plus the event
// header: first try the free-chunk list, then carve from the slab.
// Returns 0 if no memory is available.
static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) {
    // add event overhead
    size += sizeof(struct equeue_event);
    // round up to pointer alignment
    size = (size + sizeof(void*)-1) & ~(sizeof(void*)-1);

    equeue_mutex_lock(&q->memlock);

    // check if a good chunk is available
    // (q->chunks is a size-ordered list of buckets linked by `next`;
    // equal-sized chunks hang off each bucket head via `sibling` —
    // see equeue_mem_dealloc)
    for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) {
        if ((*p)->size >= size) {
            struct equeue_event *e = *p;
            if (e->sibling) {
                // promote the next same-size chunk to bucket head
                *p = e->sibling;
                (*p)->next = e->next;
            } else {
                // bucket emptied; unlink it entirely
                *p = e->next;
            }

            equeue_mutex_unlock(&q->memlock);
            return e;
        }
    }

    // otherwise allocate a new chunk out of the slab
    if (q->slab.size >= size) {
        struct equeue_event *e = (struct equeue_event *)q->slab.data;
        q->slab.data += size;
        q->slab.size -= size;
        e->size = size;
        e->id = 1;

        equeue_mutex_unlock(&q->memlock);
        return e;
    }

    // out of memory
    equeue_mutex_unlock(&q->memlock);
    return 0;
}
00164 
// Return a chunk to the free list, keeping the list ordered by ascending
// size and grouping equal-sized chunks under a single bucket head.
static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) {
    equeue_mutex_lock(&q->memlock);

    // stick chunk into list of chunks: find the first bucket whose size
    // is >= this chunk's size
    struct equeue_event **p = &q->chunks;
    while (*p && (*p)->size < e->size) {
        p = &(*p)->next;
    }

    if (*p && (*p)->size == e->size) {
        // same-size bucket exists: push e as the new bucket head
        e->sibling = *p;
        e->next = (*p)->next;
    } else {
        // start a new bucket just before the first larger one
        e->sibling = 0;
        e->next = *p;
    }
    *p = e;

    equeue_mutex_unlock(&q->memlock);
}
00185 
00186 void *equeue_alloc(equeue_t *q, size_t size) {
00187     struct equeue_event *e = equeue_mem_alloc(q, size);
00188     if (!e) {
00189         return 0;
00190     }
00191 
00192     e->target = 0;
00193     e->period = -1;
00194     e->dtor = 0;
00195 
00196     return e + 1;
00197 }
00198 
00199 void equeue_dealloc(equeue_t *q, void *p) {
00200     struct equeue_event *e = (struct equeue_event*)p - 1;
00201 
00202     if (e->dtor) {
00203         e->dtor(e+1);
00204     }
00205 
00206     equeue_mem_dealloc(q, e);
00207 }
00208 
00209 
00210 // equeue scheduling functions
// equeue scheduling functions

// Insert an event into the time-sorted queue and return its unique id.
// `tick` is the current time; e->target must already be absolute.
static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick) {
    // setup event and hash local id with buffer offset for unique id
    int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
    // clamp so an event can never be scheduled in the past
    e->target = tick + equeue_clampdiff(e->target, tick);
    e->generation = q->generation;

    equeue_mutex_lock(&q->queuelock);

    // find the event slot: first slot whose target is >= e->target
    struct equeue_event **p = &q->queue;
    while (*p && equeue_tickdiff((*p)->target, e->target) < 0) {
        p = &(*p)->next;
    }

    // insert at head in slot; events sharing a target are chained via
    // `sibling` (newest first), distinct targets are chained via `next`
    if (*p && (*p)->target == e->target) {
        e->next = (*p)->next;
        if (e->next) {
            e->next->ref = &e->next;
        }

        e->sibling = *p;
        e->sibling->ref = &e->sibling;
    } else {
        e->next = *p;
        if (e->next) {
            e->next->ref = &e->next;
        }

        e->sibling = 0;
    }

    // `ref` always points back at the pointer that owns this event, so
    // equeue_unqueue can splice it out in O(1)
    *p = e;
    e->ref = p;

    // notify background timer if this event became the new earliest
    // deadline (it landed at the queue head with no existing siblings)
    if ((q->background.update && q->background.active) &&
        (q->queue == e && !e->sibling)) {
        q->background.update(q->background.timer,
                equeue_clampdiff(e->target, tick));
    }

    equeue_mutex_unlock(&q->queuelock);

    return id;
}
00257 
// Remove the event with the given packed id from the queue.
// Returns the event, or 0 if the id is stale or the event can no longer
// be cancelled safely (already expired / being dispatched).
static struct equeue_event *equeue_unqueue(equeue_t *q, int id) {
    // decode event from unique id and check that the local id matches
    struct equeue_event *e = (struct equeue_event *)
            &q->buffer[id & ((1 << q->npw2)-1)];

    equeue_mutex_lock(&q->queuelock);
    if (e->id != id >> q->npw2) {
        // the id counter moved on; this handle is stale
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    // clear the event and check if already in-flight
    e->cb = 0;
    e->period = -1;

    // an event whose target has passed — or that shares the tick but
    // belongs to an older generation (already dequeued) — cannot be
    // unlinked here; dispatch owns it now
    int diff = equeue_tickdiff(e->target, q->tick);
    if (diff < 0 || (diff == 0 && e->generation != q->generation)) {
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    // disentangle from queue
    if (e->sibling) {
        // promote the next sibling into this event's slot position
        e->sibling->next = e->next;
        if (e->sibling->next) {
            e->sibling->next->ref = &e->sibling->next;
        }

        *e->ref = e->sibling;
        e->sibling->ref = e->ref;
    } else {
        // singleton slot: remove the slot entirely
        *e->ref = e->next;
        if (e->next) {
            e->next->ref = e->ref;
        }
    }

    // bump the id so any remaining handles to this event are invalidated
    equeue_incid(q, e);
    equeue_mutex_unlock(&q->queuelock);

    return e;
}
00300 
// Detach every event expiring at or before `target` and return them as a
// flat singly-linked list (via `next`) in dispatch order.
static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) {
    equeue_mutex_lock(&q->queuelock);

    // find all expired events and mark a new generation
    q->generation += 1;
    if (equeue_tickdiff(q->tick, target) <= 0) {
        // only ever move the queue's notion of "now" forward
        q->tick = target;
    }

    struct equeue_event *head = q->queue;
    struct equeue_event **p = &head;
    while (*p && equeue_tickdiff((*p)->target, target) <= 0) {
        p = &(*p)->next;
    }

    // split: expired slots stay in `head`, the remainder becomes the
    // new queue
    q->queue = *p;
    if (q->queue) {
        q->queue->ref = &q->queue;
    }

    *p = 0;

    equeue_mutex_unlock(&q->queuelock);

    // reverse and flatten each slot to match insertion order
    // (siblings are stored newest-first by equeue_enqueue, so each
    // sibling chain is reversed while being spliced into the output)
    struct equeue_event **tail = &head;
    struct equeue_event *ess = head;
    while (ess) {
        struct equeue_event *es = ess;
        ess = es->next;

        struct equeue_event *prev = 0;
        for (struct equeue_event *e = es; e; e = e->sibling) {
            e->next = prev;
            prev = e;
        }

        *tail = prev;
        tail = &es->next;
    }

    return head;
}
00344 
00345 int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
00346     struct equeue_event *e = (struct equeue_event*)p - 1;
00347     unsigned tick = equeue_tick();
00348     e->cb = cb;
00349     e->target = tick + e->target;
00350 
00351     int id = equeue_enqueue(q, e, tick);
00352     equeue_sema_signal(&q->eventsema);
00353     return id;
00354 }
00355 
00356 void equeue_cancel(equeue_t *q, int id) {
00357     if (!id) {
00358         return;
00359     }
00360 
00361     struct equeue_event *e = equeue_unqueue(q, id);
00362     if (e) {
00363         equeue_dealloc(q, e + 1);
00364     }
00365 }
00366 
// Request that any thread currently in equeue_dispatch return as soon
// as possible.
void equeue_break(equeue_t *q) {
    // set the flag under the queue lock so dispatch observes it
    // consistently (it double-checks under the same lock)
    equeue_mutex_lock(&q->queuelock);
    q->break_requested = true;
    equeue_mutex_unlock(&q->queuelock);
    // nudge the dispatcher out of its semaphore wait
    equeue_sema_signal(&q->eventsema);
}
00373 
// Dispatch events for `ms` milliseconds.
// ms < 0 dispatches forever (until equeue_break); ms == 0 drains
// currently expired events and returns.
void equeue_dispatch(equeue_t *q, int ms) {
    unsigned tick = equeue_tick();
    unsigned timeout = tick + ms;
    // foreground dispatch supersedes any background timer
    q->background.active = false;

    while (1) {
        // collect all the available events and next deadline
        struct equeue_event *es = equeue_dequeue(q, tick);

        // dispatch events
        while (es) {
            struct equeue_event *e = es;
            es = e->next;

            // actually dispatch the callbacks
            // (cb may have been cleared by equeue_unqueue/cancel)
            void (*cb)(void *) = e->cb;
            if (cb) {
                cb(e + 1);
            }

            // reenqueue periodic events or deallocate
            if (e->period >= 0) {
                e->target += e->period;
                equeue_enqueue(q, e, equeue_tick());
            } else {
                equeue_incid(q, e);
                equeue_dealloc(q, e+1);
            }
        }

        int deadline = -1;
        tick = equeue_tick();

        // check if we should stop dispatching soon
        if (ms >= 0) {
            deadline = equeue_tickdiff(timeout, tick);
            if (deadline <= 0) {
                // update background timer if necessary before returning,
                // so pending events still fire via the background path
                if (q->background.update) {
                    equeue_mutex_lock(&q->queuelock);
                    if (q->background.update && q->queue) {
                        q->background.update(q->background.timer,
                                equeue_clampdiff(q->queue->target, tick));
                    }
                    q->background.active = true;
                    equeue_mutex_unlock(&q->queuelock);
                }
                q->break_requested = false;
                return;
            }
        }

        // find closest deadline
        equeue_mutex_lock(&q->queuelock);
        if (q->queue) {
            int diff = equeue_clampdiff(q->queue->target, tick);
            // unsigned compare makes deadline == -1 act as "infinite"
            if ((unsigned)diff < (unsigned)deadline) {
                deadline = diff;
            }
        }
        equeue_mutex_unlock(&q->queuelock);

        // wait for events
        equeue_sema_wait(&q->eventsema, deadline);

        // check if we were notified to break out of dispatch
        // (double-checked under the lock so exactly one dispatcher
        // consumes a given break request)
        if (q->break_requested) {
            equeue_mutex_lock(&q->queuelock);
            if (q->break_requested) {
                q->break_requested = false;
                equeue_mutex_unlock(&q->queuelock);
                return;
            }
            equeue_mutex_unlock(&q->queuelock);
        }

        // update tick for next iteration
        tick = equeue_tick();
    }
}
00454 
00455 
00456 // event functions
00457 void equeue_event_delay(void *p, int ms) {
00458     struct equeue_event *e = (struct equeue_event*)p - 1;
00459     e->target = ms;
00460 }
00461 
00462 void equeue_event_period(void *p, int ms) {
00463     struct equeue_event *e = (struct equeue_event*)p - 1;
00464     e->period = ms;
00465 }
00466 
00467 void equeue_event_dtor(void *p, void (*dtor)(void *)) {
00468     struct equeue_event *e = (struct equeue_event*)p - 1;
00469     e->dtor = dtor;
00470 }
00471 
00472 
00473 // simple callbacks
// Adapter payload used by the equeue_call* helpers to carry a user
// callback and its argument through the generic event machinery.
struct ecallback {
    void (*cb)(void*);  // user callback
    void *data;         // opaque argument passed to cb
};
00478 
00479 static void ecallback_dispatch(void *p) {
00480     struct ecallback *e = (struct ecallback*)p;
00481     e->cb(e->data);
00482 }
00483 
00484 int equeue_call(equeue_t *q, void (*cb)(void*), void *data) {
00485     struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
00486     if (!e) {
00487         return 0;
00488     }
00489 
00490     e->cb = cb;
00491     e->data = data;
00492     return equeue_post(q, ecallback_dispatch, e);
00493 }
00494 
00495 int equeue_call_in(equeue_t *q, int ms, void (*cb)(void*), void *data) {
00496     struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
00497     if (!e) {
00498         return 0;
00499     }
00500 
00501     equeue_event_delay(e, ms);
00502     e->cb = cb;
00503     e->data = data;
00504     return equeue_post(q, ecallback_dispatch, e);
00505 }
00506 
00507 int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) {
00508     struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
00509     if (!e) {
00510         return 0;
00511     }
00512 
00513     equeue_event_delay(e, ms);
00514     equeue_event_period(e, ms);
00515     e->cb = cb;
00516     e->data = data;
00517     return equeue_post(q, ecallback_dispatch, e);
00518 }
00519 
00520 
00521 // backgrounding
// backgrounding

// Register (or, with update == 0, clear) a background timer hook that is
// told how long until the next event is due.  The hook receives ms == -1
// when it is being dropped.
void equeue_background(equeue_t *q,
        void (*update)(void *timer, int ms), void *timer) {
    equeue_mutex_lock(&q->queuelock);
    // release the previously registered timer, if any
    if (q->background.update) {
        q->background.update(q->background.timer, -1);
    }

    q->background.update = update;
    q->background.timer = timer;

    // immediately arm the new timer with the current earliest deadline
    if (q->background.update && q->queue) {
        q->background.update(q->background.timer,
                equeue_clampdiff(q->queue->target, equeue_tick()));
    }
    q->background.active = true;
    equeue_mutex_unlock(&q->queuelock);
}
00539 
// State shared between a chained queue and the event that re-dispatches
// it on the target queue.
struct equeue_chain_context {
    equeue_t *q;        // the chained (source) queue
    equeue_t *target;   // queue that the dispatch is scheduled on
    int id;             // id of the currently scheduled dispatch event
};
00545 
00546 static void equeue_chain_dispatch(void *p) {
00547     equeue_dispatch((equeue_t *)p, 0);
00548 }
00549 
00550 static void equeue_chain_update(void *p, int ms) {
00551     struct equeue_chain_context *c = (struct equeue_chain_context *)p;
00552     equeue_cancel(c->target, c->id);
00553 
00554     if (ms >= 0) {
00555         c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q);
00556     } else {
00557         equeue_dealloc(c->target, c);
00558     }
00559 }
00560 
00561 void equeue_chain(equeue_t *q, equeue_t *target) {
00562     if (!target) {
00563         equeue_background(q, 0, 0);
00564         return;
00565     }
00566 
00567     struct equeue_chain_context *c = equeue_alloc(q,
00568             sizeof(struct equeue_chain_context));
00569 
00570     c->q = q;
00571     c->target = target;
00572     c->id = 0;
00573 
00574     equeue_background(q, equeue_chain_update, c);
00575 }