Skip to content

Commit

Permalink
Async-related script cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
sfan5 committed Aug 28, 2021
1 parent 0f8a6d7 commit 6a1424f
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 156 deletions.
10 changes: 2 additions & 8 deletions builtin/async/init.lua
@@ -1,16 +1,10 @@

core.log("info", "Initializing Asynchronous environment")

function core.job_processor(serialized_func, serialized_param)
local func = loadstring(serialized_func)
function core.job_processor(func, serialized_param)
local param = core.deserialize(serialized_param)
local retval = nil

if type(func) == "function" then
retval = core.serialize(func(param))
else
core.log("error", "ASYNC WORKER: Unable to deserialize function")
end
local retval = core.serialize(func(param))

return retval or core.serialize(nil)
end
Expand Down
7 changes: 0 additions & 7 deletions src/gui/guiEngine.cpp
Expand Up @@ -614,10 +614,3 @@ void GUIEngine::stopSound(s32 handle)
{
m_sound_manager->stopSound(handle);
}

/******************************************************************************/
// Forwards an async job request to the script engine: both serialized strings
// are passed unchanged to m_script->queueAsync() and that call's return value
// is handed back to the caller.
unsigned int GUIEngine::queueAsync(const std::string &serialized_func,
const std::string &serialized_params)
{
return m_script->queueAsync(serialized_func, serialized_params);
}
4 changes: 0 additions & 4 deletions src/gui/guiEngine.h
Expand Up @@ -175,10 +175,6 @@ class GUIEngine {
return m_scriptdir;
}

/** pass async callback to scriptengine **/
unsigned int queueAsync(const std::string &serialized_fct,
const std::string &serialized_params);

private:

/** find and run the main menu script */
Expand Down
108 changes: 56 additions & 52 deletions src/script/cpp_api/s_async.cpp
Expand Up @@ -32,20 +32,19 @@ extern "C" {
#include "filesys.h"
#include "porting.h"
#include "common/c_internal.h"
#include "lua_api/l_base.h"

/******************************************************************************/
AsyncEngine::~AsyncEngine()
{

// Request all threads to stop
for (AsyncWorkerThread *workerThread : workerThreads) {
workerThread->stop();
}


// Wake up all threads
for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
it != workerThreads.end(); ++it) {
for (auto it : workerThreads) {
(void)it;
jobQueueCounter.post();
}

Expand All @@ -68,6 +67,7 @@ AsyncEngine::~AsyncEngine()
/******************************************************************************/
// Appends func to stateInitializers. Registration is only permitted before
// initialization: if initDone is already set, the FATAL_ERROR_IF check below
// triggers with the given message.
void AsyncEngine::registerStateInitializer(StateInitializer func)
{
FATAL_ERROR_IF(initDone, "Initializer may not be registered after init");
stateInitializers.push_back(func);
}

Expand All @@ -85,47 +85,47 @@ void AsyncEngine::initialize(unsigned int numEngines)
}

/******************************************************************************/
unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
const std::string &params)
u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &&params,
const std::string &mod_origin)
{
jobQueueMutex.lock();
LuaJobInfo toAdd;
toAdd.id = jobIdCounter++;
toAdd.serializedFunction = func;
toAdd.serializedParams = params;
u32 jobId = jobIdCounter++;

jobQueue.push_back(toAdd);
jobQueue.emplace_back();
auto &to_add = jobQueue.back();
to_add.id = jobId;
to_add.function = std::move(func);
to_add.params = std::move(params);
to_add.mod_origin = mod_origin;

jobQueueCounter.post();

jobQueueMutex.unlock();

return toAdd.id;
return jobId;
}

/******************************************************************************/
LuaJobInfo AsyncEngine::getJob()
bool AsyncEngine::getJob(LuaJobInfo *job)
{
jobQueueCounter.wait();
jobQueueMutex.lock();

LuaJobInfo retval;
bool retval = false;

if (!jobQueue.empty()) {
retval = jobQueue.front();
*job = std::move(jobQueue.front());
jobQueue.pop_front();
retval.valid = true;
retval = true;
}
jobQueueMutex.unlock();

return retval;
}

/******************************************************************************/
void AsyncEngine::putJobResult(const LuaJobInfo &result)
void AsyncEngine::putJobResult(LuaJobInfo &&result)
{
resultQueueMutex.lock();
resultQueue.push_back(result);
resultQueue.emplace_back(std::move(result));
resultQueueMutex.unlock();
}

