Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 9e96a687e22e
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 6c856025cc18
Choose a head ref
  • 3 commits
  • 3 files changed
  • 1 contributor

Commits on Oct 16, 2015

  1. Copy the full SHA
    786dc14 View commit details
  2. Copy the full SHA
    f332c1d View commit details
  3. Copy the full SHA
    6c85602 View commit details
Showing with 65 additions and 31 deletions.
  1. +32 −13 artiq/frontend/artiq_ctlmgr.py
  2. +3 −17 artiq/master/log.py
  3. +30 −1 artiq/protocols/logging.py
45 changes: 32 additions & 13 deletions artiq/frontend/artiq_ctlmgr.py
Original file line number Diff line number Diff line change
@@ -5,12 +5,15 @@
import argparse
import os
import logging
import subprocess
import shlex
import socket

from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient, Server
from artiq.protocols.logging import LogForwarder
from artiq.protocols.logging import (LogForwarder,
parse_log_message, log_with_name,
SourceFilter)
from artiq.tools import TaskObject, Condition


@@ -75,14 +78,36 @@ async def _wait_and_ping(self):
else:
break

async def forward_logs(self, stream):
source = "controller({})".format(self.name)
while True:
try:
entry = (await stream.readline())
if not entry:
break
entry = entry[:-1]
level, name, message = parse_log_message(entry.decode())
log_with_name(name, level, message, extra={"source": source})
except:
logger.debug("exception in log forwarding", exc_info=True)
break
logger.debug("stopped log forwarding of stream %s of %s",
stream, self.name)


async def launcher(self):
try:
while True:
logger.info("Starting controller %s with command: %s",
self.name, self.command)
try:
self.process = await asyncio.create_subprocess_exec(
*shlex.split(self.command))
*shlex.split(self.command),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
asyncio.ensure_future(self.forward_logs(
self.process.stdout))
asyncio.ensure_future(self.forward_logs(
self.process.stderr))
await self._wait_and_ping()
except FileNotFoundError:
logger.warning("Controller %s failed to start", self.name)
@@ -236,9 +261,9 @@ def get_argparser():

group = parser.add_argument_group("verbosity")
group.add_argument("-v", "--verbose", default=0, action="count",
help="increase logging level")
help="increase logging level of the manager process")
group.add_argument("-q", "--quiet", default=0, action="count",
help="decrease logging level")
help="decrease logging level of the manager process")

parser.add_argument(
"-s", "--server", default="::1",
@@ -261,19 +286,13 @@ def get_argparser():
return parser


class SourceAdder:
def filter(self, record):
if not hasattr(record, "source"):
record.source = "ctlmgr"
return True


def main():
args = get_argparser().parse_args()

root_logger = logging.getLogger()
root_logger.setLevel(logging.WARNING + args.quiet*10 - args.verbose*10)
source_adder = SourceAdder()
root_logger.setLevel(logging.NOTSET)
source_adder = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10,
"ctlmgr")
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
"%(levelname)s:%(source)s:%(name)s:%(message)s"))
20 changes: 3 additions & 17 deletions artiq/master/log.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
import logging.handlers

from artiq.protocols.sync_struct import Notifier
from artiq.protocols.logging import parse_log_message, log_with_name
from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter


class LogBuffer:
@@ -34,21 +34,6 @@ def log_worker(rid, message):
log_worker.worker_pass_rid = True


class SourceFilter:
def __init__(self, master_level):
self.master_level = master_level

def filter(self, record):
if not hasattr(record, "source"):
record.source = "master"
if record.source == "master":
return record.levelno >= self.master_level
else:
# log messages that are forwarded from a source have already
# been filtered, and may have a level below the master level.
return True


def log_args(parser):
group = parser.add_argument_group("logging")
group.add_argument("-v", "--verbose", default=0, action="count",
@@ -69,7 +54,8 @@ def log_args(parser):
def init_log(args):
root_logger = logging.getLogger()
root_logger.setLevel(logging.NOTSET) # we use our custom filter only
flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10)
flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10,
"master")
full_fmt = logging.Formatter(
"%(levelname)s:%(source)s:%(name)s:%(message)s")

31 changes: 30 additions & 1 deletion artiq/protocols/logging.py
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
from artiq.tools import TaskObject


logger = logging.getLogger(__name__)
_fwd_logger = logging.getLogger("fwd")


@@ -59,20 +60,40 @@ async def _handle_connection_cr(self, reader, writer):
except:
return
line = line[:-1]
linesplit = line.split(":", 4)
linesplit = line.split(":", 3)
if len(linesplit) != 4:
logger.warning("received improperly formatted message, "
"dropping connection")
return
source, level, name, message = linesplit
try:
level = int(level)
except:
logger.warning("received improperly formatted level, "
"dropping connection")
return
log_with_name(name, level, message,
extra={"source": source})
finally:
writer.close()


class SourceFilter:
def __init__(self, local_level, local_source):
self.local_level = local_level
self.local_source = local_source

def filter(self, record):
if not hasattr(record, "source"):
record.source = self.local_source
if record.source == self.local_source:
return record.levelno >= self.local_level
else:
# log messages that are forwarded from a source have already
# been filtered, and may have a level below the local level.
return True


class LogForwarder(logging.Handler, TaskObject):
def __init__(self, host, port, reconnect_timer=5.0, queue_size=1000,
**kwargs):
@@ -96,11 +117,19 @@ async def _do(self):
try:
reader, writer = await asyncio.open_connection(self.host,
self.port)
detect_close = asyncio.ensure_future(reader.read(1))
writer.write(_init_string)
while True:
message = await self._queue.get() + "\n"
writer.write(message.encode())
await writer.drain()
# HACK: detect connection termination through the completion
# of a read operation. For some reason, write/drain operations
# on a closed socket do not raise exceptions, but print
# "asyncio:socket.send() raised exception."
if detect_close.done():
await asyncio.sleep(self.reconnect_timer)
break
except asyncio.CancelledError:
return
except: