Skip to content

Commit

Permalink
Merge pull request #3087 from STEllAR-GROUP/fixing_3054
Browse files Browse the repository at this point in the history
Fixing assertion in default_distribution_policy
  • Loading branch information
hkaiser committed Jan 1, 2018
2 parents 5555a96 + 3c49dd9 commit 8383d3f
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 47 deletions.
41 changes: 30 additions & 11 deletions hpx/components/containers/container_distribution_policy.hpp
@@ -1,5 +1,5 @@
// Copyright (c) 2014 Bibek Ghimire
// Copyright (c) 2014-2015 Hartmut Kaiser
// Copyright (c) 2014-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand All @@ -12,10 +12,13 @@
#include <hpx/traits/is_distribution_policy.hpp>

#include <hpx/runtime/serialization/serialize.hpp>
#include <hpx/runtime/serialization/shared_ptr.hpp>
#include <hpx/runtime/serialization/vector.hpp>
#include <hpx/util/assert.hpp>

#include <algorithm>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
Expand All @@ -33,12 +36,15 @@ namespace hpx
: num_partitions_(std::size_t(-1))
{}

container_distribution_policy operator()(std::size_t num_partitions) const
container_distribution_policy operator()(
std::size_t num_partitions) const
{
return container_distribution_policy(num_partitions, localities_);
return container_distribution_policy(
num_partitions, get_localities());
}

container_distribution_policy operator()(hpx::id_type const& locality) const
container_distribution_policy operator()(
hpx::id_type const& locality) const
{
return container_distribution_policy(locality);
}
Expand All @@ -47,7 +53,10 @@ namespace hpx
std::vector<id_type> const& localities) const
{
if (num_partitions_ != std::size_t(-1))
return container_distribution_policy(num_partitions_, localities);
{
return container_distribution_policy(
num_partitions_, localities);
}
return container_distribution_policy(localities.size(), localities);
}

Expand Down Expand Up @@ -79,14 +88,25 @@ namespace hpx
///////////////////////////////////////////////////////////////////////
std::size_t get_num_partitions() const
{
std::size_t num_parts = (num_partitions_ == std::size_t(-1)) ?
localities_.size() : num_partitions_;
return (std::max)(num_parts, std::size_t(1));
if (localities_)
{
std::size_t num_parts = (num_partitions_ == std::size_t(-1)) ?
localities_->size() : num_partitions_;
return (std::max)(num_parts, std::size_t(1));
}
return std::size_t(1);
}

std::vector<hpx::id_type> const& get_localities() const
std::vector<hpx::id_type> get_localities() const
{
return localities_;
if (!localities_)
{
// use this locality, if this object was default constructed
return std::vector<id_type>(1, hpx::find_here());
}

HPX_ASSERT(!localities_->empty());
return *localities_;
}

private:
Expand All @@ -104,7 +124,6 @@ namespace hpx
num_partitions_(num_partitions)
{}


container_distribution_policy(std::size_t num_partitions,
std::vector<id_type> && localities)
: components::default_distribution_policy(std::move(localities)),
Expand Down
90 changes: 55 additions & 35 deletions hpx/runtime/components/default_distribution_policy.hpp
Expand Up @@ -20,13 +20,15 @@
#include <hpx/runtime/naming/name.hpp>
#include <hpx/runtime/serialization/serialization_fwd.hpp>
#include <hpx/runtime/serialization/vector.hpp>
#include <hpx/runtime/serialization/shared_ptr.hpp>
#include <hpx/traits/extract_action.hpp>
#include <hpx/traits/is_distribution_policy.hpp>
#include <hpx/traits/promise_local_result.hpp>
#include <hpx/util/assert.hpp>

