job scheduler works with run once and run periodic schedules. Stop logic is not fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

scheduler.cpp

Committer:
sgnezdov
Date:
2017-08-04
Revision:
19:965c8721cc8a
Parent:
18:8be206ad1eb4

File content as of revision 19:965c8721cc8a:

#include "scheduler.h"

// silence scheduler tracing, must be before mbed_trace.h
#define MBED_TRACE_MAX_LEVEL 0
#include "mbed-trace/mbed_trace.h"
#define TRACE_GROUP  "schd"

Timeout WakeOnce;

namespace JobScheduler {

    const ActionType JobAddAT(1);
    const ActionType JobRunAT(3);
    const ActionType JobQuitAT(4);
    const ActionType JobListAT(5);

    bool descendingTimeline(Appointment *a1, Appointment *a2)
    {
        time_t t1 = a1->GetTime();
        time_t t2 = a2->GetTime();
        bool rv = t1 >= t2;
        tr_debug("apt %d:%d >= %d:%d is %d)", a1->GetJob()->GetID(), t1, a2->GetJob()->GetID(), t2, rv);
        return rv;
    };
    
    /**
    JobAddReq adds new job to the scheduler.
    */
    struct JobAddReq: Action {
        Appointment *apt;
        Response<JobID> response;
        JobAddReq(Appointment *a) : Action(JobAddAT), apt(a), response(NoError, 0) {}
    };
    
    struct JobListReq: Action {
        LinkedList<Appointment>& _jobsToFill;
        JobListReq(LinkedList<Appointment>& jobs) : Action(JobListAT), _jobsToFill(jobs) {}
    };
               
    Scheduler::Scheduler(JobService *jobService) 
    : _quitUpdater(false), _quitRunner(false), _jobService(jobService), _nextJobID(1) {   }

    void Scheduler::updateAdapter(void *thisPointer) {
        Scheduler *self = static_cast<Scheduler*>(thisPointer);
        self->updateHandler();
    }

    void Scheduler::runAdapter(void *thisPointer) {
        Scheduler *self = static_cast<Scheduler*>(thisPointer);
        self->runHandler();
    }
    
    void Scheduler::Start() {
        _updater.start(callback(Scheduler::updateAdapter, this));
        _runner.start(callback(Scheduler::runAdapter, this));
    }
    
    void Scheduler::Stop() {
        _runs.put(NULL);
        
        Action uReq = Action(JobQuitAT);
        _updates.put(&uReq);
        uReq.resQueue.get();
    }

