Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 2dc486850064
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 5e48f5f0191b
Choose a head ref
  • 4 commits
  • 6 files changed
  • 2 contributors

Commits on May 22, 2020

  1. Copy the full SHA
    51b4c3d View commit details
  2. move prefetch qos to WorkerChannel

    This way the log collector can use a channel with prefetching while the
    other workers still consume items one by one.
    LnL7 committed May 22, 2020
    Copy the full SHA
    f652901 View commit details

Commits on May 23, 2020

  1. Update ofborg/src/easylapin.rs

    Co-authored-by: Cole Helbling <cole.e.helbling@outlook.com>
    LnL7 and cole-h authored May 23, 2020
    Copy the full SHA
    b6e870f View commit details
  2. Merge pull request #495 from LnL7/lapin-log-message-collector

    convert log-message-collector to easylapin
    LnL7 authored May 23, 2020
    Copy the full SHA
    5e48f5f View commit details
2 changes: 1 addition & 1 deletion ofborg/src/bin/evaluation-filter.rs
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ fn main() -> Result<(), Box<dyn Error>> {
no_wait: false,
})?;

let handle = chan.consume(
let handle = easylapin::WorkerChannel(chan).consume(
tasks::evaluationfilter::EvaluationFilterWorker::new(cfg.acl()),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
2 changes: 1 addition & 1 deletion ofborg/src/bin/github-comment-filter.rs
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ fn main() -> Result<(), Box<dyn Error>> {
no_wait: false,
})?;

let handle = chan.consume(
let handle = easylapin::WorkerChannel(chan).consume(
tasks::githubcommentfilter::GitHubCommentWorker::new(cfg.acl(), cfg.github()),
easyamqp::ConsumeConfig {
queue: "build-inputs".to_owned(),
2 changes: 1 addition & 1 deletion ofborg/src/bin/github-comment-poster.rs
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn Error>> {
no_wait: false,
})?;

let handle = chan.consume(
let handle = easylapin::WorkerChannel(chan).consume(
tasks::githubcommentposter::GitHubCommentPoster::new(cfg.github_app_vendingmachine()),
easyamqp::ConsumeConfig {
queue: "build-results".to_owned(),
115 changes: 55 additions & 60 deletions ofborg/src/bin/log-message-collector.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,73 @@
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks;
use ofborg::worker;

use std::env;
use std::error::Error;
use std::path::PathBuf;

use async_std::task;
use tracing::info;

fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::tasks;

fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
info!("Connected to rabbitmq");
let arg = env::args()
.nth(1)
.expect("usage: log-message-collector <config>");
let cfg = config::load(arg.as_ref());

let mut channel = session.open_channel(1).unwrap();
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

channel
.declare_exchange(easyamqp::ExchangeConfig {
exchange: "logs".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.unwrap();
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "logs".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;

let queue_name = "".to_owned();
channel
.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: false,
exclusive: true,
auto_delete: true,
no_wait: false,
})
.unwrap();
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: false,
exclusive: true,
auto_delete: true,
no_wait: false,
})?;

channel
.bind_queue(easyamqp::BindQueueConfig {
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "logs".to_owned(),
routing_key: Some("*.*".to_owned()),
no_wait: false,
})?;

// Regular channel, we want prefetching here.
let handle = chan.consume(
tasks::log_message_collector::LogMessageCollector::new(
PathBuf::from(cfg.log_storage.clone().unwrap().path),
100,
),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
exchange: "logs".to_owned(),
routing_key: Some("*.*".to_owned()),
consumer_tag: format!("{}-log-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
})
.unwrap();

let mut channel = channel
.consume(
worker::new(tasks::log_message_collector::LogMessageCollector::new(
PathBuf::from(cfg.log_storage.clone().unwrap().path),
100,
)),
easyamqp::ConsumeConfig {
queue: queue_name,
consumer_tag: format!("{}-log-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.unwrap();

channel.start_consuming();
exclusive: false,
},
)?;

info!("Finished consuming?");
info!("Fetching jobs from {}", &queue_name);
task::block_on(handle);

channel.close(200, "Bye").unwrap();
info!("Closed the channel");
session.close(200, "Good Bye");
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}
2 changes: 1 addition & 1 deletion ofborg/src/bin/mass-rebuilder.rs
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ fn main() -> Result<(), Box<dyn Error>> {
no_wait: false,
})?;

let handle = chan.consume(
let handle = easylapin::WorkerChannel(chan).consume(
tasks::evaluate::EvaluationWorker::new(
cloner,
&nix,
16 changes: 14 additions & 2 deletions ofborg/src/easylapin.rs
Original file line number Diff line number Diff line change
@@ -86,8 +86,6 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop<Channel> {
type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>;

fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
task::block_on(self.basic_qos(1, BasicQosOptions::default()))?;

let mut consumer = task::block_on(self.basic_consume(
&config.queue,
&config.consumer_tag,
@@ -117,6 +115,20 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop<Channel> {
}
}

/// Same as a regular channel, but without prefetching,
/// used for services with multiple instances.
///
/// This is a newtype around `CloseOnDrop<Channel>`. Its `ConsumerExt`
/// implementation calls `basic_qos(1, ..)` on the wrapped channel before
/// delegating to the wrapped channel's own `consume`.
pub struct WorkerChannel(pub CloseOnDrop<Channel>);

impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel {
type Error = lapin::Error;
type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>;

fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?;
self.0.consume(worker, config)
}
}

struct ChannelNotificationReceiver<'a> {
channel: &'a mut CloseOnDrop<lapin::Channel>,
deliver: &'a Delivery,