Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
testower committed Jan 16, 2025
1 parent 6411b2e commit 68536a9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.entur.lamassu.model.entities.Station;
import org.entur.lamassu.model.provider.FeedProvider;
import org.entur.lamassu.service.SpatialIndexIdGeneratorService;
import org.jetbrains.annotations.NotNull;
import org.mobilitydata.gbfs.v3_0.station_information.GBFSData;
import org.mobilitydata.gbfs.v3_0.station_information.GBFSStationInformation;
import org.mobilitydata.gbfs.v3_0.station_status.GBFSStation;
Expand Down Expand Up @@ -99,35 +100,31 @@ public void addOrUpdateStations(
GBFSStationInformation stationInformationFeed
) {
if (delta.base() == null) {
var systemId = feedProvider.getSystemId();
var existingStations = stationCache.getAll();
var stationsToRemove = existingStations
.stream()
.filter(s -> systemId.equals(s.getSystemId()))
.toList();

if (!stationsToRemove.isEmpty()) {
logger.info(
"Removing {} existing stations for system {} due to null base",
stationsToRemove.size(),
systemId
);

var idsToRemove = stationsToRemove
.stream()
.map(Station::getId)
.collect(Collectors.toSet());
var spatialIdsToRemove = stationsToRemove
.stream()
.map(s -> spatialIndexService.createStationIndexId(s, feedProvider))
.collect(Collectors.toSet());

stationCache.removeAll(idsToRemove);
spatialIndex.removeAll(spatialIdsToRemove);
clearExistingEntities(feedProvider);
}

var stationInfo = extractStationInfo(stationInformationFeed);
UpdateContext context = new UpdateContext(feedProvider, stationInfo);

for (GBFSEntityDelta<GBFSStation> entityDelta : delta.entityDelta()) {
Station currentStation = stationCache.get(entityDelta.entityId());

if (entityDelta.type() == DeltaType.DELETE) {
processDeltaDelete(context, entityDelta, currentStation);
} else if (entityDelta.type() == DeltaType.CREATE) {
processDeltaCreate(context, entityDelta);
} else if (entityDelta.type() == DeltaType.UPDATE) {
processDeltaUpdate(context, entityDelta, currentStation);
}
}

var stationInfo = Optional
updateCaches(context);
}

private static @NotNull Map<String, org.mobilitydata.gbfs.v3_0.station_information.@NotNull GBFSStation> extractStationInfo(
GBFSStationInformation stationInformationFeed
) {
return Optional
.ofNullable(stationInformationFeed)
.map(GBFSStationInformation::getData)
.map(GBFSData::getStations)
Expand All @@ -139,22 +136,35 @@ public void addOrUpdateStations(
s -> s
)
);
}

UpdateContext context = new UpdateContext(feedProvider, stationInfo);
private void clearExistingEntities(FeedProvider feedProvider) {
var systemId = feedProvider.getSystemId();
var existingStations = stationCache.getAll();
var stationsToRemove = existingStations
.stream()
.filter(s -> systemId.equals(s.getSystemId()))
.toList();

if (!stationsToRemove.isEmpty()) {
logger.info(
"Removing {} existing stations for system {} due to null base",
stationsToRemove.size(),
systemId
);

for (GBFSEntityDelta<GBFSStation> entityDelta : delta.entityDelta()) {
Station currentStation = stationCache.get(entityDelta.entityId());
var idsToRemove = stationsToRemove
.stream()
.map(Station::getId)
.collect(Collectors.toSet());
var spatialIdsToRemove = stationsToRemove
.stream()
.map(s -> spatialIndexService.createStationIndexId(s, feedProvider))
.collect(Collectors.toSet());

if (entityDelta.type() == DeltaType.DELETE) {
processDeltaDelete(context, entityDelta, currentStation);
} else if (entityDelta.type() == DeltaType.CREATE) {
processDeltaCreate(context, entityDelta);
} else if (entityDelta.type() == DeltaType.UPDATE) {
processDeltaUpdate(context, entityDelta, currentStation);
}
stationCache.removeAll(idsToRemove);
spatialIndex.removeAll(spatialIdsToRemove);
}

updateCaches(context);
}

private void processDeltaDelete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,32 +91,7 @@ public void addOrUpdateVehicles(
GBFSFileDelta<GBFSVehicle> delta
) {
if (delta.base() == null) {
var systemId = feedProvider.getSystemId();
var existingVehicles = vehicleCache.getAll();
var vehiclesToRemove = existingVehicles
.stream()
.filter(v -> systemId.equals(v.getSystemId()))
.toList();

if (!vehiclesToRemove.isEmpty()) {
logger.info(
"Removing {} existing vehicles for system {} due to null base",
vehiclesToRemove.size(),
systemId
);

var idsToRemove = vehiclesToRemove
.stream()
.map(Vehicle::getId)
.collect(Collectors.toSet());
var spatialIdsToRemove = vehiclesToRemove
.stream()
.map(v -> spatialIndexService.createVehicleIndexId(v, feedProvider))
.collect(Collectors.toSet());

vehicleCache.removeAll(idsToRemove);
spatialIndex.removeAll(spatialIdsToRemove);
}
clearExistingEntities(feedProvider);
}

UpdateContext context = new UpdateContext(feedProvider);
Expand All @@ -134,6 +109,35 @@ public void addOrUpdateVehicles(
updateCaches(context);
}

private void clearExistingEntities(FeedProvider feedProvider) {
var systemId = feedProvider.getSystemId();
var existingVehicles = vehicleCache.getAll();
var vehiclesToRemove = existingVehicles
.stream()
.filter(v -> systemId.equals(v.getSystemId()))
.toList();

if (!vehiclesToRemove.isEmpty()) {
logger.info(
"Removing {} existing vehicles for system {} due to null base",
vehiclesToRemove.size(),
systemId
);

var idsToRemove = vehiclesToRemove
.stream()
.map(Vehicle::getId)
.collect(Collectors.toSet());
var spatialIdsToRemove = vehiclesToRemove
.stream()
.map(v -> spatialIndexService.createVehicleIndexId(v, feedProvider))
.collect(Collectors.toSet());

vehicleCache.removeAll(idsToRemove);
spatialIndex.removeAll(spatialIdsToRemove);
}
}

private void processDeltaDelete(
UpdateContext context,
GBFSEntityDelta<GBFSVehicle> entityDelta
Expand Down

0 comments on commit 68536a9

Please sign in to comment.