6
6
7
7
from artiq .protocols .sync_struct import Subscriber
8
8
from artiq .protocols .pc_rpc import AsyncioClient
9
- from artiq .protocols .logging import parse_log_message , log_with_name
9
+ from artiq .protocols .logging import LogParser
10
10
from artiq .tools import Condition , TaskObject
11
11
12
12
@@ -41,14 +41,14 @@ async def call(self, method, *args, **kwargs):
41
41
try :
42
42
targets , _ = remote .get_rpc_id ()
43
43
remote .select_rpc_target (targets [0 ])
44
- r = await getattr (remote , method )(* args , ** kwargs )
44
+ r = await getattr (remote , method )()
45
45
finally :
46
46
remote .close_rpc ()
47
47
return r
48
48
49
49
async def _ping (self ):
50
50
try :
51
- ok = await asyncio .wait_for (self .call ("ping" ),
51
+ ok = await asyncio .wait_for (self ._call_controller ("ping" ),
52
52
self .ping_timeout )
53
53
if ok :
54
54
self .retry_timer_cur = self .retry_timer
@@ -71,21 +71,8 @@ async def _wait_and_ping(self):
71
71
else :
72
72
break
73
73
74
- async def forward_logs (self , stream ):
75
- source = "controller({})" .format (self .name )
76
- while True :
77
- try :
78
- entry = (await stream .readline ())
79
- if not entry :
80
- break
81
- entry = entry [:- 1 ]
82
- level , name , message = parse_log_message (entry .decode ())
83
- log_with_name (name , level , message , extra = {"source" : source })
84
- except :
85
- logger .debug ("exception in log forwarding" , exc_info = True )
86
- break
87
- logger .debug ("stopped log forwarding of stream %s of %s" ,
88
- stream , self .name )
74
+ def _get_log_source (self ):
75
+ return "controller({})" .format (self .name )
89
76
90
77
async def launcher (self ):
91
78
try :
@@ -96,10 +83,12 @@ async def launcher(self):
96
83
self .process = await asyncio .create_subprocess_exec (
97
84
* shlex .split (self .command ),
98
85
stdout = subprocess .PIPE , stderr = subprocess .PIPE )
99
- asyncio .ensure_future (self .forward_logs (
100
- self .process .stdout ))
101
- asyncio .ensure_future (self .forward_logs (
102
- self .process .stderr ))
86
+ asyncio .ensure_future (
87
+ LogParser (self ._get_log_source ).stream_task (
88
+ self .process .stdout ))
89
+ asyncio .ensure_future (
90
+ LogParser (self ._get_log_source ).stream_task (
91
+ self .process .stderr ))
103
92
await self ._wait_and_ping ()
104
93
except FileNotFoundError :
105
94
logger .warning ("Controller %s failed to start" , self .name )
0 commit comments