Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: NixOS/ofborg
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 7d1014f1c2c9
Choose a base ref
...
head repository: NixOS/ofborg
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: c3c0ee723e1e
Choose a head ref
  • 2 commits
  • 1 file changed
  • 2 contributors

Commits on Apr 30, 2020

  1. limit prefetch count to one for lapin channels

    Otherwise the consumer will pull all messages in the queue so other
    instances don't have a chance to share the load.
    LnL7 committed Apr 30, 2020
    Copy the full SHA
    e0fce0e View commit details
  2. Merge pull request #477 from LnL7/lapin-qos

    limit prefetch count to one for lapin channels
    grahamc authored Apr 30, 2020
    Copy the full SHA
    c3c0ee7 View commit details
Showing with 5 additions and 1 deletion.
  1. +5 −1 ofborg/src/easylapin.rs
6 changes: 5 additions & 1 deletion ofborg/src/easylapin.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ use async_std::stream::StreamExt;
use async_std::task;
use lapin::message::Delivery;
use lapin::options::{
-BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
+BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use lapin::types::{AMQPValue, FieldTable};
@@ -85,6 +85,8 @@ impl<W: SimpleWorker + 'static> ConsumerExt<W> for CloseOnDrop<Channel> {
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;

fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
+task::block_on(self.basic_qos(1, BasicQosOptions::default()))?;
+
let mut consumer = task::block_on(self.basic_consume(
&config.queue,
&config.consumer_tag,
@@ -133,6 +135,8 @@ impl<W: SimpleNotifyWorker + 'static> ConsumerExt<W> for NotifyChannel {
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;

fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
+task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?;
+
let mut consumer = task::block_on(self.0.basic_consume(
&config.queue,
&config.consumer_tag,