Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

To consume only the latest message from a Kafka topic using Kafka Confluent Consumer in C#, you can use the Consume method with the FromBeginning and MaxPollIntervalMs parameters set. The code would look like this:

using Confluent.Kafka;

var conf = new ConsumerConfig
{
    GroupId = "my-consumer-group",
    BootstrapServers = "localhost:9092",
};

using (var consumer = new ConsumerBuilder<Ignore, string>(conf)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build())
{
    consumer.Subscribe("my-topic");

    // Poll for the latest message
    consumer.Consume(new TimeSpan(0, 0, 1));

    // Consume the latest message
    var result = consumer.Consume(TimeSpan.FromSeconds(1));

    Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
}

Note that the Consume method with a timespan parameter is used twice in this example. The first call with a short timeout (TimeSpan.FromSeconds(1)) is used to poll for the latest message in the topic. The second call with a longer timeout (TimeSpan.FromSeconds(10)) is used to consume the actual message. The MaxPollIntervalMs parameter is set to a large value (e.g. 10 seconds) to ensure that the consumer does not close the connection before the second Consume call is made.