Skip to content

Commit

Permalink
feat(kafka): Starts work on implementing client/server
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaufmaner committed Jun 20, 2019
1 parent 56071d0 commit b5ed5dd
Show file tree
Hide file tree
Showing 9 changed files with 944 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ node_modules/
/.idea
/.awcache
/.vscode
*.code-workspace

# bundle
packages/**/*.d.ts
Expand Down
13 changes: 13 additions & 0 deletions integration/microservices/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration/microservices/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"amqp-connection-manager": "2.3.2",
"class-transformer": "0.2.3",
"class-validator": "0.9.1",
"kafkajs": "^1.8.0",
"reflect-metadata": "0.1.13",
"rxjs": "6.5.2",
"typescript": "3.5.1"
Expand Down
187 changes: 187 additions & 0 deletions packages/microservices/client/client-kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { EventEmitter } from 'events';
import { fromEvent, merge, Observable } from 'rxjs';
import { first, map, share, switchMap } from 'rxjs/operators';
import { ReadPacket, RmqOptions } from '../interfaces';
import {
DISCONNECT_EVENT,
ERROR_EVENT,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NOACK,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
} from './../constants';
import { WritePacket } from './../interfaces';
import { ClientProxy } from './client-proxy';

let rqmPackage: any = {};

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

export class ClientKafka extends ClientProxy {
protected readonly logger = new Logger(ClientProxy.name);
protected connection: Promise<any>;
protected client: any = null;
protected channel: any = null;
protected urls: string[];
protected queue: string;
protected queueOptions: any;
protected responseEmitter: EventEmitter;

constructor(protected readonly options: RmqOptions['options']) {
super();
this. = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
this.groupId =
this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
this.queueOptions =
this.getOptionsProp(this.options, 'queueOptions') ||
RQM_DEFAULT_QUEUE_OPTIONS;

loadPackage('amqplib', ClientKafka.name, () => require('amqplib'));
rqmPackage = loadPackage('amqp-connection-manager', ClientKafka.name, () =>
require('amqp-connection-manager'),
);
}

public close(): void {
this.channel && this.channel.close();
this.client && this.client.close();
}

public consumeChannel() {
const noAck =
this.getOptionsProp(this.options, 'noAck') || RQM_DEFAULT_NOACK;
this.channel.addSetup((channel: any) =>
channel.consume(
REPLY_QUEUE,
(msg: any) =>
this.responseEmitter.emit(msg.properties.correlationId, msg),
{
noAck,
},
),
);
}

public connect(): Promise<any> {
if (this.client) {
return this.connection;
}
this.client = this.createClient();
this.handleError(this.client);

const connect$ = this.connect$(this.client);
this.connection = this.mergeDisconnectEvent(this.client, connect$)
.pipe(
switchMap(() => this.createChannel()),
share(),
)
.toPromise();
return this.connection;
}

public createChannel(): Promise<void> {
return new Promise(resolve => {
this.channel = this.client.createChannel({
json: false,
setup: (channel: any) => this.setupChannel(channel, resolve),
});
});
}

public createClient<T = any>(): T {
const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
return rqmPackage.connect(this.urls, socketOptions) as T;
}

public mergeDisconnectEvent<T = any>(
instance: any,
source$: Observable<T>,
): Observable<T> {
const close$ = fromEvent(instance, DISCONNECT_EVENT).pipe(
map((err: any) => {
throw err;
}),
);
return merge(source$, close$).pipe(first());
}

public async setupChannel(channel: any, resolve: Function) {
const prefetchCount =
this.getOptionsProp(this.options, 'prefetchCount') ||
RQM_DEFAULT_PREFETCH_COUNT;
const isGlobalPrefetchCount =
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);

this.responseEmitter = new EventEmitter();
this.responseEmitter.setMaxListeners(0);
this.consumeChannel();
resolve();
}

public handleError(client: any): void {
client.addListener(ERROR_EVENT, (err: any) => this.logger.error(err));
}

public handleMessage(
packet: WritePacket,
callback: (packet: WritePacket) => any,
) {
const { err, response, isDisposed } = packet;
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
}
callback({
err,
response,
});
}

protected publish(
message: ReadPacket,
callback: (packet: WritePacket) => any,
): Function {
try {
const correlationId = randomStringGenerator();
const listener = ({ content }: { content: any }) =>
this.handleMessage(JSON.parse(content.toString()), callback);

Object.assign(message, { id: correlationId });
this.responseEmitter.on(correlationId, listener);
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(message)),
{
replyTo: REPLY_QUEUE,
correlationId,
},
);
return () => this.responseEmitter.removeListener(correlationId, listener);
} catch (err) {
callback({ err });
}
}

protected dispatchEvent(packet: ReadPacket): Promise<any> {
return new Promise((resolve, reject) =>
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(packet)),
{},
err => (err ? reject(err) : resolve()),
),
);
}
}
6 changes: 6 additions & 0 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ export const NATS_DEFAULT_URL = 'nats://localhost:4222';
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883';
export const GRPC_DEFAULT_URL = 'localhost:5000';
export const RQM_DEFAULT_URL = 'amqp://localhost';
export const KAFKA_DEFAULT_BROKER = 'localhost:9092';

export const CONNECT_EVENT = 'connect';
export const DISCONNECT_EVENT = 'disconnect';
export const DISCONNECTED_EVENT = 'disconnected';
export const MESSAGE_EVENT = 'message';
export const DATA_EVENT = 'data';
export const ERROR_EVENT = 'error';
export const CLOSE_EVENT = 'close';
export const SUBSCRIBE = 'subscribe';
export const CANCEL_EVENT = 'cancelled';
export const READY_EVENT = 'ready';

export const PATTERN_METADATA = 'microservices:pattern';
export const CLIENT_CONFIGURATION_METADATA = 'microservices:client';
Expand All @@ -32,3 +35,6 @@ export const NO_EVENT_HANDLER = `There is no matching event handler defined in t
export const DISCONNECTED_RMQ_MESSAGE = `Disconnected from RMQ. Trying to reconnect.`;
export const GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
export const GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH = 4 * 1024 * 1024;

export const KAFKA_DEFAULT_CLIENT = 'nestjs';
export const KAFKA_DEFAULT_GROUP = 'nestjs';
1 change: 1 addition & 0 deletions packages/microservices/enums/transport.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ export enum Transport {
MQTT,
GRPC,
RMQ,
KAFKA
}
Loading

0 comments on commit b5ed5dd

Please sign in to comment.