Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Initial Commit
  • Loading branch information
prologic committed Jul 29, 2015
0 parents commit e46af3f
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -0,0 +1,6 @@
*~
dist
build
*.bak
*.py[co]
*.egg-info
6 changes: 6 additions & 0 deletions Dockerfile
@@ -0,0 +1,6 @@
FROM crux/python:onbuild

EXPOSE 80

ENTRYPOINT ["broker"]
CMD []
17 changes: 17 additions & 0 deletions README.md
@@ -0,0 +1,17 @@
broker
======

OpenKnot Broker.

Installation
------------

Either pull the automatically updated [Docker](http://docker.com/) image:

$ docker pull openknot/broker

Or install from the development repository:

$ git clone https://github.com/openknot/broker.git
$ cd broker
$ pip install -r requirements.txt
21 changes: 21 additions & 0 deletions README.rst
@@ -0,0 +1,21 @@
.. _docker: http://docker.com/
.. _dotCloud: http://dotcloud.com/


broker
======

OpenKnot Broker.

Installation
------------

Either pull the automatically updated `Docker`_ image::
$ docker pull openknot/broker

Or install from the development repository::
$ git clone https://github.com/openknot/broker.git
$ cd broker
$ pip install -r requirements.txt
13 changes: 13 additions & 0 deletions broker/__init__.py
@@ -0,0 +1,13 @@
"""broker - OpenKnot Broker
OpenKnot Broker
:copyright: CopyRight (C) 2015 by James Mills
"""


# Package metadata.
__author__ = "James Mills, prologic at shortcircuit dot net dot au"
__date__ = "29th July 2015"


from .version import version as __version__ # noqa
8 changes: 8 additions & 0 deletions broker/events.py
@@ -0,0 +1,8 @@
"""Events"""


from circuits import Event


class message(Event):
    """message Event

    Fired by the web API's ``POST`` handler with the decoded JSON request
    body as its only argument.
    """
152 changes: 152 additions & 0 deletions broker/main.py
@@ -0,0 +1,152 @@
#!/usr/bin/env python


"""OpenKnot Broker Daemon"""


from __future__ import print_function


import os
import sys
import logging
from os import environ
from logging import getLogger
from json import dumps, loads
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser


from circuits.web import Controller, Server
from circuits import handler, Component, Debugger


from .events import message
from .mqtt import mqtt, MQTT
from .utils import parse_bind, waitfor


def setup_logging(args):
    """Configure root logging and return this module's logger.

    :param args: parsed options; reads ``args.logfile`` (path or ``None``)
                 and ``args.debug`` (truthy selects DEBUG, else INFO).
    :returns: the logger named after this module.
    """
    if args.logfile is None:
        logstream = sys.stderr
    else:
        # Opened in append mode and kept open for the life of the process.
        logstream = open(args.logfile, "a")

    if args.debug:
        level = logging.DEBUG
    else:
        level = logging.INFO

    logging.basicConfig(
        stream=logstream,
        level=level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    return getLogger(__name__)


def setup_mqtt(args, logger):
    """Block until the MQTT service named by ``args.url`` accepts connections.

    :param args: parsed options; ``args.url`` is split with ``parse_bind``.
    :param logger: logger used for the progress and failure messages.
    :raises SystemExit: with status 1 when ``waitfor`` reports a timeout.
    """
    host, port = parse_bind(args.url)

    logger.debug("Waiting for MQTT Service on {0:s}:{1:d} ...".format(host, port))

    if waitfor(host, port):
        return

    logger.error("Timed out waiting for MQTT Service on {0:s}:{1:d} ...".format(host, port))
    raise SystemExit(1)


class JSONSerializer(Component):
    """Serialize ``dict`` response bodies to JSON on the ``web`` channel."""

    channel = "web"

    # 1 higher than the default response handler
    @handler("response", priority=1.0)
    def serialize_response_body(self, response):
        """Replace a ``dict`` body with its JSON text and set the content type."""
        body = response.body
        if not isinstance(body, dict):
            # Anything that is not a dict is passed through untouched.
            return

        response.headers["Content-Type"] = "application/json"
        response.body = dumps(body)


class Dispatcher(Component):
    """Turn ``message`` events into ``mqtt`` events."""

    def message(self, payload):
        """Fire ``mqtt(protocol, payload)`` for an incoming payload.

        The protocol is read from the payload's ``"protocol"`` key and falls
        back to ``"unknown"`` when the key is absent.
        """
        self.fire(mqtt(payload.get("protocol", "unknown"), payload))


class API(Controller):
    """Web controller bound to the ``/message`` channel."""

    channel = "/message"

    def POST(self, event, *args, **kwargs):
        """Decode the JSON request body and fire it as a ``message`` event.

        :returns: ``{"success": True}`` once the event has been fired.
        """
        # The first two event arguments are unpacked as (request, response);
        # only the request is used here.
        request, _response = event.args[:2]
        raw_body = request.body.read()

        self.fire(message(loads(raw_body)))

        return {"success": True}


class App(Component):
    """Top-level component that wires the broker's parts together."""

    def init(self, args):
        """Store the parsed options and register the child components.

        :param args: parsed options; reads ``debug``, ``bind`` and ``url``.
        """
        self.args = args
        self.logger = getLogger(__name__)

        if args.debug:
            Debugger().register(self)

        listen_on = parse_bind(args.bind)

        # NOTE(review): Dispatcher is defined in this module but is not
        # registered here -- confirm whether that is intentional.
        MQTT(args.url).register(self)
        Server(listen_on).register(self)
        JSONSerializer().register(self)
        API().register(self)

    def signal(self, *args):
        """Exit with status 0 (presumably on circuits' ``signal`` event -- confirm)."""
        raise SystemExit(0)


def parse_args():
    """Parse the broker daemon's command-line options from ``sys.argv``.

    Environment fallbacks:

    * ``BIND`` supplies the default for ``--bind``.
    * ``DEBUG`` supplies the default for ``--debug``; the values ``""``,
      ``0``, ``false``, ``no`` and ``off`` (case-insensitive) mean disabled,
      any other value means enabled.
    * ``MQTT_PORT`` (then ``URL``) supplies the default for ``--url``.

    :returns: the populated ``argparse.Namespace``.
    :raises SystemExit: (from argparse) when no MQTT URL is given on the
                        command line or in the environment.
    """
    # A raw environment string is always truthy when non-empty, so "0" or
    # "false" must be mapped to False explicitly.
    debug_default = environ.get("DEBUG", "").strip().lower() not in (
        "", "0", "false", "no", "off",
    )

    # --url is only mandatory on the command line when the environment did
    # not already provide a value; otherwise the default could never be used.
    url_default = environ.get("MQTT_PORT", environ.get("URL", None))

    parser = ArgumentParser(
        description=__doc__,
        formatter_class=ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "-b", "--bind", action="store", dest="bind", metavar="INT", type=str,
        default=environ.get("BIND", "0.0.0.0:80"),
        help="Interface and Port to Bind to"
    )

    parser.add_argument(
        "-d", "--debug", action="store_true", dest="debug",
        default=debug_default,
        help="Enable Debug Mode"
    )

    parser.add_argument(
        "-l", "--logfile", action="store", default=None,
        dest="logfile", metavar="FILE", type=str,
        help="Log file to store logs in"
    )

    parser.add_argument(
        "-u", "--url", action="store", dest="url", metavar="URL", type=str,
        default=url_default, required=url_default is None,
        help="MQTT URL"
    )

    return parser.parse_args()


def main():
    """Entry point: parse options, set up logging, wait for MQTT, run the app."""
    # Make stdout flush eagerly. Python 2 allows a truly unbuffered text
    # stream (buffering=0); Python 3 rejects that with ValueError -- and
    # closes the descriptor while failing -- so line buffering is used there.
    buffering = 0 if sys.version_info[0] < 3 else 1
    sys.stdout = os.fdopen(sys.stdout.fileno(), "w", buffering)

    args = parse_args()

    logger = setup_logging(args)

    # Exits the process with status 1 if the MQTT service never comes up.
    setup_mqtt(args, logger)

    App(args).run()


if __name__ == "__main__":
    main()
36 changes: 36 additions & 0 deletions broker/mqtt.py
@@ -0,0 +1,36 @@
"""MQTT"""


from __future__ import print_function


from paho.mqtt.client import Client

from circuits import Event, Component


from .utils import parse_bind


class mqtt(Event):
    """mqtt Event

    Fired by the daemon's ``Dispatcher`` as ``mqtt(protocol, payload)``.
    """


class MQTT(Component):
    """Component holding a paho MQTT client connected to ``url``."""

    def init(self, url):
        """Create the client, attach the callbacks and connect.

        :param url: MQTT service URL; host and port are taken from
                    ``parse_bind(url)``.
        """
        self.url = url

        host, port = parse_bind(url)

        client = self.client = Client()
        client.on_connect = self._on_connect
        client.on_message = self._on_message

        # NOTE(review): no loop_start()/loop_forever() call is visible here;
        # confirm where the client's network loop is driven.
        client.connect(host, port)

    def _on_connect(self, client, userdata, flags, rc):
        """Print the result code handed to the connect callback."""
        print("Connected with result code {0}".format(rc))

    def _on_message(self, client, userdata, msg):
        """Print the topic and payload of a received message."""
        print("{0} {1}".format(msg.topic, msg.payload))
68 changes: 68 additions & 0 deletions broker/plugin.py
@@ -0,0 +1,68 @@
"""Plugin
Subclass :class:`Plugin` to create broker plugins with standardized CLI Options and API.
"""


from __future__ import print_function


from os import environ
from logging import getLogger
from inspect import getmodule
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser


from circuits import Component, Debugger


from .utils import parse_bind


def parse_args(parse=True, description=None):
    """Build the standard plugin argument parser and optionally run it.

    Environment fallbacks: ``BIND`` for ``--bind``; ``DEBUG`` for ``--debug``
    (``""``, ``0``, ``false``, ``no`` and ``off``, case-insensitive, mean
    disabled; anything else means enabled); ``URL`` then ``BROKER_PORT`` for
    ``--url``.

    :param parse: when true, parse ``sys.argv`` and return the namespace;
                  when false, return the un-parsed ``ArgumentParser`` so the
                  caller can add options first.
    :param description: help description; ``None`` becomes ``""``.
    :returns: ``argparse.Namespace`` or ``ArgumentParser`` (see ``parse``).
    """
    # A raw environment string is always truthy when non-empty, so "0" or
    # "false" must be mapped to False explicitly.
    debug_default = environ.get("DEBUG", "").strip().lower() not in (
        "", "0", "false", "no", "off",
    )

    parser = ArgumentParser(
        description=(description or ""),
        formatter_class=ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "-b", "--bind", action="store", dest="bind", metavar="INT", type=str,
        default=environ.get("BIND", "0.0.0.0:1338"),
        help="Interface and Port to Bind to"
    )

    parser.add_argument(
        "-d", "--debug", action="store_true", dest="debug",
        default=debug_default,
        help="Enable Debug Mode"
    )

    parser.add_argument(
        "-u", "--url", action="store", dest="url", metavar="URL", type=str,
        default=environ.get("URL", environ.get("BROKER_PORT", "udp://127.0.0.1:1338")),
        help="broker Daemon URL"
    )

    return parser.parse_args() if parse else parser


class Plugin(Component):
    """Base component for broker plugins with the standard CLI options."""

    def init(self, parse_args_cb=None):
        """Parse the command line and derive ``bind``/``url`` tuples.

        :param parse_args_cb: optional callable that receives the un-parsed
                              ``ArgumentParser`` and returns a parser; its
                              ``parse_args()`` result becomes ``self.args``.
        """
        # The docstring of the module defining the plugin is passed, whole,
        # as the CLI description.
        description = getattr(getmodule(self), "__doc__", "")

        if parse_args_cb is None:
            self.args = parse_args(description=description)
        else:
            # Allow the ArgumentParser to be extended before it is run.
            parser = parse_args(False, description)
            self.args = parse_args_cb(parser).parse_args()

        self.bind = parse_bind(self.args.bind)
        self.url = parse_bind(self.args.url)

        self.logger = getLogger(__name__)

    def started(self, *args):
        """Attach a ``Debugger`` when debug mode was requested."""
        if not self.args.debug:
            return

        Debugger().register(self)
28 changes: 28 additions & 0 deletions broker/utils.py
@@ -0,0 +1,28 @@
"""Utilities"""


from time import sleep
from socket import AF_INET, SOCK_STREAM, socket


def parse_bind(s, default_port=1338):
    """Split ``[proto://]address[:port]`` into an ``(address, port)`` tuple.

    :param s: bind string or URL; any ``proto://`` prefix is discarded.
    :param default_port: port returned when ``s`` carries none.
    :returns: ``(address, port)`` with ``port`` as an ``int``.
    :raises ValueError: when the text after the first ``:`` is not an integer.
    """
    # XXX: We ignore the protocol for now
    _, scheme_sep, remainder = s.partition("://")
    hostport = remainder if scheme_sep else s

    address, colon, port_text = hostport.partition(":")
    if not colon:
        return hostport, default_port

    return address, int(port_text)


def waitfor(address, port, timeout=10):
    """Wait until a TCP connection to ``(address, port)`` succeeds.

    One connection attempt is made immediately, then one more after each
    one-second pause, for at most ``timeout`` pauses.

    :param address: host name or IPv4 address to connect to.
    :param port: TCP port number.
    :param timeout: maximum number of one-second retries after the first
                    attempt; ``0`` (or less) means a single attempt.
    :returns: ``True`` as soon as a connection succeeds, ``False`` if every
              attempt failed.
    """
    remaining = timeout

    while True:
        # A fresh socket per attempt: a socket whose connect failed is not
        # reliably reusable, and each one is closed so none leak.
        sock = socket(AF_INET, SOCK_STREAM)
        try:
            if sock.connect_ex((address, port)) == 0:
                # Success is reported even when it happens on the very last
                # attempt (remaining == 0).
                return True
        finally:
            sock.close()

        if remaining <= 0:
            return False

        sleep(1)
        remaining -= 1
11 changes: 11 additions & 0 deletions broker/version.py
@@ -0,0 +1,11 @@
"""Version Module
So we only have to maintain version information in one place!
"""

# (major, minor, patch, dev?)
version_info = (0, 0, 1, "dev")

# Development builds are reported simply as "dev"; otherwise the parts are
# joined with dots.
if version_info[-1] == "dev":
    version = "dev"
else:
    version = ".".join(str(part) for part in version_info)
5 changes: 5 additions & 0 deletions requirements.txt
@@ -0,0 +1,5 @@
paho-mqtt

# circuits==3.1.0
# Development version of circuits
-e git+https://github.com/circuits/circuits.git#egg=circuits

0 comments on commit e46af3f

Please sign in to comment.