Expand All @@ -134,26 +134,30 @@ void AsyncEngine::step(lua_State *L)
{
int error_handler = PUSH_ERROR_HANDLER(L);
lua_getglobal(L, "core");
resultQueueMutex.lock();

ScriptApiBase *script = ModApiBase::getScriptApiBase(L);

MutexAutoLock autolock(resultQueueMutex);
while (!resultQueue.empty()) {
LuaJobInfo jobDone = resultQueue.front();
LuaJobInfo j = std::move(resultQueue.front());
resultQueue.pop_front();

lua_getfield(L, -1, "async_event_handler");

if (lua_isnil(L, -1)) {
if (lua_isnil(L, -1))
FATAL_ERROR("Async event handler does not exist!");
}

luaL_checktype(L, -1, LUA_TFUNCTION);

lua_pushinteger(L, jobDone.id);
lua_pushlstring(L, jobDone.serializedResult.data(),
jobDone.serializedResult.size());
lua_pushinteger(L, j.id);
lua_pushlstring(L, j.result.data(), j.result.size());

PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
// Call handler
const char *origin = j.mod_origin.empty() ? nullptr : j.mod_origin.c_str();
script->setOriginDirect(origin);
int result = lua_pcall(L, 2, 0, error_handler);
if (result)
script_error(L, result, origin, "<async>");
}
resultQueueMutex.unlock();

lua_pop(L, 2); // Pop core and error handler
}

