Skip to content

Commit a498f97

Browse files
committed
add deal for fail msg
1 parent 8230c5e commit a498f97

File tree

3 files changed

+55
-47
lines changed

3 files changed

+55
-47
lines changed

src/main/java/com/dtstack/logstash/assembly/FilterAndOutputThread.java

+14-43
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ public class FilterAndOutputThread implements Runnable {
2626
private List<BaseFilter> filterProcessors;
2727
private List<BaseOutput> outputProcessors;
2828
private int batchSize;
29-
private List<Map<String, Object>> batchEvent = Lists
30-
.newCopyOnWriteArrayList();
31-
32-
private Map<Long, Integer> repeatOffest = Maps.newLinkedHashMap();
3329

3430
public FilterAndOutputThread(
3531
LinkedBlockingQueue<Map<String, Object>> inputQueue,
@@ -46,6 +42,19 @@ public void run() {
4642
A: while (true) {
4743
Map<String, Object> event = null;
4844
try {
45+
46+
//优先处理失败信息
47+
boolean dealFailMsg = false;
48+
for (BaseOutput bo : outputProcessors) {
49+
if (bo.isConsistency()) {
50+
dealFailMsg = dealFailMsg || bo.dealFailedMsg();
51+
}
52+
}
53+
54+
if(dealFailMsg){
55+
continue A;
56+
}
57+
4958
event = inputQueue.take();
5059
if (this.filterProcessors != null) {
5160
for (BaseFilter bf : filterProcessors) {
@@ -55,12 +64,7 @@ public void run() {
5564
}
5665
if (event != null && event.size() > 0) {
5766
for (BaseOutput bo : outputProcessors) {
58-
if (!bo.isConsistency()) {
59-
bo.process(event);
60-
} else {
61-
repeatEvent(bo, event, true, 0);
62-
repeatOffest.clear();
63-
}
67+
bo.process(event);
6468
}
6569
}
6670
} catch (Exception e) {
@@ -70,37 +74,4 @@ public void run() {
7074
}
7175
}
7276

73-
public void repeatEvent(BaseOutput bo, Map<String, Object> event,
74-
boolean source, int index) {
75-
if (bo.getAto().get() == 0) {
76-
bo.process(event);
77-
if (source) {
78-
batchEvent.add(event);
79-
}
80-
} else if (bo.getAto().get() == 1) {
81-
bo.getAto().getAndSet(0);
82-
bo.process(event);
83-
if (source) {
84-
batchEvent.clear();
85-
batchEvent.add(event);
86-
} else {
87-
long size = repeatOffest.size();
88-
repeatOffest.put(size, index);
89-
}
90-
} else if (bo.getAto().get() == 2) {
91-
bo.getAto().getAndSet(0);
92-
long nested = repeatOffest.size() + 1;
93-
int lastTimeIndex = nested == 1 ? 0 : repeatOffest.get(nested - 1);
94-
repeatOffest.put(nested, lastTimeIndex);
95-
int eventSize = batchEvent.size();
96-
for (int i = lastTimeIndex; i < eventSize; i++) {
97-
if (nested < repeatOffest.size()) {
98-
break;
99-
}
100-
repeatEvent(bo, batchEvent.get(i), false, i);
101-
}
102-
logger.warn(bo.getClass().getName() + "--->repeat event size:"
103-
+ batchEvent.size());
104-
}
105-
}
10677
}

src/main/java/com/dtstack/logstash/log/LogbackComponent.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
*/
2020
public class LogbackComponent extends LogComponent{
2121

22-
private static String formatePattern ="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n";
22+
private static String formatePattern ="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} [%file:%line] - %msg%n";
2323

2424
private static int day = 7;
2525

src/main/java/com/dtstack/logstash/outputs/BaseOutput.java

+40-3
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
import java.util.ArrayList;
55
import java.util.List;
66
import java.util.Map;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.TimeUnit;
79
import java.util.concurrent.atomic.AtomicInteger;
10+
811
import org.slf4j.Logger;
912
import org.slf4j.LoggerFactory;
13+
1014
import com.dtstack.logstash.render.FreeMarkerRender;
1115
import com.dtstack.logstash.render.TemplateRender;
16+
import com.google.common.collect.Queues;
1217

1318
/**
1419
*
@@ -32,7 +37,9 @@ public abstract class BaseOutput implements Cloneable, java.io.Serializable{
3237
protected AtomicInteger ato = new AtomicInteger(0);
3338

3439
//数据强一致性是否开启
35-
protected static boolean consistency =false;
40+
protected boolean consistency =false;
41+
42+
public BlockingQueue<Object> failedMsgQueue = Queues.newLinkedBlockingDeque();
3643

3744
public BaseOutput(Map config) {
3845
this.config = config;
@@ -50,6 +57,10 @@ public BaseOutput(Map config) {
5057
} else {
5158
IF = null;
5259
}
60+
61+
if(this.config.containsKey("consistency")){
62+
consistency = (boolean) this.config.get("consistency");
63+
}
5364
}
5465

5566
public abstract void prepare();
@@ -78,10 +89,36 @@ public AtomicInteger getAto() {
7889
return ato;
7990
}
8091

81-
82-
public static boolean isConsistency() {
92+
public boolean isConsistency() {
8393
return consistency;
8494
}
95+
96+
public boolean dealFailedMsg(){
97+
if(failedMsgQueue.size() == 0){
98+
return false;
99+
}
100+
101+
Object msg = null;
102+
while(true){
103+
msg = failedMsgQueue.poll();
104+
if(msg == null){
105+
break;
106+
}
107+
108+
sendFailedMsg(msg);
109+
}
110+
111+
return true;
112+
}
113+
114+
public void addFailedMsg(Object msg){
115+
if(consistency){
116+
failedMsgQueue.offer(msg);
117+
}
118+
}
119+
120+
public void sendFailedMsg(Object msg){
121+
}
85122

86123

87124
@Override

0 commit comments

Comments
 (0)