Skip to content

Commit

Permalink
master: TCP server for remote logging
Browse files Browse the repository at this point in the history
sbourdeauducq committed Oct 15, 2015
1 parent 9e2e233 commit cbda753
Showing 4 changed files with 91 additions and 41 deletions.
21 changes: 15 additions & 6 deletions artiq/frontend/artiq_master.py
Original file line number Diff line number Diff line change
@@ -5,9 +5,10 @@
import atexit
import os

from artiq.protocols.pc_rpc import Server
from artiq.protocols.pc_rpc import Server as RPCServer
from artiq.protocols.sync_struct import Publisher
from artiq.master.log import log_args, init_log
from artiq.protocols.logging import Server as LoggingServer
from artiq.master.log import log_args, init_log, log_worker
from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.scheduler import Scheduler
from artiq.master.worker_db import get_last_rid
@@ -27,6 +28,9 @@ def get_argparser():
group.add_argument(
"--port-control", default=3251, type=int,
help="TCP port to listen to for control (default: %(default)d)")
group.add_argument(
"--port-logging", default=1066, type=int,
help="TCP port to listen to for remote logging (default: %(default)d)")

group = parser.add_argument_group("databases")
group.add_argument("--device-db", default="device_db.pyon",
@@ -49,7 +53,7 @@ def get_argparser():

def main():
args = get_argparser().parse_args()
log_buffer, log_forwarder = init_log(args)
log_buffer = init_log(args)
if os.name == "nt":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
@@ -67,7 +71,7 @@ def main():
else:
repo_backend = FilesystemBackend(args.repository)
repository = Repository(repo_backend, device_db.get_device_db,
log_forwarder.log_worker)
log_worker)
atexit.register(repository.close)
repository.scan_async()

@@ -76,14 +80,14 @@ def main():
"get_device": device_db.get,
"get_dataset": dataset_db.get,
"update_dataset": dataset_db.update,
"log": log_forwarder.log_worker
"log": log_worker
}
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
worker_handlers["scheduler_submit"] = scheduler.submit
scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

server_control = Server({
server_control = RPCServer({
"master_device_db": device_db,
"master_dataset_db": dataset_db,
"master_schedule": scheduler,
@@ -104,6 +108,11 @@ def main():
args.bind, args.port_notify))
atexit.register(lambda: loop.run_until_complete(server_notify.stop()))

server_logging = LoggingServer()
loop.run_until_complete(server_logging.start(
args.bind, args.port_logging))
atexit.register(lambda: loop.run_until_complete(server_logging.stop()))

loop.run_forever()

if __name__ == "__main__":
42 changes: 7 additions & 35 deletions artiq/master/log.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
import logging.handlers

from artiq.protocols.sync_struct import Notifier
from artiq.protocols.logging import parse_log_message, log_with_name


class LogBuffer:
@@ -25,38 +26,11 @@ def emit(self, record):
self.log_buffer.log(record.levelno, record.source, record.created, message)


name_to_level = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARN": logging.WARNING,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}


def parse_log_message(msg):
for name, level in name_to_level.items():
if msg.startswith(name + ":"):
remainder = msg[len(name) + 1:]
try:
idx = remainder.index(":")
except:
continue
return level, remainder[:idx], remainder[idx+1:]
return logging.INFO, "print", msg


fwd_logger = logging.getLogger("fwd")


class LogForwarder:
def log_worker(self, rid, message):
level, name, message = parse_log_message(message)
fwd_logger.name = name
fwd_logger.log(level, message,
extra={"source": "worker({})".format(rid)})
log_worker.worker_pass_rid = True
def log_worker(rid, message):
level, name, message = parse_log_message(message)
log_with_name(name, level, message,
extra={"source": "worker({})".format(rid)})
log_worker.worker_pass_rid = True


class SourceFilter:
@@ -120,6 +94,4 @@ def init_log(args):
handler.addFilter(flt)
root_logger.addHandler(handler)

log_forwarder = LogForwarder()

return log_buffer, log_forwarder
return log_buffer
67 changes: 67 additions & 0 deletions artiq/protocols/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import asyncio
import logging

from artiq.protocols.asyncio_server import AsyncioServer


_fwd_logger = logging.getLogger("fwd")


def log_with_name(name, *args, **kwargs):
_fwd_logger.name = name
_fwd_logger.log(*args, **kwargs)


_name_to_level = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARN": logging.WARNING,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}


def parse_log_message(msg):
for name, level in _name_to_level.items():
if msg.startswith(name + ":"):
remainder = msg[len(name) + 1:]
try:
idx = remainder.index(":")
except:
continue
return level, remainder[:idx], remainder[idx+1:]
return logging.INFO, "print", msg


_init_string = b"ARTIQ logging\n"


class Server(AsyncioServer):
async def _handle_connection_cr(self, reader, writer):
try:
line = await reader.readline()
if line != _init_string:
return

while True:
line = await reader.readline()
if not line:
break
try:
line = line.decode()
except:
return
line = line[:-1]
linesplit = line.split(":", 4)
if len(linesplit) != 4:
return
source, levelname, name, message = linesplit
try:
level = _name_to_level[levelname]
except KeyError:
return
log_with_name(name, level, message,
extra={"source": source})
finally:
writer.close()
2 changes: 2 additions & 0 deletions doc/manual/default_network_ports.rst
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@ Default network ports
+--------------------------+--------------+
| Core device (mon/inj) | 3250 (UDP) |
+--------------------------+--------------+
| Master (logging) | 1066 |
+--------------------------+--------------+
| InfluxDB bridge | 3248 |
+--------------------------+--------------+
| Controller manager | 3249 |

0 comments on commit cbda753

Please sign in to comment.