Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e46af3f

Browse files
committedJul 29, 2015
Initial Commit
0 parents  commit e46af3f

14 files changed

+414
-0
lines changed
 

Diff for: ‎.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
dist
3+
build
4+
*.bak
5+
*.py[co]
6+
*.egg-info

Diff for: ‎Dockerfile

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
FROM crux/python:onbuild
2+
3+
EXPOSE 80
4+
5+
ENTRYPOINT ["broker"]
6+
CMD []

Diff for: ‎README.md

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
broker
2+
======
3+
4+
OpenKnot Broker.
5+
6+
Installation
7+
------------
8+
9+
Either pull the automatically updated [Docker](http://docker.com/) image:
10+
11+
$ docker pull openknot/broker
12+
13+
Or install from the development repository:
14+
15+
$ git clone https://github.com/openknot/broker.git
16+
$ cd broker
17+
$ pip install -r requirements.txt

Diff for: ‎README.rst

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
.. _docker: http://docker.com/
2+
.. _dotCloud: http://dotcloud.com/
3+
4+
5+
broker
6+
======
7+
8+
OpenKnot Broker.
9+
10+
Installation
11+
------------
12+
13+
Either pull the automatically updated `Docker`_ image::
14+
15+
$ docker pull openknot/broker
16+
17+
Or install from the development repository::
18+
19+
$ git clone https://github.com/openknot/broker.git
20+
$ cd broker
21+
$ pip install -r requirements.txt

Diff for: ‎broker/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
"""broker - OpenKnot Broker
2+
3+
OpenKnot Broker
4+
5+
:copyright: Copyright (C) 2015 by James Mills
6+
"""
7+
8+
9+
__author__ = "James Mills, prologic at shortcircuit dot net dot au"
10+
__date__ = "29th July 2015"
11+
12+
13+
from .version import version as __version__ # noqa

Diff for: ‎broker/events.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
"""Events"""
2+
3+
4+
from circuits import Event
5+
6+
7+
class message(Event):
    """Event fired with a decoded message payload (see ``API.POST`` in ``broker.main``)."""

Diff for: ‎broker/main.py

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
#!/usr/bin/env python
2+
3+
4+
"""OpenKnot Broker Daemon"""
5+
6+
7+
from __future__ import print_function
8+
9+
10+
import os
11+
import sys
12+
import logging
13+
from os import environ
14+
from logging import getLogger
15+
from json import dumps, loads
16+
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
17+
18+
19+
from circuits.web import Controller, Server
20+
from circuits import handler, Component, Debugger
21+
22+
23+
from .events import message
24+
from .mqtt import mqtt, MQTT
25+
from .utils import parse_bind, waitfor
26+
27+
28+
def setup_logging(args):
    """Configure the root logger and return this module's logger.

    :param args: parsed command-line options; ``args.logfile`` (a path or
                 ``None``) selects the destination and ``args.debug``
                 selects ``DEBUG`` over ``INFO`` verbosity.
    :returns: the logger named after this module.
    """
    options = {
        "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        "level": logging.DEBUG if args.debug else logging.INFO,
    }

    if args.logfile is None:
        options["stream"] = sys.stderr
    else:
        # Let logging open the file itself (append mode is its default) so
        # the handler owns the handle and closes it at shutdown, instead of
        # leaking a file object opened by hand.
        options["filename"] = args.logfile

    logging.basicConfig(**options)

    return getLogger(__name__)
38+
39+
40+
def setup_mqtt(args, logger):
    """Block until the MQTT service named by ``args.url`` accepts connections.

    :param args: parsed command-line options; only ``args.url`` is read.
    :param logger: logger used to report progress and failure.
    :raises SystemExit: with status 1 when ``waitfor`` reports that the
                        service did not become reachable.
    """
    # Fall back to 1883, the registered MQTT port, when the URL carries no
    # port; parse_bind's own default (1338) is the broker's port.
    host, port = parse_bind(args.url, 1883)

    logger.debug("Waiting for MQTT Service on {0:s}:{1:d} ...".format(host, port))

    if not waitfor(host, port):
        logger.error("Timed out waiting for MQTT Service on {0:s}:{1:d} ...".format(host, port))
        raise SystemExit(1)
48+
49+
50+
class JSONSerializer(Component):
    """Turn ``dict`` response bodies into JSON text on the ``web`` channel."""

    channel = "web"

    # 1 higher than the default response handler
    @handler("response", priority=1.0)
    def serialize_response_body(self, response):
        """Replace a ``dict`` body with its JSON encoding and set the content type.

        Responses whose body is not a ``dict`` are left untouched.
        """
        if not isinstance(response.body, dict):
            return

        response.headers["Content-Type"] = "application/json"
        encoded = dumps(response.body)
        response.body = encoded
60+
61+
62+
class Dispatcher(Component):
    """Re-fire message payloads as ``mqtt`` events tagged with their protocol."""

    def message(self, payload):
        """Fire ``mqtt(protocol, payload)`` for a received payload.

        :param payload: mapping read with ``.get``; its ``"protocol"`` entry
                        is used, or ``"unknown"`` when the key is absent.
        """
        self.fire(mqtt(payload.get("protocol", "unknown"), payload))
68+
69+
70+
class API(Controller):
    """Web controller on the ``/message`` channel."""

    channel = "/message"

    def POST(self, event, *args, **kwargs):
        """Decode the JSON request body and fire it as a ``message`` event.

        :param event: the request event; its first two arguments are taken
                      as the request and response objects.
        :returns: ``{"success": True}`` after the event has been fired.
        """
        request, _response = event.args[:2]

        raw_body = request.body.read()
        self.fire(message(loads(raw_body)))

        return {"success": True}
81+
82+
83+
class App(Component):
    """Top-level component that wires the broker's parts together."""

    def init(self, args):
        """Register the MQTT client, web server, serializer, API and dispatcher.

        :param args: parsed command-line options; ``debug``, ``bind`` and
                     ``url`` are read here.
        """
        self.args = args

        self.logger = getLogger(__name__)

        if self.args.debug:
            Debugger().register(self)

        bind = parse_bind(self.args.bind)

        MQTT(self.args.url).register(self)

        Server(bind).register(self)
        JSONSerializer().register(self)

        API().register(self)

        # Dispatcher was defined but never registered, so the ``message``
        # events fired by API.POST had no handler and were silently dropped.
        Dispatcher().register(self)

    def signal(self, *args):
        """Exit the process with status 0 when a signal event is received."""
        raise SystemExit(0)
104+
105+
106+
def parse_args():
    """Parse the broker daemon's command-line options.

    Defaults come from the environment: ``BIND`` for ``--bind``, ``DEBUG``
    for ``--debug`` and ``MQTT_PORT`` (falling back to ``URL``) for
    ``--url``.

    :returns: the populated ``argparse`` namespace.
    """
    parser = ArgumentParser(
        description=__doc__,
        formatter_class=ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "-b", "--bind", action="store", dest="bind", metavar="INT", type=str,
        default=environ.get("BIND", "0.0.0.0:80"),
        help="Interface and Port to Bind to"
    )

    # Environment values are strings and any non-empty string is truthy, so
    # DEBUG=0 or DEBUG=false used to switch debugging on. Treat the usual
    # "off" spellings as False and everything else as True.
    debug_default = environ.get("DEBUG", "").strip().lower() not in (
        "", "0", "false", "no", "off"
    )

    parser.add_argument(
        "-d", "--debug", action="store_true", dest="debug",
        default=debug_default,
        help="Enable Debug Mode"
    )

    parser.add_argument(
        "-l", "--logfile", action="store", default=None,
        dest="logfile", metavar="FILE", type=str,
        help="Log file to store logs in"
    )

    # argparse ignores ``default`` for a required option, so an unconditional
    # required=True forced -u even when the environment supplied the URL.
    # Only demand the flag when no environment default exists.
    url_default = environ.get("MQTT_PORT", environ.get("URL", None))

    parser.add_argument(
        "-u", "--url", action="store", dest="url", metavar="URL", type=str,
        default=url_default, required=not url_default,
        help="MQTT URL"
    )

    return parser.parse_args()
137+
138+
139+
def main():
    """Console entry point: parse options, wait for MQTT, then run the app."""
    # Swap stdout for an unbuffered handle on the same file descriptor.
    stdout_fd = sys.stdout.fileno()
    sys.stdout = os.fdopen(stdout_fd, "w", 0)

    options = parse_args()
    log = setup_logging(options)
    setup_mqtt(options, log)

    application = App(options)
    application.run()


if __name__ == "__main__":
    main()

Diff for: ‎broker/mqtt.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""MQTT"""
2+
3+
4+
from __future__ import print_function
5+
6+
7+
from paho.mqtt.client import Client
8+
9+
from circuits import Event, Component
10+
11+
12+
from .utils import parse_bind
13+
14+
15+
class mqtt(Event):
    """Event fired by the broker's ``Dispatcher`` as ``mqtt(protocol, payload)``."""
17+
18+
19+
class MQTT(Component):
    """Component owning a paho MQTT client connected to ``url``."""

    def init(self, url):
        """Create the client, install its callbacks, connect and start its loop.

        :param url: location of the MQTT service, in any form ``parse_bind``
                    accepts (``host``, ``host:port`` or ``scheme://host:port``).
        """
        self.url = url

        # Default to 1883, the registered MQTT port, when the URL has no
        # port; parse_bind's own default (1338) is the broker's port.
        host, port = parse_bind(self.url, 1883)

        self.client = Client()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

        self.client.connect(host, port)

        # paho only processes network traffic (CONNACK, keepalives and the
        # callbacks installed above) from its network loop. Without it the
        # callbacks never fire, so run the loop in paho's background thread.
        self.client.loop_start()

    def _on_connect(self, client, userdata, flags, rc):
        """paho callback: report the result code of the connection attempt."""
        print("Connected with result code {0}".format(rc))

    def _on_message(self, client, userdata, msg):
        """paho callback: print the topic and payload of a received message."""
        print("{0} {1}".format(msg.topic, msg.payload))

Diff for: ‎broker/plugin.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""Plugin
2+
3+
Subclass :class:`Plugin` to create broker plugins with standardized CLI Options and API.
4+
"""
5+
6+
7+
from __future__ import print_function
8+
9+
10+
from os import environ
11+
from logging import getLogger
12+
from inspect import getmodule
13+
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
14+
15+
16+
from circuits import Component, Debugger
17+
18+
19+
from .utils import parse_bind
20+
21+
22+
def parse_args(parse=True, description=None):
    """Build the standard plugin option parser and optionally parse ``sys.argv``.

    Defaults come from the environment: ``BIND`` for ``--bind``, ``DEBUG``
    for ``--debug`` and ``URL`` (falling back to ``BROKER_PORT``) for
    ``--url``.

    :param parse: when true, return the parsed namespace; otherwise return
                  the ``ArgumentParser`` so the caller can add options.
    :param description: text for ``--help``; ``None`` becomes an empty string.
    :returns: an ``argparse`` namespace or the unparsed ``ArgumentParser``.
    """
    parser = ArgumentParser(
        description=(description or ""),
        formatter_class=ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "-b", "--bind", action="store", dest="bind", metavar="INT", type=str,
        default=environ.get("BIND", "0.0.0.0:1338"),
        help="Interface and Port to Bind to"
    )

    # Environment values are strings and any non-empty string is truthy, so
    # DEBUG=0 or DEBUG=false used to switch debugging on. Treat the usual
    # "off" spellings as False and everything else as True.
    debug_default = environ.get("DEBUG", "").strip().lower() not in (
        "", "0", "false", "no", "off"
    )

    parser.add_argument(
        "-d", "--debug", action="store_true", dest="debug",
        default=debug_default,
        help="Enable Debug Mode"
    )

    parser.add_argument(
        "-u", "--url", action="store", dest="url", metavar="URL", type=str,
        default=environ.get("URL", environ.get("BROKER_PORT", "udp://127.0.0.1:1338")),
        help="broker Daemon URL"
    )

    return parser.parse_args() if parse else parser
47+
48+
49+
class Plugin(Component):
    """Base component giving broker plugins the standard CLI options."""

    def init(self, parse_args_cb=None):
        """Parse the command line and derive ``bind``, ``url`` and ``logger``.

        :param parse_args_cb: optional callable that is given the unparsed
                              ``ArgumentParser`` and must return a parser;
                              lets a plugin add its own options before the
                              command line is parsed.
        """
        # Get the description from the first line of the plugin module's
        # __doc__. The whole docstring used to be passed through, contrary
        # to the stated intent.
        module_doc = getattr(getmodule(self), "__doc__", None) or ""
        doc_lines = module_doc.strip().splitlines()
        description = doc_lines[0] if doc_lines else ""

        # Allow the ArgumentParser to be extended.
        if parse_args_cb is not None:
            self.args = parse_args_cb(parse_args(False, description)).parse_args()
        else:
            self.args = parse_args(description=description)

        self.bind = parse_bind(self.args.bind)
        self.url = parse_bind(self.args.url)

        self.logger = getLogger(__name__)

    def started(self, *args):
        """Attach a ``Debugger`` once started, if debug mode was requested."""
        if self.args.debug:
            Debugger().register(self)

Diff for: ‎broker/utils.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Utilities"""
2+
3+
4+
from time import sleep
5+
from socket import AF_INET, SOCK_STREAM, socket
6+
7+
8+
def parse_bind(s, default_port=1338):
    """Split ``[scheme://]address[:port][/path]`` into ``(address, port)``.

    :param s: bind or URL string, e.g. ``"0.0.0.0:80"`` or
              ``"tcp://10.0.0.1:1883"``.
    :param default_port: port returned when ``s`` names none.
    :returns: ``(address, port)`` with ``port`` as an ``int``.
    :raises ValueError: if a port is present but is not an integer.
    """
    # XXX: We ignore the protocol for now
    if "://" in s:
        s = s.split("://", 1)[1]

    # Drop any trailing path ("host:1883/") so it cannot end up inside the
    # port and break int().
    s = s.split("/", 1)[0]

    address, _, port = s.partition(":")

    # No colon at all, or a colon with nothing after it ("host:").
    if not port:
        return address, default_port

    return address, int(port)
20+
21+
22+
def waitfor(address, port, timeout=10):
    """Poll until a TCP connection to ``(address, port)`` succeeds.

    One attempt is made immediately and then one per second, for up to
    ``timeout`` further attempts.

    :param address: host name or IPv4 address to connect to.
    :param port: TCP port to connect to.
    :param timeout: number of one-second retries after the first attempt.
    :returns: ``True`` once a connection succeeds, ``False`` when every
              attempt failed.
    """
    # Imported locally: the file-level import only brings in AF_INET,
    # SOCK_STREAM and socket.
    from socket import error as socket_error

    remaining = timeout
    while True:
        # Use a fresh socket per attempt; reusing one after a failed connect
        # is not portable, and the old code never closed it.
        sock = socket(AF_INET, SOCK_STREAM)
        try:
            if sock.connect_ex((address, port)) == 0:
                # Report success explicitly. Returning the leftover counter
                # made a success on the final attempt look like a timeout.
                return True
        except socket_error:
            # connect_ex still raises for resolver failures (e.g. the host
            # name is not registered yet); treat that as "not up yet".
            pass
        finally:
            sock.close()

        if remaining <= 0:
            return False

        sleep(1)
        remaining -= 1

Diff for: ‎broker/version.py

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
"""Version Module
2+
3+
So we only have to maintain version information in one place!
4+
"""
5+
6+
version_info = (0, 0, 1, "dev")  # (major, minor, patch, dev?)

# A bare "dev" is not a valid PEP 440 version and is rejected by current
# setuptools, so development builds are reported as "<release>.dev0".
if version_info[-1] == "dev":
    version = ".".join(map(str, version_info[:-1])) + ".dev0"
else:
    version = ".".join(map(str, version_info))

Diff for: ‎requirements.txt

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
paho-mqtt
2+
3+
# circuits==3.1.0
4+
# Development version of circuits
5+
-e git+https://github.com/circuits/circuits.git#egg=circuits

Diff for: ‎setup.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#!/usr/bin/env python
2+
3+
from setuptools import setup, find_packages
4+
5+
6+
from broker.version import version
7+
8+
9+
def parse_requirements(filename):
    """Yield the requirement specifiers listed in a pip requirements file.

    Blank lines, ``#`` comments and ``-e`` (editable) entries are skipped.

    :param filename: path of the requirements file to read.
    """
    with open(filename, "r") as f:
        for line in f:
            requirement = line.strip()
            # The previous two-character slice of "# comment" was "# ", which
            # never equals "#", so comments (and blank lines) were yielded as
            # requirements. Match on the prefix instead.
            if requirement and not requirement.startswith(("#", "-e")):
                yield requirement
14+
15+
16+
# Read the long description up front so the README handle is closed
# deterministically instead of being left to the garbage collector.
with open("README.rst", "r") as readme:
    long_description = readme.read()

setup(
    name="broker",
    version=version,
    description="OpenKnot Broker",
    long_description=long_description,
    author="James Mills",
    author_email="James Mills, prologic at shortcircuit dot net dot au",
    url="https://github.com/openknot/broker",
    download_url="https://github.com/openknot/broker/archive/master.zip",
    classifiers=[
        "Development Status :: 3 - Alpha",
        "Programming Language :: Python :: 2.7",
    ],
    license="TBA",
    keywords="openknot broker",
    platforms="POSIX",
    packages=find_packages("."),
    install_requires=list(parse_requirements("requirements.txt")),
    entry_points={
        "console_scripts": [
            "broker=broker.main:main"
        ]
    },
    zip_safe=True
)

Diff for: ‎testapi.sh

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
3+
curl -v -o - -d '{"id": 123, "protocol": "test"}' http://localhost:8000/message

0 commit comments

Comments
 (0)
Please sign in to comment.