yat  0.21pre
Scheduler.h
1 #ifndef theplu_yat_utility_scheduler
2 #define theplu_yat_utility_scheduler
3 
4 // $Id: Scheduler.h 3948 2020-07-20 07:12:46Z peter $
5 
6 /*
7  Copyright (C) 2014, 2015, 2016, 2017, 2019, 2020 Peter Johansson
8 
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10 
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15 
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23 */
24 
25 #include "PriorityQueue.h"
26 #include "Queue.h"
27 
28 #include <boost/exception_ptr.hpp>
29 #include <boost/shared_ptr.hpp>
30 
31 #include <list>
32 #include <mutex>
33 #include <set>
34 #include <thread>
35 #include <vector>
36 
37 namespace theplu {
38 namespace yat {
39 namespace utility {
40 
85  class Scheduler
86  {
87  public:
91  class Job
92  {
93  public:
99  explicit Job(unsigned int prio=0);
100 
104  virtual ~Job(void);
105 
109  unsigned int priority(void) const;
110 
114  virtual void operator()(void)=0;
115  private:
116  friend class Scheduler;
117  friend class JobHandler;
118  void add_prerequisite(const boost::shared_ptr<Job>&,
119  const std::unique_lock<std::mutex>& lock);
120  // \brief remove job from list of prerequisite
121  // \return number of prerequisite (after removal)
122  size_t remove_prerequisite(const boost::shared_ptr<Job>&,
123  const std::unique_lock<std::mutex>& lock);
124  // set of jobs that have to finish before this can run
125  std::set<boost::shared_ptr<Job> > prerequisite_;
126  // avoid accessing prerequisite_ directly but rather via
127  // functions below passing Dependency::Lock
128  std::set<boost::shared_ptr<Job> >&
129  prerequisite(const std::unique_lock<std::mutex>& lock);
130  const std::set<boost::shared_ptr<Job> >&
131  prerequisite(const std::unique_lock<std::mutex>& lock) const;
132  // - pristine is what it says
133  // - prepared - job has been either submitted directly a job that
134  // depends on job has been submitted et.c.
135  // - running - job has been sent to queue for workers to chew on
136  // - completed - job has returned
137  enum status_type { pristine, prepared, running, completed};
138  status_type status_;
139  status_type status(const std::unique_lock<std::mutex>& lock) const;
140  void status(const std::unique_lock<std::mutex>& lock, status_type s);
141  unsigned priority_;
142  unsigned id_;
143  boost::exception_ptr error_;
144  }; // end class Job
145 
151  Scheduler(unsigned int threads);
152 
162  void add_dependency(boost::shared_ptr<Job> job,
163  boost::shared_ptr<Job> prerequisite);
167  void interrupt(void);
168 
175  size_t jobs(void) const;
176 
183  void submit(const boost::shared_ptr<Job>& job);
184 
190  unsigned int threads(void) const;
191 
202  void threads(unsigned int n);
203 
207  void wait(void);
208 
209  private:
210  // forward declaration
211  class JobHandler;
212 
213  typedef boost::shared_ptr<Scheduler::Job> JobPtr;
214 
215  class Messenger
216  {
217  public:
218  virtual void operator()(JobHandler& handler)=0;
219  };
220 
221  typedef boost::shared_ptr<Messenger> MessengerPtr;
222  typedef Queue<MessengerPtr> MessengerQueue;
223 
224  class JobMessenger : public Messenger
225  {
226  public:
227  JobMessenger(const JobPtr&);
228  void operator()(JobHandler& handler);
229  private:
230  JobPtr job_;
231  };
232 
233 
234  class SchedulerIsWaiting : public Messenger
235  {
236  public:
237  void operator()(JobHandler& handler);
238  };
239 
240 
241  class InterruptWorkers : public Messenger
242  {
243  public:
244  void operator()(JobHandler& handler);
245  };
246 
247 
248  // Class used to change number of workers
249  class WorkForceSizer : public Messenger
250  {
251  public:
252  void operator()(JobHandler& handler);
253  };
254 
255 
256  struct LowerPriority
257  {
258  bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
259  };
260 
261  // some typedefs for convenience
262  typedef PriorityQueue<JobPtr, LowerPriority> JobQueue;
263 
264  // \internal class that does the job
265  //
266  // It processes any job that is pushed to the \a queue until a
267  // NULL Job shows up which signals that the work is done. When
268  // NULL is observed the NULL Job is pushed to the queue so
269  // co-workers are notified too.
270  class Worker
271  {
272  public:
273  Worker(JobQueue& queue, MessengerQueue& completed);
274  void operator()(void);
275  private:
276  JobQueue& queue_;
277  MessengerQueue& completed_;
278  }; // end class Worker
279 
280 
281  class Dependency
282  {
283  public:
284  typedef std::unique_lock<std::mutex> Lock;
285 
286  void add(const JobPtr& child, const JobPtr& parent, const Lock& lock);
287  void remove(const JobPtr& parent, const Lock& lock);
288  std::vector<JobPtr>& children(const JobPtr& key, const Lock& lock);
289  std::mutex& mutex(void);
290  private:
291  std::mutex mutex_;
292 
293  // job in key has to finish before jobs in vector
294  std::map<JobPtr, std::vector<JobPtr> > children_;
295 
296  std::vector<JobPtr> empty_vector_;
297  };
298 
299 
300  class JobHandlerData
301  {
302  public:
304  class Count
305  {
306  public:
308  explicit Count(int x=0);
310  void decrement(void);
312  int get(void) const;
314  void increment(void);
316  void set(int x);
317  private:
318  mutable std::mutex mutex_;
319  int x_;
320  };
321 
322  JobHandlerData(void);
323  Queue<boost::exception_ptr>& error(void) const;
324 
325  Dependency& dependency(void);
326  const Dependency& dependency(void) const;
327 
328  const MessengerQueue& messengers(void) const;
329  MessengerQueue& messengers(void); //
330  const JobQueue& queue(void) const; //
331  JobQueue& queue(void);
332 
333  const Count& running_jobs(void) const;
334  Count& running_jobs(void);
335 
336  const Count& n_threads(void) const;
337  Count& n_threads(void);
338  private:
339  mutable Queue<boost::exception_ptr> error_;
340  MessengerQueue messengers_;
341  // This is the queue workers are consuming from.
342  JobQueue queue_;
343 
344  std::mutex mutex_;
345 
346  Count running_jobs_;
347  Count threads_;
348 
349  Dependency dependency_;
350  };
351 
352 
353  // \internal Class that handles job
354  class JobHandler
355  {
356  public:
357  JobHandler(JobHandlerData& data);
358 
359  void operator()(void);
360  void create_workers(unsigned int n);
361  JobHandlerData& data(void);
362  void interrupt_workers(void);
363  void kill_workers(void);
364  void kill_workers(unsigned int n);
365  // If \a job has parent jobs, which need to finish first, update
366  // map children_ to reflect that. If all parents have finished,
367  // send job to queue.
368  void process(JobPtr& job);
369  void remove_joined_workers(void);
370  void scheduler_is_waiting(bool value);
371  unsigned int n_target_workers(void) const;
372  void wait_workers(void) const;
373  private:
374 
375  // If job is ready to be submitted i.e. all prerequisite have
376  // completed, then submit job to queue for workers to chew on.
377  void prepare(JobPtr job, const Dependency::Lock& lock);
378  // handle jobs returned from worker
379  //
380  // function called when job has finished and returned from
381  // worker. If there are any jobs that depend on \a job, those jobs
382  // are notified and if it makes them ready to be processed they
383  // are sent to queue.
384  void post_process(JobPtr job, const Dependency::Lock& lock);
385 
386  void send2queue(JobPtr& job,
387  const std::unique_lock<std::mutex>& lock);
388 
389  JobHandlerData* data_;
390  bool scheduler_is_waiting_;
391  unsigned int job_count_;
392  void join_workers(void);
393 
394  typedef std::list<boost::shared_ptr<std::thread> > WorkerList;
395  // We keep workers here (rather than in JobHandler::operator()
396  // scope), so we can access it from other functions and don't
397  // need to pass it around.
398  WorkerList workers_;
399  // Number of Workers minus number of poison pills sent for them
400  unsigned int n_target_workers_;
401  }; // end class JobHandler
402 
403  // If an error has been detected (and stored in error_), rethrow it.
404  void throw_if_error(void) const;
405 
406  mutable std::mutex mutex_;
407  JobHandlerData data_;
408  std::thread job_handler_;
409  }; // end class Scheduler
410 
411 }}}
412 #endif
Handle a number of jobs and send them to threads.
Definition: Scheduler.h:85
void submit(const boost::shared_ptr< Job > &job)
submit a job to Scheduler
Job(unsigned int prio=0)
constructor
The Department of Theoretical Physics namespace as we define it.
unsigned int threads(void) const
unsigned int priority(void) const
virtual ~Job(void)
destructor
Definition: Scheduler.h:91
thread-safe class around int
Definition: Scheduler.h:304
void add(T &o, ForwardIterator first, ForwardIterator last, const classifier::Target &target)
Definition: utility.h:320
void interrupt(void)
interrrupt all jobs
void add_dependency(boost::shared_ptr< Job > job, boost::shared_ptr< Job > prerequisite)
add a dependency rule
Scheduler(unsigned int threads)
constructor
void wait(void)
wait for all jobs to finish

Generated on Wed Jan 25 2023 03:34:29 for yat by  doxygen 1.8.14