Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ third_party/whisper.cpp
# whisper.cpp models
*.bin

# ELF binary
# ELF binaries
/skyeye
/skyeye.exe
/skyeye-scaler
/skyeye-scaler.exe

# Local dev scripts ignored by default
scripts/
Expand Down
3 changes: 2 additions & 1 deletion cmd/skyeye-scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/dharmab/skyeye/internal/cli"
"github.com/dharmab/skyeye/pkg/coalitions"
skynet "github.com/dharmab/skyeye/pkg/net"
"github.com/dharmab/skyeye/pkg/simpleradio"
srstypes "github.com/dharmab/skyeye/pkg/simpleradio/types"
"github.com/lithammer/shortuuid/v3"
Expand Down Expand Up @@ -76,7 +77,7 @@ func init() {
scaler.Flags().DurationVar(&stopDelay, "stop-delay", 10*time.Minute, "Delay before sending stop requests after the SRS player count drops to 0")

scaler.Flags().StringVar(&srsAddress, "srs-server-address", "localhost:5002", "Address of the SRS server")
scaler.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", 10*time.Second, "Connection timeout for SRS client")
scaler.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", skynet.DefaultConnectionTimeout, "Connection timeout for SRS client")
scaler.Flags().StringVar(&srsExternalAWACSModePassword, "srs-eam-password", "", "SRS external AWACS mode password")
scaler.Flags().StringSliceVar(&srsFrequencies, "srs-frequencies", []string{"251.0AM", "133.0AM", "30.0FM"}, "List of SRS frequencies to use")
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/skyeye/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dharmab/skyeye/internal/cli"
"github.com/dharmab/skyeye/internal/conf"
"github.com/dharmab/skyeye/pkg/coalitions"
skynet "github.com/dharmab/skyeye/pkg/net"
"github.com/dharmab/skyeye/pkg/synthesizer/voices"
"github.com/ggerganov/whisper.cpp/bindings/go/pkg/whisper"
)
Expand Down Expand Up @@ -90,13 +91,13 @@ func init() {
skyeye.Flags().StringVar(&acmiFile, "acmi-file", "", "path to ACMI file")
skyeye.Flags().StringVar(&telemetryAddress, "telemetry-address", "localhost:42674", "Address of the real-time telemetry service")
skyeye.MarkFlagsMutuallyExclusive("acmi-file", "telemetry-address")
skyeye.Flags().DurationVar(&telemetryConnectionTimeout, "telemetry-connection-timeout", 10*time.Second, "Connection timeout for real-time telemetry client")
skyeye.Flags().DurationVar(&telemetryConnectionTimeout, "telemetry-connection-timeout", skynet.DefaultConnectionTimeout, "Connection timeout for real-time telemetry client")
skyeye.Flags().StringVar(&telemetryPassword, "telemetry-password", "", "Password for the real-time telemetry service")
skyeye.Flags().DurationVar(&telemetryUpdateInterval, "telemetry-update-interval", 2*time.Second, "Interval at which trackfiles are updated from telemetry data")

// SRS
skyeye.Flags().StringVar(&srsAddress, "srs-server-address", "localhost:5002", "Address of the SRS server")
skyeye.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", 10*time.Second, "Connection timeout for SRS client")
skyeye.Flags().DurationVar(&srsConnectionTimeout, "srs-connection-timeout", skynet.DefaultConnectionTimeout, "Connection timeout for SRS client")
skyeye.Flags().StringVar(&srsExternalAWACSModePassword, "srs-eam-password", "", "SRS external AWACS mode password")
skyeye.Flags().StringSliceVar(&srsFrequencies, "srs-frequencies", []string{"251.0AM", "133.0AM", "30.0FM"}, "List of SRS frequencies to use")

Expand Down
14 changes: 13 additions & 1 deletion internal/application/app.go
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many comments, needs human rewrite.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/dharmab/skyeye/pkg/commands"
"github.com/dharmab/skyeye/pkg/composer"
"github.com/dharmab/skyeye/pkg/controller"
skynet "github.com/dharmab/skyeye/pkg/net"
"github.com/dharmab/skyeye/pkg/parser"
"github.com/dharmab/skyeye/pkg/radar"
"github.com/dharmab/skyeye/pkg/recognizer"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -81,7 +83,15 @@ func NewApplication(config conf.Configuration) (*Application, error) {
var chatListener *commands.ChatListener
if config.EnableGRPC {
log.Info().Str("address", config.GRPCAddress).Msg("constructing gRPC clients")
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
// Add gRPC-level keepalive (not TCP keepalive)
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: skynet.GRPCKeepaliveTime, // Send gRPC keepalive ping every 30s
Timeout: skynet.GRPCKeepaliveTimeout, // Wait 10s for ping ack
PermitWithoutStream: true, // Send pings even without active streams
}),
}
if config.GRPCAPIKey != "" {
log.Info().Msg("configuring gRPC client connection with provided API key")
opts = append(opts, grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand All @@ -94,6 +104,8 @@ func NewApplication(config conf.Configuration) (*Application, error) {
if err != nil {
return nil, err
}
// Note: gRPC connections are lazy - they dial on first RPC
// The keepalive params will detect dead connections
missionClient := mission.NewMissionServiceClient(grpcClient)
coalitionClient := grpccoalition.NewCoalitionServiceClient(grpcClient)
netClient := net.NewNetServiceClient(grpcClient)
Expand Down
84 changes: 84 additions & 0 deletions pkg/net/timeouts.go
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate this package comment, needs human rewrite.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Package net provides common network configuration constants and utilities
// for the skyeye application.
//
// # Timeout Configuration
//
// The constants defined in this package represent recommended defaults that
// balance reliability and responsiveness for typical deployment scenarios.
// These defaults assume reasonably stable network conditions and may need
// adjustment based on your environment:
//
// - LAN deployments: Current defaults are suitable
// - WAN/Internet deployments: May need longer timeouts for connection establishment
// - High-latency links: Should increase timeout values proportionally
// - Unstable networks: May benefit from shorter timeouts with more aggressive retry logic
//
// # Usage
//
// Application code should use these constants as defaults for CLI flags,
// allowing operators to override based on their network conditions:
//
// flag.DurationVar(&timeout, "timeout", net.DefaultConnectionTimeout, "connection timeout")
//
// The package also provides helper functions for calculating related timeouts:
//
// - CalculateReadTimeout: Determines read timeout from connection timeout
// - CalculateDeadlineRefreshInterval: Determines how often to refresh deadlines
package net

import "time"

const (
// DefaultConnectionTimeout is the default timeout for establishing network connections.
// This applies to initial connection attempts for TCP, UDP resolution, and similar operations.
DefaultConnectionTimeout = 30 * time.Second

// ReadTimeoutMultiplier defines how much longer read timeouts should be compared to
// connection timeouts for streaming data. A multiplier of 2 allows for transient
// network delays while still detecting truly dead connections.
//
// Rationale: A 2x multiplier provides a balance between responsiveness to network
// issues and tolerance for normal network latency/congestion. Lower values may cause
// false positives during temporary network slowdowns; higher values delay detection
// of truly dead connections.
ReadTimeoutMultiplier = 2

// DeadlineRefreshDivisor determines how frequently read deadlines should be refreshed
// for long-lived streaming connections. The refresh interval is calculated as:
// readTimeout / DeadlineRefreshDivisor. A divisor of 2 means we refresh at the
// halfway point, ensuring the deadline never expires during active streaming.
//
// Rationale: Refreshing at the halfway point (divisor of 2) ensures the deadline is
// always fresh with minimal overhead. This prevents legitimate long-running streams
// from timing out while still allowing the full timeout period to detect dead connections.
DeadlineRefreshDivisor = 2

// ReconnectDelay is the delay before retrying a failed connection attempt.
// This applies to automatic reconnection logic after connection failures.
ReconnectDelay = 10 * time.Second

// GRPCKeepaliveTime is the interval at which gRPC keepalive pings are sent.
// This matches the default connection timeout to detect dead connections quickly.
GRPCKeepaliveTime = 30 * time.Second

// GRPCKeepaliveTimeout is how long to wait for a keepalive ping acknowledgment
// before considering the connection dead.
GRPCKeepaliveTimeout = 10 * time.Second

// OpenAIHTTPTimeout is the timeout for HTTP requests to the OpenAI API.
// Audio transcription can be slow for large files, so this is set higher
// than typical HTTP timeouts.
OpenAIHTTPTimeout = 60 * time.Second
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should remove this and just use the standard 30s

)

// CalculateReadTimeout returns the recommended read timeout based on a connection timeout.
// This is useful for streaming connections where reads may take longer than initial connection.
func CalculateReadTimeout(connectionTimeout time.Duration) time.Duration {
return connectionTimeout * ReadTimeoutMultiplier
}

// CalculateDeadlineRefreshInterval returns the interval at which read deadlines should
// be refreshed for long-lived streaming connections.
func CalculateDeadlineRefreshInterval(readTimeout time.Duration) time.Duration {
return readTimeout / DeadlineRefreshDivisor
}
158 changes: 158 additions & 0 deletions pkg/net/timeouts_test.go
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are not valuable and should be removed.

Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package net

import (
"testing"
"time"
)

func TestCalculateReadTimeout(t *testing.T) {
t.Parallel()
tests := []struct {
name string
connectionTimeout time.Duration
expected time.Duration
}{
{
name: "default connection timeout",
connectionTimeout: DefaultConnectionTimeout,
expected: 60 * time.Second,
},
{
name: "10 second connection timeout",
connectionTimeout: 10 * time.Second,
expected: 20 * time.Second,
},
{
name: "1 minute connection timeout",
connectionTimeout: 1 * time.Minute,
expected: 2 * time.Minute,
},
{
name: "zero timeout",
connectionTimeout: 0,
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
actual := CalculateReadTimeout(tt.connectionTimeout)
if actual != tt.expected {
t.Errorf("CalculateReadTimeout(%v) = %v, expected %v", tt.connectionTimeout, actual, tt.expected)
}
})
}
}

func TestCalculateDeadlineRefreshInterval(t *testing.T) {
t.Parallel()
tests := []struct {
name string
readTimeout time.Duration
expected time.Duration
}{
{
name: "60 second read timeout",
readTimeout: 60 * time.Second,
expected: 30 * time.Second,
},
{
name: "20 second read timeout",
readTimeout: 20 * time.Second,
expected: 10 * time.Second,
},
{
name: "2 minute read timeout",
readTimeout: 2 * time.Minute,
expected: 1 * time.Minute,
},
{
name: "zero timeout",
readTimeout: 0,
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
actual := CalculateDeadlineRefreshInterval(tt.readTimeout)
if actual != tt.expected {
t.Errorf("CalculateDeadlineRefreshInterval(%v) = %v, expected %v", tt.readTimeout, actual, tt.expected)
}
})
}
}

func TestCalculateReadTimeoutMultiplier(t *testing.T) {
t.Parallel()
// Verify that the multiplier constant is correctly applied
connectionTimeout := 15 * time.Second
readTimeout := CalculateReadTimeout(connectionTimeout)

expected := time.Duration(int64(connectionTimeout) * ReadTimeoutMultiplier)
if readTimeout != expected {
t.Errorf("ReadTimeoutMultiplier not correctly applied: got %v, expected %v", readTimeout, expected)
}
}

func TestCalculateDeadlineRefreshDivisor(t *testing.T) {
t.Parallel()
// Verify that the divisor constant is correctly applied
readTimeout := 40 * time.Second
refreshInterval := CalculateDeadlineRefreshInterval(readTimeout)

expected := time.Duration(int64(readTimeout) / DeadlineRefreshDivisor)
if refreshInterval != expected {
t.Errorf("DeadlineRefreshDivisor not correctly applied: got %v, expected %v", refreshInterval, expected)
}
}

func TestEndToEndTimeoutCalculation(t *testing.T) {
t.Parallel()
// Test the complete flow from connection timeout to refresh interval
connectionTimeout := DefaultConnectionTimeout // 30s

readTimeout := CalculateReadTimeout(connectionTimeout)
if readTimeout != 60*time.Second {
t.Errorf("Expected read timeout of 60s for default connection timeout, got %v", readTimeout)
}

refreshInterval := CalculateDeadlineRefreshInterval(readTimeout)
if refreshInterval != 30*time.Second {
t.Errorf("Expected refresh interval of 30s for 60s read timeout, got %v", refreshInterval)
}
}

func TestConstantValues(t *testing.T) {
t.Parallel()
// Verify the constant values match expected defaults
if DefaultConnectionTimeout != 30*time.Second {
t.Errorf("DefaultConnectionTimeout = %v, expected 30s", DefaultConnectionTimeout)
}

if ReadTimeoutMultiplier != 2 {
t.Errorf("ReadTimeoutMultiplier = %v, expected 2", ReadTimeoutMultiplier)
}

if DeadlineRefreshDivisor != 2 {
t.Errorf("DeadlineRefreshDivisor = %v, expected 2", DeadlineRefreshDivisor)
}

if ReconnectDelay != 10*time.Second {
t.Errorf("ReconnectDelay = %v, expected 10s", ReconnectDelay)
}

if GRPCKeepaliveTime != 30*time.Second {
t.Errorf("GRPCKeepaliveTime = %v, expected 30s", GRPCKeepaliveTime)
}

if GRPCKeepaliveTimeout != 10*time.Second {
t.Errorf("GRPCKeepaliveTimeout = %v, expected 10s", GRPCKeepaliveTimeout)
}

if OpenAIHTTPTimeout != 60*time.Second {
t.Errorf("OpenAIHTTPTimeout = %v, expected 60s", OpenAIHTTPTimeout)
}
}
9 changes: 9 additions & 0 deletions pkg/recognizer/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"errors"
"fmt"
"math"
"net/http"

skynet "github.com/dharmab/skyeye/pkg/net"
"github.com/dharmab/skyeye/pkg/pcm"
openai "github.com/openai/openai-go"
"github.com/openai/openai-go/option"
Expand All @@ -23,10 +25,17 @@ type openAIRecognizer struct {
var _ Recognizer = &openAIRecognizer{}

func newOpenAIRecognizer(apiKey, model, callsign string) Recognizer {
// Create HTTP client with timeout
// Use 60s for audio transcription which can be slow for large files
httpClient := &http.Client{
Timeout: skynet.OpenAIHTTPTimeout,
}

return &openAIRecognizer{
callsign: callsign,
client: openai.NewClient(
option.WithAPIKey(apiKey),
option.WithHTTPClient(httpClient),
),
model: model,
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/simpleradio/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Client struct {

// address is the address of the SRS server, including the port.
address string
// connectionTimeout is the timeout for establishing connections to the SRS server.
connectionTimeout time.Duration
// tcpConnection is the TCP connection to the SRS server used for messages.
tcpConnection *net.TCPConn
// udpConnection is the UDP connection to the SRS server used for audio and pings.
Expand Down Expand Up @@ -81,7 +83,8 @@ func NewClient(config types.ClientConfiguration) (*Client, error) {
}

client := &Client{
address: config.Address,
address: config.Address,
connectionTimeout: config.ConnectionTimeout,
clientInfo: types.ClientInfo{
Name: config.ClientName,
GUID: guid,
Expand Down
Loading
Loading