1
1
import asyncio
2
2
3
3
from artiq .management import pyon
4
- from artiq .management .network import AsyncioServer
4
+ from artiq .management .tools import AsyncioServer
5
5
6
6
7
7
_init_string = b"ARTIQ sync_struct\n "
8
8
9
9
10
10
class Subscriber :
11
- def __init__ (self , target_builder , error_cb , notify_cb = None ):
11
+ def __init__ (self , target_builder , notify_cb = None ):
12
12
self .target_builder = target_builder
13
- self .error_cb = error_cb
14
13
self .notify_cb = notify_cb
15
14
16
15
@asyncio .coroutine
@@ -19,7 +18,7 @@ def connect(self, host, port):
19
18
yield from asyncio .open_connection (host , port )
20
19
try :
21
20
self ._writer .write (_init_string )
22
- self ._receive_task = asyncio .Task (self ._receive_cr ())
21
+ self .receive_task = asyncio .Task (self ._receive_cr ())
23
22
except :
24
23
self ._writer .close ()
25
24
del self ._reader
@@ -29,9 +28,9 @@ def connect(self, host, port):
29
28
@asyncio .coroutine
30
29
def close (self ):
31
30
try :
32
- self ._receive_task .cancel ()
31
+ self .receive_task .cancel ()
33
32
try :
34
- yield from asyncio .wait_for (self ._receive_task , None )
33
+ yield from asyncio .wait_for (self .receive_task , None )
35
34
except asyncio .CancelledError :
36
35
pass
37
36
finally :
@@ -41,42 +40,74 @@ def close(self):
41
40
42
41
@asyncio .coroutine
43
42
def _receive_cr (self ):
44
- try :
45
- target = None
46
- while True :
47
- line = yield from self ._reader .readline ()
48
- obj = pyon .decode (line .decode ())
49
- action = obj ["action" ]
50
-
51
- if action == "init" :
52
- target = self .target_builder (obj ["struct" ])
53
- elif action == "append" :
54
- target .append (obj ["x" ])
55
- elif action == "pop" :
56
- target .pop (obj ["i" ])
57
- elif action == "delitem" :
58
- target .__delitem__ (obj ["key" ])
59
- if self .notify_cb is not None :
60
- self .notify_cb ()
61
- except :
62
- self .error_cb ()
63
- raise
43
+ target = None
44
+ while True :
45
+ line = yield from self ._reader .readline ()
46
+ if not line :
47
+ return
48
+ obj = pyon .decode (line .decode ())
49
+ action = obj ["action" ]
50
+
51
+ if action == "init" :
52
+ target = self .target_builder (obj ["struct" ])
53
+ elif action == "append" :
54
+ target .append (obj ["x" ])
55
+ elif action == "insert" :
56
+ target .insert (obj ["i" ], obj ["x" ])
57
+ elif action == "pop" :
58
+ target .pop (obj ["i" ])
59
+ elif action == "delitem" :
60
+ target .__delitem__ (obj ["key" ])
61
+ if self .notify_cb is not None :
62
+ self .notify_cb ()
63
+
64
+
65
+ class Notifier :
66
+ def __init__ (self , backing_struct ):
67
+ self .backing_struct = backing_struct
68
+ self .publisher = None
69
+
70
+ # Backing struct modification methods.
71
+ # All modifications must go through them!
72
+
73
+ def append (self , x ):
74
+ self .backing_struct .append (x )
75
+ if self .publisher is not None :
76
+ self .publisher .publish ({"action" : "append" , "x" : x })
77
+
78
+ def insert (self , i , x ):
79
+ self .backing_struct .insert (i , x )
80
+ if self .publisher is not None :
81
+ self .publisher .publish ({"action" : "insert" , "i" : i , "x" : x })
82
+
83
+ def pop (self , i = - 1 ):
84
+ r = self .backing_struct .pop (i )
85
+ if self .publisher is not None :
86
+ self .publisher .publish ({"action" : "pop" , "i" : i })
87
+ return r
88
+
89
+ def __delitem__ (self , key ):
90
+ self .backing_struct .__delitem__ (key )
91
+ if self .publisher is not None :
92
+ self .publisher .publish ({"action" : "delitem" , "key" : key })
64
93
65
94
66
95
class Publisher (AsyncioServer ):
67
- def __init__ (self , backing_struct ):
96
+ def __init__ (self , notifier ):
68
97
AsyncioServer .__init__ (self )
69
- self .backing_struct = backing_struct
98
+ self .notifier = notifier
70
99
self ._recipients = set ()
71
100
101
+ self .notifier .publisher = self
102
+
72
103
@asyncio .coroutine
73
104
def _handle_connection_cr (self , reader , writer ):
74
105
try :
75
106
line = yield from reader .readline ()
76
107
if line != _init_string :
77
108
return
78
109
79
- obj = {"action" : "init" , "struct" : self .backing_struct }
110
+ obj = {"action" : "init" , "struct" : self .notifier . backing_struct }
80
111
line = pyon .encode (obj ) + "\n "
81
112
writer .write (line .encode ())
82
113
@@ -96,24 +127,8 @@ def _handle_connection_cr(self, reader, writer):
96
127
finally :
97
128
writer .close ()
98
129
99
- def _publish (self , obj ):
130
+ def publish (self , obj ):
100
131
line = pyon .encode (obj ) + "\n "
101
132
line = line .encode ()
102
133
for recipient in self ._recipients :
103
134
recipient .put_nowait (line )
104
-
105
- # Backing struct modification methods.
106
- # All modifications must go through them!
107
-
108
- def append (self , x ):
109
- self .backing_struct .append (x )
110
- self ._publish ({"action" : "append" , "x" : x })
111
-
112
- def pop (self , i = - 1 ):
113
- r = self .backing_struct .pop (i )
114
- self ._publish ({"action" : "pop" , "i" : i })
115
- return r
116
-
117
- def __delitem__ (self , key ):
118
- self .backing_struct .__delitem__ (key )
119
- self ._publish ({"action" : "delitem" , "key" : key })
0 commit comments