yat  0.15.2pre
Scheduler.h
1 #ifndef theplu_yat_utility_scheduler
2 #define theplu_yat_utility_scheduler
3 
4 // $Id: Scheduler.h 3694 2017-09-23 11:35:31Z peter $
5 
6 /*
7  Copyright (C) 2014, 2015, 2016, 2017 Peter Johansson
8 
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10 
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15 
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23 */
24 
25 #include "config_public.h"
26 #include "PriorityQueue.h"
27 #include "Queue.h"
28 
29 #include <boost/exception_ptr.hpp>
30 #include <boost/thread.hpp>
31 #include <boost/shared_ptr.hpp>
32 
33 #ifdef YAT_HAVE_ATOMIC
34 #include <atomic>
35 #endif
36 #include <set>
37 #include <deque>
38 
39 namespace theplu {
40 namespace yat {
41 namespace utility {
42 
/// Handle a number of jobs and send them to threads.
///
/// Jobs are handed over with submit(), ordering constraints between
/// jobs are declared with add_dependency(), and wait() waits for all
/// jobs to finish.
87  class Scheduler
88  {
89 #ifdef YAT_HAVE_ATOMIC
    // Counter type for running jobs: std::atomic<int> when
    // YAT_HAVE_ATOMIC is defined.
90  typedef std::atomic<int> running_jobs_type;
91 #else
    // Fallback: a plain int.
    // NOTE(review): a plain int provides no atomicity by itself; confirm
    // that accesses are synchronized by other means in this configuration.
92  typedef int running_jobs_type;
93 #endif
94  public:
    /// Abstract base class for a job; derived classes implement
    /// operator()(void).
98  class Job
99  {
100  public:
     /// Constructor.
     /// \param prio defaults to 0; presumably the value later returned
     /// by priority() - confirm in the implementation.
104  explicit Job(unsigned int prio=0);
105 
     /// Destructor (virtual, as Job is a polymorphic base class).
109  virtual ~Job(void);
110 
     /// \return priority of the job (presumably priority_ - confirm).
114  unsigned int priority(void) const;
115 
     /// Pure virtual: the work to be performed is defined by the
     /// derived class.
119  virtual void operator()(void)=0;
120  private:
     // Scheduler needs access to the private bookkeeping below.
121  friend class Scheduler;
     // NOTE(review): Scheduler::JobHandler is declared further down, after
     // this friend declaration; an unqualified friend declaration of a not
     // yet declared class names a class at namespace scope, so this may
     // not refer to Scheduler::JobHandler - confirm intent.
122  friend class JobHandler;
     // mutable, so it can be locked from const member functions; which
     // members it guards is not visible in this header.
123  mutable boost::mutex mutex_;
     // add a job to prerequisite_ (presumably - confirm in implementation)
124  void add_prerequisite(const boost::shared_ptr<Job>&);
125  // \brief remove job from the list of prerequisites
126  // \return number of prerequisites (after removal)
127  size_t remove_prerequisite(const boost::shared_ptr<Job>&);
     // add a job to observers_ (presumably - confirm in implementation)
128  void add_observer(const boost::shared_ptr<Job>&);
129 
130  // set of jobs that have to finish before this can run
131  std::set<boost::shared_ptr<Job> > prerequisite_;
132  // jobs that have *this as prerequisite
     // NOTE(review): std::vector is used here, but <vector> is not among
     // the headers this file includes directly; it presumably arrives via
     // another include - consider including it explicitly.
133  std::vector<boost::shared_ptr<Job> > observers_;
134  // - pristine - is what it says
135  // - prepared - job has been submitted, either directly or because
136  // a job that depends on it has been submitted, etc.
137  // - running - job has been sent to queue for workers to chew on
138  // - completed - job has returned
139  enum status { pristine, prepared, running, completed};
     // current state of the job, one of the enumerators above
140  status status_;
141  unsigned priority_;
     // NOTE(review): how and where id_ is assigned is not visible here.
142  unsigned id_;
     // presumably holds an exception caught while running the job -
     // confirm in the implementation.
143  boost::exception_ptr error_;
144  }; // end class Job
145 
    /// Constructor.
    /// \param threads presumably the number of worker threads - confirm.
    /// NOTE(review): single-argument constructor that is not explicit, so
    /// an unsigned int converts implicitly to a Scheduler.
151  Scheduler(unsigned int threads);
152 
    /// Add a dependency rule: \a prerequisite is registered as a job
    /// that \a job depends on (see Job::prerequisite_).
162  void add_dependency(boost::shared_ptr<Job> job,
163  boost::shared_ptr<Job> prerequisite);
    /// Interrupt all jobs.
167  void interrupt(void);
168 
    /// \return a job count.
    /// NOTE(review): exactly which jobs are counted is not visible in
    /// this header - confirm in the implementation.
175  size_t jobs(void) const;
176 
    /// Submit a job to Scheduler.
186  void submit(const boost::shared_ptr<Job>& job);
187 
    /// Wait for all jobs to finish.
191  void wait(void);
192 
193  private:
    // shorthand for a shared pointer to a Job
194  typedef boost::shared_ptr<Scheduler::Job> JobPtr;
195 
    // Binary predicate comparing two jobs; judging by the name it orders
    // jobs by priority - confirm in the implementation.
196  struct LowerPriority
197  {
198  bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
199  };
200 
201  // some typedefs for convenience
    // NOTE(review): JobQueue is used below, but no typedef for it is
    // visible at this point of the listing (line 202 is absent).
203 
204  // \internal class that does the job
205  //
206  // It processes any job that is pushed to the \a queue until a
207  // NULL Job shows up which signals that the work is done. When
208  // NULL is observed the NULL Job is pushed to the queue so
209  // co-workers are notified too.
210  class Worker
211  {
212  public:
     // Takes references to the queue of jobs to process and to a queue
     // named completed (presumably receiving finished jobs - confirm).
213  Worker(JobQueue& queue, Queue<JobPtr>& completed);
     // function call operator taking no arguments; its body is not
     // visible in this header
214  void operator()(void);
215  private:
216  JobQueue& queue_;
217  Queue<JobPtr>& completed_;
218  }; // end class Worker
219 
220  // \internal Class that handles job
221  class JobHandler
222  {
223  public:
     // NOTE(review): this declaration is not terminated in the listing
     // (line 226 is absent), so the remaining parameter(s) are unknown.
224  JobHandler(unsigned int threads, JobQueue& queue, Queue<JobPtr>& jobs,
225  running_jobs_type& running_jobs,
227 
     // function call operator taking no arguments; its body is not
     // visible in this header
228  void operator()(void);
229  private:
230  void process(JobPtr& job);
231 
232  // If job is ready to be submitted i.e. all prerequisite have
233  // completed, then submit job to queue for workers to chew on.
234  void prepare(JobPtr job);
235  // handle jobs returned from worker
236  void post_process(JobPtr job);
237 
     // presumably pushes job to queue_ - confirm in the implementation
238  void send2queue(JobPtr& job);
239  unsigned int threads_;
     // references to objects handed to the constructor
240  JobQueue& queue_;
241  Queue<JobPtr>& jobs_;
242  running_jobs_type& running_jobs_;
     // NOTE(review): line 243 is absent from the listing; a member may be
     // missing here.
244  int job_counter_;
245  };
246 
247 
248  // function called when job has finished and returned from
249  // worker. If there are any jobs that depend on \a job, those jobs
250  // are notified and if it makes them ready to be processed they
251  // are sent to queue.
252  void post_process(boost::shared_ptr<Job> job);
253 
254  // If \a job has parent jobs, which need to finish first, update
255  // map children_ to reflect that. If all parents have finished,
256  // send job to queue.
    // NOTE(review): no member named children_ is declared in this class;
    // the comment above may be outdated.
257  void process(boost::shared_ptr<Job> job);
258 
259  // send job to queue
260  void queue(boost::shared_ptr<Job> job);
261 
262  // If an error has been detected (and stored in error_), rethrow it.
263  void throw_if_error(void) const;
264 
    // queue of jobs handed to Worker and JobHandler by reference
265  JobQueue queue_;
266  Queue<JobPtr> jobs_;
267  running_jobs_type running_jobs_;
    // mutable, so it can be modified from const member functions such as
    // throw_if_error()
268  mutable Queue<boost::exception_ptr> error_;
    // thread object; presumably runs a JobHandler - confirm in the
    // implementation
269  boost::thread job_handler_;
270  }; // end class Scheduler
271 
272 }}}
273 #endif
Handle a number of jobs and send them to threads.
Definition: Scheduler.h:87
void submit(const boost::shared_ptr< Job > &job)
submit a job to Scheduler
Job(unsigned int prio=0)
constructor
The Department of Theoretical Physics namespace as we define it.
virtual ~Job(void)
destructor
Definition: Scheduler.h:98
void interrupt(void)
interrupt all jobs
void add_dependency(boost::shared_ptr< Job > job, boost::shared_ptr< Job > prerequisite)
add a dependency rule
void wait(void)
wait for all jobs to finish
unsigned int priority(void) const

Generated on Fri Jul 13 2018 02:33:27 for yat by  doxygen 1.8.11