1 #ifndef theplu_yat_utility_scheduler 2 #define theplu_yat_utility_scheduler 25 #include "PriorityQueue.h" 28 #include <boost/exception_ptr.hpp> 29 #include <boost/thread.hpp> 30 #include <boost/shared_ptr.hpp> 98 explicit Job(
unsigned int prio=0);
116 friend class JobHandler;
117 void add_prerequisite(
const boost::shared_ptr<Job>&,
118 const boost::unique_lock<boost::mutex>& lock);
121 size_t remove_prerequisite(
const boost::shared_ptr<Job>&,
122 const boost::unique_lock<boost::mutex>& lock);
124 std::set<boost::shared_ptr<Job> > prerequisite_;
127 std::set<boost::shared_ptr<Job> >&
128 prerequisite(
const boost::unique_lock<boost::mutex>& lock);
129 const std::set<boost::shared_ptr<Job> >&
130 prerequisite(
const boost::unique_lock<boost::mutex>& lock)
const;
136 enum status_type { pristine, prepared, running, completed};
138 status_type status(
const boost::unique_lock<boost::mutex>& lock)
const;
139 void status(
const boost::unique_lock<boost::mutex>& lock, status_type s);
142 boost::exception_ptr error_;
162 boost::shared_ptr<Job> prerequisite);
174 size_t jobs(
void)
const;
182 void submit(
const boost::shared_ptr<Job>& job);
189 unsigned int threads(
void)
const;
212 typedef boost::shared_ptr<Scheduler::Job> JobPtr;
217 virtual void operator()(JobHandler& handler)=0;
220 typedef boost::shared_ptr<Messenger> MessengerPtr;
223 class JobMessenger :
public Messenger
226 JobMessenger(
const JobPtr&);
233 class SchedulerIsWaiting :
public Messenger
240 class InterruptWorkers :
public Messenger
248 class WorkForceSizer :
public Messenger
257 bool operator()(
const JobPtr& lhs,
const JobPtr& rhs)
const;
272 Worker(JobQueue& queue, MessengerQueue& completed);
276 MessengerQueue& completed_;
283 typedef boost::unique_lock<boost::mutex> Lock;
285 void add(
const JobPtr& child,
const JobPtr& parent,
const Lock& lock);
286 void remove(
const JobPtr& parent,
const Lock& lock);
287 std::vector<JobPtr>& children(
const JobPtr& key,
const Lock& lock);
288 boost::mutex& mutex(
void);
293 std::map<JobPtr, std::vector<JobPtr> > children_;
295 std::vector<JobPtr> empty_vector_;
307 explicit Count(
int x=0);
309 void decrement(
void);
313 void increment(
void);
317 mutable boost::mutex mutex_;
321 JobHandlerData(
void);
324 Dependency& dependency(
void);
325 const Dependency& dependency(
void)
const;
327 const MessengerQueue& messengers(
void)
const;
328 MessengerQueue& messengers(
void);
329 const JobQueue& queue(
void)
const;
330 JobQueue& queue(
void);
332 const Count& running_jobs(
void)
const;
333 Count& running_jobs(
void);
335 const Count& n_threads(
void)
const;
336 Count& n_threads(
void);
339 MessengerQueue messengers_;
348 Dependency dependency_;
356 JobHandler(JobHandlerData& data);
359 void create_workers(
unsigned int n);
360 JobHandlerData& data(
void);
361 void interrupt_workers(
void);
362 void kill_workers(
void);
363 void kill_workers(
unsigned int n);
367 void process(JobPtr& job);
368 void remove_joined_workers(
void);
369 void scheduler_is_waiting(
bool value);
370 unsigned int n_target_workers(
void)
const;
371 void wait_workers(
void)
const;
376 void prepare(JobPtr job,
const Dependency::Lock& lock);
383 void post_process(JobPtr job,
const Dependency::Lock& lock);
385 void send2queue(JobPtr& job,
386 const boost::unique_lock<boost::mutex>& lock);
388 JobHandlerData* data_;
389 bool scheduler_is_waiting_;
390 unsigned int job_count_;
391 void join_workers(
void);
393 typedef std::list<boost::shared_ptr<boost::thread> > WorkerList;
400 unsigned int n_target_workers_;
404 void throw_if_error(
void)
const;
406 mutable boost::mutex mutex_;
407 JobHandlerData data_;
408 boost::thread job_handler_;
Handle a number of jobs and send them to threads.
Definition: Scheduler.h:84
void submit(const boost::shared_ptr< Job > &job)
submit a job to Scheduler
Job(unsigned int prio=0)
constructor
The Department of Theoretical Physics namespace as we define it.
unsigned int threads(void) const
virtual void operator()(void)=0
virtual ~Job(void)
destructor
Definition: Scheduler.h:90
thread-safe class around int
Definition: Scheduler.h:303
void interrupt(void)
interrrupt all jobs
void add_dependency(boost::shared_ptr< Job > job, boost::shared_ptr< Job > prerequisite)
add a dependency rule
void wait(void)
wait for all jobs to finish
unsigned int priority(void) const