A job scheduler in C++

Not long ago I started writing some C++ code, and a task that I enjoyed implementing was a very basic job scheduler (idea from dailycodingproblem.com). I’m sure there are “holes” to be filled in my implementation regarding performance, concurrency, and general correctness. This is an early dive into the language and this post is mostly for me, to explain some things to myself.

The scheduler takes a job (function) and a time (in milliseconds) and runs the job after the given time.

The first thing was to define a function pointer as the job type.

using Job = void (*)();
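
A note to myself on what this type accepts: a function pointer can hold a free function or a lambda without captures, but not a lambda that captures variables. The sketch below illustrates that; `say_hello`, `converts_to_job`, and `run` are names I made up for the example and are not part of the scheduler.

```cpp
#include <type_traits>

using Job = void (*)();

// A free function with the right signature is already a valid Job.
void say_hello() {}

// True when a value of type F can be passed where a Job is expected.
template <typename F>
constexpr bool converts_to_job() {
    return std::is_convertible<F, Job>::value;
}

// Hypothetical helper: calls the job if there is one and reports whether it ran.
bool run(Job job) {
    if (job == nullptr) {
        return false;
    }
    job();
    return true;
}
```

With these definitions, `run([] {})` compiles because a captureless lambda converts to a function pointer, while `converts_to_job` reports false for the type of a lambda that captures something.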

 

Another callback that I’ll be using is one to report job errors, which takes an exception as an argument.

using Error = void (*)(const std::exception &);
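
To make the role of this callback concrete, here is a minimal sketch of running a job and forwarding whatever it throws to an `Error` callback. `run_guarded`, `record_error`, and the global `last_error` are assumptions for illustration only; the global exists because a plain function pointer cannot carry state of its own.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

using Job = void (*)();
using Error = void (*)(const std::exception &);

// Global on purpose: a function pointer callback cannot capture variables.
std::string last_error;

// Example callback that remembers the message of the last reported error.
void record_error(const std::exception &e) {
    last_error = e.what();
}

// Hypothetical helper: runs the job and reports any exception to on_error.
void run_guarded(Job job, Error on_error) {
    try {
        job();
    } catch (const std::exception &e) {
        on_error(e);
    } catch (...) {
        // Anything not derived from std::exception is wrapped in a generic error.
        on_error(std::runtime_error("Unknown error"));
    }
}
```

Calling `run_guarded` with a job that throws `std::out_of_range("boom")` leaves `"boom"` in `last_error`, and a job that throws a non-exception value such as `42` is reported as `"Unknown error"`.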

 

The scheduler class constructor takes a size and an error callback (to report errors). The size is the maximum number of jobs accepted until scheduling blocks and waits for a job to be finished.
An error callback is required. The first safeguard is a deleted constructor overload that takes nullptr_t, which turns an explicit nullptr argument into a compile-time error. A pointer variable that happens to be null cannot be caught this way, so the constructor also checks for it at runtime.

Scheduler(size_t size, Error error);
Scheduler(size_t size, std::nullptr_t) = delete;
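
The two checks can be seen side by side in a small sketch. `Checked` is a hypothetical stand-in for the scheduler that keeps only the constructor logic, and I use `std::invalid_argument` here purely for the example.

```cpp
#include <cstddef>
#include <exception>
#include <stdexcept>
#include <type_traits>

using Error = void (*)(const std::exception &);

// Hypothetical stand-in for the scheduler, reduced to its constructors.
class Checked {
public:
    Checked(std::size_t size, Error error) : size(size), error(error) {
        // Runtime check: catches a pointer variable that happens to be null.
        if (error == nullptr) {
            throw std::invalid_argument("non null error callback required");
        }
    }

    // Compile-time check: a literal nullptr selects this deleted overload.
    Checked(std::size_t, std::nullptr_t) = delete;

    std::size_t capacity() const { return size; }
    Error callback() const { return error; }

private:
    std::size_t size;
    Error error;
};

// Passing nullptr directly does not compile; passing an Error value does.
static_assert(!std::is_constructible<Checked, std::size_t, std::nullptr_t>::value,
              "a literal nullptr must be rejected at compile time");
static_assert(std::is_constructible<Checked, std::size_t, Error>::value,
              "a callback pointer must be accepted");
```

Constructing `Checked` from an `Error` variable that holds a null pointer still compiles, and it is the runtime check that throws in that case.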

 
The schedule method takes the job function and the delay in milliseconds. It blocks while the scheduler is already running its maximum number of jobs.

void schedule(Job f, long n);

 

After scheduling the jobs, the scheduler can wait until all the jobs are finished.

void wait();

 

A scheduler instance must not be copied, to avoid ownership and memory issues. My implementation is based on a condition variable, whose copy constructor is deleted, so the scheduler class already cannot be copied. But I would not rely on an implementation detail for an aspect of the interface, so I explicitly deleted the copy constructor.

Scheduler(const Scheduler &) = delete;
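
The difference between the implicit and the explicit route can be checked with type traits. In this sketch both class names are hypothetical, and the deleted copy assignment operator is my addition to the idea, not something the scheduler declares.

```cpp
#include <condition_variable>
#include <mutex>
#include <type_traits>

// Not copyable only because its members are not copyable (implementation detail).
struct ImplicitlyNonCopyable {
    std::condition_variable condition;
    std::mutex mutex;
};

// Not copyable because the interface says so, whatever the members are.
class ExplicitlyNonCopyable {
public:
    ExplicitlyNonCopyable() = default;
    ExplicitlyNonCopyable(const ExplicitlyNonCopyable &) = delete;
    ExplicitlyNonCopyable &operator=(const ExplicitlyNonCopyable &) = delete;
};

static_assert(!std::is_copy_constructible<ImplicitlyNonCopyable>::value,
              "copying is blocked by the members");
static_assert(!std::is_copy_constructible<ExplicitlyNonCopyable>::value,
              "copying is blocked by the declaration");
static_assert(!std::is_copy_assignable<ExplicitlyNonCopyable>::value,
              "assignment is blocked by the declaration");
```

If the condition variable were later replaced by a copyable member, only the explicit version would keep rejecting copies.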

 

The scheduling logic:

  • Take the job function and the delay in milliseconds
  • If the maximum number of accepted jobs is reached, block the calling thread until one of the jobs finishes
  • When the job can be scheduled, start a thread that sleeps for the given delay and then calls the function
  • If the job throws an exception, call the error callback

void Scheduler::schedule(const Job f, long n) {
    std::unique_lock<std::mutex> lock(this->mutex);
    condition.wait(lock, [this] { return this->count < this->size; });
    count++;

    auto job = std::make_shared<Job>(f);

    std::thread thread{
            [n, job, this] {
                std::this_thread::sleep_for(std::chrono::milliseconds(n));

                try {
                    (*job)();
                } catch (const std::exception &e) {
                    this->error(e);
                } catch (...) {
                    this->error(std::runtime_error("Unknown error"));
                }

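                {
                    // Decrement under the mutex: count is shared with schedule() and wait()
                    std::lock_guard<std::mutex> guard(this->mutex);
                    count--;
                }
                // Wake every waiter, since schedule() and wait() share this condition variable
                condition.notify_all();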
            }
    };
    thread.detach();
}

 

If waiting for all jobs to finish is needed, calling the wait method will block until the job count is zero.

void Scheduler::wait() {
    std::unique_lock<std::mutex> lock(this->mutex);
    condition.wait(lock, [this] { return this->count == 0; });
}

 

What can be improved? I’m sure many things: concurrency, passing the job exception to a promise, the scheduling itself, passing arguments to the job, and others.
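
As a rough idea of the promise variant, here is a minimal sketch that is not wired into the scheduler above. `run_later` is a hypothetical helper: it runs the job on its own thread after the delay and hands back a future, so the caller receives the job's exception from `get()` instead of through an error callback.

```cpp
#include <chrono>
#include <exception>
#include <future>
#include <thread>
#include <utility>

using Job = void (*)();

// Hypothetical helper: runs job after delay_ms and reports the outcome via a future.
std::future<void> run_later(Job job, long delay_ms) {
    std::promise<void> promise;
    std::future<void> result = promise.get_future();

    // The promise is moved into the thread, which fulfils it when the job ends.
    std::thread worker(
            [job, delay_ms](std::promise<void> p) {
                std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
                try {
                    job();
                    p.set_value();
                } catch (...) {
                    // Store whatever was thrown; future::get() rethrows it.
                    p.set_exception(std::current_exception());
                }
            },
            std::move(promise));
    worker.detach();

    return result;
}
```

Calling `get()` on the returned future blocks until the job has run and rethrows the original exception with its type intact, so the caller can catch `std::out_of_range` directly.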

A better concurrency abstraction could probably be used. I specifically wanted to try the condition variable. It is an internal implementation decision, so it should be possible to change it without altering the interface.

The job could be an interface, but a function is enough for this demo.
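
One middle ground between a function pointer and a full interface is `std::function`, which also covers passing arguments to a job. This is only a sketch of that alternative: `FlexibleJob` and `make_job` are hypothetical names, and the scheduler in this post keeps using the plain function pointer.

```cpp
#include <functional>

// Unlike a function pointer, std::function can also hold capturing lambdas.
using FlexibleJob = std::function<void()>;

// Hypothetical helper: binds the arguments now, so the job is called with none later.
template <typename F, typename... Args>
FlexibleJob make_job(F f, Args... args) {
    return std::bind(f, args...);
}
```

For example, `make_job([&total](int a, int b) { total += a + b; }, 2, 3)` yields a job that adds 5 to `total` each time it is called.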

Finally, the full code with a usage example, tested on Ubuntu 18.04 with GCC, C++11.

#include <iostream>
#include <memory>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <stdexcept>

using Job = void (*)();

using Error = void (*)(const std::exception &);

class Scheduler {
public:
    Scheduler(size_t size, Error error);

    Scheduler(size_t size, std::nullptr_t) = delete;

    Scheduler(const Scheduler &) = delete;

    void schedule(Job f, long n);

    void wait();

    virtual ~Scheduler() = default;

private:
    std::condition_variable condition;
    std::mutex mutex;
    size_t size;
    const Error error;
    size_t count{};
};

Scheduler::Scheduler(size_t size, const Error error) : size(size), error(error) {
    if (error == nullptr) {
        throw std::runtime_error("non null error callback required");
    }
}

void Scheduler::schedule(const Job f, long n) {
    std::unique_lock<std::mutex> lock(this->mutex);
    condition.wait(lock, [this] { return this->count < this->size; });
    count++;

    auto job = std::make_shared<Job>(f);

    std::thread thread{
            [n, job, this] {
                std::this_thread::sleep_for(std::chrono::milliseconds(n));

                try {
                    (*job)();
                } catch (const std::exception &e) {
                    this->error(e);
                } catch (...) {
                    this->error(std::runtime_error("Unknown error"));
                }

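                {
                    // Decrement under the mutex: count is shared with schedule() and wait()
                    std::lock_guard<std::mutex> guard(this->mutex);
                    count--;
                }
                // Wake every waiter, since schedule() and wait() share this condition variable
                condition.notify_all();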
            }
    };
    thread.detach();
}

void Scheduler::wait() {
    std::unique_lock<std::mutex> lock(this->mutex);
    condition.wait(lock, [this] { return this->count == 0; });
}

int main() {
    auto start = std::chrono::high_resolution_clock::now();

    Scheduler scheduler(2, [](const std::exception &e) {
        std::cout << "Error: " << e.what() << std::endl;
    });

    scheduler.schedule([] { std::cout << 1 << std::endl; }, 1000);

    scheduler.schedule([] {
        std::cout << 2 << std::endl;
        throw "err";
    }, 150);

    scheduler.schedule([] {
        std::cout << 2 << std::endl;
        throw std::out_of_range("err");
    }, 1500);

    scheduler.schedule([] { std::cout << 3 << std::endl; }, 100);

    scheduler.schedule([] { std::cout << 4 << std::endl; }, 3000);

    scheduler.wait();

    auto stop = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
    std::cout << "Waited: " << duration.count() << std::endl;
}
