job scheduler works with run once and run periodic schedules. Stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

Committer:
sgnezdov
Date:
Thu Jul 13 23:07:09 2017 +0000
Revision:
10:8cff30b5b90d
Parent:
9:ee21cd055a97
Child:
12:684ddfc57199
work in progress

Who changed what in which revision?

UserRevisionLine numberNew contents of line
sgnezdov 0:806403f3d0d1 1 #include "scheduler.h"
sgnezdov 0:806403f3d0d1 2
sgnezdov 9:ee21cd055a97 3 #include "mbed-trace/mbed_trace.h"
sgnezdov 9:ee21cd055a97 4 #define TRACE_GROUP "schd"
sgnezdov 9:ee21cd055a97 5
sgnezdov 3:f08f55827736 6 Timeout WakeOnce;
sgnezdov 3:f08f55827736 7
sgnezdov 0:806403f3d0d1 8 void update(void *target) {
sgnezdov 0:806403f3d0d1 9 };
sgnezdov 0:806403f3d0d1 10
sgnezdov 0:806403f3d0d1 11 namespace JobScheduler {
sgnezdov 1:ec6a1d054065 12
sgnezdov 3:f08f55827736 13 const ActionType JobAddAT(1);
sgnezdov 3:f08f55827736 14 const ActionType JobRunAT(3);
sgnezdov 7:98c8b2eabea3 15 const ActionType JobQuitAT(4);
sgnezdov 3:f08f55827736 16
sgnezdov 2:9bf5366ad5a2 17 bool descendingTimeline(Appointment *a1, Appointment *a2)
sgnezdov 2:9bf5366ad5a2 18 {
sgnezdov 2:9bf5366ad5a2 19 bool rv = a1->GetTime() <= a2->GetTime();
sgnezdov 9:ee21cd055a97 20 //tr_debug("(%d %d:%d)", *d1, *d2, rv);
sgnezdov 2:9bf5366ad5a2 21 return rv;
sgnezdov 2:9bf5366ad5a2 22 };
sgnezdov 3:f08f55827736 23
sgnezdov 2:9bf5366ad5a2 24 /**
sgnezdov 2:9bf5366ad5a2 25 JobAddReq adds new job to the scheduler.
sgnezdov 2:9bf5366ad5a2 26 */
sgnezdov 1:ec6a1d054065 27 struct JobAddReq: Action {
sgnezdov 2:9bf5366ad5a2 28 Appointment *apt;
sgnezdov 1:ec6a1d054065 29 Response<JobID> response;
sgnezdov 2:9bf5366ad5a2 30 JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
sgnezdov 1:ec6a1d054065 31 };
sgnezdov 7:98c8b2eabea3 32
sgnezdov 0:806403f3d0d1 33 Scheduler::Scheduler(JobService *jobService)
sgnezdov 7:98c8b2eabea3 34 : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { }
sgnezdov 0:806403f3d0d1 35
sgnezdov 0:806403f3d0d1 36 void Scheduler::updateAdapter(void *thisPointer) {
sgnezdov 0:806403f3d0d1 37 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 0:806403f3d0d1 38 self->updateHandler();
sgnezdov 0:806403f3d0d1 39 }
sgnezdov 4:78bcd5a675e1 40
sgnezdov 4:78bcd5a675e1 41 void Scheduler::runAdapter(void *thisPointer) {
sgnezdov 4:78bcd5a675e1 42 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 4:78bcd5a675e1 43 self->runHandler();
sgnezdov 4:78bcd5a675e1 44 }
sgnezdov 0:806403f3d0d1 45
sgnezdov 0:806403f3d0d1 46 void Scheduler::Start() {
sgnezdov 0:806403f3d0d1 47 _updater.start(callback(Scheduler::updateAdapter, this));
sgnezdov 4:78bcd5a675e1 48 _runner.start(callback(Scheduler::runAdapter, this));
sgnezdov 0:806403f3d0d1 49 }
sgnezdov 0:806403f3d0d1 50
sgnezdov 0:806403f3d0d1 51 void Scheduler::Stop() {
sgnezdov 7:98c8b2eabea3 52 _runs.put(NULL);
sgnezdov 7:98c8b2eabea3 53
sgnezdov 7:98c8b2eabea3 54 Action uReq = Action(JobQuitAT);
sgnezdov 7:98c8b2eabea3 55 _updates.put(&uReq);
sgnezdov 7:98c8b2eabea3 56 uReq.resQueue.get();
sgnezdov 0:806403f3d0d1 57 }
sgnezdov 7:98c8b2eabea3 58
sgnezdov 0:806403f3d0d1 59 void Scheduler::WaitToStop() {
sgnezdov 0:806403f3d0d1 60 _updater.join();
sgnezdov 4:78bcd5a675e1 61 _runner.join();
sgnezdov 0:806403f3d0d1 62 }
sgnezdov 0:806403f3d0d1 63
sgnezdov 2:9bf5366ad5a2 64 Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
sgnezdov 2:9bf5366ad5a2 65 Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
sgnezdov 2:9bf5366ad5a2 66 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 67 tr_error("[JobAdd] failed to allocate appointment");
sgnezdov 2:9bf5366ad5a2 68 return Response<JobID>(1, 0);
sgnezdov 2:9bf5366ad5a2 69 }
sgnezdov 5:d8f69ac330f2 70 return this->reschedule(apt);
sgnezdov 5:d8f69ac330f2 71 }
sgnezdov 5:d8f69ac330f2 72
sgnezdov 5:d8f69ac330f2 73 Response<JobID> Scheduler::reschedule(Appointment *apt) {
sgnezdov 2:9bf5366ad5a2 74 JobAddReq req(apt);
sgnezdov 6:5baa0e4ec500 75
sgnezdov 6:5baa0e4ec500 76 // set next appointment time
sgnezdov 6:5baa0e4ec500 77 time_t now = time(NULL); // now in seconds
sgnezdov 6:5baa0e4ec500 78 apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
sgnezdov 6:5baa0e4ec500 79 if (apt->GetTime() == 0) {
sgnezdov 6:5baa0e4ec500 80 // there is no next run time; delete appointment
sgnezdov 10:8cff30b5b90d 81 tr_debug("[reschedule] NO next appointment");
sgnezdov 6:5baa0e4ec500 82 delete apt;
sgnezdov 6:5baa0e4ec500 83 req.response.error = 2;
sgnezdov 6:5baa0e4ec500 84 return req.response;
sgnezdov 6:5baa0e4ec500 85 }
sgnezdov 6:5baa0e4ec500 86
sgnezdov 10:8cff30b5b90d 87 tr_debug("[reschedule] put");
sgnezdov 4:78bcd5a675e1 88 _updates.put(&req);
sgnezdov 10:8cff30b5b90d 89 tr_debug("[reschedule] get");
sgnezdov 0:806403f3d0d1 90 // default is wait forever
sgnezdov 1:ec6a1d054065 91 osEvent evt = req.resQueue.get();
sgnezdov 0:806403f3d0d1 92 if (evt.status == osEventMessage) {
sgnezdov 2:9bf5366ad5a2 93 if (evt.value.p != NULL) {
sgnezdov 10:8cff30b5b90d 94 tr_debug("[reschedule] completed ok");
sgnezdov 2:9bf5366ad5a2 95 } else {
sgnezdov 10:8cff30b5b90d 96 tr_error("[reschedule] NOT added (C1)");
sgnezdov 2:9bf5366ad5a2 97 }
sgnezdov 2:9bf5366ad5a2 98 } else {
sgnezdov 2:9bf5366ad5a2 99 // not sure what condition is
sgnezdov 10:8cff30b5b90d 100 tr_error("[reschedule] NOT added (C2)");
sgnezdov 2:9bf5366ad5a2 101 delete apt;
sgnezdov 2:9bf5366ad5a2 102 apt = NULL;
sgnezdov 0:806403f3d0d1 103 }
sgnezdov 2:9bf5366ad5a2 104 // yes, return a copy of the structure
sgnezdov 5:d8f69ac330f2 105 return req.response;
sgnezdov 0:806403f3d0d1 106 }
sgnezdov 0:806403f3d0d1 107
sgnezdov 2:9bf5366ad5a2 108 void Scheduler::JobRemove(JobID jobID) {
sgnezdov 0:806403f3d0d1 109 }
sgnezdov 3:f08f55827736 110
sgnezdov 7:98c8b2eabea3 111 static Action jobRunReq(JobRunAT);
sgnezdov 3:f08f55827736 112 void Scheduler::onWakeOnce()
sgnezdov 3:f08f55827736 113 {
sgnezdov 4:78bcd5a675e1 114 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 115 }
sgnezdov 0:806403f3d0d1 116
sgnezdov 0:806403f3d0d1 117 void Scheduler::updateHandler() {
sgnezdov 7:98c8b2eabea3 118 while (!_quitUpdater) {
sgnezdov 10:8cff30b5b90d 119 tr_debug("[updateHandler] waiting for action");
sgnezdov 0:806403f3d0d1 120 // wait forever ...
sgnezdov 4:78bcd5a675e1 121 osEvent evt = _updates.get();
sgnezdov 0:806403f3d0d1 122 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 123 tr_debug("[updateHandler] process action");
sgnezdov 0:806403f3d0d1 124 this->process((Action*)evt.value.p);
sgnezdov 0:806403f3d0d1 125 } else {
sgnezdov 10:8cff30b5b90d 126 tr_error("[updateHandler] NOT osEventMessage");
sgnezdov 0:806403f3d0d1 127 }
sgnezdov 0:806403f3d0d1 128 }
sgnezdov 0:806403f3d0d1 129 }
sgnezdov 3:f08f55827736 130
sgnezdov 0:806403f3d0d1 131 void Scheduler::process(Action *action)
sgnezdov 0:806403f3d0d1 132 {
sgnezdov 3:f08f55827736 133 time_t now = time(NULL); // now in seconds
sgnezdov 1:ec6a1d054065 134 switch(action->type) {
sgnezdov 1:ec6a1d054065 135 case JobAddAT: {
sgnezdov 10:8cff30b5b90d 136 tr_debug("[process] JobAddAT");
sgnezdov 1:ec6a1d054065 137 JobAddReq *req = static_cast<JobAddReq*>(action);
sgnezdov 2:9bf5366ad5a2 138 Job *job = req->apt->GetJob();
sgnezdov 2:9bf5366ad5a2 139 if (job->GetID() == 0) {
sgnezdov 2:9bf5366ad5a2 140 // assign job its ID
sgnezdov 2:9bf5366ad5a2 141 job->Init(_nextJobID++);
sgnezdov 2:9bf5366ad5a2 142 }
sgnezdov 2:9bf5366ad5a2 143 node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
sgnezdov 2:9bf5366ad5a2 144 if (NULL == tmp) {
sgnezdov 10:8cff30b5b90d 145 tr_error("[process] timeline insert failed");
sgnezdov 2:9bf5366ad5a2 146 action->resQueue.put(NULL);
sgnezdov 3:f08f55827736 147 // internal state has not changed
sgnezdov 2:9bf5366ad5a2 148 return;
sgnezdov 2:9bf5366ad5a2 149 }
sgnezdov 3:f08f55827736 150 req->response.data = job->GetID();
sgnezdov 10:8cff30b5b90d 151 //tr_debug("[process] simulate error");
sgnezdov 2:9bf5366ad5a2 152 //action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 153 action->resQueue.put(&req->response);
sgnezdov 1:ec6a1d054065 154 break;
sgnezdov 1:ec6a1d054065 155 }
sgnezdov 3:f08f55827736 156 case JobRunAT: {
sgnezdov 10:8cff30b5b90d 157 tr_debug("[process] JobRunAT");
sgnezdov 3:f08f55827736 158 // execute job run logic after switch
sgnezdov 3:f08f55827736 159 break;
sgnezdov 3:f08f55827736 160 }
sgnezdov 7:98c8b2eabea3 161 case JobQuitAT: {
sgnezdov 10:8cff30b5b90d 162 tr_debug("[process] JobQuitAT");
sgnezdov 7:98c8b2eabea3 163 action->resQueue.put(NULL);
sgnezdov 7:98c8b2eabea3 164 _quitUpdater = true;
sgnezdov 7:98c8b2eabea3 165 return;
sgnezdov 7:98c8b2eabea3 166 }
sgnezdov 1:ec6a1d054065 167 default:
sgnezdov 10:8cff30b5b90d 168 tr_warn("[process] unknown action type");
sgnezdov 1:ec6a1d054065 169 action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 170 }
sgnezdov 3:f08f55827736 171 node<Appointment> *wakeNode = _timeline.pop(1);
sgnezdov 3:f08f55827736 172 Appointment *wakeApt = wakeNode->data;
sgnezdov 3:f08f55827736 173 Job* wakeJob = wakeApt->GetJob();
sgnezdov 5:d8f69ac330f2 174 if (now < wakeApt->GetTime()) {
sgnezdov 3:f08f55827736 175 // request wake up
sgnezdov 5:d8f69ac330f2 176 time_t sleepTime = wakeApt->GetTime() - now;
sgnezdov 10:8cff30b5b90d 177 tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime);
sgnezdov 3:f08f55827736 178 WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
sgnezdov 3:f08f55827736 179 } else {
sgnezdov 3:f08f55827736 180 // process job
sgnezdov 10:8cff30b5b90d 181 tr_debug("[process] job ID %d ready to run", wakeJob->GetID());
sgnezdov 3:f08f55827736 182 _timeline.remove(1);
sgnezdov 4:78bcd5a675e1 183 _runs.put(wakeApt);
sgnezdov 3:f08f55827736 184 }
sgnezdov 4:78bcd5a675e1 185 }
sgnezdov 4:78bcd5a675e1 186
sgnezdov 4:78bcd5a675e1 187 void Scheduler::runHandler() {
sgnezdov 7:98c8b2eabea3 188 while (!_quitRunner) {
sgnezdov 10:8cff30b5b90d 189 tr_debug("[runHandler] waiting for action");
sgnezdov 4:78bcd5a675e1 190 // wait forever ...
sgnezdov 4:78bcd5a675e1 191 osEvent evt = _runs.get();
sgnezdov 4:78bcd5a675e1 192 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 193 tr_debug("[runHandler] run action");
sgnezdov 4:78bcd5a675e1 194 Appointment *apt = (Appointment*)evt.value.p;
sgnezdov 7:98c8b2eabea3 195 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 196 tr_debug("[runHandler] quit requested");
sgnezdov 7:98c8b2eabea3 197 _quitRunner = true;
sgnezdov 7:98c8b2eabea3 198 break;
sgnezdov 7:98c8b2eabea3 199 }
sgnezdov 5:d8f69ac330f2 200 Job *job = apt->GetJob();
sgnezdov 10:8cff30b5b90d 201 JobType *jt = _jobService->GetJob(job->GetTypeID());
sgnezdov 10:8cff30b5b90d 202 if (jt == NULL) {
sgnezdov 10:8cff30b5b90d 203 tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID());
sgnezdov 5:d8f69ac330f2 204 // NO reschedule
sgnezdov 5:d8f69ac330f2 205 delete apt;
sgnezdov 5:d8f69ac330f2 206 continue;
sgnezdov 5:d8f69ac330f2 207 }
sgnezdov 10:8cff30b5b90d 208 tr_debug("Job Started");
sgnezdov 10:8cff30b5b90d 209 jt->RunJob();
sgnezdov 10:8cff30b5b90d 210 tr_debug("Job Finished");
sgnezdov 5:d8f69ac330f2 211 this->reschedule(apt);
sgnezdov 4:78bcd5a675e1 212 } else {
sgnezdov 10:8cff30b5b90d 213 tr_error("[runHandler] NOT osEventMessage");
sgnezdov 4:78bcd5a675e1 214 }
sgnezdov 4:78bcd5a675e1 215 }
sgnezdov 4:78bcd5a675e1 216 }
sgnezdov 3:f08f55827736 217
sgnezdov 0:806403f3d0d1 218
sgnezdov 0:806403f3d0d1 219 }