job scheduler works with run once and run periodic schedules. Stop logic is not fully thought through.
Dependents: JobSchedulerDemo Borsch
scheduler.cpp
- Committer:
- sgnezdov
- Date:
- 2017-08-04
- Revision:
- 19:965c8721cc8a
- Parent:
- 18:8be206ad1eb4
File content as of revision 19:965c8721cc8a:
#include "scheduler.h" // silence scheduler tracing, must be before mbed_trace.h #define MBED_TRACE_MAX_LEVEL 0 #include "mbed-trace/mbed_trace.h" #define TRACE_GROUP "schd" Timeout WakeOnce; namespace JobScheduler { const ActionType JobAddAT(1); const ActionType JobRunAT(3); const ActionType JobQuitAT(4); const ActionType JobListAT(5); bool descendingTimeline(Appointment *a1, Appointment *a2) { time_t t1 = a1->GetTime(); time_t t2 = a2->GetTime(); bool rv = t1 >= t2; tr_debug("apt %d:%d >= %d:%d is %d)", a1->GetJob()->GetID(), t1, a2->GetJob()->GetID(), t2, rv); return rv; }; /** JobAddReq adds new job to the scheduler. */ struct JobAddReq: Action { Appointment *apt; Response<JobID> response; JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {} }; struct JobListReq: Action { LinkedList<Appointment>& _jobsToFill; JobListReq(LinkedList<Appointment>& jobs) : Action(JobListAT), _jobsToFill(jobs) {} }; Scheduler::Scheduler(JobService *jobService) : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) { } void Scheduler::updateAdapter(void *thisPointer) { Scheduler *self = static_cast<Scheduler*>(thisPointer); self->updateHandler(); } void Scheduler::runAdapter(void *thisPointer) { Scheduler *self = static_cast<Scheduler*>(thisPointer); self->runHandler(); } void Scheduler::Start() { _updater.start(callback(Scheduler::updateAdapter, this)); _runner.start(callback(Scheduler::runAdapter, this)); } void Scheduler::Stop() { _runs.put(NULL); Action uReq = Action(JobQuitAT); _updates.put(&uReq); uReq.resQueue.get(); } void Scheduler::WaitToStop() { _updater.join(); _runner.join(); } Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) { Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0)); if (NULL == apt) { tr_error("[JobAdd] failed to allocate appointment"); return Response<JobID>(1, 0); } return this->reschedule(apt); } Response<JobID> Scheduler::reschedule(Appointment *apt) { JobAddReq req(apt); // set next appointment time time_t now = time(NULL); // now in seconds apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now)); if (apt->GetTime() == 0) { // there is no next run time; delete appointment tr_debug("[reschedule] NO next appointment"); delete apt; req.response.error = 2; return req.response; } tr_debug("[reschedule] put"); _updates.put(&req); tr_debug("[reschedule] get"); // default is wait forever osEvent evt = req.resQueue.get(); if (evt.status == osEventMessage) { if (evt.value.p != NULL) { tr_debug("[reschedule] completed ok"); } else { tr_error("[reschedule] NOT added (C1)"); } } else { // not sure what condition is tr_error("[reschedule] NOT added (C2)"); delete apt; apt = NULL; } // yes, return a copy of the structure return req.response; } void Scheduler::JobRemove(JobID jobID) { tr_error("JobRemove is not implemented"); } void Scheduler::AppointmentList(LinkedList<Appointment>& apts) { JobListReq req(apts); _updates.put(&req); osEvent evt = req.resQueue.get(); if (evt.status != osEventMessage) { // not sure what condition is tr_error("[JobList] status error"); } } static Action jobRunReq(JobRunAT); void Scheduler::onWakeOnce() { _updates.put(&jobRunReq); } void Scheduler::updateHandler() { osThreadId_t tid = osThreadGetId(); tr_debug("Scheduler monitor thread ID: 0x%X", tid); while (!_quitUpdater) { tr_debug("[updateHandler] waiting for action"); // wait forever ... osEvent evt = _updates.get(); if (evt.status == osEventMessage) { tr_debug("[updateHandler] process action"); this->process((Action*)evt.value.p); } else { tr_error("[updateHandler] NOT osEventMessage"); } } } void Scheduler::process(Action *action) { time_t now = time(NULL); // now in seconds switch(action->type) { case JobAddAT: { tr_debug("[process] JobAddAT"); JobAddReq *req = static_cast<JobAddReq*>(action); Job *job = req->apt->GetJob(); if (job->GetID() == 0) { // assign job its ID job->Init(_nextJobID++); tr_debug("assigned new job its id %d", job->GetID()); } else { tr_debug("job already has id %d", job->GetID()); } node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline); if (NULL == tmp) { tr_error("[process] timeline insert failed for job ID %d", job->GetID()); action->resQueue.put(NULL); // internal state has not changed return; } req->response.data = job->GetID(); //tr_debug("[process] simulate error"); //action->resQueue.put(NULL); action->resQueue.put(&req->response); break; } case JobRunAT: { tr_debug("[process] JobRunAT"); // execute job run logic after switch break; } case JobQuitAT: { tr_debug("[process] JobQuitAT"); action->resQueue.put(NULL); _quitUpdater = true; return; } case JobListAT: { tr_debug("[process] JobListAT"); JobListReq *req = static_cast<JobListReq*>(action); int loc = 1; node<Appointment>* n = _timeline.pop(loc); while (n != NULL) { Appointment* apt = n->data; tr_debug("[process] adding appointment with job ID %d to list", apt->GetJob()->GetID()); req->_jobsToFill.append(apt); ++loc; n = _timeline.pop(loc); } action->resQueue.put(NULL); return; } default: tr_warn("[process] unknown action type"); action->resQueue.put(NULL); } node<Appointment> *wakeNode = _timeline.pop(1); if (wakeNode == NULL) { tr_debug("[process] found no nodes to run"); return; } Appointment *wakeApt = wakeNode->data; Job* wakeJob = wakeApt->GetJob(); if (now < wakeApt->GetTime()) { // request wake up time_t sleepTime = wakeApt->GetTime() - now; tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime); WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime); } else { // process job tr_debug("[process] job ID %d ready to run", wakeJob->GetID()); _timeline.remove(1); _runs.put(wakeApt); // make sure we run this function one more time // in case there are jobs left to run // // don't use WakeOnce, because it appears to work only for // one wake up request at a time. _updates.put(&jobRunReq); } } void Scheduler::runHandler() { osThreadId_t tid = osThreadGetId(); tr_debug("Scheduler runner thread ID: 0x%X", tid); while (!_quitRunner) { tr_debug("[runHandler] waiting for action"); // wait forever ... osEvent evt = _runs.get(); if (evt.status == osEventMessage) { tr_debug("[runHandler] run action"); Appointment *apt = (Appointment*)evt.value.p; if (NULL == apt) { tr_debug("[runHandler] quit requested"); _quitRunner = true; break; } Job *job = apt->GetJob(); JobType *jt = _jobService->GetJob(job->GetTypeID()); if (jt == NULL) { tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID()); // NO reschedule delete apt; continue; } tr_debug("Job Started"); jt->RunJob(); tr_debug("Job Finished"); this->reschedule(apt); } else { tr_error("[runHandler] NOT osEventMessage"); } } } }