Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: bef27f6ab914
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: ed4fcea2275e
Choose a head ref
  • 5 commits
  • 8 files changed
  • 2 contributors

Commits on Apr 27, 2020

  1. remove amqp arguments

    These are not used anywhere and the table is a type specific to the amqp
    library.
    LnL7 committed Apr 27, 2020

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    a87cf35 View commit details
  2. make amqp error type generic

    LnL7 committed Apr 27, 2020

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    a34a4ac View commit details
  3. remove amqp return types

    LnL7 committed Apr 27, 2020

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    9872346 View commit details
  4. make amqp consume generic and split traits

    This removes the final library specfic type from the traits.
    LnL7 committed Apr 27, 2020

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    LnL7 Daiderd Jordan
    Copy the full SHA
    5ad80e8 View commit details
  5. Merge pull request #473 from LnL7/generic-easyamqp

    generic easyamqp
    grahamc authored Apr 27, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    ed4fcea View commit details
23 changes: 10 additions & 13 deletions ofborg/src/bin/builder.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use amqp::Basic;
use log::{info, log, warn};
use ofborg::checkout;
use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::notifyworker;
use ofborg::tasks;

@@ -39,38 +39,37 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

let queue_name: String = if cfg.runner.build_all_jobs != Some(true) {
let queue_name = if cfg.runner.build_all_jobs != Some(true) {
let queue_name = format!("build-inputs-{}", cfg.nix.system.clone());
channel
.declare_queue(easyamqp::QueueConfig {
queue: format!("build-inputs-{}", cfg.nix.system.clone()),
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap()
.queue
.unwrap();
queue_name
} else {
warn!("Building all jobs, please don't use this unless you're");
warn!("developing and have Graham's permission!");
let queue_name = "".to_owned();
channel
.declare_queue(easyamqp::QueueConfig {
queue: "".to_owned(),
queue: queue_name.clone(),
passive: false,
durable: false,
exclusive: true,
auto_delete: true,
no_wait: false,
arguments: None,
})
.unwrap()
.queue
.unwrap();
queue_name
};

channel
@@ -79,7 +78,6 @@ fn main() {
exchange: "build-jobs".to_owned(),
routing_key: None,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -98,7 +96,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
7 changes: 1 addition & 6 deletions ofborg/src/bin/evaluation-filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use amqp::Basic;
use log::{info, log};
use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks;
use ofborg::worker;

@@ -27,7 +27,6 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

@@ -39,7 +38,6 @@ fn main() {
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -51,7 +49,6 @@ fn main() {
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -61,7 +58,6 @@ fn main() {
exchange: "github-events".to_owned(),
routing_key: Some("pull_request.nixos/nixpkgs".to_owned()),
no_wait: false,
arguments: None,
})
.unwrap();

@@ -78,7 +74,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
7 changes: 1 addition & 6 deletions ofborg/src/bin/github-comment-filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use amqp::Basic;
use log::{info, log};
use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks;
use ofborg::worker;

@@ -26,7 +26,6 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

@@ -39,7 +38,6 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

@@ -51,7 +49,6 @@ fn main() {
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -61,7 +58,6 @@ fn main() {
exchange: "github-events".to_owned(),
routing_key: Some("issue_comment.*".to_owned()),
no_wait: false,
arguments: None,
})
.unwrap();

@@ -79,7 +75,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
6 changes: 1 addition & 5 deletions ofborg/src/bin/github-comment-poster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use amqp::Basic;
use log::{info, log};
use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks;
use ofborg::worker;

@@ -23,7 +23,6 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

@@ -35,7 +34,6 @@ fn main() {
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -45,7 +43,6 @@ fn main() {
exchange: "build-results".to_owned(),
routing_key: None,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -62,7 +59,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
14 changes: 5 additions & 9 deletions ofborg/src/bin/log-message-collector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use log::{info, log};
use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks;
use ofborg::worker;

@@ -25,30 +25,27 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

let queue_name = channel
let queue_name = "".to_owned();
channel
.declare_queue(easyamqp::QueueConfig {
queue: "".to_owned(),
queue: queue_name.clone(),
passive: false,
durable: false,
exclusive: true,
auto_delete: true,
no_wait: false,
arguments: None,
})
.unwrap()
.queue;
.unwrap();

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "logs".to_owned(),
routing_key: Some("*.*".to_owned()),
no_wait: false,
arguments: None,
})
.unwrap();

@@ -65,7 +62,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
4 changes: 1 addition & 3 deletions ofborg/src/bin/mass-rebuilder.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use amqp::Basic;
use log::{error, info, log};
use ofborg::checkout;
use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::stats;
use ofborg::tasks;
use ofborg::worker;
@@ -61,7 +61,6 @@ fn main() {
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -76,7 +75,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
6 changes: 1 addition & 5 deletions ofborg/src/bin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use amqp::Basic;
use hyper::server::{Request, Response, Server};
use log::{info, log};
use ofborg::easyamqp::TypedWrappers;
use ofborg::easyamqp::{ChannelExt, ConsumerExt};
use ofborg::{config, easyamqp, stats, tasks, worker};

use std::env;
@@ -35,7 +35,6 @@ fn main() {
auto_delete: false,
no_wait: false,
internal: false,
arguments: None,
})
.unwrap();

@@ -47,7 +46,6 @@ fn main() {
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -57,7 +55,6 @@ fn main() {
exchange: "stats".to_owned(),
routing_key: None,
no_wait: false,
arguments: None,
})
.unwrap();

@@ -72,7 +69,6 @@ fn main() {
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
113 changes: 42 additions & 71 deletions ofborg/src/easyamqp.rs
Original file line number Diff line number Diff line change
@@ -41,10 +41,6 @@ pub struct ConsumeConfig {
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,

/// A set of arguments for the consume. The syntax and semantics
/// of these arguments depends on the server implementation.
pub arguments: Option<amqp::Table>,
}

pub struct BindQueueConfig {
@@ -88,10 +84,6 @@ pub struct BindQueueConfig {
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,

/// A set of arguments for the binding. The syntax and semantics
/// of these arguments depends on the exchange class.
pub arguments: Option<amqp::Table>,
}

pub enum ExchangeType {
@@ -192,11 +184,6 @@ pub struct ExchangeConfig {
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,

/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
pub arguments: Option<amqp::Table>,
}

pub struct QueueConfig {
@@ -274,11 +261,6 @@ pub struct QueueConfig {
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,

/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
pub arguments: Option<amqp::Table>,
}

pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> {
@@ -314,48 +296,22 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amq
Ok(session)
}

pub trait TypedWrappers {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<String, amqp::AMQPError>
where
T: amqp::Consumer + 'static;

fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>;

fn declare_queue(
&mut self,
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError>;

fn bind_queue(
&mut self,
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError>;
pub trait ChannelExt {
type Error;
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>;
fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>;
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>;
}

impl TypedWrappers for amqp::Channel {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<String, amqp::AMQPError>
where
T: amqp::Consumer + 'static,
{
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new),
)
}
pub trait ConsumerExt<T> {
type Error;
fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>;
}

fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError> {
impl ChannelExt for amqp::Channel {
type Error = amqp::AMQPError;

fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
self.exchange_declare(
config.exchange,
config.exchange_type.into(),
@@ -364,35 +320,50 @@ impl TypedWrappers for amqp::Channel {
config.auto_delete,
config.internal,
config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new),
)
amqp::Table::new(),
)?;
Ok(())
}

fn declare_queue(
&mut self,
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError> {
fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> {
self.queue_declare(
config.queue,
config.passive,
config.durable,
config.exclusive,
config.auto_delete,
config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new),
)
amqp::Table::new(),
)?;
Ok(())
}

fn bind_queue(
&mut self,
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError> {
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> {
self.queue_bind(
config.queue,
config.exchange,
config.routing_key.unwrap_or_else(|| "".to_owned()),
config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new),
)
amqp::Table::new(),
)?;
Ok(())
}
}

impl<T: amqp::Consumer + 'static> ConsumerExt<T> for amqp::Channel {
type Error = amqp::AMQPError;

fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> {
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
amqp::Table::new(),
)?;
Ok(())
}
}