diff --git a/src/main/java/org/entur/lamassu/cache/UpdateContinuityCache.java b/src/main/java/org/entur/lamassu/cache/UpdateContinuityCache.java new file mode 100644 index 00000000..19d0c316 --- /dev/null +++ b/src/main/java/org/entur/lamassu/cache/UpdateContinuityCache.java @@ -0,0 +1,25 @@ +package org.entur.lamassu.cache; + +import java.util.Date; + +/** + * Interface for tracking GBFS update continuity. + * Used to store timestamps of last successful updates to detect missed updates. + */ +public interface UpdateContinuityCache { + /** + * Get the timestamp of the last successful update. + * + * @param systemId ID of the system to check + * @return Timestamp of last update or null if no previous update exists + */ + Date getLastUpdateTime(String systemId); + + /** + * Store the timestamp of a successful update. + * + * @param systemId ID of the system being updated + * @param timestamp Timestamp of the successful update + */ + void setLastUpdateTime(String systemId, Date timestamp); +} diff --git a/src/main/java/org/entur/lamassu/cache/impl/RedisUpdateContinuityCache.java b/src/main/java/org/entur/lamassu/cache/impl/RedisUpdateContinuityCache.java new file mode 100644 index 00000000..613bceb2 --- /dev/null +++ b/src/main/java/org/entur/lamassu/cache/impl/RedisUpdateContinuityCache.java @@ -0,0 +1,27 @@ +package org.entur.lamassu.cache.impl; + +import java.util.Date; +import org.entur.lamassu.cache.UpdateContinuityCache; +import org.redisson.api.RMapCache; + +/** + * Redis-backed implementation of UpdateContinuityCache using Redisson. + */ +public class RedisUpdateContinuityCache implements UpdateContinuityCache { + + private final RMapCache cache; + + public RedisUpdateContinuityCache(RMapCache cache) { + this.cache = cache; + } + + @Override + public Date getLastUpdateTime(String systemId) { + return cache.get(systemId); + } + + @Override + public void setLastUpdateTime(String systemId, Date timestamp) { + cache.put(systemId, timestamp); + } +} diff --git a/src/main/java/org/entur/lamassu/config/cache/RedissonCacheConfig.java b/src/main/java/org/entur/lamassu/config/cache/RedissonCacheConfig.java index 109b81da..0c48402b 100644 --- a/src/main/java/org/entur/lamassu/config/cache/RedissonCacheConfig.java +++ b/src/main/java/org/entur/lamassu/config/cache/RedissonCacheConfig.java @@ -1,9 +1,12 @@ package org.entur.lamassu.config.cache; +import java.util.Date; import java.util.Set; import org.entur.gbfs.validation.model.ValidationResult; import org.entur.lamassu.cache.StationSpatialIndexId; +import org.entur.lamassu.cache.UpdateContinuityCache; import org.entur.lamassu.cache.VehicleSpatialIndexId; +import org.entur.lamassu.cache.impl.RedisUpdateContinuityCache; import org.entur.lamassu.config.project.LamassuProjectInfoConfiguration; import org.entur.lamassu.model.entities.GeofencingZones; import org.entur.lamassu.model.entities.PricingPlan; @@ -45,6 +48,8 @@ public class RedissonCacheConfig { public static final String STATION_SPATIAL_INDEX_KEY = "stationSpatialIndex"; public static final String VALIDATION_REPORTS_CACHE_KEY = "validationReportsCache"; public static final String CACHE_READY_KEY = "cacheReady"; + public static final String VEHICLE_STATUS_BASES_KEY = "vehicleStatusBases"; + public static final String STATION_STATUS_BASES_KEY = "stationStatusBases"; private final String serializationVersion; private final Config redissonConfig; @@ -188,4 +193,22 @@ public RListMultimap validationResultsCache( public RBucket cacheReady(RedissonClient redissonClient) { return redissonClient.getBucket(CACHE_READY_KEY + "_" + serializationVersion); } + + @Bean + public UpdateContinuityCache vehicleUpdateContinuityCache( + RedissonClient redissonClient + ) { + return new RedisUpdateContinuityCache( + redissonClient.getMapCache(VEHICLE_STATUS_BASES_KEY + "_" + serializationVersion) + ); + } + + @Bean + public UpdateContinuityCache stationUpdateContinuityCache( + RedissonClient redissonClient + ) { + return new RedisUpdateContinuityCache( + redissonClient.getMapCache(STATION_STATUS_BASES_KEY + "_" + serializationVersion) + ); + } } diff --git a/src/main/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTracker.java b/src/main/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTracker.java index b18f5371..7c37ce3e 100644 --- a/src/main/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTracker.java +++ b/src/main/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTracker.java @@ -1,36 +1,47 @@ package org.entur.lamassu.leader.entityupdater; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.entur.gbfs.loader.v3.GbfsV3Delivery; +import org.entur.lamassu.cache.UpdateContinuityCache; import org.springframework.stereotype.Component; /** * Service responsible for tracking continuity of GBFS updates for stations and vehicles. * This ensures that in case updates are missed, delta calculations will start from scratch. + * Uses Redis for persistence to survive application restarts. */ @Component public class GbfsUpdateContinuityTracker { - private final Map vehicleStatusBases = new ConcurrentHashMap<>(); - private final Map stationStatusBases = new ConcurrentHashMap<>(); + private final UpdateContinuityCache vehicleStatusBases; + private final UpdateContinuityCache stationStatusBases; + + public GbfsUpdateContinuityTracker( + UpdateContinuityCache vehicleUpdateContinuityCache, + UpdateContinuityCache stationUpdateContinuityCache + ) { + this.vehicleStatusBases = vehicleUpdateContinuityCache; + this.stationStatusBases = stationUpdateContinuityCache; + } /** * Check if there is continuity in vehicle status updates by comparing timestamps. * Returns false if updates have been missed, indicating we need to start delta calculations from scratch. */ public boolean hasVehicleUpdateContinuity(String systemId, GbfsV3Delivery oldDelivery) { - var previousBase = vehicleStatusBases.get(systemId); + var previousBase = vehicleStatusBases.getLastUpdateTime(systemId); if (oldDelivery.vehicleStatus() == null || previousBase == null) { return false; } - return previousBase.equals(oldDelivery.vehicleStatus().getLastUpdated().getTime()); + return previousBase.equals(oldDelivery.vehicleStatus().getLastUpdated()); } public void updateVehicleUpdateContinuity(String systemId, GbfsV3Delivery delivery) { - vehicleStatusBases.put(systemId, delivery.vehicleStatus().getLastUpdated().getTime()); + vehicleStatusBases.setLastUpdateTime( + systemId, + delivery.vehicleStatus().getLastUpdated() + ); } /** @@ -38,22 +49,22 @@ public void updateVehicleUpdateContinuity(String systemId, GbfsV3Delivery delive * Returns false if updates have been missed, indicating we need to start delta calculations from scratch. */ public boolean hasStationUpdateContinuity(String systemId, GbfsV3Delivery oldDelivery) { - var previousBase = stationStatusBases.get(systemId); + var previousBase = stationStatusBases.getLastUpdateTime(systemId); if (oldDelivery.stationStatus() == null || previousBase == null) { return false; } - return previousBase.equals(oldDelivery.stationStatus().getLastUpdated().getTime()); + return previousBase.equals(oldDelivery.stationStatus().getLastUpdated()); } public void updateStationUpdateContinuity( String systemId, GbfsV3Delivery nextDelivery ) { - stationStatusBases.put( + stationStatusBases.setLastUpdateTime( systemId, - nextDelivery.stationStatus().getLastUpdated().getTime() + nextDelivery.stationStatus().getLastUpdated() ); } } diff --git a/src/test/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTrackerTest.java b/src/test/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTrackerTest.java index 3f6cb2dd..ddded0ce 100644 --- a/src/test/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTrackerTest.java +++ b/src/test/java/org/entur/lamassu/leader/entityupdater/GbfsUpdateContinuityTrackerTest.java @@ -5,6 +5,7 @@ import java.util.Date; import org.entur.gbfs.loader.v3.GbfsV3Delivery; +import org.entur.lamassu.stubs.StubUpdateContinuityCache; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mobilitydata.gbfs.v3_0.station_status.GBFSStationStatus; @@ -14,10 +15,14 @@ class GbfsUpdateContinuityTrackerTest { private static final String SYSTEM_ID = "test-system"; private GbfsUpdateContinuityTracker tracker; + private StubUpdateContinuityCache vehicleCache; + private StubUpdateContinuityCache stationCache; @BeforeEach void setUp() { - tracker = new GbfsUpdateContinuityTracker(); + vehicleCache = new StubUpdateContinuityCache(); + stationCache = new StubUpdateContinuityCache(); + tracker = new GbfsUpdateContinuityTracker(vehicleCache, stationCache); } @Test diff --git a/src/test/java/org/entur/lamassu/stubs/StubUpdateContinuityCache.java b/src/test/java/org/entur/lamassu/stubs/StubUpdateContinuityCache.java new file mode 100644 index 00000000..e2bc61fa --- /dev/null +++ b/src/test/java/org/entur/lamassu/stubs/StubUpdateContinuityCache.java @@ -0,0 +1,47 @@ +/* + * + * + * * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by + * * the European Commission - subsequent versions of the EUPL (the "Licence"); + * * You may not use this work except in compliance with the Licence. + * * You may obtain a copy of the Licence at: + * * + * * https://joinup.ec.europa.eu/software/page/eupl + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the Licence is distributed on an "AS IS" basis, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the Licence for the specific language governing permissions and + * * limitations under the Licence. + * + */ + +package org.entur.lamassu.stubs; + +import org.entur.lamassu.cache.UpdateContinuityCache; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +/** + * In-memory stub implementation of UpdateContinuityCache for testing. + */ +public class StubUpdateContinuityCache implements UpdateContinuityCache { + + private final Map cache = new HashMap<>(); + + @Override + public Date getLastUpdateTime(String systemId) { + return cache.get(systemId); + } + + @Override + public void setLastUpdateTime(String systemId, Date timestamp) { + cache.put(systemId, timestamp); + } + + public void clear() { + cache.clear(); + } +}