Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a508deca4231
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 1202cd7decb3
Choose a head ref
  • 3 commits
  • 8 files changed
  • 1 contributor

Commits on Dec 4, 2018

  1. Copy the full SHA
    4de520a View commit details
  2. Queue: make a type

    grahamc committed Dec 4, 2018
    Copy the full SHA
    26bef5a View commit details
  3. Copy the full SHA
    1202cd7 View commit details
12 changes: 6 additions & 6 deletions ofborg/src/bin/builder.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ use ofborg::checkout;
use ofborg::notifyworker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::{Exchange, TypedWrappers};
use ofborg::easyamqp::{Exchange, Queue, TypedWrappers};


fn main() {
@@ -55,7 +55,7 @@ fn main() {
if cfg.runner.build_all_jobs != Some(true) {
queue_name = channel
.declare_queue(easyamqp::QueueConfig {
queue: &format!("build-inputs-{}", cfg.nix.system.clone()),
queue: Queue(&format!("build-inputs-{}", cfg.nix.system.clone())),
passive: false,
durable: true,
exclusive: false,
@@ -69,7 +69,7 @@ fn main() {
warn!("developing and have Graham's permission!");
queue_name = channel
.declare_queue(easyamqp::QueueConfig {
queue: "",
queue: Queue(""),
passive: false,
durable: false,
exclusive: true,
@@ -82,8 +82,8 @@ fn main() {

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: &queue_name,
exchange: "build-jobs",
queue: Queue(&queue_name),
exchange: Exchange("build-jobs"),
routing_key: None,
no_wait: false,
arguments: None,
@@ -99,7 +99,7 @@ fn main() {
cfg.runner.identity.clone(),
)),
easyamqp::ConsumeConfig {
queue: &queue_name,
queue: Queue(&queue_name),
consumer_tag: &format!("{}-builder", cfg.whoami()),
no_local: false,
no_ack: false,
12 changes: 6 additions & 6 deletions ofborg/src/bin/evaluation-filter.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::{Exchange, TypedWrappers};
use ofborg::easyamqp::{Exchange, Queue, TypedWrappers};


fn main() {
@@ -45,7 +45,7 @@ fn main() {

channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs",
queue: Queue("mass-rebuild-check-jobs"),
passive: false,
durable: true,
exclusive: false,
@@ -57,7 +57,7 @@ fn main() {

channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-inputs",
queue: Queue("mass-rebuild-check-inputs"),
passive: false,
durable: true,
exclusive: false,
@@ -69,8 +69,8 @@ fn main() {

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "mass-rebuild-check-inputs",
exchange: "github-events",
queue: Queue("mass-rebuild-check-inputs"),
exchange: Exchange("github-events"),
routing_key: Some("pull_request.nixos/nixpkgs"),
no_wait: false,
arguments: None,
@@ -84,7 +84,7 @@ fn main() {
cfg.acl(),
)),
easyamqp::ConsumeConfig {
queue: "mass-rebuild-check-inputs",
queue: Queue("mass-rebuild-check-inputs"),
consumer_tag: &format!("{}-evaluation-filter", cfg.whoami()),
no_local: false,
no_ack: false,
10 changes: 5 additions & 5 deletions ofborg/src/bin/github-comment-filter.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::{Exchange, TypedWrappers};
use ofborg::easyamqp::{Exchange, Queue, TypedWrappers};


fn main() {
@@ -57,7 +57,7 @@ fn main() {

channel
.declare_queue(easyamqp::QueueConfig {
queue: "build-inputs",
queue: Queue("build-inputs"),
passive: false,
durable: true,
exclusive: false,
@@ -69,8 +69,8 @@ fn main() {

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "build-inputs",
exchange: "github-events",
queue: Queue("build-inputs"),
exchange: Exchange("github-events"),
routing_key: Some("issue_comment.*"),
no_wait: false,
arguments: None,
@@ -85,7 +85,7 @@ fn main() {
cfg.github(),
)),
easyamqp::ConsumeConfig {
queue: "build-inputs",
queue: Queue("build-inputs"),
consumer_tag: &format!("{}-github-comment-filter", cfg.whoami()),
no_local: false,
no_ack: false,
10 changes: 5 additions & 5 deletions ofborg/src/bin/github-comment-poster.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::{Exchange, TypedWrappers};
use ofborg::easyamqp::{Exchange, Queue, TypedWrappers};

fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
@@ -39,7 +39,7 @@ fn main() {

channel
.declare_queue(easyamqp::QueueConfig {
queue: "build-results",
queue: Queue("build-results"),
passive: false,
durable: true,
exclusive: false,
@@ -51,8 +51,8 @@ fn main() {

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "build-results",
exchange: "build-results",
queue: Queue("build-results"),
exchange: Exchange("build-results"),
routing_key: None,
no_wait: false,
arguments: None,
@@ -66,7 +66,7 @@ fn main() {
cfg.github_app(),
)),
easyamqp::ConsumeConfig {
queue: "build-results",
queue: Queue("build-results"),
consumer_tag: &format!("{}-github-comment-poster", cfg.whoami()),
no_local: false,
no_ack: false,
10 changes: 5 additions & 5 deletions ofborg/src/bin/log-message-collector.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::{Exchange, TypedWrappers};
use ofborg::easyamqp::{Exchange, Queue, TypedWrappers};


fn main() {
@@ -36,7 +36,7 @@ fn main() {

let queue_name = channel
.declare_queue(easyamqp::QueueConfig {
queue: "",
queue: Queue(""),
passive: false,
durable: false,
exclusive: true,
@@ -49,8 +49,8 @@ fn main() {

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: &queue_name,
exchange: "logs",
queue: Queue(&queue_name),
exchange: Exchange("logs"),
routing_key: Some("*.*"),
no_wait: false,
arguments: None,
@@ -64,7 +64,7 @@ fn main() {
100,
)),
easyamqp::ConsumeConfig {
queue: &queue_name,
queue: Queue(&queue_name),
consumer_tag: &format!("{}-log-collector", cfg.whoami()),
no_local: false,
no_ack: false,
6 changes: 3 additions & 3 deletions ofborg/src/bin/mass-rebuilder.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ use ofborg::stats;
use ofborg::worker;
use amqp::Basic;
use ofborg::easyamqp;
use ofborg::easyamqp::TypedWrappers;
use ofborg::easyamqp::{Queue, TypedWrappers};

fn main() {
let memory_info = sys_info::mem_info().expect("Unable to get memory information from OS");
@@ -55,7 +55,7 @@ fn main() {

channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs",
queue: Queue("mass-rebuild-check-jobs"),
passive: false,
durable: true,
exclusive: false,
@@ -70,7 +70,7 @@ fn main() {
.consume(
worker::new(mrw),
easyamqp::ConsumeConfig {
queue: "mass-rebuild-check-jobs",
queue: Queue("mass-rebuild-check-jobs"),
consumer_tag: &format!("{}-mass-rebuild-checker", cfg.whoami()),
no_local: false,
no_ack: false,
10 changes: 5 additions & 5 deletions ofborg/src/bin/stats.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use std::env;
use ofborg::{easyamqp, tasks, worker, config, stats};

use amqp::Basic;
use ofborg::easyamqp::{Exchange, TypedWrappers};
use ofborg::easyamqp::{Exchange, Queue, TypedWrappers};
use hyper::server::{Request, Response, Server};

use std::thread;
@@ -49,7 +49,7 @@ fn main() {

channel
.declare_queue(easyamqp::QueueConfig {
queue: "stats-events",
queue: Queue("stats-events"),
passive: false,
durable: true,
exclusive: false,
@@ -61,8 +61,8 @@ fn main() {

channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "stats-events",
exchange: "stats",
queue: Queue("stats-events"),
exchange: Exchange("stats"),
routing_key: None,
no_wait: false,
arguments: None,
@@ -74,7 +74,7 @@ fn main() {
.consume(
worker::new(collector),
easyamqp::ConsumeConfig {
queue: "stats-events",
queue: Queue("stats-events"),
consumer_tag: &format!("{}-prometheus-stats-collector", cfg.whoami()),
no_local: false,
no_ack: false,
21 changes: 11 additions & 10 deletions ofborg/src/easyamqp.rs
Original file line number Diff line number Diff line change
@@ -4,9 +4,12 @@ use ofborg::config::RabbitMQConfig;
use amqp;
use amqp::Basic;

pub struct Exchange<'a> (pub &'a str);
pub struct Queue<'a> (pub &'a str);

pub struct ConsumeConfig<'a> {
/// Specifies the name of the queue to consume from.
pub queue: &'a str,
pub queue: Queue<'a>,

/// Specifies the identifier for the consumer. The consumer tag is
/// local to a channel, so two clients can use the same consumer
@@ -56,7 +59,7 @@ pub struct BindQueueConfig<'a> {
///
/// The client MUST NOT attempt to bind a queue that does not
/// exist. Error code: not-found
pub queue: &'a str,
pub queue: Queue<'a>,

/// Name of the exchange to bind to.
///
@@ -65,7 +68,7 @@ pub struct BindQueueConfig<'a> {
///
/// The server MUST accept a blank exchange name to mean the
/// default exchange.
pub exchange: &'a str,
pub exchange: Exchange<'a>,

/// Specifies the routing key for the binding. The routing key is
/// used for routing messages depending on the exchange
@@ -115,8 +118,6 @@ impl <'a> Into<&'a str> for ExchangeType<'a> {
}
}


pub struct Exchange<'a> (pub &'a str);
pub struct ExchangeConfig<'a> {
/// Exchange names starting with "amq." are reserved for
/// pre-declared and standardised exchanges. The client MAY
@@ -215,7 +216,7 @@ pub struct QueueConfig<'a> {
/// The queue name can be empty, or a sequence of these
/// characters: letters, digits, hyphen, underscore, period, or
/// colon. Error code: precondition-failed
pub queue: &'a str,
pub queue: Queue<'a>,

/// If set, the server will reply with Declare-Ok if the queue
/// already exists with the same name, and raise an error if not.
@@ -345,7 +346,7 @@ impl TypedWrappers for amqp::Channel {
{
self.basic_consume(
callback,
config.queue,
config.queue.0,
config.consumer_tag,
config.no_local,
config.no_ack,
@@ -377,7 +378,7 @@ impl TypedWrappers for amqp::Channel {
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError> {
self.queue_declare(
config.queue,
config.queue.0,
config.passive,
config.durable,
config.exclusive,
@@ -392,8 +393,8 @@ impl TypedWrappers for amqp::Channel {
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError> {
self.queue_bind(
config.queue,
config.exchange,
config.queue.0,
config.exchange.0,
config.routing_key.unwrap_or(""),
config.no_wait,
config.arguments.unwrap_or(amqp::Table::new()),