Skip to content

Commit

Permalink
Make the update continuity tracker redis-backed
Browse files Browse the repository at this point in the history
  • Loading branch information
testower committed Jan 21, 2025
1 parent 6b0ca8d commit f242aa9
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 12 deletions.
25 changes: 25 additions & 0 deletions src/main/java/org/entur/lamassu/cache/UpdateContinuityCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.entur.lamassu.cache;

import java.util.Date;

/**
* Interface for tracking GBFS update continuity.
* Used to store timestamps of last successful updates to detect missed updates.
*/
public interface UpdateContinuityCache {
/**
* Get the timestamp of the last successful update.
*
* @param systemId ID of the system to check
* @return Timestamp of last update or null if no previous update exists
*/
Date getLastUpdateTime(String systemId);

/**
* Store the timestamp of a successful update.
*
* @param systemId ID of the system being updated
* @param timestamp Timestamp of the successful update
*/
void setLastUpdateTime(String systemId, Date timestamp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.entur.lamassu.cache.impl;

import java.util.Date;
import org.entur.lamassu.cache.UpdateContinuityCache;
import org.redisson.api.RMapCache;

/**
* Redis-backed implementation of UpdateContinuityCache using Redisson.
*/
public class RedisUpdateContinuityCache implements UpdateContinuityCache {

private final RMapCache<String, Date> cache;

public RedisUpdateContinuityCache(RMapCache<String, Date> cache) {
this.cache = cache;
}

@Override
public Date getLastUpdateTime(String systemId) {
return cache.get(systemId);
}

@Override
public void setLastUpdateTime(String systemId, Date timestamp) {
cache.put(systemId, timestamp);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.entur.lamassu.config.cache;

import java.util.Date;
import java.util.Set;
import org.entur.gbfs.validation.model.ValidationResult;
import org.entur.lamassu.cache.StationSpatialIndexId;
import org.entur.lamassu.cache.UpdateContinuityCache;
import org.entur.lamassu.cache.VehicleSpatialIndexId;
import org.entur.lamassu.cache.impl.RedisUpdateContinuityCache;
import org.entur.lamassu.config.project.LamassuProjectInfoConfiguration;
import org.entur.lamassu.model.entities.GeofencingZones;
import org.entur.lamassu.model.entities.PricingPlan;
Expand Down Expand Up @@ -45,6 +48,8 @@ public class RedissonCacheConfig {
public static final String STATION_SPATIAL_INDEX_KEY = "stationSpatialIndex";
public static final String VALIDATION_REPORTS_CACHE_KEY = "validationReportsCache";
public static final String CACHE_READY_KEY = "cacheReady";
public static final String VEHICLE_STATUS_BASES_KEY = "vehicleStatusBases";
public static final String STATION_STATUS_BASES_KEY = "stationStatusBases";

private final String serializationVersion;
private final Config redissonConfig;
Expand Down Expand Up @@ -188,4 +193,22 @@ public RListMultimap<String, ValidationResult> validationResultsCache(
public RBucket<Boolean> cacheReady(RedissonClient redissonClient) {
return redissonClient.getBucket(CACHE_READY_KEY + "_" + serializationVersion);
}

@Bean
public UpdateContinuityCache vehicleUpdateContinuityCache(
RedissonClient redissonClient
) {
return new RedisUpdateContinuityCache(
redissonClient.getMapCache(VEHICLE_STATUS_BASES_KEY + "_" + serializationVersion)
);
}

@Bean
public UpdateContinuityCache stationUpdateContinuityCache(
RedissonClient redissonClient
) {
return new RedisUpdateContinuityCache(
redissonClient.getMapCache(STATION_STATUS_BASES_KEY + "_" + serializationVersion)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,59 +1,70 @@
package org.entur.lamassu.leader.entityupdater;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.entur.gbfs.loader.v3.GbfsV3Delivery;
import org.entur.lamassu.cache.UpdateContinuityCache;
import org.springframework.stereotype.Component;

/**
* Service responsible for tracking continuity of GBFS updates for stations and vehicles.
* This ensures that in case updates are missed, delta calculations will start from scratch.
* Uses Redis for persistence to survive application restarts.
*/
@Component
public class GbfsUpdateContinuityTracker {

private final Map<String, Long> vehicleStatusBases = new ConcurrentHashMap<>();
private final Map<String, Long> stationStatusBases = new ConcurrentHashMap<>();
private final UpdateContinuityCache vehicleStatusBases;
private final UpdateContinuityCache stationStatusBases;

public GbfsUpdateContinuityTracker(
UpdateContinuityCache vehicleUpdateContinuityCache,
UpdateContinuityCache stationUpdateContinuityCache
) {
this.vehicleStatusBases = vehicleUpdateContinuityCache;
this.stationStatusBases = stationUpdateContinuityCache;
}

/**
* Check if there is continuity in vehicle status updates by comparing timestamps.
* Returns false if updates have been missed, indicating we need to start delta calculations from scratch.
*/
public boolean hasVehicleUpdateContinuity(String systemId, GbfsV3Delivery oldDelivery) {
var previousBase = vehicleStatusBases.get(systemId);
var previousBase = vehicleStatusBases.getLastUpdateTime(systemId);

if (oldDelivery.vehicleStatus() == null || previousBase == null) {
return false;
}

return previousBase.equals(oldDelivery.vehicleStatus().getLastUpdated().getTime());
return previousBase.equals(oldDelivery.vehicleStatus().getLastUpdated());
}

public void updateVehicleUpdateContinuity(String systemId, GbfsV3Delivery delivery) {
vehicleStatusBases.put(systemId, delivery.vehicleStatus().getLastUpdated().getTime());
vehicleStatusBases.setLastUpdateTime(
systemId,
delivery.vehicleStatus().getLastUpdated()
);
}

/**
* Check if there is continuity in station status updates by comparing timestamps.
* Returns false if updates have been missed, indicating we need to start delta calculations from scratch.
*/
public boolean hasStationUpdateContinuity(String systemId, GbfsV3Delivery oldDelivery) {
var previousBase = stationStatusBases.get(systemId);
var previousBase = stationStatusBases.getLastUpdateTime(systemId);

if (oldDelivery.stationStatus() == null || previousBase == null) {
return false;
}

return previousBase.equals(oldDelivery.stationStatus().getLastUpdated().getTime());
return previousBase.equals(oldDelivery.stationStatus().getLastUpdated());
}

public void updateStationUpdateContinuity(
String systemId,
GbfsV3Delivery nextDelivery
) {
stationStatusBases.put(
stationStatusBases.setLastUpdateTime(
systemId,
nextDelivery.stationStatus().getLastUpdated().getTime()
nextDelivery.stationStatus().getLastUpdated()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.Date;
import org.entur.gbfs.loader.v3.GbfsV3Delivery;
import org.entur.lamassu.stubs.StubUpdateContinuityCache;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mobilitydata.gbfs.v3_0.station_status.GBFSStationStatus;
Expand All @@ -14,10 +15,14 @@ class GbfsUpdateContinuityTrackerTest {

private static final String SYSTEM_ID = "test-system";
private GbfsUpdateContinuityTracker tracker;
private StubUpdateContinuityCache vehicleCache;
private StubUpdateContinuityCache stationCache;

@BeforeEach
void setUp() {
tracker = new GbfsUpdateContinuityTracker();
vehicleCache = new StubUpdateContinuityCache();
stationCache = new StubUpdateContinuityCache();
tracker = new GbfsUpdateContinuityTracker(vehicleCache, stationCache);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
*
*
* * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by
* * the European Commission - subsequent versions of the EUPL (the "Licence");
* * You may not use this work except in compliance with the Licence.
* * You may obtain a copy of the Licence at:
* *
* * https://joinup.ec.europa.eu/software/page/eupl
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the Licence is distributed on an "AS IS" basis,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the Licence for the specific language governing permissions and
* * limitations under the Licence.
*
*/

package org.entur.lamassu.stubs;

import org.entur.lamassu.cache.UpdateContinuityCache;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
* In-memory stub implementation of UpdateContinuityCache for testing.
*/
public class StubUpdateContinuityCache implements UpdateContinuityCache {

private final Map<String, Date> cache = new HashMap<>();

@Override
public Date getLastUpdateTime(String systemId) {
return cache.get(systemId);
}

@Override
public void setLastUpdateTime(String systemId, Date timestamp) {
cache.put(systemId, timestamp);
}

public void clear() {
cache.clear();
}
}

0 comments on commit f242aa9

Please sign in to comment.