From ef2ee6d720532a2f932ba1df5acac470f731a1fa Mon Sep 17 00:00:00 2001 From: Mahendran Vasagam Date: Sat, 27 Sep 2025 18:09:38 -0500 Subject: [PATCH 1/6] First draft --- examples/schedule-config.yaml | 58 +++++ gateway-ha/pom.xml | 6 + .../config/ClusterSchedulerConfiguration.java | 47 ++++ .../ha/config/HaGatewayConfiguration.java | 13 ++ .../ha/config/ScheduleConfiguration.java | 101 +++++++++ .../ha/module/ClusterSchedulerModule.java | 61 ++++++ .../ha/scheduler/ClusterScheduler.java | 200 ++++++++++++++++++ 7 files changed, 486 insertions(+) create mode 100644 examples/schedule-config.yaml create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java diff --git a/examples/schedule-config.yaml b/examples/schedule-config.yaml new file mode 100644 index 000000000..b9877ff95 --- /dev/null +++ b/examples/schedule-config.yaml @@ -0,0 +1,58 @@ +# Example configuration for time-based cluster activation +# This file shows how to configure cluster schedules for the Trino Gateway + +# Main configuration +server: + applicationConnectors: + - type: http + port: 8080 + adminConnectors: + - type: http + port: 8081 + +# Database configuration +dataStore: + jdbcUrl: jdbc:h2:./gateway-ha/gateway-ha;DB_CLOSE_DELAY=-1;MODE=MYSQL + user: sa + password: "" + driverClass: org.h2.Driver + +# Monitor configuration +monitor: + active: true + connectionTimeout: 10s + refreshInterval: 10s + backends: ["trino-1", "trino-2"] + +# Backend configurations +backends: + - name: trino-1 + proxyTo: http://localhost:8080 + active: true + routingGroup: adhoc + externalUrl: http://localhost:8080 + - name: trino-2 + proxyTo: http://localhost:8081 + active: true + routingGroup: adhoc + externalUrl: http://localhost:8081 + +# Schedule configuration for time-based activation +scheduleConfiguration: + enabled: true + checkInterval: 5m # Check every 5 minutes + schedules: + - clusterName: trino-1 + cronExpression: "0 0 9-17 * * ?" # Active from 9 AM to 5 PM every day + activeDuringCron: true # Active when cron matches + - clusterName: trino-2 + cronExpression: "0 0 17-9 * * ?" # Active from 5 PM to 9 AM every day + activeDuringCron: true # Active when cron matches + +# Authentication configuration (example) +authentication: + type: none + +# Authorization configuration (example) +authorization: + type: none diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 9fe50911c..1b34b35ea 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -272,6 +272,12 @@ jmxutils + + com.cronutils + cron-utils + 9.2.1 + + com.mysql mysql-connector-j diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java new file mode 100644 index 000000000..2831ea005 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.config; + +import com.google.inject.Inject; +import io.trino.gateway.ha.scheduler.ClusterScheduler; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import javax.annotation.Nullable; + +public class ClusterSchedulerConfiguration { + private final ClusterScheduler scheduler; + + @Inject + public ClusterSchedulerConfiguration(@Nullable ClusterScheduler scheduler) { + this.scheduler = scheduler; + } + + @PostConstruct + public void start() { + if (scheduler != null) { + scheduler.start(); + } + } + + @PreDestroy + public void stop() { + if (scheduler != null) { + try { + scheduler.close(); + } catch (Exception e) { + // Ignore + } + } + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java index 61a859ce7..ba30e87b2 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java @@ -40,6 +40,7 @@ public class HaGatewayConfiguration private List extraWhitelistPaths = new ArrayList<>(); private OAuth2GatewayCookieConfiguration oauth2GatewayCookieConfiguration = new OAuth2GatewayCookieConfiguration(); private GatewayCookieConfiguration gatewayCookieConfiguration = new GatewayCookieConfiguration(); + private ScheduleConfiguration scheduleConfiguration = new ScheduleConfiguration(); private List statementPaths = ImmutableList.of(V1_STATEMENT_PATH); private boolean includeClusterHostInResponse; private ProxyResponseConfiguration proxyResponseConfiguration = new ProxyResponseConfiguration(); @@ -191,6 +192,18 @@ public OAuth2GatewayCookieConfiguration getOauth2GatewayCookieConfiguration() return oauth2GatewayCookieConfiguration; } + @JsonProperty + public ScheduleConfiguration getScheduleConfiguration() + { + return scheduleConfiguration; + } + + @JsonProperty + public void setScheduleConfiguration(ScheduleConfiguration scheduleConfiguration) + { + this.scheduleConfiguration = scheduleConfiguration; + } + public void setOauth2GatewayCookieConfiguration(OAuth2GatewayCookieConfiguration oauth2GatewayCookieConfiguration) { this.oauth2GatewayCookieConfiguration = oauth2GatewayCookieConfiguration; diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java new file mode 100644 index 000000000..18e6d3cb4 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.units.Duration; +import java.util.List; + +public class ScheduleConfiguration { + private boolean enabled = false; + private Duration checkInterval = new Duration(5, java.util.concurrent.TimeUnit.MINUTES); + private String timezone = "GMT"; // Default to GMT if not specified + private List schedules; + + @JsonProperty + public boolean isEnabled() { + return enabled; + } + + @JsonProperty + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + @JsonProperty + public Duration getCheckInterval() { + return checkInterval; + } + + @JsonProperty + public void setCheckInterval(Duration checkInterval) { + this.checkInterval = checkInterval; + } + + @JsonProperty + public String getTimezone() { + return timezone; + } + + @JsonProperty + public void setTimezone(String timezone) { + this.timezone = timezone; + } + + @JsonProperty + public List getSchedules() { + return schedules; + } + + @JsonProperty + public void setSchedules(List schedules) { + this.schedules = schedules; + } + + public static class ClusterSchedule { + private String clusterName; + private String cronExpression; + private boolean activeDuringCron; + + @JsonProperty + public String getClusterName() { + return clusterName; + } + + @JsonProperty + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + @JsonProperty + public String getCronExpression() { + return cronExpression; + } + + @JsonProperty + public void setCronExpression(String cronExpression) { + this.cronExpression = cronExpression; + } + + @JsonProperty + public boolean isActiveDuringCron() { + return activeDuringCron; + } + + @JsonProperty + public void setActiveDuringCron(boolean activeDuringCron) { + this.activeDuringCron = activeDuringCron; + } + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java new file mode 100644 index 000000000..2bb7e7cb4 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.module; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import io.airlift.log.Logger; +import io.trino.gateway.ha.config.HaGatewayConfiguration; +import io.trino.gateway.ha.config.ScheduleConfiguration; +import io.trino.gateway.ha.router.GatewayBackendManager; +import io.trino.gateway.ha.scheduler.ClusterScheduler; +import jakarta.inject.Singleton; + +public class ClusterSchedulerModule extends AbstractModule { + private static final Logger logger = Logger.get(ClusterSchedulerModule.class); + private final HaGatewayConfiguration configuration; + + // We require all modules to take HaGatewayConfiguration as the only parameter + public ClusterSchedulerModule(HaGatewayConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public void configure() { + // Configuration-based binding is handled in the provider methods + if (configuration.getScheduleConfiguration() != null + && configuration.getScheduleConfiguration().isEnabled()) { + logger.info("ClusterScheduler configuration is enabled"); + } else { + logger.info("ClusterScheduler is disabled or not configured"); + } + } + + @Provides + @Singleton + public ScheduleConfiguration provideScheduleConfiguration() { + return configuration.getScheduleConfiguration(); + } + + @Provides + @Singleton + public ClusterScheduler provideClusterScheduler( + GatewayBackendManager backendManager, ScheduleConfiguration scheduleConfiguration) { + if (scheduleConfiguration == null || !scheduleConfiguration.isEnabled()) { + return null; + } + logger.info("Creating ClusterScheduler instance"); + return new ClusterScheduler(backendManager, scheduleConfiguration); + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java new file mode 100644 index 000000000..2b6e3cef8 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java @@ -0,0 +1,200 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.scheduler; + +import com.cronutils.model.Cron; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.time.ExecutionTime; +import com.cronutils.parser.CronParser; +import com.google.common.annotations.VisibleForTesting; +import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.config.ScheduleConfiguration; +import io.trino.gateway.ha.router.GatewayBackendManager; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.inject.Inject; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterScheduler implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(ClusterScheduler.class); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final GatewayBackendManager backendManager; + private final ScheduleConfiguration config; + private final Map executionTimes = new ConcurrentHashMap<>(); + private final CronParser cronParser; + private final ZoneId timezone; + + @Inject + public ClusterScheduler(GatewayBackendManager backendManager, ScheduleConfiguration config) { + this.backendManager = backendManager; + this.config = config; + CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX); + this.cronParser = new CronParser(cronDefinition); + + // Initialize timezone from config, default to GMT if not specified or invalid + ZoneId configuredTimezone; + try { + String timezoneStr = config.getTimezone(); + if (timezoneStr == null || timezoneStr.trim().isEmpty()) { + configuredTimezone = ZoneId.of("GMT"); + log.info("No timezone specified in configuration, using default: GMT"); + } else { + configuredTimezone = ZoneId.of(timezoneStr); + log.info("Using configured timezone: {}", timezoneStr); + } + } catch (Exception e) { + configuredTimezone = ZoneId.of("GMT"); + log.warn( + "Invalid timezone '{}' in configuration, falling back to GMT", config.getTimezone(), e); + } + this.timezone = configuredTimezone; + } + + @PostConstruct + public void start() { + if (!config.isEnabled()) { + log.info("Cluster scheduling is disabled"); + return; + } + + // Initialize execution times + for (ScheduleConfiguration.ClusterSchedule schedule : config.getSchedules()) { + try { + Cron cron = cronParser.parse(schedule.getCronExpression()); + executionTimes.put(schedule.getClusterName(), ExecutionTime.forCron(cron)); + log.info( + "Scheduled cluster {} with cron expression: {}, activeDuringCron: {}", + schedule.getClusterName(), + schedule.getCronExpression(), + schedule.isActiveDuringCron()); + } catch (Exception e) { + log.error( + "Failed to parse cron expression for cluster {}: {}", + schedule.getClusterName(), + schedule.getCronExpression(), + e); + } + } + + // Schedule the task + scheduler.scheduleWithFixedDelay( + this::checkAndUpdateClusterStatus, + 0, + (long) config.getCheckInterval().toMillis(), + TimeUnit.MILLISECONDS); + + log.info( + "Started cluster scheduler with check interval: {} (using {} timezone)", + config.getCheckInterval(), + timezone); + } + + @VisibleForTesting + void checkAndUpdateClusterStatus() { + try { + ZonedDateTime now = ZonedDateTime.now(timezone); + log.debug("Checking cluster status at: {} ({})", now, timezone); + + for (Map.Entry entry : executionTimes.entrySet()) { + String clusterName = entry.getKey(); + ExecutionTime executionTime = entry.getValue(); + + // Find the schedule for this cluster + Optional scheduleOpt = + config.getSchedules().stream() + .filter(s -> s.getClusterName().equals(clusterName)) + .findFirst(); + + if (scheduleOpt.isEmpty()) { + log.warn("No schedule configuration found for cluster: {}", clusterName); + continue; + } + + ScheduleConfiguration.ClusterSchedule schedule = scheduleOpt.get(); + boolean cronMatches = executionTime.isMatch(now); + boolean shouldBeActive = cronMatches == schedule.isActiveDuringCron(); + + log.info( + "Cluster: {}, cronMatches: {}, activeDuringCron: {}, shouldBeActive: {}", + clusterName, + cronMatches, + schedule.isActiveDuringCron(), + shouldBeActive); + + // Update cluster status if needed + Optional clusterOpt = + backendManager.getBackendByName(clusterName); + if (clusterOpt.isPresent()) { + ProxyBackendConfiguration cluster = clusterOpt.get(); + boolean currentlyActive = cluster.isActive(); + + log.debug( + "Cluster: {}, currentlyActive: {}, shouldBeActive: {}", + clusterName, + currentlyActive, + shouldBeActive); + + if (currentlyActive != shouldBeActive) { + if (shouldBeActive) { + backendManager.activateBackend(clusterName); + log.info( + "Activated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", + clusterName, + cronMatches, + schedule.isActiveDuringCron()); + } else { + backendManager.deactivateBackend(clusterName); + log.info( + "Deactivated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", + clusterName, + cronMatches, + schedule.isActiveDuringCron()); + } + } else { + log.debug("Cluster {} status unchanged: active={}", clusterName, currentlyActive); + } + } else { + log.warn("Cluster {} not found in backend manager", clusterName); + } + } + } catch (Exception e) { + log.error("Error in cluster scheduler task", e); + } + } + + @PreDestroy + @Override + public void close() { + scheduler.shutdownNow(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("Cluster scheduler did not terminate in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for scheduler to terminate", e); + } + } +} From 0876dcdb6189122d7e1bdfa5d9ad6bf4e0110386 Mon Sep 17 00:00:00 2001 From: Mahendran Vasagam Date: Sat, 27 Sep 2025 18:14:54 -0500 Subject: [PATCH 2/6] fmt --- .../config/ClusterSchedulerConfiguration.java | 18 ++-- .../ha/config/ScheduleConfiguration.java | 60 +++++++++----- .../ha/module/ClusterSchedulerModule.java | 32 ++++--- .../ha/scheduler/ClusterScheduler.java | 83 ++++++++++--------- 4 files changed, 118 insertions(+), 75 deletions(-) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java index 2831ea005..f630081cd 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java @@ -17,29 +17,37 @@ import io.trino.gateway.ha.scheduler.ClusterScheduler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; + import javax.annotation.Nullable; -public class ClusterSchedulerConfiguration { +import static java.util.Objects.requireNonNull; + +public class ClusterSchedulerConfiguration +{ private final ClusterScheduler scheduler; @Inject - public ClusterSchedulerConfiguration(@Nullable ClusterScheduler scheduler) { + public ClusterSchedulerConfiguration(@Nullable ClusterScheduler scheduler) + { this.scheduler = scheduler; } @PostConstruct - public void start() { + public void start() + { if (scheduler != null) { scheduler.start(); } } @PreDestroy - public void stop() { + public void stop() + { if (scheduler != null) { try { scheduler.close(); - } catch (Exception e) { + } + catch (Exception e) { // Ignore } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java index 18e6d3cb4..94472786a 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java @@ -15,86 +15,106 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.airlift.units.Duration; + import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; -public class ScheduleConfiguration { +public class ScheduleConfiguration +{ private boolean enabled = false; private Duration checkInterval = new Duration(5, java.util.concurrent.TimeUnit.MINUTES); private String timezone = "GMT"; // Default to GMT if not specified private List schedules; @JsonProperty - public boolean isEnabled() { + public boolean isEnabled() + { return enabled; } @JsonProperty - public void setEnabled(boolean enabled) { + public void setEnabled(boolean enabled) + { this.enabled = enabled; } @JsonProperty - public Duration getCheckInterval() { + public Duration getCheckInterval() + { return checkInterval; } @JsonProperty - public void setCheckInterval(Duration checkInterval) { - this.checkInterval = checkInterval; + public void setCheckInterval(Duration checkInterval) + { + this.checkInterval = requireNonNull(checkInterval, "checkInterval is null"); } @JsonProperty - public String getTimezone() { + public String getTimezone() + { return timezone; } @JsonProperty - public void setTimezone(String timezone) { - this.timezone = timezone; + public void setTimezone(String timezone) + { + this.timezone = requireNonNull(timezone, "timezone is null"); } @JsonProperty - public List getSchedules() { + public List getSchedules() + { return schedules; } @JsonProperty - public void setSchedules(List schedules) { + public void setSchedules(List schedules) + { this.schedules = schedules; } - public static class ClusterSchedule { + public static class ClusterSchedule + { private String clusterName; private String cronExpression; private boolean activeDuringCron; @JsonProperty - public String getClusterName() { + public String getClusterName() + { return clusterName; } @JsonProperty - public void setClusterName(String clusterName) { - this.clusterName = clusterName; + public void setClusterName(String clusterName) + { + this.clusterName = requireNonNull(clusterName, "clusterName is null"); } @JsonProperty - public String getCronExpression() { + public String getCronExpression() + { return cronExpression; } @JsonProperty - public void setCronExpression(String cronExpression) { - this.cronExpression = cronExpression; + public void setCronExpression(String cronExpression) + { + this.cronExpression = requireNonNull(cronExpression, "cronExpression is null"); } @JsonProperty - public boolean isActiveDuringCron() { + public boolean isActiveDuringCron() + { return activeDuringCron; } @JsonProperty - public void setActiveDuringCron(boolean activeDuringCron) { + public void setActiveDuringCron(boolean activeDuringCron) + { this.activeDuringCron = activeDuringCron; } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java index 2bb7e7cb4..85642842a 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterSchedulerModule.java @@ -22,40 +22,50 @@ import io.trino.gateway.ha.scheduler.ClusterScheduler; import jakarta.inject.Singleton; -public class ClusterSchedulerModule extends AbstractModule { - private static final Logger logger = Logger.get(ClusterSchedulerModule.class); +import static java.util.Objects.requireNonNull; + +public class ClusterSchedulerModule + extends AbstractModule +{ + private static final Logger log = Logger.get(ClusterSchedulerModule.class); private final HaGatewayConfiguration configuration; // We require all modules to take HaGatewayConfiguration as the only parameter - public ClusterSchedulerModule(HaGatewayConfiguration configuration) { - this.configuration = configuration; + public ClusterSchedulerModule(HaGatewayConfiguration configuration) + { + this.configuration = requireNonNull(configuration, "configuration is null"); } @Override - public void configure() { + public void configure() + { // Configuration-based binding is handled in the provider methods if (configuration.getScheduleConfiguration() != null && configuration.getScheduleConfiguration().isEnabled()) { - logger.info("ClusterScheduler configuration is enabled"); - } else { - logger.info("ClusterScheduler is disabled or not configured"); + log.info("ClusterScheduler configuration is enabled"); + } + else { + log.info("ClusterScheduler is disabled or not configured"); } } @Provides @Singleton - public ScheduleConfiguration provideScheduleConfiguration() { + public ScheduleConfiguration provideScheduleConfiguration() + { return configuration.getScheduleConfiguration(); } @Provides @Singleton public ClusterScheduler provideClusterScheduler( - GatewayBackendManager backendManager, ScheduleConfiguration scheduleConfiguration) { + GatewayBackendManager backendManager, + ScheduleConfiguration scheduleConfiguration) + { if (scheduleConfiguration == null || !scheduleConfiguration.isEnabled()) { return null; } - logger.info("Creating ClusterScheduler instance"); + log.info("Creating ClusterScheduler instance"); return new ClusterScheduler(backendManager, scheduleConfiguration); } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java index 2b6e3cef8..9d2c00a37 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java @@ -26,6 +26,9 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Map; @@ -34,10 +37,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ClusterScheduler implements AutoCloseable { +public class ClusterScheduler + implements AutoCloseable +{ private static final Logger log = LoggerFactory.getLogger(ClusterScheduler.class); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final GatewayBackendManager backendManager; @@ -47,9 +50,10 @@ public class ClusterScheduler implements AutoCloseable { private final ZoneId timezone; @Inject - public ClusterScheduler(GatewayBackendManager backendManager, ScheduleConfiguration config) { - this.backendManager = backendManager; - this.config = config; + public ClusterScheduler(GatewayBackendManager backendManager, ScheduleConfiguration config) + { + this.backendManager = requireNonNull(backendManager, "backendManager is null"); + this.config = requireNonNull(config, "config is null"); CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX); this.cronParser = new CronParser(cronDefinition); @@ -60,20 +64,22 @@ public ClusterScheduler(GatewayBackendManager backendManager, ScheduleConfigurat if (timezoneStr == null || timezoneStr.trim().isEmpty()) { configuredTimezone = ZoneId.of("GMT"); log.info("No timezone specified in configuration, using default: GMT"); - } else { + } + else { configuredTimezone = ZoneId.of(timezoneStr); log.info("Using configured timezone: {}", timezoneStr); } - } catch (Exception e) { + } + catch (Exception e) { configuredTimezone = ZoneId.of("GMT"); - log.warn( - "Invalid timezone '{}' in configuration, falling back to GMT", config.getTimezone(), e); + log.warn("Invalid timezone '{}' in configuration, falling back to GMT", config.getTimezone(), e); } this.timezone = configuredTimezone; } @PostConstruct - public void start() { + public void start() + { if (!config.isEnabled()) { log.info("Cluster scheduling is disabled"); return; @@ -84,14 +90,13 @@ public void start() { try { Cron cron = cronParser.parse(schedule.getCronExpression()); executionTimes.put(schedule.getClusterName(), ExecutionTime.forCron(cron)); - log.info( - "Scheduled cluster {} with cron expression: {}, activeDuringCron: {}", + log.info("Scheduled cluster {} with cron expression: {}, activeDuringCron: {}", schedule.getClusterName(), schedule.getCronExpression(), schedule.isActiveDuringCron()); - } catch (Exception e) { - log.error( - "Failed to parse cron expression for cluster {}: {}", + } + catch (Exception e) { + log.error("Failed to parse cron expression for cluster {}: {}", schedule.getClusterName(), schedule.getCronExpression(), e); @@ -105,14 +110,14 @@ public void start() { (long) config.getCheckInterval().toMillis(), TimeUnit.MILLISECONDS); - log.info( - "Started cluster scheduler with check interval: {} (using {} timezone)", + log.info("Started cluster scheduler with check interval: {} (using {} timezone)", config.getCheckInterval(), timezone); } @VisibleForTesting - void checkAndUpdateClusterStatus() { + void checkAndUpdateClusterStatus() + { try { ZonedDateTime now = ZonedDateTime.now(timezone); log.debug("Checking cluster status at: {} ({})", now, timezone); @@ -122,10 +127,9 @@ void checkAndUpdateClusterStatus() { ExecutionTime executionTime = entry.getValue(); // Find the schedule for this cluster - Optional scheduleOpt = - config.getSchedules().stream() - .filter(s -> s.getClusterName().equals(clusterName)) - .findFirst(); + Optional scheduleOpt = config.getSchedules().stream() + .filter(s -> s.getClusterName().equals(clusterName)) + .findFirst(); if (scheduleOpt.isEmpty()) { log.warn("No schedule configuration found for cluster: {}", clusterName); @@ -136,22 +140,19 @@ void checkAndUpdateClusterStatus() { boolean cronMatches = executionTime.isMatch(now); boolean shouldBeActive = cronMatches == schedule.isActiveDuringCron(); - log.info( - "Cluster: {}, cronMatches: {}, activeDuringCron: {}, shouldBeActive: {}", + log.info("Cluster: {}, cronMatches: {}, activeDuringCron: {}, shouldBeActive: {}", clusterName, cronMatches, schedule.isActiveDuringCron(), shouldBeActive); // Update cluster status if needed - Optional clusterOpt = - backendManager.getBackendByName(clusterName); + Optional clusterOpt = backendManager.getBackendByName(clusterName); if (clusterOpt.isPresent()) { ProxyBackendConfiguration cluster = clusterOpt.get(); boolean currentlyActive = cluster.isActive(); - log.debug( - "Cluster: {}, currentlyActive: {}, shouldBeActive: {}", + log.debug("Cluster: {}, currentlyActive: {}, shouldBeActive: {}", clusterName, currentlyActive, shouldBeActive); @@ -159,40 +160,44 @@ void checkAndUpdateClusterStatus() { if (currentlyActive != shouldBeActive) { if (shouldBeActive) { backendManager.activateBackend(clusterName); - log.info( - "Activated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", + log.info("Activated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", clusterName, cronMatches, schedule.isActiveDuringCron()); - } else { + } + else { backendManager.deactivateBackend(clusterName); - log.info( - "Deactivated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", + log.info("Deactivated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", clusterName, cronMatches, schedule.isActiveDuringCron()); } - } else { + } + else { log.debug("Cluster {} status unchanged: active={}", clusterName, currentlyActive); } - } else { + } + else { log.warn("Cluster {} not found in backend manager", clusterName); } } - } catch (Exception e) { + } + catch (Exception e) { log.error("Error in cluster scheduler task", e); } } @PreDestroy @Override - public void close() { + public void close() + { scheduler.shutdownNow(); try { if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { log.warn("Cluster scheduler did not terminate in time"); } - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("Interrupted while waiting for scheduler to terminate", e); } From efc3315a93d0612f49176011e833c33c7b5d8a45 Mon Sep 17 00:00:00 2001 From: Mahendran Vasagam Date: Mon, 29 Sep 2025 20:24:10 -0500 Subject: [PATCH 3/6] Test class and format fix --- gateway-ha/pom.xml | 12 +- .../config/ClusterSchedulerConfiguration.java | 5 +- .../ha/config/HaGatewayConfiguration.java | 1 + .../ha/config/ScheduleConfiguration.java | 3 +- .../ha/scheduler/ClusterScheduler.java | 2 + .../ha/scheduler/TestClusterScheduler.java | 285 ++++++++++++++++++ 6 files changed, 296 insertions(+), 12 deletions(-) create mode 100644 gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 1b34b35ea..d7cc5c8a4 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -42,6 +42,12 @@ 0.22.1 + + com.cronutils + cron-utils + 9.2.1 + + com.fasterxml.jackson.core jackson-annotations @@ -272,12 +278,6 @@ jmxutils - - com.cronutils - cron-utils - 9.2.1 - - com.mysql mysql-connector-j diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java index f630081cd..59656a4de 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java @@ -15,13 +15,10 @@ import com.google.inject.Inject; import io.trino.gateway.ha.scheduler.ClusterScheduler; +import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import javax.annotation.Nullable; - -import static java.util.Objects.requireNonNull; - public class ClusterSchedulerConfiguration { private final ClusterScheduler scheduler; diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java index ba30e87b2..0290b757e 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java @@ -13,6 +13,7 @@ */ package io.trino.gateway.ha.config; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java index 94472786a..7057bdbc4 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java @@ -17,13 +17,12 @@ import io.airlift.units.Duration; import java.util.List; -import java.util.Objects; import static java.util.Objects.requireNonNull; public class ScheduleConfiguration { - private boolean enabled = false; + private boolean enabled; private Duration checkInterval = new Duration(5, java.util.concurrent.TimeUnit.MINUTES); private String timezone = "GMT"; // Default to GMT if not specified private List schedules; diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java index 9d2c00a37..ffa7dbdf0 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java @@ -38,6 +38,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static java.util.Objects.requireNonNull; + public class ClusterScheduler implements AutoCloseable { diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java b/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java new file mode 100644 index 000000000..37455f908 --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java @@ -0,0 +1,285 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.scheduler; + +import com.cronutils.model.Cron; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.time.ExecutionTime; +import com.cronutils.parser.CronParser; +import io.airlift.units.Duration; +import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.config.ScheduleConfiguration; +import io.trino.gateway.ha.router.GatewayBackendManager; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TestClusterScheduler +{ + private static final String CLUSTER_NAME = "test-cluster"; + private static final String CRON_EXPRESSION = "0 0 9-17 * * ?"; // Every day from 9 AM to 5 PM + private static final ZoneId TEST_TIMEZONE = ZoneId.of("America/Los_Angeles"); + + @Mock + private GatewayBackendManager backendManager; + @Mock + private ScheduleConfiguration scheduleConfig; + @Mock + private ScheduleConfiguration.ClusterSchedule clusterSchedule; + @Mock + private ProxyBackendConfiguration backendConfig; + + private ClusterScheduler scheduler; + private ExecutionTime executionTime; + + @BeforeEach + void setUp() + { + // Set up test data + when(scheduleConfig.isEnabled()).thenReturn(true); + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); + when(scheduleConfig.getSchedules()).thenReturn(List.of(clusterSchedule)); + + when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); + when(clusterSchedule.getCronExpression()).thenReturn(CRON_EXPRESSION); + when(clusterSchedule.isActiveDuringCron()).thenReturn(true); + + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(Optional.of(backendConfig)); + + // Initialize the scheduler + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + + // Set up execution time for testing + CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX); + CronParser parser = new CronParser(cronDefinition); + Cron cron = parser.parse(CRON_EXPRESSION); + executionTime = ExecutionTime.forCron(cron); + } + + @Test + void testSchedulerInitialization() + { + assertThat(scheduler).isNotNull(); + verify(scheduleConfig, times(1)).isEnabled(); + verify(scheduleConfig, times(1)).getSchedules(); + } + + @Test + void testClusterActivationWhenCronMatches() + { + // Given: Time is within the active window (9 AM - 5 PM) + ZonedDateTime activeTime = ZonedDateTime.now(TEST_TIMEZONE) + .withHour(10) + .withMinute(0) + .withSecond(0); + + // When: Scheduler checks the status + when(backendConfig.isActive()).thenReturn(false); + // Set up the time to be used by the scheduler + ZonedDateTime originalNow = ZonedDateTime.now(TEST_TIMEZONE); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(activeTime); + scheduler.checkAndUpdateClusterStatus(); + } + + // Then: Cluster should be activated + verify(backendManager).activateBackend(CLUSTER_NAME); + } + + @Test + void testClusterDeactivationWhenCronDoesNotMatch() + { + // Given: Time is outside the active window (before 9 AM) + ZonedDateTime inactiveTime = ZonedDateTime.now(TEST_TIMEZONE) + .withHour(8) + .withMinute(0) + .withSecond(0); + + // When: Scheduler checks the status + when(backendConfig.isActive()).thenReturn(true); + // Set up the time to be used by the scheduler + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(inactiveTime); + scheduler.checkAndUpdateClusterStatus(); + } + + // Then: Cluster should be deactivated + verify(backendManager).deactivateBackend(CLUSTER_NAME); + } + + @Test + void testNoActionWhenClusterStatusMatchesSchedule() + { + // Given: Time is within active window and cluster is already active + ZonedDateTime activeTime = ZonedDateTime.now(TEST_TIMEZONE) + .withHour(10) + .withMinute(0) + .withSecond(0); + + // When: Scheduler checks the status + when(backendConfig.isActive()).thenReturn(true); + // Set up the time to be used by the scheduler + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(activeTime); + scheduler.checkAndUpdateClusterStatus(); + } + + // Then: No action should be taken + verify(backendManager, never()).activateBackend(anyString()); + verify(backendManager, never()).deactivateBackend(anyString()); + } + + @Test + void testClusterNotFoundInBackendManager() + { + // Given: Cluster is not found in backend manager + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(Optional.empty()); + + // When: Scheduler checks the status + ZonedDateTime now = ZonedDateTime.now(TEST_TIMEZONE); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(now); + scheduler.checkAndUpdateClusterStatus(); + } + + // Then: No action should be taken + verify(backendManager, never()).activateBackend(anyString()); + verify(backendManager, never()).deactivateBackend(anyString()); + } + + @Test + void testSchedulerWithDifferentTimezones() + { + // Given: A different timezone + ZoneId newYorkTime = ZoneId.of("America/New_York"); + when(scheduleConfig.getTimezone()).thenReturn(newYorkTime.toString()); + + // When: Reinitialize scheduler with new timezone + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + + // Then: Scheduler should be initialized with the new timezone + ZonedDateTime testTime = ZonedDateTime.now(newYorkTime); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(newYorkTime)).thenReturn(testTime); + scheduler.checkAndUpdateClusterStatus(); + // Just verify the method was called, detailed testing is done in other tests + verify(backendManager).getBackendByName(CLUSTER_NAME); + } + } + + @Test + void testInvalidCronExpression() + { + // Given: An invalid cron expression + when(clusterSchedule.getCronExpression()).thenReturn("invalid-cron"); + + // When: Scheduler is initialized + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + + // Then: No action should be taken when checking status + ZonedDateTime now = ZonedDateTime.now(TEST_TIMEZONE); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(now); + scheduler.checkAndUpdateClusterStatus(); + verify(backendManager, never()).activateBackend(anyString()); + verify(backendManager, never()).deactivateBackend(anyString()); + } + } + + @Test + void testSchedulerWithMultipleClusters() + { + // Given: Multiple cluster schedules + String secondCluster = "another-cluster"; + ScheduleConfiguration.ClusterSchedule secondSchedule = new ScheduleConfiguration.ClusterSchedule(); + secondSchedule.setClusterName(secondCluster); + secondSchedule.setCronExpression("0 0 18-23 * * ?"); // 6 PM - 11 PM + secondSchedule.setActiveDuringCron(true); + + when(scheduleConfig.getSchedules()).thenReturn(List.of(clusterSchedule, secondSchedule)); + when(backendManager.getBackendByName(secondCluster)).thenReturn(Optional.of(backendConfig)); + + // When: Scheduler checks the status + ZonedDateTime testTime = ZonedDateTime.now(TEST_TIMEZONE).withHour(20); // 8 PM + when(backendConfig.isActive()).thenReturn(false); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(testTime); + scheduler.checkAndUpdateClusterStatus(); + } + + // Then: Both clusters should be checked + verify(backendManager).getBackendByName(CLUSTER_NAME); + verify(backendManager).getBackendByName(secondCluster); + // First cluster should be inactive at 8 PM, second should be active + verify(backendManager).deactivateBackend(CLUSTER_NAME); + verify(backendManager).activateBackend(secondCluster); + } + + @Test + void testSchedulerWithInvertedActiveLogic() + { + // Given: A schedule that's active when cron doesn't match + when(clusterSchedule.isActiveDuringCron()).thenReturn(false); + + // When: Time is within the cron window (9 AM - 5 PM) + ZonedDateTime activeTime = ZonedDateTime.now(TEST_TIMEZONE) + .withHour(10) + .withMinute(0) + .withSecond(0); + + // Then: Cluster should be deactivated because activeDuringCron is false + when(backendConfig.isActive()).thenReturn(true); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(activeTime); + scheduler.checkAndUpdateClusterStatus(); + verify(backendManager).deactivateBackend(CLUSTER_NAME); + } + + // When: Time is outside the cron window + ZonedDateTime inactiveTime = ZonedDateTime.now(TEST_TIMEZONE) + .withHour(18) + .withMinute(0) + .withSecond(0); + + // Then: Cluster should be activated because activeDuringCron is false + when(backendConfig.isActive()).thenReturn(false); + try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { + mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(inactiveTime); + scheduler.checkAndUpdateClusterStatus(); + verify(backendManager).activateBackend(CLUSTER_NAME); + } + } +} From 14f5b5215991c4f81ed65b24859b48828b40711f Mon Sep 17 00:00:00 2001 From: Mahendran Vasagam Date: Tue, 7 Oct 2025 21:56:44 -0500 Subject: [PATCH 4/6] Test fixes and code compliance changes --- gateway-ha/pom.xml | 10 + .../ha/scheduler/ClusterScheduler.java | 22 +- .../ha/scheduler/TestClusterScheduler.java | 387 +++++++++++------- 3 files changed, 252 insertions(+), 167 deletions(-) diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index d7cc5c8a4..24d454b90 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -207,6 +207,11 @@ jakarta.annotation-api + + jakarta.inject + jakarta.inject-api + + jakarta.servlet jakarta.servlet-api @@ -273,6 +278,11 @@ 2.5.2.Final + + org.slf4j + slf4j-api + + org.weakref jmxutils diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java index ffa7dbdf0..3d6fc3888 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java @@ -19,7 +19,6 @@ import com.cronutils.model.definition.CronDefinitionBuilder; import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; -import com.google.common.annotations.VisibleForTesting; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.ScheduleConfiguration; import io.trino.gateway.ha.router.GatewayBackendManager; @@ -44,7 +43,7 @@ public class ClusterScheduler implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(ClusterScheduler.class); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final GatewayBackendManager backendManager; private final ScheduleConfiguration config; private final Map executionTimes = new ConcurrentHashMap<>(); @@ -98,31 +97,28 @@ public void start() schedule.isActiveDuringCron()); } catch (Exception e) { - log.error("Failed to parse cron expression for cluster {}: {}", + log.error("Skipping cluster {} due to invalid cron expression '{}': {}", schedule.getClusterName(), schedule.getCronExpression(), - e); + e.getMessage()); } } // Schedule the task - scheduler.scheduleWithFixedDelay( - this::checkAndUpdateClusterStatus, + scheduler.scheduleAtFixedRate( + () -> checkAndUpdateClusterStatus(ZonedDateTime.now(timezone)), 0, - (long) config.getCheckInterval().toMillis(), + config.getCheckInterval().toMillis(), TimeUnit.MILLISECONDS); - log.info("Started cluster scheduler with check interval: {} (using {} timezone)", config.getCheckInterval(), timezone); } - @VisibleForTesting - void checkAndUpdateClusterStatus() + public void checkAndUpdateClusterStatus(ZonedDateTime currentTime) { try { - ZonedDateTime now = ZonedDateTime.now(timezone); - log.debug("Checking cluster status at: {} ({})", now, timezone); + log.debug("Checking cluster status at: {} ({})", currentTime, timezone); for (Map.Entry entry : executionTimes.entrySet()) { String clusterName = entry.getKey(); @@ -139,7 +135,7 @@ void checkAndUpdateClusterStatus() } ScheduleConfiguration.ClusterSchedule schedule = scheduleOpt.get(); - boolean cronMatches = executionTime.isMatch(now); + boolean cronMatches = executionTime.isMatch(currentTime); boolean shouldBeActive = cronMatches == schedule.isActiveDuringCron(); log.info("Cluster: {}, cronMatches: {}, activeDuringCron: {}, shouldBeActive: {}", diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java b/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java index 37455f908..8a6b0cc28 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java @@ -13,35 +13,28 @@ */ package io.trino.gateway.ha.scheduler; -import com.cronutils.model.Cron; -import com.cronutils.model.CronType; -import com.cronutils.model.definition.CronDefinition; -import com.cronutils.model.definition.CronDefinitionBuilder; -import com.cronutils.model.time.ExecutionTime; -import com.cronutils.parser.CronParser; import io.airlift.units.Duration; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.ScheduleConfiguration; import io.trino.gateway.ha.router.GatewayBackendManager; - -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -49,237 +42,323 @@ class TestClusterScheduler { private static final String CLUSTER_NAME = "test-cluster"; - private static final String CRON_EXPRESSION = "0 0 9-17 * * ?"; // Every day from 9 AM to 5 PM + // Match every minute from 9 AM to 5 PM to ensure the test time is always matched + // Unix cron format: minute hour day month day-of-week (5 parts) + private static final String CRON_EXPRESSION = "* 9-17 * * *"; // Every minute from 9 AM to 5 PM private static final ZoneId TEST_TIMEZONE = ZoneId.of("America/Los_Angeles"); @Mock private GatewayBackendManager backendManager; + @Mock private ScheduleConfiguration scheduleConfig; + @Mock private ScheduleConfiguration.ClusterSchedule clusterSchedule; + @Mock private ProxyBackendConfiguration backendConfig; private ClusterScheduler scheduler; - private ExecutionTime executionTime; @BeforeEach void setUp() { - // Set up test data + // Reset all mocks before each test to ensure clean state + reset(backendManager, scheduleConfig, clusterSchedule, backendConfig); + } + + private void setupTestCluster(boolean activeDuringCron) + { + // Setup for a test cluster - call this in tests that need it when(scheduleConfig.isEnabled()).thenReturn(true); - when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); - when(scheduleConfig.getSchedules()).thenReturn(List.of(clusterSchedule)); - + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + when(scheduleConfig.getSchedules()).thenReturn(java.util.List.of(clusterSchedule)); when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); when(clusterSchedule.getCronExpression()).thenReturn(CRON_EXPRESSION); - when(clusterSchedule.isActiveDuringCron()).thenReturn(true); + when(clusterSchedule.isActiveDuringCron()).thenReturn(activeDuringCron); + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(java.util.Optional.of(backendConfig)); - when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(Optional.of(backendConfig)); - - // Initialize the scheduler + // Initialize the scheduler with the test cluster scheduler = new ClusterScheduler(backendManager, scheduleConfig); - - // Set up execution time for testing - CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX); - CronParser parser = new CronParser(cronDefinition); - Cron cron = parser.parse(CRON_EXPRESSION); - executionTime = ExecutionTime.forCron(cron); + // Note: Don't start the scheduler here to avoid background executions during tests } @Test void testSchedulerInitialization() { + // Only mock what's actually needed in the constructor + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + + // Initialize the scheduler + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + assertThat(scheduler).isNotNull(); - verify(scheduleConfig, times(1)).isEnabled(); - verify(scheduleConfig, times(1)).getSchedules(); + verify(scheduleConfig).getTimezone(); } @Test void testClusterActivationWhenCronMatches() { - // Given: Time is within the active window (9 AM - 5 PM) - ZonedDateTime activeTime = ZonedDateTime.now(TEST_TIMEZONE) - .withHour(10) - .withMinute(0) - .withSecond(0); + // Setup test cluster with activeDuringCron = true + setupTestCluster(true); - // When: Scheduler checks the status + // Initialize the scheduler + scheduler.start(); + + // Time within the cron schedule (9 AM - 5 PM) + ZonedDateTime activeTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, TEST_TIMEZONE); when(backendConfig.isActive()).thenReturn(false); - // Set up the time to be used by the scheduler - ZonedDateTime originalNow = ZonedDateTime.now(TEST_TIMEZONE); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(activeTime); - scheduler.checkAndUpdateClusterStatus(); - } - - // Then: Cluster should be activated - verify(backendManager).activateBackend(CLUSTER_NAME); + + // Execute + scheduler.checkAndUpdateClusterStatus(activeTime); + + // Verify + // We expect at least one call to activateBackend + verify(backendManager, atLeastOnce()).activateBackend(CLUSTER_NAME); + // We expect exactly one call to getBackendByName + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + // We expect exactly one call to isActive + verify(backendConfig, atLeastOnce()).isActive(); } @Test void testClusterDeactivationWhenCronDoesNotMatch() { - // Given: Time is outside the active window (before 9 AM) - ZonedDateTime inactiveTime = ZonedDateTime.now(TEST_TIMEZONE) - .withHour(8) - .withMinute(0) - .withSecond(0); + // Setup test cluster with activeDuringCron = true + setupTestCluster(true); + + // Initialize the scheduler + scheduler.start(); - // When: Scheduler checks the status + // Time outside the cron schedule (before 9 AM) + ZonedDateTime inactiveTime = ZonedDateTime.of(2025, 9, 29, 8, 0, 0, 0, TEST_TIMEZONE); when(backendConfig.isActive()).thenReturn(true); - // Set up the time to be used by the scheduler - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(inactiveTime); - scheduler.checkAndUpdateClusterStatus(); - } - - // Then: Cluster should be deactivated - verify(backendManager).deactivateBackend(CLUSTER_NAME); + + // Execute + scheduler.checkAndUpdateClusterStatus(inactiveTime); + + // Verify + verify(backendManager, atLeastOnce()).deactivateBackend(CLUSTER_NAME); + // Verify getBackendByName is called at least once (actual implementation may call it multiple times) + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + verify(backendConfig, atLeastOnce()).isActive(); } @Test void testNoActionWhenClusterStatusMatchesSchedule() { - // Given: Time is within active window and cluster is already active - ZonedDateTime activeTime = ZonedDateTime.now(TEST_TIMEZONE) - .withHour(10) - .withMinute(0) - .withSecond(0); + // Setup test cluster with activeDuringCron = true + setupTestCluster(true); + + // Initialize the scheduler (needed to parse cron expressions) + scheduler.start(); - // When: Scheduler checks the status + // Time within the cron schedule (9 AM - 5 PM) + ZonedDateTime activeTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, TEST_TIMEZONE); when(backendConfig.isActive()).thenReturn(true); - // Set up the time to be used by the scheduler - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(activeTime); - scheduler.checkAndUpdateClusterStatus(); - } - // Then: No action should be taken + // Execute + scheduler.checkAndUpdateClusterStatus(activeTime); + + // Verify no action taken when status already matches verify(backendManager, never()).activateBackend(anyString()); - verify(backendManager, never()).deactivateBackend(anyString()); + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + verify(backendConfig, atLeastOnce()).isActive(); } @Test void testClusterNotFoundInBackendManager() { - // Given: Cluster is not found in backend manager - when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(Optional.empty()); + // Setup test cluster with empty backend + when(scheduleConfig.isEnabled()).thenReturn(true); + when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + when(scheduleConfig.getSchedules()).thenReturn(java.util.List.of(clusterSchedule)); + when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); + when(clusterSchedule.getCronExpression()).thenReturn(CRON_EXPRESSION); + when(clusterSchedule.isActiveDuringCron()).thenReturn(true); + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(java.util.Optional.empty()); + + // Initialize scheduler with the test configuration + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + + // Initialize the scheduler + scheduler.start(); + + // Time within the cron schedule (9 AM - 5 PM) + ZonedDateTime testTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, TEST_TIMEZONE); - // When: Scheduler checks the status - ZonedDateTime now = ZonedDateTime.now(TEST_TIMEZONE); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(now); - scheduler.checkAndUpdateClusterStatus(); - } + // Execute + scheduler.checkAndUpdateClusterStatus(testTime); - // Then: No action should be taken + // Verify no action taken when cluster not found verify(backendManager, never()).activateBackend(anyString()); verify(backendManager, never()).deactivateBackend(anyString()); + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); } @Test void testSchedulerWithDifferentTimezones() { - // Given: A different timezone + // Set up timezone to New York ZoneId newYorkTime = ZoneId.of("America/New_York"); + // Setup test cluster with activeDuringCron = true + when(scheduleConfig.isEnabled()).thenReturn(true); + when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); when(scheduleConfig.getTimezone()).thenReturn(newYorkTime.toString()); + when(scheduleConfig.getSchedules()).thenReturn(java.util.List.of(clusterSchedule)); + when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); + when(clusterSchedule.getCronExpression()).thenReturn(CRON_EXPRESSION); + when(clusterSchedule.isActiveDuringCron()).thenReturn(true); + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(java.util.Optional.of(backendConfig)); - // When: Reinitialize scheduler with new timezone + // Initialize scheduler with the new timezone scheduler = new ClusterScheduler(backendManager, scheduleConfig); - // Then: Scheduler should be initialized with the new timezone - ZonedDateTime testTime = ZonedDateTime.now(newYorkTime); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(newYorkTime)).thenReturn(testTime); - scheduler.checkAndUpdateClusterStatus(); - // Just verify the method was called, detailed testing is done in other tests - verify(backendManager).getBackendByName(CLUSTER_NAME); - } + // Initialize the scheduler + scheduler.start(); + + // Time within the cron schedule (9 AM - 5 PM) in New York time + ZonedDateTime testTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, newYorkTime); + when(backendConfig.isActive()).thenReturn(false); + + // Execute + scheduler.checkAndUpdateClusterStatus(testTime); + + // Verify + verify(backendManager, atLeastOnce()).activateBackend(CLUSTER_NAME); + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + verify(backendConfig, atLeastOnce()).isActive(); + verify(scheduleConfig, atLeastOnce()).getTimezone(); } @Test void testInvalidCronExpression() { - // Given: An invalid cron expression - when(clusterSchedule.getCronExpression()).thenReturn("invalid-cron"); + // Setup test cluster with an invalid cron expression (valid format but invalid values) + when(scheduleConfig.isEnabled()).thenReturn(true); + when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + when(scheduleConfig.getSchedules()).thenReturn(java.util.List.of(clusterSchedule)); + when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); + // Using a cron expression with invalid values that will fail validation + when(clusterSchedule.getCronExpression()).thenReturn("99 25 32 13 8"); // Invalid: minute=99, hour=25, day=32, month=13, day-of-week=8 - // When: Scheduler is initialized + // Initialize scheduler with the test configuration scheduler = new ClusterScheduler(backendManager, scheduleConfig); - // Then: No action should be taken when checking status - ZonedDateTime now = ZonedDateTime.now(TEST_TIMEZONE); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(now); - scheduler.checkAndUpdateClusterStatus(); - verify(backendManager, never()).activateBackend(anyString()); - verify(backendManager, never()).deactivateBackend(anyString()); - } + // Initialize the scheduler - this should log an error but not throw + assertThatNoException().isThrownBy(() -> scheduler.start()); + + // Verify the error was logged (you might want to add a test logger to verify this) + // For now, we'll just verify the mocks were called as expected + verify(scheduleConfig).getSchedules(); + verify(clusterSchedule).getClusterName(); + verify(clusterSchedule, atLeastOnce()).getCronExpression(); + + // Verify no action taken due to invalid cron expression + verify(backendManager, never()).activateBackend(anyString()); + verify(backendManager, never()).deactivateBackend(anyString()); } @Test void testSchedulerWithMultipleClusters() { - // Given: Multiple cluster schedules - String secondCluster = "another-cluster"; - ScheduleConfiguration.ClusterSchedule secondSchedule = new ScheduleConfiguration.ClusterSchedule(); - secondSchedule.setClusterName(secondCluster); - secondSchedule.setCronExpression("0 0 18-23 * * ?"); // 6 PM - 11 PM - secondSchedule.setActiveDuringCron(true); - - when(scheduleConfig.getSchedules()).thenReturn(List.of(clusterSchedule, secondSchedule)); - when(backendManager.getBackendByName(secondCluster)).thenReturn(Optional.of(backendConfig)); - - // When: Scheduler checks the status - ZonedDateTime testTime = ZonedDateTime.now(TEST_TIMEZONE).withHour(20); // 8 PM + // Setup first cluster with activeDuringCron = true + when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); + when(clusterSchedule.getCronExpression()).thenReturn(CRON_EXPRESSION); + when(clusterSchedule.isActiveDuringCron()).thenReturn(true); + + // Setup second cluster with activeDuringCron = false + ScheduleConfiguration.ClusterSchedule secondCluster = mock(ScheduleConfiguration.ClusterSchedule.class); + String secondClusterName = "another-cluster"; + when(secondCluster.getClusterName()).thenReturn(secondClusterName); + when(secondCluster.getCronExpression()).thenReturn(CRON_EXPRESSION); + when(secondCluster.isActiveDuringCron()).thenReturn(false); + + // Setup backend configs + ProxyBackendConfiguration secondBackendConfig = mock(ProxyBackendConfiguration.class); + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(java.util.Optional.of(backendConfig)); + when(backendManager.getBackendByName(secondClusterName)).thenReturn(java.util.Optional.of(secondBackendConfig)); + + // Setup schedules and config + when(scheduleConfig.isEnabled()).thenReturn(true); + when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + when(scheduleConfig.getSchedules()).thenReturn(java.util.List.of(clusterSchedule, secondCluster)); + + // Initialize scheduler with the test configuration + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + + // Initialize the scheduler + scheduler.start(); + + // Time within the cron schedule (9 AM - 5 PM) + ZonedDateTime testTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, TEST_TIMEZONE); when(backendConfig.isActive()).thenReturn(false); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(testTime); - scheduler.checkAndUpdateClusterStatus(); - } - - // Then: Both clusters should be checked - verify(backendManager).getBackendByName(CLUSTER_NAME); - verify(backendManager).getBackendByName(secondCluster); - // First cluster should be inactive at 8 PM, second should be active - verify(backendManager).deactivateBackend(CLUSTER_NAME); - verify(backendManager).activateBackend(secondCluster); + when(secondBackendConfig.isActive()).thenReturn(true); + + // Execute + scheduler.checkAndUpdateClusterStatus(testTime); + + // Verify first cluster is activated + verify(backendManager, atLeastOnce()).activateBackend(CLUSTER_NAME); + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + verify(backendConfig, atLeastOnce()).isActive(); + + // Verify second cluster is deactivated + verify(backendManager, atLeastOnce()).deactivateBackend(secondClusterName); + verify(backendManager, atLeastOnce()).getBackendByName(secondClusterName); + verify(secondBackendConfig, atLeastOnce()).isActive(); } @Test void testSchedulerWithInvertedActiveLogic() { - // Given: A schedule that's active when cron doesn't match + // Setup test with activeDuringCron = false (inverted logic) + when(scheduleConfig.isEnabled()).thenReturn(true); + when(scheduleConfig.getCheckInterval()).thenReturn(new Duration(1, TimeUnit.MINUTES)); + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + when(scheduleConfig.getSchedules()).thenReturn(java.util.List.of(clusterSchedule)); + when(clusterSchedule.getClusterName()).thenReturn(CLUSTER_NAME); + when(clusterSchedule.getCronExpression()).thenReturn(CRON_EXPRESSION); when(clusterSchedule.isActiveDuringCron()).thenReturn(false); + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(Optional.of(backendConfig)); - // When: Time is within the cron window (9 AM - 5 PM) - ZonedDateTime activeTime = ZonedDateTime.now(TEST_TIMEZONE) - .withHour(10) - .withMinute(0) - .withSecond(0); + // Initialize the scheduler + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + scheduler.start(); - // Then: Cluster should be deactivated because activeDuringCron is false + // Test 1: During cron time (10 AM) - should be INACTIVE due to inverted logic + ZonedDateTime activeTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, TEST_TIMEZONE); when(backendConfig.isActive()).thenReturn(true); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(activeTime); - scheduler.checkAndUpdateClusterStatus(); - verify(backendManager).deactivateBackend(CLUSTER_NAME); - } - - // When: Time is outside the cron window - ZonedDateTime inactiveTime = ZonedDateTime.now(TEST_TIMEZONE) - .withHour(18) - .withMinute(0) - .withSecond(0); - - // Then: Cluster should be activated because activeDuringCron is false + + // Execute + scheduler.checkAndUpdateClusterStatus(activeTime); + + // Verify cluster is deactivated when cron matches (inverted logic) + verify(backendManager, atLeastOnce()).deactivateBackend(CLUSTER_NAME); + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + verify(backendConfig, atLeastOnce()).isActive(); + + // Reset mocks for second test + reset(backendManager, backendConfig); + + // Re-setup mocks for second test + when(backendManager.getBackendByName(CLUSTER_NAME)).thenReturn(Optional.of(backendConfig)); when(backendConfig.isActive()).thenReturn(false); - try (MockedStatic mocked = mockStatic(ZonedDateTime.class)) { - mocked.when(() -> ZonedDateTime.now(TEST_TIMEZONE)).thenReturn(inactiveTime); - scheduler.checkAndUpdateClusterStatus(); - verify(backendManager).activateBackend(CLUSTER_NAME); - } + + // Test 2: Outside cron time (after 5 PM) - should be ACTIVE due to inverted logic + ZonedDateTime inactiveTime = ZonedDateTime.of(2025, 9, 29, 18, 0, 0, 0, TEST_TIMEZONE); + + // Execute + scheduler.checkAndUpdateClusterStatus(inactiveTime); + + // Verify cluster is activated (inverted logic: active when cron doesn't match) + verify(backendManager).activateBackend(CLUSTER_NAME); + verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); + verify(backendConfig).isActive(); } } From 2a3672dcbbac6c049913039a3519bf96f3e5be6f Mon Sep 17 00:00:00 2001 From: Mahendran Vasagam Date: Wed, 8 Oct 2025 12:36:29 -0500 Subject: [PATCH 5/6] addressing sourcery-ai --- .../config/ClusterSchedulerConfiguration.java | 6 ++++- .../ha/config/ScheduleConfiguration.java | 3 ++- .../ha/scheduler/TestClusterScheduler.java | 23 +++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java index 59656a4de..6fb0c47ea 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java @@ -18,9 +18,13 @@ import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClusterSchedulerConfiguration { + private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerConfiguration.class); + private final ClusterScheduler scheduler; @Inject @@ -45,7 +49,7 @@ public void stop() scheduler.close(); } catch (Exception e) { - // Ignore + log.error("Exception occurred while shutting down ClusterScheduler", e); } } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java index 7057bdbc4..d96e240b6 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ScheduleConfiguration.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.airlift.units.Duration; +import java.util.ArrayList; import java.util.List; import static java.util.Objects.requireNonNull; @@ -25,7 +26,7 @@ public class ScheduleConfiguration private boolean enabled; private Duration checkInterval = new Duration(5, java.util.concurrent.TimeUnit.MINUTES); private String timezone = "GMT"; // Default to GMT if not specified - private List schedules; + private List schedules = new ArrayList<>(); @JsonProperty public boolean isEnabled() diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java b/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java index 8a6b0cc28..1e34ede14 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/scheduler/TestClusterScheduler.java @@ -199,6 +199,29 @@ void testClusterNotFoundInBackendManager() verify(backendManager, atLeastOnce()).getBackendByName(CLUSTER_NAME); } + @Test + void testSchedulerDoesNotTriggerWhenDisabled() + { + // Setup test cluster with scheduling disabled + when(scheduleConfig.isEnabled()).thenReturn(false); + when(scheduleConfig.getTimezone()).thenReturn(TEST_TIMEZONE.toString()); + + // Initialize the scheduler with disabled configuration + scheduler = new ClusterScheduler(backendManager, scheduleConfig); + scheduler.start(); + + // Time within the cron schedule (9 AM - 5 PM) + ZonedDateTime testTime = ZonedDateTime.of(2025, 9, 29, 10, 0, 0, 0, TEST_TIMEZONE); + + // Execute + scheduler.checkAndUpdateClusterStatus(testTime); + + // Verify no actions are taken when scheduler is disabled + verify(backendManager, never()).activateBackend(anyString()); + verify(backendManager, never()).deactivateBackend(anyString()); + verify(backendManager, never()).getBackendByName(anyString()); + } + @Test void testSchedulerWithDifferentTimezones() { From ffbc046a6a7a4e3c8e427df06874a0ebdbdb0d81 Mon Sep 17 00:00:00 2001 From: Mahendran Vasagam Date: Wed, 8 Oct 2025 22:13:13 -0500 Subject: [PATCH 6/6] Replaces slf logger with airlift logger --- gateway-ha/pom.xml | 5 --- .../config/ClusterSchedulerConfiguration.java | 7 ++--- .../ha/scheduler/ClusterScheduler.java | 31 +++++++++---------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 24d454b90..617064e3b 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -278,11 +278,6 @@ 2.5.2.Final - - org.slf4j - slf4j-api - - org.weakref jmxutils diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java index 6fb0c47ea..dc4db48ec 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterSchedulerConfiguration.java @@ -14,16 +14,15 @@ package io.trino.gateway.ha.config; import com.google.inject.Inject; +import io.airlift.log.Logger; import io.trino.gateway.ha.scheduler.ClusterScheduler; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ClusterSchedulerConfiguration { - private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerConfiguration.class); + private static final Logger log = Logger.get(ClusterSchedulerConfiguration.class); private final ClusterScheduler scheduler; @@ -49,7 +48,7 @@ public void stop() scheduler.close(); } catch (Exception e) { - log.error("Exception occurred while shutting down ClusterScheduler", e); + log.error(e, "Exception occurred while shutting down ClusterScheduler"); } } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java index 3d6fc3888..8decfee3d 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/scheduler/ClusterScheduler.java @@ -19,14 +19,13 @@ import com.cronutils.model.definition.CronDefinitionBuilder; import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; +import io.airlift.log.Logger; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.ScheduleConfiguration; import io.trino.gateway.ha.router.GatewayBackendManager; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -42,7 +41,7 @@ public class ClusterScheduler implements AutoCloseable { - private static final Logger log = LoggerFactory.getLogger(ClusterScheduler.class); + private static final Logger log = Logger.get(ClusterScheduler.class); private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final GatewayBackendManager backendManager; private final ScheduleConfiguration config; @@ -68,12 +67,12 @@ public ClusterScheduler(GatewayBackendManager backendManager, ScheduleConfigurat } else { configuredTimezone = ZoneId.of(timezoneStr); - log.info("Using configured timezone: {}", timezoneStr); + log.info("Using configured timezone: %s", timezoneStr); } } catch (Exception e) { configuredTimezone = ZoneId.of("GMT"); - log.warn("Invalid timezone '{}' in configuration, falling back to GMT", config.getTimezone(), e); + log.warn(e, "Invalid timezone '%s' in configuration, falling back to GMT", config.getTimezone()); } this.timezone = configuredTimezone; } @@ -110,7 +109,7 @@ public void start() 0, config.getCheckInterval().toMillis(), TimeUnit.MILLISECONDS); - log.info("Started cluster scheduler with check interval: {} (using {} timezone)", + log.info("Started cluster scheduler with check interval: %s (using %s timezone)", config.getCheckInterval(), timezone); } @@ -118,7 +117,7 @@ public void start() public void checkAndUpdateClusterStatus(ZonedDateTime currentTime) { try { - log.debug("Checking cluster status at: {} ({})", currentTime, timezone); + log.debug("Checking cluster status at: %s (%s)", currentTime, timezone); for (Map.Entry entry : executionTimes.entrySet()) { String clusterName = entry.getKey(); @@ -130,7 +129,7 @@ public void checkAndUpdateClusterStatus(ZonedDateTime currentTime) .findFirst(); if (scheduleOpt.isEmpty()) { - log.warn("No schedule configuration found for cluster: {}", clusterName); + log.warn("No schedule configuration found for cluster: %s", clusterName); continue; } @@ -138,7 +137,7 @@ public void checkAndUpdateClusterStatus(ZonedDateTime currentTime) boolean cronMatches = executionTime.isMatch(currentTime); boolean shouldBeActive = cronMatches == schedule.isActiveDuringCron(); - log.info("Cluster: {}, cronMatches: {}, activeDuringCron: {}, shouldBeActive: {}", + log.info("Cluster: %s, cronMatches: %s, activeDuringCron: %s, shouldBeActive: %s", clusterName, cronMatches, schedule.isActiveDuringCron(), @@ -150,7 +149,7 @@ public void checkAndUpdateClusterStatus(ZonedDateTime currentTime) ProxyBackendConfiguration cluster = clusterOpt.get(); boolean currentlyActive = cluster.isActive(); - log.debug("Cluster: {}, currentlyActive: {}, shouldBeActive: {}", + log.debug("Cluster: %s, currentlyActive: %s, shouldBeActive: %s", clusterName, currentlyActive, shouldBeActive); @@ -158,30 +157,30 @@ public void checkAndUpdateClusterStatus(ZonedDateTime currentTime) if (currentlyActive != shouldBeActive) { if (shouldBeActive) { backendManager.activateBackend(clusterName); - log.info("Activated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", + log.info("Activated cluster %s based on schedule (cron match: %s, activeDuringCron: %s)", clusterName, cronMatches, schedule.isActiveDuringCron()); } else { backendManager.deactivateBackend(clusterName); - log.info("Deactivated cluster {} based on schedule (cron match: {}, activeDuringCron: {})", + log.info("Deactivated cluster %s based on schedule (cron match: %s, activeDuringCron: %s)", clusterName, cronMatches, schedule.isActiveDuringCron()); } } else { - log.debug("Cluster {} status unchanged: active={}", clusterName, currentlyActive); + log.debug("Cluster %s status unchanged: active=%s", clusterName, currentlyActive); } } else { - log.warn("Cluster {} not found in backend manager", clusterName); + log.warn("Cluster %s not found in backend manager", clusterName); } } } catch (Exception e) { - log.error("Error in cluster scheduler task", e); + log.error(e, "Error in cluster scheduler task"); } } @@ -197,7 +196,7 @@ public void close() } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.warn("Interrupted while waiting for scheduler to terminate", e); + log.warn(e, "Interrupted while waiting for scheduler to terminate"); } } }