To consume only the latest message from a Kafka topic using Kafka Confluent Consumer in C#, you can use the Consume
method with the FromBeginning
and MaxPollIntervalMs
parameters set. The code would look like this:
using Confluent.Kafka;
var conf = new ConsumerConfig
{
GroupId = "my-consumer-group",
BootstrapServers = "localhost:9092",
};
using (var consumer = new ConsumerBuilder<Ignore, string>(conf)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe("my-topic");
// Poll for the latest message
consumer.Consume(new TimeSpan(0, 0, 1));
// Consume the latest message
var result = consumer.Consume(TimeSpan.FromSeconds(1));
Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
}
Note that the Consume
method with a timespan parameter is used twice in this example. The first call with a short timeout (TimeSpan.FromSeconds(1)
) is used to poll for the latest message in the topic. The second call with a longer timeout (TimeSpan.FromSeconds(10)
) is used to consume the actual message. The MaxPollIntervalMs
parameter is set to a large value (e.g. 10 seconds) to ensure that the consumer does not close the connection before the second Consume
call is made.
Please start posting anonymously - your entry will be published after you log in or create a new account. This space is reserved only for answers. If you would like to engage in a discussion, please instead post a comment under the question or an answer that you would like to discuss
Asked: 2022-01-04 11:00:00 +0000
Seen: 1 times
Last updated: Feb 27 '22
How can I deal with Expression.Error related to a column in Power Query?
How can you implement pagination in Oracle for the LISTAGG() function?
What is the process for implementing a FutureBuilder on an OnTap function in Flutter?
How can we require users to be logged in before they can access the root folders in WordPress?
In SCSS, what is the method for grouping and reusing a set of classes and styles?
How can popen() be used to direct streaming data to TAR?
How does iOS retrieve information from a BLE device?
How can Django Admin accommodate a variety of formats and locales for its input fields?