Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: cdf8cbb6e028
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: fd1b7967b112
Choose a head ref
  • 4 commits
  • 6 files changed
  • 1 contributor

Commits on May 24, 2020

  1. lapin: 1.0.0-beta4 -> 1.0.0-rc6

    CloseOnDrop was removed so channels can be used directly now.
    LnL7 committed May 24, 2020
    Copy the full SHA
    69c0fbb View commit details
  2. cargo update

    LnL7 committed May 24, 2020
    Copy the full SHA
    d053fd2 View commit details
  3. update crates

    LnL7 committed May 24, 2020
    Copy the full SHA
    a8b2894 View commit details
  4. Merge pull request #501 from LnL7/cargo-update

    cargo update
    LnL7 authored May 24, 2020
    Copy the full SHA
    fd1b796 View commit details
Showing with 2,199 additions and 883 deletions.
  1. +602 −365 Cargo.lock
  2. +1,583 −501 Cargo.nix
  3. +1 −1 crate-hashes.json
  4. +1 −1 ofborg/Cargo.toml
  5. +9 −11 ofborg/src/easylapin.rs
  6. +3 −4 ofborg/src/stats.rs
967 changes: 602 additions & 365 deletions Cargo.lock

Large diffs are not rendered by default.

2,084 changes: 1,583 additions & 501 deletions Cargo.nix

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crate-hashes.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git#f9aec2f40aef69a459f26003ce47048f8e2a08d1)": "09k6fl7l0rcwilnckdfv3smiv1ilrwi1jxmrrkjwbrj64lky3jdy",
"amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git#b58edf8822072688d882966f7427f0a9e67aee78)": "0a0n8h71lnsl2rbi6v1zmy015f7hl91y5mgx3qzxlfrn3pjz8sy1",
"hubcaps 0.3.16 (git+https://github.com/grahamc/hubcaps.git#5e656ba35ab4ee74aa72b3b5c3a62e1bf351ff6a)": "1p7rn8y71fjwfag65437gz7a56pysz9n69smaknvblyxpjdzmh4d"
}
2 changes: 1 addition & 1 deletion ofborg/Cargo.toml
Original file line number Diff line number Diff line change
@@ -29,4 +29,4 @@ chrono = "0.4.6"
separator = "0.4.1"

async-std = "1.5.0"
lapin = "1.0.0-beta4"
lapin = "1.0.0-rc6"
20 changes: 9 additions & 11 deletions ofborg/src/easylapin.rs
Original file line number Diff line number Diff line change
@@ -18,12 +18,10 @@ use lapin::options::{
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use lapin::types::{AMQPValue, FieldTable};
use lapin::{
BasicProperties, Channel, CloseOnDrop, Connection, ConnectionProperties, ExchangeKind,
};
use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind};
use tracing::{debug, trace};

pub fn from_config(cfg: &RabbitMQConfig) -> Result<CloseOnDrop<Connection>, lapin::Error> {
pub fn from_config(cfg: &RabbitMQConfig) -> Result<Connection, lapin::Error> {
let mut props = FieldTable::default();
props.insert(
"ofborg_version".into(),
@@ -34,7 +32,7 @@ pub fn from_config(cfg: &RabbitMQConfig) -> Result<CloseOnDrop<Connection>, lapi
task::block_on(Connection::connect(&cfg.as_uri(), opts))
}

impl ChannelExt for CloseOnDrop<Channel> {
impl ChannelExt for Channel {
type Error = lapin::Error;

fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
@@ -81,7 +79,7 @@ impl ChannelExt for CloseOnDrop<Channel> {
}
}

impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop<Channel> {
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel {
type Error = lapin::Error;
type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>;

@@ -117,7 +115,7 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop<Channel> {

/// Same as a regular channel, but without prefetching,
/// used for services with multiple instances.
pub struct WorkerChannel(pub CloseOnDrop<Channel>);
pub struct WorkerChannel(pub Channel);

impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel {
type Error = lapin::Error;
@@ -130,12 +128,12 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel {
}

pub struct ChannelNotificationReceiver<'a> {
channel: &'a mut CloseOnDrop<lapin::Channel>,
channel: &'a mut lapin::Channel,
deliver: &'a Delivery,
}

impl<'a> ChannelNotificationReceiver<'a> {
pub fn new(channel: &'a mut CloseOnDrop<lapin::Channel>, deliver: &'a Delivery) -> Self {
pub fn new(channel: &'a mut lapin::Channel, deliver: &'a Delivery) -> Self {
ChannelNotificationReceiver { channel, deliver }
}
}
@@ -149,7 +147,7 @@ impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {

// FIXME the consumer trait for SimpleWorker and SimpleNotifyWorker conflict,
// but one could probably be implemented in terms of the other instead.
pub struct NotifyChannel(pub CloseOnDrop<Channel>);
pub struct NotifyChannel(pub Channel);

impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
type Error = lapin::Error;
@@ -190,7 +188,7 @@ impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
}

async fn action_deliver(
chan: &CloseOnDrop<Channel>,
chan: &Channel,
deliver: &Delivery,
action: Action,
) -> Result<(), lapin::Error> {
7 changes: 3 additions & 4 deletions ofborg/src/stats.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ use amqp::protocol::basic;
use amqp::Basic;
use async_std::task;
use lapin::options::BasicPublishOptions;
use lapin::CloseOnDrop;

include!(concat!(env!("OUT_DIR"), "/events.rs"));

@@ -59,16 +58,16 @@ impl SysEvents for RabbitMQ<amqp::Channel> {
}
}

impl RabbitMQ<CloseOnDrop<lapin::Channel>> {
pub fn from_lapin(identity: &str, channel: CloseOnDrop<lapin::Channel>) -> Self {
impl RabbitMQ<lapin::Channel> {
pub fn from_lapin(identity: &str, channel: lapin::Channel) -> Self {
RabbitMQ {
identity: identity.to_owned(),
channel,
}
}
}

impl SysEvents for RabbitMQ<CloseOnDrop<lapin::Channel>> {
impl SysEvents for RabbitMQ<lapin::Channel> {
fn notify(&mut self, event: Event) {
let props = lapin::BasicProperties::default().with_content_type("application/json".into());
task::block_on(async {