Skip to content

Commit

Permalink
Removed stream
Browse files Browse the repository at this point in the history
  • Loading branch information
williamwoldum committed Oct 24, 2024
1 parent 70e1427 commit f70803c
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 28 deletions.
2 changes: 1 addition & 1 deletion AIS-protobuf
Submodule AIS-protobuf updated 1 files
+19 −0 ais.proto
17 changes: 15 additions & 2 deletions src/implementations/GRPCClientHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { ILocation } from '../models/location'
export default class GRPCClientHandler implements IClientHandler {
constructor(private readonly client: AISServiceClientImpl) {}

async GetVesselInfo(request: { mmsi: number; timestamp: number }): Promise<IDetailedVessel> {
async getVesselInfo(request: { mmsi: number; timestamp: number }): Promise<IDetailedVessel> {
const grpcReq: VesselInfoRequest = {
mmsi: request.mmsi,
timestamp: request.timestamp,
Expand All @@ -30,7 +30,20 @@ export default class GRPCClientHandler implements IClientHandler {
return this.convertToDetailedVessel(response)
}

StartStreaming(request: {
async getSimpleVessles(request: { timestamp: number }): Promise<ISimpleVessel[]> {
const response = await this.client.GetSimpleVessels({ timestamp: request.timestamp })
return response.vessels.map(this.convertToSimpleVessel.bind(this))
}

async getMonitoredVessels(request: { timestamp: number; selection: ISelectionArea }): Promise<IMonitoredVessel[]> {
const response = await this.client.GetMonitoredVessels({
timestamp: request.timestamp,
selectedArea: request.selection.points,
})
return response.vessels.map(this.convertToMoniteredVessel.bind(this))
}

startStreaming(request: {
startTime: number
selection: ISelectionArea
timeSpeed: number
Expand Down
32 changes: 16 additions & 16 deletions src/implementations/StreamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ import { IMonitoredVessel } from '../models/monitoredVessel'
import { IPoint } from '../models/point'
import { ISimpleVessel } from '../models/simpleVessel'
import { IStreamManager } from '../interfaces/IStreamManager'
import { Subscription } from 'rxjs'

export default class StreamManager implements IStreamManager {
private allVessels: ISimpleVessel[] | undefined
private monitoredVessels: IMonitoredVessel[] | undefined
private subscription?: Subscription = undefined
private zone: IPoint[] = []

constructor(
Expand All @@ -26,29 +24,31 @@ export default class StreamManager implements IStreamManager {
this.monitoredVessels = vessels
}

public startStream() {
const stream = this.clientHandler.StartStreaming({
startTime: 1725844950,
selection: { points: this.zone },
timeSpeed: 1,
public async fetchNewVesselData() {
const simpleVessels = await this.clientHandler.getSimpleVessles({
timestamp: 1725844950,
})

this.subscription = stream.subscribe((data) => {
this.manageNewSimpleVessels(data.simpleVessels)
this.manageNewMonitoredVessels(data.monitoredVessels)
})
}
let monitoredVessels: IMonitoredVessel[] = []

if (this.zone.length >= 4) {
monitoredVessels = await this.clientHandler.getMonitoredVessels({
selection: { points: this.zone },
timestamp: 1725844950,
})
}
console.log(simpleVessels)
console.log(monitoredVessels)

public endStream() {
this.subscription?.unsubscribe()
this.manageNewSimpleVessels(simpleVessels)
this.manageNewMonitoredVessels(monitoredVessels)
}

public onMonitoringZoneChange(zone: IPoint[] | undefined) {
this.zone = zone || []
this.endStream()
this.setAllVessels(undefined)
this.setMonitoredVessels(undefined)
this.startStream()
this.fetchNewVesselData()
}

private manageNewSimpleVessels(vessels: ISimpleVessel[]) {
Expand Down
10 changes: 8 additions & 2 deletions src/interfaces/IClientHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ import { Observable } from 'rxjs'
import { IDetailedVessel } from '../models/detailedVessel'
import { ISelectionArea } from '../models/selectionArea'
import { IStreamResponse } from '../models/streamResponse'
import { IMonitoredVessel } from '../models/monitoredVessel'
import { ISimpleVessel } from '../models/simpleVessel'

// This interface abstracts away the client implementation for the backend api
// It should never use any GRPC models

export interface IClientHandler {
GetVesselInfo(request: { mmsi: number; timestamp: number }): Promise<IDetailedVessel>
StartStreaming(request: {
getVesselInfo(request: { mmsi: number; timestamp: number }): Promise<IDetailedVessel>
getSimpleVessles(request: { timestamp: number }): Promise<ISimpleVessel[]>
getMonitoredVessels(request: { timestamp: number; selection: ISelectionArea }): Promise<IMonitoredVessel[]>

//Deprecated cant close stream
startStreaming(request: {
startTime: number
selection: ISelectionArea
timeSpeed: number
Expand Down
3 changes: 1 addition & 2 deletions src/interfaces/IStreamManager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { IPoint } from '../models/point'

export interface IStreamManager {
startStream(): void
endStream(): void
fetchNewVesselData(): void
onMonitoringZoneChange(zone: IPoint[] | undefined): void
}
6 changes: 1 addition & 5 deletions src/pages/vesselMapPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ export default function VesselMapPage() {
const [streamManager] = useState(new StreamManager(clientHandler, setAllVessels, setMonitoredVessels))

useEffect(() => {
streamManager.startStream()
// Cleanup function to close the stream when the component unmounts
return () => {
streamManager.endStream()
}
streamManager.fetchNewVesselData()
}, [])

useEffect(() => {
Expand Down

0 comments on commit f70803c

Please sign in to comment.