#include <algorithm>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
Expand All @@ -53,8 +55,7 @@ namespace hpx { namespace components
public:
/// Default-construct a new instance of a \a default_distribution_policy.
/// This policy will represent one locality (the local locality).
default_distribution_policy()
{}
default_distribution_policy() = default;

/// Create a new \a default_distribution policy representing the given
/// set of localities.
Expand All @@ -73,7 +74,7 @@ namespace hpx { namespace components
/// \param locs [in] The list of localities the new instance should
/// represent
default_distribution_policy operator()(
std::vector<id_type> && locs) const
std::vector<id_type>&& locs) const
{
return default_distribution_policy(std::move(locs));
}
Expand Down Expand Up @@ -105,12 +106,15 @@ namespace hpx { namespace components
{
using components::stub_base;

for (hpx::id_type const& loc: localities_)
if (localities_)
{
if (get_num_items(1, loc) != 0)
for (hpx::id_type const& loc: *localities_)
{
return stub_base<Component>::create_async(
loc, std::forward<Ts>(vs)...);
if (get_num_items(1, loc) != 0)
{
return stub_base<Component>::create_async(
loc, std::forward<Ts>(vs)...);
}
}
}

Expand Down Expand Up @@ -144,32 +148,32 @@ namespace hpx { namespace components
{
using components::stub_base;

if (localities_.size() > 1)
if (localities_ && localities_->size() > 1)
{
// schedule creation of all objects across given localities
std::vector<hpx::future<std::vector<hpx::id_type> > > objs;
objs.reserve(localities_.size());
for (hpx::id_type const& loc: localities_)
objs.reserve(localities_->size());
for (hpx::id_type const& loc: *localities_)
{
objs.push_back(stub_base<Component>::bulk_create_async(
loc, get_num_items(count, loc), vs...));
}

// consolidate all results
auto localities = localities_;
return hpx::dataflow(hpx::launch::sync,
[=](std::vector<hpx::future<std::vector<hpx::id_type> > > && v)
mutable -> std::vector<bulk_locality_result>
[localities](
std::vector<hpx::future<std::vector<hpx::id_type> > > && v
) mutable -> std::vector<bulk_locality_result>
{
HPX_ASSERT(localities_.size() == v.size());
HPX_ASSERT(localities->size() == v.size());

std::vector<bulk_locality_result> result;
result.reserve(v.size());

for (std::size_t i = 0; i != v.size(); ++i)
{
result.emplace_back(
std::move(localities_[i]), v[i].get()
);
result.emplace_back((*localities)[i], v[i].get());
}
return result;
},
Expand Down Expand Up @@ -271,44 +275,46 @@ namespace hpx { namespace components
///
std::size_t get_num_localities() const
{
return (std::max)(std::size_t(1), localities_.size());
return !localities_ ? std::size_t(1) : localities_->size();
}

/// Returns the locality which is anticipated to be used for the next
/// async operation
hpx::id_type get_next_target() const
{
return localities_.empty() ? hpx::find_here() : localities_.front();
return !localities_ ? hpx::find_here() : localities_->front();
}

protected:
/// \cond NOINTERNAL
std::size_t
get_num_items(std::size_t items, hpx::id_type const& loc) const
std::size_t get_num_items(
std::size_t items, hpx::id_type const& loc) const
{
// make sure the given id is known to this distribution policy
HPX_ASSERT(
std::find(localities_.begin(), localities_.end(), loc) !=
localities_.end() ||
(localities_.empty() && loc == hpx::find_here())
localities_ &&
std::find(localities_->begin(), localities_->end(), loc) !=
localities_->end()
);

// this distribution policy places an equal number of items onto
// each locality
std::size_t locs = (std::max)(std::size_t(1), localities_.size());
std::size_t locs = localities_->size();

// the overall number of items to create is smaller than the number
// of localities
if (items < locs)
{
auto it = std::find(localities_.begin(), localities_.end(), loc);
std::size_t num_loc = std::distance(localities_.begin(), it);
auto it = std::find(localities_->begin(), localities_->end(), loc);
std::size_t num_loc = std::distance(localities_->begin(), it);
return (items < num_loc) ? 1 : 0;
}

// the last locality might get less items
if (localities_.size() > 1 && loc == localities_.back())
if (locs > 1 && loc == localities_->back())
{
return items - detail::round_to_multiple(items, locs, locs-1);
}

// otherwise just distribute evenly
return (items + locs - 1) / locs;
Expand All @@ -318,18 +324,31 @@ namespace hpx { namespace components
protected:
/// \cond NOINTERNAL
default_distribution_policy(std::vector<id_type> const& localities)
: localities_(localities)
{}
: localities_(std::make_shared<std::vector<id_type>>(localities))
{
if (localities_->empty())
{
HPX_THROW_EXCEPTION(invalid_status,
"default_distribution_policy::default_distribution_policy",
"unexpectedly empty list of localities");
}
}

default_distribution_policy(std::vector<id_type> && localities)
: localities_(std::move(localities))
{}

default_distribution_policy(id_type const& locality)
: localities_(std::make_shared<std::vector<id_type>>(std::move(localities)))
{
localities_.push_back(locality);
if (localities_->empty())
{
HPX_THROW_EXCEPTION(invalid_status,
"default_distribution_policy::default_distribution_policy",
"unexpectedly empty list of localities");
}
}

default_distribution_policy(id_type const& locality)
: localities_(std::make_shared<std::vector<id_type>>(1, locality))
{}

friend class hpx::serialization::access;

template <typename Archive>
Expand All @@ -338,7 +357,8 @@ namespace hpx { namespace components
ar & localities_;
}

std::vector<id_type> localities_; // localities to create things on
// localities to create things on
std::shared_ptr<std::vector<id_type>> localities_;
/// \endcond
};

Expand Down
4 changes: 3 additions & 1 deletion tests/regressions/components/CMakeLists.txt
@@ -1,9 +1,10 @@
# Copyright (c) 2007-2016 Hartmut Kaiser
# Copyright (c) 2007-2017 Hartmut Kaiser
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests
bulk_new_3054
client_base_registration
create_n_components_2323
create_remote_component_2334
Expand All @@ -13,6 +14,7 @@ set(tests
returned_client_2150
)

set(bulk_new_3054_PARAMETERS LOCALITIES 2)
set(create_remote_component_2334_PARAMETERS LOCALITIES 2)
set(new_2848_PARAMETERS LOCALITIES 2)

Expand Down
39 changes: 39 additions & 0 deletions tests/regressions/components/bulk_new_3054.cpp
@@ -0,0 +1,39 @@
// Copyright (c) 2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_main.hpp>
#include <hpx/include/components.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <vector>

///////////////////////////////////////////////////////////////////////////////
struct test_server : hpx::components::component_base<test_server>
{
test_server(int) {}
};

typedef hpx::components::component<test_server> server_type;
HPX_REGISTER_COMPONENT(server_type, test_server);

void test_bulk_new()
{
auto locs = hpx::find_all_localities();

std::vector<hpx::id_type> ids =
hpx::new_<test_server[]>(hpx::default_layout(locs), 10, 42).get();

hpx::future<std::vector<hpx::id_type>> ids_f =
hpx::new_<test_server[]>(hpx::default_layout(locs), 10, 42);
}

///////////////////////////////////////////////////////////////////////////////
int main()
{
test_bulk_new();

return 0;
}

0 comments on commit 8383d3f

Please sign in to comment.