Skip to content

Commit

Permalink
Fixing component migration
Browse files Browse the repository at this point in the history
 - Simplified migration handling in primary namespace
 - Fixing various race conditions
  • Loading branch information
Thomas Heller committed Feb 12, 2018
1 parent 922deeb commit c56b0fd
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 124 deletions.
37 changes: 14 additions & 23 deletions hpx/components/component_storage/server/migrate_from_storage.hpp
Expand Up @@ -146,29 +146,20 @@ namespace hpx { namespace components { namespace server
return make_ready_future(naming::invalid_id);
}

return agas::begin_migration(to_resurrect)
.then(
[to_resurrect, target_locality](
future<std::pair<naming::id_type, naming::address> > && f)
-> future<naming::id_type>
{
// rethrow errors
std::pair<naming::id_type, naming::address> r = f.get();

// retrieve the data from the given storage
typedef typename server::component_storage::migrate_from_here_action
action_type;
return async<action_type>(r.first, to_resurrect.get_gid())
.then(util::bind_back(
&detail::migrate_from_storage_here<Component>,
to_resurrect, r.second, target_locality));
})
.then(
[to_resurrect](future<naming::id_type> && f) -> naming::id_type
{
agas::end_migration(to_resurrect).get();
return f.get();
});
auto r = agas::begin_migration(to_resurrect);

// retrieve the data from the given storage
typedef typename server::component_storage::migrate_from_here_action
action_type;
return async<action_type>(r.first, to_resurrect.get_gid())
.then(util::bind_back(
&detail::migrate_from_storage_here<Component>,
to_resurrect, r.second, target_locality)).then(
[to_resurrect](future<naming::id_type> && f) -> naming::id_type
{
agas::end_migration(to_resurrect);
return f.get();
});
}

