equeue.c
/*
 * Flexible event queue for dispatching events
 *
 * Copyright (c) 2016 Christopher Haster
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "equeue/equeue.h"

#include <stdlib.h>
#include <string.h>


// calculate the relative-difference between absolute times while
// correctly handling overflow conditions
static inline int equeue_tickdiff(unsigned a, unsigned b) {
    return (int)(unsigned)(a - b);
}

// calculate the relative-difference between absolute times, but
// also clamp to zero, resulting in only non-negative values
static inline int equeue_clampdiff(unsigned a, unsigned b) {
    int diff = equeue_tickdiff(a, b);
    return ~(diff >> (8*sizeof(int)-1)) & diff;
}

// Increment the unique id in an event, hiding the event from cancel
static inline void equeue_incid(equeue_t *q, struct equeue_event *e) {
    e->id += 1;
    if ((e->id << q->npw2) == 0) {
        e->id = 1;
    }
}


// equeue lifetime management
int equeue_create(equeue_t *q, size_t size) {
    // dynamically allocate the specified buffer
    void *buffer = malloc(size);
    if (!buffer) {
        return -1;
    }

    int err = equeue_create_inplace(q, size, buffer);
    q->allocated = buffer;
    return err;
}

int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
    // setup queue around provided buffer
    q->buffer = buffer;
    q->allocated = 0;

    q->npw2 = 0;
    for (unsigned s = size; s; s >>= 1) {
        q->npw2++;
    }

    q->chunks = 0;
    q->slab.size = size;
    q->slab.data = buffer;

    q->queue = 0;
    q->tick = equeue_tick();
    q->generation = 0;
    q->break_requested = false;

    q->background.active = false;
    q->background.update = 0;
    q->background.timer = 0;

    // initialize platform resources
    int err;
    err = equeue_sema_create(&q->eventsema);
    if (err < 0) {
        return err;
    }

    err = equeue_mutex_create(&q->queuelock);
    if (err < 0) {
        return err;
    }

    err = equeue_mutex_create(&q->memlock);
    if (err < 0) {
        return err;
    }

    return 0;
}

void equeue_destroy(equeue_t *q) {
    // call destructors on pending events
    for (struct equeue_event *es = q->queue; es; es = es->next) {
        for (struct equeue_event *e = es; e; e = e->sibling) {
            if (e->dtor) {
                e->dtor(e + 1);
            }
        }
    }

    // notify background timer
    if (q->background.update) {
        q->background.update(q->background.timer, -1);
    }

    // clean up platform resources + memory
    equeue_mutex_destroy(&q->memlock);
    equeue_mutex_destroy(&q->queuelock);
    equeue_sema_destroy(&q->eventsema);
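    // free the underlying buffer only if equeue_create allocated it;
    // for in-place queues q->allocated is 0 and free(0) is a no-op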
    free(q->allocated);
}


// equeue chunk allocation functions
static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) {
    // add event overhead
    size += sizeof(struct equeue_event);
    size = (size + sizeof(void*)-1) & ~(sizeof(void*)-1);

    equeue_mutex_lock(&q->memlock);

    // check if a good chunk is available
    for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) {
        if ((*p)->size >= size) {
            struct equeue_event *e = *p;
            if (e->sibling) {
                *p = e->sibling;
                (*p)->next = e->next;
            } else {
                *p = e->next;
            }

            equeue_mutex_unlock(&q->memlock);
            return e;
        }
    }

    // otherwise allocate a new chunk out of the slab
    if (q->slab.size >= size) {
        struct equeue_event *e = (struct equeue_event *)q->slab.data;
        q->slab.data += size;
        q->slab.size -= size;
        e->size = size;
        e->id = 1;

        equeue_mutex_unlock(&q->memlock);
        return e;
    }

    equeue_mutex_unlock(&q->memlock);
    return 0;
}

static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) {
    equeue_mutex_lock(&q->memlock);

    // stick chunk into list of chunks
    struct equeue_event **p = &q->chunks;
    while (*p && (*p)->size < e->size) {
        p = &(*p)->next;
    }

    if (*p && (*p)->size == e->size) {
        e->sibling = *p;
        e->next = (*p)->next;
    } else {
        e->sibling = 0;
        e->next = *p;
    }
    *p = e;

    equeue_mutex_unlock(&q->memlock);
}

void *equeue_alloc(equeue_t *q, size_t size) {
    struct equeue_event *e = equeue_mem_alloc(q, size);
    if (!e) {
        return 0;
    }

    e->target = 0;
    e->period = -1;
    e->dtor = 0;

    return e + 1;
}

void equeue_dealloc(equeue_t *q, void *p) {
    struct equeue_event *e = (struct equeue_event*)p - 1;

    if (e->dtor) {
        e->dtor(e+1);
    }

    equeue_mem_dealloc(q, e);
}


// equeue scheduling functions
static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick) {
    // setup event and hash local id with buffer offset for unique id
    int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
    e->target = tick + equeue_clampdiff(e->target, tick);
    e->generation = q->generation;

    equeue_mutex_lock(&q->queuelock);

    // find the event slot
    struct equeue_event **p = &q->queue;
    while (*p && equeue_tickdiff((*p)->target, e->target) < 0) {
        p = &(*p)->next;
    }

    // insert at head in slot
    if (*p && (*p)->target == e->target) {
        e->next = (*p)->next;
        if (e->next) {
            e->next->ref = &e->next;
        }

        e->sibling = *p;
        e->sibling->ref = &e->sibling;
    } else {
        e->next = *p;
        if (e->next) {
            e->next->ref = &e->next;
        }

        e->sibling = 0;
    }

    *p = e;
    e->ref = p;

    // notify background timer
    if ((q->background.update && q->background.active) &&
        (q->queue == e && !e->sibling)) {
        q->background.update(q->background.timer,
                equeue_clampdiff(e->target, tick));
    }

    equeue_mutex_unlock(&q->queuelock);

    return id;
}
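// Note on ids: the id returned by equeue_enqueue packs the event's
// per-chunk counter in the high bits and its buffer offset in the low
// npw2 bits. equeue_incid bumps the counter whenever an event is
// consumed or unqueued, so a stale id no longer matches the stored
// counter and a late equeue_cancel becomes a harmless no-op.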
static struct equeue_event *equeue_unqueue(equeue_t *q, int id) {
    // decode event from unique id and check that the local id matches
    struct equeue_event *e = (struct equeue_event *)
            &q->buffer[id & ((1 << q->npw2)-1)];

    equeue_mutex_lock(&q->queuelock);
    if (e->id != id >> q->npw2) {
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    // clear the event and check if already in-flight
    e->cb = 0;
    e->period = -1;

    int diff = equeue_tickdiff(e->target, q->tick);
    if (diff < 0 || (diff == 0 && e->generation != q->generation)) {
        equeue_mutex_unlock(&q->queuelock);
        return 0;
    }

    // disentangle from queue
    if (e->sibling) {
        e->sibling->next = e->next;
        if (e->sibling->next) {
            e->sibling->next->ref = &e->sibling->next;
        }

        *e->ref = e->sibling;
        e->sibling->ref = e->ref;
    } else {
        *e->ref = e->next;
        if (e->next) {
            e->next->ref = e->ref;
        }
    }

    equeue_incid(q, e);
    equeue_mutex_unlock(&q->queuelock);

    return e;
}

static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) {
    equeue_mutex_lock(&q->queuelock);

    // find all expired events and mark a new generation
    q->generation += 1;
    if (equeue_tickdiff(q->tick, target) <= 0) {
        q->tick = target;
    }

    struct equeue_event *head = q->queue;
    struct equeue_event **p = &head;
    while (*p && equeue_tickdiff((*p)->target, target) <= 0) {
        p = &(*p)->next;
    }

    q->queue = *p;
    if (q->queue) {
        q->queue->ref = &q->queue;
    }

    *p = 0;

    equeue_mutex_unlock(&q->queuelock);

    // reverse and flatten each slot to match insertion order
    struct equeue_event **tail = &head;
    struct equeue_event *ess = head;
    while (ess) {
        struct equeue_event *es = ess;
        ess = es->next;

        struct equeue_event *prev = 0;
        for (struct equeue_event *e = es; e; e = e->sibling) {
            e->next = prev;
            prev = e;
        }

        *tail = prev;
        tail = &es->next;
    }

    return head;
}

int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
    struct equeue_event *e = (struct equeue_event*)p - 1;
    unsigned tick = equeue_tick();
    e->cb = cb;
    e->target = tick + e->target;

    int id = equeue_enqueue(q, e, tick);
    equeue_sema_signal(&q->eventsema);
    return id;
}

void equeue_cancel(equeue_t *q, int id) {
    if (!id) {
        return;
    }

    struct equeue_event *e = equeue_unqueue(q, id);
    if (e) {
        equeue_dealloc(q, e + 1);
    }
}

int equeue_timeleft(equeue_t *q, int id) {
    int ret = -1;

    if (!id) {
        return -1;
    }

    // decode event from unique id and check that the local id matches
    struct equeue_event *e = (struct equeue_event *)
            &q->buffer[id & ((1 << q->npw2)-1)];

    equeue_mutex_lock(&q->queuelock);
    if (e->id == id >> q->npw2) {
        ret = equeue_clampdiff(e->target, equeue_tick());
    }
    equeue_mutex_unlock(&q->queuelock);
    return ret;
}

void equeue_break(equeue_t *q) {
    equeue_mutex_lock(&q->queuelock);
    q->break_requested = true;
    equeue_mutex_unlock(&q->queuelock);
    equeue_sema_signal(&q->eventsema);
}
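// Dispatch semantics: a negative ms dispatches forever, while a
// non-negative ms dispatches events for roughly that many milliseconds
// before returning. equeue_break from another context wakes the
// dispatch loop and forces an early return.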
void equeue_dispatch(equeue_t *q, int ms) {
    unsigned tick = equeue_tick();
    unsigned timeout = tick + ms;
    q->background.active = false;

    while (1) {
        // collect all the available events and next deadline
        struct equeue_event *es = equeue_dequeue(q, tick);

        // dispatch events
        while (es) {
            struct equeue_event *e = es;
            es = e->next;

            // actually dispatch the callbacks
            void (*cb)(void *) = e->cb;
            if (cb) {
                cb(e + 1);
            }

            // reenqueue periodic events or deallocate
            if (e->period >= 0) {
                e->target += e->period;
                equeue_enqueue(q, e, equeue_tick());
            } else {
                equeue_incid(q, e);
                equeue_dealloc(q, e+1);
            }
        }

        int deadline = -1;
        tick = equeue_tick();

        // check if we should stop dispatching soon
        if (ms >= 0) {
            deadline = equeue_tickdiff(timeout, tick);
            if (deadline <= 0) {
                // update background timer if necessary
                if (q->background.update) {
                    equeue_mutex_lock(&q->queuelock);
                    if (q->background.update && q->queue) {
                        q->background.update(q->background.timer,
                                equeue_clampdiff(q->queue->target, tick));
                    }
                    q->background.active = true;
                    equeue_mutex_unlock(&q->queuelock);
                }
                q->break_requested = false;
                return;
            }
        }

        // find closest deadline
        equeue_mutex_lock(&q->queuelock);
        if (q->queue) {
            int diff = equeue_clampdiff(q->queue->target, tick);
            if ((unsigned)diff < (unsigned)deadline) {
                deadline = diff;
            }
        }
        equeue_mutex_unlock(&q->queuelock);

        // wait for events
        equeue_sema_wait(&q->eventsema, deadline);

        // check if we were notified to break out of dispatch
        if (q->break_requested) {
            equeue_mutex_lock(&q->queuelock);
            if (q->break_requested) {
                q->break_requested = false;
                equeue_mutex_unlock(&q->queuelock);
                return;
            }
            equeue_mutex_unlock(&q->queuelock);
        }

        // update tick for next iteration
        tick = equeue_tick();
    }
}


// event functions
void equeue_event_delay(void *p, int ms) {
    struct equeue_event *e = (struct equeue_event*)p - 1;
    e->target = ms;
}

void equeue_event_period(void *p, int ms) {
    struct equeue_event *e = (struct equeue_event*)p - 1;
    e->period = ms;
}

void equeue_event_dtor(void *p, void (*dtor)(void *)) {
    struct equeue_event *e = (struct equeue_event*)p - 1;
    e->dtor = dtor;
}


// simple callbacks
struct ecallback {
    void (*cb)(void*);
    void *data;
};

static void ecallback_dispatch(void *p) {
    struct ecallback *e = (struct ecallback*)p;
    e->cb(e->data);
}
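// The equeue_call* helpers wrap equeue_alloc and equeue_post around the
// ecallback trampoline above; each returns an id usable with
// equeue_cancel, or 0 if the allocation fails.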
int equeue_call(equeue_t *q, void (*cb)(void*), void *data) {
    struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
    if (!e) {
        return 0;
    }

    e->cb = cb;
    e->data = data;
    return equeue_post(q, ecallback_dispatch, e);
}

int equeue_call_in(equeue_t *q, int ms, void (*cb)(void*), void *data) {
    struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
    if (!e) {
        return 0;
    }

    equeue_event_delay(e, ms);
    e->cb = cb;
    e->data = data;
    return equeue_post(q, ecallback_dispatch, e);
}

int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) {
    struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback));
    if (!e) {
        return 0;
    }

    equeue_event_delay(e, ms);
    equeue_event_period(e, ms);
    e->cb = cb;
    e->data = data;
    return equeue_post(q, ecallback_dispatch, e);
}


// backgrounding
void equeue_background(equeue_t *q,
        void (*update)(void *timer, int ms), void *timer) {
    equeue_mutex_lock(&q->queuelock);
    if (q->background.update) {
        q->background.update(q->background.timer, -1);
    }

    q->background.update = update;
    q->background.timer = timer;

    if (q->background.update && q->queue) {
        q->background.update(q->background.timer,
                equeue_clampdiff(q->queue->target, equeue_tick()));
    }
    q->background.active = true;
    equeue_mutex_unlock(&q->queuelock);
}

struct equeue_chain_context {
    equeue_t *q;
    equeue_t *target;
    int id;
};

static void equeue_chain_dispatch(void *p) {
    equeue_dispatch((equeue_t *)p, 0);
}

static void equeue_chain_update(void *p, int ms) {
    struct equeue_chain_context *c = (struct equeue_chain_context *)p;
    equeue_cancel(c->target, c->id);

    if (ms >= 0) {
        c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q);
    } else {
        // the context was allocated from q, so it must be returned to q
        equeue_dealloc(c->q, c);
    }
}

void equeue_chain(equeue_t *q, equeue_t *target) {
    if (!target) {
        equeue_background(q, 0, 0);
        return;
    }

    struct equeue_chain_context *c = equeue_alloc(q,
            sizeof(struct equeue_chain_context));

    c->q = q;
    c->target = target;
    c->id = 0;

    equeue_background(q, equeue_chain_update, c);
}
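As a usage sketch: the listing above is only the core implementation; it assumes the equeue.h API plus a platform layer providing equeue_tick, equeue_sema_*, and equeue_mutex_*. A minimal event loop might look like the following, where main, the print_tick callback, and the 32-event buffer size are illustrative rather than part of this file (the library's README uses the same 32*EQUEUE_EVENT_SIZE sizing convention):

#include "equeue/equeue.h"
#include <stdio.h>

// illustrative callback; the void* is the user data passed at post time
static void print_tick(void *data) {
    printf("tick: %s\n", (const char *)data);
}

int main(void) {
    equeue_t q;
    // room for roughly 32 events; EQUEUE_EVENT_SIZE comes from equeue.h
    if (equeue_create(&q, 32 * EQUEUE_EVENT_SIZE) < 0) {
        return 1;
    }

    // fire once after 10 ms, and separately every 1000 ms
    equeue_call_in(&q, 10, print_tick, "once");
    int id = equeue_call_every(&q, 1000, print_tick, "periodic");

    // dispatch events for 5 seconds, then cancel the periodic event
    equeue_dispatch(&q, 5000);
    equeue_cancel(&q, id);

    equeue_destroy(&q);
    return 0;
}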
