Job scheduler that supports run-once and periodic schedules. The stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

Committer:
sgnezdov
Date:
Fri Aug 04 00:20:06 2017 +0000
Revision:
19:965c8721cc8a
Parent:
18:8be206ad1eb4
disabled some trace of core modules, because it is no longer necessary

Who changed what in which revision?

User | Revision | Line number | New contents of line
#include "scheduler.h"

#include <new>

// silence scheduler tracing, must be before mbed_trace.h
#define MBED_TRACE_MAX_LEVEL 0
#include "mbed-trace/mbed_trace.h"
#define TRACE_GROUP "schd"
sgnezdov 9:ee21cd055a97 7
// Timer that Scheduler::process arms (with Scheduler::onWakeOnce as the
// callback) when the earliest appointment on the timeline is not yet due.
// NOTE(review): this is one object at global scope, so every Scheduler
// instance in the program would share it - confirm only one Scheduler is
// expected to exist.
Timeout WakeOnce;
sgnezdov 3:f08f55827736 9
sgnezdov 0:806403f3d0d1 10 namespace JobScheduler {
sgnezdov 1:ec6a1d054065 11
// Action type identifiers that Scheduler::process dispatches on.
// NOTE(review): value 2 is not used anywhere in this file - presumably
// reserved for a not-yet-implemented action; confirm before reusing it.
const ActionType JobAddAT(1);   // insert an appointment into the timeline
const ActionType JobRunAT(3);   // re-examine the timeline for a due job
const ActionType JobQuitAT(4);  // stop the update thread
const ActionType JobListAT(5);  // copy the timeline's appointments to a caller list
sgnezdov 3:f08f55827736 16
sgnezdov 2:9bf5366ad5a2 17 bool descendingTimeline(Appointment *a1, Appointment *a2)
sgnezdov 2:9bf5366ad5a2 18 {
sgnezdov 12:684ddfc57199 19 time_t t1 = a1->GetTime();
sgnezdov 12:684ddfc57199 20 time_t t2 = a2->GetTime();
sgnezdov 16:f61b62b119dd 21 bool rv = t1 >= t2;
sgnezdov 16:f61b62b119dd 22 tr_debug("apt %d:%d >= %d:%d is %d)", a1->GetJob()->GetID(), t1, a2->GetJob()->GetID(), t2, rv);
sgnezdov 2:9bf5366ad5a2 23 return rv;
sgnezdov 2:9bf5366ad5a2 24 };
sgnezdov 3:f08f55827736 25
sgnezdov 2:9bf5366ad5a2 26 /**
sgnezdov 2:9bf5366ad5a2 27 JobAddReq adds new job to the scheduler.
sgnezdov 2:9bf5366ad5a2 28 */
sgnezdov 1:ec6a1d054065 29 struct JobAddReq: Action {
sgnezdov 2:9bf5366ad5a2 30 Appointment *apt;
sgnezdov 1:ec6a1d054065 31 Response<JobID> response;
sgnezdov 2:9bf5366ad5a2 32 JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
sgnezdov 1:ec6a1d054065 33 };
sgnezdov 15:6b8fa5dff770 34
sgnezdov 15:6b8fa5dff770 35 struct JobListReq: Action {
sgnezdov 17:3b565ccd291b 36 LinkedList<Appointment>& _jobsToFill;
sgnezdov 17:3b565ccd291b 37 JobListReq(LinkedList<Appointment>& jobs) : Action(JobListAT), _jobsToFill(jobs) {}
sgnezdov 15:6b8fa5dff770 38 };
sgnezdov 7:98c8b2eabea3 39
sgnezdov 0:806403f3d0d1 40 Scheduler::Scheduler(JobService *jobService)
sgnezdov 7:98c8b2eabea3 41 : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { }
sgnezdov 0:806403f3d0d1 42
sgnezdov 0:806403f3d0d1 43 void Scheduler::updateAdapter(void *thisPointer) {
sgnezdov 0:806403f3d0d1 44 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 0:806403f3d0d1 45 self->updateHandler();
sgnezdov 0:806403f3d0d1 46 }
sgnezdov 4:78bcd5a675e1 47
sgnezdov 4:78bcd5a675e1 48 void Scheduler::runAdapter(void *thisPointer) {
sgnezdov 4:78bcd5a675e1 49 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 4:78bcd5a675e1 50 self->runHandler();
sgnezdov 4:78bcd5a675e1 51 }
sgnezdov 0:806403f3d0d1 52
sgnezdov 0:806403f3d0d1 53 void Scheduler::Start() {
sgnezdov 0:806403f3d0d1 54 _updater.start(callback(Scheduler::updateAdapter, this));
sgnezdov 4:78bcd5a675e1 55 _runner.start(callback(Scheduler::runAdapter, this));
sgnezdov 0:806403f3d0d1 56 }
sgnezdov 0:806403f3d0d1 57
sgnezdov 0:806403f3d0d1 58 void Scheduler::Stop() {
sgnezdov 7:98c8b2eabea3 59 _runs.put(NULL);
sgnezdov 7:98c8b2eabea3 60
sgnezdov 7:98c8b2eabea3 61 Action uReq = Action(JobQuitAT);
sgnezdov 7:98c8b2eabea3 62 _updates.put(&uReq);
sgnezdov 7:98c8b2eabea3 63 uReq.resQueue.get();
sgnezdov 0:806403f3d0d1 64 }
sgnezdov 7:98c8b2eabea3 65
sgnezdov 0:806403f3d0d1 66 void Scheduler::WaitToStop() {
sgnezdov 0:806403f3d0d1 67 _updater.join();
sgnezdov 4:78bcd5a675e1 68 _runner.join();
sgnezdov 0:806403f3d0d1 69 }
sgnezdov 0:806403f3d0d1 70
sgnezdov 2:9bf5366ad5a2 71 Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
sgnezdov 2:9bf5366ad5a2 72 Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
sgnezdov 2:9bf5366ad5a2 73 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 74 tr_error("[JobAdd] failed to allocate appointment");
sgnezdov 2:9bf5366ad5a2 75 return Response<JobID>(1, 0);
sgnezdov 2:9bf5366ad5a2 76 }
sgnezdov 5:d8f69ac330f2 77 return this->reschedule(apt);
sgnezdov 5:d8f69ac330f2 78 }
sgnezdov 5:d8f69ac330f2 79
sgnezdov 5:d8f69ac330f2 80 Response<JobID> Scheduler::reschedule(Appointment *apt) {
sgnezdov 2:9bf5366ad5a2 81 JobAddReq req(apt);
sgnezdov 6:5baa0e4ec500 82
sgnezdov 6:5baa0e4ec500 83 // set next appointment time
sgnezdov 6:5baa0e4ec500 84 time_t now = time(NULL); // now in seconds
sgnezdov 6:5baa0e4ec500 85 apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
sgnezdov 6:5baa0e4ec500 86 if (apt->GetTime() == 0) {
sgnezdov 6:5baa0e4ec500 87 // there is no next run time; delete appointment
sgnezdov 10:8cff30b5b90d 88 tr_debug("[reschedule] NO next appointment");
sgnezdov 6:5baa0e4ec500 89 delete apt;
sgnezdov 6:5baa0e4ec500 90 req.response.error = 2;
sgnezdov 6:5baa0e4ec500 91 return req.response;
sgnezdov 6:5baa0e4ec500 92 }
sgnezdov 6:5baa0e4ec500 93
sgnezdov 10:8cff30b5b90d 94 tr_debug("[reschedule] put");
sgnezdov 4:78bcd5a675e1 95 _updates.put(&req);
sgnezdov 10:8cff30b5b90d 96 tr_debug("[reschedule] get");
sgnezdov 0:806403f3d0d1 97 // default is wait forever
sgnezdov 1:ec6a1d054065 98 osEvent evt = req.resQueue.get();
sgnezdov 0:806403f3d0d1 99 if (evt.status == osEventMessage) {
sgnezdov 2:9bf5366ad5a2 100 if (evt.value.p != NULL) {
sgnezdov 10:8cff30b5b90d 101 tr_debug("[reschedule] completed ok");
sgnezdov 2:9bf5366ad5a2 102 } else {
sgnezdov 10:8cff30b5b90d 103 tr_error("[reschedule] NOT added (C1)");
sgnezdov 2:9bf5366ad5a2 104 }
sgnezdov 2:9bf5366ad5a2 105 } else {
sgnezdov 2:9bf5366ad5a2 106 // not sure what condition is
sgnezdov 10:8cff30b5b90d 107 tr_error("[reschedule] NOT added (C2)");
sgnezdov 2:9bf5366ad5a2 108 delete apt;
sgnezdov 2:9bf5366ad5a2 109 apt = NULL;
sgnezdov 0:806403f3d0d1 110 }
sgnezdov 2:9bf5366ad5a2 111 // yes, return a copy of the structure
sgnezdov 5:d8f69ac330f2 112 return req.response;
sgnezdov 0:806403f3d0d1 113 }
sgnezdov 0:806403f3d0d1 114
sgnezdov 2:9bf5366ad5a2 115 void Scheduler::JobRemove(JobID jobID) {
sgnezdov 13:6be67ee77861 116 tr_error("JobRemove is not implemented");
sgnezdov 13:6be67ee77861 117 }
sgnezdov 13:6be67ee77861 118
sgnezdov 17:3b565ccd291b 119 void Scheduler::AppointmentList(LinkedList<Appointment>& apts) {
sgnezdov 17:3b565ccd291b 120 JobListReq req(apts);
sgnezdov 15:6b8fa5dff770 121 _updates.put(&req);
sgnezdov 15:6b8fa5dff770 122 osEvent evt = req.resQueue.get();
sgnezdov 15:6b8fa5dff770 123 if (evt.status != osEventMessage) {
sgnezdov 15:6b8fa5dff770 124 // not sure what condition is
sgnezdov 15:6b8fa5dff770 125 tr_error("[JobList] status error");
sgnezdov 15:6b8fa5dff770 126 }
sgnezdov 15:6b8fa5dff770 127 }
sgnezdov 3:f08f55827736 128
sgnezdov 7:98c8b2eabea3 129 static Action jobRunReq(JobRunAT);
sgnezdov 3:f08f55827736 130 void Scheduler::onWakeOnce()
sgnezdov 3:f08f55827736 131 {
sgnezdov 4:78bcd5a675e1 132 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 133 }
sgnezdov 0:806403f3d0d1 134
sgnezdov 0:806403f3d0d1 135 void Scheduler::updateHandler() {
sgnezdov 18:8be206ad1eb4 136 osThreadId_t tid = osThreadGetId();
sgnezdov 18:8be206ad1eb4 137 tr_debug("Scheduler monitor thread ID: 0x%X", tid);
sgnezdov 7:98c8b2eabea3 138 while (!_quitUpdater) {
sgnezdov 10:8cff30b5b90d 139 tr_debug("[updateHandler] waiting for action");
sgnezdov 0:806403f3d0d1 140 // wait forever ...
sgnezdov 4:78bcd5a675e1 141 osEvent evt = _updates.get();
sgnezdov 0:806403f3d0d1 142 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 143 tr_debug("[updateHandler] process action");
sgnezdov 0:806403f3d0d1 144 this->process((Action*)evt.value.p);
sgnezdov 0:806403f3d0d1 145 } else {
sgnezdov 10:8cff30b5b90d 146 tr_error("[updateHandler] NOT osEventMessage");
sgnezdov 0:806403f3d0d1 147 }
sgnezdov 0:806403f3d0d1 148 }
sgnezdov 0:806403f3d0d1 149 }
sgnezdov 3:f08f55827736 150
sgnezdov 0:806403f3d0d1 151 void Scheduler::process(Action *action)
sgnezdov 0:806403f3d0d1 152 {
sgnezdov 3:f08f55827736 153 time_t now = time(NULL); // now in seconds
sgnezdov 1:ec6a1d054065 154 switch(action->type) {
sgnezdov 1:ec6a1d054065 155 case JobAddAT: {
sgnezdov 10:8cff30b5b90d 156 tr_debug("[process] JobAddAT");
sgnezdov 1:ec6a1d054065 157 JobAddReq *req = static_cast<JobAddReq*>(action);
sgnezdov 2:9bf5366ad5a2 158 Job *job = req->apt->GetJob();
sgnezdov 2:9bf5366ad5a2 159 if (job->GetID() == 0) {
sgnezdov 2:9bf5366ad5a2 160 // assign job its ID
sgnezdov 2:9bf5366ad5a2 161 job->Init(_nextJobID++);
sgnezdov 12:684ddfc57199 162 tr_debug("assigned new job its id %d", job->GetID());
sgnezdov 12:684ddfc57199 163 } else {
sgnezdov 12:684ddfc57199 164 tr_debug("job already has id %d", job->GetID());
sgnezdov 2:9bf5366ad5a2 165 }
sgnezdov 2:9bf5366ad5a2 166 node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
sgnezdov 2:9bf5366ad5a2 167 if (NULL == tmp) {
sgnezdov 12:684ddfc57199 168 tr_error("[process] timeline insert failed for job ID %d", job->GetID());
sgnezdov 2:9bf5366ad5a2 169 action->resQueue.put(NULL);
sgnezdov 3:f08f55827736 170 // internal state has not changed
sgnezdov 2:9bf5366ad5a2 171 return;
sgnezdov 2:9bf5366ad5a2 172 }
sgnezdov 3:f08f55827736 173 req->response.data = job->GetID();
sgnezdov 10:8cff30b5b90d 174 //tr_debug("[process] simulate error");
sgnezdov 2:9bf5366ad5a2 175 //action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 176 action->resQueue.put(&req->response);
sgnezdov 1:ec6a1d054065 177 break;
sgnezdov 1:ec6a1d054065 178 }
sgnezdov 3:f08f55827736 179 case JobRunAT: {
sgnezdov 10:8cff30b5b90d 180 tr_debug("[process] JobRunAT");
sgnezdov 3:f08f55827736 181 // execute job run logic after switch
sgnezdov 3:f08f55827736 182 break;
sgnezdov 3:f08f55827736 183 }
sgnezdov 7:98c8b2eabea3 184 case JobQuitAT: {
sgnezdov 10:8cff30b5b90d 185 tr_debug("[process] JobQuitAT");
sgnezdov 7:98c8b2eabea3 186 action->resQueue.put(NULL);
sgnezdov 7:98c8b2eabea3 187 _quitUpdater = true;
sgnezdov 7:98c8b2eabea3 188 return;
sgnezdov 7:98c8b2eabea3 189 }
sgnezdov 15:6b8fa5dff770 190 case JobListAT: {
sgnezdov 15:6b8fa5dff770 191 tr_debug("[process] JobListAT");
sgnezdov 15:6b8fa5dff770 192 JobListReq *req = static_cast<JobListReq*>(action);
sgnezdov 15:6b8fa5dff770 193 int loc = 1;
sgnezdov 15:6b8fa5dff770 194 node<Appointment>* n = _timeline.pop(loc);
sgnezdov 15:6b8fa5dff770 195 while (n != NULL) {
sgnezdov 17:3b565ccd291b 196 Appointment* apt = n->data;
sgnezdov 17:3b565ccd291b 197 tr_debug("[process] adding appointment with job ID %d to list", apt->GetJob()->GetID());
sgnezdov 17:3b565ccd291b 198 req->_jobsToFill.append(apt);
sgnezdov 15:6b8fa5dff770 199 ++loc;
sgnezdov 15:6b8fa5dff770 200 n = _timeline.pop(loc);
sgnezdov 15:6b8fa5dff770 201 }
sgnezdov 15:6b8fa5dff770 202 action->resQueue.put(NULL);
sgnezdov 15:6b8fa5dff770 203 return;
sgnezdov 15:6b8fa5dff770 204 }
sgnezdov 1:ec6a1d054065 205 default:
sgnezdov 10:8cff30b5b90d 206 tr_warn("[process] unknown action type");
sgnezdov 1:ec6a1d054065 207 action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 208 }
sgnezdov 3:f08f55827736 209 node<Appointment> *wakeNode = _timeline.pop(1);
sgnezdov 12:684ddfc57199 210 if (wakeNode == NULL) {
sgnezdov 12:684ddfc57199 211 tr_debug("[process] found no nodes to run");
sgnezdov 12:684ddfc57199 212 return;
sgnezdov 12:684ddfc57199 213 }
sgnezdov 3:f08f55827736 214 Appointment *wakeApt = wakeNode->data;
sgnezdov 3:f08f55827736 215 Job* wakeJob = wakeApt->GetJob();
sgnezdov 5:d8f69ac330f2 216 if (now < wakeApt->GetTime()) {
sgnezdov 3:f08f55827736 217 // request wake up
sgnezdov 5:d8f69ac330f2 218 time_t sleepTime = wakeApt->GetTime() - now;
sgnezdov 10:8cff30b5b90d 219 tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime);
sgnezdov 3:f08f55827736 220 WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
sgnezdov 3:f08f55827736 221 } else {
sgnezdov 3:f08f55827736 222 // process job
sgnezdov 10:8cff30b5b90d 223 tr_debug("[process] job ID %d ready to run", wakeJob->GetID());
sgnezdov 3:f08f55827736 224 _timeline.remove(1);
sgnezdov 4:78bcd5a675e1 225 _runs.put(wakeApt);
sgnezdov 12:684ddfc57199 226 // make sure we run this function one more time
sgnezdov 12:684ddfc57199 227 // in case there are jobs left to run
sgnezdov 12:684ddfc57199 228 //
sgnezdov 12:684ddfc57199 229 // don't use WakeOnce, because it appears to work only for
sgnezdov 12:684ddfc57199 230 // one wake up request at a time.
sgnezdov 12:684ddfc57199 231 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 232 }
sgnezdov 4:78bcd5a675e1 233 }
sgnezdov 4:78bcd5a675e1 234
sgnezdov 4:78bcd5a675e1 235 void Scheduler::runHandler() {
sgnezdov 18:8be206ad1eb4 236 osThreadId_t tid = osThreadGetId();
sgnezdov 18:8be206ad1eb4 237 tr_debug("Scheduler runner thread ID: 0x%X", tid);
sgnezdov 7:98c8b2eabea3 238 while (!_quitRunner) {
sgnezdov 10:8cff30b5b90d 239 tr_debug("[runHandler] waiting for action");
sgnezdov 4:78bcd5a675e1 240 // wait forever ...
sgnezdov 4:78bcd5a675e1 241 osEvent evt = _runs.get();
sgnezdov 4:78bcd5a675e1 242 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 243 tr_debug("[runHandler] run action");
sgnezdov 4:78bcd5a675e1 244 Appointment *apt = (Appointment*)evt.value.p;
sgnezdov 7:98c8b2eabea3 245 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 246 tr_debug("[runHandler] quit requested");
sgnezdov 7:98c8b2eabea3 247 _quitRunner = true;
sgnezdov 7:98c8b2eabea3 248 break;
sgnezdov 7:98c8b2eabea3 249 }
sgnezdov 5:d8f69ac330f2 250 Job *job = apt->GetJob();
sgnezdov 10:8cff30b5b90d 251 JobType *jt = _jobService->GetJob(job->GetTypeID());
sgnezdov 10:8cff30b5b90d 252 if (jt == NULL) {
sgnezdov 10:8cff30b5b90d 253 tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID());
sgnezdov 5:d8f69ac330f2 254 // NO reschedule
sgnezdov 5:d8f69ac330f2 255 delete apt;
sgnezdov 5:d8f69ac330f2 256 continue;
sgnezdov 5:d8f69ac330f2 257 }
sgnezdov 10:8cff30b5b90d 258 tr_debug("Job Started");
sgnezdov 10:8cff30b5b90d 259 jt->RunJob();
sgnezdov 10:8cff30b5b90d 260 tr_debug("Job Finished");
sgnezdov 5:d8f69ac330f2 261 this->reschedule(apt);
sgnezdov 4:78bcd5a675e1 262 } else {
sgnezdov 10:8cff30b5b90d 263 tr_error("[runHandler] NOT osEventMessage");
sgnezdov 4:78bcd5a675e1 264 }
sgnezdov 4:78bcd5a675e1 265 }
sgnezdov 4:78bcd5a675e1 266 }
sgnezdov 3:f08f55827736 267
sgnezdov 0:806403f3d0d1 268
sgnezdov 0:806403f3d0d1 269 }