Skip to content

Commit cb20c25

Browse files
committed
Merge pull request #1 from fluent/master
pull master
2 parents 5e8f2c2 + cc34053 commit cb20c25

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

src/main/java/org/fluentd/logger/sender/RawSocketSender.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ private void connect() throws IOException {
8282
}
8383
}
8484

85-
private void reconnect(boolean forceReconnection) throws IOException {
85+
private void reconnect() throws IOException {
8686
if (socket == null) {
8787
connect();
88-
} else if (forceReconnection || socket.isClosed() || (!socket.isConnected())) {
88+
} else if (socket.isClosed() || (!socket.isConnected())) {
8989
close();
9090
connect();
9191
}
@@ -178,7 +178,7 @@ private synchronized boolean send(byte[] bytes) {
178178
public synchronized void flush() {
179179
try {
180180
// check whether connection is established or not
181-
reconnect(!reconnector.isErrorHistoryEmpty());
181+
reconnect();
182182
// write data
183183
out.write(getBuffer());
184184
out.flush();
@@ -187,6 +187,7 @@ public synchronized void flush() {
187187
} catch (IOException e) {
188188
LOG.error(this.getClass().getName(), "flush", e);
189189
reconnector.addErrorHistory(System.currentTimeMillis());
190+
close();
190191
}
191192
}
192193

src/test/java/org/fluentd/logger/TestFluentLogger.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import org.fluentd.logger.sender.Event;
44
import org.fluentd.logger.sender.NullSender;
55
import org.fluentd.logger.util.MockFluentd;
6-
import org.junit.Ignore;
76
import org.junit.Test;
87
import org.msgpack.MessagePack;
98
import org.msgpack.unpacker.Unpacker;
@@ -20,6 +19,8 @@
2019
import java.util.concurrent.TimeUnit;
2120

2221
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertTrue;
2324

2425
public class TestFluentLogger {
2526
private Logger _logger = LoggerFactory.getLogger(TestFluentLogger.class);
@@ -227,26 +228,29 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
227228

228229
// start loggers
229230
FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
231+
assertFalse(logger.isConnected());
230232
{
231233
Map<String, Object> data = new HashMap<String, Object>();
232234
data.put("k1", "v1");
233235
data.put("k2", "v2");
234236
logger.log("test01", data);
235237
}
238+
assertTrue(logger.isConnected());
236239

237240
TimeUnit.MILLISECONDS.sleep(500);
238241
_logger.info("Closing the current fluentd instance");
239242
fluentd1.closeClientSockets();
240243
fluentd1.close();
241244

242245
TimeUnit.MILLISECONDS.sleep(500);
243-
246+
assertTrue(logger.isConnected());
244247
{
245248
Map<String, Object> data = new HashMap<String, Object>();
246249
data.put("k3", "v3");
247250
data.put("k4", "v4");
248251
logger.log("test01", data);
249252
}
253+
assertFalse(logger.isConnected());
250254

251255
final List<Event> elist2 = new ArrayList<Event>();
252256
MockFluentd fluentd2 = new MockFluentd(port, new MockFluentd.MockProcess() {
@@ -274,19 +278,18 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
274278
data.put("k6", "v6");
275279
logger.log("test01", data);
276280
}
281+
assertTrue(logger.isConnected());
277282

278283
// close loggers
279284
FluentLogger.closeAll();
280285
Thread.sleep(2000);
281286

282287
fluentd2.close();
283288

284-
285289
// wait for unpacking event data on fluentd
286290
TimeUnit.MILLISECONDS.sleep(2000);
287291
threadManager.join();
288292

289-
290293
// check data
291294
assertEquals(1, elist1.size());
292295
assertEquals("testtag.test01", elist1.get(0).tag);
@@ -361,6 +364,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
361364

362365
final FluentLogger logger = FluentLogger.getLogger(null, host, port);
363366
ExecutorService executorService = Executors.newFixedThreadPool(N);
367+
/*
368+
* Each thread emits the following events LOOP times
369+
* Thread#0: {'0' => 0}
370+
* Thread#1: {'0' => 0, '1' => 1}
371+
* Thread#2: {'0' => 0, '1' => 1, '2' => 2}
372+
* :
373+
* Thread#(N-1): {'0' => 0, '1' => 1, '2' => 2 ... '(N-1)' => (N-1)}
374+
*/
364375
for (int i = 0; i < N; i++) {
365376
final int ii = i;
366377
executorService.execute(new Runnable() {

src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525

2626
import static org.junit.Assert.assertEquals;
27+
import static org.junit.Assert.assertFalse;
2728
import static org.junit.Assert.assertTrue;
2829

2930

@@ -275,9 +276,11 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
275276
fluentd.start();
276277

277278
Sender sender = new RawSocketSender("localhost", port);
279+
assertFalse(sender.isConnected());
278280
Map<String, Object> data = new HashMap<String, Object>();
279281
data.put("key0", "v0");
280282
sender.emit("tag0", data);
283+
assertTrue(sender.isConnected());
281284

282285
// close fluentd to make the next sending failed
283286
TimeUnit.MILLISECONDS.sleep(500);
@@ -289,6 +292,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
289292
data = new HashMap<String, Object>();
290293
data.put("key0", "v1");
291294
sender.emit("tag0", data);
295+
assertFalse(sender.isConnected());
292296

293297
// wait to avoid the suppression of reconnection
294298
TimeUnit.MILLISECONDS.sleep(500);

0 commit comments

Comments
 (0)