Skip to content

Commit 2ca4385

Browse files
Marina SelivanovaSimarMugattarov
Marina Selivanova
authored andcommitted
fix restartInProgressTasks for flow subtasks
1 parent 85db8df commit 2ca4385

File tree

1 file changed

+63
-55
lines changed

1 file changed

+63
-55
lines changed

cqp-core/src/main/java/com/quantori/cqp/core/task/service/TaskPersistenceServiceImpl.java

+63-55
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
public class TaskPersistenceServiceImpl implements TaskPersistenceService {
5454
private static final Logger logger =
5555
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
56+
private static final Duration STALE_THRESHOLD = Duration.ofMinutes(15);
57+
private static final Duration OUTDATED_THRESHOLD = Duration.ofHours(24);
58+
private static final Duration RESTART_FLAG_THRESHOLD = Duration.ofMinutes(5);
5659
private final ObjectMapper objectMapper = new ObjectMapper();
5760
private final ActorSystem<?> actorSystem;
5861
private final ActorRef<TaskServiceActor.Command> rootActorRef;
@@ -91,70 +94,75 @@ public TaskPersistenceServiceImpl(
9194

9295
@Override
9396
public void restartInProgressTasks() {
94-
Set<String> flowIdsForResume = new HashSet<>();
95-
Set<TaskStatus> subTasks = new HashSet<>();
97+
Set<String> resumedFlows = new HashSet<>();
9698
List<TaskStatus> all = taskStatusDao.findAll();
97-
all.forEach(
98-
task -> {
99-
try {
100-
boolean taskActorDoesNotExists = taskActorDoesNotExists(task.getTaskId());
101-
boolean taskWasNotUpdatedForOneMinute = taskWasNotUpdatedForOneMinute(task.getTaskId());
102-
if (taskActorDoesNotExists && taskWasNotUpdatedForOneMinute) {
103-
if (StreamTaskStatus.Status.IN_PROGRESS.equals(task.getStatus())) {
104-
if (Objects.isNull(task.getFlowId())) {
105-
resume(flowIdsForResume, task);
106-
} else {
107-
subTasks.add(task);
108-
}
109-
} else {
110-
deleteStatusTask(task.getTaskId());
111-
}
112-
}
113-
} catch (StreamTaskAlreadyRestartedException e) {
114-
logger.debug(
115-
"The task {} was most likely restarted by another application instance",
116-
task.getTaskId(),
117-
e);
118-
} catch (Exception e) {
119-
logger.error("Cannot restart the task {}", task.getTaskId(), e);
120-
}
99+
100+
for (TaskStatus task : all) {
101+
try {
102+
if (!isStaleAndUnassigned(task)) {
121103
checkOutdatedTask(task);
122-
});
123-
subTasks.forEach(
124-
task -> {
125-
if (!flowIdsForResume.contains(task.getFlowId())) {
126-
deleteStatusTask(task.getTaskId());
104+
continue;
105+
}
106+
107+
if (StreamTaskStatus.Status.IN_PROGRESS.equals(task.getStatus())) {
108+
String flowId = task.getFlowId();
109+
if (flowId == null) {
110+
resume(resumedFlows, task);
111+
} else {
112+
resumedFlows.add(flowId);
127113
}
128-
});
114+
} else {
115+
handleFinishedOrFailed(task);
116+
}
117+
} catch (StreamTaskAlreadyRestartedException e) {
118+
logger.debug("Task {} was likely restarted elsewhere", task.getTaskId(), e);
119+
} catch (Exception e) {
120+
logger.error("Cannot restart task {}", task.getTaskId(), e);
121+
}
122+
checkOutdatedTask(task);
123+
}
124+
125+
all.stream()
126+
.filter(t -> t.getFlowId() != null && !resumedFlows.contains(t.getFlowId()))
127+
.forEach(this::handleFinishedOrFailed);
129128
}
130129

131-
private void checkOutdatedTask(TaskStatus taskStatus) {
132-
if (taskStatus.getStatus().equals(StreamTaskStatus.Status.IN_PROGRESS)
133-
|| taskStatus.getStatus().equals(StreamTaskStatus.Status.INITIATED)) {
134-
if ((Instant.now().getEpochSecond()
135-
- taskStatus.getCreatedDate().toInstant().getEpochSecond())
136-
> 24 * 60 * 60) {
137-
taskStatus.setStatus(StreamTaskStatus.Status.COMPLETED_WITH_ERROR);
138-
taskStatusDao.save(taskStatus);
139-
return;
140-
}
141-
if ((Instant.now().getEpochSecond()
142-
- taskStatus.getUpdatedDate().toInstant().getEpochSecond())
143-
> 300
144-
&& taskStatus.getRestartFlag() > 0) {
145-
taskStatus.setRestartFlag(0);
146-
taskStatusDao.save(taskStatus);
147-
}
130+
private boolean isStaleAndUnassigned(TaskStatus ts) {
131+
Instant now = Instant.now();
132+
return taskActorDoesNotExists(ts.getTaskId())
133+
&& Duration.between(ts.getUpdatedDate().toInstant(), now).compareTo(STALE_THRESHOLD) > 0;
134+
}
135+
136+
private void handleFinishedOrFailed(TaskStatus ts) {
137+
String flowId = ts.getFlowId();
138+
if (flowId == null || taskActorDoesNotExists(UUID.fromString(flowId))) {
139+
deleteStatusTask(ts.getTaskId());
140+
} else {
141+
logger.debug("Skipping delete of child {} because parent {} still has an actor",
142+
ts.getTaskId(), flowId);
148143
}
149144
}
150145

151-
private boolean taskWasNotUpdatedForOneMinute(UUID taskId) {
152-
var taskStatus = taskStatusDao.findById(taskId);
146+
private void checkOutdatedTask(TaskStatus ts) {
147+
Instant now = Instant.now();
148+
149+
boolean inProgOrInit = StreamTaskStatus.Status.IN_PROGRESS.equals(ts.getStatus())
150+
|| StreamTaskStatus.Status.INITIATED.equals(ts.getStatus());
151+
if (!inProgOrInit) return;
153152

154-
return taskStatus.isPresent()
155-
&& (Instant.now().getEpochSecond()
156-
- taskStatus.get().getUpdatedDate().toInstant().getEpochSecond()
157-
> 60);
153+
Instant created = ts.getCreatedDate().toInstant();
154+
if (Duration.between(created, now).compareTo(OUTDATED_THRESHOLD) > 0) {
155+
ts.setStatus(StreamTaskStatus.Status.COMPLETED_WITH_ERROR);
156+
taskStatusDao.save(ts);
157+
return;
158+
}
159+
160+
Instant updated = ts.getUpdatedDate().toInstant();
161+
if (Duration.between(updated, now).compareTo(RESTART_FLAG_THRESHOLD) > 0
162+
&& ts.getRestartFlag() > 0) {
163+
ts.setRestartFlag(0);
164+
taskStatusDao.save(ts);
165+
}
158166
}
159167

160168
private void resume(Set<String> flowIdsForResume, TaskStatus task) {

0 commit comments

Comments
 (0)