job scheduler works with run once and run periodic schedules. Stop logic is not fully thought through.
Dependents: JobSchedulerDemo Borsch
scheduler.cpp
- Committer:
- sgnezdov
- Date:
- 2017-07-11
- Revision:
- 6:5baa0e4ec500
- Parent:
- 5:d8f69ac330f2
- Child:
- 7:98c8b2eabea3
File content as of revision 6:5baa0e4ec500:
#include "scheduler.h" Timeout WakeOnce; void update(void *target) { }; namespace JobScheduler { const ActionType JobAddAT(1); const ActionType JobRunAT(3); bool descendingTimeline(Appointment *a1, Appointment *a2) { bool rv = a1->GetTime() <= a2->GetTime(); //printf("(%d %d:%d)", *d1, *d2, rv); return rv; }; /** JobAddReq adds new job to the scheduler. */ struct JobAddReq: Action { Appointment *apt; Response<JobID> response; JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {} }; struct JobRunReq: Action { JobRunReq() : Action(JobRunAT) {} }; Scheduler::Scheduler(JobService *jobService) : _jobService(jobService), _nextJobID(1) { } void Scheduler::updateAdapter(void *thisPointer) { Scheduler *self = static_cast<Scheduler*>(thisPointer); self->updateHandler(); } void Scheduler::runAdapter(void *thisPointer) { Scheduler *self = static_cast<Scheduler*>(thisPointer); self->runHandler(); } void Scheduler::Start() { _updater.start(callback(Scheduler::updateAdapter, this)); _runner.start(callback(Scheduler::runAdapter, this)); } void Scheduler::Stop() { // it is not thread-safe, but impact is non-existent. _quit = true; } void Scheduler::WaitToStop() { _updater.join(); _runner.join(); } Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) { Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0)); if (NULL == apt) { printf("[Scheduler::JobAdd] failed to allocate appointment\n"); return Response<JobID>(1, 0); } return this->reschedule(apt); } Response<JobID> Scheduler::reschedule(Appointment *apt) { JobAddReq req(apt); // set next appointment time time_t now = time(NULL); // now in seconds apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now)); if (apt->GetTime() == 0) { // there is no next run time; delete appointment printf("[Scheduler::reschedule] NO next appointment\n"); delete apt; req.response.error = 2; return req.response; } printf("[Scheduler::reschedule] put\n"); _updates.put(&req); printf("[Scheduler::reschedule] get\n"); // default is wait forever osEvent evt = req.resQueue.get(); if (evt.status == osEventMessage) { if (evt.value.p != NULL) { printf("[Scheduler::reschedule] completed ok\n"); } else { printf("[Scheduler::reschedule] NOT added (C1)\n"); } } else { // not sure what condition is printf("[Scheduler::reschedule] NOT added (C2)\n"); delete apt; apt = NULL; } // yes, return a copy of the structure return req.response; } void Scheduler::JobRemove(JobID jobID) { } static JobRunReq jobRunReq; void Scheduler::onWakeOnce() { _updates.put(&jobRunReq); } void Scheduler::updateHandler() { while (!_quit) { printf("[Scheduler::updateHandler] waiting for action\n"); // wait forever ... osEvent evt = _updates.get(); if (evt.status == osEventMessage) { printf("[Scheduler::updateHandler] process action\n"); this->process((Action*)evt.value.p); } else { printf("[Scheduler::updateHandler] NOT osEventMessage\n"); } } } void Scheduler::process(Action *action) { time_t now = time(NULL); // now in seconds switch(action->type) { case JobAddAT: { printf("[Scheduler::process] JobAddAT\n"); JobAddReq *req = static_cast<JobAddReq*>(action); Job *job = req->apt->GetJob(); if (job->GetID() == 0) { // assign job its ID job->Init(_nextJobID++); } node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline); if (NULL == tmp) { printf("[Scheduler::process] timeline insert failed\n"); action->resQueue.put(NULL); // internal state has not changed return; } req->response.data = job->GetID(); //printf("[Scheduler::process] simulate error\n"); //action->resQueue.put(NULL); action->resQueue.put(&req->response); break; } case JobRunAT: { printf("[Scheduler::process] JobRunAT\n"); // execute job run logic after switch break; } default: printf("[Scheduler::process] unknown action type\n"); action->resQueue.put(NULL); } node<Appointment> *wakeNode = _timeline.pop(1); Appointment *wakeApt = wakeNode->data; Job* wakeJob = wakeApt->GetJob(); if (now < wakeApt->GetTime()) { // request wake up time_t sleepTime = wakeApt->GetTime() - now; printf("[Scheduler::process] job %d wake up in %d seconds\n", wakeJob->GetID(), sleepTime); WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime); } else { // process job printf("[Scheduler::process] job ID %d ready to run\n", wakeJob->GetID()); _timeline.remove(1); _runs.put(wakeApt); } } void Scheduler::runHandler() { while (!_quit) { printf("[Scheduler::runHandler] waiting for action\n"); // wait forever ... osEvent evt = _runs.get(); if (evt.status == osEventMessage) { printf("[Scheduler::runHandler] run action\n"); Appointment *apt = (Appointment*)evt.value.p; Job *job = apt->GetJob(); jobFunc *f = _jobService->GetJob(job->GetTypeID()); if (f == NULL) { printf("[Scheduler::runHandler] NO FUNC for job type id %d\n", job->GetTypeID()); // NO reschedule delete apt; continue; } printf("Job Started\n"); f(); printf("Job Finished\n"); this->reschedule(apt); } else { printf("[Scheduler::runHandler] NOT osEventMessage\n"); } } } }