Skip to content

Commit

Permalink
Adding force_delete_entries option to channel::close
Browse files: browse the repository at this point in the history
  • Loading branch information
hkaiser committed Sep 16, 2017
1 parent d74b49a commit f7aed1d
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
34 changes: 18 additions & 16 deletions hpx/lcos/channel.hpp
Expand Up @@ -285,24 +285,25 @@ namespace hpx { namespace lcos
}

///////////////////////////////////////////////////////////////////////
void close(launch::apply_policy)
void close(launch::apply_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
hpx::apply(action_type(), this->get_id());
hpx::apply(action_type(), this->get_id(), force_delete_entries);
}
hpx::future<void> close(launch::async_policy)
hpx::future<void> close(
launch::async_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
return hpx::async(action_type(), this->get_id());
return hpx::async(action_type(), this->get_id(), force_delete_entries);
}
void close(launch::sync_policy)
void close(launch::sync_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
action_type()(this->get_id());
action_type()(this->get_id(), force_delete_entries);
}
void close()
void close(bool force_delete_entries = false)
{
close(launch::sync);
close(launch::sync, force_delete_entries);
}

///////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -522,24 +523,25 @@ namespace hpx { namespace lcos
}

///////////////////////////////////////////////////////////////////////
void close(launch::apply_policy)
void close(launch::apply_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
hpx::apply(action_type(), this->get_id());
hpx::apply(action_type(), this->get_id(), force_delete_entries);
}
hpx::future<void> close(launch::async_policy)
hpx::future<void> close(
launch::async_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
return hpx::async(action_type(), this->get_id());
return hpx::async(action_type(), this->get_id(), force_delete_entries);
}
void close(launch::sync_policy)
void close(launch::sync_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
action_type()(this->get_id());
action_type()(this->get_id(), force_delete_entries);
}
void close()
void close(bool force_delete_entries = false)
{
close(launch::sync);
close(launch::sync, force_delete_entries);
}
};
}}
Expand Down
16 changes: 8 additions & 8 deletions hpx/lcos/local/channel.hpp
Expand Up @@ -51,7 +51,7 @@ namespace hpx { namespace lcos { namespace local
virtual bool try_get(std::size_t generation,
hpx::future<T>* f = nullptr) = 0;
virtual hpx::future<void> set(std::size_t generation, T && t) = 0;
virtual void close() = 0;
virtual void close(bool force_delete_entries = false) = 0;

virtual bool requires_delete()
{
Expand Down Expand Up @@ -186,7 +186,7 @@ namespace hpx { namespace lcos { namespace local
return hpx::make_ready_future();
}

void close()
void close(bool force_delete_entries = false)
{
std::unique_lock<mutex_type> l(mtx_);
if(closed_)
Expand Down Expand Up @@ -216,7 +216,7 @@ namespace hpx { namespace lcos { namespace local
// all pending requests which can't be satisfied have to be
// canceled at this point; possibly waiting requests are force
// deleted only if force_delete_entries is true
buffer_.cancel_waiting(e, true);
buffer_.cancel_waiting(e, force_delete_entries);
}

private:
Expand Down Expand Up @@ -454,7 +454,7 @@ namespace hpx { namespace lcos { namespace local
return buffer_.push(std::move(t), l);
}

void close()
void close(bool force_delete_entries = false)
{
std::unique_lock<mutex_type> l(mtx_);

Expand Down Expand Up @@ -693,9 +693,9 @@ namespace hpx { namespace lcos { namespace local
return channel_->set(generation, std::move(val));
}

void close()
void close(bool force_delete_entries = false)
{
channel_->close();
channel_->close(force_delete_entries);
}

///////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -958,9 +958,9 @@ namespace hpx { namespace lcos { namespace local
return channel_->set(generation, hpx::util::unused_type());
}

void close()
void close(bool force_delete_entries = false)
{
channel_->close();
channel_->close(force_delete_entries);
}

///////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions hpx/lcos/server/channel.hpp
Expand Up @@ -104,9 +104,9 @@ namespace hpx { namespace lcos { namespace server
}
HPX_DEFINE_COMPONENT_ACTION(channel, set_generation);

void close()
void close(bool force_delete_entries)
{
channel_.close();
channel_.close(force_delete_entries);
}
HPX_DEFINE_COMPONENT_ACTION(channel, close);

Expand Down
4 changes: 2 additions & 2 deletions tests/regressions/lcos/channel_not_empty_2890.cpp
Expand Up @@ -16,13 +16,13 @@ int main()
{
hpx::lcos::local::channel<int> ch;
ch.set(0);
ch.close();
ch.close(true);
}

{
hpx::lcos::channel<int> ch (hpx::find_here());
ch.set(0);
ch.close();
ch.close(true);
}

return hpx::util::report_errors();
Expand Down

0 comments on commit f7aed1d

Please sign in to comment.