paint-brush
NestJS के साथ काफ्का संदेशों का उपभोग कैसे करेंद्वारा@vdolzhenko
6,992 रीडिंग
6,992 रीडिंग

NestJS के साथ काफ्का संदेशों का उपभोग कैसे करें

द्वारा Viktoria Dolzhenko7m2023/12/28
Read on Terminal Reader

बहुत लंबा; पढ़ने के लिए

काफ्का एक संदेश दलाल है जिसमें कुछ सेवाएँ संदेश उत्पन्न करती हैं और अन्य उन्हें प्राप्त करती हैं। इस लेख में हम काफ्का से संदेशों का उपभोग करने के लिए एक छोटा सा एप्लिकेशन लिखेंगे। और निश्चित रूप से e2e परीक्षण। सबसे पहले हमें एक नियंत्रक बनाना होगा जो संदेशों को संसाधित करेगा। फिर NestJs ढांचे में मुख्य सेवाओं में माइक्रोसर्विसेज चलाएँ।
featured image - NestJS के साथ काफ्का संदेशों का उपभोग कैसे करें
Viktoria Dolzhenko HackerNoon profile picture
0-item

हममें से कई लोग संदेश प्रकाशित करने के लिए काफ्का का उपयोग करते हैं, लेकिन हम उन्हें कैसे प्राप्त करते हैं? इस लेख में, हम काफ्का के संदेशों के उपभोग के लिए एक छोटा सा एप्लिकेशन लिखेंगे। और, ज़ाहिर है, e2e परीक्षण।

आइए सबसे पहले समझें कि काफ्का कैसे काम करता है और यह क्या है।

काफ्का एक संदेश दलाल है जिसमें कुछ सेवाएँ संदेश उत्पन्न करती हैं और अन्य उन्हें प्राप्त करती हैं। ब्रोकरों का उपयोग मुख्य रूप से सेवाओं के बीच संदेश भेजने के लिए माइक्रोसर्विस आर्किटेक्चर वाले सिस्टम में किया जाता है।


संदेशों को विषयों में संग्रहीत किया जाता है. संदेश भेजते समय, निर्माता विषय के नाम के साथ-साथ संदेश को भी इंगित करता है, जिसमें एक कुंजी और एक मान होता है। और बस; निर्माता का काम ख़त्म हो गया है.


फिर उपभोक्ता खेल में आते हैं, वे वांछित विषय की सदस्यता लेते हैं, और संदेश पढ़ना शुरू करते हैं। प्रत्येक एप्लिकेशन की अपनी कतार होती है, जिसे पढ़कर उपभोक्ता ऑफसेट पॉइंटर को स्थानांतरित करता है।



काफ्का की विशिष्ट विशेषताएं हैं:

  • गारंटी दें कि सभी संदेशों को ठीक उसी क्रम में क्रमबद्ध किया जाएगा जिस क्रम में वे विषय में आए थे


  • काफ्का स्टोर कुछ देर के लिए संदेश पढ़ते हैं


  • उच्च थ्रूपुट


अब, आइए NestJs ढांचे का उपयोग करके काफ्का के साथ काम करें। सबसे पहले, हमें एक नियंत्रक बनाना होगा जो संदेशों को संसाधित करेगा।


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


@EventPattern विशेषता पर ध्यान दें, जो इंगित करता है कि हमारा handleEvent() फ़ंक्शन कॉन्फ़िगरेशन फ़ाइल config.get('kafka.topics.exampleTopic') में निर्दिष्ट विषय से संदेश प्राप्त करेगा। @Payload() विशेषता विषय संदेश से मूल्य प्राप्त करने में मदद करती है।


अपने एप्लिकेशन को काफ्का दलालों से जोड़ने के लिए, आपको दो काम करने होंगे। आरंभ करने के लिए, स्टार्टअप फ़ाइल में माइक्रोसर्विस कनेक्ट करें:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


और फिर माइक्रोसर्विसेज को main.ts में चलाएँ:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


एप्लिकेशन का परीक्षण करने के लिए, मैं @testcontainers/kafka पैकेज का उपयोग करता हूं। इसकी मदद से, मैंने एक ज़ूकीपर कंटेनर और फिर एक काफ्का कंटेनर बनाया:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


कृपया ध्यान दें कि, इस फ़ाइल में, मैंने ब्रोकर पते को नव निर्मित कंटेनर में ओवरराइड कर दिया है।

परीक्षण फ़ाइल में ही, beforeAll फ़ंक्शन में, मैं एक काफ्का क्लाइंट बनाता हूँ। निर्माता के साथ, मैं एक विषय भी बनाता हूं और अपना एप्लिकेशन लॉन्च करता हूं।


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


बेशक, afterAll फ़ंक्शन में, आपको कंटेनरों को रोकने की आवश्यकता है:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


मैंने एक परीक्षण लिखा जो सत्यापित करता है कि जब किसी विषय पर कोई संदेश आता है, तो नियंत्रक से हमारा हैंडलर फ़ंक्शन आवश्यक सेवा फ़ंक्शन को कॉल करता है। ऐसा करने के लिए, मैं handleExampleEvent फ़ंक्शन के कार्यान्वयन को ओवरराइड करता हूं और इसके कॉल होने की प्रतीक्षा करता हूं।


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


बस इतना ही। यदि आप NestJs ढांचे का उपयोग करते हैं तो काफ्का के साथ काम करना अविश्वसनीय रूप से आसान है। मुझे आशा है कि मेरा अनुभव आपके काम आएगा। एक उदाहरण एप्लिकेशन https://github.com/waksund/kafka पर देखा जा सकता है