Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: b7a8808fc32d
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 874bfcd028f9
Choose a head ref
  • 4 commits
  • 4 files changed
  • 1 contributor

Commits on May 21, 2020

  1. Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    3ce9f3e View commit details
  2. convert mass rebuilder to lapin

    LnL7 committed May 21, 2020

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    9120d16 View commit details
  3. Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    910330c View commit details
  4. Merge pull request #492 from LnL7/lapin-mass-rebuilder

    lapin mass rebuilder
    LnL7 authored May 21, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    874bfcd View commit details
Showing with 102 additions and 74 deletions.
  1. +52 −62 ofborg/src/bin/mass-rebuilder.rs
  2. +1 −1 ofborg/src/bin/stats.rs
  3. +46 −8 ofborg/src/stats.rs
  4. +3 −3 ofborg/src/tasks/evaluate.rs
114 changes: 52 additions & 62 deletions ofborg/src/bin/mass-rebuilder.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
use ofborg::checkout;
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::stats;
use ofborg::tasks;
use ofborg::worker;

use std::env;
use std::error::Error;
use std::path::Path;
use std::process;

use amqp::Basic;
use async_std::task;
use tracing::{error, info};

use ofborg::checkout;
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::stats;
use ofborg::tasks;

// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
fn main() {
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args().nth(1).expect("usage: mass-rebuilder <config>");
let cfg = config::load(arg.as_ref());

let memory_info = sys_info::mem_info().expect("Unable to get memory information from OS");

if memory_info.avail < 8 * 1024 * 1024 {
@@ -26,68 +32,52 @@ fn main() {
process::exit(1);
};

let cfg = config::load(env::args().nth(1).unwrap().as_ref());

ofborg::setup_log();

info!("Hello, world!");

let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
info!("Connected to rabbitmq");

let mut channel = session.open_channel(1).unwrap();
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix();

let events = stats::RabbitMQ::new(
let events = stats::RabbitMQ::from_lapin(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap(),
task::block_on(conn.create_channel())?,
);

let mrw = tasks::evaluate::EvaluationWorker::new(
cloner,
&nix,
cfg.github(),
cfg.github_app_vendingmachine(),
cfg.acl(),
cfg.runner.identity.clone(),
events,
cfg.tag_paths.clone().unwrap(),
);

channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
let queue_name = String::from("mass-rebuild-check-jobs");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;

let handle = chan.consume(
tasks::evaluate::EvaluationWorker::new(
cloner,
&nix,
cfg.github(),
cfg.github_app_vendingmachine(),
cfg.acl(),
cfg.runner.identity.clone(),
events,
cfg.tag_paths.clone().unwrap(),
),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-mass-rebuild-checker", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
})
.unwrap();

channel.basic_prefetch(1).unwrap();
let mut channel = channel
.consume(
worker::new(mrw),
easyamqp::ConsumeConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
consumer_tag: format!("{}-mass-rebuild-checker", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.unwrap();

channel.start_consuming();
exclusive: false,
},
)?;

info!("Finished consuming?");
info!("Fetching jobs from {}", queue_name);
task::block_on(handle);

channel.close(200, "Bye").unwrap();
info!("Closed the channel");
session.close(200, "Good Bye");
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}
2 changes: 1 addition & 1 deletion ofborg/src/bin/stats.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ fn main() {
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
info!("Connected to rabbitmq");

let events = stats::RabbitMQ::new(
let events = stats::RabbitMQ::from_amqp(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap(),
);
54 changes: 46 additions & 8 deletions ofborg/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use amqp::protocol::basic::BasicProperties;
use amqp::{Basic, Channel};
use amqp::protocol::basic;
use amqp::Basic;
use async_std::task;
use lapin::options::BasicPublishOptions;
use lapin::CloseOnDrop;

include!(concat!(env!("OUT_DIR"), "/events.rs"));

@@ -19,23 +22,23 @@ pub struct EventMessage {
pub events: Vec<Event>,
}

pub struct RabbitMQ {
pub struct RabbitMQ<C> {
identity: String,
channel: Channel,
channel: C,
}

impl RabbitMQ {
pub fn new(identity: &str, channel: Channel) -> RabbitMQ {
impl RabbitMQ<amqp::Channel> {
pub fn from_amqp(identity: &str, channel: amqp::Channel) -> Self {
RabbitMQ {
identity: identity.to_owned(),
channel,
}
}
}

impl SysEvents for RabbitMQ {
impl SysEvents for RabbitMQ<amqp::Channel> {
fn notify(&mut self, event: Event) {
let props = BasicProperties {
let props = basic::BasicProperties {
..Default::default()
};
self.channel
@@ -55,3 +58,38 @@ impl SysEvents for RabbitMQ {
.unwrap();
}
}

impl RabbitMQ<CloseOnDrop<lapin::Channel>> {
    /// Creates a stats sender backed by a lapin channel.
    ///
    /// `identity` is copied into an owned `String`; `channel` is moved into
    /// the returned value and stored as-is.
    pub fn from_lapin(identity: &str, channel: CloseOnDrop<lapin::Channel>) -> Self {
        let identity = String::from(identity);
        Self { identity, channel }
    }
}

impl SysEvents for RabbitMQ<CloseOnDrop<lapin::Channel>> {
    /// Publishes a single `event` over the wrapped lapin channel.
    ///
    /// The event is wrapped in an [`EventMessage`] whose `sender` is this
    /// instance's identity, serialized to JSON, and handed to
    /// `basic_publish` with `"stats"` and `""` as its first two arguments
    /// (exchange and routing key in lapin's API) and a content type of
    /// `application/json`. The call blocks the current thread until both the
    /// publish future and the value it resolves to have been awaited.
    ///
    /// # Panics
    ///
    /// Panics if JSON serialization fails, or if either awaited result of the
    /// publish is an error.
    fn notify(&mut self, event: Event) {
        let props = lapin::BasicProperties::default().with_content_type("application/json".into());

        // Build the payload up front so the async block only does the publish.
        let payload = serde_json::to_string(&EventMessage {
            sender: self.identity.clone(),
            events: vec![event],
        })
        .unwrap()
        .into_bytes();

        task::block_on(async {
            // String literals are passed directly; allocating a `String` only
            // to borrow it again is unnecessary.
            let _confirmation = self
                .channel
                .basic_publish("stats", "", BasicPublishOptions::default(), payload, props)
                .await
                .unwrap()
                .await
                .unwrap();
        });
    }
}
6 changes: 3 additions & 3 deletions ofborg/src/tasks/evaluate.rs
Original file line number Diff line number Diff line change
@@ -80,6 +80,9 @@ impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E>
}

fn consumer(&mut self, job: &evaluationjob::EvaluationJob) -> worker::Actions {
let span = debug_span!("job", pr = ?job.pr.number);
let _enter = span.enter();

let mut vending_machine = self
.github_vend
.write()
@@ -249,9 +252,6 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> {
// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
fn evaluate_job(&mut self) -> Result<worker::Actions, EvalWorkerError> {
let span = debug_span!("job", pr = ?self.job.pr.number);
let _enter = span.enter();

let job = self.job;
let repo = self
.client_app