cxtream 0.5.1
C++17 data pipeline with Python bindings.
batch.hpp
/****************************************************************************
 * cxtream library
 * Copyright (c) 2017, Cognexa Solutions s.r.o.
 * Author(s) Filip Matzner
 *
 * This file is distributed under the MIT License.
 * See the accompanying file LICENSE.txt for the complete license agreement.
 ****************************************************************************/

#ifndef CXTREAM_CORE_STREAM_BATCH_HPP
#define CXTREAM_CORE_STREAM_BATCH_HPP

#include <cxtream/core/utility/tuple.hpp>

#include <range/v3/core.hpp>
#include <range/v3/view/all.hpp>
#include <range/v3/view/view.hpp>

#include <algorithm>
#include <cassert>
#include <functional>
#include <memory>

namespace cxtream::stream {

template<typename Tuple>
constexpr bool is_same_batch_size(const Tuple& tuple)
{
    bool same = true;
    if (std::tuple_size<Tuple>{} > 0) {
        auto bs = std::get<0>(tuple).value().size();
        utility::tuple_for_each(tuple, [bs, &same](const auto& column) {
            same &= (column.value().size() == bs);
        });
    }
    return same;
}

template<typename Tuple>
constexpr std::size_t batch_size(const Tuple& tuple)
{
    static_assert(std::tuple_size<Tuple>{} && "Cannot get batch size if there are no columns");
    assert(is_same_batch_size(tuple) && "All the columns have to have equal batch size");
    return std::get<0>(tuple).value().size();
}

template <typename Rng>
struct batch_view : ranges::view_facade<batch_view<Rng>> {
private:
    friend ranges::range_access;
    Rng rng_;
    std::size_t n_;

    struct cursor {
    private:
        batch_view<Rng>* rng_ = nullptr;
        ranges::iterator_t<Rng> it_ = {};

        using batch_t_ = ranges::range_value_type_t<Rng>;
        using column_idxs_t = std::make_index_sequence<std::tuple_size<batch_t_>{}>;

        // the batch into which we accumulate the data;
        // it is held through a pointer to allow moving from it in const functions
        std::shared_ptr<batch_t_> batch_ = std::make_shared<batch_t_>();

        // the subbatch of the original range
        std::shared_ptr<batch_t_> subbatch_;
        // the current index into the subbatch of the original range
        std::size_t subbatch_idx_ = 0;

        bool done_ = false;

        // reserve space in the accumulated batch
        template <std::size_t... Is>
        void reserve_batch(std::index_sequence<Is...>)
        {
            std::size_t reserve_n = std::min(rng_->n_, std::size_t{1000});
            (..., (std::get<Is>(*batch_).value().reserve(reserve_n)));
        }

        // move the i-th element from subbatch_ to batch_
        template <std::size_t... Is>
        void move_from_subbatch(std::size_t i, std::index_sequence<Is...>)
        {
            (..., (std::get<Is>(*batch_).value().push_back(
                std::move(std::get<Is>(*subbatch_).value()[i]))));
        }

        // find the next non-empty subbatch and return whether one was found
        bool find_next()
        {
            // do nothing if the end of iteration is reached
            if (subbatch_idx_ == batch_size(*subbatch_) && it_ == ranges::end(rng_->rng_)) {
                return false;
            }
            // otherwise find the first non-empty subbatch
            while (subbatch_idx_ == batch_size(*subbatch_)) {
                if (++it_ == ranges::end(rng_->rng_)) {
                    return false;
                }
                subbatch_ = std::make_shared<batch_t_>(*it_);
                subbatch_idx_ = 0;
            }
            return true;
        }

        // fill batch_ with elements from the current subbatch_
        void fill_batch()
        {
            reserve_batch(column_idxs_t{});
            do {
                move_from_subbatch(subbatch_idx_++, column_idxs_t{});
            } while (batch_size(*batch_) < rng_->n_ && find_next());
        }

    public:
        cursor() = default;

        explicit cursor(batch_view<Rng>& rng)
          : rng_{&rng}
          , it_{ranges::begin(rng_->rng_)}
        {
            static_assert(std::tuple_size<std::decay_t<decltype(*batch_)>>{} &&
                          "The range to be batched has to contain at least one column");
            // do nothing if the subrange is empty
            if (it_ != ranges::end(rng_->rng_)) {
                subbatch_ = std::make_shared<batch_t_>(*it_);
                // if the first subbatch is empty, try to find the next non-empty one
                if (batch_size(*subbatch_) == 0) next();
                else fill_batch();
            }
            else done_ = true;
        }

        decltype(auto) read() const
        {
            return *batch_;
        }

        bool equal(ranges::default_sentinel) const
        {
            return done_;
        }

        bool equal(const cursor& that) const
        {
            assert(rng_ == that.rng_);
            return it_ == that.it_ && subbatch_idx_ == that.subbatch_idx_;
        }

        void next()
        {
            batch_ = std::make_shared<batch_t_>();
            if (find_next()) fill_batch();
            else done_ = true;
        }
    };  // struct cursor

    cursor begin_cursor() { return cursor{*this}; }

public:
    batch_view() = default;

    batch_view(Rng rng, std::size_t n)
      : rng_{rng}
      , n_{n}
    {
    }
};  // class batch_view

class batch_fn {
private:
    friend ranges::view::view_access;

    static auto bind(batch_fn batch, std::size_t n)
    {
        return ranges::make_pipeable(std::bind(batch, std::placeholders::_1, n));
    }

public:
    template <typename Rng, CONCEPT_REQUIRES_(ranges::InputRange<Rng>())>
    batch_view<ranges::view::all_t<Rng>> operator()(Rng&& rng, std::size_t n) const
    {
        return {ranges::view::all(std::forward<Rng>(rng)), n};
    }
};  // class batch_fn

// Accumulate the stream and yield batches of a different size.
constexpr ranges::view::view<batch_fn> batch{};

}  // namespace cxtream::stream
#endif
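
A minimal usage sketch, not part of the header above. It assumes the file is installed as <cxtream/core/stream/batch.hpp> (inferred from its include guard). The batch view consumes a range of tuples of column-like objects; as batch_size() and move_from_subbatch() show, all it requires of a column is a value() accessor returning a container with size(), reserve(), push_back() and operator[]. Real cxtream columns are defined by the library's own column machinery; the int_col type below is a hypothetical stand-in with the same value() interface, used only to illustrate how batch(n) re-groups a stream.

// Sketch: rebatch a stream of two batches (sizes 2 and 3) into batches of 4.
#include <cxtream/core/stream/batch.hpp>

#include <iostream>
#include <tuple>
#include <vector>

// hypothetical column type exposing the value() interface the view relies on
struct int_col {
    std::vector<int> data;
    std::vector<int>& value() { return data; }
    const std::vector<int>& value() const { return data; }
};

int main()
{
    // a stream of two batches with sizes 2 and 3
    std::vector<std::tuple<int_col>> data = {
        std::tuple<int_col>{int_col{{1, 2}}},
        std::tuple<int_col>{int_col{{3, 4, 5}}}
    };

    // rebatch the stream into batches of (at most) four examples;
    // the last batch may be smaller if the data run out
    for (auto&& batch : data | cxtream::stream::batch(4)) {
        std::cout << std::get<0>(batch).value().size() << '\n';
    }
    // prints:
    // 4
    // 1
}

Note that batch(4) first exhausts each incoming subbatch before pulling the next one, so elements are moved, not copied, and the original batch boundaries are erased; only the final batch may be smaller than the requested size.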