@@ -8,11 +8,11 @@ namespace Downloader
8
8
{
9
9
public class ConcurrentStream : IDisposable
10
10
{
11
- private readonly SemaphoreSlim _queueCheckerSemaphore = new SemaphoreSlim ( 0 ) ;
11
+ private readonly SemaphoreSlim _queueConsumerLocker = new SemaphoreSlim ( 0 ) ;
12
12
private readonly ManualResetEventSlim _completionEvent = new ManualResetEventSlim ( true ) ;
13
+ private readonly ManualResetEventSlim _stopWriteNewPacketEvent = new ManualResetEventSlim ( true ) ;
13
14
private readonly ConcurrentBag < Packet > _inputBag = new ConcurrentBag < Packet > ( ) ;
14
- private int ? _resourceReleaseThreshold ;
15
- private long _packetCounter = 0 ;
15
+ private long _maxMemoryBufferBytes = 0 ;
16
16
private bool _disposed ;
17
17
private Stream _stream ;
18
18
private string _path ;
@@ -29,7 +29,7 @@ public string Path
29
29
}
30
30
}
31
31
}
32
-
32
+
33
33
public byte [ ] Data
34
34
{
35
35
get
@@ -49,24 +49,35 @@ public byte[] Data
49
49
50
50
public long Length => _stream ? . Length ?? 0 ;
51
51
52
- public ConcurrentStream ( Stream stream )
52
+ public long MaxMemoryBufferBytes
53
+ {
54
+ get => _maxMemoryBufferBytes ;
55
+ set
56
+ {
57
+ _maxMemoryBufferBytes = ( value <= 0 ) ? long . MaxValue : value ;
58
+ }
59
+ }
60
+
61
+ public ConcurrentStream ( Stream stream , long maxMemoryBufferBytes = 0 )
53
62
{
54
63
_stream = stream ;
64
+ MaxMemoryBufferBytes = maxMemoryBufferBytes ;
55
65
Initial ( ) ;
56
66
}
57
67
58
- public ConcurrentStream ( string filename , long initSize )
68
+ public ConcurrentStream ( string filename , long initSize , long maxMemoryBufferBytes = 0 )
59
69
{
60
70
_path = filename ;
61
71
_stream = new FileStream ( filename , FileMode . OpenOrCreate , FileAccess . ReadWrite , FileShare . Read ) ;
72
+ MaxMemoryBufferBytes = maxMemoryBufferBytes ;
62
73
63
74
if ( initSize > 0 )
64
75
_stream . SetLength ( initSize ) ;
65
76
66
77
Initial ( ) ;
67
78
}
68
79
69
- public ConcurrentStream ( )
80
+ public ConcurrentStream ( ) // parameterless constructor for deserialization
70
81
{
71
82
_stream = new MemoryStream ( ) ;
72
83
Initial ( ) ;
@@ -88,44 +99,46 @@ public Stream OpenRead()
88
99
89
100
public void WriteAsync ( long position , byte [ ] bytes , int length )
90
101
{
102
+ _stopWriteNewPacketEvent . Wait ( ) ;
91
103
_inputBag . Add ( new Packet ( position , bytes , length ) ) ;
92
104
_completionEvent . Reset ( ) ;
93
- _queueCheckerSemaphore . Release ( ) ;
105
+ _queueConsumerLocker . Release ( ) ;
106
+ ReleaseQueue ( length ) ;
94
107
}
95
108
96
109
private async Task Watcher ( )
97
110
{
98
111
while ( ! _disposed )
99
112
{
100
- await _queueCheckerSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
113
+ await _queueConsumerLocker . WaitAsync ( ) . ConfigureAwait ( false ) ;
101
114
if ( _inputBag . TryTake ( out var packet ) )
102
115
{
103
116
await WritePacket ( packet ) . ConfigureAwait ( false ) ;
104
- ReleasePackets ( packet . Data . Length ) ;
105
117
packet . Dispose ( ) ;
106
118
}
107
119
}
108
120
}
121
+
109
122
private async Task WritePacket ( Packet packet )
110
123
{
111
124
if ( _stream . CanSeek )
112
125
{
113
126
_stream . Position = packet . Position ;
114
127
await _stream . WriteAsync ( packet . Data , 0 , packet . Length ) . ConfigureAwait ( false ) ;
115
- _packetCounter ++ ;
116
128
}
117
129
118
130
if ( _inputBag . IsEmpty )
119
131
_completionEvent . Set ( ) ;
120
132
}
121
133
122
- private void ReleasePackets ( int packetSize )
134
+ private void ReleaseQueue ( int packetSize )
123
135
{
124
- _resourceReleaseThreshold ??= 1024 * 1024 * 50 / packetSize ; // 50MB / a packet size
125
-
126
136
// Clean up RAM every _resourceReleaseThreshold packet
127
- if ( _packetCounter % _resourceReleaseThreshold == 0 )
128
- GC . Collect ( ) ;
137
+ if ( MaxMemoryBufferBytes < packetSize * _inputBag . Count )
138
+ {
139
+ _stopWriteNewPacketEvent . Set ( ) ;
140
+ Flush ( ) ;
141
+ }
129
142
}
130
143
131
144
public void Flush ( )
@@ -141,7 +154,7 @@ public void Dispose()
141
154
{
142
155
Flush ( ) ;
143
156
_disposed = true ;
144
- _queueCheckerSemaphore . Dispose ( ) ;
157
+ _queueConsumerLocker . Dispose ( ) ;
145
158
_stream . Dispose ( ) ;
146
159
}
147
160
}
0 commit comments