Skip to content

Commit 0e37c12

Browse files
twitchardclaude
andauthored
Add collate and SilenceFiller helpers (#525)
Co-authored-by: Claude <[email protected]>
1 parent ecafb3b commit 0e37c12

File tree

5 files changed

+542
-0
lines changed

5 files changed

+542
-0
lines changed

src/wrapper/SilenceFiller.ts

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
import { Readable } from "stream";
2+
3+
/**
4+
* SilenceFiller is a Readable stream that intersperses incoming audio data
5+
* with bytes of silence. This is important in some cases to keep an audio
6+
* stream "alive". Audio players, such as ffmpeg, can interpret inactivity as
7+
* meaning the stream is ended, or disconnected.
8+
*
9+
* @example
10+
* ```typescript
11+
* import { SilenceFiller } from 'hume';
12+
*
13+
* const BYTES_PER_SAMPLE = 2; // 16-bit samples
14+
* const SAMPLE_RATE = 48000;
15+
* const BUFFER_SIZE = Math.floor(SAMPLE_RATE * 0.1 * BYTES_PER_SAMPLE); // 100ms buffer
16+
* const silenceFiller = new SilenceFiller(BUFFER_SIZE, SAMPLE_RATE, BYTES_PER_SAMPLE, 10);
17+
*
18+
* // Pipe silence filler output to audio player stdin
19+
* silenceFiller.pipe(audioPlayer.stdin);
20+
*
21+
* // Handle pipe errors
22+
* silenceFiller.on('error', (err) => {
23+
* console.error("SilenceFiller error:", err);
24+
* });
25+
*
26+
* // Write audio data as it arrives
27+
* silenceFiller.writeAudio(audioBuffer);
28+
*
29+
* // End the stream when done
30+
* await silenceFiller.endStream();
31+
* ```
32+
*/
33+
export class SilenceFiller extends Readable {
34+
private unclockedSilenceFiller: UnclockedSilenceFiller;
35+
private isStarted: boolean = false;
36+
private pushInterval: NodeJS.Timeout | null = null;
37+
private bytesPerSample: number;
38+
private pushIntervalMs: number;
39+
40+
/**
41+
* Creates a new SilenceFiller instance.
42+
*
43+
* @param pushIntervalMs - The interval in milliseconds for pushing audio data (default: 5ms).
44+
* @param sampleRate - The sample rate of the audio (e.g., 48000).
45+
* @param bytesPerSample - The number of bytes per audio sample (e.g., 2 for 16-bit).
46+
* @param bufferSize - How much to 'prebuffer'. If you set this too low there
47+
* is a chance that playback will stutter, but if you set it too high
48+
* playback will take longer to start.
49+
*/
50+
constructor(
51+
pushIntervalMs: number = 5,
52+
sampleRate: number = 48000,
53+
bytesPerSample: number = 2,
54+
bufferSize: number = 9600,
55+
) {
56+
super({ objectMode: false });
57+
this.unclockedSilenceFiller = new UnclockedSilenceFiller(bufferSize, sampleRate, bytesPerSample);
58+
this.bytesPerSample = bytesPerSample;
59+
this.pushIntervalMs = pushIntervalMs;
60+
}
61+
62+
/**
63+
* Writes audio data to the silence filler.
64+
*
65+
* @param audioBuffer - The audio buffer to write.
66+
*/
67+
writeAudio(audioBuffer: Buffer): void {
68+
const now = Date.now();
69+
try {
70+
this.unclockedSilenceFiller.writeAudio(audioBuffer, now);
71+
if (!this.isStarted && this.unclockedSilenceFiller.donePrebuffering) {
72+
this.isStarted = true;
73+
this.startPushInterval();
74+
}
75+
} catch (error) {
76+
console.error(`[SilenceFiller] Error writing audio:`, error);
77+
this.emit("error", error);
78+
}
79+
}
80+
81+
private startPushInterval(): void {
82+
this.pushInterval = setInterval(() => {
83+
this.pushData();
84+
}, this.pushIntervalMs);
85+
}
86+
87+
private pushData(): void {
88+
if (!this.isStarted) return;
89+
90+
try {
91+
const now = Date.now();
92+
const audioChunk = this.unclockedSilenceFiller.readAudio(now);
93+
94+
if (audioChunk && audioChunk.length > 0) {
95+
// Ensure chunk size is aligned to bytesPerSample
96+
const alignedChunkSize = Math.floor(audioChunk.length / this.bytesPerSample) * this.bytesPerSample;
97+
98+
if (alignedChunkSize > 0) {
99+
const chunk = audioChunk.subarray(0, alignedChunkSize);
100+
this.push(chunk);
101+
}
102+
}
103+
} catch (error) {
104+
console.error(`[SilenceFiller] Error pushing data:`, error);
105+
this.emit("error", error);
106+
}
107+
}
108+
109+
_read(): void {}
110+
111+
_destroy(error: Error | null, callback: (error?: Error | null) => void): void {
112+
super._destroy(error, callback);
113+
}
114+
115+
/**
116+
* Ends the stream and drains all remaining audio data.
117+
*
118+
* @returns A promise that resolves when the stream has ended.
119+
*/
120+
endStream(): Promise<void> {
121+
return new Promise((resolve) => {
122+
// Stop pushing data
123+
if (this.pushInterval) {
124+
clearInterval(this.pushInterval);
125+
this.pushInterval = null;
126+
}
127+
128+
// Drain all remaining audio from SilenceFiller
129+
const now = Date.now();
130+
131+
// Keep reading until no more audio is available
132+
while (true) {
133+
const remainingChunk = this.unclockedSilenceFiller.readAudio(now);
134+
135+
if (!remainingChunk || remainingChunk.length === 0) {
136+
break;
137+
}
138+
139+
const alignedChunkSize = Math.floor(remainingChunk.length / this.bytesPerSample) * this.bytesPerSample;
140+
if (alignedChunkSize > 0) {
141+
const chunk = remainingChunk.subarray(0, alignedChunkSize);
142+
this.push(chunk);
143+
}
144+
}
145+
146+
this.push(null); // Signal end of stream
147+
148+
this.once("end", () => {
149+
resolve();
150+
});
151+
});
152+
}
153+
}
154+
155+
/**
156+
* Does the actual calculation of how interspersing audio with silence
157+
* is "pure" in the sense that it does not rely on the system clock.
158+
* It's up to the caller to provide timestamps.
159+
*
160+
* @internal
161+
*/
162+
export class UnclockedSilenceFiller {
163+
private audioQueue: Buffer[] = [];
164+
private totalBufferedBytes: number = 0;
165+
private startTimestamp: number | null = null;
166+
private totalBytesSent: number = 0;
167+
public donePrebuffering: boolean = false;
168+
private bufferSize: number;
169+
private sampleRate: number;
170+
private bytesPerSample: number;
171+
172+
constructor(bufferSize: number, sampleRate: number, bytesPerSample: number) {
173+
this.bufferSize = bufferSize;
174+
this.sampleRate = sampleRate;
175+
this.bytesPerSample = bytesPerSample;
176+
}
177+
178+
writeAudio(audioBuffer: Buffer, timestamp: number): void {
179+
this.audioQueue.push(audioBuffer);
180+
this.totalBufferedBytes += audioBuffer.length;
181+
182+
if (this.startTimestamp === null) {
183+
this.startTimestamp = timestamp;
184+
}
185+
186+
if (!this.donePrebuffering && this.totalBufferedBytes >= this.bufferSize) {
187+
this.donePrebuffering = true;
188+
}
189+
}
190+
191+
readAudio(timestamp: number): Buffer | null {
192+
if (this.startTimestamp === null || !this.donePrebuffering) {
193+
return null;
194+
}
195+
196+
const elapsedMs = timestamp - this.startTimestamp;
197+
198+
const targetBytesSent = Math.floor(((this.sampleRate * elapsedMs) / 1000) * this.bytesPerSample);
199+
200+
const bytesNeeded = targetBytesSent - this.totalBytesSent;
201+
202+
if (bytesNeeded <= 0) {
203+
return null;
204+
}
205+
206+
// Ensure bytesNeeded is a multiple of bytesPerSample
207+
const alignedBytesNeeded = Math.floor(bytesNeeded / this.bytesPerSample) * this.bytesPerSample;
208+
209+
if (alignedBytesNeeded <= 0) {
210+
return null;
211+
}
212+
213+
let chunk = Buffer.alloc(0);
214+
215+
// Drain from queue until we have enough bytes
216+
while (chunk.length < alignedBytesNeeded && this.audioQueue.length > 0) {
217+
const nextBuffer = this.audioQueue.shift()!;
218+
chunk = Buffer.concat([chunk, nextBuffer]);
219+
this.totalBufferedBytes -= nextBuffer.length;
220+
}
221+
222+
// If we have more than needed, put the excess back
223+
if (chunk.length > alignedBytesNeeded) {
224+
const excess = chunk.subarray(alignedBytesNeeded);
225+
this.audioQueue.unshift(excess);
226+
this.totalBufferedBytes += excess.length;
227+
chunk = chunk.subarray(0, alignedBytesNeeded);
228+
}
229+
230+
// Fill remaining with silence if needed
231+
if (chunk.length < alignedBytesNeeded) {
232+
const silenceNeeded = Buffer.alloc(alignedBytesNeeded - chunk.length, 0);
233+
chunk = Buffer.concat([chunk, silenceNeeded]);
234+
}
235+
236+
// Update total bytes sent
237+
this.totalBytesSent += chunk.length;
238+
239+
return chunk;
240+
}
241+
}

src/wrapper/collate.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* Takes an async iterator that yields interleaved items from different groups
3+
* and produces an iterator that yields items in group order.
4+
*
5+
* Example:
6+
* Input: A1, B1, A2, A3 (final), C1, C2, C3 (final), B2 (final)
7+
* Output: A1, A2, A3, B1, B2, C1, C2, C3
8+
*
9+
* This is useful when using synthesizeJsonStreaming with num_generations > 1
10+
*
11+
* @example
12+
* ```typescript
13+
*
14+
* import { collate } from 'hume';
15+
*
16+
* const stream = hume.synthesizeJsonStreaming({
17+
* ...
18+
* })
19+
*
20+
* const contiguous = collate(
21+
* stream
22+
* (chunk) => chunk.generationId,
23+
* (chunk) => chunk.isLastChunk
24+
* );
25+
*
26+
* for await (const item of contiguous) {
27+
* audioPlayer.write(item.audio)
28+
* }
29+
* ```
30+
*
31+
* @param source - Async iterable that yields interleaved items.
32+
* @param groupBy - Function to determine a "key" that determines the group identity for each item.
33+
* @param isFinal - Function to determine if an item is the final item in its group.
34+
* @returns An async iterable that yields items in group order.
35+
*/
36+
export async function* collate<TItem, TKey>(
37+
source: AsyncIterable<TItem>,
38+
groupBy: (x: TItem) => TKey,
39+
isFinal: (x: TItem) => boolean,
40+
): AsyncIterable<TItem> {
41+
const buffers = new Map<TKey, TItem[]>();
42+
const order: TKey[] = [];
43+
let current: TKey | undefined;
44+
45+
const ensure = (k: TKey) => {
46+
if (!buffers.has(k)) {
47+
buffers.set(k, []);
48+
order.push(k);
49+
}
50+
};
51+
52+
const flushGroup = function* (k: TKey) {
53+
const buf = buffers.get(k);
54+
if (!buf) return;
55+
for (const item of buf) yield item;
56+
buffers.delete(k);
57+
};
58+
59+
const nextGroup = (): TKey | undefined => {
60+
// pop the next group in first-seen order that still has a buffer
61+
while (order.length && !buffers.has(order[0])) order.shift();
62+
return order.shift();
63+
};
64+
65+
for await (const item of source) {
66+
const k = groupBy(item);
67+
68+
if (current === undefined) current = k;
69+
ensure(k);
70+
buffers.get(k)!.push(item);
71+
72+
// if we just saw the final item for the current group, flush it and advance
73+
if (k === current && isFinal(item)) {
74+
yield* flushGroup(current);
75+
current = nextGroup();
76+
}
77+
}
78+
79+
// stream ended; flush remaining groups in first-seen order
80+
if (current !== undefined) {
81+
if (buffers.has(current)) yield* flushGroup(current);
82+
while (true) {
83+
const k = nextGroup();
84+
if (k === undefined) break;
85+
yield* flushGroup(k);
86+
}
87+
}
88+
}

src/wrapper/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ export { getAudioStream } from "./getAudioStream";
99
export { MimeType, getBrowserSupportedMimeType } from "./getBrowserSupportedMimeType";
1010
export { HumeClient } from "./HumeClient";
1111
export { EVIWebAudioPlayer, EVIWebAudioPlayerFFTOptions, EVIWebAudioPlayerOptions } from "./EVIWebAudioPlayer";
12+
export { collate } from "./collate";
13+
export { SilenceFiller } from "./SilenceFiller";

0 commit comments

Comments
 (0)