    void Scheduler::WaitToStop() {
        _updater.join();
        _runner.join();
    }
    
    Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
        Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
        if (NULL == apt) {
            tr_error("[JobAdd] failed to allocate appointment");
            return Response<JobID>(1, 0);
        }
        return this->reschedule(apt);
    }
    
    Response<JobID> Scheduler::reschedule(Appointment *apt) {
        JobAddReq req(apt);
        
        // set next appointment time
        time_t now = time(NULL); // now in seconds
        apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
        if (apt->GetTime() == 0) {
            // there is no next run time; delete appointment
            tr_debug("[reschedule] NO next appointment");
            delete apt;
            req.response.error = 2;
            return req.response;
        }
        
        tr_debug("[reschedule] put");
        _updates.put(&req);
        tr_debug("[reschedule] get");
        // default is wait forever
        osEvent evt = req.resQueue.get();
        if (evt.status == osEventMessage) {
            if (evt.value.p != NULL) {
                tr_debug("[reschedule] completed ok");
            } else {
                tr_error("[reschedule] NOT added (C1)");
            }
        } else {
            // not sure what condition is
            tr_error("[reschedule] NOT added (C2)");
            delete apt;
            apt = NULL;
        }
        // yes, return a copy of the structure
        return req.response;        
    }
    
    void Scheduler::JobRemove(JobID jobID) {
        tr_error("JobRemove is not implemented");
    }
    
    void Scheduler::AppointmentList(LinkedList<Appointment>& apts) {
        JobListReq req(apts);
        _updates.put(&req);
        osEvent evt = req.resQueue.get();
        if (evt.status != osEventMessage) {
            // not sure what condition is
            tr_error("[JobList] status error");
        }
    }

    static Action jobRunReq(JobRunAT);
    void Scheduler::onWakeOnce()
    {
        _updates.put(&jobRunReq);
    }
    
    void Scheduler::updateHandler() {
        osThreadId_t tid = osThreadGetId();
        tr_debug("Scheduler monitor thread ID: 0x%X", tid);
        while (!_quitUpdater) {
            tr_debug("[updateHandler] waiting for action");
            // wait forever ...
            osEvent evt = _updates.get();
            if (evt.status == osEventMessage) {
                tr_debug("[updateHandler] process action");
                this->process((Action*)evt.value.p);
            } else {
                tr_error("[updateHandler] NOT osEventMessage");
            }
        }
    }
   
    void Scheduler::process(Action *action)
    {
        time_t now = time(NULL); // now in seconds
        switch(action->type) {
            case JobAddAT: {
                tr_debug("[process] JobAddAT");
                JobAddReq *req = static_cast<JobAddReq*>(action);
                Job *job = req->apt->GetJob();
                if (job->GetID() == 0) {
                    // assign job its ID
                    job->Init(_nextJobID++);
                    tr_debug("assigned new job its id %d", job->GetID());
                } else {
                    tr_debug("job already has id %d", job->GetID());
                }
                node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
                if (NULL == tmp) {
                    tr_error("[process] timeline insert failed for job ID %d", job->GetID());
                    action->resQueue.put(NULL);
                    // internal state has not changed
                    return;
                }
                req->response.data = job->GetID();
                //tr_debug("[process] simulate error");
                //action->resQueue.put(NULL);
                action->resQueue.put(&req->response);
                break;
            }
            case JobRunAT: {
                tr_debug("[process] JobRunAT");
                // execute job run logic after switch
                break;
            }
            case JobQuitAT: {
                tr_debug("[process] JobQuitAT");
                action->resQueue.put(NULL);
                _quitUpdater = true;
                return;
            }
            case JobListAT: {
                tr_debug("[process] JobListAT");
                JobListReq *req = static_cast<JobListReq*>(action);
                int loc = 1;
                node<Appointment>* n = _timeline.pop(loc);
                while (n != NULL) {
                    Appointment* apt = n->data;
                    tr_debug("[process] adding appointment with job ID %d to list", apt->GetJob()->GetID());
                    req->_jobsToFill.append(apt);
                    ++loc;
                    n = _timeline.pop(loc);
                }
                action->resQueue.put(NULL);
                return;
            }
            default:
                tr_warn("[process] unknown action type");
                action->resQueue.put(NULL);
        }
        node<Appointment> *wakeNode = _timeline.pop(1);
        if (wakeNode == NULL) {
            tr_debug("[process] found no nodes to run");
            return;
        }
        Appointment *wakeApt = wakeNode->data;
        Job* wakeJob = wakeApt->GetJob();
        if (now < wakeApt->GetTime()) {
            // request wake up
            time_t sleepTime = wakeApt->GetTime() - now;
            tr_debug("[process] job %d wake up in %d seconds", wakeJob->GetID(), sleepTime);
            WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
        } else {
            // process job
            tr_debug("[process] job ID %d ready to run", wakeJob->GetID());
            _timeline.remove(1);
            _runs.put(wakeApt);
            // make sure we run this function one more time
            // in case there are jobs left to run
            //
            // don't use WakeOnce, because it appears to work only for 
            // one wake up request at a time.
            _updates.put(&jobRunReq);
        }
    }
    
    void Scheduler::runHandler() {
        osThreadId_t tid = osThreadGetId();
        tr_debug("Scheduler runner thread ID: 0x%X", tid);
        while (!_quitRunner) {
            tr_debug("[runHandler] waiting for action");
            // wait forever ...
            osEvent evt = _runs.get();
            if (evt.status == osEventMessage) {
                tr_debug("[runHandler] run action");
                Appointment *apt = (Appointment*)evt.value.p;
                if (NULL == apt) {
                    tr_debug("[runHandler] quit requested");
                    _quitRunner = true;
                    break;
                }
                Job *job = apt->GetJob();
                JobType *jt = _jobService->GetJob(job->GetTypeID());
                if (jt == NULL) {
                    tr_error("[runHandler] NO FUNC for job type id %d", job->GetTypeID());
                    // NO reschedule
                    delete apt;
                    continue;
                }
                tr_debug("Job Started");
                jt->RunJob();
                tr_debug("Job Finished");
                this->reschedule(apt);
            } else {
                tr_error("[runHandler] NOT osEventMessage");
            }
        }
    }

        
}