Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: f1d34ee24296
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b7a8808fc32d
Choose a head ref
  • 3 commits
  • 2 files changed
  • 1 contributor

Commits on May 21, 2020

  1. Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    bd8bed6 View commit details
  2. Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    8424c32 View commit details
  3. Merge pull request #491 from LnL7/lapin-evaluation-filter

    lapin evaluation filter
    LnL7 authored May 21, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    b7a8808 View commit details
Showing with 64 additions and 74 deletions.
  1. +60 −73 ofborg/src/bin/evaluation-filter.rs
  2. +4 −1 ofborg/src/tasks/evaluationfilter.rs
133 changes: 60 additions & 73 deletions ofborg/src/bin/evaluation-filter.rs
Original file line number Diff line number Diff line change
@@ -1,90 +1,77 @@
use amqp::Basic;
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks;
use ofborg::worker;

use std::env;
use std::error::Error;

use async_std::task;
use tracing::info;

fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::tasks;

fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

info!("Hello, world!");
let arg = env::args()
.nth(1)
.expect("usage: evaluation-filter <config>");
let cfg = config::load(arg.as_ref());

let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
info!("Connected to rabbitmq");
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

let mut channel = session.open_channel(1).unwrap();
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "github-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;

channel
.declare_exchange(easyamqp::ExchangeConfig {
exchange: "github-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.unwrap();
chan.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;

channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.unwrap();
let queue_name = String::from("mass-rebuild-check-jobs");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;

channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-inputs".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.unwrap();
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "github-events".to_owned(),
routing_key: Some("pull_request.nixos/nixpkgs".to_owned()),
no_wait: false,
})?;

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "mass-rebuild-check-inputs".to_owned(),
exchange: "github-events".to_owned(),
routing_key: Some("pull_request.nixos/nixpkgs".to_owned()),
let handle = chan.consume(
tasks::evaluationfilter::EvaluationFilterWorker::new(cfg.acl()),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-evaluation-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
})
.unwrap();

channel.basic_prefetch(1).unwrap();
let mut channel = channel
.consume(
worker::new(tasks::evaluationfilter::EvaluationFilterWorker::new(
cfg.acl(),
)),
easyamqp::ConsumeConfig {
queue: "mass-rebuild-check-inputs".to_owned(),
consumer_tag: format!("{}-evaluation-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.unwrap();

channel.start_consuming();
exclusive: false,
},
)?;

info!("Finished consuming?");
info!("Fetching jobs from {}", &queue_name);
task::block_on(handle);

channel.close(200, "Bye").unwrap();
info!("Closed the channel");
session.close(200, "Good Bye");
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}
5 changes: 4 additions & 1 deletion ofborg/src/tasks/evaluationfilter.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use crate::ghevent;
use crate::message::{evaluationjob, Pr, Repo};
use crate::worker;

use tracing::info;
use tracing::{debug_span, info};

pub struct EvaluationFilterWorker {
acl: acl::ACL,
@@ -30,6 +30,9 @@ impl worker::SimpleWorker for EvaluationFilterWorker {
}

fn consumer(&mut self, job: &ghevent::PullRequestEvent) -> worker::Actions {
let span = debug_span!("job", pr = ?job.number);
let _enter = span.enter();

if !self.acl.is_repo_eligible(&job.repository.full_name) {
info!("Repo not authorized ({})", job.repository.full_name);
return vec![worker::Action::Ack];