diff --git a/.gitignore b/.gitignore index 59d5118c..6f943036 100644 --- a/.gitignore +++ b/.gitignore @@ -80,8 +80,11 @@ third_party/whisper.cpp # whisper.cpp models *.bin -# ELF binary +# ELF binaries /skyeye +/skyeye.exe +/skyeye-scaler +/skyeye-scaler.exe # Local dev scripts ignored by default scripts/ diff --git a/cmd/skyeye-scaler/main.go b/cmd/skyeye-scaler/main.go index 9da10711..ef6be62b 100644 --- a/cmd/skyeye-scaler/main.go +++ b/cmd/skyeye-scaler/main.go @@ -15,6 +15,7 @@ import ( "github.com/dharmab/skyeye/internal/cli" "github.com/dharmab/skyeye/pkg/coalitions" + skynet "github.com/dharmab/skyeye/pkg/net" "github.com/dharmab/skyeye/pkg/simpleradio" srstypes "github.com/dharmab/skyeye/pkg/simpleradio/types" "github.com/lithammer/shortuuid/v3" @@ -76,7 +77,7 @@ func init() { scaler.Flags().DurationVar(&stopDelay, "stop-delay", 10*time.Minute, "Delay before sending stop requests after the SRS player count drops to 0") scaler.Flags().StringVar(&srsAddress, "srs-server-address", "localhost:5002", "Address of the SRS server") - scaler.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", 10*time.Second, "Connection timeout for SRS client") + scaler.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", skynet.DefaultConnectionTimeout, "Connection timeout for SRS client") scaler.Flags().StringVar(&srsExternalAWACSModePassword, "srs-eam-password", "", "SRS external AWACS mode password") scaler.Flags().StringSliceVar(&srsFrequencies, "srs-frequencies", []string{"251.0AM", "133.0AM", "30.0FM"}, "List of SRS frequencies to use") } diff --git a/cmd/skyeye/main.go b/cmd/skyeye/main.go index b615eb9d..419117f0 100644 --- a/cmd/skyeye/main.go +++ b/cmd/skyeye/main.go @@ -29,6 +29,7 @@ import ( "github.com/dharmab/skyeye/internal/cli" "github.com/dharmab/skyeye/internal/conf" "github.com/dharmab/skyeye/pkg/coalitions" + skynet "github.com/dharmab/skyeye/pkg/net" "github.com/dharmab/skyeye/pkg/synthesizer/voices" "github.com/ggerganov/whisper.cpp/bindings/go/pkg/whisper" ) @@ -90,13 +91,13 @@ func init() { skyeye.Flags().StringVar(&acmiFile, "acmi-file", "", "path to ACMI file") skyeye.Flags().StringVar(&telemetryAddress, "telemetry-address", "localhost:42674", "Address of the real-time telemetry service") skyeye.MarkFlagsMutuallyExclusive("acmi-file", "telemetry-address") - skyeye.Flags().DurationVar(&telemetryConnectionTimeout, "telemetry-connection-timeout", 10*time.Second, "Connection timeout for real-time telemetry client") + skyeye.Flags().DurationVar(&telemetryConnectionTimeout, "telemetry-connection-timeout", skynet.DefaultConnectionTimeout, "Connection timeout for real-time telemetry client") skyeye.Flags().StringVar(&telemetryPassword, "telemetry-password", "", "Password for the real-time telemetry service") skyeye.Flags().DurationVar(&telemetryUpdateInterval, "telemetry-update-interval", 2*time.Second, "Interval at which trackfiles are updated from telemetry data") // SRS skyeye.Flags().StringVar(&srsAddress, "srs-server-address", "localhost:5002", "Address of the SRS server") - skyeye.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", 10*time.Second, "Connection timeout for SRS client") + skyeye.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", skynet.DefaultConnectionTimeout, "Connection timeout for SRS client") skyeye.Flags().StringVar(&srsExternalAWACSModePassword, "srs-eam-password", "", "SRS external AWACS mode password") skyeye.Flags().StringSliceVar(&srsFrequencies, "srs-frequencies", []string{"251.0AM", "133.0AM", "30.0FM"}, "List of SRS frequencies to use") diff --git a/internal/application/app.go b/internal/application/app.go index 28f9040b..7cafca61 100644 --- a/internal/application/app.go +++ b/internal/application/app.go @@ -17,6 +17,7 @@ import ( "github.com/dharmab/skyeye/pkg/commands" "github.com/dharmab/skyeye/pkg/composer" "github.com/dharmab/skyeye/pkg/controller" + skynet "github.com/dharmab/skyeye/pkg/net" "github.com/dharmab/skyeye/pkg/parser" "github.com/dharmab/skyeye/pkg/radar" "github.com/dharmab/skyeye/pkg/recognizer" @@ -30,6 +31,7 @@ import ( "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" ) @@ -81,7 +83,15 @@ func NewApplication(config conf.Configuration) (*Application, error) { var chatListener *commands.ChatListener if config.EnableGRPC { log.Info().Str("address", config.GRPCAddress).Msg("constructing gRPC clients") - opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + // Add gRPC-level keepalive (not TCP keepalive) + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: skynet.GRPCKeepaliveTime, // Send gRPC keepalive ping every 30s + Timeout: skynet.GRPCKeepaliveTimeout, // Wait 10s for ping ack + PermitWithoutStream: true, // Send pings even without active streams + }), + } if config.GRPCAPIKey != "" { log.Info().Msg("configuring gRPC client connection with provided API key") opts = append(opts, grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -94,6 +104,8 @@ func NewApplication(config conf.Configuration) (*Application, error) { if err != nil { return nil, err } + // Note: gRPC connections are lazy - they dial on first RPC + // The keepalive params will detect dead connections missionClient := mission.NewMissionServiceClient(grpcClient) coalitionClient := grpccoalition.NewCoalitionServiceClient(grpcClient) netClient := net.NewNetServiceClient(grpcClient) diff --git a/pkg/net/timeouts.go b/pkg/net/timeouts.go new file mode 100644 index 00000000..edf2c3ba --- /dev/null +++ b/pkg/net/timeouts.go @@ -0,0 +1,84 @@ +// Package net provides common network configuration constants and utilities +// for the skyeye application. +// +// # Timeout Configuration +// +// The constants defined in this package represent recommended defaults that +// balance reliability and responsiveness for typical deployment scenarios. +// These defaults assume reasonably stable network conditions and may need +// adjustment based on your environment: +// +// - LAN deployments: Current defaults are suitable +// - WAN/Internet deployments: May need longer timeouts for connection establishment +// - High-latency links: Should increase timeout values proportionally +// - Unstable networks: May benefit from shorter timeouts with more aggressive retry logic +// +// # Usage +// +// Application code should use these constants as defaults for CLI flags, +// allowing operators to override based on their network conditions: +// +// flag.DurationVar(&timeout, "timeout", net.DefaultConnectionTimeout, "connection timeout") +// +// The package also provides helper functions for calculating related timeouts: +// +// - CalculateReadTimeout: Determines read timeout from connection timeout +// - CalculateDeadlineRefreshInterval: Determines how often to refresh deadlines +package net + +import "time" + +const ( + // DefaultConnectionTimeout is the default timeout for establishing network connections. + // This applies to initial connection attempts for TCP, UDP resolution, and similar operations. + DefaultConnectionTimeout = 30 * time.Second + + // ReadTimeoutMultiplier defines how much longer read timeouts should be compared to + // connection timeouts for streaming data. A multiplier of 2 allows for transient + // network delays while still detecting truly dead connections. + // + // Rationale: A 2x multiplier provides a balance between responsiveness to network + // issues and tolerance for normal network latency/congestion. Lower values may cause + // false positives during temporary network slowdowns; higher values delay detection + // of truly dead connections. + ReadTimeoutMultiplier = 2 + + // DeadlineRefreshDivisor determines how frequently read deadlines should be refreshed + // for long-lived streaming connections. The refresh interval is calculated as: + // readTimeout / DeadlineRefreshDivisor. A divisor of 2 means we refresh at the + // halfway point, ensuring the deadline never expires during active streaming. + // + // Rationale: Refreshing at the halfway point (divisor of 2) ensures the deadline is + // always fresh with minimal overhead. This prevents legitimate long-running streams + // from timing out while still allowing the full timeout period to detect dead connections. + DeadlineRefreshDivisor = 2 + + // ReconnectDelay is the delay before retrying a failed connection attempt. + // This applies to automatic reconnection logic after connection failures. + ReconnectDelay = 10 * time.Second + + // GRPCKeepaliveTime is the interval at which gRPC keepalive pings are sent. + // This matches the default connection timeout to detect dead connections quickly. + GRPCKeepaliveTime = 30 * time.Second + + // GRPCKeepaliveTimeout is how long to wait for a keepalive ping acknowledgment + // before considering the connection dead. + GRPCKeepaliveTimeout = 10 * time.Second + + // OpenAIHTTPTimeout is the timeout for HTTP requests to the OpenAI API. + // Audio transcription can be slow for large files, so this is set higher + // than typical HTTP timeouts. + OpenAIHTTPTimeout = 60 * time.Second +) + +// CalculateReadTimeout returns the recommended read timeout based on a connection timeout. +// This is useful for streaming connections where reads may take longer than initial connection. +func CalculateReadTimeout(connectionTimeout time.Duration) time.Duration { + return connectionTimeout * ReadTimeoutMultiplier +} + +// CalculateDeadlineRefreshInterval returns the interval at which read deadlines should +// be refreshed for long-lived streaming connections. +func CalculateDeadlineRefreshInterval(readTimeout time.Duration) time.Duration { + return readTimeout / DeadlineRefreshDivisor +} diff --git a/pkg/net/timeouts_test.go b/pkg/net/timeouts_test.go new file mode 100644 index 00000000..ab8cac83 --- /dev/null +++ b/pkg/net/timeouts_test.go @@ -0,0 +1,158 @@ +package net + +import ( + "testing" + "time" +) + +func TestCalculateReadTimeout(t *testing.T) { + t.Parallel() + tests := []struct { + name string + connectionTimeout time.Duration + expected time.Duration + }{ + { + name: "default connection timeout", + connectionTimeout: DefaultConnectionTimeout, + expected: 60 * time.Second, + }, + { + name: "10 second connection timeout", + connectionTimeout: 10 * time.Second, + expected: 20 * time.Second, + }, + { + name: "1 minute connection timeout", + connectionTimeout: 1 * time.Minute, + expected: 2 * time.Minute, + }, + { + name: "zero timeout", + connectionTimeout: 0, + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + actual := CalculateReadTimeout(tt.connectionTimeout) + if actual != tt.expected { + t.Errorf("CalculateReadTimeout(%v) = %v, expected %v", tt.connectionTimeout, actual, tt.expected) + } + }) + } +} + +func TestCalculateDeadlineRefreshInterval(t *testing.T) { + t.Parallel() + tests := []struct { + name string + readTimeout time.Duration + expected time.Duration + }{ + { + name: "60 second read timeout", + readTimeout: 60 * time.Second, + expected: 30 * time.Second, + }, + { + name: "20 second read timeout", + readTimeout: 20 * time.Second, + expected: 10 * time.Second, + }, + { + name: "2 minute read timeout", + readTimeout: 2 * time.Minute, + expected: 1 * time.Minute, + }, + { + name: "zero timeout", + readTimeout: 0, + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + actual := CalculateDeadlineRefreshInterval(tt.readTimeout) + if actual != tt.expected { + t.Errorf("CalculateDeadlineRefreshInterval(%v) = %v, expected %v", tt.readTimeout, actual, tt.expected) + } + }) + } +} + +func TestCalculateReadTimeoutMultiplier(t *testing.T) { + t.Parallel() + // Verify that the multiplier constant is correctly applied + connectionTimeout := 15 * time.Second + readTimeout := CalculateReadTimeout(connectionTimeout) + + expected := time.Duration(int64(connectionTimeout) * ReadTimeoutMultiplier) + if readTimeout != expected { + t.Errorf("ReadTimeoutMultiplier not correctly applied: got %v, expected %v", readTimeout, expected) + } +} + +func TestCalculateDeadlineRefreshDivisor(t *testing.T) { + t.Parallel() + // Verify that the divisor constant is correctly applied + readTimeout := 40 * time.Second + refreshInterval := CalculateDeadlineRefreshInterval(readTimeout) + + expected := time.Duration(int64(readTimeout) / DeadlineRefreshDivisor) + if refreshInterval != expected { + t.Errorf("DeadlineRefreshDivisor not correctly applied: got %v, expected %v", refreshInterval, expected) + } +} + +func TestEndToEndTimeoutCalculation(t *testing.T) { + t.Parallel() + // Test the complete flow from connection timeout to refresh interval + connectionTimeout := DefaultConnectionTimeout // 30s + + readTimeout := CalculateReadTimeout(connectionTimeout) + if readTimeout != 60*time.Second { + t.Errorf("Expected read timeout of 60s for default connection timeout, got %v", readTimeout) + } + + refreshInterval := CalculateDeadlineRefreshInterval(readTimeout) + if refreshInterval != 30*time.Second { + t.Errorf("Expected refresh interval of 30s for 60s read timeout, got %v", refreshInterval) + } +} + +func TestConstantValues(t *testing.T) { + t.Parallel() + // Verify the constant values match expected defaults + if DefaultConnectionTimeout != 30*time.Second { + t.Errorf("DefaultConnectionTimeout = %v, expected 30s", DefaultConnectionTimeout) + } + + if ReadTimeoutMultiplier != 2 { + t.Errorf("ReadTimeoutMultiplier = %v, expected 2", ReadTimeoutMultiplier) + } + + if DeadlineRefreshDivisor != 2 { + t.Errorf("DeadlineRefreshDivisor = %v, expected 2", DeadlineRefreshDivisor) + } + + if ReconnectDelay != 10*time.Second { + t.Errorf("ReconnectDelay = %v, expected 10s", ReconnectDelay) + } + + if GRPCKeepaliveTime != 30*time.Second { + t.Errorf("GRPCKeepaliveTime = %v, expected 30s", GRPCKeepaliveTime) + } + + if GRPCKeepaliveTimeout != 10*time.Second { + t.Errorf("GRPCKeepaliveTimeout = %v, expected 10s", GRPCKeepaliveTimeout) + } + + if OpenAIHTTPTimeout != 60*time.Second { + t.Errorf("OpenAIHTTPTimeout = %v, expected 60s", OpenAIHTTPTimeout) + } +} diff --git a/pkg/recognizer/openai.go b/pkg/recognizer/openai.go index b4b71d1e..e69b6b01 100644 --- a/pkg/recognizer/openai.go +++ b/pkg/recognizer/openai.go @@ -7,7 +7,9 @@ import ( "errors" "fmt" "math" + "net/http" + skynet "github.com/dharmab/skyeye/pkg/net" "github.com/dharmab/skyeye/pkg/pcm" openai "github.com/openai/openai-go" "github.com/openai/openai-go/option" @@ -23,10 +25,17 @@ type openAIRecognizer struct { var _ Recognizer = &openAIRecognizer{} func newOpenAIRecognizer(apiKey, model, callsign string) Recognizer { + // Create HTTP client with timeout + // Use 60s for audio transcription which can be slow for large files + httpClient := &http.Client{ + Timeout: skynet.OpenAIHTTPTimeout, + } + return &openAIRecognizer{ callsign: callsign, client: openai.NewClient( option.WithAPIKey(apiKey), + option.WithHTTPClient(httpClient), ), model: model, } diff --git a/pkg/simpleradio/client.go b/pkg/simpleradio/client.go index bdd516d8..1b3c696d 100644 --- a/pkg/simpleradio/client.go +++ b/pkg/simpleradio/client.go @@ -34,6 +34,8 @@ type Client struct { // address is the address of the SRS server, including the port. address string + // connectionTimeout is the timeout for establishing connections to the SRS server. + connectionTimeout time.Duration // tcpConnection is the TCP connection to the SRS server used for messages. tcpConnection *net.TCPConn // udpConnection is the UDP connection to the SRS server used for audio and pings. @@ -81,7 +83,8 @@ func NewClient(config types.ClientConfiguration) (*Client, error) { } client := &Client{ - address: config.Address, + address: config.Address, + connectionTimeout: config.ConnectionTimeout, clientInfo: types.ClientInfo{ Name: config.ClientName, GUID: guid, diff --git a/pkg/simpleradio/net.go b/pkg/simpleradio/net.go index d801abde..59b61096 100644 --- a/pkg/simpleradio/net.go +++ b/pkg/simpleradio/net.go @@ -11,37 +11,60 @@ import ( "net" "time" + skynet "github.com/dharmab/skyeye/pkg/net" "github.com/dharmab/skyeye/pkg/simpleradio/types" "github.com/rs/zerolog/log" ) // connectTCP connects to the SRS server over TCP. func (c *Client) connectTCP() error { - log.Info().Str("address", c.address).Msg("connecting to SRS server TCP socket") + log.Info(). + Str("address", c.address). + Stringer("timeout", c.connectionTimeout). + Msg("connecting to SRS server TCP socket") + tcpAddress, err := net.ResolveTCPAddr("tcp", c.address) if err != nil { return fmt.Errorf("failed to resolve SRS server address %v: %w", c.address, err) } - connection, err := net.DialTCP("tcp", nil, tcpAddress) + + dialer := &net.Dialer{ + Timeout: c.connectionTimeout, + } + connection, err := dialer.Dial("tcp", tcpAddress.String()) if err != nil { return fmt.Errorf("failed to connect to data socket: %w", err) } - c.tcpConnection = connection + + c.tcpConnection = connection.(*net.TCPConn) return nil } // connectUDP connects to the SRS server over UDP. func (c *Client) connectUDP() error { - log.Info().Str("address", c.address).Msg("connecting to SRS server UDP socket") + log.Info(). + Str("address", c.address). + Stringer("timeout", c.connectionTimeout). + Msg("connecting to SRS server UDP socket") + + // Note: UDP is connectionless, so there's no actual "connection" to timeout. + // The timeout here applies only to DNS resolution and local socket setup, + // not to data transmission (which is handled by read deadlines). + dialer := &net.Dialer{ + Timeout: c.connectionTimeout, + } + udpAddress, err := net.ResolveUDPAddr("udp", c.address) if err != nil { return fmt.Errorf("failed to resolve SRS server address %v: %w", c.address, err) } - connection, err := net.DialUDP("udp", nil, udpAddress) + + connection, err := dialer.Dial("udp", udpAddress.String()) if err != nil { return fmt.Errorf("failed to connect to UDP socket: %w", err) } - c.udpConnection = connection + + c.udpConnection = connection.(*net.UDPConn) return nil } @@ -78,12 +101,30 @@ func (c *Client) reconnect(ctx context.Context) error { // receiveUDP listens for incoming UDP packets and routes them to the appropriate channel. func (c *Client) receiveUDP(ctx context.Context, pingChan chan<- []byte, voiceChan chan<- []byte) { + readTimeout := skynet.CalculateReadTimeout(c.connectionTimeout) + + // Set initial read deadline + if err := c.udpConnection.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + log.Warn(). + Err(err). + Stringer("readTimeout", readTimeout). + Msg("failed to set initial UDP read deadline") + } + for { select { case <-ctx.Done(): log.Info().Msg("stopping SRS packet receiver due to context cancellation") return default: + // Set read deadline before each read + if err := c.udpConnection.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + log.Warn(). + Err(err). + Stringer("readTimeout", readTimeout). + Msg("failed to set UDP read deadline") + } + buf := make([]byte, 1500) n, err := c.udpConnection.Read(buf) switch { @@ -119,12 +160,30 @@ func (c *Client) receiveUDP(ctx context.Context, pingChan chan<- []byte, voiceCh // receiveTCP listens for incoming TCP messages and routes them to the appropriate handler. func (c *Client) receiveTCP(ctx context.Context) { reader := bufio.NewReader(c.tcpConnection) + readTimeout := skynet.CalculateReadTimeout(c.connectionTimeout) + + // Set initial read deadline + if err := c.tcpConnection.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + log.Warn(). + Err(err). + Stringer("readTimeout", readTimeout). + Msg("failed to set initial TCP read deadline") + } + for { select { case <-ctx.Done(): log.Info().Msg("stopping SRS client due to context cancellation") return default: + // Set read deadline before each read + if err := c.tcpConnection.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + log.Warn(). + Err(err). + Stringer("readTimeout", readTimeout). + Msg("failed to set TCP read deadline") + } + line, err := reader.ReadBytes(byte('\n')) if err != nil { if errors.Is(err, net.ErrClosed) && ctx.Err() != nil { diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index e6dd90be..92e0d091 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -10,6 +10,7 @@ import ( "net" "time" + skynet "github.com/dharmab/skyeye/pkg/net" "github.com/rs/zerolog/log" ) @@ -55,7 +56,7 @@ func (c *RealTimeClient) Run(ctx context.Context) error { case <-ctx.Done(): return nil default: - nextAttempt := time.Now().Add(10 * time.Second) + nextAttempt := time.Now().Add(skynet.ReconnectDelay) if err := c.read(ctx); err != nil { if errors.Is(err, context.Canceled) { return nil @@ -82,6 +83,42 @@ func (c *RealTimeClient) read(ctx context.Context) error { return fmt.Errorf("error during client handhake: %w", err) } + readTimeout := skynet.CalculateReadTimeout(c.connectionTimeout) + + // Set up periodic deadline refresh to keep connection alive during long reads + deadlineCtx, deadlineCancel := context.WithCancel(ctx) + defer deadlineCancel() + + go func() { + ticker := time.NewTicker(skynet.CalculateDeadlineRefreshInterval(readTimeout)) + defer ticker.Stop() + + for { + select { + case <-deadlineCtx.Done(): + return + case <-ticker.C: + if err := connection.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + log.Warn(). + Err(err). + Stringer("readTimeout", readTimeout). + Time("attemptedDeadline", time.Now().Add(readTimeout)). + Msg("failed to refresh telemetry read deadline") + // Connection may be in bad state, but read will fail + // and trigger reconnect naturally + } + } + } + }() + + // Set initial deadline + if err := connection.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + log.Warn(). + Err(err). + Stringer("readTimeout", readTimeout). + Msg("failed to set initial telemetry read deadline") + } + if err := c.handleLines(ctx, reader); err != nil { return fmt.Errorf("error reading updates: %w", err) }