Skip to content

Commit

Permalink
Merge pull request #136 from kaleido-io/deactivate
Browse files Browse the repository at this point in the history
Add /deactivatepool API for deleting listeners
  • Loading branch information
peterbroadhurst authored May 24, 2023
2 parents 54fb773 + 674be9c commit 730245e
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 22 deletions.
16 changes: 16 additions & 0 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,22 @@ export class EventStreamService {
);
}

async deleteSubscriptionByName(ctx: Context, streamId: string, name: string) {
const existingSubscriptions = await this.getSubscriptions(ctx);
const sub = existingSubscriptions.find(s => s.name === name && s.stream === streamId);
if (!sub) {
this.logger.log(`No subscription found for ${name}`);
return false;
}
await lastValueFrom(
this.http.delete(
new URL(`/subscriptions/${sub.id}`, this.baseUrl).href,
this.requestOptions(ctx),
),
);
return true;
}

connect(
url: string,
topic: string,
Expand Down
2 changes: 1 addition & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { NestApplicationOptions, ShutdownSignal, ValidationPipe } from '@nestjs/common';
import { ShutdownSignal, ValidationPipe } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { WsAdapter } from '@nestjs/platform-ws';
Expand Down
13 changes: 12 additions & 1 deletion src/tokens/tokens.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
TokenMint,
TokenPool,
TokenPoolActivate,
TokenPoolDeactivate,
TokenPoolEvent,
TokenTransfer,
} from './tokens.interfaces';
Expand Down Expand Up @@ -70,13 +71,23 @@ export class TokensController {
@Post('activatepool')
@HttpCode(200)
@ApiOperation({
summary: 'Activate a token pool to begin receiving transfer events',
summary: 'Activate a token pool to begin receiving transfer and approval events',
})
@ApiBody({ type: TokenPoolActivate })
activatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolActivate) {
return this.service.activatePool(ctx, dto);
}

@Post('deactivatepool')
@HttpCode(204)
@ApiOperation({
summary: 'Deactivate a token pool to delete all listeners and stop receiving events',
})
@ApiBody({ type: TokenPoolDeactivate })
async deactivatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolDeactivate) {
await this.service.deactivatePool(ctx, dto);
}

@Post('checkinterface')
@HttpCode(200)
@ApiOperation({
Expand Down
10 changes: 8 additions & 2 deletions src/tokens/tokens.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,15 @@ export class TokenPoolActivate {
@IsOptional()
config?: TokenPoolConfig;

@ApiProperty({ description: requestIdDescription })
@ApiProperty()
@IsOptional()
requestId?: string;
poolData?: string;
}

export class TokenPoolDeactivate {
@ApiProperty()
@IsNotEmpty()
poolLocator: string;

@ApiProperty()
@IsOptional()
Expand Down
81 changes: 63 additions & 18 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
TokenPool,
TokenPoolActivate,
TokenPoolConfig,
TokenPoolDeactivate,
TokenPoolEvent,
TokenTransfer,
TokenType,
Expand Down Expand Up @@ -316,6 +317,24 @@ export class TokensService {
}
}

private getEventAbis(poolLocator: IValidPoolLocator) {
const transferAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Transfer : ERC721Transfer;
if (transferAbi?.name === undefined) {
throw new NotFoundException('Transfer event ABI not found');
}
const approvalAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Approval : ERC721Approval;
if (approvalAbi?.name === undefined) {
throw new NotFoundException('Approval event ABI not found');
}
const approvalForAllAbi =
poolLocator.type === TokenType.FUNGIBLE ? undefined : ERC721ApprovalForAll;
return {
transferAbi,
approvalAbi,
approvalForAllAbi,
};
}

async activatePool(ctx: Context, dto: TokenPoolActivate) {
const poolLocator = unpackPoolLocator(dto.poolLocator);
if (!validatePoolLocator(poolLocator)) {
Expand All @@ -327,49 +346,39 @@ export class TokensService {
abi,
poolLocator.type === TokenType.FUNGIBLE,
);

const eventAbis = this.getEventAbis(poolLocator);
const stream = await this.getStream(ctx);
const transferAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Transfer : ERC721Transfer;
if (transferAbi?.name === undefined) {
throw new NotFoundException('Transfer event ABI not found');
}
const approvalAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Approval : ERC721Approval;
if (approvalAbi?.name === undefined) {
throw new NotFoundException('Approval event ABI not found');
}
const approvalForAllAbi =
poolLocator.type === TokenType.FUNGIBLE ? undefined : ERC721ApprovalForAll;

const promises = [
this.eventstream.getOrCreateSubscription(
ctx,
this.baseUrl,
transferAbi,
eventAbis.transferAbi,
stream.id,
packSubscriptionName(dto.poolLocator, transferAbi.name, dto.poolData),
packSubscriptionName(dto.poolLocator, eventAbis.transferAbi.name, dto.poolData),
poolLocator.address,
possibleMethods,
this.getSubscriptionBlockNumber(dto.config),
),
this.eventstream.getOrCreateSubscription(
ctx,
this.baseUrl,
approvalAbi,
eventAbis.approvalAbi,
stream.id,
packSubscriptionName(dto.poolLocator, approvalAbi.name, dto.poolData),
packSubscriptionName(dto.poolLocator, eventAbis.approvalAbi.name, dto.poolData),
poolLocator.address,
possibleMethods,
this.getSubscriptionBlockNumber(dto.config),
),
];
if (approvalForAllAbi?.name !== undefined) {
if (eventAbis.approvalForAllAbi?.name !== undefined) {
promises.push(
this.eventstream.getOrCreateSubscription(
ctx,
this.baseUrl,
approvalForAllAbi,
eventAbis.approvalForAllAbi,
stream.id,
packSubscriptionName(dto.poolLocator, approvalForAllAbi.name, dto.poolData),
packSubscriptionName(dto.poolLocator, eventAbis.approvalForAllAbi.name, dto.poolData),
poolLocator.address,
possibleMethods,
this.getSubscriptionBlockNumber(dto.config),
Expand Down Expand Up @@ -397,6 +406,42 @@ export class TokensService {
return tokenPoolEvent;
}

async deactivatePool(ctx: Context, dto: TokenPoolDeactivate) {
const poolLocator = unpackPoolLocator(dto.poolLocator);
if (!validatePoolLocator(poolLocator)) {
throw new BadRequestException('Invalid pool locator');
}

const stream = await this.getStream(ctx);
const eventAbis = this.getEventAbis(poolLocator);
const promises = [
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(dto.poolLocator, eventAbis.transferAbi.name, dto.poolData),
),
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(dto.poolLocator, eventAbis.approvalAbi.name, dto.poolData),
),
];
if (eventAbis.approvalForAllAbi?.name !== undefined) {
promises.push(
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(dto.poolLocator, eventAbis.approvalForAllAbi.name, dto.poolData),
),
);
}

const results = await Promise.all(promises);
if (results.every(deleted => !deleted)) {
throw new NotFoundException('No listeners found');
}
}

checkInterface(dto: CheckInterfaceRequest): CheckInterfaceResponse {
const poolLocator = unpackPoolLocator(dto.poolLocator);
if (!validatePoolLocator(poolLocator)) {
Expand Down

0 comments on commit 730245e

Please sign in to comment.