/
KafkaProducer.java
1238 lines (1161 loc) · 68.2 KB
/
KafkaProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
/**
* A Kafka client that publishes records to the Kafka cluster.
* <P>
* The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than
* having multiple instances.
* <p>
* Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value
* pairs.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
* props.put("acks", "all");
* props.put("retries", 0);
* props.put("batch.size", 16384);
* props.put("linger.ms", 1);
* props.put("buffer.memory", 33554432);
* props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
* props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
*
* Producer<String, String> producer = new KafkaProducer<>(props);
* for (int i = 0; i < 100; i++)
* producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
*
* producer.close();
* }</pre>
* <p>
* The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
* as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
* to the cluster. Failure to close the producer after use will leak these resources.
* <p>
* The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends
* and immediately returns. This allows the producer to batch together individual records for efficiency.
* <p>
* The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting
* we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
* <p>
* If the request fails, the producer can automatically retry, though since we have specified <code>retries</code>
* as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on
* <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details).
* <p>
* The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by
* the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will
* generally have one of these buffers for each active partition).
* <p>
* By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
* want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will
* instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
* arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
* likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
* would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
* records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load
* batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
* efficient requests when not under maximal load at the cost of a small amount of latency.
* <p>
* The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records
* are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is
* exhausted additional send calls will block. The threshold for time to block is determined by <code>max.block.ms</code> after which it throws
* a TimeoutException.
* <p>
* The <code>key.serializer</code> and <code>value.serializer</code> instruct how to turn the key and value objects the user provides with
* their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or
* {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types.
* <p>
* From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer.
* The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular
* producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages
* to multiple partitions (and topics!) atomically.
* </p>
* <p>
* To enable idempotence, the <code>enable.idempotence</code> configuration must be set to true. If set, the
* <code>retries</code> config will default to <code>Integer.MAX_VALUE</code> and the <code>acks</code> config will
* default to <code>all</code>. There are no API changes for the idempotent producer, so existing applications will
* not need to be modified to take advantage of this feature.
* </p>
* <p>
* To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot
* be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the <code>retries</code>
* config unset, as it will be defaulted to <code>Integer.MAX_VALUE</code>. Additionally, if a {@link #send(ProducerRecord)}
* returns an error even with infinite retries (for instance if the message expires in the buffer before being sent),
* then it is recommended to shut down the producer and check the contents of the last produced message to ensure that
* it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.
* </p>
* <p>To use the transactional producer and the attendant APIs, you must set the <code>transactional.id</code>
* configuration property. If the <code>transactional.id</code> is set, idempotence is automatically enabled along with
* the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured
* for durability. In particular, the <code>replication.factor</code> should be at least <code>3</code>, and the
* <code>min.insync.replicas</code> for these topics should be set to 2. Finally, in order for transactional guarantees
* to be realized from end-to-end, the consumers must be configured to read only committed messages as well.
* </p>
* <p>
* The purpose of the <code>transactional.id</code> is to enable transaction recovery across multiple sessions of a
* single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application.
* As such, it should be unique to each producer instance running within a partitioned application.
* </p>
* <p>All the new transactional APIs are blocking and will throw exceptions on failure. The example
* below illustrates how the new APIs are meant to be used. It is similar to the example above, except that all
* 100 messages are part of a single transaction.
* </p>
* <p>
* <pre>
* {@code
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
* props.put("transactional.id", "my-transactional-id");
* Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
*
* producer.initTransactions();
*
* try {
* producer.beginTransaction();
* for (int i = 0; i < 100; i++)
* producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
* producer.commitTransaction();
* } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
* // We can't recover from these exceptions, so our only option is to close the producer and exit.
* producer.close();
* } catch (KafkaException e) {
* // For all other exceptions, just abort the transaction and try again.
* producer.abortTransaction();
* }
* producer.close();
* } </pre>
* </p>
* <p>
* As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the
* {@link #beginTransaction()} and {@link #commitTransaction()} calls will be part of a single transaction. When the
* <code>transactional.id</code> is specified, all messages sent by the producer must be part of a transaction.
* </p>
* <p>
* The transactional producer uses exceptions to communicate error states. In particular, it is not required
* to specify callbacks for <code>producer.send()</code> or to call <code>.get()</code> on the returned Future: a
* <code>KafkaException</code> would be thrown if any of the
* <code>producer.send()</code> or transactional calls hit an irrecoverable error during a transaction. See the {@link #send(ProducerRecord)}
* documentation for more details about detecting errors from a transactional send.
* </p>
* </p>By calling
* <code>producer.abortTransaction()</code> upon receiving a <code>KafkaException</code> we can ensure that any
* successful writes are marked as aborted, hence keeping the transactional guarantees.
* </p>
* <p>
* This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support
* certain client features. For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an
* <code>UnsupportedVersionException</code> when invoking an API that is not available in the running broker version.
* </p>
*/
public class KafkaProducer<K, V> implements Producer<K, V> {
private final Logger log;
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
private final String clientId;
// Visible for testing
final Metrics metrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final Metadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
private final ExtendedSerializer<K> keySerializer;
private final ExtendedSerializer<V> valueSerializer;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final int requestTimeoutMs;
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
private TransactionalRequestResult initTransactionsResult;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
* either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* <p>
* Note: after creating a {@code KafkaProducer} you must always {@link #close()} it to avoid resource leaks.
* @param configs The producer configs
*
*/
public KafkaProducer(final Map<String, Object> configs) {
this(new ProducerConfig(configs), null, null, null, null);
}
/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
* either the string "42" or the integer 42).
* <p>
* Note: after creating a {@code KafkaProducer} you must always {@link #close()} it to avoid resource leaks.
* @param configs The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer,
valueSerializer,
null,
null);
}
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* <p>
* Note: after creating a {@code KafkaProducer} you must always {@link #close()} it to avoid resource leaks.
* @param properties The producer configs
*/
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null, null, null);
}
/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* <p>
* Note: after creating a {@code KafkaProducer} you must always {@link #close()} it to avoid resource leaks.
* @param properties The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
keySerializer, valueSerializer, null, null);
}
@SuppressWarnings("unchecked")
// visible for testing
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Metadata metadata,
KafkaClient kafkaClient) {
try {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = Time.SYSTEM;
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config, logContext, log);
int retries = configureRetries(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null, log);
this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
}
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
this.metadata,
clientId,
maxInflightRequests,
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs,
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
Time.SYSTEM,
this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;
boolean userConfiguredIdempotence = false;
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
userConfiguredIdempotence = true;
boolean userConfiguredTransactions = false;
if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
userConfiguredTransactions = true;
boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
if (userConfiguredTransactions)
idempotenceEnabled = true;
if (idempotenceEnabled) {
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
if (transactionManager.isTransactional())
log.info("Instantiated a transactional producer.");
else
log.info("Instantiated an idempotent producer.");
}
return transactionManager;
}
private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
boolean userConfiguredRetries = false;
if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
userConfiguredRetries = true;
}
if (idempotenceEnabled && !userConfiguredRetries) {
// We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make
// this the default.
log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
"producer is enabled.", Integer.MAX_VALUE);
return Integer.MAX_VALUE;
}
if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
}
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}
private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}
private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
boolean userConfiguredAcks = false;
short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
userConfiguredAcks = true;
}
if (idempotenceEnabled && !userConfiguredAcks) {
log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
return -1;
}
if (idempotenceEnabled && acks != -1) {
throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
}
return acks;
}
private static int parseAcks(String acksString) {
try {
return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
} catch (NumberFormatException e) {
throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
}
}
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
* Note that this method will raise {@link TimeoutException} if the transactional state cannot
* be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
* if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
* initialized, this method should no longer be used.
*
* @throws IllegalStateException if no transactional.id has been configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
throwIfNoTransactionManager();
if (initTransactionsResult == null) {
initTransactionsResult = transactionManager.initializeTransactions();
sender.wakeup();
}
try {
if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
initTransactionsResult = null;
} else {
throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms.");
}
} catch (InterruptedException e) {
throw new InterruptException("Initialize transactions interrupted.", e);
}
}
/**
* Should be called before the start of each new transaction. Note that prior to the first invocation
* of this method, you must invoke {@link #initTransactions()} exactly one time.
*
* @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()}
* has not yet been invoked
* @throws ProducerFencedException if another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
*/
public void beginTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
transactionManager.beginTransaction();
}
/**
* Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* committed only if the transaction is committed successfully. The committed offset should
* be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
* <p>
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern. Thus, the specified
* {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
* {@link KafkaConsumer consumer}. Note, that the consumer should have {@code enable.auto.commit=false}
* and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
* format used for the offsets topic on the broker does not support transactions
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
sender.wakeup();
result.await();
}
/**
* Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
*
* Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
* errors, this method will throw the last received exception immediately and the transaction will not be committed.
* So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
*/
public void commitTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await();
}
/**
* Aborts the ongoing transaction. Any unflushed produce messages will be aborted when this call is made.
* This call will throw an exception immediately if any prior {@link #send(ProducerRecord)} calls failed with a
* {@link ProducerFencedException} or an instance of {@link org.apache.kafka.common.errors.AuthorizationException}.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
*/
public void abortTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await();
}
/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
* <p>
* The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
* records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
* response after each one.
* <p>
* The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to, the offset
* it was assigned and the timestamp of the record. If
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime} is used by the topic, the timestamp
* will be the user provided timestamp or the record send time if the user did not specify a timestamp for the
* record. If {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime} is used for the
* topic, the timestamp will be the Kafka broker local time when the message is appended.
* <p>
* Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
* {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
* get()} on this future will block until the associated request completes and then return the metadata for the record
* or throw any exception that occurred while sending the record.
* <p>
* If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
*
* <pre>
* {@code
* byte[] key = "key".getBytes();
* byte[] value = "value".getBytes();
* ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
* producer.send(record).get();
* }</pre>
* <p>
* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
*
* <pre>
* {@code
* ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
* producer.send(myRecord,
* new Callback() {
* public void onCompletion(RecordMetadata metadata, Exception e) {
* if(e != null) {
* e.printStackTrace();
* } else {
* System.out.println("The offset of the record we just sent is: " + metadata.offset());
* }
* }
* });
* }
* </pre>
*
* Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
*
* <pre>
* {@code
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
* }
* </pre>
* <p>
* When used as part of a transaction, it is not necessary to define a callback or check the result of the future
* in order to detect errors from <code>send</code>. If any of the send calls failed with an irrecoverable error,
* the final {@link #commitTransaction()} call will fail and throw the exception from the last failed send. When
* this happens, your application should call {@link #abortTransaction()} to reset the state and continue to send
* data.
* </p>
* <p>
* Some transactional send errors cannot be resolved with a call to {@link #abortTransaction()}. In particular,
* if a transactional send finishes with a {@link ProducerFencedException}, a {@link org.apache.kafka.common.errors.OutOfOrderSequenceException},
* a {@link org.apache.kafka.common.errors.UnsupportedVersionException}, or an
* {@link org.apache.kafka.common.errors.AuthorizationException}, then the only option left is to call {@link #close()}.
* Fatal errors cause the producer to enter a defunct state in which future API calls will continue to raise
* the same underyling error wrapped in a new {@link KafkaException}.
* </p>
* <p>
* It is a similar picture when idempotence is enabled, but no <code>transactional.id</code> has been configured.
* In this case, {@link org.apache.kafka.common.errors.UnsupportedVersionException} and
* {@link org.apache.kafka.common.errors.AuthorizationException} are considered fatal errors. However,
* {@link ProducerFencedException} does not need to be handled. Additionally, it is possible to continue
* sending after receiving an {@link org.apache.kafka.common.errors.OutOfOrderSequenceException}, but doing so
* can result in out of order delivery of pending messages. To ensure proper ordering, you should close the
* producer and create a new instance.
* </p>
* <p>
* If the message format of the destination topic is not upgraded to 0.11.0.0, idempotent and transactional
* produce requests will fail with an {@link org.apache.kafka.common.errors.UnsupportedForMessageFormatException}
* error. If this is encountered during a transaction, it is possible to abort and continue. But note that future
* sends to the same topic will continue receiving the same exception until the topic is upgraded.
* </p>
* <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
* they will delay the sending of messages from other threads. If you want to execute blocking or computationally
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
*
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException fatal error indicating that the producer is not allowed to write
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
* when send is invoked after producer has been closed.
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
// Verify that this producer instance has not been closed. This method throws IllegalStateException if the producer
// has already been closed.
private void throwIfProducerClosed() {
if (ioThread == null || !ioThread.isAlive())
throw new IllegalStateException("Cannot perform operation after producer has been closed");
}
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
private void setReadOnly(Headers headers) {
if (headers instanceof RecordHeaders) {
((RecordHeaders) headers).setReadOnly();
}
}
/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param partition A specific partition expected to exist in metadata, or null if there's no preference
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The cluster containing topic metadata and the amount of time we waited in ms
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
metadata.add(topic);
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
// In case we already have cached metadata for the topic, but the requested partition is greater
// than expected, issue an update request only once. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
log.trace("Requesting metadata update for topic {}.", topic);
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}
/**
* Validate that the record size isn't too large
*/
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
if (size > this.totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
/**
* Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
* of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
* A request is considered completed when it is successfully acknowledged
* according to the <code>acks</code> configuration you have specified or else it results in an error.
* <p>
* Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
* however no guarantee is made about the completion of records sent after the flush call begins.
* <p>
* This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
* gives a convenient way to ensure all previously sent messages have actually completed.
* <p>
* This example shows how to consume from one Kafka topic and produce to another Kafka topic:
* <pre>
* {@code
* for(ConsumerRecord<String, String> record: consumer.poll(100))
* producer.send(new ProducerRecord("my-topic", record.key(), record.value());
* producer.flush();
* consumer.commit();
* }
* </pre>
*
* Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur