Skip to content

Commit

Permalink
Merge pull request #83 from kaleido-io/stream
Browse files Browse the repository at this point in the history
Port eventstream changes from erc1155
  • Loading branch information
peterbroadhurst authored Aug 23, 2022
2 parents eba8492 + 0026f5f commit 31f3ebf
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 42 deletions.
12 changes: 2 additions & 10 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ export class EventStreamService {
return response.data;
}

async createOrUpdateStream(topic: string): Promise<EventStream> {
async createOrUpdateStream(name: string, topic: string): Promise<EventStream> {
const streamDetails = {
name: topic,
name,
errorHandling: 'block',
batchSize: 50,
batchTimeoutMS: 500,
Expand Down Expand Up @@ -232,14 +232,6 @@ export class EventStreamService {
return response.data;
}

async deleteSubscription(subId: string) {
await lastValueFrom(
this.http.delete(`${this.baseUrl}/subscriptions/${subId}`, {
...basicAuth(this.username, this.password),
}),
);
}

async createSubscription(
instancePath: string,
eventABI: IAbiMethod,
Expand Down
39 changes: 28 additions & 11 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
} from '../websocket-events/websocket-events.base';
import {
AckMessageData,
ConnectionListener,
EventListener,
WebSocketMessageBatchData,
WebSocketMessageWithId,
Expand All @@ -42,7 +43,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
url?: string;
topic?: string;

private listeners: EventListener[] = [];
private connectListeners: ConnectionListener[] = [];
private eventListeners: EventListener[] = [];
private awaitingAck: WebSocketMessageWithId[] = [];
private currentClient: WebSocketEx | undefined;
private subscriptionNames = new Map<string, string>();
Expand All @@ -65,8 +67,14 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
super.handleConnection(client);
if (this.server.clients.size === 1) {
this.logger.log(`Initializing event stream proxy`);
this.setCurrentClient(client);
this.startListening();
Promise.all(this.connectListeners.map(l => l.onConnect()))
.then(() => {
this.setCurrentClient(client);
this.startListening();
})
.catch(err => {
this.logger.error(`Error initializing event stream proxy: ${err}`);
});
}
}

Expand Down Expand Up @@ -108,8 +116,12 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
this.currentClient = undefined;
}

addListener(listener: EventListener) {
this.listeners.push(listener);
addConnectionListener(listener: ConnectionListener) {
this.connectListeners.push(listener);
}

addEventListener(listener: EventListener) {
this.eventListeners.push(listener);
}

private async processEvents(events: Event[]) {
Expand All @@ -122,7 +134,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return;
}

for (const listener of this.listeners) {
for (const listener of this.eventListeners) {
try {
await listener.onEvent(subName, event, (msg: WebSocketMessage | undefined) => {
if (msg !== undefined) {
Expand Down Expand Up @@ -150,12 +162,17 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
if (subName !== undefined) {
return subName;
}
const sub = await this.eventstream.getSubscription(subId);
if (sub === undefined) {
return undefined;

try {
const sub = await this.eventstream.getSubscription(subId);
if (sub !== undefined) {
this.subscriptionNames.set(subId, sub.name);
return sub.name;
}
} catch (err) {
this.logger.error(`Error looking up subscription: ${err}`);
}
this.subscriptionNames.set(subId, sub.name);
return sub.name;
return undefined;
}

private setCurrentClient(client: WebSocketEx) {
Expand Down
5 changes: 4 additions & 1 deletion src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { ApiProperty } from '@nestjs/swagger';
import { WebSocketMessage } from '../websocket-events/websocket-events.base';
import { Event } from '../event-stream/event-stream.interfaces';

export interface EventProcessor {
(msg: WebSocketMessage | undefined): void;
}

export interface ConnectionListener {
onConnect: () => void | Promise<void>;
}

export interface EventListener {
onEvent: (subName: string, event: Event, process: EventProcessor) => void | Promise<void>;
}
Expand Down
10 changes: 0 additions & 10 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { version as API_VERSION } from '../package.json';
import { AppModule } from './app.module';
import { EventStreamReply } from './event-stream/event-stream.interfaces';
import { EventStreamService } from './event-stream/event-stream.service';
import { EventStreamProxyGateway } from './eventstream-proxy/eventstream-proxy.gateway';
import { RequestLoggingInterceptor } from './request-logging.interceptor';
import {
TokenApprovalEvent,
Expand Down Expand Up @@ -78,20 +77,11 @@ async function bootstrap() {
const password = config.get<string>('ETHCONNECT_PASSWORD', '');
const factoryAddress = config.get<string>('FACTORY_CONTRACT_ADDRESS', '');

const wsUrl = ethConnectUrl.replace('http', 'ws') + '/ws';

app.get(EventStreamService).configure(ethConnectUrl, username, password);
app.get(EventStreamProxyGateway).configure(wsUrl, topic);
app
.get(TokensService)
.configure(ethConnectUrl, fftmUrl, topic, shortPrefix, username, password, factoryAddress);

try {
await app.get(TokensService).migrationCheck();
} catch (err) {
this.logger.debug('Subscription checks skipped (ethconnect may not be up)');
}

if (autoInit.toLowerCase() !== 'false') {
await app.get(TokensService).init();
}
Expand Down
9 changes: 7 additions & 2 deletions src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ describe('TokensService', () => {
let service: TokensService;

const eventstream = {
getStreams: jest.fn(),
createOrUpdateStream: jest.fn(),
getOrCreateSubscription: jest.fn(),
};
eventstream.getStreams.mockReturnValue([]);

const mockPoolQuery = (
withData: boolean | undefined,
Expand Down Expand Up @@ -202,11 +204,14 @@ describe('TokensService', () => {
},
{
provide: EventStreamService,
useValue: { addListener: jest.fn() },
useValue: eventstream,
},
{
provide: EventStreamProxyGateway,
useValue: { addListener: jest.fn() },
useValue: {
addConnectionListener: jest.fn(),
addEventListener: jest.fn(),
},
},
],
})
Expand Down
27 changes: 19 additions & 8 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,14 @@ export class TokensService {
this.username = username;
this.password = password;
this.factoryAddress = factoryAddress.toLowerCase();
this.proxy.addListener(new TokenListener(this));
this.proxy.addConnectionListener(this);
this.proxy.addEventListener(new TokenListener(this));
}

async onConnect() {
const wsUrl = this.baseUrl.replace('http', 'ws') + '/ws';
const stream = await this.getStream();
this.proxy.configure(wsUrl, stream.name);
}

private getMethodAbi(
Expand Down Expand Up @@ -304,9 +311,13 @@ export class TokensService {
}

private async getStream() {
if (this.stream === undefined) {
this.stream = await this.eventstream.createOrUpdateStream(this.topic);
const stream = this.stream;
if (stream !== undefined) {
return stream;
}
await this.migrationCheck();
this.logger.log('Creating stream with name ' + this.topic);
this.stream = await this.eventstream.createOrUpdateStream(this.topic, this.topic);
return this.stream;
}

Expand Down Expand Up @@ -838,10 +849,10 @@ export class TokensService {
class TokenListener implements EventListener {
private readonly logger = new Logger(TokenListener.name);

constructor(private readonly service: TokensService) { }
constructor(private readonly service: TokensService) {}

async onEvent(subName: string, event: Event, process: EventProcessor) {
let signature = this.trimEventSignature(event.signature)
const signature = this.trimEventSignature(event.signature);
switch (signature) {
case tokenCreateEventSignature:
process(await this.transformTokenPoolCreationEvent(subName, event));
Expand Down Expand Up @@ -897,11 +908,11 @@ class TokenListener implements EventListener {
}

private trimEventSignature(signature: string) {
let firstColon = signature.indexOf(":")
const firstColon = signature.indexOf(':');
if (firstColon > 0) {
return signature.substring(firstColon + 1)
return signature.substring(firstColon + 1);
}
return signature
return signature;
}

private async transformTokenPoolCreationEvent(
Expand Down
5 changes: 5 additions & 0 deletions test/app.e2e-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export class TestContext {
this.receiptHandler = handleReceipt;
},

getStreams: jest.fn(),
createOrUpdateStream: jest.fn(),
getSubscription: jest.fn(),
};

Expand All @@ -46,6 +48,9 @@ export class TestContext {
get: jest.fn(),
post: jest.fn(),
};
this.eventstream.getStreams.mockReset().mockReturnValue([]);
this.eventstream.createOrUpdateStream.mockReset().mockReturnValue({ name: TOPIC });

this.eventstream.getSubscription.mockReset();

const moduleFixture: TestingModule = await Test.createTestingModule({
Expand Down

0 comments on commit 31f3ebf

Please sign in to comment.