Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: TYBLE16_simple_data_logger TYBLE16_MP3_Air
equeue.c
00001 /* 00002 * Flexible event queue for dispatching events 00003 * 00004 * Copyright (c) 2016-2019 ARM Limited 00005 * 00006 * Licensed under the Apache License, Version 2.0 (the "License"); 00007 * you may not use this file except in compliance with the License. 00008 * You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 #include "events/equeue.h" 00019 00020 #include <stdlib.h> 00021 #include <stdint.h> 00022 #include <string.h> 00023 00024 // check if the event is allocaded by user - event address is outside queues internal buffer address range 00025 #define EQUEUE_IS_USER_ALLOCATED_EVENT(e) ((q->buffer == NULL) || ((uintptr_t)(e) < (uintptr_t)q->buffer) || ((uintptr_t)(e) > ((uintptr_t)q->slab.data))) 00026 00027 // calculate the relative-difference between absolute times while 00028 // correctly handling overflow conditions 00029 static inline int equeue_tickdiff(unsigned a, unsigned b) 00030 { 00031 return (int)(unsigned)(a - b); 00032 } 00033 00034 // calculate the relative-difference between absolute times, but 00035 // also clamp to zero, resulting in only non-zero values. 00036 static inline int equeue_clampdiff(unsigned a, unsigned b) 00037 { 00038 int diff = equeue_tickdiff(a, b); 00039 return diff > 0 ? 
diff : 0; 00040 } 00041 00042 // Increment the unique id in an event, hiding the event from cancel 00043 static inline void equeue_incid(equeue_t *q, struct equeue_event *e) 00044 { 00045 e->id += 1; 00046 if (((unsigned)e->id << q->npw2) == 0) { 00047 e->id = 1; 00048 } 00049 } 00050 00051 00052 // equeue lifetime management 00053 int equeue_create(equeue_t *q, size_t size) 00054 { 00055 // dynamically allocate the specified buffer 00056 void *buffer = malloc(size); 00057 if (!buffer) { 00058 return -1; 00059 } 00060 00061 int err = equeue_create_inplace(q, size, buffer); 00062 q->allocated = buffer; 00063 return err; 00064 } 00065 00066 int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) 00067 { 00068 // setup queue around provided buffer 00069 // ensure buffer and size are aligned 00070 if (size >= sizeof(void *)) { 00071 q->buffer = (void *)(((uintptr_t) buffer + sizeof(void *) -1) & ~(sizeof(void *) -1)); 00072 size -= (char *) q->buffer - (char *) buffer; 00073 size &= ~(sizeof(void *) -1); 00074 } else { 00075 // don't align when size less then pointer size 00076 // e.g. 
static queue (size == 1) 00077 q->buffer = buffer; 00078 } 00079 00080 q->allocated = 0; 00081 00082 q->npw2 = 0; 00083 for (unsigned s = size; s; s >>= 1) { 00084 q->npw2++; 00085 } 00086 00087 q->chunks = 0; 00088 q->slab.size = size; 00089 q->slab.data = q->buffer; 00090 00091 q->queue = 0; 00092 equeue_tick_init(); 00093 q->tick = equeue_tick(); 00094 q->generation = 0; 00095 q->break_requested = false; 00096 00097 q->background.active = false; 00098 q->background.update = 0; 00099 q->background.timer = 0; 00100 00101 // initialize platform resources 00102 int err; 00103 err = equeue_sema_create(&q->eventsema); 00104 if (err < 0) { 00105 return err; 00106 } 00107 00108 err = equeue_mutex_create(&q->queuelock); 00109 if (err < 0) { 00110 return err; 00111 } 00112 00113 err = equeue_mutex_create(&q->memlock); 00114 if (err < 0) { 00115 return err; 00116 } 00117 00118 return 0; 00119 } 00120 00121 void equeue_destroy(equeue_t *q) 00122 { 00123 // call destructors on pending events 00124 for (struct equeue_event *es = q->queue; es; es = es->next) { 00125 for (struct equeue_event *e = es->sibling; e; e = e->sibling) { 00126 if (e->dtor) { 00127 e->dtor(e + 1); 00128 } 00129 } 00130 if (es->dtor) { 00131 es->dtor(es + 1); 00132 } 00133 } 00134 // notify background timer 00135 if (q->background.update) { 00136 q->background.update(q->background.timer, -1); 00137 } 00138 00139 // clean up platform resources + memory 00140 equeue_mutex_destroy(&q->memlock); 00141 equeue_mutex_destroy(&q->queuelock); 00142 equeue_sema_destroy(&q->eventsema); 00143 free(q->allocated); 00144 } 00145 00146 00147 // equeue chunk allocation functions 00148 static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) 00149 { 00150 // add event overhead 00151 size += sizeof(struct equeue_event); 00152 size = (size + sizeof(void *) -1) & ~(sizeof(void *) -1); 00153 00154 equeue_mutex_lock(&q->memlock); 00155 00156 // check if a good chunk is available 00157 for (struct equeue_event **p 
= &q->chunks; *p; p = &(*p)->next) { 00158 if ((*p)->size >= size) { 00159 struct equeue_event *e = *p; 00160 if (e->sibling) { 00161 *p = e->sibling; 00162 (*p)->next = e->next; 00163 } else { 00164 *p = e->next; 00165 } 00166 00167 equeue_mutex_unlock(&q->memlock); 00168 return e; 00169 } 00170 } 00171 00172 // otherwise allocate a new chunk out of the slab 00173 if (q->slab.size >= size) { 00174 struct equeue_event *e = (struct equeue_event *)q->slab.data; 00175 q->slab.data += size; 00176 q->slab.size -= size; 00177 e->size = size; 00178 e->id = 1; 00179 00180 equeue_mutex_unlock(&q->memlock); 00181 return e; 00182 } 00183 00184 equeue_mutex_unlock(&q->memlock); 00185 return 0; 00186 } 00187 00188 static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) 00189 { 00190 equeue_mutex_lock(&q->memlock); 00191 00192 // stick chunk into list of chunks 00193 struct equeue_event **p = &q->chunks; 00194 while (*p && (*p)->size < e->size) { 00195 p = &(*p)->next; 00196 } 00197 00198 if (*p && (*p)->size == e->size) { 00199 e->sibling = *p; 00200 e->next = (*p)->next; 00201 } else { 00202 e->sibling = 0; 00203 e->next = *p; 00204 } 00205 *p = e; 00206 00207 equeue_mutex_unlock(&q->memlock); 00208 } 00209 00210 void *equeue_alloc(equeue_t *q, size_t size) 00211 { 00212 struct equeue_event *e = equeue_mem_alloc(q, size); 00213 if (!e) { 00214 return 0; 00215 } 00216 00217 e->target = 0; 00218 e->period = -1; 00219 e->dtor = 0; 00220 00221 return e + 1; 00222 } 00223 00224 void equeue_dealloc(equeue_t *q, void *p) 00225 { 00226 struct equeue_event *e = (struct equeue_event *)p - 1; 00227 00228 if (e->dtor) { 00229 e->dtor(e + 1); 00230 } 00231 00232 if (!EQUEUE_IS_USER_ALLOCATED_EVENT(e)) { 00233 equeue_mem_dealloc(q, e); 00234 } 00235 } 00236 00237 void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick) 00238 { 00239 e->target = tick + equeue_clampdiff(e->target, tick); 00240 e->generation = q->generation; 00241 00242 
equeue_mutex_lock(&q->queuelock); 00243 00244 // find the event slot 00245 struct equeue_event **p = &q->queue; 00246 while (*p && equeue_tickdiff((*p)->target, e->target) < 0) { 00247 p = &(*p)->next; 00248 } 00249 00250 // insert at head in slot 00251 if (*p && (*p)->target == e->target) { 00252 e->next = (*p)->next; 00253 if (e->next) { 00254 e->next->ref = &e->next; 00255 } 00256 e->sibling = *p; 00257 e->sibling->next = 0; 00258 e->sibling->ref = &e->sibling; 00259 } else { 00260 e->next = *p; 00261 if (e->next) { 00262 e->next->ref = &e->next; 00263 } 00264 e->sibling = 0; 00265 } 00266 00267 *p = e; 00268 e->ref = p; 00269 00270 // notify background timer 00271 if ((q->background.update && q->background.active) && 00272 (q->queue == e && !e->sibling)) { 00273 q->background.update(q->background.timer, 00274 equeue_clampdiff(e->target, tick)); 00275 } 00276 equeue_mutex_unlock(&q->queuelock); 00277 } 00278 00279 // equeue scheduling functions 00280 static int equeue_event_id(equeue_t *q, struct equeue_event *e) 00281 { 00282 // setup event and hash local id with buffer offset for unique id 00283 return ((unsigned)e->id << q->npw2) | ((unsigned char *)e - q->buffer); 00284 } 00285 00286 static struct equeue_event *equeue_unqueue_by_address(equeue_t *q, struct equeue_event *e) 00287 { 00288 equeue_mutex_lock(&q->queuelock); 00289 // clear the event and check if already in-flight 00290 e->cb = 0; 00291 e->period = -1; 00292 00293 int diff = equeue_tickdiff(e->target, q->tick); 00294 if (diff < 0 || (diff == 0 && e->generation != q->generation)) { 00295 equeue_mutex_unlock(&q->queuelock); 00296 return 0; 00297 } 00298 00299 // disentangle from queue 00300 if (e->sibling) { 00301 e->sibling->next = e->next; 00302 if (e->sibling->next) { 00303 e->sibling->next->ref = &e->sibling->next; 00304 } 00305 00306 *e->ref = e->sibling; 00307 e->sibling->ref = e->ref; 00308 } else { 00309 *e->ref = e->next; 00310 if (e->next) { 00311 e->next->ref = e->ref; 00312 } 00313 } 
00314 equeue_mutex_unlock(&q->queuelock); 00315 return e; 00316 } 00317 00318 static struct equeue_event *equeue_unqueue_by_id(equeue_t *q, int id) 00319 { 00320 // decode event from unique id and check that the local id matches 00321 struct equeue_event *e = (struct equeue_event *) 00322 &q->buffer[id & ((1u << q->npw2) - 1u)]; 00323 00324 equeue_mutex_lock(&q->queuelock); 00325 if (e->id != (unsigned)id >> q->npw2) { 00326 equeue_mutex_unlock(&q->queuelock); 00327 return 0; 00328 } 00329 00330 if (0 == equeue_unqueue_by_address(q, e)) { 00331 equeue_mutex_unlock(&q->queuelock); 00332 return 0; 00333 } 00334 00335 equeue_incid(q, e); 00336 equeue_mutex_unlock(&q->queuelock); 00337 00338 return e; 00339 } 00340 00341 static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) 00342 { 00343 equeue_mutex_lock(&q->queuelock); 00344 00345 // find all expired events and mark a new generation 00346 q->generation += 1; 00347 if (equeue_tickdiff(q->tick, target) <= 0) { 00348 q->tick = target; 00349 } 00350 00351 struct equeue_event *head = q->queue; 00352 struct equeue_event **p = &head; 00353 while (*p && equeue_tickdiff((*p)->target, target) <= 0) { 00354 p = &(*p)->next; 00355 } 00356 00357 q->queue = *p; 00358 if (q->queue) { 00359 q->queue->ref = &q->queue; 00360 } 00361 00362 *p = 0; 00363 00364 equeue_mutex_unlock(&q->queuelock); 00365 00366 // reverse and flatten each slot to match insertion order 00367 struct equeue_event **tail = &head; 00368 struct equeue_event *ess = head; 00369 while (ess) { 00370 struct equeue_event *es = ess; 00371 ess = es->next; 00372 00373 struct equeue_event *prev = 0; 00374 for (struct equeue_event *e = es; e; e = e->sibling) { 00375 e->next = prev; 00376 prev = e; 00377 } 00378 00379 *tail = prev; 00380 tail = &es->next; 00381 } 00382 00383 return head; 00384 } 00385 00386 int equeue_post(equeue_t *q, void (*cb)(void *), void *p) 00387 { 00388 struct equeue_event *e = (struct equeue_event *)p - 1; 00389 unsigned tick = 
equeue_tick(); 00390 e->cb = cb; 00391 e->target = tick + e->target; 00392 00393 equeue_enqueue(q, e, tick); 00394 int id = equeue_event_id(q, e); 00395 equeue_sema_signal(&q->eventsema); 00396 return id; 00397 } 00398 00399 void equeue_post_user_allocated(equeue_t *q, void (*cb)(void *), void *p) 00400 { 00401 struct equeue_event *e = (struct equeue_event *)p; 00402 unsigned tick = equeue_tick(); 00403 e->cb = cb; 00404 e->target = tick + e->target; 00405 00406 equeue_enqueue(q, e, tick); 00407 equeue_sema_signal(&q->eventsema); 00408 } 00409 00410 bool equeue_cancel(equeue_t *q, int id) 00411 { 00412 if (!id) { 00413 return false; 00414 } 00415 00416 struct equeue_event *e = equeue_unqueue_by_id(q, id); 00417 if (e) { 00418 equeue_dealloc(q, e + 1); 00419 return true; 00420 } else { 00421 return false; 00422 } 00423 } 00424 00425 bool equeue_cancel_user_allocated(equeue_t *q, void *e) 00426 { 00427 if (!e) { 00428 return false; 00429 } 00430 00431 struct equeue_event *_e = equeue_unqueue_by_address(q, e); 00432 if (_e) { 00433 equeue_dealloc(q, _e + 1); 00434 return true; 00435 } else { 00436 return false; 00437 } 00438 } 00439 00440 int equeue_timeleft(equeue_t *q, int id) 00441 { 00442 int ret = -1; 00443 00444 if (!id) { 00445 return -1; 00446 } 00447 00448 // decode event from unique id and check that the local id matches 00449 struct equeue_event *e = (struct equeue_event *) 00450 &q->buffer[id & ((1u << q->npw2) - 1u)]; 00451 00452 equeue_mutex_lock(&q->queuelock); 00453 if (e->id == (unsigned)id >> q->npw2) { 00454 ret = equeue_clampdiff(e->target, equeue_tick()); 00455 } 00456 equeue_mutex_unlock(&q->queuelock); 00457 return ret; 00458 } 00459 00460 int equeue_timeleft_user_allocated(equeue_t *q, void *e) 00461 { 00462 int ret = -1; 00463 00464 if (!e) { 00465 return -1; 00466 } 00467 00468 struct equeue_event *_e = (struct equeue_event *)e; 00469 equeue_mutex_lock(&q->queuelock); 00470 ret = equeue_clampdiff(_e->target, equeue_tick()); 00471 
equeue_mutex_unlock(&q->queuelock); 00472 return ret; 00473 } 00474 00475 void equeue_break(equeue_t *q) 00476 { 00477 equeue_mutex_lock(&q->queuelock); 00478 q->break_requested = true; 00479 equeue_mutex_unlock(&q->queuelock); 00480 equeue_sema_signal(&q->eventsema); 00481 } 00482 00483 void equeue_dispatch(equeue_t *q, int ms) 00484 { 00485 unsigned tick = equeue_tick(); 00486 unsigned timeout = tick + ms; 00487 q->background.active = false; 00488 00489 while (1) { 00490 // collect all the available events and next deadline 00491 struct equeue_event *es = equeue_dequeue(q, tick); 00492 00493 // dispatch events 00494 while (es) { 00495 struct equeue_event *e = es; 00496 es = e->next; 00497 00498 // actually dispatch the callbacks 00499 void (*cb)(void *) = e->cb; 00500 if (cb) { 00501 cb(e + 1); 00502 } 00503 00504 // reenqueue periodic events or deallocate 00505 if (e->period >= 0) { 00506 e->target += e->period; 00507 equeue_enqueue(q, e, equeue_tick()); 00508 } else { 00509 equeue_incid(q, e); 00510 equeue_dealloc(q, e + 1); 00511 } 00512 } 00513 00514 int deadline = -1; 00515 tick = equeue_tick(); 00516 00517 // check if we should stop dispatching soon 00518 if (ms >= 0) { 00519 deadline = equeue_tickdiff(timeout, tick); 00520 if (deadline <= 0) { 00521 // update background timer if necessary 00522 if (q->background.update) { 00523 equeue_mutex_lock(&q->queuelock); 00524 if (q->background.update && q->queue) { 00525 q->background.update(q->background.timer, 00526 equeue_clampdiff(q->queue->target, tick)); 00527 } 00528 q->background.active = true; 00529 equeue_mutex_unlock(&q->queuelock); 00530 } 00531 q->break_requested = false; 00532 return; 00533 } 00534 } 00535 00536 // find closest deadline 00537 equeue_mutex_lock(&q->queuelock); 00538 if (q->queue) { 00539 int diff = equeue_clampdiff(q->queue->target, tick); 00540 if ((unsigned)diff < (unsigned)deadline) { 00541 deadline = diff; 00542 } 00543 } 00544 equeue_mutex_unlock(&q->queuelock); 00545 00546 // 
wait for events 00547 equeue_sema_wait(&q->eventsema, deadline); 00548 00549 // check if we were notified to break out of dispatch 00550 if (q->break_requested) { 00551 equeue_mutex_lock(&q->queuelock); 00552 if (q->break_requested) { 00553 q->break_requested = false; 00554 equeue_mutex_unlock(&q->queuelock); 00555 return; 00556 } 00557 equeue_mutex_unlock(&q->queuelock); 00558 } 00559 00560 // update tick for next iteration 00561 tick = equeue_tick(); 00562 } 00563 } 00564 00565 00566 // event functions 00567 void equeue_event_delay(void *p, int ms) 00568 { 00569 struct equeue_event *e = (struct equeue_event *)p - 1; 00570 e->target = ms; 00571 } 00572 00573 void equeue_event_period(void *p, int ms) 00574 { 00575 struct equeue_event *e = (struct equeue_event *)p - 1; 00576 e->period = ms; 00577 } 00578 00579 void equeue_event_dtor(void *p, void (*dtor)(void *)) 00580 { 00581 struct equeue_event *e = (struct equeue_event *)p - 1; 00582 e->dtor = dtor; 00583 } 00584 00585 00586 // simple callbacks 00587 struct ecallback { 00588 void (*cb)(void *); 00589 void *data; 00590 }; 00591 00592 static void ecallback_dispatch(void *p) 00593 { 00594 struct ecallback *e = (struct ecallback *)p; 00595 e->cb(e->data); 00596 } 00597 00598 int equeue_call(equeue_t *q, void (*cb)(void *), void *data) 00599 { 00600 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00601 if (!e) { 00602 return 0; 00603 } 00604 00605 e->cb = cb; 00606 e->data = data; 00607 return equeue_post(q, ecallback_dispatch, e); 00608 } 00609 00610 int equeue_call_in(equeue_t *q, int ms, void (*cb)(void *), void *data) 00611 { 00612 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00613 if (!e) { 00614 return 0; 00615 } 00616 00617 equeue_event_delay(e, ms); 00618 e->cb = cb; 00619 e->data = data; 00620 return equeue_post(q, ecallback_dispatch, e); 00621 } 00622 00623 int equeue_call_every(equeue_t *q, int ms, void (*cb)(void *), void *data) 00624 { 00625 struct ecallback *e = 
equeue_alloc(q, sizeof(struct ecallback)); 00626 if (!e) { 00627 return 0; 00628 } 00629 00630 equeue_event_delay(e, ms); 00631 equeue_event_period(e, ms); 00632 e->cb = cb; 00633 e->data = data; 00634 return equeue_post(q, ecallback_dispatch, e); 00635 } 00636 00637 00638 // backgrounding 00639 void equeue_background(equeue_t *q, 00640 void (*update)(void *timer, int ms), void *timer) 00641 { 00642 equeue_mutex_lock(&q->queuelock); 00643 if (q->background.update) { 00644 q->background.update(q->background.timer, -1); 00645 } 00646 00647 q->background.update = update; 00648 q->background.timer = timer; 00649 00650 if (q->background.update && q->queue) { 00651 q->background.update(q->background.timer, 00652 equeue_clampdiff(q->queue->target, equeue_tick())); 00653 } 00654 q->background.active = true; 00655 equeue_mutex_unlock(&q->queuelock); 00656 } 00657 00658 struct equeue_chain_context { 00659 equeue_t *q; 00660 equeue_t *target; 00661 int id; 00662 }; 00663 00664 static void equeue_chain_dispatch(void *p) 00665 { 00666 equeue_dispatch((equeue_t *)p, 0); 00667 } 00668 00669 static void equeue_chain_update(void *p, int ms) 00670 { 00671 struct equeue_chain_context *c = (struct equeue_chain_context *)p; 00672 (void)equeue_cancel(c->target, c->id); 00673 00674 if (ms >= 0) { 00675 c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q); 00676 } else { 00677 equeue_dealloc(c->q, c); 00678 } 00679 } 00680 00681 int equeue_chain(equeue_t *q, equeue_t *target) 00682 { 00683 if (!target) { 00684 equeue_background(q, 0, 0); 00685 return 0; 00686 } 00687 00688 struct equeue_chain_context *c = equeue_alloc(q, 00689 sizeof(struct equeue_chain_context)); 00690 if (!c) { 00691 return -1; 00692 } 00693 00694 c->q = q; 00695 c->target = target; 00696 c->id = 0; 00697 00698 equeue_background(q, equeue_chain_update, c); 00699 return 0; 00700 }
Generated on Tue Jul 12 2022 13:54:18 by
Doxygen 1.7.2