Kenji Arai / mbed-os_TYBLE16

Dependents:   TYBLE16_simple_data_logger TYBLE16_MP3_Air

equeue.c

/*
 * Flexible event queue for dispatching events
 *
 * Copyright (c) 2016-2019 ARM Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "events/equeue.h"

#include <stdlib.h>
#include <stdint.h>
#include <string.h>

// check if the event is allocated by the user - the event address is outside the queue's internal buffer address range
#define EQUEUE_IS_USER_ALLOCATED_EVENT(e) ((q->buffer == NULL) || ((uintptr_t)(e) < (uintptr_t)q->buffer) || ((uintptr_t)(e) > ((uintptr_t)q->slab.data)))

// calculate the relative-difference between absolute times while
// correctly handling overflow conditions
static inline int equeue_tickdiff(unsigned a, unsigned b)
{
    return (int)(unsigned)(a - b);
}

// calculate the relative-difference between absolute times, but
// also clamp to zero, resulting in only non-negative values.
static inline int equeue_clampdiff(unsigned a, unsigned b)
{
    int diff = equeue_tickdiff(a, b);
    return diff > 0 ? diff : 0;
}
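
// Illustrative sketch: the unsigned subtraction above stays correct when
// the 32-bit tick counter wraps around. example_tickdiff_demo is a
// hypothetical helper, not part of the equeue API.
#include <assert.h>

static void example_tickdiff_demo(void)
{
    // 0x00000002 is 3 ticks after 0xFFFFFFFF once the counter wraps
    assert(equeue_tickdiff(0x00000002u, 0xFFFFFFFFu) == 3);
    // and 3 ticks before it when read the other way around
    assert(equeue_tickdiff(0xFFFFFFFFu, 0x00000002u) == -3);
    // equeue_clampdiff never reports a negative delay
    assert(equeue_clampdiff(0xFFFFFFFFu, 0x00000002u) == 0);
}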

// Increment the unique id in an event, hiding the event from cancel
static inline void equeue_incid(equeue_t *q, struct equeue_event *e)
{
    e->id += 1;
    if (((unsigned)e->id << q->npw2) == 0) {
        e->id = 1;
    }
}


// equeue lifetime management
int equeue_create(equeue_t *q, size_t size)
{
    // dynamically allocate the specified buffer
    void *buffer = malloc(size);
    if (!buffer) {
        return -1;
    }

    int err = equeue_create_inplace(q, size, buffer);
    q->allocated = buffer;
    return err;
}

int equeue_create_inplace(equeue_t *q, size_t size, void *buffer)
{
    // setup queue around provided buffer
    // ensure buffer and size are aligned
    if (size >= sizeof(void *)) {
        q->buffer = (void *)(((uintptr_t) buffer + sizeof(void *) -1) & ~(sizeof(void *) -1));
        size -= (char *) q->buffer - (char *) buffer;
        size &= ~(sizeof(void *) -1);
    } else {
        // don't align when size is less than the pointer size
        // e.g. static queue (size == 1)
        q->buffer = buffer;
    }

    q->allocated = 0;

    q->npw2 = 0;
    for (unsigned s = size; s; s >>= 1) {
        q->npw2++;
    }

    q->chunks = 0;
    q->slab.size = size;
    q->slab.data = q->buffer;

    q->queue = 0;
    equeue_tick_init();
    q->tick = equeue_tick();
    q->generation = 0;
    q->break_requested = false;

    q->background.active = false;
    q->background.update = 0;
    q->background.timer = 0;

    // initialize platform resources
    int err;
    err = equeue_sema_create(&q->eventsema);
    if (err < 0) {
        return err;
    }

    err = equeue_mutex_create(&q->queuelock);
    if (err < 0) {
        return err;
    }

    err = equeue_mutex_create(&q->memlock);
    if (err < 0) {
        return err;
    }

    return 0;
}
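
// Illustrative usage sketch: creating a queue on the heap and another in
// caller-owned storage. EQUEUE_EVENT_SIZE comes from events/equeue.h; the
// sizes chosen here are arbitrary, and example_create_usage is hypothetical.
static void example_create_usage(void)
{
    // heap-backed queue with room for about 32 small events
    equeue_t heap_queue;
    if (equeue_create(&heap_queue, 32 * EQUEUE_EVENT_SIZE) < 0) {
        return; // allocation or platform-resource failure
    }

    // queue built in place around a static buffer, nothing malloc'd
    static unsigned char storage[16 * EQUEUE_EVENT_SIZE];
    equeue_t static_queue;
    if (equeue_create_inplace(&static_queue, sizeof(storage), storage) == 0) {
        equeue_destroy(&static_queue);
    }

    equeue_destroy(&heap_queue); // also frees the malloc'd buffer
}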

void equeue_destroy(equeue_t *q)
{
    // call destructors on pending events
    for (struct equeue_event *es = q->queue; es; es = es->next) {
        for (struct equeue_event *e = es->sibling; e; e = e->sibling) {
            if (e->dtor) {
                e->dtor(e + 1);
            }
        }
        if (es->dtor) {
            es->dtor(es + 1);
        }
    }
    // notify background timer
    if (q->background.update) {
        q->background.update(q->background.timer, -1);
    }

    // clean up platform resources + memory
    equeue_mutex_destroy(&q->memlock);
    equeue_mutex_destroy(&q->queuelock);
    equeue_sema_destroy(&q->eventsema);
    free(q->allocated);
}


// equeue chunk allocation functions
static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size)
{
    // add event overhead
    size += sizeof(struct equeue_event);
    size = (size + sizeof(void *) -1) & ~(sizeof(void *) -1);

    equeue_mutex_lock(&q->memlock);

    // check if a good chunk is available
    for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) {
        if ((*p)->size >= size) {
            struct equeue_event *e = *p;
            if (e->sibling) {
                *p = e->sibling;
                (*p)->next = e->next;
            } else {
                *p = e->next;
            }

            equeue_mutex_unlock(&q->memlock);
            return e;
        }
    }

    // otherwise allocate a new chunk out of the slab
    if (q->slab.size >= size) {
        struct equeue_event *e = (struct equeue_event *)q->slab.data;
        q->slab.data += size;
        q->slab.size -= size;
        e->size = size;
        e->id = 1;

        equeue_mutex_unlock(&q->memlock);
        return e;
    }

    equeue_mutex_unlock(&q->memlock);
    return 0;
}

static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e)
{
    equeue_mutex_lock(&q->memlock);

    // stick chunk into list of chunks
    struct equeue_event **p = &q->chunks;
    while (*p && (*p)->size < e->size) {
        p = &(*p)->next;
    }

    if (*p && (*p)->size == e->size) {
        e->sibling = *p;
        e->next = (*p)->next;
    } else {
        e->sibling = 0;
        e->next = *p;
    }
    *p = e;

    equeue_mutex_unlock(&q->memlock);
}

void *equeue_alloc(equeue_t *q, size_t size)
{
    struct equeue_event *e = equeue_mem_alloc(q, size);
    if (!e) {
        return 0;
    }

    e->target = 0;
    e->period = -1;
    e->dtor = 0;

    return e + 1;
}

void equeue_dealloc(equeue_t *q, void *p)
{
    struct equeue_event *e = (struct equeue_event *)p - 1;

    if (e->dtor) {
        e->dtor(e + 1);
    }

    if (!EQUEUE_IS_USER_ALLOCATED_EVENT(e)) {
        equeue_mem_dealloc(q, e);
    }
}
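
// Illustrative usage sketch: allocating an event payload, attaching a
// destructor, and releasing it without ever posting it. example_payload_dtor
// and example_alloc_usage are hypothetical names.
static void example_payload_dtor(void *p)
{
    (void)p; // runs inside equeue_dealloc, before the chunk is recycled
}

static void example_alloc_usage(equeue_t *q)
{
    int *payload = equeue_alloc(q, sizeof(int));
    if (!payload) {
        return; // freelist and slab are both exhausted
    }
    *payload = 42;
    equeue_event_dtor(payload, example_payload_dtor);

    // returning the chunk runs the dtor and pushes it onto q->chunks
    equeue_dealloc(q, payload);
}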

void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick)
{
    e->target = tick + equeue_clampdiff(e->target, tick);
    e->generation = q->generation;

    equeue_mutex_lock(&q->queuelock);

    // find the event slot
    struct equeue_event **p = &q->queue;
    while (*p && equeue_tickdiff((*p)->target, e->target) < 0) {
        p = &(*p)->next;
    }

    // insert at head in slot
    if (*p && (*p)->target == e->target) {
        e->next = (*p)->next;
        if (e->next) {
            e->next->ref = &e->next;
        }
        e->sibling = *p;
        e->sibling->next = 0;
        e->sibling->ref = &e->sibling;
    } else {
        e->next = *p;
        if (e->next) {
            e->next->ref = &e->next;
        }
        e->sibling = 0;
    }

    *p = e;
    e->ref = p;

    // notify background timer
    if ((q->background.update && q->background.active) &&
            (q->queue == e && !e->sibling)) {
        q->background.update(q->background.timer,
                             equeue_clampdiff(e->target, tick));
    }
    equeue_mutex_unlock(&q->queuelock);
}
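
// Note on the structure maintained above (a descriptive reading of the
// code, not authoritative documentation): the pending queue is a list of
// "slots" sorted by target tick and linked through ->next. Events sharing
// a target hang off the slot head through ->sibling, newest first, and
// each event's ->ref points back at the pointer that references it, so
// removal in equeue_unqueue_by_address needs no list traversal.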

// equeue scheduling functions
static int equeue_event_id(equeue_t *q, struct equeue_event *e)
{
    // setup event and hash local id with buffer offset for unique id
    return ((unsigned)e->id << q->npw2) | ((unsigned char *)e - q->buffer);
}

static struct equeue_event *equeue_unqueue_by_address(equeue_t *q, struct equeue_event *e)
{
    equeue_mutex_lock(&q->queuelock);
    // clear the event and check if already in-flight
    e->cb = 0;
    e->period = -1;

    int diff = equeue_tickdiff(e->target, q->tick);
    if (diff < 0 || (diff == 0 && e->generation != q->generation)) {
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    // disentangle from queue
    if (e->sibling) {
        e->sibling->next = e->next;
        if (e->sibling->next) {
            e->sibling->next->ref = &e->sibling->next;
        }

        *e->ref = e->sibling;
        e->sibling->ref = e->ref;
    } else {
        *e->ref = e->next;
        if (e->next) {
            e->next->ref = e->ref;
        }
    }
    equeue_mutex_unlock(&q->queuelock);
    return e;
}

static struct equeue_event *equeue_unqueue_by_id(equeue_t *q, int id)
{
    // decode event from unique id and check that the local id matches
    struct equeue_event *e = (struct equeue_event *)
                             &q->buffer[id & ((1u << q->npw2) - 1u)];

    equeue_mutex_lock(&q->queuelock);
    if (e->id != (unsigned)id >> q->npw2) {
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    if (0 == equeue_unqueue_by_address(q, e)) {
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    equeue_incid(q, e);
    equeue_mutex_unlock(&q->queuelock);

    return e;
}

static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target)
{
    equeue_mutex_lock(&q->queuelock);

    // find all expired events and mark a new generation
    q->generation += 1;
    if (equeue_tickdiff(q->tick, target) <= 0) {
        q->tick = target;
    }

    struct equeue_event *head = q->queue;
    struct equeue_event **p = &head;
    while (*p && equeue_tickdiff((*p)->target, target) <= 0) {
        p = &(*p)->next;
    }

    q->queue = *p;
    if (q->queue) {
        q->queue->ref = &q->queue;
    }

    *p = 0;

    equeue_mutex_unlock(&q->queuelock);

    // reverse and flatten each slot to match insertion order
    struct equeue_event **tail = &head;
    struct equeue_event *ess = head;
    while (ess) {
        struct equeue_event *es = ess;
        ess = es->next;

        struct equeue_event *prev = 0;
        for (struct equeue_event *e = es; e; e = e->sibling) {
            e->next = prev;
            prev = e;
        }

        *tail = prev;
        tail = &es->next;
    }

    return head;
}

int equeue_post(equeue_t *q, void (*cb)(void *), void *p)
{
    struct equeue_event *e = (struct equeue_event *)p - 1;
    unsigned tick = equeue_tick();
    e->cb = cb;
    e->target = tick + e->target;

    equeue_enqueue(q, e, tick);
    int id = equeue_event_id(q, e);
    equeue_sema_signal(&q->eventsema);
    return id;
}

void equeue_post_user_allocated(equeue_t *q, void (*cb)(void *), void *p)
{
    struct equeue_event *e = (struct equeue_event *)p;
    unsigned tick = equeue_tick();
    e->cb = cb;
    e->target = tick + e->target;

    equeue_enqueue(q, e, tick);
    equeue_sema_signal(&q->eventsema);
}
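
// Illustrative usage sketch: posting a user-allocated event. The wrapper
// struct and names are hypothetical; the equeue_event header must be the
// first member so the casts above and in equeue_dispatch line up, and
// period must be set to -1 for a one-shot event since nothing here
// initializes it.
struct example_user_event {
    struct equeue_event e; // header consumed by the queue
    int payload;
};

static void example_user_cb(void *p)
{
    (void)p; // receives the memory just past the header, i.e. &payload
}

static void example_post_user_allocated(equeue_t *q)
{
    static struct example_user_event ue;
    ue.e.target = 100; // a pre-set target acts as the dispatch delay
    ue.e.period = -1;  // one-shot; 0 or more would re-enqueue forever
    ue.e.dtor = 0;
    ue.payload = 7;
    equeue_post_user_allocated(q, example_user_cb, &ue);
}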

bool equeue_cancel(equeue_t *q, int id)
{
    if (!id) {
        return false;
    }

    struct equeue_event *e = equeue_unqueue_by_id(q, id);
    if (e) {
        equeue_dealloc(q, e + 1);
        return true;
    } else {
        return false;
    }
}

bool equeue_cancel_user_allocated(equeue_t *q, void *e)
{
    if (!e) {
        return false;
    }

    struct equeue_event *_e = equeue_unqueue_by_address(q, e);
    if (_e) {
        equeue_dealloc(q, _e + 1);
        return true;
    } else {
        return false;
    }
}
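
// Illustrative usage sketch: cancelling a pending call by id. Cancellation
// only succeeds while the event is still queued; once dispatch has claimed
// it, equeue_cancel returns false. example_cancel is a hypothetical name.
static void example_cancel(equeue_t *q, void (*cb)(void *), void *data)
{
    int id = equeue_call_in(q, 1000, cb, data);
    if (id && equeue_cancel(q, id)) {
        // the event was unqueued and its memory returned to the queue
    }
}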

int equeue_timeleft(equeue_t *q, int id)
{
    int ret = -1;

    if (!id) {
        return -1;
    }

    // decode event from unique id and check that the local id matches
    struct equeue_event *e = (struct equeue_event *)
                             &q->buffer[id & ((1u << q->npw2) - 1u)];

    equeue_mutex_lock(&q->queuelock);
    if (e->id == (unsigned)id >> q->npw2) {
        ret = equeue_clampdiff(e->target, equeue_tick());
    }
    equeue_mutex_unlock(&q->queuelock);
    return ret;
}

int equeue_timeleft_user_allocated(equeue_t *q, void *e)
{
    int ret = -1;

    if (!e) {
        return -1;
    }

    struct equeue_event *_e = (struct equeue_event *)e;
    equeue_mutex_lock(&q->queuelock);
    ret = equeue_clampdiff(_e->target, equeue_tick());
    equeue_mutex_unlock(&q->queuelock);
    return ret;
}
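
// Illustrative usage sketch: polling how long until a posted event fires.
// A return of -1 means the id no longer names a pending event.
// example_timeleft is a hypothetical name.
static void example_timeleft(equeue_t *q, int id)
{
    int ms = equeue_timeleft(q, id);
    if (ms >= 0) {
        // the event fires in roughly ms milliseconds
    }
}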

void equeue_break(equeue_t *q)
{
    equeue_mutex_lock(&q->queuelock);
    q->break_requested = true;
    equeue_mutex_unlock(&q->queuelock);
    equeue_sema_signal(&q->eventsema);
}

void equeue_dispatch(equeue_t *q, int ms)
{
    unsigned tick = equeue_tick();
    unsigned timeout = tick + ms;
    q->background.active = false;

    while (1) {
        // collect all the available events and next deadline
        struct equeue_event *es = equeue_dequeue(q, tick);

        // dispatch events
        while (es) {
            struct equeue_event *e = es;
            es = e->next;

            // actually dispatch the callbacks
            void (*cb)(void *) = e->cb;
            if (cb) {
                cb(e + 1);
            }

            // reenqueue periodic events or deallocate
            if (e->period >= 0) {
                e->target += e->period;
                equeue_enqueue(q, e, equeue_tick());
            } else {
                equeue_incid(q, e);
                equeue_dealloc(q, e + 1);
            }
        }

        int deadline = -1;
        tick = equeue_tick();

        // check if we should stop dispatching soon
        if (ms >= 0) {
            deadline = equeue_tickdiff(timeout, tick);
            if (deadline <= 0) {
                // update background timer if necessary
                if (q->background.update) {
                    equeue_mutex_lock(&q->queuelock);
                    if (q->background.update && q->queue) {
                        q->background.update(q->background.timer,
                                             equeue_clampdiff(q->queue->target, tick));
                    }
                    q->background.active = true;
                    equeue_mutex_unlock(&q->queuelock);
                }
                q->break_requested = false;
                return;
            }
        }

        // find closest deadline
        equeue_mutex_lock(&q->queuelock);
        if (q->queue) {
            int diff = equeue_clampdiff(q->queue->target, tick);
            if ((unsigned)diff < (unsigned)deadline) {
                deadline = diff;
            }
        }
        equeue_mutex_unlock(&q->queuelock);

        // wait for events
        equeue_sema_wait(&q->eventsema, deadline);

        // check if we were notified to break out of dispatch
        if (q->break_requested) {
            equeue_mutex_lock(&q->queuelock);
            if (q->break_requested) {
                q->break_requested = false;
                equeue_mutex_unlock(&q->queuelock);
                return;
            }
            equeue_mutex_unlock(&q->queuelock);
        }

        // update tick for next iteration
        tick = equeue_tick();
    }
}
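
// Illustrative usage sketch: a typical dispatch thread stopped from another
// context with equeue_break. The function names are hypothetical.
static void example_dispatch_thread(void *arg)
{
    // a negative timeout dispatches forever, until equeue_break is called
    equeue_dispatch((equeue_t *)arg, -1);
}

static void example_stop_dispatch(equeue_t *q)
{
    // wakes the dispatch loop and makes it return after the current event
    equeue_break(q);
}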


// event functions
void equeue_event_delay(void *p, int ms)
{
    struct equeue_event *e = (struct equeue_event *)p - 1;
    e->target = ms;
}

void equeue_event_period(void *p, int ms)
{
    struct equeue_event *e = (struct equeue_event *)p - 1;
    e->period = ms;
}

void equeue_event_dtor(void *p, void (*dtor)(void *))
{
    struct equeue_event *e = (struct equeue_event *)p - 1;
    e->dtor = dtor;
}
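
// Illustrative usage sketch: composing the setters above on a freshly
// allocated event before posting it. example_periodic_post is hypothetical.
static void example_periodic_post(equeue_t *q, void (*cb)(void *))
{
    char *msg = equeue_alloc(q, 8);
    if (!msg) {
        return;
    }
    strcpy(msg, "tick");           // payload handed to cb on each dispatch
    equeue_event_delay(msg, 500);  // first dispatch after roughly 500 ms
    equeue_event_period(msg, 500); // then re-enqueued every 500 ms
    equeue_post(q, cb, msg);
}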


// simple callbacks
struct ecallback {
    void (*cb)(void *);
    void *data;
};

static void ecallback_dispatch(void *p)
{
    struct ecallback *e = (struct ecallback *)p;
    e->cb(e->data);
}

int equeue_call(equeue_t *q, void (*cb)(void *), void *data)
{
    struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
    if (!e) {
        return 0;
    }

    e->cb = cb;
    e->data = data;
    return equeue_post(q, ecallback_dispatch, e);
}

int equeue_call_in(equeue_t *q, int ms, void (*cb)(void *), void *data)
{
    struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
    if (!e) {
        return 0;
    }

    equeue_event_delay(e, ms);
    e->cb = cb;
    e->data = data;
    return equeue_post(q, ecallback_dispatch, e);
}

int equeue_call_every(equeue_t *q, int ms, void (*cb)(void *), void *data)
{
    struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
    if (!e) {
        return 0;
    }

    equeue_event_delay(e, ms);
    equeue_event_period(e, ms);
    e->cb = cb;
    e->data = data;
    return equeue_post(q, ecallback_dispatch, e);
}
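
// Illustrative usage sketch: the three convenience wrappers above in order.
// blink and example_calls are hypothetical names.
static void example_calls(equeue_t *q, void (*blink)(void *), void *led)
{
    equeue_call(q, blink, led);                      // as soon as possible
    equeue_call_in(q, 200, blink, led);              // once, after ~200 ms
    int id = equeue_call_every(q, 1000, blink, led); // every second
    (void)id; // keep the id if the periodic call may need cancelling
}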


// backgrounding
void equeue_background(equeue_t *q,
                       void (*update)(void *timer, int ms), void *timer)
{
    equeue_mutex_lock(&q->queuelock);
    if (q->background.update) {
        q->background.update(q->background.timer, -1);
    }

    q->background.update = update;
    q->background.timer = timer;

    if (q->background.update && q->queue) {
        q->background.update(q->background.timer,
                             equeue_clampdiff(q->queue->target, equeue_tick()));
    }
    q->background.active = true;
    equeue_mutex_unlock(&q->queuelock);
}
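
// Illustrative usage sketch: letting a low-power hardware timer wake the
// dispatcher instead of polling. The update callback receives the ms until
// the next event, or -1 when the timer should be disabled;
// example_timer_update and example_enable_background are hypothetical.
static void example_timer_update(void *timer, int ms)
{
    (void)timer;
    (void)ms; // arm (ms >= 0) or disarm (ms < 0) the wakeup source here
}

static void example_enable_background(equeue_t *q, void *timer)
{
    equeue_background(q, example_timer_update, timer);
}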

struct equeue_chain_context {
    equeue_t *q;
    equeue_t *target;
    int id;
};

static void equeue_chain_dispatch(void *p)
{
    equeue_dispatch((equeue_t *)p, 0);
}

static void equeue_chain_update(void *p, int ms)
{
    struct equeue_chain_context *c = (struct equeue_chain_context *)p;
    (void)equeue_cancel(c->target, c->id);

    if (ms >= 0) {
        c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q);
    } else {
        equeue_dealloc(c->q, c);
    }
}

int equeue_chain(equeue_t *q, equeue_t *target)
{
    if (!target) {
        equeue_background(q, 0, 0);
        return 0;
    }

    struct equeue_chain_context *c = equeue_alloc(q,
                                                  sizeof(struct equeue_chain_context));
    if (!c) {
        return -1;
    }

    c->q = q;
    c->target = target;
    c->id = 0;

    equeue_background(q, equeue_chain_update, c);
    return 0;
}
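
// Illustrative usage sketch: running a higher-priority queue on top of a
// shared one. Events posted to child are then dispatched by whichever
// thread dispatches parent. example_chain is a hypothetical name.
static int example_chain(equeue_t *child, equeue_t *parent)
{
    int err = equeue_chain(child, parent);
    if (err < 0) {
        return err; // could not allocate the chain context on child
    }
    // ... later, unchain before destroying either queue:
    // equeue_chain(child, 0);
    return 0;
}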