diff --git a/outputs/stream/internally_consistent_output_stream_wrapper.go b/outputs/stream/internally_consistent_output_stream_wrapper.go index 21509888..41995347 100644 --- a/outputs/stream/internally_consistent_output_stream_wrapper.go +++ b/outputs/stream/internally_consistent_output_stream_wrapper.go @@ -23,7 +23,7 @@ func (node *InternallyConsistentOutputStreamWrapper) Run(ctx ExecutionContext, p afterWatermarkCount++ } } - newPending := make([]Record, afterWatermarkCount) + newPending := make([]Record, 0, afterWatermarkCount) for i := range pending { if pending[i].EventTime.After(watermark) { newPending = append(newPending, pending[i])