yat  0.16.4pre
Scheduler.h
1 #ifndef theplu_yat_utility_scheduler
2 #define theplu_yat_utility_scheduler
3 
4 // $Id: Scheduler.h 3820 2019-07-16 00:05:25Z peter $
5 
6 /*
7  Copyright (C) 2014, 2015, 2016, 2017, 2019 Peter Johansson
8 
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10 
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15 
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23 */
24 
25 #include "PriorityQueue.h"
26 #include "Queue.h"
27 
28 #include <boost/exception_ptr.hpp>
29 #include <boost/thread.hpp>
30 #include <boost/shared_ptr.hpp>
31 
32 #include <set>
33 #include <deque>
34 
35 namespace theplu {
36 namespace yat {
37 namespace utility {
38 
83  class Scheduler
84  {
85  public:
89  class Job
90  {
91  public:
95  explicit Job(unsigned int prio=0);
96 
100  virtual ~Job(void);
101 
105  unsigned int priority(void) const;
106 
110  virtual void operator()(void)=0;
111  private:
112  friend class Scheduler;
113  friend class JobHandler;
114  mutable boost::mutex mutex_;
115  void add_prerequisite(const boost::shared_ptr<Job>&);
116  // \brief remove job from list of prerequisite
117  // \return number of prerequisite (after removal)
118  size_t remove_prerequisite(const boost::shared_ptr<Job>&);
119  void add_observer(const boost::shared_ptr<Job>&);
120 
121  // set of jobs that have to finish before this can run
122  std::set<boost::shared_ptr<Job> > prerequisite_;
123  // jobs that have *this as prerequisite
124  std::vector<boost::shared_ptr<Job> > observers_;
125  // - pristine is what it says
126  // - prepared - job has been either submitted directly a job that
127  // depends on job has been submitted et.c.
128  // - running - job has been sent to queue for workers to chew on
129  // - completed - job has returned
130  enum status { pristine, prepared, running, completed};
131  status status_;
132  unsigned priority_;
133  unsigned id_;
134  boost::exception_ptr error_;
135  }; // end class Job
136 
142  Scheduler(unsigned int threads);
143 
153  void add_dependency(boost::shared_ptr<Job> job,
154  boost::shared_ptr<Job> prerequisite);
158  void interrupt(void);
159 
166  size_t jobs(void) const;
167 
177  void submit(const boost::shared_ptr<Job>& job);
178 
182  void wait(void);
183 
184  private:
185  typedef boost::shared_ptr<Scheduler::Job> JobPtr;
186 
187  struct LowerPriority
188  {
189  bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
190  };
191 
192  // some typedefs for convenience
194 
195  // \internal class that does the job
196  //
197  // It processes any job that is pushed to the \a queue until a
198  // NULL Job shows up which signals that the work is done. When
199  // NULL is observed the NULL Job is pushed to the queue so
200  // co-workers are notified too.
201  class Worker
202  {
203  public:
204  Worker(JobQueue& queue, Queue<JobPtr>& completed);
205  void operator()(void);
206  private:
207  JobQueue& queue_;
208  Queue<JobPtr>& completed_;
209  }; // end class Worker
210 
211 
212  class JobHandlerData
213  {
214  public:
216  class Count
217  {
218  public:
220  explicit Count(int x=0);
222  void decrement(void);
224  int get(void) const;
226  void increment(void);
228  void set(int x);
229  private:
230  mutable boost::mutex mutex_;
231  int x_;
232  };
233 
234  JobHandlerData(unsigned int threads);
235  Queue<boost::exception_ptr>& error(void) const;
236 
237  const Queue<JobPtr>& jobs(void) const;
238  Queue<JobPtr>& jobs(void);
239  const JobQueue& queue(void) const;
240  JobQueue& queue(void);
241 
242  const Count& job_count(void) const;
243  Count& job_count(void);
244 
245  const Count& running_jobs(void) const;
246  Count& running_jobs(void);
247 
248  const Count& threads(void) const;
249  Count& threads(void);
250  private:
251  mutable Queue<boost::exception_ptr> error_;
252  Queue<JobPtr> jobs_;
253  JobQueue queue_;
254 
255  Count job_count_;
256  Count running_jobs_;
257  Count threads_;
258  };
259 
260 
261  // \internal Class that handles job
262  class JobHandler
263  {
264  public:
265  JobHandler(JobHandlerData& data);
266 
267  void operator()(void);
268  private:
269  void process(JobPtr& job);
270 
271  // If job is ready to be submitted i.e. all prerequisite have
272  // completed, then submit job to queue for workers to chew on.
273  void prepare(JobPtr job);
274  // handle jobs returned from worker
275  void post_process(JobPtr job);
276 
277  void send2queue(JobPtr& job);
278 
279  JobHandlerData* data_;
280  };
281 
282 
283  // function called when job has finished and returned from
284  // worker. If there are any jobs that depend on \a job, those jobs
285  // are notified and if it makes them ready to be processed they
286  // are sent to queue.
287  void post_process(boost::shared_ptr<Job> job);
288 
289  // If \a job has parent jobs, which need to finish first, update
290  // map children_ to reflect that. If all parents have finished,
291  // send job to queue.
292  void process(boost::shared_ptr<Job> job);
293 
294  // send job to queue
295  void queue(boost::shared_ptr<Job> job);
296 
297  // If an error has been detected (and stored in error_), rethrow it.
298  void throw_if_error(void) const;
299 
300  JobHandlerData data_;
301  boost::thread job_handler_;
302  }; // end class Scheduler
303 
304 }}}
305 #endif
Handle a number of jobs and send them to threads.
Definition: Scheduler.h:83
void submit(const boost::shared_ptr< Job > &job)
submit a job to Scheduler
Job(unsigned int prio=0)
constructor
The Department of Theoretical Physics namespace as we define it.
virtual ~Job(void)
destructor
Definition: Scheduler.h:89
thread-safe class around int
Definition: Scheduler.h:216
void interrupt(void)
interrrupt all jobs
void add_dependency(boost::shared_ptr< Job > job, boost::shared_ptr< Job > prerequisite)
add a dependency rule
void wait(void)
wait for all jobs to finish
unsigned int priority(void) const

Generated on Thu Dec 12 2019 03:12:08 for yat by  doxygen 1.8.11