Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
69ce971
Implement transparent proxying for KeyValue pattern with gRPC data plane
jrepp Oct 20, 2025
ab3e05d
Implement transparent proxying for KeyValue pattern with gRPC reflect…
jrepp Oct 20, 2025
f1435ae
Add launcher callback protocol with dynamic port allocation
jrepp Oct 20, 2025
4f9d848
Fix Rust formatting issues
jrepp Oct 20, 2025
102f330
Fix Clippy lint errors in prism-proxy
jrepp Oct 20, 2025
d8a5238
Fix KeyValue unit tests to expect unavailable status without backend
jrepp Oct 21, 2025
f6079ba
Add proxy-integration-runner for end-to-end integration testing
jrepp Oct 21, 2025
9ef4e88
Fix Rust formatting to pass CI lint checks
jrepp Oct 21, 2025
6437d62
Clean up documentation: Create MEMO-038 and consolidate integration t…
jrepp Oct 21, 2025
66ad605
Update changelog: Add launcher callback protocol info and compress en…
jrepp Oct 21, 2025
e72c9eb
Fix MEMO-038 frontmatter and code block formatting
jrepp Oct 21, 2025
d6eabb0
Fix changelog links to use absolute paths with document IDs
jrepp Oct 21, 2025
a64fb48
Fix prismctl build failure by updating go.mod dependencies
jrepp Oct 21, 2025
3c35b96
Merge main into feature/transparent-proxy-ttl-keyval
jrepp Jan 3, 2026
b87076e
Fix duplicate variable declaration in build.rs
jrepp Jan 3, 2026
2ec575e
Merge branch 'main' into feature/transparent-proxy-ttl-keyval
mergify[bot] Jan 3, 2026
6f398cc
Merge branch 'main' into feature/transparent-proxy-ttl-keyval
mergify[bot] Jan 5, 2026
7571087
Merge branch 'main' into feature/transparent-proxy-ttl-keyval
mergify[bot] Jan 6, 2026
75b7b03
Merge branch 'main' into feature/transparent-proxy-ttl-keyval
mergify[bot] Jan 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/prism-admin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/hashicorp/go-hclog v1.6.2
github.com/hashicorp/raft v1.7.3
github.com/hashicorp/raft-boltdb v0.0.0-20250926130943-f41fa5f23d89
github.com/jrepp/prism-data-layer/pkg/launcherclient v0.0.0
github.com/prometheus/client_golang v1.23.2
github.com/spf13/cobra v1.10.1
github.com/spf13/viper v1.21.0
Expand Down Expand Up @@ -67,3 +68,5 @@ require (
)

replace github.com/jrepp/prism-data-layer/pkg/plugin => ../../pkg/plugin

replace github.com/jrepp/prism-data-layer/pkg/launcherclient => ../../pkg/launcherclient
103 changes: 97 additions & 6 deletions cmd/prism-admin/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"
"time"

