yat/utility/Scheduler.cc

Code
Comments
Other
Rev Date Author Line
3346 06 Nov 14 peter 1 // $Id$
3346 06 Nov 14 peter 2
3346 06 Nov 14 peter 3 /*
4164 13 Mar 22 peter 4   Copyright (C) 2014, 2015, 2017, 2019, 2020, 2022 Peter Johansson
3346 06 Nov 14 peter 5
3346 06 Nov 14 peter 6   This file is part of the yat library, http://dev.thep.lu.se/yat
3346 06 Nov 14 peter 7
3346 06 Nov 14 peter 8   The yat library is free software; you can redistribute it and/or
3346 06 Nov 14 peter 9   modify it under the terms of the GNU General Public License as
3346 06 Nov 14 peter 10   published by the Free Software Foundation; either version 3 of the
3346 06 Nov 14 peter 11   License, or (at your option) any later version.
3346 06 Nov 14 peter 12
3346 06 Nov 14 peter 13   The yat library is distributed in the hope that it will be useful,
3346 06 Nov 14 peter 14   but WITHOUT ANY WARRANTY; without even the implied warranty of
3346 06 Nov 14 peter 15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
3346 06 Nov 14 peter 16   General Public License for more details.
3346 06 Nov 14 peter 17
3346 06 Nov 14 peter 18   You should have received a copy of the GNU General Public License
3346 06 Nov 14 peter 19   along with yat. If not, see <http://www.gnu.org/licenses/>.
3346 06 Nov 14 peter 20 */
3346 06 Nov 14 peter 21
3346 06 Nov 14 peter 22 #include <config.h>
3346 06 Nov 14 peter 23
3346 06 Nov 14 peter 24 #include "Scheduler.h"
3346 06 Nov 14 peter 25
3423 15 Sep 15 peter 26 #include <boost/exception/all.hpp>
3848 23 Sep 19 peter 27 #include <boost/make_shared.hpp>
3405 06 Apr 15 peter 28
3346 06 Nov 14 peter 29 #include <cassert>
3346 06 Nov 14 peter 30
3346 06 Nov 14 peter 31 namespace theplu {
3346 06 Nov 14 peter 32 namespace yat {
3346 06 Nov 14 peter 33 namespace utility {
3346 06 Nov 14 peter 34
3848 23 Sep 19 peter 35   Scheduler::Scheduler(unsigned int n_threads)
3848 23 Sep 19 peter 36     : job_handler_(JobHandler(data_))
3346 06 Nov 14 peter 37   {
3848 23 Sep 19 peter 38     assert(n_threads);
3848 23 Sep 19 peter 39     this->threads(n_threads);
3346 06 Nov 14 peter 40   }
3346 06 Nov 14 peter 41
3346 06 Nov 14 peter 42
3401 31 Mar 15 peter 43   void Scheduler::add_dependency(boost::shared_ptr<Job> job,
3401 31 Mar 15 peter 44                                  boost::shared_ptr<Job> prerequisite)
3401 31 Mar 15 peter 45   {
3848 23 Sep 19 peter 46     throw_if_error();
3853 01 Jan 20 peter 47     Dependency::Lock lock(data_.dependency().mutex());
3853 01 Jan 20 peter 48     assert(job->status(lock) == Job::pristine);
3853 01 Jan 20 peter 49     data_.dependency().add(job, prerequisite, lock);
3401 31 Mar 15 peter 50   }
3401 31 Mar 15 peter 51
3401 31 Mar 15 peter 52
3405 06 Apr 15 peter 53   void Scheduler::interrupt(void)
3405 06 Apr 15 peter 54   {
3828 23 Jul 19 peter 55     throw_if_error();
3828 23 Jul 19 peter 56     if (!job_handler_.joinable())
3828 23 Jul 19 peter 57       return;
3828 23 Jul 19 peter 58
3853 01 Jan 20 peter 59     data_.messengers().push(boost::make_shared<InterruptWorkers>());
4164 13 Mar 22 peter 60     // wait for job handler to finish
4164 13 Mar 22 peter 61     job_handler_.join();
4164 13 Mar 22 peter 62     throw_if_error();
3405 06 Apr 15 peter 63   }
3405 06 Apr 15 peter 64
3405 06 Apr 15 peter 65
3679 18 Aug 17 peter 66   size_t Scheduler::jobs(void) const
3679 18 Aug 17 peter 67   {
3694 23 Sep 17 peter 68     throw_if_error();
3848 23 Sep 19 peter 69     return data_.running_jobs().get();
3679 18 Aug 17 peter 70   }
3679 18 Aug 17 peter 71
3679 18 Aug 17 peter 72
3681 22 Aug 17 peter 73   void Scheduler::submit(const JobPtr& job)
3346 06 Nov 14 peter 74   {
3694 23 Sep 17 peter 75     throw_if_error();
3828 23 Jul 19 peter 76     // if JobHandler is not joinable implies it is not executing,
3828 23 Jul 19 peter 77     // launch a new thread executing a JobHandler.
3828 23 Jul 19 peter 78     if (!job_handler_.joinable()) {
3848 23 Sep 19 peter 79
3828 23 Jul 19 peter 80       /*
3828 23 Jul 19 peter 81         We cannot relaunch a thread, so instead we create a new thread
3828 23 Jul 19 peter 82         and move its guts to our member variable, job_handler_.
3828 23 Jul 19 peter 83        */
3948 20 Jul 20 peter 84       job_handler_ = std::thread(JobHandler(data_));
3828 23 Jul 19 peter 85     }
3848 23 Sep 19 peter 86     data_.messengers().push(boost::make_shared<JobMessenger>(job));
3346 06 Nov 14 peter 87   }
3346 06 Nov 14 peter 88
3346 06 Nov 14 peter 89
3848 23 Sep 19 peter 90   unsigned int Scheduler::threads(void) const
3826 18 Jul 19 peter 91   {
3848 23 Sep 19 peter 92     return data_.n_threads().get();
3826 18 Jul 19 peter 93   }
3826 18 Jul 19 peter 94
3826 18 Jul 19 peter 95
3848 23 Sep 19 peter 96   void Scheduler::threads(unsigned int n)
3848 23 Sep 19 peter 97   {
4204 26 Aug 22 peter 98     throw_if_error();
3848 23 Sep 19 peter 99     data_.n_threads().set(n);
3848 23 Sep 19 peter 100     if (job_handler_.joinable())
3848 23 Sep 19 peter 101       data_.messengers().push(boost::make_shared<WorkForceSizer>());
3848 23 Sep 19 peter 102     assert(threads() == n);
3848 23 Sep 19 peter 103   }
3848 23 Sep 19 peter 104
3848 23 Sep 19 peter 105
3694 23 Sep 17 peter 106   void Scheduler::throw_if_error(void) const
3346 06 Nov 14 peter 107   {
3681 22 Aug 17 peter 108     boost::exception_ptr error;
4164 13 Mar 22 peter 109     if (data_.error().try_pop(error)) {
4164 13 Mar 22 peter 110       if (job_handler_.joinable()) {
4164 13 Mar 22 peter 111         // let the job handler know we're waiting
4164 13 Mar 22 peter 112         const_cast<JobHandlerData&>(data_).messengers().push(boost::make_shared<SchedulerIsWaiting>());
4164 13 Mar 22 peter 113         const_cast<std::thread&>(job_handler_).join();
4164 13 Mar 22 peter 114       }
3681 22 Aug 17 peter 115       boost::rethrow_exception(error);
4164 13 Mar 22 peter 116     }
3346 06 Nov 14 peter 117   }
3346 06 Nov 14 peter 118
3346 06 Nov 14 peter 119
3684 22 Aug 17 peter 120   // This function (or interrupt) has to be called before Scheduler
3684 22 Aug 17 peter 121   // goes out of scope, which triggers the idea to let destructor call
3684 22 Aug 17 peter 122   // ::wait(). However, wait() might very well throw, and according to
3684 22 Aug 17 peter 123   // this article
3684 22 Aug 17 peter 124   // (http://bin-login.name/ftp/pub/docs/programming_languages/cpp/cffective_cpp/MAGAZINE/SU_FRAME.HTM#destruct)
3684 22 Aug 17 peter 125   // from Herb Sutter basically saying that destructors that throw are
3684 22 Aug 17 peter 126   // evil.
3348 13 Nov 14 peter 127   void Scheduler::wait(void)
3348 13 Nov 14 peter 128   {
3828 23 Jul 19 peter 129     if (!job_handler_.joinable())
3828 23 Jul 19 peter 130       return;
3828 23 Jul 19 peter 131
3681 22 Aug 17 peter 132     // first signal to JobHandler that Scheduler is waiting
3853 01 Jan 20 peter 133     data_.messengers().push(boost::make_shared<SchedulerIsWaiting>());
3348 13 Nov 14 peter 134
3681 22 Aug 17 peter 135     // wait for job handler to finish
3681 22 Aug 17 peter 136     job_handler_.join();
3681 22 Aug 17 peter 137     throw_if_error();
3348 13 Nov 14 peter 138   }
3348 13 Nov 14 peter 139
3348 13 Nov 14 peter 140
3346 06 Nov 14 peter 141   // Scheduler::Job
3346 06 Nov 14 peter 142
3402 31 Mar 15 peter 143   Scheduler::Job::Job(unsigned int prio)
3402 31 Mar 15 peter 144     : status_(pristine), priority_(prio), id_(0) {}
3346 06 Nov 14 peter 145
3346 06 Nov 14 peter 146
3346 06 Nov 14 peter 147   Scheduler::Job::~Job(void)
3346 06 Nov 14 peter 148   {}
3346 06 Nov 14 peter 149
3346 06 Nov 14 peter 150
3853 01 Jan 20 peter 151   std::set<Scheduler::JobPtr>&
3853 01 Jan 20 peter 152   Scheduler::Job::prerequisite(const Scheduler::Dependency::Lock& lock)
3853 01 Jan 20 peter 153   {
3853 01 Jan 20 peter 154     return prerequisite_;
3853 01 Jan 20 peter 155   }
3853 01 Jan 20 peter 156
3853 01 Jan 20 peter 157
3853 01 Jan 20 peter 158   const std::set<Scheduler::JobPtr>&
3853 01 Jan 20 peter 159   Scheduler::Job::prerequisite(const Scheduler::Dependency::Lock& lock) const
3853 01 Jan 20 peter 160   {
3853 01 Jan 20 peter 161     return prerequisite_;
3853 01 Jan 20 peter 162   }
3853 01 Jan 20 peter 163
3853 01 Jan 20 peter 164
3402 31 Mar 15 peter 165   unsigned int Scheduler::Job::priority(void) const
3402 31 Mar 15 peter 166   {
3402 31 Mar 15 peter 167     return priority_;
3402 31 Mar 15 peter 168   }
3402 31 Mar 15 peter 169
3402 31 Mar 15 peter 170
3402 31 Mar 15 peter 171   bool Scheduler::LowerPriority::operator()(const JobPtr& lhs,
3402 31 Mar 15 peter 172                                             const JobPtr& rhs) const
3402 31 Mar 15 peter 173   {
3402 31 Mar 15 peter 174     if (rhs.get() == NULL)
3402 31 Mar 15 peter 175       return false;
3402 31 Mar 15 peter 176     if (lhs.get() == NULL)
3402 31 Mar 15 peter 177       return true;
3402 31 Mar 15 peter 178
3402 31 Mar 15 peter 179     if (lhs->priority() != rhs->priority())
3402 31 Mar 15 peter 180       return lhs->priority() < rhs->priority();
3402 31 Mar 15 peter 181
3402 31 Mar 15 peter 182     return lhs->id_ > rhs->id_;
3402 31 Mar 15 peter 183   }
3402 31 Mar 15 peter 184
3402 31 Mar 15 peter 185
3853 01 Jan 20 peter 186   void
3853 01 Jan 20 peter 187   Scheduler::Job::add_prerequisite(const JobPtr& prereq,
3853 01 Jan 20 peter 188                                    const Scheduler::Dependency::Lock& lock)
3681 22 Aug 17 peter 189   {
3681 22 Aug 17 peter 190     prerequisite_.insert(prereq);
3681 22 Aug 17 peter 191   }
3681 22 Aug 17 peter 192
3681 22 Aug 17 peter 193
3853 01 Jan 20 peter 194   size_t
3853 01 Jan 20 peter 195   Scheduler::Job::remove_prerequisite(const JobPtr& prereq,
3853 01 Jan 20 peter 196                                       const Scheduler::Dependency::Lock& lock)
3681 22 Aug 17 peter 197   {
3681 22 Aug 17 peter 198     prerequisite_.erase(prereq);
3681 22 Aug 17 peter 199     return prerequisite_.size();
3681 22 Aug 17 peter 200   }
3681 22 Aug 17 peter 201
3681 22 Aug 17 peter 202
3853 01 Jan 20 peter 203   Scheduler::Job::status_type
3948 20 Jul 20 peter 204   Scheduler::Job::status(const std::unique_lock<std::mutex>& lock) const
3681 22 Aug 17 peter 205   {
3853 01 Jan 20 peter 206     return status_;
3681 22 Aug 17 peter 207   }
3681 22 Aug 17 peter 208
3681 22 Aug 17 peter 209
3948 20 Jul 20 peter 210   void Scheduler::Job::status(const std::unique_lock<std::mutex>& lock,
3853 01 Jan 20 peter 211                               Scheduler::Job::status_type s)
3853 01 Jan 20 peter 212   {
3853 01 Jan 20 peter 213     status_ = s;
3853 01 Jan 20 peter 214   }
3853 01 Jan 20 peter 215
3853 01 Jan 20 peter 216
3853 01 Jan 20 peter 217
3346 06 Nov 14 peter 218   // Scheduler::Worker
3346 06 Nov 14 peter 219
3848 23 Sep 19 peter 220   Scheduler::Worker::Worker(JobQueue& q, MessengerQueue& c)
3346 06 Nov 14 peter 221     : queue_(q), completed_(c)
3346 06 Nov 14 peter 222   {
3346 06 Nov 14 peter 223   }
3346 06 Nov 14 peter 224
3346 06 Nov 14 peter 225
3346 06 Nov 14 peter 226   void Scheduler::Worker::operator()(void)
3346 06 Nov 14 peter 227   {
3346 06 Nov 14 peter 228     while (true) {
3346 06 Nov 14 peter 229       boost::shared_ptr<Job> job;
3346 06 Nov 14 peter 230       // get next job
3346 06 Nov 14 peter 231       queue_.pop(job);
3346 06 Nov 14 peter 232       // NULL job indicates poison pill
3346 06 Nov 14 peter 233       if (job.get()==NULL) {
3848 23 Sep 19 peter 234         completed_.push(boost::make_shared<JobMessenger>(job));
3848 23 Sep 19 peter 235         return;
3346 06 Nov 14 peter 236       }
3346 06 Nov 14 peter 237       // action
3405 06 Apr 15 peter 238       try {
3405 06 Apr 15 peter 239         (*job)();
3405 06 Apr 15 peter 240       }
3405 06 Apr 15 peter 241       catch (...) {
3405 06 Apr 15 peter 242         job->error_ = boost::current_exception();
3405 06 Apr 15 peter 243         // return job to scheduler so it can act on the error
3848 23 Sep 19 peter 244         completed_.push(boost::make_shared<JobMessenger>(job));
3405 06 Apr 15 peter 245         // exit work and go home
3405 06 Apr 15 peter 246         return;
3405 06 Apr 15 peter 247       }
3346 06 Nov 14 peter 248       // return job to scheduler
3848 23 Sep 19 peter 249       completed_.push(boost::make_shared<JobMessenger>(job));
3346 06 Nov 14 peter 250     }
3346 06 Nov 14 peter 251   }
3346 06 Nov 14 peter 252
3681 22 Aug 17 peter 253
3853 01 Jan 20 peter 254   void Scheduler::Dependency::add(const JobPtr& child, const JobPtr& parent,
3853 01 Jan 20 peter 255                                   const Lock& lock)
3853 01 Jan 20 peter 256   {
3853 01 Jan 20 peter 257     // ignore dependency if parent has already completed
3853 01 Jan 20 peter 258     if (parent->status(lock) == Job::completed)
3853 01 Jan 20 peter 259       return;
3853 01 Jan 20 peter 260     children_[parent].push_back(child);
3853 01 Jan 20 peter 261     child->add_prerequisite(parent, lock);
3853 01 Jan 20 peter 262   }
3853 01 Jan 20 peter 263
3853 01 Jan 20 peter 264
3853 01 Jan 20 peter 265   void Scheduler::Dependency::remove(const JobPtr& parent, const Lock& lock)
3853 01 Jan 20 peter 266   {
3853 01 Jan 20 peter 267     typedef std::vector<JobPtr> Children;
3853 01 Jan 20 peter 268     std::map<JobPtr, Children>::iterator it = children_.find(parent);
3853 01 Jan 20 peter 269     if (it == children_.end())
3853 01 Jan 20 peter 270       return;
3853 01 Jan 20 peter 271
3853 01 Jan 20 peter 272     // children of parent
3853 01 Jan 20 peter 273     Children& children = it->second;
3853 01 Jan 20 peter 274     for (Children::iterator child=children.begin(); child!=children.end();
3853 01 Jan 20 peter 275          ++child) {
3853 01 Jan 20 peter 276       // remove parent from child
3853 01 Jan 20 peter 277       (*child)->remove_prerequisite(parent, lock);
3853 01 Jan 20 peter 278     }
3853 01 Jan 20 peter 279     children_.erase(it);
3853 01 Jan 20 peter 280   }
3853 01 Jan 20 peter 281
3853 01 Jan 20 peter 282
3853 01 Jan 20 peter 283   std::vector<boost::shared_ptr<Scheduler::Job> >&
3853 01 Jan 20 peter 284   Scheduler::Dependency::children(const JobPtr& key, const Lock& lock)
3853 01 Jan 20 peter 285   {
3853 01 Jan 20 peter 286     std::map<JobPtr, std::vector<JobPtr> >::iterator it = children_.find(key);
3853 01 Jan 20 peter 287     if (it == children_.end())
3853 01 Jan 20 peter 288       return empty_vector_;
3853 01 Jan 20 peter 289     return it->second;
3853 01 Jan 20 peter 290   }
3853 01 Jan 20 peter 291
3853 01 Jan 20 peter 292
3948 20 Jul 20 peter 293   std::mutex& Scheduler::Dependency::mutex(void)
3853 01 Jan 20 peter 294   {
3853 01 Jan 20 peter 295     return mutex_;
3853 01 Jan 20 peter 296   }
3853 01 Jan 20 peter 297
3853 01 Jan 20 peter 298
3848 23 Sep 19 peter 299   // Scheduler::Messenger
3848 23 Sep 19 peter 300   Scheduler::JobMessenger::JobMessenger(const Scheduler::JobPtr& job)
3848 23 Sep 19 peter 301     : job_(job)
3848 23 Sep 19 peter 302   {
3848 23 Sep 19 peter 303   }
3848 23 Sep 19 peter 304
3848 23 Sep 19 peter 305
3848 23 Sep 19 peter 306   void Scheduler::JobMessenger::operator()(Scheduler::JobHandler& handler)
3848 23 Sep 19 peter 307   {
3848 23 Sep 19 peter 308     if (job_)
3848 23 Sep 19 peter 309       handler.process(job_);
3848 23 Sep 19 peter 310   }
3848 23 Sep 19 peter 311
3848 23 Sep 19 peter 312
3848 23 Sep 19 peter 313   void Scheduler::SchedulerIsWaiting::operator()(Scheduler::JobHandler& handler)
3848 23 Sep 19 peter 314   {
3848 23 Sep 19 peter 315     handler.scheduler_is_waiting(true);
3848 23 Sep 19 peter 316   }
3848 23 Sep 19 peter 317
3848 23 Sep 19 peter 318
3848 23 Sep 19 peter 319   void Scheduler::InterruptWorkers::operator()(Scheduler::JobHandler& handler)
3848 23 Sep 19 peter 320   {
3848 23 Sep 19 peter 321     handler.scheduler_is_waiting(true);
3848 23 Sep 19 peter 322     handler.interrupt_workers();
3848 23 Sep 19 peter 323   }
3848 23 Sep 19 peter 324
3848 23 Sep 19 peter 325
3848 23 Sep 19 peter 326   void Scheduler::WorkForceSizer::operator()(Scheduler::JobHandler& handler)
3848 23 Sep 19 peter 327   {
3848 23 Sep 19 peter 328     unsigned int wanted_n = handler.data().n_threads().get();
3848 23 Sep 19 peter 329     unsigned int current_n = handler.n_target_workers();
3848 23 Sep 19 peter 330
3848 23 Sep 19 peter 331     if (wanted_n < current_n) {
3848 23 Sep 19 peter 332       handler.kill_workers(current_n - wanted_n);
3848 23 Sep 19 peter 333     }
3848 23 Sep 19 peter 334     else if (wanted_n > current_n) {
3848 23 Sep 19 peter 335       handler.remove_joined_workers();
3848 23 Sep 19 peter 336       handler.create_workers(wanted_n - current_n);
3848 23 Sep 19 peter 337     }
3848 23 Sep 19 peter 338   }
3848 23 Sep 19 peter 339
3848 23 Sep 19 peter 340
3823 16 Jul 19 peter 341   // Scheduler::JobHandlerData
3848 23 Sep 19 peter 342   Scheduler::JobHandlerData::JobHandlerData(void)
3848 23 Sep 19 peter 343     : running_jobs_(0)
3823 16 Jul 19 peter 344   {}
3681 22 Aug 17 peter 345
3823 16 Jul 19 peter 346
3853 01 Jan 20 peter 347   Scheduler::Dependency& Scheduler::JobHandlerData::dependency(void)
3853 01 Jan 20 peter 348   {
3853 01 Jan 20 peter 349     return dependency_;
3853 01 Jan 20 peter 350   }
3853 01 Jan 20 peter 351
3853 01 Jan 20 peter 352
3853 01 Jan 20 peter 353   const Scheduler::Dependency&
3853 01 Jan 20 peter 354   Scheduler::JobHandlerData::dependency(void) const
3853 01 Jan 20 peter 355   {
3853 01 Jan 20 peter 356     return dependency_;
3853 01 Jan 20 peter 357   }
3853 01 Jan 20 peter 358
3853 01 Jan 20 peter 359
3823 16 Jul 19 peter 360   Queue<boost::exception_ptr>&
3823 16 Jul 19 peter 361   Scheduler::JobHandlerData::error(void) const
3681 22 Aug 17 peter 362   {
3823 16 Jul 19 peter 363     return error_;
3681 22 Aug 17 peter 364   }
3681 22 Aug 17 peter 365
3681 22 Aug 17 peter 366
3848 23 Sep 19 peter 367   const Scheduler::MessengerQueue&
3848 23 Sep 19 peter 368   Scheduler::JobHandlerData::messengers(void) const
3823 16 Jul 19 peter 369   {
3848 23 Sep 19 peter 370     return messengers_;
3823 16 Jul 19 peter 371   }
3823 16 Jul 19 peter 372
3823 16 Jul 19 peter 373
3848 23 Sep 19 peter 374   Scheduler::MessengerQueue& Scheduler::JobHandlerData::messengers(void)
3823 16 Jul 19 peter 375   {
3848 23 Sep 19 peter 376     return messengers_;
3823 16 Jul 19 peter 377   }
3823 16 Jul 19 peter 378
3823 16 Jul 19 peter 379
3823 16 Jul 19 peter 380   const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const
3823 16 Jul 19 peter 381   {
3823 16 Jul 19 peter 382     return queue_;
3823 16 Jul 19 peter 383   }
3823 16 Jul 19 peter 384
3823 16 Jul 19 peter 385
3823 16 Jul 19 peter 386   Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void)
3823 16 Jul 19 peter 387   {
3823 16 Jul 19 peter 388     return queue_;
3823 16 Jul 19 peter 389   }
3823 16 Jul 19 peter 390
3823 16 Jul 19 peter 391
3823 16 Jul 19 peter 392   const Scheduler::JobHandlerData::Count&
3823 16 Jul 19 peter 393   Scheduler::JobHandlerData::running_jobs(void) const
3823 16 Jul 19 peter 394   {
3823 16 Jul 19 peter 395     return running_jobs_;
3823 16 Jul 19 peter 396   }
3823 16 Jul 19 peter 397
3823 16 Jul 19 peter 398
3823 16 Jul 19 peter 399   Scheduler::JobHandlerData::Count&
3823 16 Jul 19 peter 400   Scheduler::JobHandlerData::running_jobs(void)
3823 16 Jul 19 peter 401   {
3823 16 Jul 19 peter 402     return running_jobs_;
3823 16 Jul 19 peter 403   }
3823 16 Jul 19 peter 404
3823 16 Jul 19 peter 405
3823 16 Jul 19 peter 406   const Scheduler::JobHandlerData::Count&
3848 23 Sep 19 peter 407   Scheduler::JobHandlerData::n_threads(void) const
3823 16 Jul 19 peter 408   {
3823 16 Jul 19 peter 409     return threads_;
3823 16 Jul 19 peter 410   }
3823 16 Jul 19 peter 411
3823 16 Jul 19 peter 412
3823 16 Jul 19 peter 413   Scheduler::JobHandlerData::Count&
3848 23 Sep 19 peter 414   Scheduler::JobHandlerData::n_threads(void)
3823 16 Jul 19 peter 415   {
3823 16 Jul 19 peter 416     return threads_;
3823 16 Jul 19 peter 417   }
3823 16 Jul 19 peter 418
3823 16 Jul 19 peter 419
3823 16 Jul 19 peter 420   // Scheduler::JobHandlerData::Count
3823 16 Jul 19 peter 421   Scheduler::JobHandlerData::Count::Count(int x)
3823 16 Jul 19 peter 422     : x_(x)
3823 16 Jul 19 peter 423   {
3823 16 Jul 19 peter 424   }
3823 16 Jul 19 peter 425
3823 16 Jul 19 peter 426
3823 16 Jul 19 peter 427   void Scheduler::JobHandlerData::Count::decrement(void)
3823 16 Jul 19 peter 428   {
3948 20 Jul 20 peter 429     std::unique_lock<std::mutex> lock(mutex_);
3848 23 Sep 19 peter 430     assert(x_ > 0);
3823 16 Jul 19 peter 431     --x_;
3823 16 Jul 19 peter 432   }
3823 16 Jul 19 peter 433
3823 16 Jul 19 peter 434
3823 16 Jul 19 peter 435   int Scheduler::JobHandlerData::Count::get(void) const
3823 16 Jul 19 peter 436   {
3948 20 Jul 20 peter 437     std::unique_lock<std::mutex> lock(mutex_);
3823 16 Jul 19 peter 438     return x_;
3823 16 Jul 19 peter 439   }
3823 16 Jul 19 peter 440
3823 16 Jul 19 peter 441
3823 16 Jul 19 peter 442   void Scheduler::JobHandlerData::Count::increment(void)
3823 16 Jul 19 peter 443   {
3948 20 Jul 20 peter 444     std::unique_lock<std::mutex> lock(mutex_);
3823 16 Jul 19 peter 445     ++x_;
3823 16 Jul 19 peter 446   }
3823 16 Jul 19 peter 447
3823 16 Jul 19 peter 448
3823 16 Jul 19 peter 449   void Scheduler::JobHandlerData::Count::set(int x)
3823 16 Jul 19 peter 450   {
3948 20 Jul 20 peter 451     std::unique_lock<std::mutex> lock(mutex_);
3823 16 Jul 19 peter 452     x_ = x;
3823 16 Jul 19 peter 453   }
3823 16 Jul 19 peter 454
3823 16 Jul 19 peter 455
3823 16 Jul 19 peter 456   // Scheduler::JobHandler
3823 16 Jul 19 peter 457
3823 16 Jul 19 peter 458   Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
3870 24 Feb 20 peter 459     : data_(&data), scheduler_is_waiting_(false), job_count_(0),
3870 24 Feb 20 peter 460       n_target_workers_(0)
3870 24 Feb 20 peter 461   {
3870 24 Feb 20 peter 462   }
3823 16 Jul 19 peter 463
3823 16 Jul 19 peter 464
3848 23 Sep 19 peter 465   void Scheduler::JobHandler::create_workers(unsigned int n)
3848 23 Sep 19 peter 466   {
3848 23 Sep 19 peter 467     for (size_t i=0; i<n; ++i) {
3848 23 Sep 19 peter 468       Worker worker(data_->queue(), data_->messengers());
3948 20 Jul 20 peter 469       workers_.push_back(boost::make_shared<std::thread>(worker));
3848 23 Sep 19 peter 470     }
3848 23 Sep 19 peter 471     n_target_workers_ += n;
3848 23 Sep 19 peter 472   }
3823 16 Jul 19 peter 473
3848 23 Sep 19 peter 474
3848 23 Sep 19 peter 475   Scheduler::JobHandlerData& Scheduler::JobHandler::data(void)
3848 23 Sep 19 peter 476   {
3848 23 Sep 19 peter 477     assert(data_);
3848 23 Sep 19 peter 478     return *data_;
3848 23 Sep 19 peter 479   }
3848 23 Sep 19 peter 480
3848 23 Sep 19 peter 481
3848 23 Sep 19 peter 482   void Scheduler::JobHandler::kill_workers(unsigned int n)
3848 23 Sep 19 peter 483   {
3848 23 Sep 19 peter 484     assert(n_target_workers_ >= n);
3848 23 Sep 19 peter 485     JobPtr poison_pill;
3848 23 Sep 19 peter 486     for (size_t i=0; i<n; ++i) {
3848 23 Sep 19 peter 487       data_->queue().push(poison_pill);
3848 23 Sep 19 peter 488     }
3848 23 Sep 19 peter 489     n_target_workers_ -= n;
3848 23 Sep 19 peter 490   }
3848 23 Sep 19 peter 491
3848 23 Sep 19 peter 492
3848 23 Sep 19 peter 493   void Scheduler::JobHandler::kill_workers(void)
3848 23 Sep 19 peter 494   {
3848 23 Sep 19 peter 495     kill_workers(n_target_workers_);
3848 23 Sep 19 peter 496     assert(n_target_workers_ == 0);
3848 23 Sep 19 peter 497   }
3848 23 Sep 19 peter 498
3848 23 Sep 19 peter 499
3848 23 Sep 19 peter 500   void Scheduler::JobHandler::interrupt_workers(void)
3848 23 Sep 19 peter 501   {
4204 26 Aug 22 peter 502     // if # of workers is zero, kill_workers have already been called
4204 26 Aug 22 peter 503     // (either directly or via interrupt_workers) and the jobs in the
4204 26 Aug 22 peter 504     // queue below are most likely poison pills and stealing them
4204 26 Aug 22 peter 505     // would prevent workers from dying.
4204 26 Aug 22 peter 506     if (n_target_workers_ == 0)
4204 26 Aug 22 peter 507       return;
4164 13 Mar 22 peter 508     JobPtr tmp;
4164 13 Mar 22 peter 509     // We would like to clear the queue and reduce running_jobs with
4164 13 Mar 22 peter 510     // the reduced size, but ATM there is no way to lock the queue to
4204 26 Aug 22 peter 511     // ensure a Worker is not modifying it between we assess the size
4164 13 Mar 22 peter 512     // and clear it.
4164 13 Mar 22 peter 513     while (data_->queue().try_pop(tmp))
4164 13 Mar 22 peter 514       data_->running_jobs().decrement();
4164 13 Mar 22 peter 515
3848 23 Sep 19 peter 516     kill_workers();
4204 26 Aug 22 peter 517     assert(n_target_workers_ == 0);
3848 23 Sep 19 peter 518   }
3848 23 Sep 19 peter 519
3848 23 Sep 19 peter 520
3848 23 Sep 19 peter 521   unsigned int Scheduler::JobHandler::n_target_workers(void) const
3848 23 Sep 19 peter 522   {
3848 23 Sep 19 peter 523     return n_target_workers_;
3848 23 Sep 19 peter 524   }
3848 23 Sep 19 peter 525
3848 23 Sep 19 peter 526
3853 01 Jan 20 peter 527   void
3853 01 Jan 20 peter 528   Scheduler::JobHandler::post_process(JobPtr job,
3853 01 Jan 20 peter 529                                       const Scheduler::Dependency::Lock& lock)
3681 22 Aug 17 peter 530   {
3823 16 Jul 19 peter 531     assert(job);
3823 16 Jul 19 peter 532     assert(data_);
3848 23 Sep 19 peter 533     assert(data_->running_jobs().get() > 0);
3823 16 Jul 19 peter 534     data_->running_jobs().decrement();
3823 16 Jul 19 peter 535     assert(data_->running_jobs().get() >= 0);
3853 01 Jan 20 peter 536     job->status(lock, Job::completed);
3681 22 Aug 17 peter 537
3681 22 Aug 17 peter 538     if (job->error_) {
3823 16 Jul 19 peter 539       data_->error().push(job->error_);
4164 13 Mar 22 peter 540       interrupt_workers();
3681 22 Aug 17 peter 541       return;
3681 22 Aug 17 peter 542     }
3681 22 Aug 17 peter 543
3853 01 Jan 20 peter 544     // save children so we can loop over them below
3853 01 Jan 20 peter 545     std::vector<JobPtr> children = data_->dependency().children(job, lock);
3853 01 Jan 20 peter 546     // remove the dependency
3853 01 Jan 20 peter 547     data_->dependency().remove(job, lock);
3853 01 Jan 20 peter 548     for (std::vector<JobPtr>::iterator child=children.begin();
3853 01 Jan 20 peter 549          child!=children.end(); ++child) {
3853 01 Jan 20 peter 550       // If child has been submitted, i.e., not pristine and child has
3853 01 Jan 20 peter 551       // no (unfinished parents), send it to queue.
3853 01 Jan 20 peter 552       if ((*child)->status(lock) != Job::pristine &&
3853 01 Jan 20 peter 553           (*child)->prerequisite(lock).empty()) {
3853 01 Jan 20 peter 554         assert((*child)->status(lock) == Job::prepared);
3853 01 Jan 20 peter 555         send2queue(*child, lock);
3686 22 Aug 17 peter 556       }
3853 01 Jan 20 peter 557     }
3681 22 Aug 17 peter 558   }
3681 22 Aug 17 peter 559
3681 22 Aug 17 peter 560
3853 01 Jan 20 peter 561   void Scheduler::JobHandler::prepare(JobPtr job,
3853 01 Jan 20 peter 562                                       const Scheduler::Dependency::Lock& lock)
3681 22 Aug 17 peter 563   {
4204 26 Aug 22 peter 564     if (!n_target_workers_)
4204 26 Aug 22 peter 565       return;
4204 26 Aug 22 peter 566     assert(n_target_workers_);
3853 01 Jan 20 peter 567     assert(job->status(lock) == Job::pristine);
3853 01 Jan 20 peter 568     job->status(lock, Job::prepared);
3848 23 Sep 19 peter 569     job->id_ = job_count_;
3848 23 Sep 19 peter 570     ++job_count_;
3681 22 Aug 17 peter 571
3853 01 Jan 20 peter 572     std::set<JobPtr>& parents = job->prerequisite(lock);
3681 22 Aug 17 peter 573
3681 22 Aug 17 peter 574     typedef std::set<JobPtr>::iterator iterator;
3853 01 Jan 20 peter 575     for (iterator parent=parents.begin(); parent!=parents.end(); ++parent) {
3853 01 Jan 20 peter 576       if ((*parent)->status(lock) == Job::pristine) {
3853 01 Jan 20 peter 577         prepare(*parent, lock);
3681 22 Aug 17 peter 578       }
3681 22 Aug 17 peter 579     }
3681 22 Aug 17 peter 580
3681 22 Aug 17 peter 581     // If all prerequisite are finished, send job to queue
3853 01 Jan 20 peter 582     if (parents.empty())
3853 01 Jan 20 peter 583       send2queue(job, lock);
3681 22 Aug 17 peter 584   }
3681 22 Aug 17 peter 585
3681 22 Aug 17 peter 586
3848 23 Sep 19 peter 587   void Scheduler::JobHandler::remove_joined_workers(void)
3848 23 Sep 19 peter 588   {
3848 23 Sep 19 peter 589     for (WorkerList::iterator w=workers_.begin(); w!=workers_.end(); ) {
3848 23 Sep 19 peter 590       if ((*w)->joinable() == false)
3848 23 Sep 19 peter 591         workers_.erase(w++);
3848 23 Sep 19 peter 592       else
3848 23 Sep 19 peter 593         ++w;
3848 23 Sep 19 peter 594     }
3848 23 Sep 19 peter 595   }
3848 23 Sep 19 peter 596
3848 23 Sep 19 peter 597
3853 01 Jan 20 peter 598   void
3853 01 Jan 20 peter 599   Scheduler::JobHandler::send2queue(JobPtr& job,
3853 01 Jan 20 peter 600                                     const Scheduler::Dependency::Lock& lock)
3681 22 Aug 17 peter 601   {
4204 26 Aug 22 peter 602     assert(n_target_workers_);
3853 01 Jan 20 peter 603     job->status(lock, Job::running);
3823 16 Jul 19 peter 604     data_->running_jobs().increment();
3823 16 Jul 19 peter 605     assert(data_->running_jobs().get() > 0);
3823 16 Jul 19 peter 606     data_->queue().push(job);
3681 22 Aug 17 peter 607   }
3681 22 Aug 17 peter 608
3681 22 Aug 17 peter 609
3681 22 Aug 17 peter 610   void Scheduler::JobHandler::process(JobPtr& job)
3681 22 Aug 17 peter 611   {
3853 01 Jan 20 peter 612     Dependency::Lock lock(data_->dependency().mutex());
3848 23 Sep 19 peter 613     assert(job);
3853 01 Jan 20 peter 614     switch (job->status(lock)) {
3681 22 Aug 17 peter 615     case(Job::pristine):
3853 01 Jan 20 peter 616       prepare(job, lock);
3681 22 Aug 17 peter 617       break;
3681 22 Aug 17 peter 618     case(Job::running):
3853 01 Jan 20 peter 619       post_process(job, lock);
3681 22 Aug 17 peter 620       break;
3681 22 Aug 17 peter 621     default:
3681 22 Aug 17 peter 622       assert(0 &&
3681 22 Aug 17 peter 623              "job either pristine (fr. Scheduler) or running (fr. Worker)");
3681 22 Aug 17 peter 624     }
3681 22 Aug 17 peter 625   }
3681 22 Aug 17 peter 626
3681 22 Aug 17 peter 627
3848 23 Sep 19 peter 628   void Scheduler::JobHandler::wait_workers(void) const
3848 23 Sep 19 peter 629   {
3848 23 Sep 19 peter 630     for (WorkerList::const_iterator w=workers_.begin(); w!=workers_.end(); ++w)
3848 23 Sep 19 peter 631       if ((*w)->joinable())
3848 23 Sep 19 peter 632         (*w)->join();
3848 23 Sep 19 peter 633   }
3848 23 Sep 19 peter 634
3848 23 Sep 19 peter 635
3681 22 Aug 17 peter 636   void Scheduler::JobHandler::operator()(void)
3681 22 Aug 17 peter 637   {
3823 16 Jul 19 peter 638     assert(data_);
3848 23 Sep 19 peter 639     assert(workers_.empty());
3848 23 Sep 19 peter 640     create_workers(data_->n_threads().get());
3681 22 Aug 17 peter 641
3848 23 Sep 19 peter 642     // Process messengers stored in data_->messengers() coming from
3848 23 Sep 19 peter 643     // both the Scheduler and Workers. We keep waiting here until:
3848 23 Sep 19 peter 644     //   1) scheduler_is_waiting is true i.e. Scheduler::wait has been
3848 23 Sep 19 peter 645     //   called
3848 23 Sep 19 peter 646     //   2) There are no jobs running
3848 23 Sep 19 peter 647     //   3) There are no more messengers
3848 23 Sep 19 peter 648     MessengerPtr msg;
3681 22 Aug 17 peter 649
3848 23 Sep 19 peter 650     while (!scheduler_is_waiting_ || data_->running_jobs().get() ||
3848 23 Sep 19 peter 651            !data_->messengers().empty()) {
3848 23 Sep 19 peter 652       data_->messengers().pop(msg);
3848 23 Sep 19 peter 653       (*msg)(*this);
3681 22 Aug 17 peter 654     }
3681 22 Aug 17 peter 655
3848 23 Sep 19 peter 656     // give poison pills to workers
3848 23 Sep 19 peter 657     kill_workers();
3848 23 Sep 19 peter 658     // wait for workers
3848 23 Sep 19 peter 659     wait_workers();
3848 23 Sep 19 peter 660     // clean up
3848 23 Sep 19 peter 661     workers_.clear();
3681 22 Aug 17 peter 662   }
3681 22 Aug 17 peter 663
3848 23 Sep 19 peter 664
3848 23 Sep 19 peter 665   void Scheduler::JobHandler::scheduler_is_waiting(bool value)
3848 23 Sep 19 peter 666   {
3848 23 Sep 19 peter 667     scheduler_is_waiting_ = value;
3848 23 Sep 19 peter 668   }
3848 23 Sep 19 peter 669
3346 06 Nov 14 peter 670 }}}