The job scheduler works with run-once and periodic schedules. The stop logic is not yet fully thought through.

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

scheduler.cpp

Committer:
sgnezdov
Date:
2017-07-11
Revision:
6:5baa0e4ec500
Parent:
5:d8f69ac330f2
Child:
7:98c8b2eabea3

File content as of revision 6:5baa0e4ec500:

#include "scheduler.h"

#include <new>

// File-scope timer. Scheduler::process arms it so that Scheduler::onWakeOnce
// is invoked once the appointment at the head of the timeline becomes due.
Timeout WakeOnce;

/**
Placeholder callback: accepts an opaque pointer and does nothing with it.
*/
void update(void *target)
{
    (void)target; // deliberately unused
}

namespace JobScheduler {

    // Action type tags; Scheduler::process switches on Action::type to tell
    // the requests apart.
    const ActionType JobAddAT(1); // tag carried by JobAddReq
    const ActionType JobRunAT(3); // tag carried by JobRunReq

    /**
    Ordering predicate handed to the timeline's insertOrdered().
    Yields true when the first appointment's time is not later than the
    second's.
    */
    bool descendingTimeline(Appointment *a1, Appointment *a2)
    {
        return a1->GetTime() <= a2->GetTime();
    }
    
    /**
    Request asking the update thread to place an appointment on the timeline.
    The outcome travels back to the requester through `response`.
    */
    struct JobAddReq: Action {
        Appointment *apt;          // appointment to be put on the timeline
        Response<JobID> response;  // Scheduler::process stores the job ID in here

        JobAddReq(Appointment *appointment)
            : Action(JobAddAT)
            , apt(appointment)
            , response(NoError, 0)
        {
        }
    };
    
    /**
    Payload-free request posted to the update queue by Scheduler::onWakeOnce;
    Scheduler::process reacts to it by examining the head of the timeline.
    */
    struct JobRunReq: Action {
        JobRunReq()
            : Action(JobRunAT)
        {
        }
    };
       
    /**
    Creates a scheduler that looks up job functions in the given job service.
    Job IDs are handed out starting from 1.

    @param jobService service queried by runHandler() for the function of a job type
    */
    Scheduler::Scheduler(JobService *jobService)
    : _jobService(jobService), _nextJobID(1) {
        // updateHandler() and runHandler() loop on !_quit, so the flag needs a
        // defined value before Start() launches them.
        _quit = false;
    }

    void Scheduler::updateAdapter(void *thisPointer) {
        Scheduler *self = static_cast<Scheduler*>(thisPointer);
        self->updateHandler();
    }

    void Scheduler::runAdapter(void *thisPointer) {
        Scheduler *self = static_cast<Scheduler*>(thisPointer);
        self->runHandler();
    }
    
    /**
    Launches the two workers: _updater runs updateHandler() and _runner runs
    runHandler(), each through its adapter with this object as the argument.
    */
    void Scheduler::Start() {
        this->_updater.start(callback(&Scheduler::updateAdapter, this));
        this->_runner.start(callback(&Scheduler::runAdapter, this));
    }
    
    void Scheduler::Stop() {
        // it is not thread-safe, but impact is non-existent.
        _quit = true;
    }
    
    void Scheduler::WaitToStop() {
        _updater.join();
        _runner.join();
    }
    
