@@ -337,4 +337,80 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
337
337
assertTrue (event .data .keySet ().contains ("key0" ));
338
338
assertTrue (event .data .values ().contains ("v3" ));
339
339
}
340
+
341
+ @ Test
342
+ public void testReconnectAfterBufferFull () throws Exception {
343
+ final CountDownLatch bufferFull = new CountDownLatch (1 );
344
+
345
+ // start mock fluentd
346
+ int port = MockFluentd .randomPort (); // Use a random port available
347
+ final List <Event > elist = new ArrayList <Event >();
348
+ final MockFluentd fluentd = new MockFluentd (port , new MockFluentd .MockProcess () {
349
+ public void process (MessagePack msgpack , Socket socket ) throws IOException {
350
+ try {
351
+ BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
352
+ Unpacker unpacker = msgpack .createUnpacker (in );
353
+ while (true ) {
354
+ Event e = unpacker .read (Event .class );
355
+ elist .add (e );
356
+ }
357
+ } catch (EOFException e ) {
358
+ // ignore
359
+ } finally {
360
+ socket .close ();
361
+ }
362
+ }
363
+ });
364
+
365
+ ExecutorService executor = Executors .newSingleThreadExecutor ();
366
+ executor .execute (new Runnable () {
367
+ @ Override
368
+ public void run () {
369
+ try {
370
+ bufferFull .await (20 , TimeUnit .SECONDS );
371
+ fluentd .start ();
372
+ } catch (InterruptedException e ) {
373
+ e .printStackTrace ();
374
+ }
375
+ }
376
+ });
377
+
378
+ // start asyncSenders
379
+ Sender asyncSender = new AsyncRawSocketSender ("localhost" , port );
380
+ String tag = "tag" ;
381
+ int i ;
382
+ for (i = 0 ; i < 1000000 ; i ++) { // Enough to fill the sender's buffer
383
+ Map <String , Object > record = new HashMap <String , Object >();
384
+ record .put ("num" , i );
385
+ record .put ("str" , "name" + i );
386
+
387
+ if (bufferFull .getCount () > 0 ) {
388
+ // Fill the sender's buffer
389
+ if (!asyncSender .emit (tag , record )) {
390
+ // Buffer full. Need to recover the fluentd
391
+ bufferFull .countDown ();
392
+ Thread .sleep (2000 );
393
+ }
394
+ }
395
+ else {
396
+ // Flush the sender's buffer after the fluentd starts
397
+ asyncSender .emit (tag , record );
398
+ break ;
399
+ }
400
+ }
401
+
402
+ // close sender sockets
403
+ asyncSender .close ();
404
+
405
+ // wait for unpacking event data on fluentd
406
+ Thread .sleep (2000 );
407
+
408
+ // close mock server sockets
409
+ fluentd .close ();
410
+
411
+ // check data
412
+ assertEquals (0 , bufferFull .getCount ());
413
+ // check elist size. But, it cannot detect correct elist size because async sender runs independently.
414
+ assert (i - 5 <= elist .size ()|| elist .size () < i + 5 );
415
+ }
340
416
}
0 commit comments