Skip to content

Commit

Permalink
afamqp: New AMQP destination driver
Browse files Browse the repository at this point in the history
This driver implements an AMQP destination (based on the rabbitmq-c
library), supporting persistence, all the exchange types, and uses a
creative way to get messages accross: all the name-value pairs
selected with the value-pairs() syntax will be sent as headers, while
the message payload can be set with the body() option (empty by
default).

Most settings have sensible defaults, except for a few, noted below:

@module afamqp
destination d_amqp {
    amqp(
        vhost("/")
        host("127.0.0.1")
        port(5672)
        username("guest") # mandatory, no default
        password("guest") # mandatory, no default
        exchange("syslog")
        exchange_type("fanout")
        #routing_key("")
        #body("")
        persistent(yes)
        value-pairs(
            scope("selected-macros" "nv-pairs" "sdata")
        )
    );
};

Publishing the name-value pairs as headers makes it possible to use a
headers exchange type and subscribe only to interesting log streams,
in a much more flexible way than using the routing_key() option.

The routing_key() and body() options can contain any template, they
will be expanded before publication.

Signed-off-by: Attila Nagy <bra@fsn.hu>
Signed-off-by: Gergely Nagy <algernon@balabit.hu>
  • Loading branch information
algernon committed Oct 18, 2012
1 parent f870f6c commit efdee10
Show file tree
Hide file tree
Showing 13 changed files with 1,066 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Expand Up @@ -4,3 +4,6 @@
[submodule "lib/ivykis"]
path = lib/ivykis
url = https://github.com/buytenh/ivykis.git
[submodule "modules/afamqp/rabbitmq-c"]
path = modules/afamqp/rabbitmq-c
url = https://github.com/alanxz/rabbitmq-c.git
2 changes: 1 addition & 1 deletion autogen.sh
Expand Up @@ -3,7 +3,7 @@
# This script is needed to setup build environment from checked out
# source tree.
#
SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client"
SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client modules/afamqp/rabbitmq-c"
GIT=`which git`

autogen_submodules()
Expand Down
52 changes: 52 additions & 0 deletions configure.in
Expand Up @@ -28,6 +28,7 @@ IVYKIS_MIN_VERSION="0.30.1"
JSON_C_MIN_VERSION="0.9"
PCRE_MIN_VERSION="6.1"
LMC_MIN_VERSION="0.1.6"
LRMQ_MIN_VERSION="0.0.1"

dnl ***************************************************************************
dnl Initial setup
Expand Down Expand Up @@ -163,6 +164,15 @@ AC_ARG_WITH(libmongo-client,
Link against the system supplied or the built-in libmongo-client library.]
,,with_libmongo_client="internal")

AC_ARG_ENABLE(amqp,
[ --enable-amqp Enable amqp destination (default: auto)]
,,enable_amqp="auto")

AC_ARG_WITH(librabbitmq,
[ --with-librabbitmq-client=[system/internal]
Link against the system supplied or the built-in librabbitmq library.]
,,with_librabbitmq_client="internal")

