41
41
42
42
import com .google .common .collect .Lists ;
43
43
import lombok .extern .slf4j .Slf4j ;
44
+ import org .apache .logging .log4j .LogManager ;
45
+ import org .apache .logging .log4j .Logger ;
44
46
45
47
import java .util .List ;
46
48
import java .util .Map ;
50
52
51
53
@ Slf4j
52
54
public class ExportTaskExecutor implements TransientTaskExecutor {
55
+ private static final Logger LOG = LogManager .getLogger (ExportTaskExecutor .class );
53
56
54
57
List <StatementBase > selectStmtLists ;
55
58
@@ -78,22 +81,32 @@ public Long getId() {
78
81
79
82
@ Override
80
83
public void execute () throws JobException {
84
+ LOG .debug ("[Export Task] taskId: {} starting execution" , taskId );
81
85
if (isCanceled .get ()) {
86
+ LOG .debug ("[Export Task] taskId: {} was already canceled before execution" , taskId );
82
87
throw new JobException ("Export executor has been canceled, task id: {}" , taskId );
83
88
}
89
+ LOG .debug ("[Export Task] taskId: {} updating state to EXPORTING" , taskId );
84
90
exportJob .updateExportJobState (ExportJobState .EXPORTING , taskId , null , null , null );
85
91
List <OutfileInfo > outfileInfoList = Lists .newArrayList ();
86
92
for (int idx = 0 ; idx < selectStmtLists .size (); ++idx ) {
93
+ LOG .debug ("[Export Task] taskId: {} processing statement {}/{}" ,
94
+ taskId , idx + 1 , selectStmtLists .size ());
87
95
if (isCanceled .get ()) {
96
+ LOG .debug ("[Export Task] taskId: {} canceled during execution at statement {}" , taskId , idx + 1 );
88
97
throw new JobException ("Export executor has been canceled, task id: {}" , taskId );
89
98
}
90
99
// check the version of tablets, skip if the consistency is in partition level.
91
100
if (exportJob .getExportTable ().isManagedTable () && !exportJob .isPartitionConsistency ()) {
101
+ LOG .debug ("[Export Task] taskId: {} checking tablet versions for statement {}" , taskId , idx + 1 );
92
102
try {
93
103
Database db = Env .getCurrentEnv ().getInternalCatalog ().getDbOrAnalysisException (
94
104
exportJob .getTableName ().getDb ());
95
105
OlapTable table = db .getOlapTableOrAnalysisException (exportJob .getTableName ().getTbl ());
106
+ LOG .debug ("[Export Lock] taskId: {}, table: {} about to acquire readLock" ,
107
+ taskId , table .getName ());
96
108
table .readLock ();
109
+ LOG .debug ("[Export Lock] taskId: {}, table: {} acquired readLock" , taskId , table .getName ());
97
110
try {
98
111
List <Long > tabletIds ;
99
112
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter ) selectStmtLists .get (idx );
@@ -108,18 +121,26 @@ public void execute() throws JobException {
108
121
long nowVersion = partition .getVisibleVersion ();
109
122
long oldVersion = exportJob .getPartitionToVersion ().get (partition .getName ());
110
123
if (nowVersion != oldVersion ) {
124
+ LOG .debug ("[Export Lock] taskId: {}, table: {} about to release readLock"
125
+ + "due to version mismatch" , taskId , table .getName ());
111
126
exportJob .updateExportJobState (ExportJobState .CANCELLED , taskId , null ,
112
127
CancelType .RUN_FAIL , "The version of tablet {" + tabletId + "} has changed" );
113
128
throw new JobException ("Export Job[{}]: Tablet {} has changed version, old version = {}"
114
129
+ ", now version = {}" , exportJob .getId (), tabletId , oldVersion , nowVersion );
115
130
}
116
131
}
117
132
} catch (Exception e ) {
133
+ LOG .debug ("[Export Lock] taskId: {}, table: {} about to release readLock"
134
+ + "due to exception: {}" , taskId , table .getName (), e .getMessage ());
118
135
exportJob .updateExportJobState (ExportJobState .CANCELLED , taskId , null ,
119
136
ExportFailMsg .CancelType .RUN_FAIL , e .getMessage ());
120
137
throw new JobException (e );
121
138
} finally {
139
+ LOG .debug ("[Export Lock] taskId: {}, table: {} releasing readLock in finally block" ,
140
+ taskId , table .getName ());
122
141
table .readUnlock ();
142
+ LOG .debug ("[Export Lock] taskId: {}, table: {} released readLock successfully" ,
143
+ taskId , table .getName ());
123
144
}
124
145
} catch (AnalysisException e ) {
125
146
exportJob .updateExportJobState (ExportJobState .CANCELLED , taskId , null ,
@@ -129,26 +150,39 @@ public void execute() throws JobException {
129
150
}
130
151
131
152
try (AutoCloseConnectContext r = buildConnectContext ()) {
153
+ LOG .debug ("[Export Task] taskId: {} executing statement {}" , taskId , idx + 1 );
132
154
stmtExecutor = new StmtExecutor (r .connectContext , selectStmtLists .get (idx ));
133
155
stmtExecutor .execute ();
134
156
if (r .connectContext .getState ().getStateType () == MysqlStateType .ERR ) {
157
+ LOG .debug ("[Export Task] taskId: {} failed with MySQL error: {}" , taskId ,
158
+ r .connectContext .getState ().getErrorMessage ());
135
159
exportJob .updateExportJobState (ExportJobState .CANCELLED , taskId , null ,
136
160
ExportFailMsg .CancelType .RUN_FAIL , r .connectContext .getState ().getErrorMessage ());
137
161
return ;
138
162
}
163
+ LOG .debug ("[Export Task] taskId: {} statement {} executed successfully" , taskId , idx + 1 );
139
164
OutfileInfo outfileInfo = getOutFileInfo (r .connectContext .getResultAttachedInfo ());
165
+ LOG .debug ("[Export Task] taskId: {} got outfile info for statement {}:"
166
+ + "fileNumber={}, totalRows={}, fileSize={}" ,
167
+ taskId , idx + 1 , outfileInfo .getFileNumber (),
168
+ outfileInfo .getTotalRows (), outfileInfo .getFileSize ());
140
169
outfileInfoList .add (outfileInfo );
141
170
} catch (Exception e ) {
171
+ LOG .debug ("[Export Task] taskId: {} failed with exception during statement {}: {}" ,
172
+ taskId , idx + 1 , e .getMessage (), e );
142
173
exportJob .updateExportJobState (ExportJobState .CANCELLED , taskId , null ,
143
174
ExportFailMsg .CancelType .RUN_FAIL , e .getMessage ());
144
175
throw new JobException (e );
145
176
}
146
177
}
147
178
if (isCanceled .get ()) {
179
+ LOG .debug ("[Export Task] taskId: {} canceled after processing all statements" , taskId );
148
180
throw new JobException ("Export executor has been canceled, task id: {}" , taskId );
149
181
}
182
+ LOG .debug ("[Export Task] taskId: {} completed successfully, updating state to FINISHED" , taskId );
150
183
exportJob .updateExportJobState (ExportJobState .FINISHED , taskId , outfileInfoList , null , null );
151
184
isFinished .getAndSet (true );
185
+ LOG .debug ("[Export Task] taskId: {} execution completed" , taskId );
152
186
}
153
187
154
188
@ Override
0 commit comments