@@ -64,6 +64,7 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
64
64
// @ts -ignore
65
65
public destroy ( ) ;
66
66
public /*async*/ destroy ( ctx : Context ) {
67
+ this . logger . trace ( '%s: InternalTopicClient.destroy()' , ctx ) ;
67
68
let destroyPromise ;
68
69
if ( this . allStreams . length > 0 ) {
69
70
destroyPromise = new Promise ( ( resolve ) => {
@@ -78,13 +79,15 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
78
79
}
79
80
80
81
public async openWriteStreamWithEvents ( ctx : Context , args : InternalWriteStreamInitArgs & Pick < Ydb . Topic . StreamWriteMessage . IInitRequest , 'messageGroupId' > ) {
82
+ this . logger . trace ( '%s: InternalTopicClient.openWriteStreamWithEvents()' , ctx ) ;
81
83
if ( args . producerId === undefined || args . producerId === null ) {
82
84
const newGUID = uuid_v4 ( ) ;
83
85
args = { ...args , producerId : newGUID , messageGroupId : newGUID }
84
86
} else if ( args . messageGroupId === undefined || args . messageGroupId === null ) {
85
87
args = { ...args , messageGroupId : args . producerId } ;
86
88
}
87
- const writerStream = new InternalTopicWriteStream ( ctx , args , this , this . logger ) ;
89
+ const writerStream = new InternalTopicWriteStream ( ctx , this , this . logger ) ;
90
+ await writerStream . init ( ctx , args ) ;
88
91
writerStream . events . once ( 'end' , ( ) => {
89
92
const index = this . allStreams . findIndex ( v => v === writerStream )
90
93
if ( index >= 0 ) this . allStreams . splice ( index , 1 ) ;
@@ -95,7 +98,9 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
95
98
}
96
99
97
100
public async openReadStreamWithEvents ( ctx : Context , args : InternalReadStreamInitArgs ) {
98
- const readStream = new InternalTopicReadStream ( ctx , args , this , this . logger ) ;
101
+ this . logger . trace ( '%s: InternalTopicClient.openReadStreamWithEvents()' , ctx ) ;
102
+ const readStream = new InternalTopicReadStream ( ctx , this , this . logger ) ;
103
+ await readStream . init ( ctx , args ) ;
99
104
readStream . events . once ( 'end' , ( ) => {
100
105
const index = this . allStreams . findIndex ( v => v === readStream )
101
106
if ( index >= 0 ) this . allStreams . splice ( index , 1 ) ;
@@ -105,31 +110,38 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
105
110
return readStream ;
106
111
}
107
112
108
- public async commitOffset ( _ctx : Context , request : InternalCommitOffsetArgs ) {
113
+ public async commitOffset ( ctx : Context , request : InternalCommitOffsetArgs ) {
114
+ this . logger . trace ( '%s: InternalTopicClient.commitOffset()' , ctx ) ;
109
115
return ( await this . api . commitOffset ( request ) ) as InternalCommitOffsetResult ;
110
116
}
111
117
112
- public async updateOffsetsInTransaction ( _ctx : Context , request : InternalUpdateOffsetsInTransactionArgs ) {
118
+ public async updateOffsetsInTransaction ( ctx : Context , request : InternalUpdateOffsetsInTransactionArgs ) {
119
+ this . logger . trace ( '%s: InternalTopicClient.updateOffsetsInTransaction()' , ctx ) ;
113
120
return ( await this . api . updateOffsetsInTransaction ( request ) ) as InternalUpdateOffsetsInTransactionResult ;
114
121
}
115
122
116
- public async createTopic ( _ctx : Context , request : InternalCreateTopicArgs ) {
123
+ public async createTopic ( ctx : Context , request : InternalCreateTopicArgs ) {
124
+ this . logger . trace ( '%s: InternalTopicClient.createTopic()' , ctx ) ;
117
125
return ( await this . api . createTopic ( request ) ) as InternalCreateTopicResult ;
118
126
}
119
127
120
- public async describeTopic ( _ctx : Context , request : InternalDescribeTopicArgs ) {
128
+ public async describeTopic ( ctx : Context , request : InternalDescribeTopicArgs ) {
129
+ this . logger . trace ( '%s: InternalTopicClient.describeTopic()' , ctx ) ;
121
130
return ( await this . api . describeTopic ( request ) ) as InternalDescribeTopicResult ;
122
131
}
123
132
124
- public async describeConsumer ( _ctx : Context , request : InternalDescribeConsumerArgs ) {
133
+ public async describeConsumer ( ctx : Context , request : InternalDescribeConsumerArgs ) {
134
+ this . logger . trace ( '%s: InternalTopicClient.describeConsumer()' , ctx ) ;
125
135
return ( await this . api . describeConsumer ( request ) ) as InternalDescribeConsumerResult ;
126
136
}
127
137
128
- public async alterTopic ( _ctx : Context , request : InternalAlterTopicArgs ) {
138
+ public async alterTopic ( ctx : Context , request : InternalAlterTopicArgs ) {
139
+ this . logger . trace ( '%s: InternalTopicClient.alterTopic()' , ctx ) ;
129
140
return ( await this . api . alterTopic ( request ) ) as InternalAlterTopicResult ;
130
141
}
131
142
132
- public async dropTopic ( _ctx : Context , request : InternalDropTopicArgs ) {
143
+ public async dropTopic ( ctx : Context , request : InternalDropTopicArgs ) {
144
+ this . logger . trace ( '%s: InternalTopicClient.dropTopic()' , ctx ) ;
133
145
return ( await this . api . dropTopic ( request ) ) as InternalDropTopicResult ;
134
146
}
135
147
}
0 commit comments