AC_ARG_WITH(ivykis,
[ --with-ivykis=[system/internal]
Link against the system supplied or the built-in ivykis library.]
Expand Down Expand Up @@ -826,6 +836,31 @@ if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then
enable_smtp=$libesmtp
fi

dnl ***************************************************************************
dnl rabbitmq-c headers/libraries
dnl ***************************************************************************

if test "x$with_librabbitmq_client" = "xinternal"; then
if test -f "$srcdir/modules/afamqp/rabbitmq-c/librabbitmq/amqp.h"; then
AC_CONFIG_SUBDIRS([modules/afamqp/rabbitmq-c])
# these can only be used in modules/amqp as it assumes
# the current directory just one below rabbitmq-c

LIBRABBITMQ_LIBS="-L\$(builddir)/rabbitmq-c/librabbitmq -lrabbitmq"
LIBRABBITMQ_CFLAGS="-I\$(srcdir)/rabbitmq-c/librabbitmq -I\$(builddir)/rabbitmq-c/librabbitmq"
LIBRABBITMQ_SUBDIRS="rabbitmq-c"
else
AC_MSG_WARN([Internal librabbitmq-client sources not found in modules/afamqp/rabbitmq-c])
with_librabbitmq_client="no"
fi
elif test "x$with_librabbitmq_client" = "xsystem"; then
PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no")
fi

if test "x$with_librabbitmq_client" = "xno"; then
enable_amqp="no"
fi

dnl ***************************************************************************
dnl misc features to be enabled
dnl ***************************************************************************
Expand Down Expand Up @@ -889,6 +924,16 @@ if test "x$enable_mongodb" = "xauto"; then
AC_MSG_RESULT([$enable_mongodb])
fi

if test "x$enable_amqp" = "xauto"; then
AC_MSG_CHECKING(whether to enable amqp destination support)
if test "x$with_librabbitmq_client" != "no"; then
enable_amqp="yes"
else
enable_amqp="no"
fi
AC_MSG_RESULT([$enable_amqp])
fi

if test "x$enable_json" != "xno"; then
JSON_LIBS=$JSON_C_LIBS
JSON_CFLAGS=$JSON_C_CFLAGS
Expand Down Expand Up @@ -1068,6 +1113,7 @@ AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"])
AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"])
AM_CONDITIONAL(ENABLE_MONGODB, [test "$enable_mongodb" = "yes"])
AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"])
AM_CONDITIONAL(ENABLE_AMQP, [test "$enable_amqp" = "yes"])
AM_CONDITIONAL(ENABLE_JSON, [test "$enable_json" = "yes"])
AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"])

Expand Down Expand Up @@ -1102,6 +1148,9 @@ AC_SUBST(LIBMONGO_CFLAGS)
AC_SUBST(LIBMONGO_SUBDIRS)
AC_SUBST(LIBESMTP_CFLAGS)
AC_SUBST(LIBESMTP_LIBS)
AC_SUBST(LIBRABBITMQ_LIBS)
AC_SUBST(LIBRABBITMQ_CFLAGS)
AC_SUBST(LIBRABBITMQ_SUBDIRS)
AC_SUBST(JSON_LIBS)
AC_SUBST(JSON_CFLAGS)
AC_SUBST(IVYKIS_SUBDIRS)
Expand Down Expand Up @@ -1132,6 +1181,7 @@ AC_OUTPUT(dist.conf
modules/afuser/Makefile
modules/afmongodb/Makefile
modules/afsmtp/Makefile
modules/afamqp/Makefile
modules/dbparser/Makefile
modules/dbparser/pdbtool/Makefile
modules/dbparser/tests/Makefile
Expand Down Expand Up @@ -1172,6 +1222,7 @@ echo " __thread keyword : ${ac_cv_have_tls:=no}"
echo " Submodules:"
echo " ivykis : $with_ivykis"
echo " libmongo-client : $with_libmongo_client"
echo " librabbitmq : $with_librabbitmq_client"
echo " Features:"
echo " Debug symbols : ${enable_debug:=no}"
echo " GCC profiling : ${enable_gprof:=no}"
Expand All @@ -1192,4 +1243,5 @@ echo " PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}"
echo " MongoDB destination (module): ${enable_mongodb:=no}"
echo " JSON support (module) : ${enable_json:=no}"
echo " SMTP support (module) : ${enable_smtp:=no}"
echo " AMQP destination (module) : ${enable_amqp:=no}"

2 changes: 2 additions & 0 deletions lib/stats.c
Expand Up @@ -383,6 +383,8 @@ const gchar *source_names[SCS_MAX] =
"severity",
"facility",
"sender",
"smtp",
"amqp",
};


Expand Down
1 change: 1 addition & 0 deletions lib/stats.h
Expand Up @@ -71,6 +71,7 @@ enum
SCS_FACILITY = 25,
SCS_SENDER = 26,
SCS_SMTP = 27,
SCS_AMQP = 28,
SCS_MAX,
SCS_SOURCE_MASK = 0xff
};
Expand Down
2 changes: 1 addition & 1 deletion modules/Makefile.am
@@ -1 +1 @@
SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
SUBDIRS = afsocket afsql afstreams affile afprog afuser afamqp afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
35 changes: 35 additions & 0 deletions modules/afamqp/Makefile.am
@@ -0,0 +1,35 @@

SUBDIRS = @LIBRABBITMQ_SUBDIRS@
DIST_SUBDIRS = rabbitmq-c

moduledir = @moduledir@
AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib
module_LTLIBRARIES = libafamqp.la

export top_srcdir

if ENABLE_AMQP

libafamqp_la_CFLAGS = $(LIBRABBITMQ_CFLAGS)
libafamqp_la_SOURCES = afamqp-grammar.y afamqp.c afamqp.h afamqp-parser.c afamqp-parser.h
libafamqp_la_LIBADD = $(MODULE_DEPS_LIBS) $(LIBRABBITMQ_LIBS)
libafamqp_la_LDFLAGS = $(MODULE_LDFLAGS)

endif

BUILT_SOURCES = afamqp-grammar.y afamqp-grammar.c afamqp-grammar.h
EXTRA_DIST = $(BUILT_SOURCES) afamqp-grammar.ym

include $(top_srcdir)/build/lex-rules.am

