Skip to content

Commit

Permalink
Fixing partitioned_vector creation
Browse files Browse the repository at this point in the history
Fixing the size of partitions to avoid race conditions between
possible reallocations during push back and the continuation
to set the local partition data.
  • Loading branch information
Thomas Heller committed Sep 12, 2017
1 parent 61eefd2 commit a77d93f
Showing 1 changed file with 24 additions and 8 deletions.
Expand Up @@ -186,6 +186,10 @@ namespace hpx
server::partitioned_vector_config_data::partition_data
base_type;

partition_data()
: base_type()
{}

partition_data(id_type const& part, std::size_t size,
std::uint32_t locality_id)
: base_type(part, size, locality_id)
Expand Down Expand Up @@ -291,7 +295,7 @@ namespace hpx
);
}
}
wait_all(ptrs);
when_all(ptrs).get();

partition_size_ = get_partition_size();
this->base_type::reset(std::move(id));
Expand Down Expand Up @@ -558,14 +562,19 @@ namespace hpx
std::size_t allocated_size = 0;

std::size_t l = 0;

// Fixing the size of partitions to avoid race conditions between
// possible reallocations during push back and the continuation
// to set the local partition data
partitions_.resize(num_parts);
for (bulk_locality_result const& r: f.get())
{
using naming::get_locality_id_from_id;
std::uint32_t locality = get_locality_id_from_id(r.first);
for (hpx::id_type const& id: r.second)
{
std::size_t size = (std::min)(part_size, size_-allocated_size);
partitions_.push_back(partition_data(id, size, locality));
partitions_[l] = partition_data(id, size, locality);

if (locality == this_locality)
{
Expand All @@ -579,6 +588,7 @@ namespace hpx
)
);
}

++l;

allocated_size += size;
Expand All @@ -590,7 +600,7 @@ namespace hpx
if (size != part_size)
{
partitioned_vector_partition_client(
partitions_.back().partition_
partitions_[l - 1].partition_
).resize(size);
}
break;
Expand All @@ -599,10 +609,13 @@ namespace hpx
{
HPX_ASSERT(size == part_size);
}

HPX_ASSERT(l < num_parts);
}
}
HPX_ASSERT(l == num_parts);

wait_all(ptrs);
when_all(ptrs).get();

// cache our partition size
partition_size_ = get_partition_size();
Expand Down Expand Up @@ -652,13 +665,16 @@ namespace hpx
std::vector<future<void> > ptrs;

partitions_vector_type partitions;
partitions.reserve(rhs.partitions_.size());
// Fixing the size of partitions to avoid race conditions between
// possible reallocations during push back and the continuation
// to set the local partition data
partitions.resize(rhs.partitions_.size());
for (std::size_t i = 0; i != rhs.partitions_.size(); ++i)
{
std::uint32_t locality = rhs.partitions_[i].locality_id_;

partitions.push_back(partition_data(objs[i].get(),
rhs.partitions_[i].size_, locality));
partitions[i] = partition_data(objs[i].get(),
rhs.partitions_[i].size_, locality);

if (locality == this_locality)
{
Expand All @@ -670,7 +686,7 @@ namespace hpx
}
}

wait_all(ptrs);
when_all(ptrs).get();

size_ = rhs.size_;
partition_size_ = rhs.partition_size_;
Expand Down

0 comments on commit a77d93f

Please sign in to comment.