4044 |
01 Mar 21 |
peter |
1 |
#ifndef _theplu_yat_utility_multi_processor_ |
4044 |
01 Mar 21 |
peter |
2 |
#define _theplu_yat_utility_multi_processor_ |
4044 |
01 Mar 21 |
peter |
3 |
|
4044 |
01 Mar 21 |
peter |
// $Id$ |
4044 |
01 Mar 21 |
peter |
5 |
|
4044 |
01 Mar 21 |
peter |
6 |
/* |
4044 |
01 Mar 21 |
peter |
Copyright (C) 2021 Peter Johansson |
4044 |
01 Mar 21 |
peter |
8 |
|
4044 |
01 Mar 21 |
peter |
This file is part of the yat library, http://dev.thep.lu.se/yat |
4044 |
01 Mar 21 |
peter |
10 |
|
4044 |
01 Mar 21 |
peter |
The yat library is free software; you can redistribute it and/or |
4044 |
01 Mar 21 |
peter |
modify it under the terms of the GNU General Public License as |
4044 |
01 Mar 21 |
peter |
published by the Free Software Foundation; either version 3 of the |
4044 |
01 Mar 21 |
peter |
License, or (at your option) any later version. |
4044 |
01 Mar 21 |
peter |
15 |
|
4044 |
01 Mar 21 |
peter |
The yat library is distributed in the hope that it will be useful, |
4044 |
01 Mar 21 |
peter |
but WITHOUT ANY WARRANTY; without even the implied warranty of |
4044 |
01 Mar 21 |
peter |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
4044 |
01 Mar 21 |
peter |
General Public License for more details. |
4044 |
01 Mar 21 |
peter |
20 |
|
4044 |
01 Mar 21 |
peter |
You should have received a copy of the GNU General Public License |
4044 |
01 Mar 21 |
peter |
along with yat. If not, see <http://www.gnu.org/licenses/>. |
4044 |
01 Mar 21 |
peter |
23 |
*/ |
4044 |
01 Mar 21 |
peter |
24 |
|
#include "Queue.h"
#include "yat_assert.h"

