Job scheduler that supports run-once and periodic schedules. The stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

Committer:
sgnezdov
Date:
Wed Aug 02 19:44:44 2017 +0000
Revision:
14:a30cc783ae89
Parent:
13:6be67ee77861
Child:
15:6b8fa5dff770
rolled back GetList

Who changed what in which revision?

User | Revision | Line number | New contents of line
#include "scheduler.h"

#include <new>

#include "mbed-trace/mbed_trace.h"
sgnezdov 9:ee21cd055a97 4 #define TRACE_GROUP "schd"
sgnezdov 9:ee21cd055a97 5
sgnezdov 3:f08f55827736 6 Timeout WakeOnce;
sgnezdov 3:f08f55827736 7
sgnezdov 0:806403f3d0d1 8 namespace JobScheduler {
sgnezdov 1:ec6a1d054065 9
sgnezdov 3:f08f55827736 10 const ActionType JobAddAT(1);
sgnezdov 3:f08f55827736 11 const ActionType JobRunAT(3);
sgnezdov 7:98c8b2eabea3 12 const ActionType JobQuitAT(4);
sgnezdov 3:f08f55827736 13
sgnezdov 2:9bf5366ad5a2 14 bool descendingTimeline(Appointment *a1, Appointment *a2)
sgnezdov 2:9bf5366ad5a2 15 {
sgnezdov 12:684ddfc57199 16 time_t t1 = a1->GetTime();
sgnezdov 12:684ddfc57199 17 time_t t2 = a2->GetTime();
sgnezdov 12:684ddfc57199 18 bool rv = t1 <= t2;
sgnezdov 12:684ddfc57199 19 tr_debug("apt %d:%d <= %d:%d is %d)", a1->GetJob()->GetID(), t1, a2->GetJob()->GetID(), t2, rv);
sgnezdov 2:9bf5366ad5a2 20 return rv;
sgnezdov 2:9bf5366ad5a2 21 };
sgnezdov 3:f08f55827736 22
sgnezdov 2:9bf5366ad5a2 23 /**
sgnezdov 2:9bf5366ad5a2 24 JobAddReq adds new job to the scheduler.
sgnezdov 2:9bf5366ad5a2 25 */
sgnezdov 1:ec6a1d054065 26 struct JobAddReq: Action {
sgnezdov 2:9bf5366ad5a2 27 Appointment *apt;
sgnezdov 1:ec6a1d054065 28 Response<JobID> response;
sgnezdov 2:9bf5366ad5a2 29 JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
sgnezdov 1:ec6a1d054065 30 };
sgnezdov 7:98c8b2eabea3 31
sgnezdov 0:806403f3d0d1 32 Scheduler::Scheduler(JobService *jobService)
sgnezdov 7:98c8b2eabea3 33 : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { }
sgnezdov 0:806403f3d0d1 34
sgnezdov 0:806403f3d0d1 35 void Scheduler::updateAdapter(void *thisPointer) {
sgnezdov 0:806403f3d0d1 36 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 0:806403f3d0d1 37 self->updateHandler();
sgnezdov 0:806403f3d0d1 38 }
sgnezdov 4:78bcd5a675e1 39
sgnezdov 4:78bcd5a675e1 40 void Scheduler::runAdapter(void *thisPointer) {
sgnezdov 4:78bcd5a675e1 41 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 4:78bcd5a675e1 42 self->runHandler();
sgnezdov 4:78bcd5a675e1 43 }
sgnezdov 0:806403f3d0d1 44
sgnezdov 0:806403f3d0d1 45 void Scheduler::Start() {
sgnezdov 0:806403f3d0d1 46 _updater.start(callback(Scheduler::updateAdapter, this));
sgnezdov 4:78bcd5a675e1 47 _runner.start(callback(Scheduler::runAdapter, this));
sgnezdov 0:806403f3d0d1 48 }
sgnezdov 0:806403f3d0d1 49
sgnezdov 0:806403f3d0d1 50 void Scheduler::Stop() {
sgnezdov 7:98c8b2eabea3 51 _runs.put(NULL);
sgnezdov 7:98c8b2eabea3 52
sgnezdov 7:98c8b2eabea3 53 Action uReq = Action(JobQuitAT);
sgnezdov 7:98c8b2eabea3 54 _updates.put(&uReq);
sgnezdov 7:98c8b2eabea3 55 uReq.resQueue.get();
sgnezdov 0:806403f3d0d1 56 }
sgnezdov 7:98c8b2eabea3 57
sgnezdov 0:806403f3d0d1 58 void Scheduler::WaitToStop() {
sgnezdov 0:806403f3d0d1 59 _updater.join();
sgnezdov 4:78bcd5a675e1 60 _runner.join();
sgnezdov 0:806403f3d0d1 61 }
sgnezdov 0:806403f3d0d1 62
sgnezdov 2:9bf5366ad5a2 63 Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
sgnezdov 2:9bf5366ad5a2 64 Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
sgnezdov 2:9bf5366ad5a2 65 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 66 tr_error("[JobAdd] failed to allocate appointment");
sgnezdov 2:9bf5366ad5a2 67 return Response<JobID>(1, 0);
sgnezdov 2:9bf5366ad5a2 68 }
sgnezdov 5:d8f69ac330f2 69 return this->reschedule(apt);
sgnezdov 5:d8f69ac330f2 70 }
sgnezdov 5:d8f69ac330f2 71
sgnezdov 5:d8f69ac330f2 72 Response<JobID> Scheduler::reschedule(Appointment *apt) {
sgnezdov 2:9bf5366ad5a2 73 JobAddReq req(apt);
sgnezdov 6:5baa0e4ec500 74
sgnezdov 6:5baa0e4ec500 75 // set next appointment time
sgnezdov 6:5baa0e4ec500 76 time_t now = time(NULL); // now in seconds
sgnezdov 6:5baa0e4ec500 77 apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
sgnezdov 6:5baa0e4ec500 78 if (apt->GetTime() == 0) {
sgnezdov 6:5baa0e4ec500 79 // there is no next run time; delete appointment
sgnezdov 10:8cff30b5b90d 80 tr_debug("[reschedule] NO next appointment");
sgnezdov 6:5baa0e4ec500 81 delete apt;
sgnezdov 6:5baa0e4ec500 82 req.response.error = 2;
sgnezdov 6:5baa0e4ec500 83 return req.response;
sgnezdov 6:5baa0e4ec500 84 }
sgnezdov 6:5baa0e4ec500 85
sgnezdov 10:8cff30b5b90d 86 tr_debug("[reschedule] put");
sgnezdov 4:78bcd5a675e1 87 _updates.put(&req);
sgnezdov 10:8cff30b5b90d 88 tr_debug("[reschedule] get");
sgnezdov 0:806403f3d0d1 89 // default is wait forever
sgnezdov 1:ec6a1d054065 90 osEvent evt = req.resQueue.get();
sgnezdov 0:806403f3d0d1 91 if (evt.status == osEventMessage) {
sgnezdov 2:9bf5366ad5a2 92 if (evt.value.p != NULL) {
sgnezdov 10:8cff30b5b90d 93 tr_debug("[reschedule] completed ok");
sgnezdov 2:9bf5366ad5a2 94 } else {
sgnezdov 10:8cff30b5b90d 95 tr_error("[reschedule] NOT added (C1)");
sgnezdov 2:9bf5366ad5a2 96 }
sgnezdov 2:9bf5366ad5a2 97 } else {
sgnezdov 2:9bf5366ad5a2 98 // not sure what condition is
sgnezdov 10:8cff30b5b90d 99 tr_error("[reschedule] NOT added (C2)");
sgnezdov 2:9bf5366ad5a2 100 delete apt;
sgnezdov 2:9bf5366ad5a2 101 apt = NULL;
sgnezdov 0:806403f3d0d1 102 }
sgnezdov 2:9bf5366ad5a2 103 // yes, return a copy of the structure
sgnezdov 5:d8f69ac330f2 104 return req.response;
sgnezdov 0:806403f3d0d1 105 }
sgnezdov 0:806403f3d0d1 106
sgnezdov 2:9bf5366ad5a2 107 void Scheduler::JobRemove(JobID jobID) {
sgnezdov 13:6be67ee77861 108 tr_error("JobRemove is not implemented");
sgnezdov 13:6be67ee77861 109 }
sgnezdov 13:6be67ee77861 110
sgnezdov 14:a30cc783ae89 111 // LinkedList<Job> Scheduler::JobList() {
sgnezdov 14:a30cc783ae89 112 // tr_error("JobList is not implemented");
sgnezdov 14:a30cc783ae89 113 // LinkedList<Job> rv;
sgnezdov 14:a30cc783ae89 114 // return rv;
sgnezdov 14:a30cc783ae89 115 // }
sgnezdov 3:f08f55827736 116
sgnezdov 7:98c8b2eabea3 117 static Action jobRunReq(JobRunAT);
sgnezdov 3:f08f55827736 118 void Scheduler::onWakeOnce()
sgnezdov 3:f08f55827736 119 {
sgnezdov 4:78bcd5a675e1 120 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 121 }
sgnezdov 0:806403f3d0d1 122
sgnezdov 0:806403f3d0d1 123 void Scheduler::updateHandler() {
sgnezdov 7:98c8b2eabea3 124 while (!_quitUpdater) {
sgnezdov 10:8cff30b5b90d 125 tr_debug("[updateHandler] waiting for action");
sgnezdov 0:806403f3d0d1 126 // wait forever ...
sgnezdov 4:78bcd5a675e1 127 osEvent evt = _updates.get();
sgnezdov 0:806403f3d0d1 128 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 129 tr_debug("[updateHandler] process action");
sgnezdov 0:806403f3d0d1 130 this->process((Action*)evt.value.p);
sgnezdov 0:806403f3d0d1 131 } else {
sgnezdov 10:8cff30b5b90d 132 tr_error("[updateHandler] NOT osEventMessage");
sgnezdov 0:806403f3d0d1 133 }
sgnezdov 0:806403f3d0d1 134 }
sgnezdov 0:806403f3d0d1 135 }
sgnezdov 3:f08f55827736 136
sgnezdov 0:806403f3d0d1 137 void Scheduler::process(Action *action)
sgnezdov 0:806403f3d0d1 138 {
sgnezdov 3:f08f55827736 139 time_t now = time(NULL); // now in seconds
sgnezdov 1:ec6a1d054065 140 switch(action->type) {
sgnezdov 1:ec6a1d054065 141 case JobAddAT: {
sgnezdov 10:8cff30b5b90d 142 tr_debug("[process] JobAddAT");
sgnezdov 1:ec6a1d054065 143 JobAddReq *req = static_cast<JobAddReq*>(action);
sgnezdov 2:9bf5366ad5a2 144 Job *job = req->apt->GetJob();
sgnezdov 2:9bf5366ad5a2 145 if (job->GetID() == 0) {
sgnezdov 2:9bf5366ad5a2 146 // assign job its ID
sgnezdov 2:9bf5366ad5a2 147 job->Init(_nextJobID++);
sgnezdov 12:684ddfc57199 148 tr_debug("assigned new job its id %d", job->GetID());
sgnezdov 12:684ddfc57199 149 } else {
sgnezdov 12:684ddfc57199 150 tr_debug("job already has id %d", job->GetID());
sgnezdov 2:9bf5366ad5a2 151 }
sgnezdov 2:9bf5366ad5a2 152 node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
sgnezdov 2:9bf5366ad5a2 153 if (NULL == tmp) {
sgnezdov 12:684ddfc57199 154 tr_error("[process] timeline insert failed for job ID %d", job->GetID());
sgnezdov 2:9bf5366ad5a2 155 action->resQueue.put(NULL);
sgnezdov 3:f08f55827736 156 // internal state has not changed
sgnezdov 2:9bf5366ad5a2 157 return;
sgnezdov 2:9bf5366ad5a2 158 }
sgnezdov 3:f08f55827736 159 req->response.data = job->GetID();
sgnezdov 10:8cff30b5b90d 160 //tr_debug("[process] simulate error");
sgnezdov 2:9bf5366ad5a2 161 //action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 162 action->resQueue.put(&req->response);
sgnezdov 1:ec6a1d054065 163 break;
sgnezdov 1:ec6a1d054065 164 }
sgnezdov 3:f08f55827736 165 case JobRunAT: {
sgnezdov 10:8cff30b5b90d 166 tr_debug("[process] JobRunAT");
sgnezdov 3:f08f55827736 167 // execute job run logic after switch
sgnezdov 3:f08f55827736 168 break;
sgnezdov 3:f08f55827736 169 }
sgnezdov 7:98c8b2eabea3 170 case JobQuitAT: {
sgnezdov 10:8cff30b5b90d 171 tr_debug("[process] JobQuitAT");
sgnezdov 7:98c8b2eabea3 172 action->resQueue.put(NULL);
sgnezdov 7:98c8b2eabea3 173 _quitUpdater = true;
sgnezdov 7:98c8b2eabea3 174 return;
sgnezdov 7:98c8b2eabea3 175 }
sgnezdov 1:ec6a1d054065 176 default:
sgnezdov 10:8cff30b5b90d 177 tr_warn("[process] unknown action type");
sgnezdov 1:ec6a1d054065 178 action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 179 }
sgnezdov 3:f08f55827736 180 node<Appointment> *wakeNode = _timeline.pop(1);
sgnezdov 12:684ddfc57199 181 if (wakeNode == NULL) {
sgnezdov 12:684ddfc57199 182 tr_debug("[process] found no nodes to run");
sgnezdov 12:684ddfc57199 183 return;
sgnezdov 12:684ddfc57199 184 }
sgnezdov 3:f08f55827736 185 Appointment *wakeApt = wakeNode->data;
sgnezdov 3:f08f55827736 186 Job* wakeJob = wakeApt->GetJob();
sgnezdov 5:d8f69ac330f2 187 if (now < wakeApt->GetTime()) {
sgnezdov 3:f08f55827736 188 // request wake up
sgnezdov 5:d8f69ac330f2 189 time_t sleepTime = wakeApt->GetTime() - now;
sgnezdov 10:8cff30b5b90d 190 tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime);
sgnezdov 3:f08f55827736 191 WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
sgnezdov 3:f08f55827736 192 } else {
sgnezdov 3:f08f55827736 193 // process job
sgnezdov 10:8cff30b5b90d 194 tr_debug("[process] job ID %d ready to run", wakeJob->GetID());
sgnezdov 3:f08f55827736 195 _timeline.remove(1);
sgnezdov 4:78bcd5a675e1 196 _runs.put(wakeApt);
sgnezdov 12:684ddfc57199 197 // make sure we run this function one more time
sgnezdov 12:684ddfc57199 198 // in case there are jobs left to run
sgnezdov 12:684ddfc57199 199 //
sgnezdov 12:684ddfc57199 200 // don't use WakeOnce, because it appears to work only for
sgnezdov 12:684ddfc57199 201 // one wake up request at a time.
sgnezdov 12:684ddfc57199 202 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 203 }
sgnezdov 4:78bcd5a675e1 204 }
sgnezdov 4:78bcd5a675e1 205
sgnezdov 4:78bcd5a675e1 206 void Scheduler::runHandler() {
sgnezdov 7:98c8b2eabea3 207 while (!_quitRunner) {
sgnezdov 10:8cff30b5b90d 208 tr_debug("[runHandler] waiting for action");
sgnezdov 4:78bcd5a675e1 209 // wait forever ...
sgnezdov 4:78bcd5a675e1 210 osEvent evt = _runs.get();
sgnezdov 4:78bcd5a675e1 211 if (evt.status == osEventMessage) {
sgnezdov 10:8cff30b5b90d 212 tr_debug("[runHandler] run action");
sgnezdov 4:78bcd5a675e1 213 Appointment *apt = (Appointment*)evt.value.p;
sgnezdov 7:98c8b2eabea3 214 if (NULL == apt) {
sgnezdov 10:8cff30b5b90d 215 tr_debug("[runHandler] quit requested");
sgnezdov 7:98c8b2eabea3 216 _quitRunner = true;
sgnezdov 7:98c8b2eabea3 217 break;
sgnezdov 7:98c8b2eabea3 218 }
sgnezdov 5:d8f69ac330f2 219 Job *job = apt->GetJob();
sgnezdov 10:8cff30b5b90d 220 JobType *jt = _jobService->GetJob(job->GetTypeID());
sgnezdov 10:8cff30b5b90d 221 if (jt == NULL) {
sgnezdov 10:8cff30b5b90d 222 tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID());
sgnezdov 5:d8f69ac330f2 223 // NO reschedule
sgnezdov 5:d8f69ac330f2 224 delete apt;
sgnezdov 5:d8f69ac330f2 225 continue;
sgnezdov 5:d8f69ac330f2 226 }
sgnezdov 10:8cff30b5b90d 227 tr_debug("Job Started");
sgnezdov 10:8cff30b5b90d 228 jt->RunJob();
sgnezdov 10:8cff30b5b90d 229 tr_debug("Job Finished");
sgnezdov 5:d8f69ac330f2 230 this->reschedule(apt);
sgnezdov 4:78bcd5a675e1 231 } else {
sgnezdov 10:8cff30b5b90d 232 tr_error("[runHandler] NOT osEventMessage");
sgnezdov 4:78bcd5a675e1 233 }
sgnezdov 4:78bcd5a675e1 234 }
sgnezdov 4:78bcd5a675e1 235 }
sgnezdov 3:f08f55827736 236
sgnezdov 0:806403f3d0d1 237
sgnezdov 0:806403f3d0d1 238 }