3
3
import java .io .IOException ;
4
4
import java .util .List ;
5
5
import java .util .Map ;
6
- import java .util .concurrent .ExecutorService ;
7
- import java .util .concurrent .Executors ;
8
- import java .util .concurrent .LinkedBlockingQueue ;
9
6
import org .apache .commons .cli .CommandLine ;
10
- import org .apache .commons .lang3 .StringUtils ;
11
7
import org .slf4j .Logger ;
12
8
import org .slf4j .LoggerFactory ;
9
+ import com .dtstack .logstash .assembly .pthread .FilterThread ;
10
+ import com .dtstack .logstash .assembly .pthread .InputThread ;
11
+ import com .dtstack .logstash .assembly .pthread .OutputThread ;
12
+ import com .dtstack .logstash .assembly .qlist .InputQueueList ;
13
+ import com .dtstack .logstash .assembly .qlist .OutPutQueueList ;
13
14
import com .dtstack .logstash .configs .YamlConfig ;
14
- import com .dtstack .logstash .factory . FilterFactory ;
15
+ import com .dtstack .logstash .exception . ExceptionUtil ;
15
16
import com .dtstack .logstash .factory .InputFactory ;
16
- import com .dtstack .logstash .factory .OutputFactory ;
17
- import com .dtstack .logstash .filters .BaseFilter ;
18
17
import com .dtstack .logstash .inputs .BaseInput ;
19
18
import com .dtstack .logstash .outputs .BaseOutput ;
20
- import com .dtstack .logstash .property .SystemProperty ;
21
- import com .dtstack .logstash .utils .Machine ;
22
19
import com .google .common .collect .Lists ;
23
20
24
21
/**
32
29
public class AssemblyPipeline {
33
30
34
31
private static Logger logger = LoggerFactory .getLogger (AssemblyPipeline .class );
32
+
33
+ private InputQueueList initInputQueueList ;
35
34
36
- private static ExecutorService filterOutputExecutor =null ;
37
-
38
- private static ExecutorService inputExecutor =null ;
39
-
40
- private InputQueueList initInputQueueList =null ;
41
-
42
- private List <BaseInput > baseInputs =null ;
35
+ private OutPutQueueList initOutputQueueList ;
36
+
37
+ private List <BaseInput > baseInputs ;
43
38
44
39
private List <BaseOutput > allBaseOutputs = Lists .newCopyOnWriteArrayList ();
45
40
41
+
46
42
/**
47
43
* 组装管道
48
44
* @param cmdLine
49
45
* @return
50
46
* @throws IOException
51
47
*/
52
48
@ SuppressWarnings ({ "unchecked" , "rawtypes" })
53
- public InputQueueList assemblyPipeline (CommandLine cmdLine ) throws IOException {
49
+ public void assemblyPipeline (CommandLine cmdLine ) throws IOException {
54
50
try {
55
51
logger .debug ("load config start ..." );
56
52
Map configs = new YamlConfig ().parse (cmdLine .getOptionValue ("f" ));
57
53
logger .debug (configs .toString ());
58
54
logger .debug ("initInputQueueList start ..." );
59
- initInputQueueList =initInputQueueList (cmdLine );
55
+ initInputQueueList =InputQueueList .getInputQueueListInstance (CmdLineParams .getFilterWork (cmdLine ), CmdLineParams .getInputQueueSize (cmdLine ));
56
+ if (initInputQueueList ==null ||initInputQueueList .getQueueList ().size ()==0 ){
57
+ logger .error ("init inputQueueList is error" );
58
+ System .exit (1 );
59
+ }
60
60
List <Map > inputs = (List <Map >) configs .get ("inputs" );
61
61
if (inputs ==null ||inputs .size ()==0 ){
62
62
logger .error ("input plugin is not empty" );
63
63
System .exit (1 );
64
64
}
65
-
66
- List <Map > outputs = (List <Map >) configs .get ("outputs" );
65
+ logger .debug ("initOutputQueueList start ..." );
66
+ initOutputQueueList = OutPutQueueList .getOutPutQueueListInstance (CmdLineParams .getOutputWork (cmdLine ), CmdLineParams .getOutputQueueSize (cmdLine ));
67
+ if (initOutputQueueList ==null ||initOutputQueueList .getQueueList ().size ()==0 ){
68
+ logger .error ("init outputQueueList is error" );
69
+ System .exit (1 );
70
+ }
71
+ List <Map > outputs = (List <Map >) configs .get ("outputs" );
67
72
if (outputs ==null ||outputs .size ()==0 ){
68
73
logger .error ("output plugin is not empty" );
69
74
System .exit (1 );
70
75
}
71
76
List <Map > filters = (List <Map >) configs .get ("filters" );
72
77
logger .debug ("init input plugin start ..." );
73
- baseInputs =InputFactory .getBatchInstance (inputs , initInputQueueList );
78
+ baseInputs =InputFactory .getBatchInstance (inputs ,initInputQueueList );
74
79
initInputQueueList .startElectionIdleQueue ();
75
- if (isInputQueueSizeLog (cmdLine ))initInputQueueList .startLogQueueSize ();
80
+ initOutputQueueList .startElectionIdleQueue ();
81
+ if (CmdLineParams .isQueueSizeLog (cmdLine )){
82
+ initInputQueueList .startLogQueueSize ();
83
+ initOutputQueueList .startLogQueueSize ();
84
+ }
76
85
logger .debug ("input thread start ..." );
77
- initInputPutThread (baseInputs );
78
- logger .debug ("FilterAndOutput thread start ..." );
79
- initFilterAndOutputThread (outputs ,filters ,initInputQueueList .getQueueList (),getOutBatchSize (cmdLine ));
86
+ InputThread .initInputThread (baseInputs );
87
+ logger .debug ("filter thread start ..." );
88
+ FilterThread .initFilterThread (filters ,initInputQueueList ,initOutputQueueList );
89
+ logger .debug ("output thread start ..." );
90
+ OutputThread .initOutPutThread (outputs ,initOutputQueueList ,allBaseOutputs );
91
+ //add shutdownhook
92
+ ShutDownHook shutDownHook = new ShutDownHook (initInputQueueList ,initOutputQueueList ,baseInputs ,allBaseOutputs );
93
+ shutDownHook .addShutDownHook ();
80
94
}catch (Exception t ){
81
- logger .error ("assemblyPipeline is error:{}" , t . getCause ( ));
95
+ logger .error ("assemblyPipeline is error:{}" ,ExceptionUtil . getErrorMessage ( t ));
82
96
System .exit (1 );
83
97
}
84
- return initInputQueueList ;
85
98
}
86
-
87
-
88
- protected InputQueueList initInputQueueList (CommandLine cmdLine ){
89
- int filterWorks = getFilterWork (cmdLine );
90
- int queueSize = getInputQueueSize (cmdLine );
91
- InputQueueList queueList = new InputQueueList ();
92
- List <LinkedBlockingQueue <Map <String ,Object >>> list =queueList .getQueueList ();
93
- for (int i =0 ;i <filterWorks ;i ++){
94
- list .add (new LinkedBlockingQueue <Map <String ,Object >>(queueSize ));
95
- }
96
- return queueList ;
97
- }
98
-
99
-
100
- protected void initInputPutThread (List <BaseInput > baseInputs ) {
101
- // TODO Auto-generated method stub
102
- inputExecutor = Executors .newFixedThreadPool (baseInputs .size ());
103
- for (BaseInput input :baseInputs ){
104
- inputExecutor .submit (new InputThread (input ));
105
- }
106
- }
107
-
108
- @ SuppressWarnings ("rawtypes" )
109
- protected void initFilterAndOutputThread (List <Map > outputs , List <Map > filters , List <LinkedBlockingQueue <Map <String ,Object >>> queues ,int batchSize ) throws Exception {
110
- filterOutputExecutor = Executors .newFixedThreadPool (queues .size ());
111
- for (LinkedBlockingQueue <Map <String ,Object >> queue :queues ){
112
- List <BaseOutput > baseOutputs = OutputFactory .getBatchInstance (outputs );
113
- List <BaseFilter > baseFilters = FilterFactory .getBatchInstance (filters );
114
- filterOutputExecutor .submit (new FilterAndOutputThread (queue ,baseFilters ,baseOutputs ,batchSize ));
115
- allBaseOutputs .addAll (baseOutputs );
116
- }
117
- }
118
-
119
- /**
120
- * 获取filter线程数
121
- * @param line
122
- * @return
123
- */
124
- protected static int getFilterWork (CommandLine line ){
125
- String number =line .getOptionValue ("w" );
126
- int works =StringUtils .isNotBlank (number )?Integer .parseInt (number ):Machine .availableProcessors ();
127
- logger .warn ("getFilterWork--->" +works );
128
- return works ;
129
- }
130
-
131
- /**
132
- *获取queue size的大小
133
- * @param line
134
- * @return
135
- */
136
- protected static int getInputQueueSize (CommandLine line ){
137
- String number =line .getOptionValue ("q" );
138
- return StringUtils .isNotBlank (number )?Integer .parseInt (number ):Integer .parseInt (SystemProperty .getSystemProperty ("inputQueueSize" ));
139
- }
140
-
141
- /**
142
- * 获取batch size的大小
143
- * @param line
144
- * @return
145
- */
146
- protected static int getOutBatchSize (CommandLine line ){
147
- String number =line .getOptionValue ("b" );
148
- return StringUtils .isNotBlank (number )?Integer .parseInt (number ):Integer .parseInt (SystemProperty .getSystemProperty ("batchSize" ));
149
- }
150
-
151
-
152
- /**
153
- * 是否开启InputQueueSize log日志标准输出
154
- * @param line
155
- * @return
156
- */
157
- protected static boolean isInputQueueSizeLog (CommandLine line ){
158
- return line .hasOption ("t" );
159
- }
160
-
161
-
162
- public List <BaseInput > getBaseInputs () {
163
- return this .baseInputs ;
164
- }
165
-
166
-
167
- public List <BaseOutput > getAllBaseOutputs () {
168
- return allBaseOutputs ;
169
- }
170
-
171
- }
99
+ }
0 commit comments