@@ -104,7 +104,7 @@ public void restartInProgressTasks() {
104
104
continue ;
105
105
}
106
106
107
- if (StreamTaskStatus .Status .IN_PROGRESS . equals ( task .getStatus () )) {
107
+ if (StreamTaskStatus .Status .IN_PROGRESS == task .getStatus ()) {
108
108
String flowId = task .getFlowId ();
109
109
if (flowId == null ) {
110
110
resume (resumedFlows , task );
@@ -127,41 +127,41 @@ public void restartInProgressTasks() {
127
127
.forEach (this ::handleFinishedOrFailed );
128
128
}
129
129
130
- private boolean isStaleAndUnassigned (TaskStatus ts ) {
130
+ private boolean isStaleAndUnassigned (TaskStatus taskStatus ) {
131
131
Instant now = Instant .now ();
132
- return taskActorDoesNotExists (ts .getTaskId ())
133
- && Duration .between (ts .getUpdatedDate ().toInstant (), now ).compareTo (STALE_THRESHOLD ) > 0 ;
132
+ return taskActorDoesNotExists (taskStatus .getTaskId ())
133
+ && Duration .between (taskStatus .getUpdatedDate ().toInstant (), now ).compareTo (STALE_THRESHOLD ) > 0 ;
134
134
}
135
135
136
- private void handleFinishedOrFailed (TaskStatus ts ) {
137
- String flowId = ts .getFlowId ();
136
+ private void handleFinishedOrFailed (TaskStatus taskStatus ) {
137
+ String flowId = taskStatus .getFlowId ();
138
138
if (flowId == null || taskActorDoesNotExists (UUID .fromString (flowId ))) {
139
- deleteStatusTask (ts .getTaskId ());
139
+ deleteStatusTask (taskStatus .getTaskId ());
140
140
} else {
141
141
logger .debug ("Skipping delete of child {} because parent {} still has an actor" ,
142
- ts .getTaskId (), flowId );
142
+ taskStatus .getTaskId (), flowId );
143
143
}
144
144
}
145
145
146
- private void checkOutdatedTask (TaskStatus ts ) {
146
+ private void checkOutdatedTask (TaskStatus taskStatus ) {
147
147
Instant now = Instant .now ();
148
148
149
- boolean inProgOrInit = StreamTaskStatus .Status .IN_PROGRESS . equals ( ts . getStatus () )
150
- || StreamTaskStatus .Status .INITIATED . equals ( ts . getStatus () );
149
+ boolean inProgOrInit = StreamTaskStatus .Status .IN_PROGRESS == taskStatus . getStatus ()
150
+ || StreamTaskStatus .Status .INITIATED == taskStatus . getStatus ();
151
151
if (!inProgOrInit ) return ;
152
152
153
- Instant created = ts .getCreatedDate ().toInstant ();
153
+ Instant created = taskStatus .getCreatedDate ().toInstant ();
154
154
if (Duration .between (created , now ).compareTo (OUTDATED_THRESHOLD ) > 0 ) {
155
- ts .setStatus (StreamTaskStatus .Status .COMPLETED_WITH_ERROR );
156
- taskStatusDao .save (ts );
155
+ taskStatus .setStatus (StreamTaskStatus .Status .COMPLETED_WITH_ERROR );
156
+ taskStatusDao .save (taskStatus );
157
157
return ;
158
158
}
159
159
160
- Instant updated = ts .getUpdatedDate ().toInstant ();
160
+ Instant updated = taskStatus .getUpdatedDate ().toInstant ();
161
161
if (Duration .between (updated , now ).compareTo (RESTART_FLAG_THRESHOLD ) > 0
162
- && ts .getRestartFlag () > 0 ) {
163
- ts .setRestartFlag (0 );
164
- taskStatusDao .save (ts );
162
+ && taskStatus .getRestartFlag () > 0 ) {
163
+ taskStatus .setRestartFlag (0 );
164
+ taskStatusDao .save (taskStatus );
165
165
}
166
166
}
167
167
0 commit comments