Expand All @@ -168,8 +172,8 @@ void AsyncEngine::prepareEnvironment(lua_State* L, int top)
/******************************************************************************/
AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
const std::string &name) :
Thread(name),
ScriptApiBase(ScriptingType::Async),
Thread(name),
jobDispatcher(jobDispatcher)
{
lua_State *L = getStack();
Expand All @@ -196,9 +200,9 @@ void* AsyncWorkerThread::run()
{
lua_State *L = getStack();

std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
try {
loadScript(script);
loadMod(getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua",
BUILTIN_MOD_NAME);
} catch (const ModError &e) {
errorstream << "Execution of async base environment failed: "
<< e.what() << std::endl;
Expand All @@ -213,44 +217,44 @@ void* AsyncWorkerThread::run()
}

// Main loop
LuaJobInfo j;
while (!stopRequested()) {
// Wait for job
LuaJobInfo toProcess = jobDispatcher->getJob();

if (!toProcess.valid || stopRequested()) {
if (!jobDispatcher->getJob(&j) || stopRequested())
continue;
}

lua_getfield(L, -1, "job_processor");
if (lua_isnil(L, -1)) {
if (lua_isnil(L, -1))
FATAL_ERROR("Unable to get async job processor!");
}

luaL_checktype(L, -1, LUA_TFUNCTION);

// Call it
lua_pushlstring(L,
toProcess.serializedFunction.data(),
toProcess.serializedFunction.size());
lua_pushlstring(L,
toProcess.serializedParams.data(),
toProcess.serializedParams.size());
if (luaL_loadbuffer(L, j.function.data(), j.function.size(), "=(async)")) {
errorstream << "ASYNC WORKER: Unable to deserialize function" << std::endl;
lua_pushnil(L);
}
lua_pushlstring(L, j.params.data(), j.params.size());

// Call it
setOriginDirect(j.mod_origin.empty() ? nullptr : j.mod_origin.c_str());
int result = lua_pcall(L, 2, 1, error_handler);
if (result) {
PCALL_RES(result);
toProcess.serializedResult = "";
try {
scriptError(result, "<async>");
} catch (const ModError &e) {
errorstream << e.what() << std::endl;
}
} else {
// Fetch result
size_t length;
const char *retval = lua_tolstring(L, -1, &length);
toProcess.serializedResult = std::string(retval, length);
j.result.assign(retval, length);
}

lua_pop(L, 1); // Pop retval

// Put job result
jobDispatcher->putJobResult(toProcess);
if (!j.result.empty())
jobDispatcher->putJobResult(std::move(j));
}

lua_pop(L, 2); // Pop core and error handler
Expand Down
39 changes: 21 additions & 18 deletions src/script/cpp_api/s_async.h
Expand Up @@ -21,7 +21,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,

#include <vector>
#include <deque>
#include <map>

#include "threading/semaphore.h"
#include "threading/thread.h"
Expand All @@ -39,26 +38,29 @@ struct LuaJobInfo
{
LuaJobInfo() = default;

// Function to be called in async environment
std::string serializedFunction = "";
// Parameter to be passed to function
std::string serializedParams = "";
// Result of function call
std::string serializedResult = "";
// Function to be called in async environment (from string.dump)
std::string function;
// Parameter to be passed to function (serialized)
std::string params;
// Result of function call (serialized)
std::string result;
// Name of the mod that invoked this call
std::string mod_origin;
// JobID used to identify a job and match it to callback
unsigned int id = 0;

bool valid = false;
u32 id;
};

// Asynchronous working environment
class AsyncWorkerThread : public Thread, public ScriptApiBase {
class AsyncWorkerThread : public Thread, virtual public ScriptApiBase {
friend class AsyncEngine;
public:
AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name);
virtual ~AsyncWorkerThread();

void *run();

protected:
AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name);

private:
AsyncEngine *jobDispatcher = nullptr;
};
Expand Down Expand Up @@ -89,7 +91,8 @@ class AsyncEngine {
* @param params Serialized parameters
* @return jobid The ID assigned to the queued job
*/
unsigned int queueAsyncJob(const std::string &func, const std::string &params);
u32 queueAsyncJob(std::string &&func, std::string &&params,
const std::string &mod_origin = "");

/**
* Engine step to process finished jobs
Expand All @@ -102,15 +105,16 @@ class AsyncEngine {
/**
* Get a Job from queue to be processed
* this function blocks until a job is ready
* @return a job to be processed
* @param job a job to be processed
* @return whether a job was available
*/
LuaJobInfo getJob();
bool getJob(LuaJobInfo *job);

/**
* Put a Job result back to result queue
* @param result result of completed job
*/
void putJobResult(const LuaJobInfo &result);
void putJobResult(LuaJobInfo &&result);

/**
* Initialize environment with currently registered functions
Expand All @@ -129,11 +133,10 @@ class AsyncEngine {
std::vector<StateInitializer> stateInitializers;

// Internal counter to create job IDs
unsigned int jobIdCounter = 0;
u32 jobIdCounter = 0;

// Mutex to protect job queue
std::mutex jobQueueMutex;

// Job queue
std::deque<LuaJobInfo> jobQueue;

Expand Down
4 changes: 0 additions & 4 deletions src/script/cpp_api/s_base.cpp
Expand Up @@ -331,13 +331,9 @@ void ScriptApiBase::setOriginDirect(const char *origin)

// Records the origin mod for the Lua value at stack position `index` in
// m_last_run_mod. Only has an effect when SCRIPTAPI_DEBUG is defined; otherwise
// the body is compiled out and the function does nothing.
void ScriptApiBase::setOriginFromTableRaw(int index, const char *fxn)
{
#ifdef SCRIPTAPI_DEBUG
lua_State *L = getStack();

// Non-table values yield an empty origin. For tables the "mod_origin" field is
// read, with "" passed as the default (presumably used when the field is
// missing -- confirm against getstringfield_default).
m_last_run_mod = lua_istable(L, index) ?
getstringfield_default(L, index, "mod_origin", "") : "";
// fxn is referenced only by this disabled debug print.
//printf(">>>> running %s for mod: %s\n", fxn, m_last_run_mod.c_str());
#endif
}

/*
Expand Down
5 changes: 3 additions & 2 deletions src/script/cpp_api/s_base.h
Expand Up @@ -39,7 +39,6 @@ extern "C" {
#include "config.h"

#define SCRIPTAPI_LOCK_DEBUG
#define SCRIPTAPI_DEBUG

// MUST be an invalid mod name so that mods can't
// use that name to bypass security!
Expand Down Expand Up @@ -108,7 +107,9 @@ class ScriptApiBase : protected LuaHelper {
Client* getClient();
#endif

std::string getOrigin() { return m_last_run_mod; }
// IMPORTANT: these cannot be used for any security-related uses, they exist
// only to enrich error messages
const std::string &getOrigin() { return m_last_run_mod; }
void setOriginDirect(const char *origin);
void setOriginFromTableRaw(int index, const char *fxn);

Expand Down

0 comments on commit 6a1424f

Please sign in to comment.