Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: JobSchedulerDemo Borsch
scheduler.cpp
- Committer:
- sgnezdov
- Date:
- 2017-07-11
- Revision:
- 5:d8f69ac330f2
- Parent:
- 4:78bcd5a675e1
- Child:
- 6:5baa0e4ec500
File content as of revision 5:d8f69ac330f2:
#include "scheduler.h"
// File-scope timer object. Scheduler::process attaches Scheduler::onWakeOnce
// to it with the delay until the earliest appointment on the timeline.
// Because it is a single global, every Scheduler instance shares it.
Timeout WakeOnce;
/**
 Placeholder callback: it accepts an opaque pointer and does nothing with it.
 @param target ignored.
 */
void update(void *target)
{
    (void)target;
}
namespace JobScheduler {
// Action type tags; Scheduler::process switches on these values.
// JobAddAT marks a JobAddReq, JobRunAT marks a JobRunReq.
const ActionType JobAddAT = 1;
const ActionType JobRunAT = 3;
/**
 Ordering predicate passed to the timeline's insertOrdered() call.
 @param a1 first appointment to compare.
 @param a2 second appointment to compare.
 @return true when a1's time is not later than a2's time.
 */
bool descendingTimeline(Appointment *a1, Appointment *a2)
{
    return a1->GetTime() <= a2->GetTime();
}
/**
 JobAddReq is the action that asks the scheduler to add a job.
 It carries the appointment to insert and the response that
 Scheduler::process fills in with the job ID on success.
 */
struct JobAddReq : Action {
    /** Appointment to be inserted into the timeline. */
    Appointment *apt;
    /** Result returned to the requester; starts out as (NoError, 0). */
    Response<JobID> response;

    JobAddReq(Appointment *appointment)
        : Action(JobAddAT), apt(appointment), response(NoError, 0)
    {
    }
};
/**
 JobRunReq is the action queued when the wake-up timer fires.
 It carries nothing beyond its action type, JobRunAT.
 */
struct JobRunReq : Action {
    JobRunReq() : Action(JobRunAT)
    {
    }
};
/**
 Creates a scheduler that looks up job functions through jobService.
 Job IDs are handed out starting from 1.
 @param jobService service queried by runHandler() for the function of a job type.
 */
Scheduler::Scheduler(JobService *jobService)
    : _jobService(jobService), _nextJobID(1)
{
    // Both handler loops test _quit before each iteration, so it must start
    // out cleared. It is assigned in the body rather than in the initializer
    // list because the member declaration order is not visible from this file.
    _quit = false;
}
void Scheduler::updateAdapter(void *thisPointer) {
Scheduler *self = static_cast<Scheduler*>(thisPointer);
self->updateHandler();
}
void Scheduler::runAdapter(void *thisPointer) {
Scheduler *self = static_cast<Scheduler*>(thisPointer);
self->runHandler();
}
/**
 Launches the two worker threads: the updater (runs updateHandler)
 first, then the runner (runs runHandler).
 */
void Scheduler::Start()
{
    _updater.start(callback(&Scheduler::updateAdapter, this));
    _runner.start(callback(&Scheduler::runAdapter, this));
}
void Scheduler::Stop() {
// it is not thread-safe, but impact is non-existent.
_quit = true;
}
/**
 Blocks until the updater thread and then the runner thread have finished.
 */
void Scheduler::WaitToStop()
{
    _updater.join();
    _runner.join();
}
/**
 Creates an appointment for a new job and submits it to the scheduler.
 @param jobTID job type identifier.
 @param schedule schedule that determines the job's run times.
 @param data job data passed through to the appointment.
 @return the response produced by reschedule(), or Response<JobID>(1, 0)
         when the appointment pointer is NULL after allocation.
 */
Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data)
{
    Appointment *appointment = new Appointment(jobTID, schedule, data, time_t(0));
    if (appointment != NULL) {
        return this->reschedule(appointment);
    }
    printf("[Scheduler::JobAdd] failed to allocate appointment\n");
    return Response<JobID>(1, 0);
}
/**
 Submits an appointment to the updater thread and waits for its verdict.

 A JobAddReq living on this function's stack is put on the update queue;
 the call then blocks on the request's own response queue until
 process() answers. A non-NULL reply means the appointment was added.

 Ownership: when the appointment was not added (NULL reply or a
 non-message event) it is deleted here, because no caller in this file
 keeps the pointer afterwards.

 @param apt heap-allocated appointment to (re)insert into the timeline.
 @return a copy of the request's response structure.
 */
Response<JobID> Scheduler::reschedule(Appointment *apt)
{
    JobAddReq req(apt);
    printf("[Scheduler::reschedule] put\n");
    _updates.put(&req);
    printf("[Scheduler::reschedule] get\n");
    // default is wait forever
    osEvent evt = req.resQueue.get();

    bool added = false;
    if (evt.status == osEventMessage) {
        if (evt.value.p != NULL) {
            printf("[Scheduler::reschedule] completed ok\n");
            added = true;
        } else {
            // process() replies with NULL when it did not keep the appointment.
            printf("[Scheduler::reschedule] NOT added (C1)\n");
        }
    } else {
        // not sure what condition is
        printf("[Scheduler::reschedule] NOT added (C2)\n");
    }

    if (!added) {
        // Previously only the C2 branch released the appointment, so a
        // rejected request (C1) leaked it.
        delete apt;
        apt = NULL;
    }

    // NOTE(review): on the failure paths the response still holds its initial
    // (NoError, 0) value - confirm whether callers need an error code here.
    // yes, return a copy of the structure
    return req.response;
}
/**
 Placeholder: job removal is not implemented, so the call has no effect.
 @param jobID ignored.
 */
