Job scheduler that supports run-once and periodic schedules. The stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

Committer:
sgnezdov
Date:
Thu Aug 03 22:44:41 2017 +0000
Revision:
18:8be206ad1eb4
Parent:
17:3b565ccd291b
Child:
19:965c8721cc8a
demo stack underflow issue

Who changed what in which revision?

User | Revision | Line number | New contents of line
sgnezdov 0:806403f3d0d1 1 #include "scheduler.h"
sgnezdov 0:806403f3d0d1 2
sgnezdov 9:ee21cd055a97 3 #include "mbed-trace/mbed_trace.h"
sgnezdov 9:ee21cd055a97 4 #define TRACE_GROUP "schd"
sgnezdov 9:ee21cd055a97 5
sgnezdov 3:f08f55827736 6 Timeout WakeOnce;
sgnezdov 3:f08f55827736 7
sgnezdov 0:806403f3d0d1 8 namespace JobScheduler {
sgnezdov 1:ec6a1d054065 9
sgnezdov 3:f08f55827736 10 const ActionType JobAddAT(1);
sgnezdov 3:f08f55827736 11 const ActionType JobRunAT(3);
sgnezdov 7:98c8b2eabea3 12 const ActionType JobQuitAT(4);
sgnezdov 15:6b8fa5dff770 13 const ActionType JobListAT(5);
sgnezdov 3:f08f55827736 14
sgnezdov 2:9bf5366ad5a2 15 bool descendingTimeline(Appointment *a1, Appointment *a2)
sgnezdov 2:9bf5366ad5a2 16 {
sgnezdov 12:684ddfc57199 17 time_t t1 = a1->GetTime();
sgnezdov 12:684ddfc57199 18 time_t t2 = a2->GetTime();
sgnezdov 16:f61b62b119dd 19 bool rv = t1 >= t2;
sgnezdov 16:f61b62b119dd 20 tr_debug("apt %d:%d >= %d:%d is %d)", a1->GetJob()->GetID(), t1, a2->GetJob()->GetID(), t2, rv);
sgnezdov 2:9bf5366ad5a2 21 return rv;
sgnezdov 2:9bf5366ad5a2 22 };
sgnezdov 3:f08f55827736 23
sgnezdov 2:9bf5366ad5a2 24 /**
sgnezdov 2:9bf5366ad5a2 25 JobAddReq adds new job to the scheduler.
sgnezdov 2:9bf5366ad5a2 26 */
sgnezdov 1:ec6a1d054065 27 struct JobAddReq: Action {
sgnezdov 2:9bf5366ad5a2 28 Appointment *apt;
sgnezdov 1:ec6a1d054065 29 Response<JobID> response;
sgnezdov 2:9bf5366ad5a2 30 JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
sgnezdov 1:ec6a1d054065 31 };
sgnezdov 15:6b8fa5dff770 32
sgnezdov 15:6b8fa5dff770 33 struct JobListReq: Action {
sgnezdov 17:3b565ccd291b 34 LinkedList<Appointment>& _jobsToFill;
sgnezdov 17:3b565ccd291b 35 JobListReq(LinkedList<Appointment>& jobs) : Action(JobListAT), _jobsToFill(jobs) {}
sgnezdov 15:6b8fa5dff770 36 };
sgnezdov 7:98c8b2eabea3 37
sgnezdov 0:806403f3d0d1 38 Scheduler::Scheduler(JobService *jobService)
sgnezdov 7:98c8b2eabea3 39 : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { }
sgnezdov 0:806403f3d0d1 40
sgnezdov 0:806403f3d0d1 41 void Scheduler::updateAdapter(void *thisPointer) {
sgnezdov 0:806403f3d0d1 42 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 0:806403f3d0d1 43 self->updateHandler();
sgnezdov 0:806403f3d0d1 44 }
sgnezdov 4:78bcd5a675e1 45
sgnezdov 4:78bcd5a675e1 46 void Scheduler::runAdapter(void *thisPointer) {
sgnezdov 4:78bcd5a675e1 47 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 4:78bcd5a675e1 48 self->runHandler();
sgnezdov 4:78bcd5a675e1 49 }
sgnezdov 0:806403f3d0d1 50
sgnezdov 0:806403f3d0d1 51 void Scheduler::Start() {
sgnezdov 0:806403f3d0d1 52 _updater.start(callback(Scheduler::updateAdapter, this));
sgnezdov 4:78bcd5a675e1 53 _runner.start(callback(Scheduler::runAdapter, this));
sgnezdov 0:806403f3d0d1 54 }
sgnezdov 0:806403f3d0d1 55
sgnezdov 0:806403f3d0d1 56 void Scheduler::Stop() {
sgnezdov 7:98c8b2eabea3 57 _runs.put(NULL);
sgnezdov 7:98c8b2eabea3 58
sgnezdov 7:98c8b2eabea3 59 Action uReq = Action(JobQuitAT);
sgnezdov 7:98c8b2eabea3 60 _updates.put(&uReq);
sgnezdov 7:98c8b2eabea3 61 uReq.resQueue.get();
sgnezdov 0:806403f3d0d1 62 }
sgnezdov 7:98c8b2eabea3 63
sgnezdov 0:806403f3d0d1 64 void Scheduler::WaitToStop() {
sgnezdov 0:806403f3d0d1 65 _updater.join();
sgnezdov 4:78bcd5a675e1 66 _runner.join();
sgnezdov 0:806403f3d0d1 67 }
sgnezdov 0:806403f3d0d1 68
sgnezdov 2:9bf5366ad5a2 69 Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
sgnezdov 2:9bf5366ad5a2 70 Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
sgnezdov 2:9bf5366ad5a2 71 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 72 tr_error("[JobAdd] failed to allocate appointment");
sgnezdov 2:9bf5366ad5a2 73 return Response<JobID>(1, 0);
sgnezdov 2:9bf5366ad5a2 74 }
sgnezdov 5:d8f69ac330f2 75 return this->reschedule(apt);
sgnezdov 5:d8f69ac330f2 76 }
sgnezdov 5:d8f69ac330f2 77
sgnezdov 5:d8f69ac330f2 78 Response<JobID> Scheduler::reschedule(Appointment *apt) {
sgnezdov 2:9bf5366ad5a2 79 JobAddReq req(apt);
sgnezdov 6:5baa0e4ec500 80
sgnezdov 6:5baa0e4ec500 81 // set next appointment time
sgnezdov 6:5baa0e4ec500 82 time_t now = time(NULL); // now in seconds
sgnezdov 6:5baa0e4ec500 83 apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
sgnezdov 6:5baa0e4ec500 84 if (apt->GetTime() == 0) {
sgnezdov 6:5baa0e4ec500 85 // there is no next run time; delete appointment
sgnezdov 10:8cff30b5b90d 86 tr_debug("[reschedule] NO next appointment");
sgnezdov 6:5baa0e4ec500 87 delete apt;
sgnezdov 6:5baa0e4ec500 88 req.response.error = 2;
sgnezdov 6:5baa0e4ec500 89 return req.response;
sgnezdov 6:5baa0e4ec500 90 }
sgnezdov 6:5baa0e4ec500 91
sgnezdov 10:8cff30b5b90d 92 tr_debug("[reschedule] put");
sgnezdov 4:78bcd5a675e1 93 _updates.put(&req);
sgnezdov 10:8cff30b5b90d 94 tr_debug("[reschedule] get");
sgnezdov 0:806403f3d0d1 95 // default is wait forever
sgnezdov 1:ec6a1d054065 96 osEvent evt = req.resQueue.get();
sgnezdov 0:806403f3d0d1 97 if (evt.status == osEventMessage) {
sgnezdov 2:9bf5366ad5a2 98 if (evt.value.p != NULL) {
sgnezdov 10:8cff30b5b90d 99 tr_debug("[reschedule] completed ok");
sgnezdov 2:9bf5366ad5a2 100 } else {
sgnezdov 10:8cff30b5b90d 101 tr_error("[reschedule] NOT added (C1)");
sgnezdov 2:9bf5366ad5a2 102 }
sgnezdov 2:9bf5366ad5a2 103 } else {
sgnezdov 2:9bf5366ad5a2 104 // not sure what condition is
sgnezdov 10:8cff30b5b90d 105 tr_error("[reschedule] NOT added (C2)");
sgnezdov 2:9bf5366ad5a2 106 delete apt;
sgnezdov 2:9bf5366ad5a2 107 apt = NULL;
sgnezdov 0:806403f3d0d1 108 }
sgnezdov 2:9bf5366ad5a2 109 // yes, return a copy of the structure
sgnezdov 5:d8f69ac330f2 110 return req.response;
sgnezdov 0:806403f3d0d1 111 }
sgnezdov 0:806403f3d0d1 112
sgnezdov 2:9bf5366ad5a2 113 void Scheduler::JobRemove(JobID jobID) {
sgnezdov 13:6be67ee77861 114 tr_error("JobRemove is not implemented");
sgnezdov 13:6be67ee77861 115 }
sgnezdov 13:6be67ee77861 116
sgnezdov 17:3b565ccd291b 117 void Scheduler::AppointmentList(LinkedList<Appointment>& apts) {
sgnezdov 17:3b565ccd291b 118 JobListReq req(apts);
sgnezdov 15:6b8fa5dff770 119 _updates.put(&req);
sgnezdov 15:6b8fa5dff770 120 osEvent evt = req.resQueue.get();
sgnezdov 15:6b8fa5dff770 121 if (evt.status != osEventMessage) {
sgnezdov 15:6b8fa5dff770 122 // not sure what condition is
sgnezdov 15:6b8fa5dff770 123 tr_error("[JobList] status error");
sgnezdov 15:6b8fa5dff770 124 }
sgnezdov 15:6b8fa5dff770 125 }
sgnezdov 3:f08f55827736 126
sgnezdov 7:98c8b2eabea3 127 static Action jobRunReq(JobRunAT);
sgnezdov 3:f08f55827736 128 void Scheduler::onWakeOnce()
sgnezdov 3:f08f55827736 129 {
sgnezdov 4:78bcd5a675e1 130 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 131 }
sgnezdov 0:806403f3d0d1 132
sgnezdov 0:806403f3d0d1 133 void Scheduler::updateHandler() {
sgnezdov 18:8be206ad1eb4 134 osThreadId_t tid = osThreadGetId();
sgnezdov 18:8be206ad1eb4 135 tr_debug("Scheduler monitor thread ID: 0x%X", tid);
sgnezdov 7:98c8b2eabea3 136 while (!_quitUpdater) {
sgnezdov 10:8cff30b5b90d 137 tr_debug("[updateHandler] waiting for action");
sgnezdov 0:806403f3d0d1 138 // wait forever ...
sgnezdov 4:78bcd5a675e1 139 osEvent evt = _updates.get();
sgnezdov 0:806403f3d0d1 140 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 141 tr_debug("[updateHandler] process action");
sgnezdov 0:806403f3d0d1 142 this->process((Action*)evt.value.p);
sgnezdov 0:806403f3d0d1 143 } else {
sgnezdov 10:8cff30b5b90d 144 tr_error("[updateHandler] NOT osEventMessage");
sgnezdov 0:806403f3d0d1 145 }
sgnezdov 0:806403f3d0d1 146 }
sgnezdov 0:806403f3d0d1 147 }
sgnezdov 3:f08f55827736 148
sgnezdov 0:806403f3d0d1 149 void Scheduler::process(Action *action)
sgnezdov 0:806403f3d0d1 150 {
sgnezdov 3:f08f55827736 151 time_t now = time(NULL); // now in seconds
sgnezdov 1:ec6a1d054065 152 switch(action->type) {
sgnezdov 1:ec6a1d054065 153 case JobAddAT: {
sgnezdov 10:8cff30b5b90d 154 tr_debug("[process] JobAddAT");
sgnezdov 1:ec6a1d054065 155 JobAddReq *req = static_cast<JobAddReq*>(action);
sgnezdov 2:9bf5366ad5a2 156 Job *job = req->apt->GetJob();
sgnezdov 2:9bf5366ad5a2 157 if (job->GetID() == 0) {
sgnezdov 2:9bf5366ad5a2 158 // assign job its ID
sgnezdov 2:9bf5366ad5a2 159 job->Init(_nextJobID++);
sgnezdov 12:684ddfc57199 160 tr_debug("assigned new job its id %d", job->GetID());
sgnezdov 12:684ddfc57199 161 } else {
sgnezdov 12:684ddfc57199 162 tr_debug("job already has id %d", job->GetID());
sgnezdov 2:9bf5366ad5a2 163 }
sgnezdov 2:9bf5366ad5a2 164 node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
sgnezdov 2:9bf5366ad5a2 165 if (NULL == tmp) {
sgnezdov 12:684ddfc57199 166 tr_error("[process] timeline insert failed for job ID %d", job->GetID());
sgnezdov 2:9bf5366ad5a2 167 action->resQueue.put(NULL);
sgnezdov 3:f08f55827736 168 // internal state has not changed
sgnezdov 2:9bf5366ad5a2 169 return;
sgnezdov 2:9bf5366ad5a2 170 }
sgnezdov 3:f08f55827736 171 req->response.data = job->GetID();
sgnezdov 10:8cff30b5b90d 172 //tr_debug("[process] simulate error");
sgnezdov 2:9bf5366ad5a2 173 //action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 174 action->resQueue.put(&req->response);
sgnezdov 1:ec6a1d054065 175 break;
sgnezdov 1:ec6a1d054065 176 }
sgnezdov 3:f08f55827736 177 case JobRunAT: {
sgnezdov 10:8cff30b5b90d 178 tr_debug("[process] JobRunAT");
sgnezdov 3:f08f55827736 179 // execute job run logic after switch
sgnezdov 3:f08f55827736 180 break;
sgnezdov 3:f08f55827736 181 }
sgnezdov 7:98c8b2eabea3 182 case JobQuitAT: {
sgnezdov 10:8cff30b5b90d 183 tr_debug("[process] JobQuitAT");
sgnezdov 7:98c8b2eabea3 184 action->resQueue.put(NULL);
sgnezdov 7:98c8b2eabea3 185 _quitUpdater = true;
sgnezdov 7:98c8b2eabea3 186 return;
sgnezdov 7:98c8b2eabea3 187 }
sgnezdov 15:6b8fa5dff770 188 case JobListAT: {
sgnezdov 15:6b8fa5dff770 189 tr_debug("[process] JobListAT");
sgnezdov 15:6b8fa5dff770 190 JobListReq *req = static_cast<JobListReq*>(action);
sgnezdov 15:6b8fa5dff770 191 int loc = 1;
sgnezdov 15:6b8fa5dff770 192 node<Appointment>* n = _timeline.pop(loc);
sgnezdov 15:6b8fa5dff770 193 while (n != NULL) {
sgnezdov 17:3b565ccd291b 194 Appointment* apt = n->data;
sgnezdov 17:3b565ccd291b 195 tr_debug("[process] adding appointment with job ID %d to list", apt->GetJob()->GetID());
sgnezdov 17:3b565ccd291b 196 req->_jobsToFill.append(apt);
sgnezdov 15:6b8fa5dff770 197 ++loc;
sgnezdov 15:6b8fa5dff770 198 n = _timeline.pop(loc);
sgnezdov 15:6b8fa5dff770 199 }
sgnezdov 15:6b8fa5dff770 200 action->resQueue.put(NULL);
sgnezdov 15:6b8fa5dff770 201 return;
sgnezdov 15:6b8fa5dff770 202 }
sgnezdov 1:ec6a1d054065 203 default:
sgnezdov 10:8cff30b5b90d 204 tr_warn("[process] unknown action type");
sgnezdov 1:ec6a1d054065 205 action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 206 }
sgnezdov 3:f08f55827736 207 node<Appointment> *wakeNode = _timeline.pop(1);
sgnezdov 12:684ddfc57199 208 if (wakeNode == NULL) {
sgnezdov 12:684ddfc57199 209 tr_debug("[process] found no nodes to run");
sgnezdov 12:684ddfc57199 210 return;
sgnezdov 12:684ddfc57199 211 }
sgnezdov 3:f08f55827736 212 Appointment *wakeApt = wakeNode->data;
sgnezdov 3:f08f55827736 213 Job* wakeJob = wakeApt->GetJob();
sgnezdov 5:d8f69ac330f2 214 if (now < wakeApt->GetTime()) {
sgnezdov 3:f08f55827736 215 // request wake up
sgnezdov 5:d8f69ac330f2 216 time_t sleepTime = wakeApt->GetTime() - now;
sgnezdov 10:8cff30b5b90d 217 tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime);
sgnezdov 3:f08f55827736 218 WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
sgnezdov 3:f08f55827736 219 } else {
sgnezdov 3:f08f55827736 220 // process job
sgnezdov 10:8cff30b5b90d 221 tr_debug("[process] job ID %d ready to run", wakeJob->GetID());
sgnezdov 3:f08f55827736 222 _timeline.remove(1);
sgnezdov 4:78bcd5a675e1 223 _runs.put(wakeApt);
sgnezdov 12:684ddfc57199 224 // make sure we run this function one more time
sgnezdov 12:684ddfc57199 225 // in case there are jobs left to run
sgnezdov 12:684ddfc57199 226 //
sgnezdov 12:684ddfc57199 227 // don't use WakeOnce, because it appears to work only for
sgnezdov 12:684ddfc57199 228 // one wake up request at a time.
sgnezdov 12:684ddfc57199 229 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 230 }
sgnezdov 4:78bcd5a675e1 231 }
sgnezdov 4:78bcd5a675e1 232
sgnezdov 4:78bcd5a675e1 233 void Scheduler::runHandler() {
sgnezdov 18:8be206ad1eb4 234 osThreadId_t tid = osThreadGetId();
sgnezdov 18:8be206ad1eb4 235 tr_debug("Scheduler runner thread ID: 0x%X", tid);
sgnezdov 7:98c8b2eabea3 236 while (!_quitRunner) {
sgnezdov 10:8cff30b5b90d 237 tr_debug("[runHandler] waiting for action");
sgnezdov 4:78bcd5a675e1 238 // wait forever ...
sgnezdov 4:78bcd5a675e1 239 osEvent evt = _runs.get();
sgnezdov 4:78bcd5a675e1 240 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 241 tr_debug("[runHandler] run action");
sgnezdov 4:78bcd5a675e1 242 Appointment *apt = (Appointment*)evt.value.p;
sgnezdov 7:98c8b2eabea3 243 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 244 tr_debug("[runHandler] quit requested");
sgnezdov 7:98c8b2eabea3 245 _quitRunner = true;
sgnezdov 7:98c8b2eabea3 246 break;
sgnezdov 7:98c8b2eabea3 247 }
sgnezdov 5:d8f69ac330f2 248 Job *job = apt->GetJob();
sgnezdov 10:8cff30b5b90d 249 JobType *jt = _jobService->GetJob(job->GetTypeID());
sgnezdov 10:8cff30b5b90d 250 if (jt == NULL) {
sgnezdov 10:8cff30b5b90d 251 tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID());
sgnezdov 5:d8f69ac330f2 252 // NO reschedule
sgnezdov 5:d8f69ac330f2 253 delete apt;
sgnezdov 5:d8f69ac330f2 254 continue;
sgnezdov 5:d8f69ac330f2 255 }
sgnezdov 10:8cff30b5b90d 256 tr_debug("Job Started");
sgnezdov 10:8cff30b5b90d 257 jt->RunJob();
sgnezdov 10:8cff30b5b90d 258 tr_debug("Job Finished");
sgnezdov 5:d8f69ac330f2 259 this->reschedule(apt);
sgnezdov 4:78bcd5a675e1 260 } else {
sgnezdov 10:8cff30b5b90d 261 tr_error("[runHandler] NOT osEventMessage");
sgnezdov 4:78bcd5a675e1 262 }
sgnezdov 4:78bcd5a675e1 263 }
sgnezdov 4:78bcd5a675e1 264 }
sgnezdov 3:f08f55827736 265
sgnezdov 0:806403f3d0d1 266
sgnezdov 0:806403f3d0d1 267 }