template <typename Component>
Expand Down
25 changes: 8 additions & 17 deletions hpx/components/component_storage/server/migrate_to_storage.hpp
Expand Up @@ -180,25 +180,16 @@ namespace hpx { namespace components { namespace server
return make_ready_future(naming::invalid_id);
}

return agas::begin_migration(to_migrate)
.then(
[to_migrate, target_storage](
future<std::pair<naming::id_type, naming::address> > && f)
-> future<naming::id_type>
{
// rethrow errors
std::pair<naming::id_type, naming::address> r = f.get();

// perform actual object migration
typedef server::migrate_to_storage_here_action<Component>
action_type;
return async<action_type>(r.first, to_migrate, r.second,
target_storage);
})
.then(
auto r= agas::begin_migration(to_migrate);

// perform actual object migration
typedef server::migrate_to_storage_here_action<Component>
action_type;
return async<action_type>(r.first, to_migrate, r.second,
target_storage).then(
[to_migrate](future<naming::id_type> && f) -> naming::id_type
{
agas::end_migration(to_migrate).get();
agas::end_migration(to_migrate);
return f.get();
});
}
Expand Down
6 changes: 3 additions & 3 deletions hpx/runtime/agas/addressing_service.hpp
Expand Up @@ -1377,9 +1377,9 @@ struct HPX_EXPORT addressing_service
/// start/stop migration of an object
///
/// \returns Current locality and address of the object to migrate
hpx::future<std::pair<naming::id_type, naming::address> >
begin_migration_async(naming::id_type const& id);
hpx::future<bool> end_migration_async(naming::id_type const& id);
std::pair<naming::id_type, naming::address>
begin_migration(naming::id_type const& id);
bool end_migration(naming::id_type const& id);

/// Maintain list of migrated objects
std::pair<bool, components::pinned_ptr>
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/agas/interface.hpp
Expand Up @@ -481,9 +481,9 @@ HPX_API_EXPORT hpx::future<hpx::id_type> on_symbol_namespace_event(
std::string const& name, bool call_for_past_events);

///////////////////////////////////////////////////////////////////////////////
HPX_API_EXPORT hpx::future<std::pair<naming::id_type, naming::address> >
HPX_API_EXPORT std::pair<naming::id_type, naming::address>
begin_migration(naming::id_type const& id);
HPX_API_EXPORT hpx::future<bool> end_migration(naming::id_type const& id);
HPX_API_EXPORT bool end_migration(naming::id_type const& id);

HPX_API_EXPORT hpx::future<void>
mark_as_migrated(naming::gid_type const& gid,
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/agas/primary_namespace.hpp
Expand Up @@ -55,9 +55,9 @@ struct HPX_EXPORT primary_namespace
naming::address addr() const;
naming::id_type gid() const;

future<std::pair<naming::id_type, naming::address>>
std::pair<naming::id_type, naming::address>
begin_migration(naming::gid_type id);
future<bool> end_migration(naming::gid_type id);
bool end_migration(naming::gid_type id);

bool bind_gid(gva g, naming::gid_type id, naming::gid_type locality);
future<bool> bind_gid_async(gva g, naming::gid_type id, naming::gid_type locality);
Expand Down
10 changes: 1 addition & 9 deletions hpx/runtime/agas/server/primary_namespace.hpp
Expand Up @@ -140,7 +140,7 @@ struct HPX_EXPORT primary_namespace
refcnt_table_type refcnts_;
typedef std::map<
naming::gid_type,
hpx::util::tuple<bool, std::size_t, lcos::local::condition_variable_any>
hpx::util::tuple<bool, std::size_t, lcos::local::detail::condition_variable>
> migration_table_type;

std::string instance_name_;
Expand Down Expand Up @@ -366,7 +366,6 @@ struct HPX_EXPORT primary_namespace
public:
HPX_DEFINE_COMPONENT_ACTION(primary_namespace, allocate);
HPX_DEFINE_COMPONENT_ACTION(primary_namespace, bind_gid);
HPX_DEFINE_COMPONENT_ACTION(primary_namespace, begin_migration);
HPX_DEFINE_COMPONENT_ACTION(primary_namespace, colocate);
HPX_DEFINE_COMPONENT_ACTION(primary_namespace, end_migration);
HPX_DEFINE_COMPONENT_ACTION(primary_namespace, decrement_credit);
Expand Down Expand Up @@ -403,13 +402,6 @@ HPX_REGISTER_ACTION_DECLARATION(
hpx::agas::server::primary_namespace::bind_gid_action,
primary_namespace_bind_gid_action)

HPX_ACTION_USES_MEDIUM_STACK(
hpx::agas::server::primary_namespace::begin_migration_action)

HPX_REGISTER_ACTION_DECLARATION(
hpx::agas::server::primary_namespace::begin_migration_action,
primary_namespace_begin_migration_action)

HPX_ACTION_USES_MEDIUM_STACK(
hpx::agas::server::primary_namespace::end_migration_action)

Expand Down
24 changes: 8 additions & 16 deletions hpx/runtime/components/server/migrate_component.hpp
Expand Up @@ -227,25 +227,17 @@ namespace hpx { namespace components { namespace server
"responsible for managing the address of the given object"));
}

return agas::begin_migration(to_migrate)
.then(
[=](future<std::pair<id_type, naming::address> > && f)
-> future<id_type>
{
// rethrow errors
std::pair<id_type, naming::address> r = f.get();
auto r = agas::begin_migration(to_migrate);

// perform actual object migration
typedef migrate_component_action<
Component, DistPolicy
> action_type;
return async<action_type>(r.first, to_migrate, r.second,
policy);
})
.then(
// perform actual object migration
typedef migrate_component_action<
Component, DistPolicy
> action_type;
return async<action_type>(r.first, to_migrate, r.second,
policy).then(
[to_migrate](future<id_type> && f) -> id_type
{
agas::end_migration(to_migrate).get();
agas::end_migration(to_migrate);
return f.get();
});
}
Expand Down
13 changes: 9 additions & 4 deletions hpx/runtime/components/server/migration_support.hpp
Expand Up @@ -71,21 +71,24 @@ namespace hpx { namespace components
if (pin_count_ != ~0x0u)
++pin_count_;
}
void unpin()
bool unpin()
{
bool was_migrated = false;
// make sure to always grab the AGAS lock first
agas::mark_as_migrated(this->gid_,
[this]() mutable -> std::pair<bool, hpx::future<void> >
[this, &was_migrated]() mutable -> std::pair<bool, hpx::future<void> >
{
std::unique_lock<mutex_type> l(mtx_);
was_migrated = this->pin_count_ == ~0x0u;
HPX_ASSERT(this->pin_count_ != 0);
if (this->pin_count_ != ~0x0u)
{
if (--this->pin_count_ == 0)
{
// trigger pending migration if this was the last
// unpin and a migration operation is pending
if (trigger_migration_.valid() && was_marked_for_migration_)
HPX_ASSERT(trigger_migration_.valid());
if (was_marked_for_migration_)
{
was_marked_for_migration_ = false;

Expand All @@ -100,7 +103,9 @@ namespace hpx { namespace components
}
}
return std::make_pair(false, make_ready_future());
});
}).get();

return was_migrated;
}

std::uint32_t pin_count() const
Expand Down
4 changes: 1 addition & 3 deletions hpx/runtime/get_ptr.hpp
Expand Up @@ -54,10 +54,8 @@ namespace hpx
template <typename Component>
void operator()(Component* p)
{
bool was_migrated = p->pin_count() == ~0x0u;
p->unpin();
bool was_migrated = p->unpin();

HPX_ASSERT(was_migrated);
if (was_migrated)
{
components::component_type type =
Expand Down
20 changes: 9 additions & 11 deletions src/runtime/agas/addressing_service.cpp
Expand Up @@ -2668,34 +2668,32 @@ void addressing_service::unmark_as_migrated(
}
}

hpx::future<std::pair<naming::id_type, naming::address> >
addressing_service::begin_migration_async(naming::id_type const& id)
std::pair<naming::id_type, naming::address>
addressing_service::begin_migration(naming::id_type const& id)
{
typedef std::pair<naming::id_type, naming::address> result_type;

if (!id)
{
return hpx::make_exceptional_future<result_type>(
HPX_GET_EXCEPTION(bad_parameter,
"addressing_service::begin_migration_async",
"invalid reference id"));
HPX_THROW_EXCEPTION(bad_parameter,
"addressing_service::begin_migration_async",
"invalid reference id");
}

naming::gid_type gid(naming::detail::get_stripped_gid(id.get_gid()));

return primary_ns_.begin_migration(gid);
}

hpx::future<bool> addressing_service::end_migration_async(
bool addressing_service::end_migration(
naming::id_type const& id
)
{
if (!id)
{
return hpx::make_exceptional_future<bool>(
HPX_GET_EXCEPTION(bad_parameter,
"addressing_service::end_migration_async",
"invalid reference id"));
HPX_THROW_EXCEPTION(bad_parameter,
"addressing_service::end_migration_async",
"invalid reference id");
}

naming::gid_type gid(naming::detail::get_stripped_gid(id.get_gid()));
Expand Down
8 changes: 4 additions & 4 deletions src/runtime/agas/interface.cpp
Expand Up @@ -483,17 +483,17 @@ hpx::future<hpx::id_type> on_symbol_namespace_event(
}

///////////////////////////////////////////////////////////////////////////////
hpx::future<std::pair<naming::id_type, naming::address> >
std::pair<naming::id_type, naming::address>
begin_migration(naming::id_type const& id)
{
naming::resolver_client& resolver = naming::get_agas_client();
return resolver.begin_migration_async(id);
return resolver.begin_migration(id);
}

hpx::future<bool> end_migration(naming::id_type const& id)
bool end_migration(naming::id_type const& id)
{
naming::resolver_client& resolver = naming::get_agas_client();
return resolver.end_migration_async(id);
return resolver.end_migration(id);
}

hpx::future<void> mark_as_migrated(naming::gid_type const& gid,
Expand Down
35 changes: 12 additions & 23 deletions src/runtime/agas/primary_namespace.cpp
Expand Up @@ -43,11 +43,6 @@ HPX_REGISTER_ACTION_ID(
primary_namespace_bind_gid_action,
hpx::actions::primary_namespace_bind_gid_action_id)

HPX_REGISTER_ACTION_ID(
primary_namespace::begin_migration_action,
primary_namespace_begin_migration_action,
hpx::actions::primary_namespace_begin_migration_action_id)

HPX_REGISTER_ACTION_ID(
primary_namespace::end_migration_action,
primary_namespace_end_migration_action,
Expand Down Expand Up @@ -169,28 +164,22 @@ namespace hpx { namespace agas {
naming::id_type::unmanaged);
}

future<std::pair<naming::id_type, naming::address>>
std::pair<naming::id_type, naming::address>
primary_namespace::begin_migration(naming::gid_type id)
{
naming::id_type dest = naming::id_type(get_service_instance(id),
naming::id_type::unmanaged);
if (naming::get_locality_from_gid(dest.get_gid()) == hpx::get_locality())
{
return hpx::make_ready_future(server_->begin_migration(id));
}
server::primary_namespace::begin_migration_action action;
return hpx::async(action, std::move(dest), id);
HPX_ASSERT(
naming::get_locality_from_gid(get_service_instance(id)) ==
hpx::get_locality());

return server_->begin_migration(id);
}
future<bool> primary_namespace::end_migration(naming::gid_type id)
bool primary_namespace::end_migration(naming::gid_type id)
{
naming::id_type dest = naming::id_type(get_service_instance(id),
naming::id_type::unmanaged);
if (naming::get_locality_from_gid(dest.get_gid()) == hpx::get_locality())
{
return hpx::make_ready_future(server_->end_migration(id));
}
server::primary_namespace::end_migration_action action;
return hpx::async(action, std::move(dest), id);
HPX_ASSERT(
naming::get_locality_from_gid(get_service_instance(id)) ==
hpx::get_locality());

return server_->end_migration(id);
}

bool primary_namespace::bind_gid(
Expand Down
16 changes: 9 additions & 7 deletions src/runtime/agas/server/primary_namespace_server.cpp
Expand Up @@ -229,6 +229,7 @@ primary_namespace::begin_migration(naming::gid_type id)

std::unique_lock<mutex_type> l(mutex_);

wait_for_migration_locked(l, id, hpx::throws);
resolved_type r = resolve_gid_locked(l, id, hpx::throws);
if (get<0>(r) == naming::invalid_gid)
{
Expand All @@ -250,6 +251,10 @@ primary_namespace::begin_migration(naming::gid_type id)
HPX_ASSERT(p.second);
it = p.first;
}
else
{
HPX_ASSERT(hpx::util::get<0>(it->second) == false);
}

// flag this id as being migrated
hpx::util::get<0>(it->second) = true; //-V601
Expand All @@ -276,15 +281,11 @@ bool primary_namespace::end_migration(naming::gid_type id)
if (it == migrating_objects_.end() || !get<0>(it->second))
return false;

// ignore before notifying everyone about the ended migration.
{
hpx::util::ignore_while_checking<std::unique_lock<mutex_type>> il(&l);
get<2>(it->second).notify_all(hpx::throws);
}

// flag this id as not being migrated anymore
get<0>(it->second) = false;

get<2>(it->second).notify_all(std::move(l), hpx::throws);

return true;
}

Expand All @@ -305,7 +306,8 @@ void primary_namespace::wait_for_migration_locked(

get<2>(it->second).wait(l, ec);

if (--get<1>(it->second) == 0 && !get<0>(it->second))
HPX_ASSERT(hpx::util::get<0>(it->second) == false);
if (--get<1>(it->second) == 0)
migrating_objects_.erase(it);
}
}
Expand Down

0 comments on commit c56b0fd

Please sign in to comment.