Skip to content

Commit

Permalink
Force-delete remaining channel items on close
Browse files Browse the repository at this point in the history
- this fixes #2890
- flyby: refactored HPX_GET_EXCEPTION macro to support lightweight exception creation
  • Loading branch information
hkaiser committed Sep 15, 2017
1 parent 2945dcd commit dfcb9c4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
8 changes: 5 additions & 3 deletions hpx/lcos/local/channel.hpp
Expand Up @@ -207,14 +207,16 @@ namespace hpx { namespace lcos { namespace local

{
util::scoped_unlock<std::unique_lock<mutex_type> > ul(l);
e = HPX_GET_EXCEPTION(hpx::future_cancelled,
e = HPX_GET_EXCEPTION(
hpx::future_cancelled, hpx::lightweight,
"hpx::lcos::local::close",
"canceled waiting on this entry");
}

// all pending requests which can't be satisfied have to be
// canceled at this point
buffer_.cancel_waiting(e);
// canceled at this point, force deleting possibly waiting
// requests
buffer_.cancel_waiting(e, true);
}

private:
Expand Down
10 changes: 6 additions & 4 deletions hpx/lcos/local/receive_buffer.hpp
Expand Up @@ -198,15 +198,16 @@ namespace hpx { namespace lcos { namespace local
return buffer_map_.empty();
}

void cancel_waiting(std::exception_ptr const& e)
void cancel_waiting(std::exception_ptr const& e,
bool force_delete_entries = false)
{
std::lock_guard<mutex_type> l(mtx_);

iterator end = buffer_map_.end();
for (iterator it = buffer_map_.begin(); it != end; /**/)
{
iterator to_delete = it++;
if (to_delete->second->cancel(e))
if (to_delete->second->cancel(e) || force_delete_entries)
{
buffer_map_.erase(to_delete);
}
Expand Down Expand Up @@ -411,15 +412,16 @@ namespace hpx { namespace lcos { namespace local
return buffer_map_.empty();
}

void cancel_waiting(std::exception_ptr const& e)
void cancel_waiting(std::exception_ptr const& e,
bool force_delete_entries = false)
{
std::lock_guard<mutex_type> l(mtx_);

iterator end = buffer_map_.end();
for (iterator it = buffer_map_.begin(); it != end; /**/)
{
iterator to_delete = it++;
if (to_delete->second->cancel(e))
if (to_delete->second->cancel(e) || force_delete_entries)
{
buffer_map_.erase(to_delete);
}
Expand Down
19 changes: 17 additions & 2 deletions hpx/throw_exception.hpp
Expand Up @@ -12,6 +12,9 @@
#include <hpx/config.hpp>
#include <hpx/error.hpp>
#include <hpx/exception_fwd.hpp>
#include <hpx/util/detail/pp/cat.hpp>
#include <hpx/util/detail/pp/expand.hpp>
#include <hpx/util/detail/pp/nargs.hpp>

#include <boost/current_function.hpp>
#include <boost/system/error_code.hpp>
Expand Down Expand Up @@ -100,8 +103,20 @@ namespace hpx
/**/

///////////////////////////////////////////////////////////////////////////////
#define HPX_GET_EXCEPTION(errcode, f, msg) \
hpx::detail::get_exception(errcode, msg, hpx::plain, f, \
#define HPX_GET_EXCEPTION(...) \
HPX_GET_EXCEPTION_(__VA_ARGS__) \
/**/

#define HPX_GET_EXCEPTION_(...) \
HPX_PP_EXPAND(HPX_PP_CAT( \
HPX_GET_EXCEPTION_, HPX_PP_NARGS(__VA_ARGS__) \
)(__VA_ARGS__)) \
/**/
#define HPX_GET_EXCEPTION_3(errcode, f, msg) \
HPX_GET_EXCEPTION_4(errcode, hpx::plain, f, msg) \
/**/
#define HPX_GET_EXCEPTION_4(errcode, mode, f, msg) \
hpx::detail::get_exception(errcode, msg, mode, f, \
__FILE__, __LINE__) \
/**/

Expand Down
1 change: 1 addition & 0 deletions tests/regressions/lcos/CMakeLists.txt
Expand Up @@ -14,6 +14,7 @@ set(tests
broadcast_unwrap_future_2885
broadcast_wait_for_2822
call_promise_get_gid_more_than_once
channel_not_empty_2890
channel_register_as_2722
dataflow_791
dataflow_action_2008
Expand Down
29 changes: 29 additions & 0 deletions tests/regressions/lcos/channel_not_empty_2890.cpp
@@ -0,0 +1,29 @@
// Copyright (c) 2017 Element-126
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_main.hpp>
#include <hpx/hpx.hpp>
#include <hpx/lcos/local/channel.hpp>
#include <hpx/lcos/channel.hpp>
#include <hpx/util/lightweight_test.hpp>

HPX_REGISTER_CHANNEL(int);

int main()
{
{
hpx::lcos::local::channel<int> ch;
ch.set(0);
ch.close();
}

{
hpx::lcos::channel<int> ch (hpx::find_here());
ch.set(0);
ch.close();
}

return hpx::util::report_errors();
}

0 comments on commit dfcb9c4

Please sign in to comment.