Skip to content

Commit

Permalink
Merge pull request #3237 from STEllAR-GROUP/pipeline_example
Browse files Browse the repository at this point in the history
Adding Pipeline example
  • Loading branch information
msimberg committed Mar 14, 2018
2 parents 387857e + 054103f commit 86e53e6
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Expand Up @@ -17,6 +17,7 @@ set(subdirs
jacobi_smp
nqueen
performance_counters
pipeline
quickstart
qt
random_mem_access
Expand Down
18 changes: 18 additions & 0 deletions examples/pipeline/CMakeLists.txt
@@ -0,0 +1,18 @@
# Copyright (c) 2018 Thomas Heller
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

# Names of the pipeline executables; each one is built from <name>.cpp.
set(pipeline emitter worker collector)

# For every stage: build the executable, create a pseudo-target
# examples.pipeline.<name>, and wire it between the umbrella target
# examples.pipeline and <name>_exe.
foreach(stage IN LISTS pipeline)
  add_hpx_executable(${stage}
    SOURCES ${stage}.cpp
    FOLDER "Examples/Pipeline")

  add_hpx_pseudo_target(examples.pipeline.${stage})

  # examples.pipeline -> examples.pipeline.<name> -> <name>_exe
  add_hpx_pseudo_dependencies(examples.pipeline examples.pipeline.${stage})
  add_hpx_pseudo_dependencies(examples.pipeline.${stage} ${stage}_exe)
endforeach()
29 changes: 29 additions & 0 deletions examples/pipeline/Readme.md
@@ -0,0 +1,29 @@
<!-- Copyright (c) 2018 Thomas Heller -->
<!-- -->
<!-- Distributed under the Boost Software License, Version 1.0. (See accompanying -->
<!-- file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -->

This example demonstrates a pipeline split up into three processes:
- emitter.cpp:
* Process producing input for the workers. This serves as the master process.
- worker.cpp:
* Process working on inputs. There can be multiple instances of this process.
- collector.cpp:
* Process collecting the results of the workers.

When using the MPI parcelport, the example can be run like this:

```
mpirun -np 1 ./bin/emitter : -np 1 ./bin/collector : -np N-1 ./bin/worker
```

For elasticity, the application can be started as follows:

```
host0$ ./bin/emitter --hpx:hpx=<host0> --hpx:console
host1$ ./bin/collector --hpx:hpx=<host1> --hpx:agas=<host0> --hpx:connect --hpx:run-hpx-main
host2$ ./bin/worker --hpx:hpx=<host2> --hpx:agas=<host0> --hpx:connect --hpx:run-hpx-main
host3$ ./bin/worker --hpx:hpx=<host3> --hpx:agas=<host0> --hpx:connect --hpx:run-hpx-main
...
hostN$ ./bin/worker --hpx:hpx=<hostN> --hpx:agas=<host0> --hpx:connect --hpx:run-hpx-main
```
45 changes: 45 additions & 0 deletions examples/pipeline/collector.cpp
@@ -0,0 +1,45 @@
// Copyright (c) 2018 Thomas Heller
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <iostream>

HPX_REGISTER_CHANNEL(int)

// Final pipeline stage: asynchronously receives one value from c2, prints it,
// and then re-arms itself for the next value.
//
// c2 is captured by reference in the continuation, so the caller must keep the
// channel alive until the returned future has completed.
//
// Returns a future that becomes ready once the chain stops, i.e. after the
// first exception raised while handling a value.
hpx::future<void> f3(hpx::lcos::channel<int>& c2)
{
    return c2.get().then(
        [&c2](hpx::future<int> received) -> hpx::future<void>
        {
            try
            {
                // received.get() rethrows any error stored in the future.
                // NOTE(review): presumably this is how a closed channel is
                // reported -- confirm against the HPX channel documentation.
                std::cout << "Final stage: " << received.get()
                          << ". Executed on locality "
                          << hpx::get_locality_id() << " "
                          << hpx::get_locality_name() << '\n';

                // Continue with the next element of the stream.
                return f3(c2);
            }
            catch (...)
            {
                // Any exception ends the chain.
                std::cout << "Pipeline done.\n";
                return hpx::make_ready_future();
            }
        });
}

int main()
{
std::cout << "Starting collector\n";
hpx::lcos::channel<int> c2(hpx::find_here());
c2.register_as("pipeline/collector");

f3(c2).get();

return 0;
}
35 changes: 35 additions & 0 deletions examples/pipeline/emitter.cpp
@@ -0,0 +1,35 @@
// Copyright (c) 2018 Thomas Heller
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <iostream>

HPX_REGISTER_CHANNEL(int)

// First pipeline stage: every five seconds prints the next sequence number
// and sends it into c1.
//
// The loop has no exit condition, so this function does not return as written.
void f1(hpx::lcos::channel<int>& c1)
{
    // Count in int, the element type of the channel, so the value handed to
    // set() needs no implicit narrowing conversion from std::size_t. At one
    // value per five seconds the int range is not a practical limit.
    for (int i = 0; true; ++i)
    {
        std::cout << "First Stage: " << i
                  << ". Executed on locality " << hpx::get_locality_id()
                  << " " << hpx::get_locality_name()
                  << '\n';
        c1.set(i);
        hpx::this_thread::sleep_for(std::chrono::seconds(5));
    }

    // Currently unreachable because the loop above never terminates; kept so
    // the channel is closed should an exit condition be added to the loop.
    c1.close();
}

int main()
{
hpx::lcos::channel<int> c1(hpx::find_here());
c1.register_as("pipeline/emitter");

f1(c1);

return 0;
}
51 changes: 51 additions & 0 deletions examples/pipeline/worker.cpp
@@ -0,0 +1,51 @@
// Copyright (c) 2018 Thomas Heller
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <iostream>

HPX_REGISTER_CHANNEL(int)

// Middle pipeline stage: asynchronously receives one value from c1, prints it,
// forwards value + 1 to c2, and then re-arms itself for the next value.
//
// Both channels are captured by reference in the continuation, so the caller
// must keep them alive until the returned future has completed.
//
// Returns a future that becomes ready once the chain stops, i.e. after the
// first exception raised while handling a value; c2 is closed at that point.
hpx::future<void> f2(hpx::lcos::channel<int>& c1, hpx::lcos::channel<int>& c2)
{
    return c1.get().then(
        [&c1, &c2](hpx::future<int> incoming) -> hpx::future<void>
        {
            try
            {
                // incoming.get() rethrows any error stored in the future.
                // NOTE(review): presumably this is how a closed channel is
                // reported -- confirm against the HPX channel documentation.
                int const value = incoming.get();
                std::cout << "Second stage: " << value
                          << ". Executed on locality "
                          << hpx::get_locality_id() << " "
                          << hpx::get_locality_name() << '\n';

                c2.set(value + 1);
                hpx::this_thread::sleep_for(std::chrono::microseconds(10));

                // Continue with the next element of the stream.
                return f2(c1, c2);
            }
            catch (...)
            {
                // Any exception ends the chain; close the downstream channel.
                c2.close();
                return hpx::make_ready_future();
            }
        });
}

int main()
{
std::cout << "Starting worker\n";
hpx::lcos::channel<int> c1;
hpx::lcos::channel<int> c2;

c1.connect_to("pipeline/emitter");
c2.connect_to("pipeline/collector");

f2(c1, c2).get();

return 0;
}

0 comments on commit 86e53e6

Please sign in to comment.