Formalise and fix Producer retries and retry-ordering (#623, #1092, #1432, #1476, #1421)

ProduceRequest retries are reworked to not retry the request itself,
but put the messages back on the partition queue (while maintaining
input order) and then have an upcoming ProduceRequest include the messages again.

Retries are now calculated per message rather than per ProduceRequest,
and the retry backoff is also enforced on a per-message basis.

The input order of messages is retained during this whole process,
which should guarantee ordered delivery with max.in.flight=1 even when retries > 0.

The new behaviour is formalised through documentation (INTRODUCTION.md)
edenhill committed Jan 10, 2018
1 parent dcbe1fc commit 7f95f09
Showing 10 changed files with 435 additions and 64 deletions.
128 changes: 125 additions & 3 deletions INTRODUCTION.md
@@ -140,7 +140,7 @@ for message commit acknowledgements from brokers (any value but 0, see
[`CONFIGURATION.md`](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
for specifics) then librdkafka will hold on to the message until
all expected acks have been received, gracefully handling the following events:

* Broker connection failure
* Topic leader change
* Produce errors signaled by the broker
@@ -160,9 +160,131 @@ to report the status of message delivery:

See Producer API chapter for more details on delivery report callback usage.

The delivery report callback is optional but highly recommended.
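
A minimal sketch of registering the callback (standard librdkafka C API;
error handling trimmed for brevity):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Invoked once per produced message, from within rd_kafka_poll(). */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "Delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
}

/* ...during producer setup, before rd_kafka_new(): */
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
```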


### Producer message delivery success

When a ProduceRequest is successfully handled by the broker and a
ProduceResponse is received (also called the ack) without an error code,
the messages from the ProduceRequest are enqueued on the delivery report
queue (if a delivery report callback has been set) and will be passed to
the application on the next invocation of rd_kafka_poll().
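
For example (a sketch; `rk` is the producer handle and `run` an assumed
application flag), the delivery report queue is typically served from the
application's main loop:

```c
while (run) {
        /* Serves queued callbacks, including delivery reports.
         * Blocks for at most 100 ms waiting for events. */
        rd_kafka_poll(rk, 100);
}
```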


### Producer message delivery failure

The following sub-chapters explain how different produce errors
are handled.

If the error is retryable and there are remaining retry attempts for
the given message(s), an automatic retry will be scheduled by librdkafka.
The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.
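
A sketch of setting these properties (the values are illustrative, not
recommendations; in older librdkafka versions `retries` is spelled
`message.send.max.retries`):

```c
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();

/* Allow up to 5 automatic retries per message,
 * backing off 500 ms between attempts. */
if (rd_kafka_conf_set(conf, "retries", "5",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
    rd_kafka_conf_set(conf, "retry.backoff.ms", "500",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
        fprintf(stderr, "Configuration failed: %s\n", errstr);
```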


#### Error: Timed out in transmission queue

Internal error ERR__TIMED_OUT_QUEUE.

Connectivity to the broker may be stalled due to networking contention,
local or remote system issues, etc., and the request has not yet been sent.

The producer can be certain that the message has not been sent to the broker.

This is a retryable error, but is not counted as a retry attempt
since the message was never actually transmitted.

A retry at this point will not cause duplicate messages.


#### Error: Timed out in flight to/from broker

Internal errors ERR__TIMED_OUT, ERR__TRANSPORT.

Same reasons as for "Timed out in transmission queue" above, with the
difference that the message may have been sent to the broker and might
be stalled waiting for broker replicas to ack the message, or the response
could be stalled due to networking issues.
At this point the producer can't know if the message reached the broker,
nor if the broker wrote the message to disk and propagated it to replicas.

This is a retryable error.

A retry at this point may cause duplicate messages.


#### Error: Temporary broker-side error

Broker errors ERR_REQUEST_TIMED_OUT, ERR_NOT_ENOUGH_REPLICAS,
ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND.

These errors are considered temporary and a retry is warranted.


#### Error: Temporary errors due to stale metadata

Broker errors ERR_LEADER_NOT_AVAILABLE, ERR_NOT_LEADER_FOR_PARTITION.

These errors are considered temporary and a retry is warranted; a metadata
request is automatically sent to find a new leader for the partition.

A retry at this point will not cause duplicate messages.


#### Error: Local time out

Internal error ERR__MSG_TIMED_OUT.

The message could not be successfully transmitted before `message.timeout.ms`
expired, typically due to no leader being available or no broker connection.
The message may have been retried due to other errors, but
those errors are abstracted by the ERR__MSG_TIMED_OUT error code.

Since `message.timeout.ms` has passed, there will be no more retries.


#### Error: Permanent errors

Any other error is considered a permanent error and the message
will fail immediately, generating a delivery report event with the
specific error code, as sketched in the callback example after the
list below.

The full list of permanent errors depends on the broker version and
will likely grow in the future.

Typical permanent broker errors are:
* ERR_CORRUPT_MESSAGE
* ERR_MSG_SIZE_TOO_LARGE - adjust client's or broker's `message.max.bytes`.
* ERR_UNKNOWN_TOPIC_OR_PART - the topic or partition does not exist,
  automatic topic creation is disabled on the broker, or the application
  is specifying a partition that does not exist.
* ERR_RECORD_LIST_TOO_LARGE
* ERR_INVALID_REQUIRED_ACKS
* ERR_TOPIC_AUTHORIZATION_FAILED
* ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
* ERR_CLUSTER_AUTHORIZATION_FAILED
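
A sketch of how an application's delivery report callback might classify
these outcomes (the reaction policy is the application's choice, not
librdkafka behaviour):

```c
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        switch (rkmessage->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break; /* Message delivered. */
        case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
                /* message.timeout.ms expired: delivery status is
                 * unknown, so re-producing risks duplicates. */
                break;
        default:
                /* Permanent error: re-producing the same message
                 * as-is will fail again. */
                fprintf(stderr, "Permanent failure: %s\n",
                        rd_kafka_err2str(rkmessage->err));
                break;
        }
}
```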


### Producer retries

The ProduceRequest itself is not retried; instead the messages
are put back on the internal partition queue by an insert sort
that maintains their original position (the message order is defined
at the time a message is initially appended to a partition queue, i.e., after
partitioning).
A backoff time (`retry.backoff.ms`) is set on the retried messages, which
effectively blocks retry attempts until the backoff time has expired.
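
In simplified pseudo-C (a sketch of the mechanism above using the `msgseq`,
`ts_backoff` and `retries` fields this commit adds; `max_retries`,
`retry_backoff_ms` and `fail_message()` are illustrative stand-ins):

```c
/* On a retryable ProduceRequest failure, for each affected message: */
rkm->rkm_u.producer.retries++;
if (rkm->rkm_u.producer.retries > max_retries) {
        fail_message(rkm, err);  /* Enqueue a delivery report. */
} else {
        /* Block further Produce attempts until the backoff expires
         * (rd_clock() and ts_backoff are in microseconds). */
        rkm->rkm_u.producer.ts_backoff = rd_clock() + retry_backoff_ms * 1000;
        /* Re-insert at the original msgseq position. */
        rd_kafka_msgq_enq_sorted(&rktp->rktp_msgq, rkm);
}
```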


### Reordering

As for all retries, if `max.in.flight` > 1 and `retries` > 0, retried messages
may be produced out of order, since a subsequent message in a subsequent
ProduceRequest may already be in-flight (and accepted by the broker)
by the time the retry for the failing message is sent.
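
To retain strict ordering with retries enabled, a configuration sketch
(continuing the `conf`/`errstr` setup above; `max.in.flight` is an alias for
`max.in.flight.requests.per.connection`). Note that a single in-flight
request per connection reduces throughput:

```c
/* One outstanding ProduceRequest per broker connection means a
 * retried message can never be overtaken by a later message. */
rd_kafka_conf_set(conf, "max.in.flight", "1", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "retries", "5", errstr, sizeof(errstr));
```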



@@ -173,7 +295,7 @@ The delivery report callback is optional.

The librdkafka API is documented in the
[`rdkafka.h`](https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h)
header file, the configuration properties are documented in
[`CONFIGURATION.md`](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)

### Initialization
32 changes: 22 additions & 10 deletions src/rdkafka_broker.c
@@ -530,11 +530,11 @@ static void rd_kafka_broker_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
/* Requests in retry queue */
retry_cnt = rd_kafka_broker_bufq_timeout_scan(
rkb, 0, &rkb->rkb_retrybufs, NULL,
RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now);
/* Requests in local queue not sent yet. */
q_cnt = rd_kafka_broker_bufq_timeout_scan(
rkb, 0, &rkb->rkb_outbufs, &req_cnt,
RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now);

if (req_cnt + retry_cnt + q_cnt > 0) {
rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_BROKER,
@@ -2357,6 +2357,7 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
rd_ts_t *next_wakeup) {
int cnt = 0;
int r;
rd_kafka_msg_t *rkm;

rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
"%.*s [%"PRId32"] %i+%i msgs",
@@ -2386,21 +2387,20 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
if (r == 0)
return 0;

rkm = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);
rd_dassert(rkm != NULL);

/* Attempt to fill the batch size, but limit
* our waiting to queue.buffering.max.ms
* and batch.num.messages. */
if (r < rkb->rkb_rk->rk_conf.batch_num_messages) {
rd_ts_t wait_max;

/* Calculate maximum wait-time to honour
* queue.buffering.max.ms contract. */
wait_max = rd_kafka_msg_enq_time(rkm) +
(rkb->rkb_rk->rk_conf.buffering_max_ms * 1000);

if (wait_max > now) {
if (wait_max < *next_wakeup)
*next_wakeup = wait_max;
@@ -2410,6 +2410,13 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
}
}

/* Honour retry.backoff.ms. */
if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
*next_wakeup = rkm->rkm_u.producer.ts_backoff;
/* Wait for backoff to expire */
return 0;
}

/* Send Produce requests for this toppar */
while (1) {
r = rd_kafka_ProduceRequest(rkb, rktp);
@@ -2419,6 +2426,11 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
break;
}

/* If there are messages still in the queue, make the next
* wakeup immediate. */
if (rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) > 0)
*next_wakeup = now;

return cnt;
}

18 changes: 18 additions & 0 deletions src/rdkafka_msg.c
@@ -557,7 +557,25 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
return rd_atomic32_get(&timedout->rkmq_msg_cnt) - cnt;
}

/**
* @brief Find the insert position (i.e., the previous element)
* for message sequence \p msgseq.
*
* @remark This needs to be true: rkmq.first < msgseq < rkmq.last
*/
rd_kafka_msg_t *rd_kafka_msgq_find_msgseq_pos (rd_kafka_msgq_t *rkmq,
uint64_t msgseq) {
rd_kafka_msg_t *rkm, *last = NULL;

TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
if (rkm->rkm_u.producer.msgseq > msgseq)
return last;
last = rkm;
}

rd_assert(!*"msgseq outside rkmq window");
return NULL; /* NOTREACHED */
}



36 changes: 35 additions & 1 deletion src/rdkafka_msg.h
@@ -92,6 +92,11 @@ typedef struct rd_kafka_msg_s {
struct {
rd_ts_t ts_timeout; /* Message timeout */
rd_ts_t ts_enq; /* Enqueue/Produce time */
rd_ts_t ts_backoff; /* Backoff next Produce until
* this time. */
uint64_t msgseq; /* Message sequence number,
* used to maintain ordering. */
int retries; /* Number of retries so far */
} producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq rkm_u.producer.ts_enq
@@ -146,8 +151,9 @@ rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {



TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
rd_atomic32_t rkmq_msg_cnt;
rd_atomic64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;
@@ -158,6 +164,9 @@ typedef struct rd_kafka_msgq_s {
#define RD_KAFKA_MSGQ_FOREACH(elm,head) \
TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)

/* @brief Check if queue is empty. Proper locks must be held. */
#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)

/**
* Returns the number of messages in the specified queue.
*/
@@ -262,6 +271,29 @@ rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
return rkm;
}


static RD_INLINE
int rd_kafka_msg_cmp_msgseq (const void *_a, const void *_b) {
const rd_kafka_msg_t *a = _a, *b = _b;

/* msgseq is uint64_t: compare without truncating the difference
 * to int, which could wrap for large sequence gaps. */
return (a->rkm_u.producer.msgseq > b->rkm_u.producer.msgseq) -
       (a->rkm_u.producer.msgseq < b->rkm_u.producer.msgseq);
}

/**
* @brief Insert message at its sorted position using the msgseq.
* @remark This is an O(n) operation.
* @warning The message must have a msgseq set.
*/
static RD_INLINE RD_UNUSED
void rd_kafka_msgq_enq_sorted (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm) {
rd_dassert(rkm->rkm_u.producer.msgseq != 0);
TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
rkm_link, rd_kafka_msg_cmp_msgseq);
rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
rd_atomic64_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len+rkm->rkm_key_len);
}

/**
* Insert message at head of message queue.
*/
@@ -297,6 +329,8 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
rd_kafka_msgq_t *timedout,
rd_ts_t now);

rd_kafka_msg_t *rd_kafka_msgq_find_msgseq_pos (rd_kafka_msgq_t *rkmq,
uint64_t msgseq);

int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
int do_lock);
11 changes: 10 additions & 1 deletion src/rdkafka_msgset_writer.c
@@ -699,6 +699,8 @@ rd_kafka_msgset_writer_write_msg (rd_kafka_msgset_writer_t *msetw,
/**
* @brief Write as many messages from the given message queue to
* the messageset.
*
* May not write any messages.
*/
static void
rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
@@ -713,10 +715,11 @@ rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
rd_ts_t MaxTimestamp = 0;
rd_kafka_msg_t *rkm;
int msgcnt = 0;
const rd_ts_t now = rd_clock();

/* Internal latency calculation base.
* Uses rkm_ts_timeout which is enqueue time + timeout */
int_latency_base = now +
(rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000);

/* Acquire BaseTimestamp from first message. */
@@ -740,6 +743,12 @@ rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
break;
}

if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
/* Stop accumulation when we've reached
* a message with a retry backoff in the future */
break;
}

/* Move message to buffer's queue */
rd_kafka_msgq_deq(rkmq, rkm, 1);
rd_kafka_msgq_enq(&rkbuf->rkbuf_msgq, rkm);
