Skip to content

Commit 8b0b857

Browse files
sapiersapier
sapier
authored and
sapier
committedJan 10, 2014
Make MutexQueue use jsemaphore for signaling
1 parent 10fdbf7 commit 8b0b857

13 files changed

+250
-101
lines changed
 

‎src/client.cpp

+16-2
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,20 @@ Client::Client(
286286
}
287287
}
288288

289+
void Client::Stop()
290+
{
291+
//request all client managed threads to stop
292+
m_mesh_update_thread.Stop();
293+
}
294+
295+
bool Client::isShutdown()
296+
{
297+
298+
if (!m_mesh_update_thread.IsRunning()) return true;
299+
300+
return false;
301+
}
302+
289303
Client::~Client()
290304
{
291305
{
@@ -296,7 +310,7 @@ Client::~Client()
296310
m_mesh_update_thread.Stop();
297311
m_mesh_update_thread.Wait();
298312
while(!m_mesh_update_thread.m_queue_out.empty()) {
299-
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
313+
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
300314
delete r.mesh;
301315
}
302316

@@ -692,7 +706,7 @@ void Client::step(float dtime)
692706
while(!m_mesh_update_thread.m_queue_out.empty())
693707
{
694708
num_processed_meshes++;
695-
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
709+
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
696710
MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p);
697711
if(block)
698712
{

‎src/client.h

+8
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,14 @@ class Client : public con::PeerHandler, public InventoryManager, public IGameDef
289289
);
290290

291291
~Client();
292+
293+
/*
294+
request all threads managed by client to be stopped
295+
*/
296+
void Stop();
297+
298+
299+
bool isShutdown();
292300
/*
293301
The name of the local player should already be set when
294302
calling this, as it is sent in the initialization.

‎src/connection.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -592,8 +592,9 @@ void * Connection::Thread()
592592

593593
runTimeouts(dtime);
594594

595+
//NOTE this is only thread safe for ONE consumer thread!
595596
while(!m_command_queue.empty()){
596-
ConnectionCommand c = m_command_queue.pop_front();
597+
ConnectionCommand c = m_command_queue.pop_frontNoEx();
597598
processCommand(c);
598599
}
599600

@@ -1556,7 +1557,7 @@ ConnectionEvent Connection::getEvent()
15561557
e.type = CONNEVENT_NONE;
15571558
return e;
15581559
}
1559-
return m_event_queue.pop_front();
1560+
return m_event_queue.pop_frontNoEx();
15601561
}
15611562

15621563
ConnectionEvent Connection::waitEvent(u32 timeout_ms)

‎src/game.cpp

+21-11
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ class GameGlobalShaderConstantSetter : public IShaderConstantSetter
813813
services->setVertexShaderConstant("animationTimer", &animation_timer_f, 1);
814814

815815
LocalPlayer* player = m_client->getEnv().getLocalPlayer();
816-
v3f eye_position = player->getEyePosition();
816+
v3f eye_position = player->getEyePosition();
817817
services->setPixelShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
818818
services->setVertexShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
819819

@@ -1876,12 +1876,12 @@ void the_game(
18761876
}
18771877
else if(input->wasKeyDown(getKeySetting("keymap_screenshot")))
18781878
{
1879-
irr::video::IImage* const image = driver->createScreenShot();
1880-
if (image) {
1881-
irr::c8 filename[256];
1882-
snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png",
1879+
irr::video::IImage* const image = driver->createScreenShot();
1880+
if (image) {
1881+
irr::c8 filename[256];
1882+
snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png",
18831883
g_settings->get("screenshot_path").c_str(),
1884-
device->getTimer()->getRealTime());
1884+
device->getTimer()->getRealTime());
18851885
if (driver->writeImageToFile(image, filename)) {
18861886
std::wstringstream sstr;
18871887
sstr<<"Saved screenshot to '"<<filename<<"'";
@@ -1891,8 +1891,8 @@ void the_game(
18911891
} else{
18921892
infostream<<"Failed to save screenshot '"<<filename<<"'"<<std::endl;
18931893
}
1894-
image->drop();
1895-
}
1894+
image->drop();
1895+
}
18961896
}
18971897
else if(input->wasKeyDown(getKeySetting("keymap_toggle_hud")))
18981898
{
@@ -2263,7 +2263,7 @@ void the_game(
22632263
new MainRespawnInitiator(
22642264
&respawn_menu_active, &client);
22652265
GUIDeathScreen *menu =
2266-
new GUIDeathScreen(guienv, guiroot, -1,
2266+
new GUIDeathScreen(guienv, guiroot, -1,
22672267
&g_menumgr, respawner);
22682268
menu->drop();
22692269

@@ -2755,7 +2755,7 @@ void the_game(
27552755

27562756
// Sign special case, at least until formspec is properly implemented.
27572757
// Deprecated?
2758-
if(meta && meta->getString("formspec") == "hack:sign_text_input"
2758+
if(meta && meta->getString("formspec") == "hack:sign_text_input"
27592759
&& !random_input
27602760
&& !input->isKeyDown(getKeySetting("keymap_sneak")))
27612761
{
@@ -3222,7 +3222,7 @@ void the_game(
32223222

32233223
driver->getOverrideMaterial().Material.ColorMask = irr::video::ECP_RED;
32243224
driver->getOverrideMaterial().EnableFlags = irr::video::EMF_COLOR_MASK;
3225-
driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX +
3225+
driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX +
32263226
irr::scene::ESNRP_SOLID +
32273227
irr::scene::ESNRP_TRANSPARENT +
32283228
irr::scene::ESNRP_TRANSPARENT_EFFECT +
@@ -3433,6 +3433,16 @@ void the_game(
34333433
chat_backend.addMessage(L"", L"# Disconnected.");
34343434
chat_backend.addMessage(L"", L"");
34353435

3436+
client.Stop();
3437+
3438+
//force answer all texture and shader jobs (TODO return empty values)
3439+
3440+
while(!client.isShutdown()) {
3441+
tsrc->processQueue();
3442+
shsrc->processQueue();
3443+
sleep_ms(100);
3444+
}
3445+
34363446
// Client scope (client is destructed before destructing *def and tsrc)
34373447
}while(0);
34383448
} // try-catch

‎src/httpfetch.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ class CurlFetchThread : public JThread
594594
*/
595595

596596
while (!m_requests.empty()) {
597-
Request req = m_requests.pop_front();
597+
Request req = m_requests.pop_frontNoEx();
598598
processRequest(req);
599599
}
600600
processQueued(&pool);

‎src/itemdef.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ class CItemDefManager: public IWritableItemDefManager
642642
void processQueue(IGameDef *gamedef)
643643
{
644644
#ifndef SERVER
645+
//NOTE this is only thread safe for ONE consumer thread!
645646
while(!m_get_clientcached_queue.empty())
646647
{
647648
GetRequest<std::string, ClientCached*, u8, u8>

‎src/jthread/jsemaphore.h

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class JSemaphore {
3636

3737
void Post();
3838
void Wait();
39+
bool Wait(unsigned int time_ms);
3940

4041
int GetValue();
4142

‎src/jthread/pthread/jsemaphore.cpp

+31
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@ with this program; if not, write to the Free Software Foundation, Inc.,
1717
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1818
*/
1919
#include <assert.h>
20+
#include <errno.h>
21+
#include <sys/time.h>
2022
#include "jthread/jsemaphore.h"
23+
2124
#define UNUSED(expr) do { (void)(expr); } while (0)
25+
2226
JSemaphore::JSemaphore() {
2327
int sem_init_retval = sem_init(&m_semaphore,0,0);
2428
assert(sem_init_retval == 0);
@@ -49,6 +53,33 @@ void JSemaphore::Wait() {
4953
UNUSED(sem_wait_retval);
5054
}
5155

56+
bool JSemaphore::Wait(unsigned int time_ms) {
57+
struct timespec waittime;
58+
struct timeval now;
59+
60+
if (gettimeofday(&now, NULL) == -1) {
61+
assert("Unable to get time by clock_gettime!" == 0);
62+
return false;
63+
}
64+
65+
waittime.tv_nsec = ((time_ms % 1000) * 1000 * 1000) + (now.tv_usec * 1000);
66+
waittime.tv_sec = (time_ms / 1000) + (waittime.tv_nsec / (1000*1000*1000)) + now.tv_sec;
67+
waittime.tv_nsec %= 1000*1000*1000;
68+
69+
errno = 0;
70+
int sem_wait_retval = sem_timedwait(&m_semaphore,&waittime);
71+
72+
if (sem_wait_retval == 0)
73+
{
74+
return true;
75+
}
76+
else {
77+
assert((errno == ETIMEDOUT) || (errno == EINTR));
78+
return false;
79+
}
80+
return sem_wait_retval == 0 ? true : false;
81+
}
82+
5283
int JSemaphore::GetValue() {
5384

5485
int retval = 0;

‎src/jthread/win32/jsemaphore.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,21 @@ void JSemaphore::Wait() {
5151
INFINITE);
5252
}
5353

54+
bool JSemaphore::Wait(unsigned int time_ms) {
55+
unsigned int retval = WaitForSingleObject(
56+
m_hSemaphore,
57+
time_ms);
58+
59+
if (retval == WAIT_OBJECT_0)
60+
{
61+
return true;
62+
}
63+
else {
64+
assert(retval == WAIT_TIMEOUT);
65+
return false;
66+
}
67+
}
68+
5469
int JSemaphore::GetValue() {
5570

5671
long int retval = 0;

‎src/shader.cpp

+11-13
Original file line numberDiff line numberDiff line change
@@ -427,21 +427,18 @@ u32 ShaderSource::getShaderId(const std::string &name)
427427
/* infostream<<"Waiting for shader from main thread, name=\""
428428
<<name<<"\""<<std::endl;*/
429429

430-
try{
431-
while(true) {
432-
// Wait result for a second
433-
GetResult<std::string, u32, u8, u8>
434-
result = result_queue.pop_front(1000);
435-
436-
if (result.key == name) {
437-
return result.item;
438-
}
430+
while(true) {
431+
GetResult<std::string, u32, u8, u8>
432+
result = result_queue.pop_frontNoEx();
433+
434+
if (result.key == name) {
435+
return result.item;
436+
}
437+
else {
438+
errorstream << "Got shader with invalid name: " << result.key << std::endl;
439439
}
440440
}
441-
catch(ItemNotFoundException &e){
442-
errorstream<<"Waiting for shader " << name << " timed out."<<std::endl;
443-
return 0;
444-
}
441+
445442
}
446443

447444
infostream<<"getShaderId(): Failed"<<std::endl;
@@ -537,6 +534,7 @@ void ShaderSource::processQueue()
537534
/*
538535
Fetch shaders
539536
*/
537+
//NOTE this is only thread safe for ONE consumer thread!
540538
if(!m_get_shader_queue.empty()){
541539
GetRequest<std::string, u32, u8, u8>
542540
request = m_get_shader_queue.pop();

‎src/tile.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ void TextureSource::processQueue()
775775
/*
776776
Fetch textures
777777
*/
778+
//NOTE this is only thread safe for ONE consumer thread!
778779
if(!m_get_texture_queue.empty())
779780
{
780781
GetRequest<std::string, u32, u8, u8>

‎src/util/container.h

+106-45
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
2424
#include "../exceptions.h"
2525
#include "../jthread/jmutex.h"
2626
#include "../jthread/jmutexautolock.h"
27-
#include "../porting.h" // For sleep_ms
27+
#include "../jthread/jsemaphore.h"
2828
#include <list>
2929
#include <vector>
3030
#include <map>
@@ -201,6 +201,12 @@ class Queue
201201
++m_list_size;
202202
}
203203

204+
void push_front(T t)
205+
{
206+
m_list.push_front(t);
207+
++m_list_size;
208+
}
209+
204210
T pop_front()
205211
{
206212
if(m_list.empty())
@@ -247,86 +253,141 @@ template<typename T>
247253
class MutexedQueue
248254
{
249255
public:
256+
template<typename Key, typename U, typename Caller, typename CallerData>
257+
friend class RequestQueue;
258+
250259
MutexedQueue()
251260
{
252261
}
253262
bool empty()
254263
{
255264
JMutexAutoLock lock(m_mutex);
256-
return m_list.empty();
265+
return (m_size.GetValue() == 0);
257266
}
258267
void push_back(T t)
259268
{
260269
JMutexAutoLock lock(m_mutex);
261270
m_list.push_back(t);
271+
m_size.Post();
262272
}
263-
T pop_front(u32 wait_time_max_ms=0)
273+
274+
/* this version of pop_front returns a empty element of T on timeout.
275+
* Make sure default constructor of T creates a recognizable "empty" element
276+
*/
277+
T pop_frontNoEx(u32 wait_time_max_ms)
264278
{
265-
u32 wait_time_ms = 0;
279+
if (m_size.Wait(wait_time_max_ms))
280+
{
281+
JMutexAutoLock lock(m_mutex);
266282

267-
for(;;)
283+
typename std::list<T>::iterator begin = m_list.begin();
284+
T t = *begin;
285+
m_list.erase(begin);
286+
return t;
287+
}
288+
else
268289
{
269-
{
270-
JMutexAutoLock lock(m_mutex);
271-
272-
if(!m_list.empty())
273-
{
274-
typename std::list<T>::iterator begin = m_list.begin();
275-
T t = *begin;
276-
m_list.erase(begin);
277-
return t;
278-
}
279-
280-
if(wait_time_ms >= wait_time_max_ms)
281-
throw ItemNotFoundException("MutexedQueue: queue is empty");
282-
}
283-
284-
// Wait a while before trying again
285-
sleep_ms(10);
286-
wait_time_ms += 10;
290+
return T();
287291
}
288292
}
293+
294+
T pop_front(u32 wait_time_max_ms)
295+
{
296+
if (m_size.Wait(wait_time_max_ms))
297+
{
298+
JMutexAutoLock lock(m_mutex);
299+
300+
typename std::list<T>::iterator begin = m_list.begin();
301+
T t = *begin;
302+
m_list.erase(begin);
303+
return t;
304+
}
305+
else
306+
{
307+
throw ItemNotFoundException("MutexedQueue: queue is empty");
308+
}
309+
}
310+
311+
T pop_frontNoEx()
312+
{
313+
m_size.Wait();
314+
315+
JMutexAutoLock lock(m_mutex);
316+
317+
typename std::list<T>::iterator begin = m_list.begin();
318+
T t = *begin;
319+
m_list.erase(begin);
320+
return t;
321+
}
322+
289323
T pop_back(u32 wait_time_max_ms=0)
290324
{
291-
u32 wait_time_ms = 0;
325+
if (m_size.Wait(wait_time_max_ms))
326+
{
327+
JMutexAutoLock lock(m_mutex);
328+
329+
typename std::list<T>::iterator last = m_list.end();
330+
last--;
331+
T t = *last;
332+
m_list.erase(last);
333+
return t;
334+
}
335+
else
336+
{
337+
throw ItemNotFoundException("MutexedQueue: queue is empty");
338+
}
339+
}
340+
341+
/* this version of pop_back returns a empty element of T on timeout.
342+
* Make sure default constructor of T creates a recognizable "empty" element
343+
*/
344+
T pop_backNoEx(u32 wait_time_max_ms=0)
345+
{
346+
if (m_size.Wait(wait_time_max_ms))
347+
{
348+
JMutexAutoLock lock(m_mutex);
292349

293-
for(;;)
350+
typename std::list<T>::iterator last = m_list.end();
351+
last--;
352+
T t = *last;
353+
m_list.erase(last);
354+
return t;
355+
}
356+
else
294357
{
295-
{
296-
JMutexAutoLock lock(m_mutex);
297-
298-
if(!m_list.empty())
299-
{
300-
typename std::list<T>::iterator last = m_list.end();
301-
last--;
302-
T t = *last;
303-
m_list.erase(last);
304-
return t;
305-
}
306-
307-
if(wait_time_ms >= wait_time_max_ms)
308-
throw ItemNotFoundException("MutexedQueue: queue is empty");
309-
}
310-
311-
// Wait a while before trying again
312-
sleep_ms(10);
313-
wait_time_ms += 10;
358+
return T();
314359
}
315360
}
316361

362+
T pop_backNoEx()
363+
{
364+
m_size.Wait();
365+
366+
JMutexAutoLock lock(m_mutex);
367+
368+
typename std::list<T>::iterator last = m_list.end();
369+
last--;
370+
T t = *last;
371+
m_list.erase(last);
372+
return t;
373+
}
374+
375+
protected:
317376
JMutex & getMutex()
318377
{
319378
return m_mutex;
320379
}
321380

381+
// NEVER EVER modify the >>list<< you got by using this function!
382+
// You may only modify it's content
322383
std::list<T> & getList()
323384
{
324385
return m_list;
325386
}
326387

327-
protected:
328388
JMutex m_mutex;
329389
std::list<T> m_list;
390+
JSemaphore m_size;
330391
};
331392

332393
#endif

‎src/util/thread.h

+35-27
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
2424
#include "../jthread/jthread.h"
2525
#include "../jthread/jmutex.h"
2626
#include "../jthread/jmutexautolock.h"
27+
#include "porting.h"
2728

2829
template<typename T>
2930
class MutexedVariable
@@ -123,36 +124,38 @@ class RequestQueue
123124
void add(Key key, Caller caller, CallerData callerdata,
124125
ResultQueue<Key, T, Caller, CallerData> *dest)
125126
{
126-
JMutexAutoLock lock(m_queue.getMutex());
127-
128-
/*
129-
If the caller is already on the list, only update CallerData
130-
*/
131-
for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
132-
i = m_queue.getList().begin();
133-
i != m_queue.getList().end(); ++i)
134127
{
135-
GetRequest<Key, T, Caller, CallerData> &request = *i;
136-
137-
if(request.key == key)
128+
JMutexAutoLock lock(m_queue.getMutex());
129+
130+
/*
131+
If the caller is already on the list, only update CallerData
132+
*/
133+
for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
134+
i = m_queue.getList().begin();
135+
i != m_queue.getList().end(); ++i)
138136
{
139-
for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
140-
i = request.callers.begin();
141-
i != request.callers.end(); ++i)
137+
GetRequest<Key, T, Caller, CallerData> &request = *i;
138+
139+
if(request.key == key)
142140
{
143-
CallerInfo<Caller, CallerData, Key, T> &ca = *i;
144-
if(ca.caller == caller)
141+
for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
142+
i = request.callers.begin();
143+
i != request.callers.end(); ++i)
145144
{
146-
ca.data = callerdata;
147-
return;
145+
CallerInfo<Caller, CallerData, Key, T> &ca = *i;
146+
if(ca.caller == caller)
147+
{
148+
ca.data = callerdata;
149+
return;
150+
}
148151
}
152+
CallerInfo<Caller, CallerData, Key, T> ca;
153+
ca.caller = caller;
154+
ca.data = callerdata;
155+
ca.dest = dest;
156+
request.callers.push_back(ca);
157+
return;
149158
}
150-
CallerInfo<Caller, CallerData, Key, T> ca;
151-
ca.caller = caller;
152-
ca.data = callerdata;
153-
ca.dest = dest;
154-
request.callers.push_back(ca);
155-
return;
156159
}
157160
}
158161

@@ -168,12 +171,17 @@ class RequestQueue
168171
ca.dest = dest;
169172
request.callers.push_back(ca);
170173

171-
m_queue.getList().push_back(request);
174+
m_queue.push_back(request);
175+
}
176+
177+
GetRequest<Key, T, Caller, CallerData> pop(unsigned int timeout_ms)
178+
{
179+
return m_queue.pop_front(timeout_ms);
172180
}
173181

174-
GetRequest<Key, T, Caller, CallerData> pop(bool wait_if_empty=false)
182+
GetRequest<Key, T, Caller, CallerData> pop()
175183
{
176-
return m_queue.pop_front(wait_if_empty);
184+
return m_queue.pop_frontNoEx();
177185
}
178186

179187
void pushResult(GetRequest<Key, T, Caller, CallerData> req,

0 commit comments

Comments
 (0)
Please sign in to comment.