    /**
    Creates an appointment for a new job and submits it for scheduling.

    @param jobTID   job type ID passed to the new appointment
    @param schedule schedule passed to the new appointment
    @param data     job data passed to the new appointment
    @return the response produced by reschedule(); error 1 with job ID 0 when
            the appointment could not be allocated
    */
    Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
        // Plain new does not report failure by returning NULL; the nothrow
        // form does, which makes the check below reachable.
        Appointment *apt = new (std::nothrow) Appointment(jobTID, schedule, data, time_t(0));
        if (NULL == apt) {
            printf("[Scheduler::JobAdd] failed to allocate appointment\n");
            return Response<JobID>(1, 0);
        }
        return this->reschedule(apt);
    }
    
    /**
    Computes the appointment's next run time and asks the update thread to put
    it on the timeline, blocking until that request has been answered.

    The appointment is deleted here whenever it does not end up on the
    timeline.

    @param apt appointment to (re)schedule
    @return copy of the request's response. On success `data` holds the job ID.
            Error 2: the schedule has no next run time.
            Error 3: the appointment was not added to the timeline.
    */
    Response<JobID> Scheduler::reschedule(Appointment *apt) {
        JobAddReq req(apt);
        
        // set next appointment time
        time_t now = time(NULL); // now in seconds
        apt->SetTime(apt->GetJob()->GetSchedule()->NextRunTime(now));
        if (apt->GetTime() == 0) {
            // there is no next run time; delete appointment
            printf("[Scheduler::reschedule] NO next appointment\n");
            delete apt;
            req.response.error = 2;
            return req.response;
        }
        
        printf("[Scheduler::reschedule] put\n");
        if (_updates.put(&req) != osOK) {
            // The request never reached the update thread, so nobody would
            // ever answer it: waiting on resQueue below would block forever.
            printf("[Scheduler::reschedule] NOT added (put failed)\n");
            delete apt;
            req.response.error = 3;
            return req.response;
        }
        printf("[Scheduler::reschedule] get\n");
        // default is wait forever
        osEvent evt = req.resQueue.get();
        if (evt.status == osEventMessage) {
            if (evt.value.p != NULL) {
                printf("[Scheduler::reschedule] completed ok\n");
            } else {
                // process() answers NULL when the timeline insert failed. The
                // appointment is then owned by nobody: release it and report
                // the failure instead of NoError with job ID 0.
                printf("[Scheduler::reschedule] NOT added (C1)\n");
                delete apt;
                req.response.error = 3;
            }
        } else {
            // not sure what condition is
            // NOTE(review): the request may still sit in the update queue at
            // this point; confirm that deleting the appointment is safe here.
            printf("[Scheduler::reschedule] NOT added (C2)\n");
            delete apt;
            req.response.error = 3;
        }
        // yes, return a copy of the structure
        return req.response;
    }
    
    /**
    Placeholder: job removal is not implemented, so the call has no effect.
    */
    void Scheduler::JobRemove(JobID jobID) {
        (void)jobID; // unused until removal is implemented
    }

    static JobRunReq jobRunReq;
    void Scheduler::onWakeOnce()
    {
        _updates.put(&jobRunReq);
    }
    
    void Scheduler::updateHandler() {
        while (!_quit) {
            printf("[Scheduler::updateHandler] waiting for action\n");
            // wait forever ...
            osEvent evt = _updates.get();
            if (evt.status == osEventMessage) {
                printf("[Scheduler::updateHandler] process action\n");
                this->process((Action*)evt.value.p);
            } else {
                printf("[Scheduler::updateHandler] NOT osEventMessage\n");
            }
        }
    }
   
    /**
    Handles one action taken from the update queue, then services the head of
    the timeline.

    JobAddAT: gives the job an ID if it has none, inserts the appointment into
    the timeline and answers through the action's response queue. The answer
    is NULL when the insert failed, and nothing else is done in that case.
    JobRunAT: nothing action-specific; only the timeline is serviced.
    Any other type is answered with NULL.

    Servicing the timeline: every appointment at the head whose time has
    arrived is removed and handed to the run queue. For the first one that is
    still in the future the WakeOnce timer is armed. An empty timeline is
    left alone.

    @param action request to process; must not be NULL
    */
    void Scheduler::process(Action *action)
    {
        time_t now = time(NULL); // now in seconds
        switch(action->type) {
            case JobAddAT: {
                printf("[Scheduler::process] JobAddAT\n");
                JobAddReq *req = static_cast<JobAddReq*>(action);
                Job *job = req->apt->GetJob();
                if (job->GetID() == 0) {
                    // assign job its ID
                    job->Init(_nextJobID++);
                }
                node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
                if (NULL == tmp) {
                    printf("[Scheduler::process] timeline insert failed\n");
                    action->resQueue.put(NULL);
                    // internal state has not changed
                    return;
                }
                req->response.data = job->GetID();
                action->resQueue.put(&req->response);
                break;
            }
            case JobRunAT: {
                printf("[Scheduler::process] JobRunAT\n");
                // execute job run logic after switch
                break;
            }
            default:
                printf("[Scheduler::process] unknown action type\n");
                action->resQueue.put(NULL);
                break;
        }

        // Keep examining the head until it is a future appointment or the
        // timeline is empty. Stopping after one dispatched job would leave
        // the next appointment without an armed timer until some unrelated
        // action happened to arrive.
        for (;;) {
            node<Appointment> *wakeNode = _timeline.pop(1);
            if (NULL == wakeNode) {
                // Nothing scheduled: nothing to run and nothing to wake up for.
                // NOTE(review): assumes pop() yields NULL for a missing
                // position - confirm against the LinkedList library.
                break;
            }
            Appointment *wakeApt = wakeNode->data;
            Job* wakeJob = wakeApt->GetJob();
            if (now < wakeApt->GetTime()) {
                // request wake up
                time_t sleepTime = wakeApt->GetTime() - now;
                // time_t is not guaranteed to match %d, hence the cast
                printf("[Scheduler::process] job %d wake up in %d seconds\n", wakeJob->GetID(), static_cast<int>(sleepTime));
                WakeOnce.attach(callback(this, &Scheduler::onWakeOnce), sleepTime);
                break;
            }
            // process job
            printf("[Scheduler::process] job ID %d ready to run\n", wakeJob->GetID());
            _timeline.remove(1);
            _runs.put(wakeApt);
        }
    }
    
    void Scheduler::runHandler() {
        while (!_quit) {
            printf("[Scheduler::runHandler] waiting for action\n");
            // wait forever ...
            osEvent evt = _runs.get();
            if (evt.status == osEventMessage) {
                printf("[Scheduler::runHandler] run action\n");
                Appointment *apt = (Appointment*)evt.value.p;
                Job *job = apt->GetJob();
                jobFunc *f = _jobService->GetJob(job->GetTypeID());
                if (f == NULL) {
                    printf("[Scheduler::runHandler] NO FUNC for job type id %d\n", job->GetTypeID());
                    // NO reschedule
                    delete apt;
                    continue;
                }
                printf("Job Started\n");
                f();
                printf("Job Finished\n");
                this->reschedule(apt);
            } else {
                printf("[Scheduler::runHandler] NOT osEventMessage\n");
            }
        }
    }

        
}