cxtream  0.5.1
C++17 data pipeline with Python bindings.
buffer.hpp
1 /****************************************************************************
2  * cxtream library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Author(s) Filip Matzner
5  *
6  * This file is distributed under the MIT License.
7  * See the accompanying file LICENSE.txt for the complete license agreement.
8  ****************************************************************************/
9 
10 #ifndef CXTREAM_CORE_STREAM_BUFFER_HPP
11 #define CXTREAM_CORE_STREAM_BUFFER_HPP
12 
13 #include <cxtream/core/thread.hpp>
14 
15 #include <range/v3/core.hpp>
16 #include <range/v3/view/all.hpp>
17 #include <range/v3/view/view.hpp>
18 
19 #include <climits>
20 #include <deque>
21 #include <future>
22 
23 namespace cxtream::stream {
24 
25 template<typename Rng>
26 struct buffer_view : ranges::view_facade<buffer_view<Rng>> {
27 private:
29  friend ranges::range_access;
31 
32  Rng rng_;
33  std::size_t n_;
34 
35  struct cursor {
36  private:
37  buffer_view<Rng>* rng_ = nullptr;
38  ranges::iterator_t<Rng> it_ = {};
39 
40  std::size_t n_;
41  std::deque<std::shared_future<ranges::range_value_type_t<Rng>>> buffer_;
42 
43  void pop_buffer()
44  {
45  if (!buffer_.empty()) {
46  buffer_.pop_front();
47  }
48  }
49 
50  void fill_buffer()
51  {
52  while (it_ != ranges::end(rng_->rng_) && buffer_.size() < n_) {
53  auto task = [it = it_]() { return *it; };
54  buffer_.emplace_back(global_thread_pool.enqueue(std::move(task)));
55  ++it_;
56  }
57  }
58 
59  public:
60  cursor() = default;
61  explicit cursor(buffer_view<Rng>& rng)
62  : rng_{&rng}
63  , it_{ranges::begin(rng.rng_)}
64  , n_{rng.n_}
65  {
66  fill_buffer();
67  }
68 
69  decltype(auto) read() const
70  {
71  return buffer_.front().get();
72  }
73 
74  bool equal(ranges::default_sentinel) const
75  {
76  return buffer_.empty() && it_ == ranges::end(rng_->rng_);
77  }
78 
79  bool equal(const cursor& that) const
80  {
81  assert(rng_ == that.rng_);
82  return n_ == that.n_ && it_ == that.it_;
83  }
84 
85  void next()
86  {
87  pop_buffer();
88  fill_buffer();
89  }
90  }; // class buffer_view
91 
92  cursor begin_cursor()
93  {
94  return cursor{*this};
95  }
96 
97 public:
98  buffer_view() = default;
99 
100  buffer_view(Rng rng, std::size_t n)
101  : rng_{rng}
102  , n_{n}
103  {
104  }
105 
106  CONCEPT_REQUIRES(ranges::SizedRange<Rng const>())
107  constexpr ranges::range_size_type_t<Rng> size() const
108  {
109  return ranges::size(rng_);
110  }
111 
112  CONCEPT_REQUIRES(ranges::SizedRange<Rng>())
113  constexpr ranges::range_size_type_t<Rng> size()
114  {
115  return ranges::size(rng_);
116  }
117 };
118 
119 class buffer_fn {
120 private:
122  friend ranges::view::view_access;
124 
125  static auto bind(buffer_fn buffer, std::size_t n = std::numeric_limits<std::size_t>::max())
126  {
127  return ranges::make_pipeable(std::bind(buffer, std::placeholders::_1, n));
128  }
129 
130 public:
131  template<typename Rng, CONCEPT_REQUIRES_(ranges::ForwardRange<Rng>())>
132  buffer_view<ranges::view::all_t<Rng>>
133  operator()(Rng&& rng, std::size_t n = std::numeric_limits<std::size_t>::max()) const
134  {
135  return {ranges::view::all(std::forward<Rng>(rng)), n};
136  }
137 
139  template<typename Rng, CONCEPT_REQUIRES_(!ranges::ForwardRange<Rng>())>
140  void operator()(Rng&&, std::size_t n = 0) const
141  {
142  CONCEPT_ASSERT_MSG(ranges::ForwardRange<Rng>(),
143  "stream::buffer only works on ranges satisfying the ForwardRange concept.");
144  }
146 };
147 
161 constexpr ranges::view::view<buffer_fn> buffer{};
162 
163 } // end namespace cxtream::stream
164 #endif
constexpr ranges::view::view< buffer_fn > buffer
Asynchronously buffers the given range.
Definition: buffer.hpp:161
static thread_pool global_thread_pool
Global thread pool object.
Definition: thread.hpp:98
std::future< std::result_of_t< Fun(Args...)> > enqueue(Fun fun, Args... args)
Definition: thread.hpp:67