Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
equeue.c
00001 /* 00002 * Flexible event queue for dispatching events 00003 * 00004 * Copyright (c) 2016 Christopher Haster 00005 * 00006 * Licensed under the Apache License, Version 2.0 (the "License"); 00007 * you may not use this file except in compliance with the License. 00008 * You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 #include "equeue/equeue.h" 00019 00020 #include <stdlib.h> 00021 #include <string.h> 00022 00023 00024 // calculate the relative-difference between absolute times while 00025 // correctly handling overflow conditions 00026 static inline int equeue_tickdiff(unsigned a, unsigned b) { 00027 return (int)(unsigned)(a - b); 00028 } 00029 00030 // calculate the relative-difference between absolute times, but 00031 // also clamp to zero, resulting in only non-zero values. 00032 static inline int equeue_clampdiff(unsigned a, unsigned b) { 00033 int diff = equeue_tickdiff(a, b); 00034 return ~(diff >> (8*sizeof(int)-1)) & diff; 00035 } 00036 00037 // Increment the unique id in an event, hiding the event from cancel 00038 static inline void equeue_incid(equeue_t *q, struct equeue_event *e) { 00039 e->id += 1; 00040 if ((e->id << q->npw2) == 0) { 00041 e->id = 1; 00042 } 00043 } 00044 00045 00046 // equeue lifetime management 00047 int equeue_create(equeue_t *q, size_t size) { 00048 // dynamically allocate the specified buffer 00049 void *buffer = malloc(size); 00050 if (!buffer) { 00051 return -1; 00052 } 00053 00054 int err = equeue_create_inplace(q, size, buffer); 00055 q->allocated = buffer; 00056 return err; 00057 } 00058 00059 int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) { 00060 // setup queue around provided buffer 00061 q->buffer = buffer; 00062 q->allocated = 0; 00063 00064 q->npw2 = 0; 00065 for (unsigned s = size; s; s >>= 1) { 00066 q->npw2++; 00067 } 00068 00069 q->chunks = 0; 00070 q->slab.size = size; 00071 q->slab.data = buffer; 00072 00073 q->queue = 0; 00074 q->tick = equeue_tick(); 00075 q->generation = 0; 00076 q->break_requested = false; 00077 00078 q->background.active = false; 00079 q->background.update = 0; 00080 q->background.timer = 0; 00081 00082 // initialize platform resources 00083 int err; 00084 err = equeue_sema_create(&q->eventsema); 00085 if (err < 0) { 00086 return err; 00087 } 00088 00089 err = equeue_mutex_create(&q->queuelock); 00090 if (err < 0) { 00091 return err; 00092 } 00093 00094 err = equeue_mutex_create(&q->memlock); 00095 if (err < 0) { 00096 return err; 00097 } 00098 00099 return 0; 00100 } 00101 00102 void equeue_destroy(equeue_t *q) { 00103 // call destructors on pending events 00104 for (struct equeue_event *es = q->queue; es; es = es->next) { 00105 for (struct equeue_event *e = q->queue; e; e = e->sibling) { 00106 if (e->dtor) { 00107 e->dtor(e + 1); 00108 } 00109 } 00110 } 00111 00112 // notify background timer 00113 if (q->background.update) { 00114 q->background.update(q->background.timer, -1); 00115 } 00116 00117 // clean up platform resources + memory 00118 equeue_mutex_destroy(&q->memlock); 00119 equeue_mutex_destroy(&q->queuelock); 00120 equeue_sema_destroy(&q->eventsema); 00121 free(q->allocated); 00122 } 00123 00124 00125 // equeue chunk allocation functions 00126 static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) { 00127 // add event overhead 00128 size += sizeof(struct equeue_event); 00129 size = (size + sizeof(void*)-1) & ~(sizeof(void*)-1); 00130 00131 equeue_mutex_lock(&q->memlock); 00132 00133 // check if a good chunk is available 00134 for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) { 00135 if ((*p)->size >= size) { 00136 struct equeue_event *e = *p; 00137 if (e->sibling) { 00138 *p = e->sibling; 00139 (*p)->next = e->next; 00140 } else { 00141 *p = e->next; 00142 } 00143 00144 equeue_mutex_unlock(&q->memlock); 00145 return e; 00146 } 00147 } 00148 00149 // otherwise allocate a new chunk out of the slab 00150 if (q->slab.size >= size) { 00151 struct equeue_event *e = (struct equeue_event *)q->slab.data; 00152 q->slab.data += size; 00153 q->slab.size -= size; 00154 e->size = size; 00155 e->id = 1; 00156 00157 equeue_mutex_unlock(&q->memlock); 00158 return e; 00159 } 00160 00161 equeue_mutex_unlock(&q->memlock); 00162 return 0; 00163 } 00164 00165 static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) { 00166 equeue_mutex_lock(&q->memlock); 00167 00168 // stick chunk into list of chunks 00169 struct equeue_event **p = &q->chunks; 00170 while (*p && (*p)->size < e->size) { 00171 p = &(*p)->next; 00172 } 00173 00174 if (*p && (*p)->size == e->size) { 00175 e->sibling = *p; 00176 e->next = (*p)->next; 00177 } else { 00178 e->sibling = 0; 00179 e->next = *p; 00180 } 00181 *p = e; 00182 00183 equeue_mutex_unlock(&q->memlock); 00184 } 00185 00186 void *equeue_alloc(equeue_t *q, size_t size) { 00187 struct equeue_event *e = equeue_mem_alloc(q, size); 00188 if (!e) { 00189 return 0; 00190 } 00191 00192 e->target = 0; 00193 e->period = -1; 00194 e->dtor = 0; 00195 00196 return e + 1; 00197 } 00198 00199 void equeue_dealloc(equeue_t *q, void *p) { 00200 struct equeue_event *e = (struct equeue_event*)p - 1; 00201 00202 if (e->dtor) { 00203 e->dtor(e+1); 00204 } 00205 00206 equeue_mem_dealloc(q, e); 00207 } 00208 00209 00210 // equeue scheduling functions 00211 static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick) { 00212 // setup event and hash local id with buffer offset for unique id 00213 int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer); 00214 e->target = tick + equeue_clampdiff(e->target, tick); 00215 e->generation = q->generation; 00216 00217 equeue_mutex_lock(&q->queuelock); 00218 00219 // find the event slot 00220 struct equeue_event **p = &q->queue; 00221 while (*p && equeue_tickdiff((*p)->target, e->target) < 0) { 00222 p = &(*p)->next; 00223 } 00224 00225 // insert at head in slot 00226 if (*p && (*p)->target == e->target) { 00227 e->next = (*p)->next; 00228 if (e->next) { 00229 e->next->ref = &e->next; 00230 } 00231 00232 e->sibling = *p; 00233 e->sibling->ref = &e->sibling; 00234 } else { 00235 e->next = *p; 00236 if (e->next) { 00237 e->next->ref = &e->next; 00238 } 00239 00240 e->sibling = 0; 00241 } 00242 00243 *p = e; 00244 e->ref = p; 00245 00246 // notify background timer 00247 if ((q->background.update && q->background.active) && 00248 (q->queue == e && !e->sibling)) { 00249 q->background.update(q->background.timer, 00250 equeue_clampdiff(e->target, tick)); 00251 } 00252 00253 equeue_mutex_unlock(&q->queuelock); 00254 00255 return id; 00256 } 00257 00258 static struct equeue_event *equeue_unqueue(equeue_t *q, int id) { 00259 // decode event from unique id and check that the local id matches 00260 struct equeue_event *e = (struct equeue_event *) 00261 &q->buffer[id & ((1 << q->npw2)-1)]; 00262 00263 equeue_mutex_lock(&q->queuelock); 00264 if (e->id != id >> q->npw2) { 00265 equeue_mutex_unlock(&q->queuelock); 00266 return 0; 00267 } 00268 00269 // clear the event and check if already in-flight 00270 e->cb = 0; 00271 e->period = -1; 00272 00273 int diff = equeue_tickdiff(e->target, q->tick); 00274 if (diff < 0 || (diff == 0 && e->generation != q->generation)) { 00275 equeue_mutex_unlock(&q->queuelock); 00276 return 0; 00277 } 00278 00279 // disentangle from queue 00280 if (e->sibling) { 00281 e->sibling->next = e->next; 00282 if (e->sibling->next) { 00283 e->sibling->next->ref = &e->sibling->next; 00284 } 00285 00286 *e->ref = e->sibling; 00287 e->sibling->ref = e->ref; 00288 } else { 00289 *e->ref = e->next; 00290 if (e->next) { 00291 e->next->ref = e->ref; 00292 } 00293 } 00294 00295 equeue_incid(q, e); 00296 equeue_mutex_unlock(&q->queuelock); 00297 00298 return e; 00299 } 00300 00301 static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) { 00302 equeue_mutex_lock(&q->queuelock); 00303 00304 // find all expired events and mark a new generation 00305 q->generation += 1; 00306 if (equeue_tickdiff(q->tick, target) <= 0) { 00307 q->tick = target; 00308 } 00309 00310 struct equeue_event *head = q->queue; 00311 struct equeue_event **p = &head; 00312 while (*p && equeue_tickdiff((*p)->target, target) <= 0) { 00313 p = &(*p)->next; 00314 } 00315 00316 q->queue = *p; 00317 if (q->queue) { 00318 q->queue->ref = &q->queue; 00319 } 00320 00321 *p = 0; 00322 00323 equeue_mutex_unlock(&q->queuelock); 00324 00325 // reverse and flatten each slot to match insertion order 00326 struct equeue_event **tail = &head; 00327 struct equeue_event *ess = head; 00328 while (ess) { 00329 struct equeue_event *es = ess; 00330 ess = es->next; 00331 00332 struct equeue_event *prev = 0; 00333 for (struct equeue_event *e = es; e; e = e->sibling) { 00334 e->next = prev; 00335 prev = e; 00336 } 00337 00338 *tail = prev; 00339 tail = &es->next; 00340 } 00341 00342 return head; 00343 } 00344 00345 int equeue_post(equeue_t *q, void (*cb)(void*), void *p) { 00346 struct equeue_event *e = (struct equeue_event*)p - 1; 00347 unsigned tick = equeue_tick(); 00348 e->cb = cb; 00349 e->target = tick + e->target; 00350 00351 int id = equeue_enqueue(q, e, tick); 00352 equeue_sema_signal(&q->eventsema); 00353 return id; 00354 } 00355 00356 void equeue_cancel(equeue_t *q, int id) { 00357 if (!id) { 00358 return; 00359 } 00360 00361 struct equeue_event *e = equeue_unqueue(q, id); 00362 if (e) { 00363 equeue_dealloc(q, e + 1); 00364 } 00365 } 00366 00367 void equeue_break(equeue_t *q) { 00368 equeue_mutex_lock(&q->queuelock); 00369 q->break_requested = true; 00370 equeue_mutex_unlock(&q->queuelock); 00371 equeue_sema_signal(&q->eventsema); 00372 } 00373 00374 void equeue_dispatch(equeue_t *q, int ms) { 00375 unsigned tick = equeue_tick(); 00376 unsigned timeout = tick + ms; 00377 q->background.active = false; 00378 00379 while (1) { 00380 // collect all the available events and next deadline 00381 struct equeue_event *es = equeue_dequeue(q, tick); 00382 00383 // dispatch events 00384 while (es) { 00385 struct equeue_event *e = es; 00386 es = e->next; 00387 00388 // actually dispatch the callbacks 00389 void (*cb)(void *) = e->cb; 00390 if (cb) { 00391 cb(e + 1); 00392 } 00393 00394 // reenqueue periodic events or deallocate 00395 if (e->period >= 0) { 00396 e->target += e->period; 00397 equeue_enqueue(q, e, equeue_tick()); 00398 } else { 00399 equeue_incid(q, e); 00400 equeue_dealloc(q, e+1); 00401 } 00402 } 00403 00404 int deadline = -1; 00405 tick = equeue_tick(); 00406 00407 // check if we should stop dispatching soon 00408 if (ms >= 0) { 00409 deadline = equeue_tickdiff(timeout, tick); 00410 if (deadline <= 0) { 00411 // update background timer if necessary 00412 if (q->background.update) { 00413 equeue_mutex_lock(&q->queuelock); 00414 if (q->background.update && q->queue) { 00415 q->background.update(q->background.timer, 00416 equeue_clampdiff(q->queue->target, tick)); 00417 } 00418 q->background.active = true; 00419 equeue_mutex_unlock(&q->queuelock); 00420 } 00421 q->break_requested = false; 00422 return; 00423 } 00424 } 00425 00426 // find closest deadline 00427 equeue_mutex_lock(&q->queuelock); 00428 if (q->queue) { 00429 int diff = equeue_clampdiff(q->queue->target, tick); 00430 if ((unsigned)diff < (unsigned)deadline) { 00431 deadline = diff; 00432 } 00433 } 00434 equeue_mutex_unlock(&q->queuelock); 00435 00436 // wait for events 00437 equeue_sema_wait(&q->eventsema, deadline); 00438 00439 // check if we were notified to break out of dispatch 00440 if (q->break_requested) { 00441 equeue_mutex_lock(&q->queuelock); 00442 if (q->break_requested) { 00443 q->break_requested = false; 00444 equeue_mutex_unlock(&q->queuelock); 00445 return; 00446 } 00447 equeue_mutex_unlock(&q->queuelock); 00448 } 00449 00450 // update tick for next iteration 00451 tick = equeue_tick(); 00452 } 00453 } 00454 00455 00456 // event functions 00457 void equeue_event_delay(void *p, int ms) { 00458 struct equeue_event *e = (struct equeue_event*)p - 1; 00459 e->target = ms; 00460 } 00461 00462 void equeue_event_period(void *p, int ms) { 00463 struct equeue_event *e = (struct equeue_event*)p - 1; 00464 e->period = ms; 00465 } 00466 00467 void equeue_event_dtor(void *p, void (*dtor)(void *)) { 00468 struct equeue_event *e = (struct equeue_event*)p - 1; 00469 e->dtor = dtor; 00470 } 00471 00472 00473 // simple callbacks 00474 struct ecallback { 00475 void (*cb)(void*); 00476 void *data; 00477 }; 00478 00479 static void ecallback_dispatch(void *p) { 00480 struct ecallback *e = (struct ecallback*)p; 00481 e->cb(e->data); 00482 } 00483 00484 int equeue_call(equeue_t *q, void (*cb)(void*), void *data) { 00485 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00486 if (!e) { 00487 return 0; 00488 } 00489 00490 e->cb = cb; 00491 e->data = data; 00492 return equeue_post(q, ecallback_dispatch, e); 00493 } 00494 00495 int equeue_call_in(equeue_t *q, int ms, void (*cb)(void*), void *data) { 00496 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00497 if (!e) { 00498 return 0; 00499 } 00500 00501 equeue_event_delay(e, ms); 00502 e->cb = cb; 00503 e->data = data; 00504 return equeue_post(q, ecallback_dispatch, e); 00505 } 00506 00507 int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) { 00508 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00509 if (!e) { 00510 return 0; 00511 } 00512 00513 equeue_event_delay(e, ms); 00514 equeue_event_period(e, ms); 00515 e->cb = cb; 00516 e->data = data; 00517 return equeue_post(q, ecallback_dispatch, e); 00518 } 00519 00520 00521 // backgrounding 00522 void equeue_background(equeue_t *q, 00523 void (*update)(void *timer, int ms), void *timer) { 00524 equeue_mutex_lock(&q->queuelock); 00525 if (q->background.update) { 00526 q->background.update(q->background.timer, -1); 00527 } 00528 00529 q->background.update = update; 00530 q->background.timer = timer; 00531 00532 if (q->background.update && q->queue) { 00533 q->background.update(q->background.timer, 00534 equeue_clampdiff(q->queue->target, equeue_tick())); 00535 } 00536 q->background.active = true; 00537 equeue_mutex_unlock(&q->queuelock); 00538 } 00539 00540 struct equeue_chain_context { 00541 equeue_t *q; 00542 equeue_t *target; 00543 int id; 00544 }; 00545 00546 static void equeue_chain_dispatch(void *p) { 00547 equeue_dispatch((equeue_t *)p, 0); 00548 } 00549 00550 static void equeue_chain_update(void *p, int ms) { 00551 struct equeue_chain_context *c = (struct equeue_chain_context *)p; 00552 equeue_cancel(c->target, c->id); 00553 00554 if (ms >= 0) { 00555 c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q); 00556 } else { 00557 equeue_dealloc(c->target, c); 00558 } 00559 } 00560 00561 void equeue_chain(equeue_t *q, equeue_t *target) { 00562 if (!target) { 00563 equeue_background(q, 0, 0); 00564 return; 00565 } 00566 00567 struct equeue_chain_context *c = equeue_alloc(q, 00568 sizeof(struct equeue_chain_context)); 00569 00570 c->q = q; 00571 c->target = target; 00572 c->id = 0; 00573 00574 equeue_background(q, equeue_chain_update, c); 00575 }
Generated on Tue Jul 12 2022 14:23:37 by
