Ask Your Question
0

How can I verify the existence of topics in health checks using NestJS Kafka?

asked 2022-03-05 11:00:00 +0000

pufferfish gravatar image

edit retag flag offensive close merge delete

1 Answer

Sort by ยป oldest newest most voted
1

answered 2021-11-03 08:00:00 +0000

ladyg gravatar image

To verify the existence of topics in health checks using NestJS Kafka, you can use the Admin module provided by the kafka-node library. Here's an example implementation of a NestJS health check that verifies the existence of a Kafka topic:

import { Injectable } from '@nestjs/common';
import { HealthCheck, HealthCheckResult, HealthIndicator } from '@nestjs/terminus';
import { KafkaClient, Admin } from 'kafka-node';

@Injectable()
export class KafkaHealthIndicator extends HealthIndicator {
  async check(): Promise<HealthCheckResult> {
    const client = new KafkaClient({ kafkaHost: 'localhost:9092' });
    const admin = new Admin(client);

    const topicExists = await new Promise<boolean>((resolve) => {
      admin.listTopics((error, topicNames) => {
        if (error) {
          resolve(false);
        } else {
          const topicExists = topicNames.includes('my-kafka-topic');
          resolve(topicExists);
        }
      });
    });

    await client.close();

    const result = topicExists
      ? this.getStatus('kafka', true)
      : this.getStatus('kafka', false);

    return result;
  }
}

In this implementation, we instantiate a KafkaClient object and an Admin object using the kafka-node library. We then call the listTopics method on the Admin object to get a list of all topics in the Kafka cluster. We check if the topic we're interested in (my-kafka-topic in this case) exists in the list of topics. If it does, we return a HealthCheckResult object with a status of true. Otherwise, we return a HealthCheckResult object with a status of false. Finally, we close the KafkaClient object.

To use this Kafka health indicator in a NestJS health check, you can add it to the HealthCheckService provider in your module:

import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthCheckService } from './health-check.service';
import { KafkaHealthIndicator } from './kafka-health.indicator';

@Module({
  imports: [TerminusModule],
  providers: [HealthCheckService, KafkaHealthIndicator],
})
export class AppModule {}

You can then add the Kafka health check to the HealthCheckService:

import { Injectable } from '@nestjs/common';
import { TerminusOptions, HealthCheckService } from '@nestjs/terminus';
import { KafkaHealthIndicator } from './kafka-health.indicator';

@Injectable()
export class HealthCheckProvider {
  constructor(
    private readonly healthCheckService: HealthCheckService,
    private readonly kafkaHealthIndicator: KafkaHealthIndicator,
  ) {}

  getTerminusOptions(): TerminusOptions {
    return {
      endpoints: [
        {
          url: '/health',
          healthIndicators: [
            async () => this.healthCheckService.pingCheck('ping'),
            async () => this.kafkaHealthIndicator.check(),
          ],
        },
      ],
    };
  }
}

In this example, we've added the Kafka health check to the array of healthIndicators in the TerminusOptions object. When a health check is performed against the /health URL, the Kafka health check will be executed and its result will be included in the overall health check result.

edit flag offensive delete link more

Your Answer

Please start posting anonymously - your entry will be published after you log in or create a new account. This space is reserved only for answers. If you would like to engage in a discussion, please instead post a comment under the question or an answer that you would like to discuss

Add Answer


Question Tools

Stats

Asked: 2022-03-05 11:00:00 +0000

Seen: 16 times

Last updated: Nov 03 '21