yat/utility/multiprocess.h

Code
Comments
Other
Rev Date Author Line
4044 01 Mar 21 peter 1 #ifndef _theplu_yat_utility_multi_processor_
4044 01 Mar 21 peter 2 #define _theplu_yat_utility_multi_processor_
4044 01 Mar 21 peter 3
4044 01 Mar 21 peter 4 // $Id$
4044 01 Mar 21 peter 5
4044 01 Mar 21 peter 6 /*
4044 01 Mar 21 peter 7   Copyright (C) 2021 Peter Johansson
4044 01 Mar 21 peter 8
4044 01 Mar 21 peter 9   This file is part of the yat library, http://dev.thep.lu.se/yat
4044 01 Mar 21 peter 10
4044 01 Mar 21 peter 11   The yat library is free software; you can redistribute it and/or
4044 01 Mar 21 peter 12   modify it under the terms of the GNU General Public License as
4044 01 Mar 21 peter 13   published by the Free Software Foundation; either version 3 of the
4044 01 Mar 21 peter 14   License, or (at your option) any later version.
4044 01 Mar 21 peter 15
4044 01 Mar 21 peter 16   The yat library is distributed in the hope that it will be useful,
4044 01 Mar 21 peter 17   but WITHOUT ANY WARRANTY; without even the implied warranty of
4044 01 Mar 21 peter 18   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
4044 01 Mar 21 peter 19   General Public License for more details.
4044 01 Mar 21 peter 20
4044 01 Mar 21 peter 21   You should have received a copy of the GNU General Public License
4044 01 Mar 21 peter 22   along with yat. If not, see <http://www.gnu.org/licenses/>.
4044 01 Mar 21 peter 23 */
4044 01 Mar 21 peter 24
4044 01 Mar 21 peter 25 #include "Queue.h"
4044 01 Mar 21 peter 26 #include "yat_assert.h"
4044 01 Mar 21 peter 27
4047 01 Mar 21 peter 28 #include <iterator>
4047 01 Mar 21 peter 29 #include <map>
4044 01 Mar 21 peter 30 #include <thread>
4044 01 Mar 21 peter 31 #include <vector>
4044 01 Mar 21 peter 32
4044 01 Mar 21 peter 33 namespace theplu {
4044 01 Mar 21 peter 34 namespace yat {
4044 01 Mar 21 peter 35 namespace utility {
4044 01 Mar 21 peter 36
4044 01 Mar 21 peter 37   /**
4044 01 Mar 21 peter 38      Same as
4044 01 Mar 21 peter 39      multiprocess(begin, end, out, cap1, n_threads, cap2, function)
4044 01 Mar 21 peter 40      but use \a compare instead of std::less<T>
4044 01 Mar 21 peter 41
4044 01 Mar 21 peter 42      \since New in yat 0.19
4044 01 Mar 21 peter 43    */
4044 01 Mar 21 peter 44   template<typename InputIterator, typename OutputIterator,
4044 01 Mar 21 peter 45            class Function, class Compare>
4044 01 Mar 21 peter 46   void multiprocess(InputIterator begin, InputIterator end,
4044 01 Mar 21 peter 47                     OutputIterator out, size_t cap1, size_t n_threads,
4044 01 Mar 21 peter 48                     size_t cap2, Function function, Compare compare)
4044 01 Mar 21 peter 49   {
4044 01 Mar 21 peter 50     typedef typename std::iterator_traits<InputIterator>::value_type T;
4044 01 Mar 21 peter 51
4044 01 Mar 21 peter 52     class Reader
4044 01 Mar 21 peter 53     {
4044 01 Mar 21 peter 54     public:
4044 01 Mar 21 peter 55       Reader(InputIterator first, InputIterator last,
4044 01 Mar 21 peter 56              Queue<std::shared_ptr<T>>& queue)
4044 01 Mar 21 peter 57         : first_(first), last_(last), queue_(queue)
4044 01 Mar 21 peter 58       {}
4044 01 Mar 21 peter 59
4044 01 Mar 21 peter 60       void operator()(void)
4044 01 Mar 21 peter 61       {
4044 01 Mar 21 peter 62         for (; first_!=last_; ++first_)
4044 01 Mar 21 peter 63           queue_.push(std::make_shared<T>(*first_));
4044 01 Mar 21 peter 64         // send a kill pill to workers
4044 01 Mar 21 peter 65         queue_.push(std::shared_ptr<T>());
4044 01 Mar 21 peter 66       }
4044 01 Mar 21 peter 67     private:
4044 01 Mar 21 peter 68       InputIterator first_;
4044 01 Mar 21 peter 69       InputIterator last_;
4044 01 Mar 21 peter 70       Queue<std::shared_ptr<T>>& queue_;
4044 01 Mar 21 peter 71     };
4044 01 Mar 21 peter 72
4044 01 Mar 21 peter 73
4044 01 Mar 21 peter 74     class Worker
4044 01 Mar 21 peter 75     {
4044 01 Mar 21 peter 76     public:
4044 01 Mar 21 peter 77       Worker(Queue<std::shared_ptr<T>>& in, const Function& func,
4044 01 Mar 21 peter 78              Queue<std::shared_ptr<T>>& out)
4044 01 Mar 21 peter 79         : in_(in), out_(out), function_(func)
4044 01 Mar 21 peter 80       {}
4044 01 Mar 21 peter 81
4044 01 Mar 21 peter 82
4044 01 Mar 21 peter 83       void operator()(void)
4044 01 Mar 21 peter 84       {
4044 01 Mar 21 peter 85         std::shared_ptr<T> element;
4044 01 Mar 21 peter 86         while (true) {
4044 01 Mar 21 peter 87           in_.pop(element);
4044 01 Mar 21 peter 88           if (element) {
4044 01 Mar 21 peter 89             // Do the work and if function returns true, push worked
4044 01 Mar 21 peter 90             // element into out_ queue.
4044 01 Mar 21 peter 91             if (function_(*element))
4044 01 Mar 21 peter 92               out_.push(element);
4044 01 Mar 21 peter 93           }
4044 01 Mar 21 peter 94           else {
4044 01 Mar 21 peter 95             // When a Worker receive a kill pill, share it with other
4046 01 Mar 21 peter 96             // workers by pushing it into in_, and push it to the
4044 01 Mar 21 peter 97             // Writer to inform that this queue is completed.
4044 01 Mar 21 peter 98             in_.push(element);
4044 01 Mar 21 peter 99             out_.push(element);
4044 01 Mar 21 peter 100             // After that we are done, and can return home.
4044 01 Mar 21 peter 101             return;
4044 01 Mar 21 peter 102           }
4044 01 Mar 21 peter 103         }
4044 01 Mar 21 peter 104       }
4044 01 Mar 21 peter 105
4044 01 Mar 21 peter 106
4044 01 Mar 21 peter 107     private:
4044 01 Mar 21 peter 108       Queue<std::shared_ptr<T>>& in_;
4044 01 Mar 21 peter 109       Queue<std::shared_ptr<T>>& out_;
4044 01 Mar 21 peter 110       Function function_;
4044 01 Mar 21 peter 111       size_t cap_;
4044 01 Mar 21 peter 112     };
4044 01 Mar 21 peter 113
4044 01 Mar 21 peter 114
4044 01 Mar 21 peter 115     class PtrCompare
4044 01 Mar 21 peter 116     {
4044 01 Mar 21 peter 117     public:
4044 01 Mar 21 peter 118       PtrCompare(const Compare& c)
4044 01 Mar 21 peter 119         : compare_(c)
4044 01 Mar 21 peter 120       {}
4044 01 Mar 21 peter 121
4044 01 Mar 21 peter 122       bool operator()(const std::shared_ptr<T>& lhs,
4044 01 Mar 21 peter 123                       const std::shared_ptr<T>& rhs) const
4044 01 Mar 21 peter 124       {
4044 01 Mar 21 peter 125         YAT_ASSERT(lhs.get());
4044 01 Mar 21 peter 126         YAT_ASSERT(rhs.get());
4044 01 Mar 21 peter 127         return compare_(*lhs, *rhs);
4044 01 Mar 21 peter 128       }
4044 01 Mar 21 peter 129     private:
4044 01 Mar 21 peter 130       Compare compare_;
4044 01 Mar 21 peter 131     };
4044 01 Mar 21 peter 132
4044 01 Mar 21 peter 133
4044 01 Mar 21 peter 134     class Writer
4044 01 Mar 21 peter 135     {
4044 01 Mar 21 peter 136     public:
4044 01 Mar 21 peter 137       Writer(OutputIterator out, std::vector<Queue<std::shared_ptr<T>>>& Qs,
4044 01 Mar 21 peter 138              const Compare& compare)
4044 01 Mar 21 peter 139         : out_(out), queue_(Qs), buffer_(PtrCompare(compare))
4044 01 Mar 21 peter 140       {}
4044 01 Mar 21 peter 141
4044 01 Mar 21 peter 142
4044 01 Mar 21 peter 143       void operator()(void)
4044 01 Mar 21 peter 144       {
4044 01 Mar 21 peter 145         for (size_t i=0; i<queue_.size(); ++i)
4044 01 Mar 21 peter 146           read(i);
4044 01 Mar 21 peter 147
4044 01 Mar 21 peter 148         while (!buffer_.empty()) {
4044 01 Mar 21 peter 149           size_t idx = buffer_.begin()->second;
4044 01 Mar 21 peter 150           *out_ = *buffer_.begin()->first;
4044 01 Mar 21 peter 151           ++out_;
4044 01 Mar 21 peter 152           buffer_.erase(buffer_.begin());
4044 01 Mar 21 peter 153           // read from the same queue erased element came from
4044 01 Mar 21 peter 154           read(idx);
4044 01 Mar 21 peter 155         }
4044 01 Mar 21 peter 156       }
4044 01 Mar 21 peter 157     private:
4044 01 Mar 21 peter 158       void read(size_t idx)
4044 01 Mar 21 peter 159       {
4044 01 Mar 21 peter 160         YAT_ASSERT(idx < queue_.size());
4044 01 Mar 21 peter 161         std::shared_ptr<T> element;
4044 01 Mar 21 peter 162         queue_[idx].pop(element);
4044 01 Mar 21 peter 163         // If element was not "null" insert it into buffer. If element
4044 01 Mar 21 peter 164         // was a "null", size of buffer is efectively decreasing by
4044 01 Mar 21 peter 165         // one untile we reach zero-size.
4044 01 Mar 21 peter 166         if (element)
4044 01 Mar 21 peter 167           buffer_.insert(buffer_.end(), std::make_pair(element, idx));
4044 01 Mar 21 peter 168       }
4044 01 Mar 21 peter 169
4044 01 Mar 21 peter 170       OutputIterator out_;
4044 01 Mar 21 peter 171       std::vector<Queue<std::shared_ptr<T>>>& queue_;
4044 01 Mar 21 peter 172       std::multimap<std::shared_ptr<T>, size_t, PtrCompare> buffer_;
4044 01 Mar 21 peter 173     };
4044 01 Mar 21 peter 174
4044 01 Mar 21 peter 175     std::vector<std::thread> threads;
4044 01 Mar 21 peter 176     // queue used to communicate between reader and workers
4044 01 Mar 21 peter 177     Queue<std::shared_ptr<T>> queue;
4044 01 Mar 21 peter 178     queue.capacity(cap1);
4044 01 Mar 21 peter 179
4044 01 Mar 21 peter 180     YAT_ASSERT(n_threads);
4044 01 Mar 21 peter 181     // queues used to communicate between workers and writer; each
4044 01 Mar 21 peter 182     // worker has its own queue and it's the job of the writer to
4044 01 Mar 21 peter 183     // merge them into a sorted output.
4044 01 Mar 21 peter 184     Queue<std::shared_ptr<T>> queue2;
4044 01 Mar 21 peter 185     queue2.capacity(cap2);
4044 01 Mar 21 peter 186     std::vector<Queue<std::shared_ptr<T>>> Qs(n_threads, queue2);
4044 01 Mar 21 peter 187
4044 01 Mar 21 peter 188     // launch threads
4044 01 Mar 21 peter 189     threads.push_back(std::thread(Reader(begin, end, queue)));
4044 01 Mar 21 peter 190     for (size_t i=0; i<n_threads; ++i)
4044 01 Mar 21 peter 191       threads.push_back(std::thread(Worker(queue, function, Qs[i])));
4044 01 Mar 21 peter 192     threads.push_back(std::thread(Writer(out, Qs, compare)));
4044 01 Mar 21 peter 193
4044 01 Mar 21 peter 194     // wait for threads to complete
4044 01 Mar 21 peter 195     for (size_t i=0; i<threads.size(); ++i)
4044 01 Mar 21 peter 196       threads[i].join();
4044 01 Mar 21 peter 197   }
4044 01 Mar 21 peter 198
4044 01 Mar 21 peter 199
4044 01 Mar 21 peter 200   /**
4044 01 Mar 21 peter 201      Function reads the sorted range [\c begin, \c end), applies \c
4044 01 Mar 21 peter 202      function on each element and if \c function returns true, element
4076 25 Aug 21 peter 203      is copied into \c out and \c out is incremented.
4044 01 Mar 21 peter 204
4044 01 Mar 21 peter 205      This is done in n_threads + 2 threads. The reading of element
4044 01 Mar 21 peter 206      from the sorted range is done in one range, the calling of \c
4044 01 Mar 21 peter 207      function is done in \c n_threads threads, and the merging of the
4044 01 Mar 21 peter 208      results from those calls and copying to \c out is done in one
4044 01 Mar 21 peter 209      thread.
4044 01 Mar 21 peter 210
4044 01 Mar 21 peter 211      The communication between the threads are done in buffer
4044 01 Mar 21 peter 212      containers and to avoid extreme memory usage, the sizes of these
4044 01 Mar 21 peter 213      containers can be capped with parameter \c cap1 and \c
4044 01 Mar 21 peter 214      cap2. There's a single container communicating between reader
4044 01 Mar 21 peter 215      thread and worker threads, and \c cap1 limits the size of the
4044 01 Mar 21 peter 216      container. Each worker has a container communicating with the
4044 01 Mar 21 peter 217      writer thread, so there are \c n_threads such containers and the
4044 01 Mar 21 peter 218      size of each container is limited by \c cap2.
4044 01 Mar 21 peter 219
4044 01 Mar 21 peter 220      \since new in yat 0.19
4044 01 Mar 21 peter 221    */
4044 01 Mar 21 peter 222   template<typename InputIterator, typename OutputIterator, class Function>
4044 01 Mar 21 peter 223   void multiprocess(InputIterator begin, InputIterator end,
4044 01 Mar 21 peter 224                     OutputIterator out, size_t cap1, size_t n_threads,
4044 01 Mar 21 peter 225                     size_t cap2, Function function)
4044 01 Mar 21 peter 226   {
4044 01 Mar 21 peter 227     typedef typename std::iterator_traits<InputIterator>::value_type T;
4044 01 Mar 21 peter 228     std::less<T> compare;
4044 01 Mar 21 peter 229     multiprocess(begin, end, out, cap1, n_threads, cap2, function, compare);
4044 01 Mar 21 peter 230   }
4044 01 Mar 21 peter 231
4044 01 Mar 21 peter 232 }}} // of namespace utility, yat, and theplu
4044 01 Mar 21 peter 233
4044 01 Mar 21 peter 234 #endif