Job scheduler that supports run-once and periodic schedules. The stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

Committer:
sgnezdov
Date:
Tue Jul 11 23:03:23 2017 +0000
Revision:
7:98c8b2eabea3
Parent:
6:5baa0e4ec500
Child:
9:ee21cd055a97
improved scheduler quit processing

Who changed what in which revision?

User | Revision | Line number | New contents of line
sgnezdov 0:806403f3d0d1 1 #include "scheduler.h"
sgnezdov 0:806403f3d0d1 2
sgnezdov 3:f08f55827736 3 Timeout WakeOnce;
sgnezdov 3:f08f55827736 4
/**
 Placeholder with an empty body; nothing in the visible part of this file
 calls it.

 @param target  ignored.
 */
void update(void *target) {
    (void)target;   // explicitly unused: silences the unused-parameter warning
}
sgnezdov 0:806403f3d0d1 7
sgnezdov 0:806403f3d0d1 8 namespace JobScheduler {
sgnezdov 1:ec6a1d054065 9
sgnezdov 3:f08f55827736 10 const ActionType JobAddAT(1);
sgnezdov 3:f08f55827736 11 const ActionType JobRunAT(3);
sgnezdov 7:98c8b2eabea3 12 const ActionType JobQuitAT(4);
sgnezdov 3:f08f55827736 13
sgnezdov 2:9bf5366ad5a2 14 bool descendingTimeline(Appointment *a1, Appointment *a2)
sgnezdov 2:9bf5366ad5a2 15 {
sgnezdov 2:9bf5366ad5a2 16 bool rv = a1->GetTime() <= a2->GetTime();
sgnezdov 2:9bf5366ad5a2 17 //printf("(%d %d:%d)", *d1, *d2, rv);
sgnezdov 2:9bf5366ad5a2 18 return rv;
sgnezdov 2:9bf5366ad5a2 19 };
sgnezdov 3:f08f55827736 20
sgnezdov 2:9bf5366ad5a2 21 /**
sgnezdov 2:9bf5366ad5a2 22 JobAddReq adds new job to the scheduler.
sgnezdov 2:9bf5366ad5a2 23 */
sgnezdov 1:ec6a1d054065 24 struct JobAddReq: Action {
sgnezdov 2:9bf5366ad5a2 25 Appointment *apt;
sgnezdov 1:ec6a1d054065 26 Response<JobID> response;
sgnezdov 2:9bf5366ad5a2 27 JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
sgnezdov 1:ec6a1d054065 28 };
sgnezdov 7:98c8b2eabea3 29
sgnezdov 0:806403f3d0d1 30 Scheduler::Scheduler(JobService *jobService)
sgnezdov 7:98c8b2eabea3 31 : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { }
sgnezdov 0:806403f3d0d1 32
sgnezdov 0:806403f3d0d1 33 void Scheduler::updateAdapter(void *thisPointer) {
sgnezdov 0:806403f3d0d1 34 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 0:806403f3d0d1 35 self->updateHandler();
sgnezdov 0:806403f3d0d1 36 }
sgnezdov 4:78bcd5a675e1 37
sgnezdov 4:78bcd5a675e1 38 void Scheduler::runAdapter(void *thisPointer) {
sgnezdov 4:78bcd5a675e1 39 Scheduler *self = static_cast<Scheduler*>(thisPointer);
sgnezdov 4:78bcd5a675e1 40 self->runHandler();
sgnezdov 4:78bcd5a675e1 41 }
sgnezdov 0:806403f3d0d1 42
sgnezdov 0:806403f3d0d1 43 void Scheduler::Start() {
sgnezdov 0:806403f3d0d1 44 _updater.start(callback(Scheduler::updateAdapter, this));
sgnezdov 4:78bcd5a675e1 45 _runner.start(callback(Scheduler::runAdapter, this));
sgnezdov 0:806403f3d0d1 46 }
sgnezdov 0:806403f3d0d1 47
sgnezdov 0:806403f3d0d1 48 void Scheduler::Stop() {
sgnezdov 7:98c8b2eabea3 49 _runs.put(NULL);
sgnezdov 7:98c8b2eabea3 50
sgnezdov 7:98c8b2eabea3 51 Action uReq = Action(JobQuitAT);
sgnezdov 7:98c8b2eabea3 52 _updates.put(&uReq);
sgnezdov 7:98c8b2eabea3 53 uReq.resQueue.get();
sgnezdov 0:806403f3d0d1 54 }
sgnezdov 7:98c8b2eabea3 55
sgnezdov 0:806403f3d0d1 56 void Scheduler::WaitToStop() {
sgnezdov 0:806403f3d0d1 57 _updater.join();
sgnezdov 4:78bcd5a675e1 58 _runner.join();
sgnezdov 0:806403f3d0d1 59 }
sgnezdov 0:806403f3d0d1 60
sgnezdov 2:9bf5366ad5a2 61 Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
sgnezdov 2:9bf5366ad5a2 62 Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
sgnezdov 2:9bf5366ad5a2 63 if (NULL == apt) {
sgnezdov 2:9bf5366ad5a2 64 printf("[Scheduler::JobAdd] failed to allocate appointment\n");
sgnezdov 2:9bf5366ad5a2 65 return Response<JobID>(1, 0);
sgnezdov 2:9bf5366ad5a2 66 }
sgnezdov 5:d8f69ac330f2 67 return this->reschedule(apt);
sgnezdov 5:d8f69ac330f2 68 }
sgnezdov 5:d8f69ac330f2 69
sgnezdov 5:d8f69ac330f2 70 Response<JobID> Scheduler::reschedule(Appointment *apt) {
sgnezdov 2:9bf5366ad5a2 71 JobAddReq req(apt);
sgnezdov 6:5baa0e4ec500 72
sgnezdov 6:5baa0e4ec500 73 // set next appointment time
sgnezdov 6:5baa0e4ec500 74 time_t now = time(NULL); // now in seconds
sgnezdov 6:5baa0e4ec500 75 apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
sgnezdov 6:5baa0e4ec500 76 if (apt->GetTime() == 0) {
sgnezdov 6:5baa0e4ec500 77 // there is no next run time; delete appointment
sgnezdov 6:5baa0e4ec500 78 printf("[Scheduler::reschedule] NO next appointment\n");
sgnezdov 6:5baa0e4ec500 79 delete apt;
sgnezdov 6:5baa0e4ec500 80 req.response.error = 2;
sgnezdov 6:5baa0e4ec500 81 return req.response;
sgnezdov 6:5baa0e4ec500 82 }
sgnezdov 6:5baa0e4ec500 83
sgnezdov 5:d8f69ac330f2 84 printf("[Scheduler::reschedule] put\n");
sgnezdov 4:78bcd5a675e1 85 _updates.put(&req);
sgnezdov 5:d8f69ac330f2 86 printf("[Scheduler::reschedule] get\n");
sgnezdov 0:806403f3d0d1 87 // default is wait forever
sgnezdov 1:ec6a1d054065 88 osEvent evt = req.resQueue.get();
sgnezdov 0:806403f3d0d1 89 if (evt.status == osEventMessage) {
sgnezdov 2:9bf5366ad5a2 90 if (evt.value.p != NULL) {
sgnezdov 5:d8f69ac330f2 91 printf("[Scheduler::reschedule] completed ok\n");
sgnezdov 2:9bf5366ad5a2 92 } else {
sgnezdov 5:d8f69ac330f2 93 printf("[Scheduler::reschedule] NOT added (C1)\n");
sgnezdov 2:9bf5366ad5a2 94 }
sgnezdov 2:9bf5366ad5a2 95 } else {
sgnezdov 2:9bf5366ad5a2 96 // not sure what condition is
sgnezdov 5:d8f69ac330f2 97 printf("[Scheduler::reschedule] NOT added (C2)\n");
sgnezdov 2:9bf5366ad5a2 98 delete apt;
sgnezdov 2:9bf5366ad5a2 99 apt = NULL;
sgnezdov 0:806403f3d0d1 100 }
sgnezdov 2:9bf5366ad5a2 101 // yes, return a copy of the structure
sgnezdov 5:d8f69ac330f2 102 return req.response;
sgnezdov 0:806403f3d0d1 103 }
sgnezdov 0:806403f3d0d1 104
sgnezdov 2:9bf5366ad5a2 105 void Scheduler::JobRemove(JobID jobID) {
sgnezdov 0:806403f3d0d1 106 }
sgnezdov 3:f08f55827736 107
sgnezdov 7:98c8b2eabea3 108 static Action jobRunReq(JobRunAT);
sgnezdov 3:f08f55827736 109 void Scheduler::onWakeOnce()
sgnezdov 3:f08f55827736 110 {
sgnezdov 4:78bcd5a675e1 111 _updates.put(&jobRunReq);
sgnezdov 3:f08f55827736 112 }
sgnezdov 0:806403f3d0d1 113
sgnezdov 0:806403f3d0d1 114 void Scheduler::updateHandler() {
sgnezdov 7:98c8b2eabea3 115 while (!_quitUpdater) {
sgnezdov 0:806403f3d0d1 116 printf("[Scheduler::updateHandler] waiting for action\n");
sgnezdov 0:806403f3d0d1 117 // wait forever ...
sgnezdov 4:78bcd5a675e1 118 osEvent evt = _updates.get();
sgnezdov 0:806403f3d0d1 119 if (evt.status == osEventMessage) {
sgnezdov 0:806403f3d0d1 120 printf("[Scheduler::updateHandler] process action\n");
sgnezdov 0:806403f3d0d1 121 this->process((Action*)evt.value.p);
sgnezdov 0:806403f3d0d1 122 } else {
sgnezdov 0:806403f3d0d1 123 printf("[Scheduler::updateHandler] NOT osEventMessage\n");
sgnezdov 0:806403f3d0d1 124 }
sgnezdov 0:806403f3d0d1 125 }
sgnezdov 0:806403f3d0d1 126 }
sgnezdov 3:f08f55827736 127
sgnezdov 0:806403f3d0d1 128 void Scheduler::process(Action *action)
sgnezdov 0:806403f3d0d1 129 {
sgnezdov 3:f08f55827736 130 time_t now = time(NULL); // now in seconds
sgnezdov 1:ec6a1d054065 131 switch(action->type) {
sgnezdov 1:ec6a1d054065 132 case JobAddAT: {
sgnezdov 5:d8f69ac330f2 133 printf("[Scheduler::process] JobAddAT\n");
sgnezdov 1:ec6a1d054065 134 JobAddReq *req = static_cast<JobAddReq*>(action);
sgnezdov 2:9bf5366ad5a2 135 Job *job = req->apt->GetJob();
sgnezdov 2:9bf5366ad5a2 136 if (job->GetID() == 0) {
sgnezdov 2:9bf5366ad5a2 137 // assign job its ID
sgnezdov 2:9bf5366ad5a2 138 job->Init(_nextJobID++);
sgnezdov 2:9bf5366ad5a2 139 }
sgnezdov 2:9bf5366ad5a2 140 node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
sgnezdov 2:9bf5366ad5a2 141 if (NULL == tmp) {
sgnezdov 2:9bf5366ad5a2 142 printf("[Scheduler::process] timeline insert failed\n");
sgnezdov 2:9bf5366ad5a2 143 action->resQueue.put(NULL);
sgnezdov 3:f08f55827736 144 // internal state has not changed
sgnezdov 2:9bf5366ad5a2 145 return;
sgnezdov 2:9bf5366ad5a2 146 }
sgnezdov 3:f08f55827736 147 req->response.data = job->GetID();
sgnezdov 2:9bf5366ad5a2 148 //printf("[Scheduler::process] simulate error\n");
sgnezdov 2:9bf5366ad5a2 149 //action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 150 action->resQueue.put(&req->response);
sgnezdov 1:ec6a1d054065 151 break;
sgnezdov 1:ec6a1d054065 152 }
sgnezdov 3:f08f55827736 153 case JobRunAT: {
sgnezdov 5:d8f69ac330f2 154 printf("[Scheduler::process] JobRunAT\n");
sgnezdov 3:f08f55827736 155 // execute job run logic after switch
sgnezdov 3:f08f55827736 156 break;
sgnezdov 3:f08f55827736 157 }
sgnezdov 7:98c8b2eabea3 158 case JobQuitAT: {
sgnezdov 7:98c8b2eabea3 159 printf("[Scheduler::process] JobQuitAT\n");
sgnezdov 7:98c8b2eabea3 160 action->resQueue.put(NULL);
sgnezdov 7:98c8b2eabea3 161 _quitUpdater = true;
sgnezdov 7:98c8b2eabea3 162 return;
sgnezdov 7:98c8b2eabea3 163 }
sgnezdov 1:ec6a1d054065 164 default:
sgnezdov 1:ec6a1d054065 165 printf("[Scheduler::process] unknown action type\n");
sgnezdov 1:ec6a1d054065 166 action->resQueue.put(NULL);
sgnezdov 1:ec6a1d054065 167 }
sgnezdov 3:f08f55827736 168 node<Appointment> *wakeNode = _timeline.pop(1);
sgnezdov 3:f08f55827736 169 Appointment *wakeApt = wakeNode->data;
sgnezdov 3:f08f55827736 170 Job* wakeJob = wakeApt->GetJob();
sgnezdov 5:d8f69ac330f2 171 if (now < wakeApt->GetTime()) {
sgnezdov 3:f08f55827736 172 // request wake up
sgnezdov 5:d8f69ac330f2 173 time_t sleepTime = wakeApt->GetTime() - now;
sgnezdov 3:f08f55827736 174 printf("[Scheduler::process] job %d wake up in %d seconds\n", wakeJob->GetID(), sleepTime);
sgnezdov 3:f08f55827736 175 WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
sgnezdov 3:f08f55827736 176 } else {
sgnezdov 3:f08f55827736 177 // process job
sgnezdov 5:d8f69ac330f2 178 printf("[Scheduler::process] job ID %d ready to run\n", wakeJob->GetID());
sgnezdov 3:f08f55827736 179 _timeline.remove(1);
sgnezdov 4:78bcd5a675e1 180 _runs.put(wakeApt);
sgnezdov 3:f08f55827736 181 }
sgnezdov 4:78bcd5a675e1 182 }
sgnezdov 4:78bcd5a675e1 183
sgnezdov 4:78bcd5a675e1 184 void Scheduler::runHandler() {
sgnezdov 7:98c8b2eabea3 185 while (!_quitRunner) {
sgnezdov 4:78bcd5a675e1 186 printf("[Scheduler::runHandler] waiting for action\n");
sgnezdov 4:78bcd5a675e1 187 // wait forever ...
sgnezdov 4:78bcd5a675e1 188 osEvent evt = _runs.get();
sgnezdov 4:78bcd5a675e1 189 if (evt.status == osEventMessage) {
sgnezdov 4:78bcd5a675e1 190 printf("[Scheduler::runHandler] run action\n");
sgnezdov 4:78bcd5a675e1 191 Appointment *apt = (Appointment*)evt.value.p;
sgnezdov 7:98c8b2eabea3 192 if (NULL == apt) {
sgnezdov 7:98c8b2eabea3 193 printf("[Scheduler::runHandler] quit requested\n");
sgnezdov 7:98c8b2eabea3 194 _quitRunner = true;
sgnezdov 7:98c8b2eabea3 195 break;
sgnezdov 7:98c8b2eabea3 196 }
sgnezdov 5:d8f69ac330f2 197 Job *job = apt->GetJob();
sgnezdov 5:d8f69ac330f2 198 jobFunc *f = _jobService->GetJob(job->GetTypeID());
sgnezdov 5:d8f69ac330f2 199 if (f == NULL) {
sgnezdov 5:d8f69ac330f2 200 printf("[Scheduler::runHandler] NO FUNC for job type id %d\n", job->GetTypeID());
sgnezdov 5:d8f69ac330f2 201 // NO reschedule
sgnezdov 5:d8f69ac330f2 202 delete apt;
sgnezdov 5:d8f69ac330f2 203 continue;
sgnezdov 5:d8f69ac330f2 204 }
sgnezdov 5:d8f69ac330f2 205 printf("Job Started\n");
sgnezdov 5:d8f69ac330f2 206 f();
sgnezdov 5:d8f69ac330f2 207 printf("Job Finished\n");
sgnezdov 5:d8f69ac330f2 208 this->reschedule(apt);
sgnezdov 4:78bcd5a675e1 209 } else {
sgnezdov 4:78bcd5a675e1 210 printf("[Scheduler::runHandler] NOT osEventMessage\n");
sgnezdov 4:78bcd5a675e1 211 }
sgnezdov 4:78bcd5a675e1 212 }
sgnezdov 4:78bcd5a675e1 213 }
sgnezdov 3:f08f55827736 214
sgnezdov 0:806403f3d0d1 215
sgnezdov 0:806403f3d0d1 216 }