cxtream  0.5.1
C++17 data pipeline with Python bindings.
example.cpp
Go to the documentation of this file.
1 /****************************************************************************
2  * cxtream library
3  * Copyright (c) 2017, Cognexa Solutions s.r.o.
4  * Author(s) Filip Matzner
5  *
6  * This file is distributed under the MIT License.
7  * See the accompanying file LICENSE.txt for the complete license agreement.
8  ****************************************************************************/
10 
11 #define BOOST_TEST_DYN_LINK
12 #define BOOST_TEST_MODULE example_test
13 
14 #include <cxtream/core.hpp>
15 
16 #include <boost/test/unit_test.hpp>
17 #include <range/v3/view/zip.hpp>
18 
19 #include <cassert>
20 #include <string>
21 #include <tuple>
22 #include <vector>
23 
24 BOOST_AUTO_TEST_CASE(test_example)
25 {
26  namespace cxs = cxtream::stream;
27  using cxs::from; using cxs::to; using cxs::by; using cxs::dim;
28 
29  CXTREAM_DEFINE_COLUMN(login, std::string) // helper macro to define a column of strings
30  CXTREAM_DEFINE_COLUMN(age, int)
31 
32  std::vector<std::string> logins = {"marry", "ted", "anna", "josh"};
33  std::vector<int> ages = { 24, 41, 16, 59};
34 
35  auto stream = ranges::view::zip(logins, ages)
36  // create a batched stream out of the raw data
37  | cxs::create<login, age>(2)
38  // make everyone older by one year
39  | cxs::transform(from<age>, to<age>, [](int a) { return a + 1; })
40  // increase each letter in the logins by one (i.e., a->b, e->f ...)
41  | cxs::transform(from<login>, to<login>, [](char c) { return c + 1; }, dim<2>)
42  // increase the ages by the length of the login
43  | cxs::transform(from<login, age>, to<age>, [](std::string l, int a) {
44  return a + l.length();
45  })
46  // probabilistically rename 50% of the people to "buzz"
47  | cxs::transform(from<login>, to<login>, 0.5, [](std::string) -> std::string {
48  return "buzz";
49  })
50  // drop the login column from the stream
51  | cxs::drop<login>
52  // introduce the login column back to the stream
53  | cxs::transform(from<age>, to<login>, [](int a) {
54  return "person_" + std::to_string(a) + "_years_old";
55  })
56  // filter only people older than 30 years
57  | cxs::filter(from<login, age>, by<age>, [](int a) { return a > 30; })
58  // asynchronously buffer the stream during iteration
59  | cxs::buffer(2);
60 
61  // extract the ages from the stream to std::vector
62  ages = cxs::unpack(stream, from<age>);
63  std::vector<int> desired = {45, 64};
64  BOOST_TEST(ages == desired, boost::test_tools::per_element());
65  assert((ages == std::vector<int>{45, 64}));
66 }
constexpr auto transform(from_t< FromColumns... > f, to_t< ToColumns... > t, Fun fun, dim_t< Dim > d=dim_t< 1 >{})
Transform a subset of cxtream columns to a different subset of cxtream columns.
Definition: transform.hpp:177
constexpr ranges::view::view< buffer_fn > buffer
Asynchronously buffers the given range.
Definition: buffer.hpp:161
#define CXTREAM_DEFINE_COLUMN(col_name, col_type)
Macro for fast column definition.
Definition: column.hpp:90
std::string to_string(const T &value)
Convert the given type to std::string.
Definition: string.hpp:91
constexpr auto filter(from_t< FromColumns... > f, by_t< ByColumns... > b, Fun fun, dim_t< Dim > d=dim_t< 1 >{})
Filter stream data.
Definition: filter.hpp:127
constexpr auto unpack(Rng &&rng, from_t< FromColumns... > f, dim_t< Dim > d=dim_t< 1 >{})
Unpack a stream into a tuple of ranges.
Definition: unpack.hpp:100