Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 93465e4

Browse files
committedJun 26, 2016
Added Fiber#dispose and auto-cleanup of Fiber.
A Fiber is composed of both managed (ie object memory) resources and unmanaged (ie pthread) resources. When a Fiber completes normally, the unmanaged resources can be reclaimed simply. When a Fiber that has not completed normally goes out of 'scope' (ie is no longer reachable from any reachable object), the garbage collector cannot simply reclaim the managed resources allocated to the Fiber because the Fiber composes native resources (ie the pthread instance). In this case, the Fiber's pthread invoke function needs to be forced to exit so that the pthread instance can be reclaimed.
1 parent faf044a commit 93465e4

15 files changed

+181
-7
lines changed
 

‎core/fiber.rb

+5
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ def transfer(*args)
5454
raise PrimitiveFailure, "Fiber#transfer primitive failed"
5555
end
5656

57+
def dispose
58+
Rubinius.primitive :fiber_dispose
59+
raise PrimitiveFailure, "Fiber#dispose primitive failed"
60+
end
61+
5762
def alive?
5863
status != "dead"
5964
end

‎machine/builtin/fiber.cpp

+51-6
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,35 @@ namespace rubinius {
108108
}
109109
}
110110

111+
void Fiber::cancel(STATE) {
112+
{
113+
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());
114+
115+
vm()->thread_state()->raise_fiber_cancel();
116+
117+
state->vm()->set_suspending();
118+
119+
restart_context(state->vm());
120+
wakeup();
121+
122+
while(vm()->suspended_p()) {
123+
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
124+
vm()->fiber_wait_condition().notify_one();
125+
}
126+
}
127+
128+
vm()->limited_wait_for([this]{ return vm()->running_p(); });
129+
130+
// Release the canceled Fiber.
131+
state->vm()->set_suspended();
132+
133+
vm()->limited_wait_for([this]{ return vm()->zombie_p(); });
134+
135+
vm()->set_canceled();
136+
137+
state->vm()->set_running();
138+
}
139+
111140
void Fiber::suspend_and_continue(STATE) {
112141
UnmanagedPhase unmanaged(state);
113142

@@ -138,6 +167,8 @@ namespace rubinius {
138167
Object* Fiber::return_value(STATE) {
139168
if(vm()->thread_state()->raise_reason() == cNone) {
140169
return state->vm()->thread()->fiber_value();
170+
} else if(vm()->thread_state()->raise_reason() == cFiberCancel) {
171+
return NULL;
141172
} else {
142173
invoke_context()->thread_state()->set_state(vm()->thread_state());
143174
return NULL;
@@ -178,12 +209,14 @@ namespace rubinius {
178209
vm->thread()->fiber_value(state, cNil);
179210
}
180211

181-
if(vm->fiber()->status() == eTransfer) {
182-
// restart the root Fiber
183-
vm->thread()->fiber()->invoke_context(vm);
184-
vm->thread()->fiber()->restart(state);
185-
} else {
186-
vm->fiber()->invoke_context()->fiber()->restart(state);
212+
if(vm->thread_state()->raise_reason() != cFiberCancel) {
213+
if(vm->fiber()->status() == eTransfer) {
214+
// restart the root Fiber
215+
vm->thread()->fiber()->invoke_context(vm);
216+
vm->thread()->fiber()->restart(state);
217+
} else {
218+
vm->fiber()->invoke_context()->fiber()->restart(state);
219+
}
187220
}
188221

189222
{
@@ -378,6 +411,12 @@ namespace rubinius {
378411
return return_value(state);
379412
}
380413

414+
Object* Fiber::dispose(STATE) {
415+
cancel(state);
416+
417+
return this;
418+
}
419+
381420
Object* Fiber::s_yield(STATE, Arguments& args) {
382421
Fiber* fiber = state->vm()->fiber();
383422
OnStack<1> os(state, fiber);
@@ -424,6 +463,12 @@ namespace rubinius {
424463
}
425464

426465
if(fiber->vm()) {
466+
if(!state->shared().halting_p()) {
467+
if(!fiber->vm()->zombie_p()) {
468+
fiber->cancel(state);
469+
}
470+
}
471+
427472
if(fiber->vm()->zombie_p()) {
428473
VM::discard(state, fiber->vm());
429474
fiber->vm(NULL);

‎machine/builtin/fiber.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ namespace rubinius {
123123

124124
void start(STATE, Arguments& args);
125125
void restart(STATE);
126+
void cancel(STATE);
126127
void suspend_and_continue(STATE);
127128

128129
// Rubinius.primitive :fiber_status
@@ -134,6 +135,9 @@ namespace rubinius {
134135
// Rubinius.primitive :fiber_transfer
135136
Object* transfer(STATE, Arguments& args);
136137

138+
// Rubinius.primitive :fiber_dispose
139+
Object* dispose(STATE);
140+
137141
public: /* TypeInfo */
138142

139143
class Info : public TypeInfo {

‎machine/builtin/system.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -1739,6 +1739,9 @@ namespace rubinius {
17391739
case cThreadKill:
17401740
reason = state->symbol("thread_kill");
17411741
break;
1742+
case cFiberCancel:
1743+
reason = state->symbol("fiber_cancel");
1744+
break;
17421745
}
17431746

17441747
tuple->put(state, 0, reason);

‎machine/environment.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,8 @@ namespace rubinius {
546546
void Environment::halt(STATE, int exit_code) {
547547
utilities::thread::Mutex::LockGuard guard(halt_lock_);
548548

549+
state->shared().set_halting();
550+
549551
if(state->shared().config.system_log_lifetime.value) {
550552
logger::write("process: exit: %s %d %fs",
551553
shared->pid.c_str(), exit_code, shared->run_time());
@@ -562,6 +564,8 @@ namespace rubinius {
562564
shared->machine_threads()->shutdown(state);
563565
}
564566

567+
shared->finalizer()->dispose(state);
568+
565569
shared->thread_nexus()->lock(state->vm());
566570

567571
shared->finalizer()->finish(state);
@@ -743,5 +747,7 @@ namespace rubinius {
743747

744748
State main_state(vm);
745749
state->shared().start_signals(&main_state);
750+
751+
state->shared().set_running();
746752
}
747753
}

‎machine/instructions.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ Object* MachineCode::interpreter(STATE, MachineCode* const mcode) {
195195
case cExit:
196196
call_frame->scope->flush_to_heap(state);
197197
return NULL;
198+
case cFiberCancel:
199+
return NULL;
198200
default:
199201
break;
200202
} // switch

‎machine/memory/finalizer.cpp

+38-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@
2525

2626
namespace rubinius {
2727
namespace memory {
28+
void NativeFinalizer::dispose(STATE) {
29+
// TODO: consider building this on the TypeInfo structure.
30+
if(Fiber* fiber = try_as<Fiber>(object())) {
31+
if(!fiber->vm()->zombie_p()) fiber->cancel(state);
32+
}
33+
}
34+
2835
void NativeFinalizer::finalize(STATE) {
2936
(*finalizer_)(state, object());
3037
}
@@ -35,6 +42,9 @@ namespace rubinius {
3542
}
3643
}
3744

45+
void ExtensionFinalizer::dispose(STATE) {
46+
}
47+
3848
void ExtensionFinalizer::finalize(STATE) {
3949
ManagedPhase managed(state);
4050

@@ -84,6 +94,9 @@ namespace rubinius {
8494
}
8595
}
8696

97+
void ManagedFinalizer::dispose(STATE) {
98+
}
99+
87100
void ManagedFinalizer::finalize(STATE) {
88101
ManagedPhase managed(state);
89102

@@ -168,6 +181,7 @@ namespace rubinius {
168181
}
169182

170183
void FinalizerThread::initialize(STATE) {
184+
Thread::create(state, vm());
171185
synchronization_ = new Synchronization();
172186
}
173187

@@ -212,12 +226,35 @@ namespace rubinius {
212226
vm()->metrics().gc.objects_finalized++;
213227
}
214228
}
229+
230+
state->vm()->thread()->vm()->set_zombie(state);
231+
}
232+
233+
void FinalizerThread::dispose(STATE) {
234+
finishing_ = true;
235+
236+
std::lock_guard<std::mutex> guard(list_mutex());
237+
238+
for(FinalizerObjects::iterator i = process_list_.begin();
239+
i != process_list_.end();
240+
++i)
241+
{
242+
FinalizerObject* fo = *i;
243+
fo->dispose(state);
244+
}
245+
246+
for(FinalizerObjects::iterator i = live_list_.begin();
247+
i != live_list_.end();
248+
++i)
249+
{
250+
FinalizerObject* fo = *i;
251+
fo->dispose(state);
252+
}
215253
}
216254

217255
void FinalizerThread::finish(STATE) {
218256
finishing_ = true;
219257

220-
// TODO: cleanup
221258
while(!process_list_.empty()) {
222259
FinalizerObject* fo = process_list_.back();
223260
process_list_.pop_back();

‎machine/memory/finalizer.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ namespace rubinius {
3939
object_ = obj;
4040
}
4141

42+
virtual void dispose(STATE) = 0;
4243
virtual void finalize(STATE) = 0;
4344
virtual void mark(ImmixGC* gc) = 0;
4445
virtual bool match_p(STATE, Object* object, Object* finalizer) = 0;
@@ -53,6 +54,7 @@ namespace rubinius {
5354
, finalizer_(finalizer)
5455
{ }
5556

57+
void dispose(STATE);
5658
void finalize(STATE);
5759
void mark(ImmixGC* gc);
5860
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
@@ -67,6 +69,7 @@ namespace rubinius {
6769
, finalizer_(finalizer)
6870
{ }
6971

72+
void dispose(STATE);
7073
void finalize(STATE);
7174
void mark(ImmixGC* gc);
7275
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
@@ -81,6 +84,7 @@ namespace rubinius {
8184
, finalizer_(finalizer)
8285
{ }
8386

87+
void dispose(STATE);
8488
void finalize(STATE);
8589
void mark(ImmixGC* gc);
8690
bool match_p(STATE, Object* object, Object* finalizer);
@@ -128,6 +132,7 @@ namespace rubinius {
128132
}
129133

130134
void finish(STATE);
135+
void dispose(STATE);
131136

132137
void native_finalizer(STATE, Object* obj, FinalizerFunction func);
133138
void extension_finalizer(STATE, Object* obj, FinalizerFunction func);

‎machine/memory/immix_marker.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ namespace memory {
108108
}
109109

110110
state->memory()->clear_mature_mark_in_progress();
111+
112+
state->vm()->thread()->vm()->set_zombie(state);
111113
}
112114
}
113115
}

‎machine/raise_reason.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace rubinius {
1010
cExit,
1111
cCatchThrow,
1212
cThreadKill,
13+
cFiberCancel,
1314
};
1415
}
1516

‎machine/shared_state.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ namespace rubinius {
5858
, type_info_lock_()
5959
, code_resource_lock_()
6060
, use_capi_lock_(false)
61+
, phase_(eBooting)
6162
, om(NULL)
6263
, global_cache(new GlobalCache)
6364
, config(config)

‎machine/shared_state.hpp

+34
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "capi/capi_constants.h"
2222

2323
#include <unistd.h>
24+
#include <atomic>
2425
#include <string>
2526
#include <vector>
2627

@@ -82,6 +83,13 @@ namespace rubinius {
8283
*/
8384

8485
class SharedState {
86+
public:
87+
enum Phase {
88+
eBooting,
89+
eRunning,
90+
eHalting,
91+
};
92+
8593
private:
8694
ThreadNexus* thread_nexus_;
8795
MachineThreads* machine_threads_;
@@ -126,6 +134,8 @@ namespace rubinius {
126134
bool use_capi_lock_;
127135
int primitive_hits_[Primitives::cTotalPrimitives];
128136

137+
std::atomic<Phase> phase_;
138+
129139
public:
130140
Globals globals;
131141
Memory* om;
@@ -141,6 +151,30 @@ namespace rubinius {
141151
SharedState(Environment* env, Configuration& config, ConfigParser& cp);
142152
~SharedState();
143153

154+
bool booting_p() {
155+
return phase_ == eBooting;
156+
}
157+
158+
void set_booting() {
159+
phase_ = eBooting;
160+
}
161+
162+
bool running_p() {
163+
return phase_ == eRunning;
164+
}
165+
166+
void set_running() {
167+
phase_ = eRunning;
168+
}
169+
170+
bool halting_p() {
171+
return phase_ == eHalting;
172+
}
173+
174+
void set_halting() {
175+
phase_ = eHalting;
176+
}
177+
144178
int size();
145179

146180
void set_initialized() {

0 commit comments

Comments
 (0)
Please sign in to comment.