Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 08717c5cf232
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 2dc486850064
Choose a head ref
  • 2 commits
  • 1 file changed
  • 1 contributor

Commits on May 21, 2020

  1. convert stats to easyamqp

    LnL7 committed May 21, 2020
    Copy the full SHA
    41c2899 View commit details

Commits on May 23, 2020

  1. Merge pull request #493 from LnL7/lapin-stats

    lapin stats
    LnL7 authored May 23, 2020
    Copy the full SHA
    2dc4868 View commit details
Showing with 56 additions and 69 deletions.
  1. +56 −69 ofborg/src/bin/stats.rs
125 changes: 56 additions & 69 deletions ofborg/src/bin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,96 +1,83 @@
use ofborg::easyamqp::{ChannelExt, ConsumerExt};
use ofborg::{config, easyamqp, stats, tasks, worker};

use std::env;
use std::error::Error;
use std::thread;

use amqp::Basic;
use async_std::task;
use hyper::server::{Request, Response, Server};
use tracing::info;

fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
use ofborg::easyamqp::{ChannelExt, ConsumerExt};
use ofborg::{config, easyamqp, easylapin, stats, tasks};

fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

info!("Hello, world!");
let arg = env::args().nth(1).expect("usage: stats <config>");
let cfg = config::load(arg.as_ref());

let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
info!("Connected to rabbitmq");
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

let events = stats::RabbitMQ::from_amqp(
let events = stats::RabbitMQ::from_lapin(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap(),
task::block_on(conn.create_channel())?,
);

let metrics = stats::MetricCollector::new();

let collector = tasks::statscollector::StatCollectorWorker::new(events, metrics.clone());

let mut channel = session.open_channel(1).unwrap();
channel
.declare_exchange(easyamqp::ExchangeConfig {
exchange: "stats".to_owned(),
exchange_type: easyamqp::ExchangeType::Fanout,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.unwrap();

channel
.declare_queue(easyamqp::QueueConfig {
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "stats".to_owned(),
exchange_type: easyamqp::ExchangeType::Fanout,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;

let queue_name = String::from("stats-events");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;

chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "stats".to_owned(),
routing_key: None,
no_wait: false,
})?;

let handle = chan.consume(
collector,
easyamqp::ConsumeConfig {
queue: "stats-events".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.unwrap();

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "stats-events".to_owned(),
exchange: "stats".to_owned(),
routing_key: None,
consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
})
.unwrap();

channel.basic_prefetch(1).unwrap();
let mut channel = channel
.consume(
worker::new(collector),
easyamqp::ConsumeConfig {
queue: "stats-events".to_owned(),
consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.unwrap();
exclusive: false,
},
)?;

thread::spawn(|| {
let addr = "0.0.0.0:9898";
info!("listening addr {:?}", addr);
Server::http(addr)
.unwrap()
.handle(move |_: Request, res: Response| {
res.send(metrics.prometheus_output().as_bytes()).unwrap();
})
.unwrap();
Server::http(addr)?.handle(move |_: Request, res: Response| {
res.send(metrics.prometheus_output().as_bytes()).unwrap();
})?;
Ok::<_, Box<dyn Error + Sync + Send + '_>>(())
});

channel.start_consuming();

info!("Finished consuming?");
info!("Fetching jobs from {}", &queue_name);
task::block_on(handle);

channel.close(200, "Bye").unwrap();
info!("Closed the channel");
session.close(200, "Good Bye");
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}