void Scheduler::JobRemove(JobID jobID)
{
    (void)jobID;
}
// Single run request with static storage; its address is what onWakeOnce
// puts on the update queue each time the wake-up timer fires.
static JobRunReq jobRunReq;

/**
 Callback attached to WakeOnce by process(). It queues the shared run
 request so that the updater thread re-examines the timeline.
 */
void Scheduler::onWakeOnce()
{
    _updates.put(&jobRunReq);
}
void Scheduler::updateHandler() {
while (!_quit) {
printf("[Scheduler::updateHandler] waiting for action\n");
// wait forever ...
osEvent evt = _updates.get();
if (evt.status == osEventMessage) {
printf("[Scheduler::updateHandler] process action\n");
this->process((Action*)evt.value.p);
} else {
printf("[Scheduler::updateHandler] NOT osEventMessage\n");
}
}
}
/**
 Handles one action on the updater thread, then services the timeline.

 Step 1 - dispatch on the action type:
   - JobAddAT: assign a job ID if the job has none (ID 0), compute the next
     run time from the job's schedule, insert the appointment into the
     timeline and answer the requester through the action's response queue
     (NULL on failure, pointer to the response on success).
   - JobRunAT: nothing to do here; the timeline check below is the work.
   - anything else: answer NULL.

 Step 2 - look at the first timeline entry: if it is due, remove it from
 the timeline and pass it to the runner queue; otherwise arm WakeOnce so
 that onWakeOnce() fires when it becomes due.

 @param action request taken from the update queue.
 */
void Scheduler::process(Action *action)
{
    time_t now = time(NULL); // now in seconds
    switch(action->type) {
        case JobAddAT: {
            printf("[Scheduler::process] JobAddAT\n");
            JobAddReq *req = static_cast<JobAddReq*>(action);
            Job *job = req->apt->GetJob();
            if (job->GetID() == 0) {
                // assign job its ID
                job->Init(_nextJobID++);
            }
            // set next appointment time
            req->apt->SetTime(job->GetSchedule()->NextRunTime(now));
            node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
            if (NULL == tmp) {
                printf("[Scheduler::process] timeline insert failed\n");
                action->resQueue.put(NULL);
                // internal state has not changed
                return;
            }
            req->response.data = job->GetID();
            //printf("[Scheduler::process] simulate error\n");
            //action->resQueue.put(NULL);
            action->resQueue.put(&req->response);
            break;
        }
        case JobRunAT: {
            printf("[Scheduler::process] JobRunAT\n");
            // execute job run logic after switch
            break;
        }
        default:
            printf("[Scheduler::process] unknown action type\n");
            action->resQueue.put(NULL);
            break;
    }

    // pop(1) is used as a peek at the first node; removal is done separately
    // with remove(1) below.
    node<Appointment> *wakeNode = _timeline.pop(1);
    if (NULL == wakeNode) {
        // Presumably pop() yields NULL when the timeline has no entries
        // (e.g. a run request or unknown action arriving while no job is
        // scheduled). Dereferencing it would crash, so there is nothing to do.
        printf("[Scheduler::process] timeline is empty\n");
        return;
    }
    Appointment *wakeApt = wakeNode->data;
    Job* wakeJob = wakeApt->GetJob();
    if (now < wakeApt->GetTime()) {
        // request wake up
        time_t sleepTime = wakeApt->GetTime() - now;
        // time_t is not guaranteed to be int-sized, so print it as long.
        printf("[Scheduler::process] job %d wake up in %ld seconds\n", wakeJob->GetID(), static_cast<long>(sleepTime));
        WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
    } else {
        // process job
        printf("[Scheduler::process] job ID %d ready to run\n", wakeJob->GetID());
        _timeline.remove(1);
        _runs.put(wakeApt);
    }
}
void Scheduler::runHandler() {
while (!_quit) {
printf("[Scheduler::runHandler] waiting for action\n");
// wait forever ...
osEvent evt = _runs.get();
if (evt.status == osEventMessage) {
printf("[Scheduler::runHandler] run action\n");
Appointment *apt = (Appointment*)evt.value.p;
Job *job = apt->GetJob();
jobFunc *f = _jobService->GetJob(job->GetTypeID());
if (f == NULL) {
printf("[Scheduler::runHandler] NO FUNC for job type id %d\n", job->GetTypeID());
// NO reschedule
delete apt;
continue;
}
printf("Job Started\n");
f();
printf("Job Finished\n");
this->reschedule(apt);
} else {
printf("[Scheduler::runHandler] NOT osEventMessage\n");
}
}
}
}