@@ -98,8 +98,19 @@ void file_writer::finalize()
98
98
}
99
99
// whatever is left in our current buffer, move to work list
100
100
chunk_mutex.lock ();
101
+ printf (" Filewriter finalizing thread %u: %lu total bytes, %lu in last chunk, %d uncompressed chunks, and %d compressed chunks to be written out\n " ,
102
+ mTid , (unsigned long )uncompressed_bytes, (unsigned long )uidx, (int )uncompressed_chunks.size (), (int )compressed_chunks.size ());
101
103
chunk.shrink (uidx);
104
+ #ifndef MULTITHREADED_COMPRESS
105
+ chunk = compress_chunk (chunk);
106
+ #ifdef MULTITHREADED_WRITE
107
+ compressed_chunks.push_front (chunk);
108
+ #else
109
+ write_chunk (chunk);
110
+ #endif
111
+ #else
102
112
uncompressed_chunks.push_front (chunk);
113
+ #endif
103
114
chunk = buffer (uncompressed_chunk_size); // ready to go again
104
115
chunk_mutex.unlock ();
105
116
// wrap up work in work lists
@@ -127,10 +138,31 @@ file_writer::~file_writer()
127
138
chunk.release ();
128
139
}
129
140
141
+ void file_writer::write_chunk (buffer& active)
142
+ {
143
+ size_t written = 0 ;
144
+ size_t size = active.size ();
145
+ char * ptr = active.data ();
146
+ int err = 0 ;
147
+ do {
148
+ written = fwrite (ptr, 1 , size, fp);
149
+ ptr += written;
150
+ size -= written;
151
+ err = ferror (fp);
152
+ } while (size > 0 && (err == EAGAIN || err == EWOULDBLOCK || err == EINTR));
153
+ if (size > 0 )
154
+ {
155
+ ELOG (" Failed to write out file (%u bytes left): %s" , (unsigned )size, strerror (ferror (fp)));
156
+ }
157
+ DLOG3 (" Filewriter thread %d wrote out compressed buffer of %lu size\n " , mTid , (unsigned long )active.size ());
158
+ active.release ();
159
+ }
160
+
130
161
void file_writer::serializer ()
131
162
{
132
163
// lock, steal compressed buffer, unlock, store to disk, sleep, repeat
133
164
set_thread_name (" serializer" );
165
+ #ifdef MULTITHREADED_WRITE
134
166
while (1 )
135
167
{
136
168
buffer active;
@@ -151,36 +183,42 @@ void file_writer::serializer()
151
183
// save compressed buffer
152
184
if (active.size () > 0 )
153
185
{
154
- size_t written = 0 ;
155
- size_t size = active.size ();
156
- char * ptr = active.data ();
157
- int err = 0 ;
158
- DLOG3 (" \t hunk=%u" , (unsigned )size);
159
- do {
160
- written = fwrite (ptr, 1 , size, fp);
161
- DLOG3 (" \t\t written=%u / %u" , (unsigned )written, (unsigned )size);
162
- ptr += written;
163
- size -= written;
164
- err = ferror (fp);
165
- } while (size > 0 && (err == EAGAIN || err == EWOULDBLOCK || err == EINTR));
166
- if (size > 0 )
167
- {
168
- ELOG (" Failed to write out file (%u bytes left): %s" , (unsigned )size, strerror (ferror (fp)));
169
- }
170
- active.release ();
186
+ write_chunk (active);
171
187
}
172
188
// if not done and no work done, wait a bit
173
189
else if (!done_compressing)
174
190
{
175
- usleep (10000 );
191
+ usleep (2000 );
176
192
}
177
193
}
194
+ #endif
195
+ }
196
+
197
+ buffer file_writer::compress_chunk (buffer& uncompressed)
198
+ {
199
+ const uint64_t header_size = sizeof (uint64_t ) * 2 ;
200
+ uint64_t compressed_size = density_compress_safe_size (uncompressed.size ()) + header_size;
201
+ buffer compressed (compressed_size);
202
+ density_processing_result result = density_compress ((const uint8_t *)uncompressed.data (), uncompressed.size (),
203
+ (uint8_t *)compressed.data () + header_size, compressed.size (),
204
+ DENSITY_ALGORITHM_CHEETAH);
205
+ uncompressed.release ();
206
+ if (result.state != DENSITY_STATE_OK)
207
+ {
208
+ ABORT (" Failed to compress buffer - aborting from compression thread" );
209
+ }
210
+ uint64_t header[2 ] = { result.bytesWritten , result.bytesRead }; // store compressed and uncompressed sizes
211
+ memcpy (compressed.data (), header, header_size); // use memcpy to avoid aliasing issues
212
+ compressed.shrink (result.bytesWritten + header_size);
213
+ DLOG3 (" Filewriter thread %d handing over compressed buffer of %lu bytes, was %lu bytes uncompressed" , mTid , (unsigned long )(result.bytesWritten + header_size), (unsigned long )result.bytesRead );
214
+ return compressed;
178
215
}
179
216
180
217
void file_writer::compressor ()
181
218
{
182
219
// lock, grab pointer to uncompressed, make new compressed, unlock, compress, sleep, repeat
183
220
set_thread_name (" compressor" );
221
+ #ifdef MULTITHREADED_COMPRESS
184
222
while (1 )
185
223
{
186
224
buffer uncompressed;
@@ -202,29 +240,20 @@ void file_writer::compressor()
202
240
// compress it
203
241
if (uncompressed.size () > 0 )
204
242
{
205
- const uint64_t header_size = sizeof (uint64_t ) * 2 ;
206
- uint64_t compressed_size = density_compress_safe_size (uncompressed.size ()) + header_size;
207
- buffer compressed (compressed_size);
208
- density_processing_result result = density_compress ((const uint8_t *)uncompressed.data (), uncompressed.size (),
209
- (uint8_t *)compressed.data () + header_size, compressed.size (),
210
- DENSITY_ALGORITHM_CHEETAH);
211
- uncompressed.release ();
212
- if (result.state != DENSITY_STATE_OK)
213
- {
214
- ELOG (" Failed to compress buffer - aborting compression thread" );
215
- break ;
216
- }
217
- uint64_t header[2 ] = { result.bytesWritten , result.bytesRead }; // store compressed and uncompressed sizes
218
- memcpy (compressed.data (), header, header_size); // use memcpy to avoid aliasing issues
219
- compressed.shrink (result.bytesWritten + header_size);
243
+ buffer compressed = compress_chunk (uncompressed);
244
+ #ifdef MULTITHREADED_WRITE
220
245
chunk_mutex.lock ();
221
246
compressed_chunks.push_front (compressed);
222
247
chunk_mutex.unlock ();
248
+ #else
249
+ write_chunk (compressed);
250
+ #endif
223
251
}
224
252
// if not done and no work done, wait a bit
225
253
else if (!done_feeding)
226
254
{
227
- usleep (100000 );
255
+ usleep (2000 );
228
256
}
229
257
}
258
+ #endif
230
259
}
0 commit comments