Skip to content

Commit

Permalink
Merge pull request #2992 from STEllAR-GROUP/fixing_2991
Browse files Browse the repository at this point in the history
Adding non-blocking (on destruction) service executors
  • Loading branch information
hkaiser committed Nov 11, 2017
2 parents 5fcd512 + 3465bc8 commit a7e2523
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 21 deletions.
6 changes: 5 additions & 1 deletion hpx/runtime/threads/executors/service_executors.hpp
Expand Up @@ -68,6 +68,9 @@ namespace hpx { namespace threads { namespace executors
void add_no_count(closure_type&& f);
void thread_wrapper(closure_type&& f);

// detaches this object from the underlying thread pool object
void detach();

protected:
// Return the requested policy element
std::size_t get_policy_element(
Expand All @@ -79,6 +82,7 @@ namespace hpx { namespace threads { namespace executors
mutex_type mtx_;
hpx::util::atomic_count task_count_;
compat::condition_variable shutdown_cv_;
bool blocking_;
};
}

Expand Down Expand Up @@ -179,4 +183,4 @@ namespace hpx { namespace threads { namespace executors

#include <hpx/config/warnings_suffix.hpp>

#endif /*HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP*/
#endif /* HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP */
6 changes: 4 additions & 2 deletions hpx/runtime/threads/run_as_os_thread.hpp
@@ -1,4 +1,4 @@
// Copyright (c) 2016 Hartmut Kaiser
// Copyright (c) 2016-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand Down Expand Up @@ -27,8 +27,10 @@ namespace hpx { namespace threads
HPX_ASSERT(get_self_ptr() != nullptr);

parallel::execution::io_pool_executor scheduler;
return parallel::execution::async_execute(scheduler,
auto result = parallel::execution::async_execute(scheduler,
std::forward<F>(f), std::forward<Ts>(vs)...);
scheduler.detach();
return result;
}
}}

Expand Down
10 changes: 10 additions & 0 deletions hpx/runtime/threads/thread_executor.hpp
Expand Up @@ -196,6 +196,11 @@ namespace hpx { namespace threads
return create_id(reinterpret_cast<std::size_t>(this));
}

virtual void detach()
{
// by default, do nothing
}

protected:
static executor_id create_id(std::size_t id)
{
Expand Down Expand Up @@ -429,6 +434,11 @@ namespace hpx { namespace threads
stacksize, ec);
}

void detach()
{
executor_data_->detach();
}

/// Return a reference to the default executor for this process.
static scheduled_executor& default_executor();
};
Expand Down
50 changes: 32 additions & 18 deletions src/runtime/threads/executors/service_executor.cpp
Expand Up @@ -34,7 +34,8 @@ namespace hpx { namespace threads { namespace executors { namespace detail
service_executor::service_executor(
char const* pool_name, char const* pool_name_suffix)
: pool_(get_thread_pool(pool_name, pool_name_suffix)),
task_count_(0)
task_count_(0),
blocking_(true)
{
if (!pool_) {
HPX_THROW_EXCEPTION(bad_parameter,
Expand All @@ -46,18 +47,27 @@ namespace hpx { namespace threads { namespace executors { namespace detail
service_executor::~service_executor()
{
std::unique_lock<mutex_type> l(mtx_);
while (task_count_ > 0)
if (blocking_)
{
// We need to cancel the wait process here, since we might block
// other running HPX threads.
shutdown_cv_.wait_for(l, std::chrono::seconds(1));
if (hpx::threads::get_self_ptr())
while (task_count_ > 0)
{
hpx::this_thread::suspend();
// We need to cancel the wait process here, since we might block
// other running HPX threads.
shutdown_cv_.wait_for(l, std::chrono::seconds(1));
if (hpx::threads::get_self_ptr())
{
hpx::this_thread::suspend();
}
}
}
}

void service_executor::detach()
{
std::unique_lock<mutex_type> l(mtx_);
blocking_ = false;
}

void service_executor::thread_wrapper(closure_type&& f) //-V669
{
f(); // execute the actual thread function
Expand All @@ -79,15 +89,17 @@ namespace hpx { namespace threads { namespace executors { namespace detail
typedef void result_type;

thread_wrapper_helper(
service_executor* exec
, service_executor::closure_type&& f
) : exec_(exec)
service_executor* exec, service_executor::closure_type&& f)
: exec_(exec)
, f_(std::move(f))
{}
{
intrusive_ptr_add_ref(exec);
}

result_type invoke()
{
exec_->thread_wrapper(std::move(f_));
intrusive_ptr_release(exec_);
}

service_executor* exec_;
Expand Down Expand Up @@ -128,19 +140,21 @@ namespace hpx { namespace threads { namespace executors { namespace detail
{
typedef void result_type;

delayed_add_helper(
service_executor* exec
, service_executor::closure_type&& f
, boost::asio::io_service& io_service
, util::steady_clock::time_point const& abs_time
) : exec_(exec)
delayed_add_helper(service_executor* exec,
service_executor::closure_type&& f,
boost::asio::io_service& io_service,
util::steady_clock::time_point const& abs_time)
: exec_(exec)
, f_(std::move(f))
, timer_(io_service, abs_time)
{}
{
intrusive_ptr_add_ref(exec);
}

result_type invoke()
{
exec_->add_no_count(std::move(f_));
intrusive_ptr_release(exec_);
}

service_executor* exec_;
Expand Down
1 change: 1 addition & 0 deletions tests/regressions/threads/CMakeLists.txt
Expand Up @@ -7,6 +7,7 @@
set(tests
block_os_threads_1036
resume_priority
run_as_os_thread_lockup_2991
thread_data_1111
thread_pool_executor_1112
thread_rescheduling
Expand Down
43 changes: 43 additions & 0 deletions tests/regressions/threads/run_as_os_thread_lockup_2991.cpp
@@ -0,0 +1,43 @@
// Copyright (c) 2017 Maciej Brodowicz
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_init.hpp>
#include <hpx/hpx.hpp>
#include <hpx/runtime/threads/run_as_os_thread.hpp>

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void locker()
{
std::cout << std::this_thread::get_id() << ": about to lock mutex\n";
std::lock_guard<std::mutex> lock(mtx);
std::cout << std::this_thread::get_id() << ": mutex locked\n";
}

int hpx_main()
{
{
std::cout << std::this_thread::get_id() << ": about to lock mutex\n";
std::lock_guard<std::mutex> lock(mtx);
std::cout << std::this_thread::get_id() << ": mutex locked\n";

std::cout << std::this_thread::get_id() << ": about to run on io thread\n";
hpx::threads::run_as_os_thread(locker);
//sleep(2);
}
std::cout << std::this_thread::get_id() << ": exiting\n";

return hpx::finalize();
}

int main(int argc, char **argv)
{
return hpx::init(argc, argv);
}

0 comments on commit a7e2523

Please sign in to comment.