job scheduler works with run once and run periodic schedules. Stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

Committer:
sgnezdov
Date:
Tue Jul 11 21:47:53 2017 +0000
Revision:
3:f08f55827736
Parent:
2:9bf5366ad5a2
Child:
4:78bcd5a675e1
scheduler can schedule jobs and remove run once jobs

Who changed what in which revision?

UserRevisionLine numberNew contents of line
sgnezdov 0:806403f3d0d1 1 #include "scheduler.h"
sgnezdov 0:806403f3d0d1 2
sgnezdov 3:f08f55827736 3 Timeout WakeOnce;
sgnezdov 3:f08f55827736 4
sgnezdov 0:806403f3d0d1 5 void update(void *target) {
sgnezdov 0:806403f3d0d1 6 };
sgnezdov 0:806403f3d0d1 7
sgnezdov 0:806403f3d0d1 8 namespace JobScheduler {
sgnezdov 1:ec6a1d054065 9
sgnezdov 3:f08f55827736 10 const ActionType JobAddAT(1);
sgnezdov 3:f08f55827736 11 const ActionType JobRunAT(3);
sgnezdov 3:f08f55827736 12
sgnezdov 2:9bf5366ad5a2 13 bool descendingTimeline(Appointment *a1, Appointment *a2)
sgnezdov 2:9bf5366ad5a2 14 {
sgnezdov 2:9bf5366ad5a2 15 bool rv = a1->GetTime() <= a2->GetTime();
sgnezdov 2:9bf5366ad5a2 16 //printf("(%d %d:%d)", *d1, *d2, rv);
sgnezdov 2:9bf5366ad5a2 17 return rv;
sgnezdov 2:9bf5366ad5a2 18 };
sgnezdov 3:f08f55827736 19
sgnezdov 2:9bf5366ad5a2 20 /**
sgnezdov 2:9bf5366ad5a2 21 JobAddReq adds new job to the scheduler.
sgnezdov 2:9bf5366ad5a2 22 */
sgnezdov 1:ec6a1d054065 23 struct JobAddReq: Action {
sgnezdov 2:9bf5366ad5a2 24 Appointment *apt;
sgnezdov 1:ec6a1d054065 25 Response<JobID> response;
sgnezdov 2:9bf5366ad5a2 26 JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
sgnezdov 1:ec6a1d054065 27 };
sgnezdov 0:806403f3d0d1 28
sgnezdov 3:f08f55827736 29 struct JobRunReq: Action {
sgnezdov 3:f08f55827736 30 JobRunReq() : Action(JobRunAT) {}
sgnezdov 3:f08f55827736 31 };
sgnezdov 3:f08f55827736 32
sgnezdov 0:806403f3d0d1 33 Scheduler::Scheduler(JobService *jobService)
sgnezdov 2:9bf5366ad5a2 34 : _jobService(jobService), _nextJobID(1) { }
sgnezdov 0:806403f3d0d1 35
sgnezdov 0:806403f3d0d1 36 void Scheduler::updateAdapter(void *thisPointer) {
sgnezdov 0:806403f3d0d1 37 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 0:806403f3d0d1 38 self->updateHandler();
sgnezdov 0:806403f3d0d1 39 }
sgnezdov 0:806403f3d0d1 40
sgnezdov 0:806403f3d0d1 41 void Scheduler::Start() {
sgnezdov 0:806403f3d0d1 42 _updater.start(callback(Scheduler::updateAdapter, this));
sgnezdov 0:806403f3d0d1 43 }
sgnezdov 0:806403f3d0d1 44
sgnezdov 0:806403f3d0d1 45 void Scheduler::Stop() {
sgnezdov 0:806403f3d0d1 46 // it is not thread-safe, but impact is non-existent.
sgnezdov 0:806403f3d0d1 47 _quit = true;
sgnezdov 0:806403f3d0d1 48 }
sgnezdov 0:806403f3d0d1 49
sgnezdov 0:806403f3d0d1 50 void Scheduler::WaitToStop() {
sgnezdov 0:806403f3d0d1 51 _updater.join();
sgnezdov 0:806403f3d0d1 52 }
sgnezdov 0:806403f3d0d1 53
sgnezdov 2:9bf5366ad5a2 54 Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
sgnezdov 2:9bf5366ad5a2 55 Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
sgnezdov 2:9bf5366ad5a2 56 if (NULL == apt) {
sgnezdov 2:9bf5366ad5a2 57 printf("[Scheduler::JobAdd] failed to allocate appointment\n");
sgnezdov 2:9bf5366ad5a2 58 return Response<JobID>(1, 0);
sgnezdov 2:9bf5366ad5a2 59 }
sgnezdov 2:9bf5366ad5a2 60 JobAddReq req(apt);
sgnezdov 1:ec6a1d054065 61 _actions.put(&req);
sgnezdov 0:806403f3d0d1 62 // default is wait forever
sgnezdov 1:ec6a1d054065 63 osEvent evt = req.resQueue.get();
sgnezdov 0:806403f3d0d1 64 if (evt.status == osEventMessage) {
sgnezdov 2:9bf5366ad5a2 65 if (evt.value.p != NULL) {
sgnezdov 2:9bf5366ad5a2 66 printf("[Scheduler::JobAdd] completed ok\n");
sgnezdov 2:9bf5366ad5a2 67 } else {
sgnezdov 2:9bf5366ad5a2 68 printf("[Scheduler::JobAdd] NOT added (C1)\n");
sgnezdov 2:9bf5366ad5a2 69 }
sgnezdov 2:9bf5366ad5a2 70 } else {
sgnezdov 2:9bf5366ad5a2 71 // not sure what condition is
sgnezdov 2:9bf5366ad5a2 72 printf("[Scheduler::JobAdd] NOT added (C2)\n");
sgnezdov 2:9bf5366ad5a2 73 delete apt;
sgnezdov 2:9bf5366ad5a2 74 apt = NULL;
sgnezdov 0:806403f3d0d1 75 }
sgnezdov 2:9bf5366ad5a2 76 // yes, return a copy of the structure
sgnezdov 1:ec6a1d054065 77 return req.response;
sgnezdov 0:806403f3d0d1 78 }
sgnezdov 0:806403f3d0d1 79
sgnezdov 2:9bf5366ad5a2 80 void Scheduler::JobRemove(JobID jobID) {
sgnezdov 0:806403f3d0d1 81 }
sgnezdov 3:f08f55827736 82
sgnezdov 3:f08f55827736 83 static JobRunReq jobRunReq;
sgnezdov 3:f08f55827736 84 void Scheduler::onWakeOnce()
sgnezdov 3:f08f55827736 85 {
sgnezdov 3:f08f55827736 86 _actions.put(&jobRunReq);
sgnezdov 3:f08f55827736 87 }
sgnezdov 0:806403f3d0d1 88
sgnezdov 0:806403f3d0d1 89 void Scheduler::updateHandler() {
sgnezdov 0:806403f3d0d1 90 while (!_quit) {
sgnezdov 0:806403f3d0d1 91 printf("[Scheduler::updateHandler] waiting for action\n");
sgnezdov 0:806403f3d0d1 92 // wait forever ...
sgnezdov 0:806403f3d0d1 93 osEvent evt = _actions.get();
sgnezdov 0:806403f3d0d1 94 if (evt.status == osEventMessage) {
sgnezdov 0:806403f3d0d1 95 printf("[Scheduler::updateHandler] process action\n");
sgnezdov 0:806403f3d0d1 96 this->process((Action*)evt.value.p);
sgnezdov 0:806403f3d0d1 97 } else {
sgnezdov 0:806403f3d0d1 98 printf("[Scheduler::updateHandler] NOT osEventMessage\n");
sgnezdov 0:806403f3d0d1 99 }
sgnezdov 0:806403f3d0d1 100 wait(2);
sgnezdov 0:806403f3d0d1 101 }
sgnezdov 0:806403f3d0d1 102 }
sgnezdov 3:f08f55827736 103
sgnezdov 0:806403f3d0d1 104 void Scheduler::process(Action *action)
sgnezdov 0:806403f3d0d1 105 {
sgnezdov 3:f08f55827736 106 time_t now = time(NULL); // now in seconds
sgnezdov 1:ec6a1d054065 107 switch(action->type) {
sgnezdov 1:ec6a1d054065 108 case JobAddAT: {
sgnezdov 1:ec6a1d054065 109 JobAddReq *req = static_cast<JobAddReq*>(action);
sgnezdov 2:9bf5366ad5a2 110 Job *job = req->apt->GetJob();
sgnezdov 2:9bf5366ad5a2 111 if (job->GetID() == 0) {
sgnezdov 2:9bf5366ad5a2 112 // assign job its ID
sgnezdov 2:9bf5366ad5a2 113 job->Init(_nextJobID++);
sgnezdov 2:9bf5366ad5a2 114 }
sgnezdov 3:f08f55827736 115 // set next appointment time
sgnezdov 3:f08f55827736 116 req->apt->SetTime(job->GetSchedule()->NextRunTime(now));
sgnezdov 2:9bf5366ad5a2 117 node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
sgnezdov 2:9bf5366ad5a2 118 if (NULL == tmp) {
sgnezdov 2:9bf5366ad5a2 119 printf("[Scheduler::process] timeline insert failed\n");
sgnezdov 2:9bf5366ad5a2 120 action->resQueue.put(NULL);
sgnezdov 3:f08f55827736 121 // internal state has not changed
sgnezdov 2:9bf5366ad5a2 122 return;
sgnezdov 2:9bf5366ad5a2 123 }
sgnezdov 3:f08f55827736 124 req->response.data = job->GetID();
sgnezdov 2:9bf5366ad5a2 125 //printf("[Scheduler::process] simulate error\n");
sgnezdov 2:9bf5366ad5a2 126 //action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 127 action->resQueue.put(&req->response);
sgnezdov 1:ec6a1d054065 128 break;
sgnezdov 1:ec6a1d054065 129 }
sgnezdov 3:f08f55827736 130 case JobRunAT: {
sgnezdov 3:f08f55827736 131 // execute job run logic after switch
sgnezdov 3:f08f55827736 132 break;
sgnezdov 3:f08f55827736 133 }
sgnezdov 1:ec6a1d054065 134 default:
sgnezdov 1:ec6a1d054065 135 printf("[Scheduler::process] unknown action type\n");
sgnezdov 1:ec6a1d054065 136 action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 137 }
sgnezdov 3:f08f55827736 138 node<Appointment> *wakeNode = _timeline.pop(1);
sgnezdov 3:f08f55827736 139 Appointment *wakeApt = wakeNode->data;
sgnezdov 3:f08f55827736 140 Job* wakeJob = wakeApt->GetJob();
sgnezdov 3:f08f55827736 141 time_t sleepTime = wakeApt->GetTime() - now;
sgnezdov 3:f08f55827736 142 if (sleepTime > 0) {
sgnezdov 3:f08f55827736 143 // request wake up
sgnezdov 3:f08f55827736 144 printf("[Scheduler::process] job %d wake up in %d seconds\n", wakeJob->GetID(), sleepTime);
sgnezdov 3:f08f55827736 145 WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
sgnezdov 3:f08f55827736 146 } else {
sgnezdov 3:f08f55827736 147 // process job
sgnezdov 3:f08f55827736 148 printf("[Scheduler::process] running job ID %d\n", wakeJob->GetID());
sgnezdov 3:f08f55827736 149 _timeline.remove(1);
sgnezdov 3:f08f55827736 150 }
sgnezdov 3:f08f55827736 151
sgnezdov 0:806403f3d0d1 152 }
sgnezdov 0:806403f3d0d1 153
sgnezdov 0:806403f3d0d1 154 }