To verify the existence of topics in health checks using NestJS Kafka, you can use the Admin
module provided by the kafka-node
library. Here's an example implementation of a NestJS health check that verifies the existence of a Kafka topic:
import { Injectable } from '@nestjs/common';
import { HealthCheck, HealthCheckResult, HealthIndicator } from '@nestjs/terminus';
import { KafkaClient, Admin } from 'kafka-node';
@Injectable()
export class KafkaHealthIndicator extends HealthIndicator {
async check(): Promise<HealthCheckResult> {
const client = new KafkaClient({ kafkaHost: 'localhost:9092' });
const admin = new Admin(client);
const topicExists = await new Promise<boolean>((resolve) => {
admin.listTopics((error, topicNames) => {
if (error) {
resolve(false);
} else {
const topicExists = topicNames.includes('my-kafka-topic');
resolve(topicExists);
}
});
});
await client.close();
const result = topicExists
? this.getStatus('kafka', true)
: this.getStatus('kafka', false);
return result;
}
}
In this implementation, we instantiate a KafkaClient
object and an Admin
object using the kafka-node
library. We then call the listTopics
method on the Admin
object to get a list of all topics in the Kafka cluster. We check if the topic we're interested in (my-kafka-topic
in this case) exists in the list of topics. If it does, we return a HealthCheckResult
object with a status
of true
. Otherwise, we return a HealthCheckResult
object with a status
of false
. Finally, we close the KafkaClient
object.
To use this Kafka health indicator in a NestJS health check, you can add it to the HealthCheckService
provider in your module:
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthCheckService } from './health-check.service';
import { KafkaHealthIndicator } from './kafka-health.indicator';
@Module({
imports: [TerminusModule],
providers: [HealthCheckService, KafkaHealthIndicator],
})
export class AppModule {}
You can then add the Kafka health check to the HealthCheckService
:
import { Injectable } from '@nestjs/common';
import { TerminusOptions, HealthCheckService } from '@nestjs/terminus';
import { KafkaHealthIndicator } from './kafka-health.indicator';
@Injectable()
export class HealthCheckProvider {
constructor(
private readonly healthCheckService: HealthCheckService,
private readonly kafkaHealthIndicator: KafkaHealthIndicator,
) {}
getTerminusOptions(): TerminusOptions {
return {
endpoints: [
{
url: '/health',
healthIndicators: [
async () => this.healthCheckService.pingCheck('ping'),
async () => this.kafkaHealthIndicator.check(),
],
},
],
};
}
}
In this example, we've added the Kafka health check to the array of healthIndicators
in the TerminusOptions
object. When a health check is performed against the /health
URL, the Kafka health check will be executed and its result will be included in the overall health check result.
Please start posting anonymously - your entry will be published after you log in or create a new account. This space is reserved only for answers. If you would like to engage in a discussion, please instead post a comment under the question or an answer that you would like to discuss
Asked: 2022-03-05 11:00:00 +0000
Seen: 16 times
Last updated: Nov 03 '21