From ed41fae3bfbbc61dd132419ef7fedef2ee2f5ddf Mon Sep 17 00:00:00 2001 From: s-pakinamkhaled <121400474+s-pakinamkhaled@users.noreply.github.com> Date: Sat, 7 Dec 2024 21:05:29 +0200 Subject: [PATCH] fixed code This project contributed to the Java Design Patterns repository by addressing an issue related to eliminating busy-waiting loops. The code was refactored to improve performance, adhere to Object-Oriented (OO) principles, and enhance maintainability. Key fixes included replacing busy-waiting with efficient mechanisms like wait/notify and ScheduledExecutorService, improving thread safety and code clarity. Comprehensive unit tests were added to validate functionality under various scenarios, including concurrency and interruption handling. These changes ensure robust, modern implementations aligned with OOAD principles. --- .../{Retry.java => HandleErrorIssue.java} | 71 ++++-- .../logaggregation/LogAggregator.java | 67 +++-- .../com/iluwatar/queue/load/leveling/App.java | 46 ++-- .../queue/load/leveling/ServiceExecutor.java | 57 ++++- .../main/java/com/iluwatar/retry/Retry.java | 74 ++++-- .../retry/RetryExponentialBackoff.java | 220 +++++++++------- .../java/com/iluwatar/sessionserver/App.java | 78 +++--- .../java/com/iluwatar/twin/BallThread.java | 51 ++-- .../com/iluwatar/twin/BallThreadTest.java | 237 +++++++++++------- 9 files changed, 586 insertions(+), 315 deletions(-) rename commander/src/main/java/com/iluwatar/commander/{Retry.java => HandleErrorIssue.java} (77%) diff --git a/commander/src/main/java/com/iluwatar/commander/Retry.java b/commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java similarity index 77% rename from commander/src/main/java/com/iluwatar/commander/Retry.java rename to commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java index 71614668254b..c06f6edfc376 100644 --- a/commander/src/main/java/com/iluwatar/commander/Retry.java +++ b/commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java @@ -23,50 +23,81 @@ * THE SOFTWARE. */ package com.iluwatar.commander; - import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -59,6 +62,7 @@ public interface HandleErrorIssue { + /** * Retry pattern. * * @param is the type of object passed into HandleErrorIssue as a parameter. */ - public class Retry { - /** * Operation Interface will define method to be implemented. */ - public interface Operation { void operation(List list) throws Exception; } - /** * HandleErrorIssue defines how to handle errors. * * @param is the type of object to be passed into the method as parameter. */ - public interface HandleErrorIssue { void handleIssue(T obj, Exception e); } private static final SecureRandom RANDOM = new SecureRandom(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Operation op; private final HandleErrorIssue handleError; private final int maxAttempts; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -86,26 +90,25 @@ public interface HandleErrorIssue { + private final long maxDelay; private final AtomicInteger attempts; private final Predicate test; private final List errors; - Retry(Operation op, HandleErrorIssue handleError, int maxAttempts, long maxDelay, Predicate... ignoreTests) { this.op = op; @@ -77,7 +108,6 @@ public interface HandleErrorIssue { this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); } - /** * Performing the operation with retries. * @@ -86,26 +116,25 @@ public interface HandleErrorIssue { */ public void perform(List list, T obj) { - do { + scheduler.schedule(() -> { try { op.operation(list); - return; - } catch (Exception e) { + }catch (Exception e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { this.handleError.handleIssue(obj, e); + scheduler.shutdown(); return; //return here... don't go further } - try { - long testDelay = - (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); - long delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } + perform(list, obj); } - } while (true); + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay(){ + long testDelay = + (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, this.maxDelay); } -} +} \ No newline at end of file diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 37417e21267d..7ec33c2750fd 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -25,12 +25,22 @@ package com.iluwatar.logaggregation; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; + + + + + + + Expand All + + @@ -45,7 +45,7 @@ public class LogAggregator { + /** * Responsible for collecting and buffering logs from different services. * Once the logs reach a certain threshold or after a certain time interval, @@ -40,15 +50,31 @@ */ @Slf4j public class LogAggregator { - private static final int BUFFER_THRESHOLD = 3; private final CentralLogStore centralLogStore; private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicInteger logCount = new AtomicInteger(0); /** + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -90,8 +90,8 @@ public void collectLog(LogEntry logEntry) { + * constructor of LogAggregator. * * @param centralLogStore central log store implement @@ -59,7 +85,6 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { this.minLogLevel = minLogLevel; startBufferFlusher(); } - /** * Collects a given log entry, and filters it by the defined log level. * @@ -70,19 +95,15 @@ public void collectLog(LogEntry logEntry) { LOGGER.warn("Log level or threshold level is null. Skipping."); return; } - if (logEntry.getLevel().compareTo(minLogLevel) < 0) { LOGGER.debug("Log level below threshold. Skipping."); return; } - buffer.offer(logEntry); - if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) { flushBuffer(); } } - /** * Stops the log aggregator service and flushes any remaining logs to * the central log store. @@ -90,13 +111,23 @@ public void collectLog(LogEntry logEntry) { * @throws InterruptedException If any thread has interrupted the current thread. */ public void stop() throws InterruptedException { - executorService.shutdownNow(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { LOGGER.error("Log aggregator did not terminate."); } flushBuffer(); - } + + + + + + + Expand All + + @@ -106,15 +106,7 @@ private void flushBuffer() { + + } private void flushBuffer() { LogEntry logEntry; while ((logEntry = buffer.poll()) != null) { @@ -106,15 +137,7 @@ private void flushBuffer() { } private void startBufferFlusher() { - executorService.execute(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(5000); // Flush every 5 seconds. - flushBuffer(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }); + //flush every 5 seconds + scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS); } -} +} \ No newline at end of file diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java index 7042ff7b79a2..2d233ced73e9 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java @@ -26,10 +26,26 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -104,12 +103,7 @@ public static void main(String[] args) { + * Many solutions in the cloud involve running tasks that invoke services. In this environment, if a * service is subjected to intermittent heavy loads, it can cause performance or reliability * issues. @@ -60,58 +76,54 @@ */ @Slf4j public class App { - //Executor shut down time limit. private static final int SHUTDOWN_TIME = 15; - /** * Program entry point. * * @param args command line args */ public static void main(String[] args) { - // An Executor that provides methods to manage termination and methods that can // produce a Future for tracking progress of one or more asynchronous tasks. ExecutorService executor = null; - try { // Create a MessageQueue object. var msgQueue = new MessageQueue(); - LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads."); - // Create three TaskGenerator threads. Each of them will submit different number of jobs. final var taskRunnable1 = new TaskGenerator(msgQueue, 5); final var taskRunnable2 = new TaskGenerator(msgQueue, 1); final var taskRunnable3 = new TaskGenerator(msgQueue, 2); - // Create e service which should process the submitted jobs. final var srvRunnable = new ServiceExecutor(msgQueue); - // Create a ThreadPool of 2 threads and // submit all Runnable task for execution to executor executor = Executors.newFixedThreadPool(2); executor.submit(taskRunnable1); executor.submit(taskRunnable2); executor.submit(taskRunnable3); - // submitting serviceExecutor thread to the Executor service. executor.submit(srvRunnable); - // Initiates an orderly shutdown. LOGGER.info("Initiating shutdown." + " Executor will shutdown only after all the Threads are completed."); executor.shutdown(); - // Wait for SHUTDOWN_TIME seconds for all the threads to complete - // their tasks and then shut down the executor and then exit. - if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { - LOGGER.info("Executor was shut down and Exiting."); - executor.shutdownNow(); - } + srvRunnable.shutdown(SHUTDOWN_TIME); } catch (Exception e) { LOGGER.error(e.getMessage()); } + + + + + + + + Expand Down + + + } } \ No newline at end of file diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 02530042b370..d056902462f3 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -25,6 +25,9 @@ package com.iluwatar.queue.load.leveling; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and @@ -32,31 +35,59 @@ */ @Slf4j public class ServiceExecutor implements Runnable { - private final MessageQueue msgQueue; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; - } + + + + + + + Expand All + + @@ -43,19 +46,26 @@ public ServiceExecutor(MessageQueue msgQueue) { + + } /** * The ServiceExecutor thread will retrieve each message and process it. */ public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - var msg = msgQueue.retrieveMsg(); + scheduler.scheduleWithFixedDelay(() -> { + var msg = msgQueue.retrieveMsg(); - if (null != msg) { - LOGGER.info(msg + " is served."); - } else { - LOGGER.info("Service Executor: Waiting for Messages to serve .. "); - } + if (null != msg) { + LOGGER.info(msg + " is served."); + } else { + LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + } + }, 0, 1, TimeUnit.SECONDS); + } - Thread.sleep(1000); + public void shutdown(int shutdownTime) { + // Wait for SHUTDOWN_TIME seconds for all the threads to complete + // their tasks and then shut down the executor and then exit. + try { + if (!scheduler.awaitTermination(shutdownTime, TimeUnit.SECONDS)) { + LOGGER.info("Executor was shut down and Exiting."); + scheduler.shutdownNow(); } - } catch (Exception e) { + } catch (InterruptedException e) { LOGGER.error(e.getMessage()); } } -} + + + + + + + + Expand Down + + + +} \ No newline at end of file diff --git a/retry/src/main/java/com/iluwatar/retry/Retry.java b/retry/src/main/java/com/iluwatar/retry/Retry.java index ad9580454993..5829e1eb666d 100644 --- a/retry/src/main/java/com/iluwatar/retry/Retry.java +++ b/retry/src/main/java/com/iluwatar/retry/Retry.java @@ -23,14 +23,29 @@ * THE SOFTWARE. */ package com.iluwatar.retry; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; + + + + + + + + Expand All + + @@ -38,6 +43,7 @@ + /** * Decorates {@link BusinessOperation business operation} with "retry" capabilities. * @@ -38,12 +53,29 @@ */ public final class Retry implements BusinessOperation { private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final int maxAttempts; private final long delay; private final AtomicInteger attempts; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -88,22 +94,36 @@ public int attempts() { + private final Predicate test; private final List errors; - /** * Ctor. * @@ -67,7 +99,6 @@ public Retry( this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); } - /** * The errors encountered while retrying, in the encounter order. * @@ -76,7 +107,6 @@ public Retry( public List errors() { return Collections.unmodifiableList(this.errors); } - /** * The number of retries performed. * @@ -88,22 +118,36 @@ public int attempts() { @Override public T perform() throws BusinessException { - do { + final CompletableFuture future = new CompletableFuture<>(); + + performRetry(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be){ + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performRetry(CompletableFuture future){ + scheduler.schedule(() -> { try { - return this.op.perform(); - } catch (BusinessException e) { + T result = this.op.perform(); + future.complete(result); + } catch (BusinessException e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; + future.completeExceptionally(e); + return; } - try { - Thread.sleep(this.delay); - } catch (InterruptedException f) { - //ignore - } + performRetry(future); } - } while (true); + }, this.delay, TimeUnit.MILLISECONDS); } -} +} \ No newline at end of file diff --git a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java index 1661095b7298..d43603b1eb1a 100644 --- a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java +++ b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java @@ -22,91 +22,139 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.iluwatar.retry; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; + package com.iluwatar.retry; -/** - * Decorates {@link BusinessOperation business operation} with "retry" capabilities. - * - * @param the remote op's return type - */ -public final class RetryExponentialBackoff implements BusinessOperation { - private static final Random RANDOM = new Random(); - private final BusinessOperation op; - private final int maxAttempts; - private final long maxDelay; - private final AtomicInteger attempts; - private final Predicate test; - private final List errors; - - /** - * Ctor. - * - * @param op the {@link BusinessOperation} to retry - * @param maxAttempts number of times to retry - * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions - * will be ignored if no tests are given - */ - @SafeVarargs - public RetryExponentialBackoff( - BusinessOperation op, - int maxAttempts, - long maxDelay, - Predicate... ignoreTests - ) { - this.op = op; - this.maxAttempts = maxAttempts; - this.maxDelay = maxDelay; - this.attempts = new AtomicInteger(); - this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); - this.errors = new ArrayList<>(); - } - - /** - * The errors encountered while retrying, in the encounter order. - * - * @return the errors encountered while retrying - */ - public List errors() { - return Collections.unmodifiableList(this.errors); - } - - /** - * The number of retries performed. - * - * @return the number of retries performed - */ - public int attempts() { - return this.attempts.intValue(); - } - - @Override - public T perform() throws BusinessException { - do { - try { - return this.op.perform(); - } catch (BusinessException e) { - this.errors.add(e); - - if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; - } - - try { - var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); - var delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } - } - } while (true); - } -} + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collections; + import java.util.List; + import java.util.Random; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.function.Predicate; + + + + + + + + + Expand All + + @@ -40,6 +46,7 @@ + + /** + * Decorates {@link BusinessOperation business operation} with "retry" capabilities. + * + * @param the remote op's return type + */ + public final class RetryExponentialBackoff implements BusinessOperation { + private static final Random RANDOM = new Random(); + private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final int maxAttempts; + private final long maxDelay; + private final AtomicInteger attempts; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -89,24 +96,41 @@ public int attempts() { + + private final Predicate test; + private final List errors; + /** + * Ctor. + * + * @param op the {@link BusinessOperation} to retry + * @param maxAttempts number of times to retry + * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions + * will be ignored if no tests are given + */ + @SafeVarargs + public RetryExponentialBackoff( + BusinessOperation op, + int maxAttempts, + long maxDelay, + Predicate... ignoreTests + ) { + this.op = op; + this.maxAttempts = maxAttempts; + this.maxDelay = maxDelay; + this.attempts = new AtomicInteger(); + this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); + this.errors = new ArrayList<>(); + } + /** + * The errors encountered while retrying, in the encounter order. + * + * @return the errors encountered while retrying + */ + public List errors() { + return Collections.unmodifiableList(this.errors); + } + /** + * The number of retries performed. + * + * @return the number of retries performed + */ + public int attempts() { + return this.attempts.intValue(); + } + + @Override + public T perform() throws BusinessException { + final CompletableFuture future = new CompletableFuture<>(); + + performWithRetries(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be) { + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performWithRetries(CompletableFuture future) { + scheduler.schedule(() -> { + try { + T result = this.op.perform(); + future.complete(result); + } catch (BusinessException e) { + this.errors.add(e); + + if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { + future.completeExceptionally(e); + return; + } + + performWithRetries(future); + } + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay() { + var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, this.maxDelay); + } \ No newline at end of file diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index a3c66d3ff634..ee655e7e7878 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -23,17 +23,30 @@ * THE SOFTWARE. */ package com.iluwatar.sessionserver; - import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Instant; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** + + + + + + + + Expand All + + @@ -54,8 +57,9 @@ + * The server session pattern is a behavioral design pattern concerned with assigning the responsibility * of storing session data on the server side. Within the context of stateless protocols like HTTP all * requests are isolated events independent of previous requests. In order to create sessions during @@ -49,16 +62,27 @@ * requests in a list. When a user logs out the session identifier is deleted from the list along with * the appropriate user session data, which is handle by the ({@link LogoutHandler}) class. */ - @Slf4j public class App { // Map to store session data (simulated using a HashMap) - private static Map sessions = new HashMap<>(); - private static Map sessionCreationTimes = new HashMap<>(); + private static final Map sessions = new ConcurrentHashMap<>(); + private static final Map sessionCreationTimes = new ConcurrentHashMap<>(); + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private static final long SESSION_EXPIRATION_TIME = 10000; /** + + + + + + + + Expand All + + @@ -81,31 +85,25 @@ public static void main(String[] args) throws IOException { + * Main entry point. * @param args arguments * @throws IOException ex @@ -66,46 +90,36 @@ public class App { public static void main(String[] args) throws IOException { // Create HTTP server listening on port 8000 HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); - // Set up session management endpoints server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); - // Start the server server.start(); - // Start background task to check for expired sessions sessionExpirationTask(); - LOGGER.info("Server started. Listening on port 8080..."); } private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - LOGGER.info("Session expiration checker started..."); - Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); - } - } - } + scheduler.scheduleWithFixedDelay(() -> { + try { + LOGGER.info("Session expiration checker started..."); + + Instant currentTime = Instant.now(); + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + sessions.remove(entry.getKey()); + iterator.remove(); } - LOGGER.info("Session expiration checker finished!"); - } catch (InterruptedException e) { - LOGGER.error("An error occurred: ", e); - Thread.currentThread().interrupt(); } + LOGGER.info("Session expiration checker finished!"); + + } catch (Exception e) { + LOGGER.error("An error occured: ", e); } - }).start(); + }, SESSION_EXPIRATION_TIME, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } } \ No newline at end of file diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 9d4d9cf71a76..2e5ed21417c0 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -24,55 +24,76 @@ */ package com.iluwatar.twin; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.Setter; import lombok.extern.slf4j.Slf4j; + + + + + + + + Expand All + + @@ -40,24 +43,19 @@ public class BallThread extends Thread { + /** * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend * and resume. It holds the reference of {@link BallItem} to delegate the draw task. */ - @Slf4j public class BallThread extends Thread { - @Setter private BallItem twin; private volatile boolean isSuspended; - private volatile boolean isRunning = true; + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + /** * Run the thread. */ public void run() { - - while (isRunning) { + scheduler.scheduleWithFixedDelay(() -> { if (!isSuspended) { twin.draw(); twin.move(); } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + }, 0, 250, TimeUnit.MILLISECONDS); } public void suspendMe() { + + + + + + + + Expand All + + @@ -70,9 +68,14 @@ public void resumeMe() { + isSuspended = true; LOGGER.info("Begin to suspend BallThread"); } - public void resumeMe() { isSuspended = false; LOGGER.info("Begin to resume BallThread"); } + /** + * Stop the scheduled task. + */ public void stopMe() { - this.isRunning = false; this.isSuspended = true; + if (scheduler != null) { + scheduler.shutdown(); + } } -} - +} \ No newline at end of file diff --git a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java index 26cf78509dcf..c0f1cdbf2bea 100644 --- a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java +++ b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java @@ -21,100 +21,149 @@ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. - */ -package com.iluwatar.twin; - -import static java.lang.Thread.UncaughtExceptionHandler; -import static java.lang.Thread.sleep; -import static java.time.Duration.ofMillis; -import static org.junit.jupiter.api.Assertions.assertTimeout; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -import org.junit.jupiter.api.Test; -/** - * BallThreadTest - * */ -class BallThreadTest { - - /** - * Verify if the {@link BallThread} can be resumed - */ - @Test - void testSuspend() { - assertTimeout(ofMillis(5000), () -> { - final var ballThread = new BallThread(); - - final var ballItem = mock(BallItem.class); - ballThread.setTwin(ballItem); - - ballThread.start(); - sleep(200); - verify(ballItem, atLeastOnce()).draw(); - verify(ballItem, atLeastOnce()).move(); - ballThread.suspendMe(); - - sleep(1000); - - ballThread.stopMe(); - ballThread.join(); - - verifyNoMoreInteractions(ballItem); - }); - } - - /** - * Verify if the {@link BallThread} can be resumed - */ - @Test - void testResume() { - assertTimeout(ofMillis(5000), () -> { - final var ballThread = new BallThread(); - - final var ballItem = mock(BallItem.class); - ballThread.setTwin(ballItem); - - ballThread.suspendMe(); - ballThread.start(); - - sleep(1000); - - verifyNoMoreInteractions(ballItem); - - ballThread.resumeMe(); - sleep(300); - verify(ballItem, atLeastOnce()).draw(); - verify(ballItem, atLeastOnce()).move(); - - ballThread.stopMe(); - ballThread.join(); - - verifyNoMoreInteractions(ballItem); - }); - } - - /** - * Verify if the {@link BallThread} is interruptible - */ - @Test - void testInterrupt() { - assertTimeout(ofMillis(5000), () -> { - final var ballThread = new BallThread(); - final var exceptionHandler = mock(UncaughtExceptionHandler.class); - ballThread.setUncaughtExceptionHandler(exceptionHandler); - ballThread.setTwin(mock(BallItem.class)); - ballThread.start(); - ballThread.interrupt(); - ballThread.join(); - verify(exceptionHandler).uncaughtException(eq(ballThread), any(RuntimeException.class)); - verifyNoMoreInteractions(exceptionHandler); - }); - } -} \ No newline at end of file + package com.iluwatar.twin; + + + + + + import static java.lang.Thread.sleep; + + import static java.time.Duration.ofMillis; + + import static org.junit.jupiter.api.Assertions.assertTimeout; + + + + import static org.mockito.Mockito.atLeastOnce; + + import static org.mockito.Mockito.mock; + + import static org.mockito.Mockito.verify; + import static org.mockito.Mockito.verifyNoMoreInteractions; + + import org.junit.jupiter.api.Test; + + /** + * BallThreadTest + * + */ + class BallThreadTest { + + /** + * Verify if the {@link BallThread} can be resumed + */ + @Test + void testSuspend() { + assertTimeout(ofMillis(5000), () -> { + final var ballThread = new BallThread(); + + final var ballItem = mock(BallItem.class); + ballThread.setTwin(ballItem); + + ballThread.start(); + sleep(200); + verify(ballItem, atLeastOnce()).draw(); + verify(ballItem, atLeastOnce()).move(); + ballThread.suspendMe(); + + sleep(1000); + + ballThread.stopMe(); + ballThread.join(); + + verifyNoMoreInteractions(ballItem); + }); + } + + /** + * Verify if the {@link BallThread} can be resumed + */ + @Test + void testResume() { + assertTimeout(ofMillis(5000), () -> { + final var ballThread = new BallThread(); + + final var ballItem = mock(BallItem.class); + ballThread.setTwin(ballItem); + + ballThread.suspendMe(); + ballThread.start(); + + sleep(1000); + + verifyNoMoreInteractions(ballItem); + + ballThread.resumeMe(); + sleep(300); + verify(ballItem, atLeastOnce()).draw(); + verify(ballItem, atLeastOnce()).move(); + + ballThread.stopMe(); + ballThread.join(); + + verifyNoMoreInteractions(ballItem); + }); + + } + + + + + /** + + * Verify if the {@link BallThread} can be stopped + + */ + + @Test + + void testStopped() { + + assertTimeout(ofMillis(5000), () -> { + + final var ballThread = new BallThread(); + + final var twin = mock(BallItem.class); + + ballThread.setTwin(twin); + + + ballThread.start(); + + + + + + + + sleep(300); + + verify(twin, atLeastOnce()).draw(); + + verify(twin, atLeastOnce()).move(); + + + + + // Stop the thread + + ballThread.stopMe(); + + ballThread.join(); + + + + + // Ensure that the thread has stopped and no more interactions occur + + verifyNoMoreInteractions(twin); + + }); + + } + + } \ No newline at end of file