Skip to content

Commit

Permalink
BIGTOP-4231: Optimize job execution model
Browse files Browse the repository at this point in the history
  • Loading branch information
timyuer committed Sep 21, 2024
1 parent 8a9013c commit 9190504
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public abstract class AbstractJob implements Job {
Expand Down Expand Up @@ -68,7 +70,9 @@ public AbstractJob(JobContext jobContext) {
if (jobContext.getJobId() != null) {
persistState();
List<StagePO> stagePOs = stageDao.findByJobId(jobContext.getJobId());
jobContext.setStagePOS(stagePOs);
Map<Integer, Long> existsStageMap =
stagePOs.stream().collect(Collectors.toMap(StagePO::getOrder, StagePO::getId));
jobContext.setExistsStageMap(existsStageMap);

} else {
JobPO jobPO = new JobPO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
*/
package org.apache.bigtop.manager.server.command.job;

import org.apache.bigtop.manager.dao.po.StagePO;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;

import lombok.Data;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@Data
Expand All @@ -34,7 +33,7 @@ public class JobContext {
/* internal */
private Long jobId;

private List<StagePO> stagePOS;
private Map<Integer, Long> existsStageMap;

private final AtomicInteger atomicInteger = new AtomicInteger(0);
/* internal */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,8 @@ public AbstractStage(StageContext stageContext) {
state = JobState.PENDING;

// obtain all tasks
List<StagePO> stagePOs = stageContext.getStagePOs();
if (stagePOs != null && !stagePOs.isEmpty()) {
for (StagePO stagePO : stagePOs) {
if (getStageContext().getOrder() == stagePO.getOrder()) {
stageContext.setStageId(stagePO.getId());
break;
}
}
if (getStageContext().isRetry()) {
persistState();

} else {
StagePO stagePO = new StagePO();
stagePO.setState(getState().getName());
Expand All @@ -88,7 +80,7 @@ public AbstractStage(StageContext stageContext) {

beforeCreateTasks();

if (stagePOs != null && !stagePOs.isEmpty()) {
if (getStageContext().isRetry()) {
List<TaskPO> taskPOS = taskDao.findByStageId(stageContext.getStageId());
for (TaskPO taskPO : taskPOS) {
TaskContext taskContext = TaskContext.fromStageContext(stageContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.bigtop.manager.server.command.stage;

import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.dao.po.StagePO;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;
import org.apache.bigtop.manager.server.model.dto.ComponentDTO;
Expand All @@ -31,6 +30,7 @@
import lombok.Data;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@Data
Expand Down Expand Up @@ -60,30 +60,33 @@ public class StageContext {

private List<RepoDTO> repoInfoList;

private List<StagePO> stagePOs;
private boolean retry = false;

public static StageContext fromJobContext(JobContext jobContext) {
return fromCommandDTO(
jobContext.getCommandDTO(),
jobContext.getJobId(),
jobContext.getStagePOS(),
jobContext.getExistsStageMap(),
jobContext.getAtomicInteger());
}

public static StageContext fromPayload(
String payload, Long jobId, List<StagePO> stagePOs, AtomicInteger atomicInteger) {
String payload, Long jobId, Map<Integer, Long> existsStageMap, AtomicInteger atomicInteger) {
CommandDTO commandDTO = JsonUtils.readFromString(payload, CommandDTO.class);
return fromCommandDTO(commandDTO, jobId, stagePOs, atomicInteger);
return fromCommandDTO(commandDTO, jobId, existsStageMap, atomicInteger);
}

public static StageContext fromCommandDTO(
CommandDTO commandDTO, Long jobId, List<StagePO> stagePOs, AtomicInteger atomicInteger) {
CommandDTO commandDTO, Long jobId, Map<Integer, Long> existsStageMap, AtomicInteger atomicInteger) {
StageContext context = new StageContext();
// auto-Increment
context.setOrder(atomicInteger.incrementAndGet());
context.setClusterId(commandDTO.getClusterId());
context.setJobId(jobId);
context.setStagePOs(stagePOs);
if (existsStageMap != null && !existsStageMap.isEmpty() && existsStageMap.containsKey(context.getOrder())) {
context.setStageId(existsStageMap.get(context.getOrder()));
context.setRetry(true);
}

switch (commandDTO.getCommandLevel()) {
case CLUSTER -> fromClusterCommandPayload(context, commandDTO);
Expand Down

0 comments on commit 9190504

Please sign in to comment.