#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
4044 |
01 Mar 21 |
peter |
32 |
|
4044 |
01 Mar 21 |
peter |
33 |
namespace theplu { |
4044 |
01 Mar 21 |
peter |
34 |
namespace yat { |
4044 |
01 Mar 21 |
peter |
35 |
namespace utility { |
4044 |
01 Mar 21 |
peter |
36 |
|
4044 |
01 Mar 21 |
peter |
37 |
/** |
4044 |
01 Mar 21 |
peter |
Same as |
4044 |
01 Mar 21 |
peter |
multiprocess(begin, end, out, cap1, n_threads, cap2, function) |
4044 |
01 Mar 21 |
peter |
but use \a compare instead of std::less<T> |
4044 |
01 Mar 21 |
peter |
41 |
|
4044 |
01 Mar 21 |
peter |
\since New in yat 0.19 |
4044 |
01 Mar 21 |
peter |
43 |
*/ |
4044 |
01 Mar 21 |
peter |
44 |
template<typename InputIterator, typename OutputIterator, |
4044 |
01 Mar 21 |
peter |
45 |
class Function, class Compare> |
4044 |
01 Mar 21 |
peter |
46 |
void multiprocess(InputIterator begin, InputIterator end, |
4044 |
01 Mar 21 |
peter |
47 |
OutputIterator out, size_t cap1, size_t n_threads, |
4044 |
01 Mar 21 |
peter |
48 |
size_t cap2, Function function, Compare compare) |
4044 |
01 Mar 21 |
peter |
49 |
{ |
4044 |
01 Mar 21 |
peter |
50 |
typedef typename std::iterator_traits<InputIterator>::value_type T; |
4044 |
01 Mar 21 |
peter |
51 |
|
4044 |
01 Mar 21 |
peter |
52 |
class Reader |
4044 |
01 Mar 21 |
peter |
53 |
{ |
4044 |
01 Mar 21 |
peter |
54 |
public: |
4044 |
01 Mar 21 |
peter |
55 |
Reader(InputIterator first, InputIterator last, |
4044 |
01 Mar 21 |
peter |
56 |
Queue<std::shared_ptr<T>>& queue) |
4044 |
01 Mar 21 |
peter |
57 |
: first_(first), last_(last), queue_(queue) |
4044 |
01 Mar 21 |
peter |
58 |
{} |
4044 |
01 Mar 21 |
peter |
59 |
|
4044 |
01 Mar 21 |
peter |
60 |
void operator()(void) |
4044 |
01 Mar 21 |
peter |
61 |
{ |
4044 |
01 Mar 21 |
peter |
62 |
for (; first_!=last_; ++first_) |
4044 |
01 Mar 21 |
peter |
63 |
queue_.push(std::make_shared<T>(*first_)); |
4044 |
01 Mar 21 |
peter |
// send a kill pill to workers |
4044 |
01 Mar 21 |
peter |
65 |
queue_.push(std::shared_ptr<T>()); |
4044 |
01 Mar 21 |
peter |
66 |
} |
4044 |
01 Mar 21 |
peter |
67 |
private: |
4044 |
01 Mar 21 |
peter |
68 |
InputIterator first_; |
4044 |
01 Mar 21 |
peter |
69 |
InputIterator last_; |
4044 |
01 Mar 21 |
peter |
70 |
Queue<std::shared_ptr<T>>& queue_; |
4044 |
01 Mar 21 |
peter |
71 |
}; |
4044 |
01 Mar 21 |
peter |
72 |
|
4044 |
01 Mar 21 |
peter |
73 |
|
4044 |
01 Mar 21 |
peter |
74 |
class Worker |
4044 |
01 Mar 21 |
peter |
75 |
{ |
4044 |
01 Mar 21 |
peter |
76 |
public: |
4044 |
01 Mar 21 |
peter |
77 |
Worker(Queue<std::shared_ptr<T>>& in, const Function& func, |
4044 |
01 Mar 21 |
peter |
78 |
Queue<std::shared_ptr<T>>& out) |
4044 |
01 Mar 21 |
peter |
79 |
: in_(in), out_(out), function_(func) |
4044 |
01 Mar 21 |
peter |
80 |
{} |
4044 |
01 Mar 21 |
peter |
81 |
|
4044 |
01 Mar 21 |
peter |
82 |
|
4044 |
01 Mar 21 |
peter |
83 |
void operator()(void) |
4044 |
01 Mar 21 |
peter |
84 |
{ |
4044 |
01 Mar 21 |
peter |
85 |
std::shared_ptr<T> element; |
4044 |
01 Mar 21 |
peter |
86 |
while (true) { |
4044 |
01 Mar 21 |
peter |
87 |
in_.pop(element); |
4044 |
01 Mar 21 |
peter |
88 |
if (element) { |
4044 |
01 Mar 21 |
peter |
// Do the work and if function returns true, push worked |
4044 |
01 Mar 21 |
peter |
// element into out_ queue. |
4044 |
01 Mar 21 |
peter |
91 |
if (function_(*element)) |
4044 |
01 Mar 21 |
peter |
92 |
out_.push(element); |
4044 |
01 Mar 21 |
peter |
93 |
} |
4044 |
01 Mar 21 |
peter |
94 |
else { |
4044 |
01 Mar 21 |
peter |
// When a Worker receive a kill pill, share it with other |
4046 |
01 Mar 21 |
peter |
// workers by pushing it into in_, and push it to the |
4044 |
01 Mar 21 |
peter |
// Writer to inform that this queue is completed. |
4044 |
01 Mar 21 |
peter |
98 |
in_.push(element); |
4044 |
01 Mar 21 |
peter |
99 |
out_.push(element); |
4044 |
01 Mar 21 |
peter |
// After that we are done, and can return home. |
4044 |
01 Mar 21 |
peter |
101 |
return; |
4044 |
01 Mar 21 |
peter |
102 |
} |
4044 |
01 Mar 21 |
peter |
103 |
} |
4044 |
01 Mar 21 |
peter |
104 |
} |
4044 |
01 Mar 21 |
peter |
105 |
|
4044 |
01 Mar 21 |
peter |
106 |
|
4044 |
01 Mar 21 |
peter |
107 |
private: |
4044 |
01 Mar 21 |
peter |
108 |
Queue<std::shared_ptr<T>>& in_; |
4044 |
01 Mar 21 |
peter |
109 |
Queue<std::shared_ptr<T>>& out_; |
4044 |
01 Mar 21 |
peter |
110 |
Function function_; |
4044 |
01 Mar 21 |
peter |
111 |
size_t cap_; |
4044 |
01 Mar 21 |
peter |
112 |
}; |
4044 |
01 Mar 21 |
peter |
113 |
|
4044 |
01 Mar 21 |
peter |
114 |
|
4044 |
01 Mar 21 |
peter |
115 |
class PtrCompare |
4044 |
01 Mar 21 |
peter |
116 |
{ |
4044 |
01 Mar 21 |
peter |
117 |
public: |
4044 |
01 Mar 21 |
peter |
118 |
PtrCompare(const Compare& c) |
4044 |
01 Mar 21 |
peter |
119 |
: compare_(c) |
4044 |
01 Mar 21 |
peter |
120 |
{} |
4044 |
01 Mar 21 |
peter |
121 |
|
4044 |
01 Mar 21 |
peter |
122 |
bool operator()(const std::shared_ptr<T>& lhs, |
4044 |
01 Mar 21 |
peter |
123 |
const std::shared_ptr<T>& rhs) const |
4044 |
01 Mar 21 |
peter |
124 |
{ |
4044 |
01 Mar 21 |
peter |
125 |
YAT_ASSERT(lhs.get()); |
4044 |
01 Mar 21 |
peter |
126 |
YAT_ASSERT(rhs.get()); |
4044 |
01 Mar 21 |
peter |
127 |
return compare_(*lhs, *rhs); |
4044 |
01 Mar 21 |
peter |
128 |
} |
4044 |
01 Mar 21 |
peter |
129 |
private: |
4044 |
01 Mar 21 |
peter |
130 |
Compare compare_; |
4044 |
01 Mar 21 |
peter |
131 |
}; |
4044 |
01 Mar 21 |
peter |
132 |
|
4044 |
01 Mar 21 |
peter |
133 |
|
4044 |
01 Mar 21 |
peter |
134 |
class Writer |
4044 |
01 Mar 21 |
peter |
135 |
{ |
4044 |
01 Mar 21 |
peter |
136 |
public: |
4044 |
01 Mar 21 |
peter |
137 |
Writer(OutputIterator out, std::vector<Queue<std::shared_ptr<T>>>& Qs, |
4044 |
01 Mar 21 |
peter |
138 |
const Compare& compare) |
4044 |
01 Mar 21 |
peter |
139 |
: out_(out), queue_(Qs), buffer_(PtrCompare(compare)) |
4044 |
01 Mar 21 |
peter |
140 |
{} |
4044 |
01 Mar 21 |
peter |
141 |
|
4044 |
01 Mar 21 |
peter |
142 |
|
4044 |
01 Mar 21 |
peter |
143 |
void operator()(void) |
4044 |
01 Mar 21 |
peter |
144 |
{ |
4044 |
01 Mar 21 |
peter |
145 |
for (size_t i=0; i<queue_.size(); ++i) |
4044 |
01 Mar 21 |
peter |
146 |
read(i); |
4044 |
01 Mar 21 |
peter |
147 |
|
4044 |
01 Mar 21 |
peter |
148 |
while (!buffer_.empty()) { |
4044 |
01 Mar 21 |
peter |
149 |
size_t idx = buffer_.begin()->second; |
4044 |
01 Mar 21 |
peter |
150 |
*out_ = *buffer_.begin()->first; |
4044 |
01 Mar 21 |
peter |
151 |
++out_; |
4044 |
01 Mar 21 |
peter |
152 |
buffer_.erase(buffer_.begin()); |
4044 |
01 Mar 21 |
peter |
// read from the same queue erased element came from |
4044 |
01 Mar 21 |
peter |
154 |
read(idx); |
4044 |
01 Mar 21 |
peter |
155 |
} |
4044 |
01 Mar 21 |
peter |
156 |
} |
4044 |
01 Mar 21 |
peter |
157 |
private: |
4044 |
01 Mar 21 |
peter |
158 |
void read(size_t idx) |
4044 |
01 Mar 21 |
peter |
159 |
{ |
4044 |
01 Mar 21 |
peter |
160 |
YAT_ASSERT(idx < queue_.size()); |
4044 |
01 Mar 21 |
peter |
161 |
std::shared_ptr<T> element; |
4044 |
01 Mar 21 |
peter |
162 |
queue_[idx].pop(element); |
4044 |
01 Mar 21 |
peter |
// If element was not "null" insert it into buffer. If element |
4044 |
01 Mar 21 |
peter |
// was a "null", size of buffer is efectively decreasing by |
4044 |
01 Mar 21 |
peter |
// one untile we reach zero-size. |
4044 |
01 Mar 21 |
peter |
166 |
if (element) |
4044 |
01 Mar 21 |
peter |
167 |
buffer_.insert(buffer_.end(), std::make_pair(element, idx)); |
4044 |
01 Mar 21 |
peter |
168 |
} |
4044 |
01 Mar 21 |
peter |
169 |
|
4044 |
01 Mar 21 |
peter |
170 |
OutputIterator out_; |
4044 |
01 Mar 21 |
peter |
171 |
std::vector<Queue<std::shared_ptr<T>>>& queue_; |
4044 |
01 Mar 21 |
peter |
172 |
std::multimap<std::shared_ptr<T>, size_t, PtrCompare> buffer_; |
4044 |
01 Mar 21 |
peter |
173 |
}; |
4044 |
01 Mar 21 |
peter |
174 |
|
4044 |
01 Mar 21 |
peter |
175 |
std::vector<std::thread> threads; |
4044 |
01 Mar 21 |
peter |
// queue used to communicate between reader and workers |
4044 |
01 Mar 21 |
peter |
177 |
Queue<std::shared_ptr<T>> queue; |
4044 |
01 Mar 21 |
peter |
178 |
queue.capacity(cap1); |
4044 |
01 Mar 21 |
peter |
179 |
|
4044 |
01 Mar 21 |
peter |
180 |
YAT_ASSERT(n_threads); |
4044 |
01 Mar 21 |
peter |
// queues used to communicate between workers and writer; each |
4044 |
01 Mar 21 |
peter |
// worker has its own queue and it's the job of the writer to |
4044 |
01 Mar 21 |
peter |
// merge them into a sorted output. |
4044 |
01 Mar 21 |
peter |
184 |
Queue<std::shared_ptr<T>> queue2; |
4044 |
01 Mar 21 |
peter |
185 |
queue2.capacity(cap2); |
4044 |
01 Mar 21 |
peter |
186 |
std::vector<Queue<std::shared_ptr<T>>> Qs(n_threads, queue2); |
4044 |
01 Mar 21 |
peter |
187 |
|
4044 |
01 Mar 21 |
peter |
// launch threads |
4044 |
01 Mar 21 |
peter |
189 |
threads.push_back(std::thread(Reader(begin, end, queue))); |
4044 |
01 Mar 21 |
peter |
190 |
for (size_t i=0; i<n_threads; ++i) |
4044 |
01 Mar 21 |
peter |
191 |
threads.push_back(std::thread(Worker(queue, function, Qs[i]))); |
4044 |
01 Mar 21 |
peter |
192 |
threads.push_back(std::thread(Writer(out, Qs, compare))); |
4044 |
01 Mar 21 |
peter |
193 |
|
4044 |
01 Mar 21 |
peter |
// wait for threads to complete |
4044 |
01 Mar 21 |
peter |
195 |
for (size_t i=0; i<threads.size(); ++i) |
4044 |
01 Mar 21 |
peter |
196 |
threads[i].join(); |
4044 |
01 Mar 21 |
peter |
197 |
} |
4044 |
01 Mar 21 |
peter |
198 |
|
4044 |
01 Mar 21 |
peter |
199 |
|
/**
   Function reads the sorted range [\c begin, \c end), applies \c
   function on each element and if \c function returns true, element
   is copied into \c out and \c out is incremented.

   The range is expected to be sorted with respect to std::less<T>,
   where T is the value type of \c InputIterator.

   This is done in n_threads + 2 threads. The reading of elements
   from the sorted range is done in one thread, the calling of \c
   function is done in \c n_threads threads, and the merging of the
   results from those calls and copying to \c out is done in one
   thread.

   The communication between the threads is done via buffer
   containers and, to avoid extreme memory usage, the sizes of these
   containers can be capped with parameters \c cap1 and \c
   cap2. There is a single container communicating between the reader
   thread and the worker threads, and \c cap1 limits the size of that
   container. Each worker has a container communicating with the
   writer thread, so there are \c n_threads such containers and the
   size of each container is limited by \c cap2.

   \since new in yat 0.19
 */
template<typename InputIterator, typename OutputIterator, class Function>
void multiprocess(InputIterator begin, InputIterator end,
                  OutputIterator out, size_t cap1, size_t n_threads,
                  size_t cap2, Function function)
{
  typedef typename std::iterator_traits<InputIterator>::value_type T;
  // Delegate to the general overload, ordering elements by operator<
  // through std::less<T>. function is a by-value parameter that is not
  // used again here, so hand it over instead of copying it.
  std::less<T> compare;
  multiprocess(begin, end, out, cap1, n_threads, cap2, std::move(function),
               compare);
}
4044 |
01 Mar 21 |
peter |
231 |
|
4044 |
01 Mar 21 |
peter |
232 |
}}} // of namespace utility, yat, and theplu |
4044 |
01 Mar 21 |
peter |
233 |
|
4044 |
01 Mar 21 |
peter |
234 |
#endif |