yat  0.21pre
multiprocess.h
1 #ifndef _theplu_yat_utility_multi_processor_
2 #define _theplu_yat_utility_multi_processor_
3 
4 // $Id: multiprocess.h 4076 2021-08-25 01:35:16Z peter $
5 
6 /*
7  Copyright (C) 2021 Peter Johansson
8 
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10 
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15 
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23 */
24 
25 #include "Queue.h"
26 #include "yat_assert.h"
27 
28 #include <iterator>
29 #include <map>
30 #include <thread>
31 #include <vector>
32 
33 namespace theplu {
34 namespace yat {
35 namespace utility {
36 
44  template<typename InputIterator, typename OutputIterator,
45  class Function, class Compare>
46  void multiprocess(InputIterator begin, InputIterator end,
47  OutputIterator out, size_t cap1, size_t n_threads,
48  size_t cap2, Function function, Compare compare)
49  {
50  typedef typename std::iterator_traits<InputIterator>::value_type T;
51 
52  class Reader
53  {
54  public:
55  Reader(InputIterator first, InputIterator last,
56  Queue<std::shared_ptr<T>>& queue)
57  : first_(first), last_(last), queue_(queue)
58  {}
59 
60  void operator()(void)
61  {
62  for (; first_!=last_; ++first_)
63  queue_.push(std::make_shared<T>(*first_));
64  // send a kill pill to workers
65  queue_.push(std::shared_ptr<T>());
66  }
67  private:
68  InputIterator first_;
69  InputIterator last_;
70  Queue<std::shared_ptr<T>>& queue_;
71  };
72 
73 
74  class Worker
75  {
76  public:
77  Worker(Queue<std::shared_ptr<T>>& in, const Function& func,
78  Queue<std::shared_ptr<T>>& out)
79  : in_(in), out_(out), function_(func)
80  {}
81 
82 
83  void operator()(void)
84  {
85  std::shared_ptr<T> element;
86  while (true) {
87  in_.pop(element);
88  if (element) {
89  // Do the work and if function returns true, push worked
90  // element into out_ queue.
91  if (function_(*element))
92  out_.push(element);
93  }
94  else {
95  // When a Worker receive a kill pill, share it with other
96  // workers by pushing it into in_, and push it to the
97  // Writer to inform that this queue is completed.
98  in_.push(element);
99  out_.push(element);
100  // After that we are done, and can return home.
101  return;
102  }
103  }
104  }
105 
106 
107  private:
110  Function function_;
111  size_t cap_;
112  };
113 
114 
115  class PtrCompare
116  {
117  public:
118  PtrCompare(const Compare& c)
119  : compare_(c)
120  {}
121 
122  bool operator()(const std::shared_ptr<T>& lhs,
123  const std::shared_ptr<T>& rhs) const
124  {
125  YAT_ASSERT(lhs.get());
126  YAT_ASSERT(rhs.get());
127  return compare_(*lhs, *rhs);
128  }
129  private:
130  Compare compare_;
131  };
132 
133 
134  class Writer
135  {
136  public:
137  Writer(OutputIterator out, std::vector<Queue<std::shared_ptr<T>>>& Qs,
138  const Compare& compare)
139  : out_(out), queue_(Qs), buffer_(PtrCompare(compare))
140  {}
141 
142 
143  void operator()(void)
144  {
145  for (size_t i=0; i<queue_.size(); ++i)
146  read(i);
147 
148  while (!buffer_.empty()) {
149  size_t idx = buffer_.begin()->second;
150  *out_ = *buffer_.begin()->first;
151  ++out_;
152  buffer_.erase(buffer_.begin());
153  // read from the same queue erased element came from
154  read(idx);
155  }
156  }
157  private:
158  void read(size_t idx)
159  {
160  YAT_ASSERT(idx < queue_.size());
161  std::shared_ptr<T> element;
162  queue_[idx].pop(element);
163  // If element was not "null" insert it into buffer. If element
164  // was a "null", size of buffer is efectively decreasing by
165  // one untile we reach zero-size.
166  if (element)
167  buffer_.insert(buffer_.end(), std::make_pair(element, idx));
168  }
169 
170  OutputIterator out_;
171  std::vector<Queue<std::shared_ptr<T>>>& queue_;
172  std::multimap<std::shared_ptr<T>, size_t, PtrCompare> buffer_;
173  };
174 
175  std::vector<std::thread> threads;
176  // queue used to communicate between reader and workers
178  queue.capacity(cap1);
179 
180  YAT_ASSERT(n_threads);
181  // queues used to communicate between workers and writer; each
182  // worker has its own queue and it's the job of the writer to
183  // merge them into a sorted output.
184  Queue<std::shared_ptr<T>> queue2;
185  queue2.capacity(cap2);
186  std::vector<Queue<std::shared_ptr<T>>> Qs(n_threads, queue2);
187 
188  // launch threads
189  threads.push_back(std::thread(Reader(begin, end, queue)));
190  for (size_t i=0; i<n_threads; ++i)
191  threads.push_back(std::thread(Worker(queue, function, Qs[i])));
192  threads.push_back(std::thread(Writer(out, Qs, compare)));
193 
194  // wait for threads to complete
195  for (size_t i=0; i<threads.size(); ++i)
196  threads[i].join();
197  }
198 
199 
/**
   \brief Convenience overload of multiprocess() without a comparator.

   Forwards all arguments to the overload that takes a comparator and
   passes a default-constructed std::less of the input iterator's
   value type as that comparator.
 */
template<typename InputIterator, typename OutputIterator, class Function>
void multiprocess(InputIterator begin, InputIterator end,
                  OutputIterator out, size_t cap1, size_t n_threads,
                  size_t cap2, Function function)
{
  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  multiprocess(begin, end, out, cap1, n_threads, cap2, function,
               std::less<value_type>());
}
231 
232 }}} // of namespace utility, yat, and theplu
233 
234 #endif
void multiprocess(InputIterator begin, InputIterator end, OutputIterator out, size_t cap1, size_t n_threads, size_t cap2, Function function, Compare compare)
Definition: multiprocess.h:46
The Department of Theoretical Physics namespace as we define it.
Multi-thread safe queue.
Definition: Queue.h:58

Generated on Wed Jan 25 2023 03:34:29 for yat by  doxygen 1.8.14