From 55f1ef39ce2d3aba2af448ff8235f92567a94ab1 Mon Sep 17 00:00:00 2001 From: kimminkyeu Date: Sun, 8 Sep 2024 13:22:55 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20=EA=B8=B0=EB=B3=B8=20Executor=20?= =?UTF-8?q?=EC=A0=84=EB=9E=B5=20=EC=82=AC=EC=9A=A9=20(ForkJoinPool)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../image/AsyncImageAnalyzePipeline.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/backend/src/main/java/org/example/image/AsyncImageAnalyzePipeline.java b/backend/src/main/java/org/example/image/AsyncImageAnalyzePipeline.java index 058f0d66..9d2ccb02 100644 --- a/backend/src/main/java/org/example/image/AsyncImageAnalyzePipeline.java +++ b/backend/src/main/java/org/example/image/AsyncImageAnalyzePipeline.java @@ -66,7 +66,8 @@ public class AsyncImageAnalyzePipeline { * baeldung.com - threadpool with completablefuture * */ - private final Executor singleThreadExecutor = Executors.newSingleThreadExecutor(); // 큐 + 싱글 쓰레드 + // Default folkJoinPool을 사용하는 것으로 변경합니다. + // private final Executor singleThreadExecutor = Executors.newSingleThreadExecutor(); // 큐 + 싱글 쓰레드 private final Set onGoingTasks = new HashSet<>(); // 현재 처리 중인 작업을 임시 기록 합니다. /** @@ -76,13 +77,13 @@ public class AsyncImageAnalyzePipeline { // @LogExecution public void analyze(Long imageLocationId) { CompletableFuture.runAsync( - () -> onStartTask.accept(imageLocationId), singleThreadExecutor + () -> onStartTask.accept(imageLocationId) ).thenRunAsync( - () -> extractClothAndColorTask.accept(imageAnalyzeManager, imageLocationId), singleThreadExecutor + () -> extractClothAndColorTask.accept(imageAnalyzeManager, imageLocationId) ).thenRunAsync( - () -> colorScoringTask.accept(imageRedisService, imageLocationId), singleThreadExecutor + () -> colorScoringTask.accept(imageRedisService, imageLocationId) ).whenCompleteAsync( - (__, err) -> onCompleteTask.accept(err, imageLocationId), singleThreadExecutor + (__, err) -> onCompleteTask.accept(err, imageLocationId) ); } @@ -106,9 +107,9 @@ public void updateScore(Long imageLocationId, UpdateScoreType updateScoreType) } CompletableFuture.runAsync( - () -> pendingTask.accept(UPDATE_SCORE_PENDING_WAIT_MS), singleThreadExecutor + () -> pendingTask.accept(UPDATE_SCORE_PENDING_WAIT_MS) ).thenRunAsync( - () -> this.updateScore(imageLocationId, updateScoreType), singleThreadExecutor + () -> this.updateScore(imageLocationId, updateScoreType) ).orTimeout( /* for safety, throw a TimeoutException in case of a timeout */ UPDATE_SCORE_MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -138,8 +139,6 @@ protected boolean isScoreInitialized(Long imageLocationId) { private final BiConsumer extractClothAndColorTask = (analyzeManager, imageLocationId) -> { try { - // add to in-task record - onGoingTasks.add(imageLocationId); analyzeManager.analyze(imageLocationId); log.debug("(1) Extract cloth info done - image {}", imageLocationId); } catch (IOException e) {