paint-brush
NestJS এর সাথে কাফকা বার্তাগুলি কীভাবে গ্রহণ করবেনদ্বারা@vdolzhenko
6,992 পড়া
6,992 পড়া

NestJS এর সাথে কাফকা বার্তাগুলি কীভাবে গ্রহণ করবেন

দ্বারা Viktoria Dolzhenko7m2023/12/28
Read on Terminal Reader

অতিদীর্ঘ; পড়তে

কাফকা একটি বার্তা ব্রোকার যেখানে কিছু পরিষেবা বার্তা তৈরি করে এবং অন্যরা সেগুলি গ্রহণ করে। এই নিবন্ধে আমরা কাফকা থেকে বার্তা গ্রহণের জন্য একটি ছোট অ্যাপ্লিকেশন লিখব। এবং অবশ্যই e2e পরীক্ষা। প্রথমে আমাদের একটি নিয়ামক তৈরি করতে হবে যা বার্তাগুলিকে প্রক্রিয়া করবে। তারপর NestJs ফ্রেমওয়ার্কের প্রধান পরিষেবাগুলিতে মাইক্রোসার্ভিসগুলি চালান৷
featured image - NestJS এর সাথে কাফকা বার্তাগুলি কীভাবে গ্রহণ করবেন
Viktoria Dolzhenko HackerNoon profile picture
0-item

আমরা অনেকেই বার্তা প্রকাশের জন্য কাফকা ব্যবহার করি, কিন্তু আমরা কীভাবে সেগুলি গ্রহণ করব? এই নিবন্ধে, আমরা কাফকার থেকে বার্তা গ্রহণের জন্য একটি ছোট অ্যাপ্লিকেশন লিখব। এবং, অবশ্যই, e2e পরীক্ষা।

আসুন প্রথমে বুঝতে পারি কাফকা কীভাবে কাজ করে এবং এটি কী।

কাফকা একটি বার্তা ব্রোকার যেখানে কিছু পরিষেবা বার্তা তৈরি করে এবং অন্যরা সেগুলি গ্রহণ করে। ব্রোকাররা প্রাথমিকভাবে একটি মাইক্রোসার্ভিস আর্কিটেকচার সহ সিস্টেমে পরিষেবাগুলির মধ্যে বার্তাগুলি পাস করার জন্য ব্যবহৃত হয়।


বার্তা বিষয় সংরক্ষণ করা হয়. একটি বার্তা পাঠানোর সময়, প্রযোজক বিষয়ের নাম নির্দেশ করে, সেইসাথে বার্তাটি নিজেই, যা একটি কী এবং একটি মান নিয়ে গঠিত। এবং এটাই; প্রযোজকের কাজ শেষ।


তারপর ভোক্তারা খেলতে আসে, তারা পছন্দসই বিষয়ের সদস্যতা নেয় এবং বার্তা পড়তে শুরু করে। প্রতিটি অ্যাপ্লিকেশানের নিজস্ব সারি রয়েছে, যা পড়ার মাধ্যমে গ্রাহক অফসেট পয়েন্টার সরান।



কাফকার স্বতন্ত্র বৈশিষ্ট্য হল:

  • গ্যারান্টি দিন যে সমস্ত বার্তা ঠিক সেই ক্রমানুসারে অর্ডার করা হবে যে ক্রমানুসারে তারা বিষয়টিতে এসেছে


  • কাফকা স্টোরে কিছুক্ষণ মেসেজ পড়ে


  • উচ্চ থ্রুপুট


এখন, NestJs ফ্রেমওয়ার্ক ব্যবহার করে কাফকার সাথে কাজ করা যাক। প্রথমত, আমাদের একটি নিয়ামক তৈরি করতে হবে যা বার্তাগুলিকে প্রক্রিয়া করবে।


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


@EventPattern অ্যাট্রিবিউটের দিকে মনোযোগ দিন, যা নির্দেশ করে যে আমাদের handleEvent() ফাংশন কনফিগারেশন ফাইল config.get('kafka.topics.exampleTopic') এ নির্দিষ্ট বিষয় থেকে বার্তা পাবে। @Payload() অ্যাট্রিবিউট টপিক মেসেজ থেকে মান পেতে সাহায্য করে।


আপনার অ্যাপ্লিকেশান কাফকা ব্রোকারদের সাথে সংযুক্ত করতে, আপনাকে দুটি জিনিস করতে হবে৷ শুরু করতে, স্টার্টআপ ফাইলে মাইক্রোসার্ভিস সংযোগ করুন:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


এবং তারপর main.ts এ মাইক্রোসার্ভিস চালান:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


অ্যাপ্লিকেশন পরীক্ষা করতে, আমি @testcontainers/kafka প্যাকেজ ব্যবহার করি। এটির সাহায্যে, আমি একটি চিড়িয়াখানার ধারক এবং তারপর একটি কাফকা ধারক তৈরি করেছি:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


অনুগ্রহ করে মনে রাখবেন, এই ফাইলে, আমি নতুন তৈরি কন্টেইনারে ব্রোকার ঠিকানাটি ওভাররাইড করেছি।

টেস্ট ফাইলেই, beforeAll ফাংশনে, আমি একটি কাফকা ক্লায়েন্ট তৈরি করি। প্রযোজকের সাথে, আমি একটি বিষয় তৈরি করি এবং আমাদের অ্যাপ্লিকেশন চালু করি।


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


অবশ্যই, afterAll ফাংশনে, আপনাকে পাত্রগুলি বন্ধ করতে হবে:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


আমি একটি পরীক্ষা লিখেছিলাম যা যাচাই করে যে যখন একটি বিষয়ে একটি বার্তা আসে, নিয়ামক থেকে আমাদের হ্যান্ডলার ফাংশন প্রয়োজনীয় পরিষেবা ফাংশন কল করে। এটি করার জন্য, আমি handleExampleEvent ফাংশনের বাস্তবায়ন ওভাররাইড করি এবং এটি কল করার জন্য অপেক্ষা করি।


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


এখানেই শেষ. আপনি যদি NestJs ফ্রেমওয়ার্ক ব্যবহার করেন তাহলে কাফকার সাথে কাজ করা অবিশ্বাস্যভাবে সহজ। আমি আশা করি আমার অভিজ্ঞতা আপনার কাজে লাগবে। একটি উদাহরণ অ্যাপ্লিকেশন https://github.com/waksund/kafka এ দেখা যেতে পারে