# divert install/uninstall targets to avoid recursing into $(SUBDIRS)

install:
$(MAKE) $(AM_MAKEFLAGS) all
$(MAKE) $(AM_MAKEFLAGS) install-am

uninstall:
$(MAKE) $(AM_MAKEFLAGS) uninstall-am

check:
echo "Make check disabled, since it requires a newer glib"
92 changes: 92 additions & 0 deletions modules/afamqp/afamqp-grammar.ym
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
* Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
* Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

%code requires {

#include "afamqp-parser.h"

}

%code {

#include "cfg-parser.h"
#include "afamqp-grammar.h"
#include "plugin.h"
#include "vptransform.h"

extern LogDriver *last_driver;
extern ValuePairs *last_value_pairs;
extern ValuePairsTransformSet *last_vp_transset;
}

%name-prefix "afamqp_"
%lex-param {CfgLexer *lexer}
%parse-param {CfgLexer *lexer}
%parse-param {LogDriver **instance}
%parse-param {gpointer arg}


/* INCLUDE_DECLS */

%token KW_AMQP
%token KW_EXCHANGE
%token KW_EXCHANGE_TYPE
%token KW_PERSISTENT
%token KW_VHOST
%token KW_ROUTING_KEY
%token KW_BODY

%%

start
: LL_CONTEXT_DESTINATION KW_AMQP
{
last_driver = *instance = afamqp_dd_new();
}
'(' afamqp_options ')' { YYACCEPT; }
;

afamqp_options
: afamqp_option afamqp_options
|
;

afamqp_option
: KW_HOST '(' string ')' { afamqp_dd_set_host(last_driver, $3); free($3); }
| KW_PORT '(' LL_NUMBER ')' { afamqp_dd_set_port(last_driver, $3); }
| KW_VHOST '(' string ')' { afamqp_dd_set_vhost(last_driver, $3); free($3); }
| KW_EXCHANGE '(' string ')' { afamqp_dd_set_exchange(last_driver, $3); free($3); }
| KW_EXCHANGE_TYPE '(' string ')' { afamqp_dd_set_exchange_type(last_driver, $3); free($3); }
| KW_ROUTING_KEY '(' string ')' { afamqp_dd_set_routing_key(last_driver, $3); free($3); }
| KW_BODY '(' string ')' { afamqp_dd_set_body(last_driver, $3); free($3); }
| KW_PERSISTENT '(' yesno ')' { afamqp_dd_set_persistent(last_driver, $3); }
| KW_USERNAME '(' string ')' { afamqp_dd_set_user(last_driver, $3); free($3); }
| KW_PASSWORD '(' string ')' { afamqp_dd_set_password(last_driver, $3); free($3); }
| value_pair_option { afamqp_dd_set_value_pairs(last_driver, $1); }
| dest_driver_option
;

/* INCLUDE_RULES */

%%
59 changes: 59 additions & 0 deletions modules/afamqp/afamqp-parser.c
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
* Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
* Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#include "afamqp.h"
#include "cfg-parser.h"
#include "afamqp-grammar.h"

extern int afamqp_debug;
int afamqp_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg);

static CfgLexerKeyword afamqp_keywords[] = {
{ "amqp", KW_AMQP },
{ "vhost", KW_VHOST },
{ "host", KW_HOST },
{ "port", KW_PORT },
{ "exchange", KW_EXCHANGE },
{ "exchange_type", KW_EXCHANGE_TYPE },
{ "routing_key", KW_ROUTING_KEY },
{ "persistent", KW_PERSISTENT },
{ "username", KW_USERNAME },
{ "password", KW_PASSWORD },
{ "log_fifo_size", KW_LOG_FIFO_SIZE },
{ "body", KW_BODY },
{ NULL }
};

CfgParser afamqp_parser =
{
#if ENABLE_DEBUG
.debug_flag = &afamqp_debug,
#endif
.name = "afamqp",
.keywords = afamqp_keywords,
.parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afamqp_parse,
.cleanup = (void (*)(gpointer)) log_pipe_unref,
};

CFG_PARSER_IMPLEMENT_LEXER_BINDING(afamqp_, LogDriver **)
36 changes: 36 additions & 0 deletions modules/afamqp/afamqp-parser.h
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
* Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
* Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#ifndef AFAMQP_PARSER_H_INCLUDED
#define AFAMQP_PARSER_H_INCLUDED

#include "cfg-parser.h"
#include "cfg-lexer.h"
#include "afamqp.h"

extern CfgParser afamqp_parser;

CFG_PARSER_DECLARE_LEXER_BINDING(afamqp_, LogDriver **)

#endif

0 comments on commit efdee10

Please sign in to comment.