yat  0.17.3pre
Scheduler.h
1 #ifndef theplu_yat_utility_scheduler
2 #define theplu_yat_utility_scheduler
3 
4 // $Id: Scheduler.h 3855 2020-01-02 01:11:34Z peter $
5 
6 /*
7  Copyright (C) 2014, 2015, 2016, 2017, 2019, 2020 Peter Johansson
8 
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10 
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15 
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23 */
24 
25 #include "PriorityQueue.h"
26 #include "Queue.h"
27 
28 #include <boost/exception_ptr.hpp>
29 #include <boost/thread.hpp>
30 #include <boost/shared_ptr.hpp>
31 
32 #include <list>
#include <map>
33 #include <set>
34 #include <vector>
35 
36 namespace theplu {
37 namespace yat {
38 namespace utility {
39 
84  class Scheduler
85  {
86  public:
// Abstract base class for a unit of work handled by Scheduler. Users
// derive from Job and implement operator(); jobs are handed to
// Scheduler as boost::shared_ptr<Job>.
90  class Job
91  {
92  public:
// Constructor. \a prio is presumably the value later returned by
// priority() (definition not visible in this header - confirm).
98  explicit Job(unsigned int prio=0);
99 
// Destructor; virtual because Job is a polymorphic base class.
103  virtual ~Job(void);
104 
// \return priority of this job (presumably priority_ - confirm)
108  unsigned int priority(void) const;
109 
// The work to perform. Pure virtual, so every concrete job has to
// implement it.
113  virtual void operator()(void)=0;
114  private:
// Everything below is bookkeeping that only the friends can reach.
115  friend class Scheduler;
// NOTE(review): in this listing Scheduler::JobHandler is forward
// declared only after class Job. If there is no earlier declaration,
// this unqualified friend declaration names a namespace-scope
// JobHandler rather than the nested class - confirm what is intended.
116  friend class JobHandler;
// Add a job to the prerequisites of this job. The \a lock argument
// is presumably evidence that the caller holds the Dependency mutex
// (see the comment on prerequisite() below) - TODO confirm.
117  void add_prerequisite(const boost::shared_ptr<Job>&,
118  const boost::unique_lock<boost::mutex>& lock);
119  // \brief remove job from the set of prerequisites
120  // \return number of prerequisites (after removal)
121  size_t remove_prerequisite(const boost::shared_ptr<Job>&,
122  const boost::unique_lock<boost::mutex>& lock);
123  // set of jobs that have to finish before this can run
124  std::set<boost::shared_ptr<Job> > prerequisite_;
125  // avoid accessing prerequisite_ directly; rather go via the
126  // functions below, passing a Dependency::Lock
127  std::set<boost::shared_ptr<Job> >&
128  prerequisite(const boost::unique_lock<boost::mutex>& lock);
129  const std::set<boost::shared_ptr<Job> >&
130  prerequisite(const boost::unique_lock<boost::mutex>& lock) const;
131  // - pristine is what it says
132  // - prepared - job has either been submitted directly, or a job that
133  // depends on this job has been submitted, etc.
134  // - running - job has been sent to queue for workers to chew on
135  // - completed - job has returned
136  enum status_type { pristine, prepared, running, completed};
// current state of the job; one of the status_type values above
137  status_type status_;
// getter and setter for status_; both take a lock argument in the
// same way as prerequisite() above
138  status_type status(const boost::unique_lock<boost::mutex>& lock) const;
139  void status(const boost::unique_lock<boost::mutex>& lock, status_type s);
// presumably the value returned by priority() - TODO confirm
140  unsigned priority_;
// NOTE(review): who assigns id_ and how it is used is not visible
// in this header
141  unsigned id_;
// presumably holds an exception raised while the job was running
// - TODO confirm against the implementation
142  boost::exception_ptr error_;
143  }; // end class Job
144 
// Constructor. \a threads is presumably the number of worker threads
// to use (compare threads(void) and threads(unsigned int) below)
// - confirm against the implementation.
// NOTE(review): the constructor is not explicit, so an unsigned int
// converts implicitly to a Scheduler.
150  Scheduler(unsigned int threads);
151 
// Add a dependency rule between \a job and \a prerequisite;
// presumably \a prerequisite has to finish before \a job is run
// (see Job::prerequisite_) - confirm.
161  void add_dependency(boost::shared_ptr<Job> job,
162  boost::shared_ptr<Job> prerequisite);
// Interrupt all jobs.
166  void interrupt(void);
167 
// \return a job count; NOTE(review): which jobs are counted
// (submitted, running, remaining) is not visible in this header.
174  size_t jobs(void) const;
175 
// Submit a job to the Scheduler.
182  void submit(const boost::shared_ptr<Job>& job);
183 
// \return number of threads (presumably the current number of
// worker threads - confirm)
189  unsigned int threads(void) const;
190 
// Set number of threads to \a n (presumably resizes the pool of
// workers, see WorkForceSizer - confirm).
201  void threads(unsigned int n);
202 
// Wait for all jobs to finish.
206  void wait(void);
207 
208  private:
209  // forward declaration; JobHandler is defined further down and is
// needed already by the Messenger classes
210  class JobHandler;
211 
// shared-ownership handle to a Job, used throughout the private
// implementation classes
212  typedef boost::shared_ptr<Scheduler::Job> JobPtr;
213 
// Abstract message whose operator() acts on a JobHandler. Presumably
// messengers are queued (see MessengerQueue) and executed by the
// JobHandler one at a time - confirm against the implementation.
214  class Messenger
215  {
216  public:
// Virtual destructor: Messenger is a polymorphic base and derived
// classes own resources (JobMessenger holds a JobPtr), so destroying
// a derived object through a Messenger pointer must run the derived
// destructor.
virtual ~Messenger(void) {}
// Action to perform on \a handler; implemented by each derived class.
217  virtual void operator()(JobHandler& handler)=0;
218  };
219 
// shared-ownership handle to a Messenger
220  typedef boost::shared_ptr<Messenger> MessengerPtr;
222 
// Messenger that carries a job. Presumably operator() hands job_ over
// to the JobHandler (compare JobHandler::process) - confirm against
// the implementation.
223  class JobMessenger : public Messenger
224  {
225  public:
// The JobPtr argument is presumably stored in job_ - confirm.
226  JobMessenger(const JobPtr&);
// implements Messenger::operator()
227  void operator()(JobHandler& handler);
228  private:
// the job this message is about
229  JobPtr job_;
230  };
231 
232 
// Messenger without data. Judging by the name it tells the JobHandler
// that the Scheduler is waiting (compare
// JobHandler::scheduler_is_waiting) - confirm against the
// implementation.
233  class SchedulerIsWaiting : public Messenger
234  {
235  public:
// implements Messenger::operator()
236  void operator()(JobHandler& handler);
237  };
238 
239 
// Messenger without data. Judging by the name it asks the JobHandler
// to interrupt its workers (compare JobHandler::interrupt_workers)
// - confirm against the implementation.
240  class InterruptWorkers : public Messenger
241  {
242  public:
// implements Messenger::operator()
243  void operator()(JobHandler& handler);
244  };
245 
246 
247  // Messenger used to change the number of workers. It carries no
// data itself; presumably the wanted number is read from
// JobHandlerData::n_threads() - TODO confirm.
248  class WorkForceSizer : public Messenger
249  {
250  public:
// implements Messenger::operator()
251  void operator()(JobHandler& handler);
252  };
253 
254 
// Binary predicate on two JobPtr. Presumably returns true when \a lhs
// has lower priority than \a rhs and is used to order the JobQueue
// - confirm against the implementation.
255  struct LowerPriority
256  {
257  bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
258  };
259 
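260  // some typedefs for convenience
// NOTE(review): the typedefs themselves are missing from this listing;
// JobQueue and MessengerQueue, used by the classes below, are not
// declared anywhere visible here - confirm against the original header.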
262 
263  // \internal class that does the job
264  //
265  // It processes any job that is pushed to the \a queue until a
266  // NULL Job shows up, which signals that the work is done. When
267  // NULL is observed, the NULL Job is pushed back to the queue so
268  // co-workers are notified too.
269  class Worker
270  {
271  public:
// \a queue is the queue jobs are taken from; \a completed is
// presumably where a message is posted once a job has returned
// - confirm against the implementation. Both are held by reference,
// so they must outlive the Worker.
272  Worker(JobQueue& queue, MessengerQueue& completed);
// the processing loop described above
273  void operator()(void);
274  private:
275  JobQueue& queue_;
276  MessengerQueue& completed_;
277  }; // end class Worker
278 
279 
// Bookkeeping of which jobs wait for which, together with the mutex
// that goes with that information.
280  class Dependency
281  {
282  public:
// Lock type taken by the functions below. Presumably the caller
// locks mutex() with it before calling, so the argument documents
// that the lock is held - TODO confirm.
283  typedef boost::unique_lock<boost::mutex> Lock;
284 
// Register that \a child depends on \a parent (presumably by adding
// \a child to children_[parent] - confirm).
285  void add(const JobPtr& child, const JobPtr& parent, const Lock& lock);
// Presumably removes \a parent and its list of children - confirm.
286  void remove(const JobPtr& parent, const Lock& lock);
// \return jobs that wait for \a key. Presumably empty_vector_ is
// returned when \a key has no children - confirm.
287  std::vector<JobPtr>& children(const JobPtr& key, const Lock& lock);
// \return mutex to lock before using this class (presumably mutex_)
288  boost::mutex& mutex(void);
289  private:
290  boost::mutex mutex_;
291 
292  // job in key has to finish before jobs in vector
293  std::map<JobPtr, std::vector<JobPtr> > children_;
294 
// always-empty vector; see children() above
295  std::vector<JobPtr> empty_vector_;
296  };
297 
298 
// Data bundle handed to JobHandler by reference (see the JobHandler
// constructor): queues, counters and dependency bookkeeping.
// Presumably this is the state shared between the Scheduler and the
// thread running JobHandler - confirm.
299  class JobHandlerData
300  {
301  public:
// thread-safe class around int
303  class Count
304  {
305  public:
// \a x is presumably the initial value - confirm
307  explicit Count(int x=0);
309  void decrement(void);
311  int get(void) const;
313  void increment(void);
315  void set(int x);
316  private:
// mutable so that it can be locked in the const member get()
// (presumably - the definitions are not visible here)
317  mutable boost::mutex mutex_;
318  int x_;
319  };
320 
321  JobHandlerData(void);
// Non-const reference from a const function; possible because
// error_ is declared mutable below.
322  Queue<boost::exception_ptr>& error(void) const;
323 
// const and non-const access to the members declared below
324  Dependency& dependency(void);
325  const Dependency& dependency(void) const;
326 
327  const MessengerQueue& messengers(void) const;
328  MessengerQueue& messengers(void); //
329  const JobQueue& queue(void) const; //
330  JobQueue& queue(void);
331 
332  const Count& running_jobs(void) const;
333  Count& running_jobs(void);
334 
// presumably returns threads_ - confirm
335  const Count& n_threads(void) const;
336  Count& n_threads(void);
337  private:
// presumably exceptions caught from jobs, later rethrown by
// Scheduler::throw_if_error - TODO confirm
338  mutable Queue<boost::exception_ptr> error_;
// messages waiting for the JobHandler (presumably - confirm)
339  MessengerQueue messengers_;
340  // This is the queue workers are consuming from.
341  JobQueue queue_;
342 
// NOTE(review): no accessor for this mutex is declared here; what it
// protects is not visible in this header
343  boost::mutex mutex_;
344 
345  Count running_jobs_;
346  Count threads_;
347 
348  Dependency dependency_;
349  };
350 
351 
352  // \internal Class that handles jobs. It is a functor (operator()
// below) and Scheduler holds a boost::thread named job_handler_, so
// presumably it runs in a thread of its own - confirm.
353  class JobHandler
354  {
355  public:
// \a data is kept as a pointer (data_), so it must outlive the
// JobHandler.
// NOTE(review): the constructor is not explicit.
356  JobHandler(JobHandlerData& data);
357 
// entry point; presumably loops over incoming Messengers - confirm
358  void operator()(void);
// presumably starts \a n additional Worker threads - confirm
359  void create_workers(unsigned int n);
360  JobHandlerData& data(void);
361  void interrupt_workers(void);
// NOTE(review): Worker's comment describes a NULL job that ends a
// worker, and n_target_workers_ mentions "poison pills"; presumably
// these functions send such jobs - confirm.
362  void kill_workers(void);
363  void kill_workers(unsigned int n);
364  // If \a job has parent jobs, which need to finish first, update
365  // map children_ to reflect that. If all parents have finished,
366  // send job to queue.
367  void process(JobPtr& job);
368  void remove_joined_workers(void);
// setter for scheduler_is_waiting_ (presumably - confirm)
369  void scheduler_is_waiting(bool value);
// getter for n_target_workers_ (presumably - confirm)
370  unsigned int n_target_workers(void) const;
371  void wait_workers(void) const;
372  private:
373 
374  // If job is ready to be submitted, i.e., all prerequisites have
375  // completed, then submit job to queue for workers to chew on.
376  void prepare(JobPtr job, const Dependency::Lock& lock);
377  // handle jobs returned from worker
378  //
379  // function called when job has finished and returned from
380  // worker. If there are any jobs that depend on \a job, those jobs
381  // are notified and if it makes them ready to be processed they
382  // are sent to queue.
383  void post_process(JobPtr job, const Dependency::Lock& lock);
384 
// presumably pushes \a job to the queue the workers consume from
// - confirm
385  void send2queue(JobPtr& job,
386  const boost::unique_lock<boost::mutex>& lock);
387 
// non-owning pointer to the data passed to the constructor
388  JobHandlerData* data_;
389  bool scheduler_is_waiting_;
// NOTE(review): what exactly is counted is not visible here;
// possibly used to assign Job::id_ - confirm
390  unsigned int job_count_;
391  void join_workers(void);
392 
393  typedef std::list<boost::shared_ptr<boost::thread> > WorkerList;
394  // FIXME is this needed here or can it be in
395  // JobHandler::operator() scope? The reason we keep it here is
396  // so we can access it from other functions and don't need to
397  // pass it around.
398  WorkerList workers_;
399  // Number of Workers minus number of poison pills sent for them
400  unsigned int n_target_workers_;
401  }; // end class JobHandler
402 
403  // If an error has been detected (and stored in error_), rethrow it.
// NOTE(review): Scheduler declares no error_ member in this listing;
// presumably the error queue of data_ (JobHandlerData::error) is
// meant - confirm.
404  void throw_if_error(void) const;
405 
// mutable so that it can be locked in const member functions
// (presumably - the definitions are not visible here)
406  mutable boost::mutex mutex_;
// data handed to the JobHandler
407  JobHandlerData data_;
// thread presumably running the JobHandler - confirm
408  boost::thread job_handler_;
409  }; // end class Scheduler
410 
411 }}}
412 #endif
Handle a number of jobs and send them to threads.
Definition: Scheduler.h:84
void submit(const boost::shared_ptr< Job > &job)
submit a job to Scheduler
Job(unsigned int prio=0)
constructor
The Department of Theoretical Physics namespace as we define it.
unsigned int threads(void) const
virtual ~Job(void)
destructor
Definition: Scheduler.h:90
thread-safe class around int
Definition: Scheduler.h:303
void interrupt(void)
interrupt all jobs
void add_dependency(boost::shared_ptr< Job > job, boost::shared_ptr< Job > prerequisite)
add a dependency rule
void wait(void)
wait for all jobs to finish
unsigned int priority(void) const

Generated on Thu Aug 27 2020 03:33:18 for yat by  doxygen 1.8.11