Sergei G / JobScheduler

Dependencies:   LinkedList

Dependents:   JobSchedulerDemo Borsch

scheduler.cpp

Committer:
sgnezdov
Date:
2017-07-11
Revision:
2:9bf5366ad5a2
Parent:
1:ec6a1d054065
Child:
3:f08f55827736

File content as of revision 2:9bf5366ad5a2:

#include "scheduler.h"

/**
Free-standing no-op: accepts an opaque pointer and does nothing with it.
@param target ignored
*/
void update(void *target)
{
    static_cast<void>(target);  // intentionally unused
}

namespace JobScheduler {

    /**
    Ordering predicate handed to the timeline's insertOrdered().
    @param a1 first appointment to compare
    @param a2 second appointment to compare
    @return true when a1's time is less than or equal to a2's time
    NOTE(review): the name says "descending" while the test is `<=`; confirm the
    resulting order against insertOrdered()'s contract.
    */
    bool descendingTimeline(Appointment *a1, Appointment *a2)
    {
        return a1->GetTime() <= a2->GetTime();
    }

    /**
    JobAddReq is the action that asks the scheduler to add a new job.

    apt      appointment to be placed on the timeline; this struct neither
             allocates nor frees it.
    response result slot whose address is sent back to the requester on
             success; starts out as (NoError, 0).
    */
    struct JobAddReq: Action {
        Appointment *apt;
        Response<JobID> response;

        /**
        Builds a JobAddAT action around the given appointment.
        Marked explicit so that a bare Appointment pointer can never be
        converted into a request implicitly.
        */
        explicit JobAddReq(Appointment *a)
        : Action(JobAddAT), apt(a), response(NoError, 0) {}
    };
    
    /**
    Creates a scheduler bound to the given job service.
    Job IDs handed out by this scheduler start at 1; the quit flag starts cleared.
    @param jobService stored as-is in _jobService
    */
    Scheduler::Scheduler(JobService *jobService) 
    : _jobService(jobService), _nextJobID(1) {
        // updateHandler() loops on !_quit, so the flag must have a defined value
        // before Start() is called. It is assigned here rather than in the
        // initializer list because the member declaration order is not visible
        // from this file.
        _quit = false;
    }

    void Scheduler::updateAdapter(void *thisPointer) {
        Scheduler *self = static_cast<Scheduler*>(thisPointer);
        self->updateHandler();
    }
    
    /**
    Starts the updater with updateAdapter as its entry point and this scheduler
    as the argument, so that updateHandler() runs on it.
    */
    void Scheduler::Start() {
        this->_updater.start(callback(&Scheduler::updateAdapter, this));
    }
    
    void Scheduler::Stop() {
        // it is not thread-safe, but impact is non-existent.
        _quit = true;
    }
    
    void Scheduler::WaitToStop() {
        _updater.join();
    }
    
    /**
    Creates an appointment for the given job type, schedule and data, submits it
    to the action queue as a JobAddReq and blocks until the request is answered.

    @param jobTID   job type identifier forwarded to the new Appointment
    @param schedule schedule forwarded to the new Appointment
    @param data     job data forwarded to the new Appointment
    @return a copy of the request's response when the job was added;
            Response<JobID>(1, 0), the same value already used for allocation
            failure, whenever the job was not added.
    */
    Response<JobID> Scheduler::JobAdd(JobTypeID jobTID, ISchedule *schedule, IJobData *data) {
        Appointment *apt = new Appointment(jobTID, schedule, data, time_t(0));
        if (NULL == apt) {
            printf("[Scheduler::JobAdd] failed to allocate appointment\n");
            return Response<JobID>(1, 0);
        }
        JobAddReq req(apt);
        // If the request never enters the queue nobody will ever answer it, and
        // the get() below would block forever. Assumes _actions is an rtos
        // Queue whose put() reports osOK on success (TODO confirm).
        if (_actions.put(&req) != osOK) {
            printf("[Scheduler::JobAdd] failed to queue request\n");
            delete apt;
            return Response<JobID>(1, 0);
        }
        // default is wait forever
        osEvent evt = req.resQueue.get();
        if (evt.status == osEventMessage && evt.value.p != NULL) {
            printf("[Scheduler::JobAdd] completed ok\n");
            // yes, return a copy of the structure
            return req.response;
        }
        if (evt.status == osEventMessage) {
            // The request was answered with NULL: process() does that only when
            // the appointment was not placed on the timeline, so it is still
            // ours to free.
            printf("[Scheduler::JobAdd] NOT added (C1)\n");
        } else {
            // not sure what condition is
            printf("[Scheduler::JobAdd] NOT added (C2)\n");
        }
        delete apt;
        // Report the failure instead of the untouched (NoError, 0) response.
        return Response<JobID>(1, 0);
    }
    
    /**
    Placeholder: job removal is not implemented, so the call has no effect.
    @param jobID ignored
    */
    void Scheduler::JobRemove(JobID jobID) {
        static_cast<void>(jobID);  // intentionally unused until implemented
    }
    
    void Scheduler::updateHandler() {
        while (!_quit) {
            printf("[Scheduler::updateHandler] waiting for action\n");
            // wait forever ...
            osEvent evt = _actions.get();
            if (evt.status == osEventMessage) {
                printf("[Scheduler::updateHandler] process action\n");
                this->process((Action*)evt.value.p);
            } else {
                printf("[Scheduler::updateHandler] NOT osEventMessage\n");
            }
            wait(2);
        }
    }

    /**
    Executes one action taken from the action queue and answers it through the
    action's own response queue: a pointer to the request's response on success,
    NULL on failure or for an unknown action type.

    @param action the action to execute; its type field selects the handler
    */
    void Scheduler::process(Action *action)
    {
        switch(action->type) {
            case JobAddAT: {
                JobAddReq *req = static_cast<JobAddReq*>(action);
                Job *job = req->apt->GetJob();
                if (job->GetID() == 0) {
                    // an ID of 0 is treated as "not assigned yet": give the job its ID
                    job->Init(_nextJobID++);
                }
                node<Appointment> *tmp = _timeline.insertOrdered(req->apt, descendingTimeline);
                if (NULL == tmp) {
                    printf("[Scheduler::process] timeline insert failed\n");
                    action->resQueue.put(NULL);
                    return;
                }
                // Publish the job's ID to the requester; without this the
                // response keeps its initial (NoError, 0) and the caller never
                // learns which ID was assigned. Assumes GetID() reports the ID
                // stored by Init() (TODO confirm).
                req->response = Response<JobID>(NoError, job->GetID());
                action->resQueue.put(&req->response);
                break;
            }
            default:
                printf("[Scheduler::process] unknown action type\n");
                action->resQueue.put(NULL);
                break;
        }
    }
        
}