1 #ifndef _theplu_yat_utility_multi_processor_ 2 #define _theplu_yat_utility_multi_processor_ 26 #include "yat_assert.h" 44 template<
typename InputIterator,
typename OutputIterator,
45 class Function,
class Compare>
47 OutputIterator out,
size_t cap1,
size_t n_threads,
48 size_t cap2, Function
function, Compare compare)
50 typedef typename std::iterator_traits<InputIterator>::value_type T;
55 Reader(InputIterator first, InputIterator last,
56 Queue<std::shared_ptr<T>>& queue)
57 : first_(first), last_(last), queue_(queue)
62 for (; first_!=last_; ++first_)
63 queue_.push(std::make_shared<T>(*first_));
65 queue_.push(std::shared_ptr<T>());
77 Worker(
Queue<std::shared_ptr<T>>& in,
const Function& func,
78 Queue<std::shared_ptr<T>>& out)
79 : in_(in), out_(out), function_(func)
85 std::shared_ptr<T> element;
91 if (function_(*element))
118 PtrCompare(
const Compare& c)
122 bool operator()(
const std::shared_ptr<T>& lhs,
123 const std::shared_ptr<T>& rhs)
const 125 YAT_ASSERT(lhs.get());
126 YAT_ASSERT(rhs.get());
127 return compare_(*lhs, *rhs);
137 Writer(OutputIterator out, std::vector<
Queue<std::shared_ptr<T>>>& Qs,
138 const Compare& compare)
139 : out_(out), queue_(Qs), buffer_(PtrCompare(compare))
143 void operator()(
void)
145 for (
size_t i=0; i<queue_.size(); ++i)
148 while (!buffer_.empty()) {
149 size_t idx = buffer_.begin()->second;
150 *out_ = *buffer_.begin()->first;
152 buffer_.erase(buffer_.begin());
158 void read(
size_t idx)
160 YAT_ASSERT(idx < queue_.size());
161 std::shared_ptr<T> element;
162 queue_[idx].pop(element);
167 buffer_.insert(buffer_.end(), std::make_pair(element, idx));
171 std::vector<Queue<std::shared_ptr<T>>>& queue_;
172 std::multimap<std::shared_ptr<T>, size_t, PtrCompare> buffer_;
175 std::vector<std::thread> threads;
180 YAT_ASSERT(n_threads);
186 std::vector<Queue<std::shared_ptr<T>>> Qs(n_threads, queue2);
189 threads.push_back(std::thread(Reader(begin, end, queue)));
190 for (
size_t i=0; i<n_threads; ++i)
191 threads.push_back(std::thread(Worker(queue,
function, Qs[i])));
192 threads.push_back(std::thread(Writer(out, Qs, compare)));
195 for (
size_t i=0; i<threads.size(); ++i)
222 template<
typename InputIterator,
typename OutputIterator,
class Function>
224 OutputIterator out,
size_t cap1,
size_t n_threads,
225 size_t cap2, Function
function)
227 typedef typename std::iterator_traits<InputIterator>::value_type T;
228 std::less<T> compare;
229 multiprocess(begin, end, out, cap1, n_threads, cap2,
function, compare);
void multiprocess(InputIterator begin, InputIterator end, OutputIterator out, size_t cap1, size_t n_threads, size_t cap2, Function function, Compare compare)
Definition: multiprocess.h:46
The Department of Theoretical Physics namespace as we define it.
Multi-thread safe queue.
Definition: Queue.h:58
size_t capacity(void)
Definition: BasicQueue.h:77