BA
/
BaBoRo1
Embed:
(wiki syntax)
Show/hide line numbers
equeue.c
00001 /* 00002 * Flexible event queue for dispatching events 00003 * 00004 * Copyright (c) 2016 Christopher Haster 00005 * 00006 * Licensed under the Apache License, Version 2.0 (the "License"); 00007 * you may not use this file except in compliance with the License. 00008 * You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 #include "equeue/equeue.h" 00019 00020 #include <stdlib.h> 00021 #include <string.h> 00022 00023 00024 // calculate the relative-difference between absolute times while 00025 // correctly handling overflow conditions 00026 static inline int equeue_tickdiff(unsigned a, unsigned b) { 00027 return (int)(unsigned)(a - b); 00028 } 00029 00030 // calculate the relative-difference between absolute times, but 00031 // also clamp to zero, resulting in only non-zero values. 00032 static inline int equeue_clampdiff(unsigned a, unsigned b) { 00033 int diff = equeue_tickdiff(a, b); 00034 return ~(diff >> (8*sizeof(int)-1)) & diff; 00035 } 00036 00037 // Increment the unique id in an event, hiding the event from cancel 00038 static inline void equeue_incid(equeue_t *q, struct equeue_event *e) { 00039 e->id += 1; 00040 if ((e->id << q->npw2) == 0) { 00041 e->id = 1; 00042 } 00043 } 00044 00045 00046 // equeue lifetime management 00047 int equeue_create(equeue_t *q, size_t size) { 00048 // dynamically allocate the specified buffer 00049 void *buffer = malloc(size); 00050 if (!buffer) { 00051 return -1; 00052 } 00053 00054 int err = equeue_create_inplace(q, size, buffer); 00055 q->allocated = buffer; 00056 return err; 00057 } 00058 00059 int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) { 00060 // setup queue around provided buffer 00061 q->buffer = buffer; 00062 q->allocated = 0; 00063 00064 q->npw2 = 0; 00065 for (unsigned s = size; s; s >>= 1) { 00066 q->npw2++; 00067 } 00068 00069 q->chunks = 0; 00070 q->slab.size = size; 00071 q->slab.data = buffer; 00072 00073 q->queue = 0; 00074 q->tick = equeue_tick(); 00075 q->generation = 0; 00076 q->break_requested = false; 00077 00078 q->background.active = false; 00079 q->background.update = 0; 00080 q->background.timer = 0; 00081 00082 // initialize platform resources 00083 int err; 00084 err = equeue_sema_create(&q->eventsema); 00085 if (err < 0) { 00086 return err; 00087 } 00088 00089 err = equeue_mutex_create(&q->queuelock); 00090 if (err < 0) { 00091 return err; 00092 } 00093 00094 err = equeue_mutex_create(&q->memlock); 00095 if (err < 0) { 00096 return err; 00097 } 00098 00099 return 0; 00100 } 00101 00102 void equeue_destroy(equeue_t *q) { 00103 // call destructors on pending events 00104 for (struct equeue_event *es = q->queue; es; es = es->next) { 00105 for (struct equeue_event *e = q->queue; e; e = e->sibling) { 00106 if (e->dtor) { 00107 e->dtor(e + 1); 00108 } 00109 } 00110 } 00111 00112 // notify background timer 00113 if (q->background.update) { 00114 q->background.update(q->background.timer, -1); 00115 } 00116 00117 // clean up platform resources + memory 00118 equeue_mutex_destroy(&q->memlock); 00119 equeue_mutex_destroy(&q->queuelock); 00120 equeue_sema_destroy(&q->eventsema); 00121 free(q->allocated); 00122 } 00123 00124 00125 // equeue chunk allocation functions 00126 static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) { 00127 // add event overhead 00128 size += sizeof(struct equeue_event); 00129 size = (size + sizeof(void*)-1) & ~(sizeof(void*)-1); 00130 00131 equeue_mutex_lock(&q->memlock); 00132 00133 // check if a good chunk is available 00134 for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) { 00135 if ((*p)->size >= size) { 00136 struct equeue_event *e = *p; 00137 if (e->sibling) { 00138 *p = e->sibling; 00139 (*p)->next = e->next; 00140 } else { 00141 *p = e->next; 00142 } 00143 00144 equeue_mutex_unlock(&q->memlock); 00145 return e; 00146 } 00147 } 00148 00149 // otherwise allocate a new chunk out of the slab 00150 if (q->slab.size >= size) { 00151 struct equeue_event *e = (struct equeue_event *)q->slab.data; 00152 q->slab.data += size; 00153 q->slab.size -= size; 00154 e->size = size; 00155 e->id = 1; 00156 00157 equeue_mutex_unlock(&q->memlock); 00158 return e; 00159 } 00160 00161 equeue_mutex_unlock(&q->memlock); 00162 return 0; 00163 } 00164 00165 static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) { 00166 equeue_mutex_lock(&q->memlock); 00167 00168 // stick chunk into list of chunks 00169 struct equeue_event **p = &q->chunks; 00170 while (*p && (*p)->size < e->size) { 00171 p = &(*p)->next; 00172 } 00173 00174 if (*p && (*p)->size == e->size) { 00175 e->sibling = *p; 00176 e->next = (*p)->next; 00177 } else { 00178 e->sibling = 0; 00179 e->next = *p; 00180 } 00181 *p = e; 00182 00183 equeue_mutex_unlock(&q->memlock); 00184 } 00185 00186 void *equeue_alloc(equeue_t *q, size_t size) { 00187 struct equeue_event *e = equeue_mem_alloc(q, size); 00188 if (!e) { 00189 return 0; 00190 } 00191 00192 e->target = 0; 00193 e->period = -1; 00194 e->dtor = 0; 00195 00196 return e + 1; 00197 } 00198 00199 void equeue_dealloc(equeue_t *q, void *p) { 00200 struct equeue_event *e = (struct equeue_event*)p - 1; 00201 00202 if (e->dtor) { 00203 e->dtor(e+1); 00204 } 00205 00206 equeue_mem_dealloc(q, e); 00207 } 00208 00209 00210 // equeue scheduling functions 00211 static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned tick) { 00212 // setup event and hash local id with buffer offset for unique id 00213 int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer); 00214 e->target = tick + equeue_clampdiff(e->target, tick); 00215 e->generation = q->generation; 00216 00217 equeue_mutex_lock(&q->queuelock); 00218 00219 // find the event slot 00220 struct equeue_event **p = &q->queue; 00221 while (*p && equeue_tickdiff((*p)->target, e->target) < 0) { 00222 p = &(*p)->next; 00223 } 00224 00225 // insert at head in slot 00226 if (*p && (*p)->target == e->target) { 00227 e->next = (*p)->next; 00228 if (e->next) { 00229 e->next->ref = &e->next; 00230 } 00231 00232 e->sibling = *p; 00233 e->sibling->ref = &e->sibling; 00234 } else { 00235 e->next = *p; 00236 if (e->next) { 00237 e->next->ref = &e->next; 00238 } 00239 00240 e->sibling = 0; 00241 } 00242 00243 *p = e; 00244 e->ref = p; 00245 00246 // notify background timer 00247 if ((q->background.update && q->background.active) && 00248 (q->queue == e && !e->sibling)) { 00249 q->background.update(q->background.timer, 00250 equeue_clampdiff(e->target, tick)); 00251 } 00252 00253 equeue_mutex_unlock(&q->queuelock); 00254 00255 return id; 00256 } 00257 00258 static struct equeue_event *equeue_unqueue(equeue_t *q, int id) { 00259 // decode event from unique id and check that the local id matches 00260 struct equeue_event *e = (struct equeue_event *) 00261 &q->buffer[id & ((1 << q->npw2)-1)]; 00262 00263 equeue_mutex_lock(&q->queuelock); 00264 if (e->id != id >> q->npw2) { 00265 equeue_mutex_unlock(&q->queuelock); 00266 return 0; 00267 } 00268 00269 // clear the event and check if already in-flight 00270 e->cb = 0; 00271 e->period = -1; 00272 00273 int diff = equeue_tickdiff(e->target, q->tick); 00274 if (diff < 0 || (diff == 0 && e->generation != q->generation)) { 00275 equeue_mutex_unlock(&q->queuelock); 00276 return 0; 00277 } 00278 00279 // disentangle from queue 00280 if (e->sibling) { 00281 e->sibling->next = e->next; 00282 if (e->sibling->next) { 00283 e->sibling->next->ref = &e->sibling->next; 00284 } 00285 00286 *e->ref = e->sibling; 00287 e->sibling->ref = e->ref; 00288 } else { 00289 *e->ref = e->next; 00290 if (e->next) { 00291 e->next->ref = e->ref; 00292 } 00293 } 00294 00295 equeue_incid(q, e); 00296 equeue_mutex_unlock(&q->queuelock); 00297 00298 return e; 00299 } 00300 00301 static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) { 00302 equeue_mutex_lock(&q->queuelock); 00303 00304 // find all expired events and mark a new generation 00305 q->generation += 1; 00306 if (equeue_tickdiff(q->tick, target) <= 0) { 00307 q->tick = target; 00308 } 00309 00310 struct equeue_event *head = q->queue; 00311 struct equeue_event **p = &head; 00312 while (*p && equeue_tickdiff((*p)->target, target) <= 0) { 00313 p = &(*p)->next; 00314 } 00315 00316 q->queue = *p; 00317 if (q->queue) { 00318 q->queue->ref = &q->queue; 00319 } 00320 00321 *p = 0; 00322 00323 equeue_mutex_unlock(&q->queuelock); 00324 00325 // reverse and flatten each slot to match insertion order 00326 struct equeue_event **tail = &head; 00327 struct equeue_event *ess = head; 00328 while (ess) { 00329 struct equeue_event *es = ess; 00330 ess = es->next; 00331 00332 struct equeue_event *prev = 0; 00333 for (struct equeue_event *e = es; e; e = e->sibling) { 00334 e->next = prev; 00335 prev = e; 00336 } 00337 00338 *tail = prev; 00339 tail = &es->next; 00340 } 00341 00342 return head; 00343 } 00344 00345 int equeue_post(equeue_t *q, void (*cb)(void*), void *p) { 00346 struct equeue_event *e = (struct equeue_event*)p - 1; 00347 unsigned tick = equeue_tick(); 00348 e->cb = cb; 00349 e->target = tick + e->target; 00350 00351 int id = equeue_enqueue(q, e, tick); 00352 equeue_sema_signal(&q->eventsema); 00353 return id; 00354 } 00355 00356 void equeue_cancel(equeue_t *q, int id) { 00357 if (!id) { 00358 return; 00359 } 00360 00361 struct equeue_event *e = equeue_unqueue(q, id); 00362 if (e) { 00363 equeue_dealloc(q, e + 1); 00364 } 00365 } 00366 00367 void equeue_break(equeue_t *q) { 00368 equeue_mutex_lock(&q->queuelock); 00369 q->break_requested = true; 00370 equeue_mutex_unlock(&q->queuelock); 00371 equeue_sema_signal(&q->eventsema); 00372 } 00373 00374 void equeue_dispatch(equeue_t *q, int ms) { 00375 unsigned tick = equeue_tick(); 00376 unsigned timeout = tick + ms; 00377 q->background.active = false; 00378 00379 while (1) { 00380 // collect all the available events and next deadline 00381 struct equeue_event *es = equeue_dequeue(q, tick); 00382 00383 // dispatch events 00384 while (es) { 00385 struct equeue_event *e = es; 00386 es = e->next; 00387 00388 // actually dispatch the callbacks 00389 void (*cb)(void *) = e->cb; 00390 if (cb) { 00391 cb(e + 1); 00392 } 00393 00394 // reenqueue periodic events or deallocate 00395 if (e->period >= 0) { 00396 e->target += e->period; 00397 equeue_enqueue(q, e, equeue_tick()); 00398 } else { 00399 equeue_incid(q, e); 00400 equeue_dealloc(q, e+1); 00401 } 00402 } 00403 00404 int deadline = -1; 00405 tick = equeue_tick(); 00406 00407 // check if we should stop dispatching soon 00408 if (ms >= 0) { 00409 deadline = equeue_tickdiff(timeout, tick); 00410 if (deadline <= 0) { 00411 // update background timer if necessary 00412 if (q->background.update) { 00413 equeue_mutex_lock(&q->queuelock); 00414 if (q->background.update && q->queue) { 00415 q->background.update(q->background.timer, 00416 equeue_clampdiff(q->queue->target, tick)); 00417 } 00418 q->background.active = true; 00419 equeue_mutex_unlock(&q->queuelock); 00420 } 00421 q->break_requested = false; 00422 return; 00423 } 00424 } 00425 00426 // find closest deadline 00427 equeue_mutex_lock(&q->queuelock); 00428 if (q->queue) { 00429 int diff = equeue_clampdiff(q->queue->target, tick); 00430 if ((unsigned)diff < (unsigned)deadline) { 00431 deadline = diff; 00432 } 00433 } 00434 equeue_mutex_unlock(&q->queuelock); 00435 00436 // wait for events 00437 equeue_sema_wait(&q->eventsema, deadline); 00438 00439 // check if we were notified to break out of dispatch 00440 if (q->break_requested) { 00441 equeue_mutex_lock(&q->queuelock); 00442 if (q->break_requested) { 00443 q->break_requested = false; 00444 equeue_mutex_unlock(&q->queuelock); 00445 return; 00446 } 00447 equeue_mutex_unlock(&q->queuelock); 00448 } 00449 00450 // update tick for next iteration 00451 tick = equeue_tick(); 00452 } 00453 } 00454 00455 00456 // event functions 00457 void equeue_event_delay(void *p, int ms) { 00458 struct equeue_event *e = (struct equeue_event*)p - 1; 00459 e->target = ms; 00460 } 00461 00462 void equeue_event_period(void *p, int ms) { 00463 struct equeue_event *e = (struct equeue_event*)p - 1; 00464 e->period = ms; 00465 } 00466 00467 void equeue_event_dtor(void *p, void (*dtor)(void *)) { 00468 struct equeue_event *e = (struct equeue_event*)p - 1; 00469 e->dtor = dtor; 00470 } 00471 00472 00473 // simple callbacks 00474 struct ecallback { 00475 void (*cb)(void*); 00476 void *data; 00477 }; 00478 00479 static void ecallback_dispatch(void *p) { 00480 struct ecallback *e = (struct ecallback*)p; 00481 e->cb(e->data); 00482 } 00483 00484 int equeue_call(equeue_t *q, void (*cb)(void*), void *data) { 00485 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00486 if (!e) { 00487 return 0; 00488 } 00489 00490 e->cb = cb; 00491 e->data = data; 00492 return equeue_post(q, ecallback_dispatch, e); 00493 } 00494 00495 int equeue_call_in(equeue_t *q, int ms, void (*cb)(void*), void *data) { 00496 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00497 if (!e) { 00498 return 0; 00499 } 00500 00501 equeue_event_delay(e, ms); 00502 e->cb = cb; 00503 e->data = data; 00504 return equeue_post(q, ecallback_dispatch, e); 00505 } 00506 00507 int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) { 00508 struct ecallback *e = equeue_alloc(q, sizeof(struct ecallback)); 00509 if (!e) { 00510 return 0; 00511 } 00512 00513 equeue_event_delay(e, ms); 00514 equeue_event_period(e, ms); 00515 e->cb = cb; 00516 e->data = data; 00517 return equeue_post(q, ecallback_dispatch, e); 00518 } 00519 00520 00521 // backgrounding 00522 void equeue_background(equeue_t *q, 00523 void (*update)(void *timer, int ms), void *timer) { 00524 equeue_mutex_lock(&q->queuelock); 00525 if (q->background.update) { 00526 q->background.update(q->background.timer, -1); 00527 } 00528 00529 q->background.update = update; 00530 q->background.timer = timer; 00531 00532 if (q->background.update && q->queue) { 00533 q->background.update(q->background.timer, 00534 equeue_clampdiff(q->queue->target, equeue_tick())); 00535 } 00536 q->background.active = true; 00537 equeue_mutex_unlock(&q->queuelock); 00538 } 00539 00540 struct equeue_chain_context { 00541 equeue_t *q; 00542 equeue_t *target; 00543 int id; 00544 }; 00545 00546 static void equeue_chain_dispatch(void *p) { 00547 equeue_dispatch((equeue_t *)p, 0); 00548 } 00549 00550 static void equeue_chain_update(void *p, int ms) { 00551 struct equeue_chain_context *c = (struct equeue_chain_context *)p; 00552 equeue_cancel(c->target, c->id); 00553 00554 if (ms >= 0) { 00555 c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q); 00556 } else { 00557 equeue_dealloc(c->target, c); 00558 } 00559 } 00560 00561 void equeue_chain(equeue_t *q, equeue_t *target) { 00562 if (!target) { 00563 equeue_background(q, 0, 0); 00564 return; 00565 } 00566 00567 struct equeue_chain_context *c = equeue_alloc(q, 00568 sizeof(struct equeue_chain_context)); 00569 00570 c->q = q; 00571 c->target = target; 00572 c->id = 0; 00573 00574 equeue_background(q, equeue_chain_update, c); 00575 }
Generated on Tue Jul 12 2022 12:21:49 by