"github.com/jrepp/prism-data-layer/pkg/launcherclient"
pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -51,6 +52,10 @@ func init() {
serveCmd.Flags().String("data-dir", "", "Raft data directory")
serveCmd.Flags().String("db", "", "Database URN (e.g., sqlite:///var/lib/prism-admin/admin.db)")

// Launcher integration flags
serveCmd.Flags().String("admin", "", "Admin/launcher endpoint (launcher://host:port or host:port)")
serveCmd.Flags().String("process-id", "", "Process ID (assigned by launcher)")

// Bind legacy flags
viper.BindPFlag("server.port", serveCmd.Flags().Lookup("port"))
viper.BindPFlag("server.listen", serveCmd.Flags().Lookup("listen"))
Expand All @@ -72,6 +77,80 @@ func runServe(cmd *cobra.Command, args []string) error {
ctx := context.Background()
log := slog.Default()

// Check if launched by launcher (launcher integration)
adminEndpoint, _ := cmd.Flags().GetString("admin")
processID, _ := cmd.Flags().GetString("process-id")
var launcherClient *launcherclient.Client
var actualListener net.Listener

if adminEndpoint != "" && processID != "" {
launcherAddr, isLauncher, err := launcherclient.ParseAdminEndpoint(adminEndpoint)
if err != nil {
return fmt.Errorf("invalid admin endpoint: %w", err)
}

if isLauncher {
fmt.Printf("[INFO] Launched by launcher, connecting to %s with process ID %s\n", launcherAddr, processID)

// Get dynamic port from OS
listener, actualPort, err := launcherclient.GetActualPort()
if err != nil {
return fmt.Errorf("failed to get dynamic port: %w", err)
}
actualListener = listener
defer listener.Close()

fmt.Printf("[INFO] Got dynamic port: %d\n", actualPort)

// Create launcher client
launcherClient, err = launcherclient.New(&launcherclient.Config{
LauncherAddr: launcherAddr,
ProcessID: processID,
ProcessType: "admin",
ActualPort: actualPort,
})
if err != nil {
return fmt.Errorf("failed to create launcher client: %w", err)
}
defer launcherClient.Close()

// Connect to launcher
if err := launcherClient.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to launcher: %w", err)
}

// Perform handshake and discover topology
topology, err := launcherClient.Handshake(ctx)
if err != nil {
return fmt.Errorf("handshake failed: %w", err)
}

fmt.Printf("[INFO] Handshake successful!\n")
fmt.Printf("[INFO] Launcher ID: %s\n", topology.LauncherID)
fmt.Printf("[INFO] Admin endpoints: %v\n", topology.AdminEndpoints)
fmt.Printf("[INFO] Proxy endpoints: %v\n", topology.ProxyEndpoints)

// Start heartbeat loop
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := launcherClient.SendHeartbeat(ctx); err != nil {
log.Error("heartbeat failed", "error", err)
}
}
}
}()

// Override gRPC port with actual port
cmd.Flags().Set("grpc-port", fmt.Sprintf("%d", actualPort))
}
}

// Load configuration
clusterCfg, controlPlaneCfg, _, err := LoadConfig()
if err != nil {
Expand Down Expand Up @@ -116,9 +195,12 @@ func runServe(cmd *cobra.Command, args []string) error {
}
defer raftNode.Stop()

// Bootstrap cluster (if not already bootstrapped)
if err := raftNode.Bootstrap(clusterCfg.Peers); err != nil {
return fmt.Errorf("failed to bootstrap cluster: %w", err)
// Bootstrap cluster (only first node should bootstrap)
// Other nodes will join the cluster once the first node is bootstrapped
if clusterCfg.NodeID == 1 {
if err := raftNode.Bootstrap(clusterCfg.Peers); err != nil {
return fmt.Errorf("failed to bootstrap cluster: %w", err)
}
}

// Wait for leader election
Expand Down Expand Up @@ -150,9 +232,18 @@ func runServe(cmd *cobra.Command, args []string) error {
// Start gRPC server
address := controlPlaneCfg.Listen

lis, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", address, err)
var lis net.Listener
if actualListener != nil {
// Use the listener from launcher client (dynamic port)
lis = actualListener
fmt.Printf("[INFO] Using launcher-assigned port: %s\n", lis.Addr().String())
} else {
// Standard mode - listen on configured port
var err error
lis, err = net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", address, err)
}
}

grpcServer := grpc.NewServer()
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions cmd/pattern-launcher/main.go → cmd/prism-launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,13 @@ func main() {
log.Printf("No admin endpoint configured, running standalone")
}

// Create control service for process callbacks
controlService := launcher.NewControlService(*launcherID, adminClient)

// Create gRPC server
grpcServer := grpc.NewServer()
pb.RegisterPatternLauncherServer(grpcServer, service)
pb.RegisterLauncherControlServer(grpcServer, controlService)

// Enable reflection for grpcurl
reflection.Register(grpcServer)
Expand Down
Loading
Loading