Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ dev: postgres root
proto-gen:
./scripts/proto-gen.sh "api-specs/v1/proto/agents"
./scripts/proto-gen.sh "api-specs/v1/proto/admin"
./scripts/proto-gen.sh "api-specs/v1/proto"
$(MAKE) go-format

.PHONY: go-format
go-format:
goimports -w .
gofmt -s -w .

18 changes: 18 additions & 0 deletions api-specs/v1/proto/errors.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";


package proto;

option go_package = "github.com/openkcm/krypton/pkg/api/v1/proto";


message ErrorDetails {
Code code = 1;
}


enum Code {
ERROR_CODE_UNSPECIFIED = 0;
ERROR_CODE_ABORT = 1;
ERROR_CODE_RETRY = 2;
}
50 changes: 39 additions & 11 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,80 @@ import (
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/openkcm/krypton/internal/config"
"github.com/openkcm/krypton/internal/spec"
"github.com/openkcm/krypton/internal/worker"
"github.com/openkcm/krypton/pkg/api/agents"
"github.com/openkcm/krypton/pkg/api/v1/proto/agents"
)

// This is a simple agent that registers itself with the root server,
// sends periodic heartbeats, and deregisters on shutdown.
func main() {
ctx := context.Background()

agentID := os.Getenv("AGENT_ID")
if agentID == "" {
agentID = uuid.New().String()
}

cfg := loadConfig()

agentClient, err := agents.NewClient(cfg.KryptonRoot.Address.URL, cfg.Name, agentID)
handleErr(err, "failed to create agent client")
conn, err := grpc.NewClient(
cfg.KryptonRoot.Address.URL,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
handleErr(err, "failed to connect to root server")
defer conn.Close()

agentsCli := agents.NewAgentServiceClient(conn)

// Example usage: register the agent
registerResp, err := agentClient.Register(context.Background(), agents.RegisterRequest{})
log.Printf("Registering agent %s with ID %s", cfg.Name, agentID)
reg, err := agentsCli.Register(ctx, &agents.RegisterAgentRequest{
AgentName: cfg.Name,
InstanceId: agentID,
})
handleErr(err, "failed to register agent")

keepAliveInterval := time.Duration(registerResp.Config.KeepAlive) * time.Second
agentCfg, err := agents.UnmarshalAgentConfig(reg.GetConfig())
handleErr(err, "failed to unmarshal agent config")

keepAliveInterval := time.Duration(agentCfg.KeepAlive) * time.Second
wrkr, err := worker.New(keepAliveInterval, func(ctx context.Context) error {
_, err := agentClient.SendHeartbeat(ctx, agents.SendHeartbeatRequest{})
log.Printf("Sending heartbeat for agent %s (ID: %s)", cfg.Name, agentID)
_, err := agentsCli.SendHeartbeat(ctx, &agents.SendHeartbeatRequest{
InstanceId: agentID,
AgentName: cfg.Name,
})
return err
})

handleErr(err, "failed to create heartbeat worker")
go wrkr.Start(context.Background())
go wrkr.Start(ctx)

// graceful shutdown on SIGINT/SIGTERM
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

<-signalChan
fmt.Println("Received termination signal, shutting down...")
wrkr.Stop()

_, err = agentClient.Deregister(context.Background(), agents.DeregisterRequest{})
log.Println("Deregistering agent...")
dCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = agentsCli.Deregister(dCtx, &agents.DeregisterAgentRequest{
InstanceId: agentID,
AgentName: cfg.Name,
})
if err != nil {
log.Printf("failed to deregister agent: %v", err)
}

fmt.Println("Received termination signal, shutting down...")
log.Println("Agent shutdown complete")
}

func handleErr(err error, msg string) {
Expand All @@ -80,8 +108,8 @@ func loadConfig() *config.AgentBootstrapConfig {
Role: spec.DefaultRole,
KryptonRoot: config.KryptonRoot{
Address: config.Address{
Type: config.AddressTypeHTTP,
URL: "http://localhost:" + port,
Type: config.AddressTypeGRPC,
URL: "localhost:" + port,
},
},
}
Expand Down
40 changes: 19 additions & 21 deletions cmd/root/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"errors"
"log"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"google.golang.org/grpc"
Expand All @@ -19,8 +20,8 @@ import (
"github.com/openkcm/krypton/internal/core"
"github.com/openkcm/krypton/internal/spec"
"github.com/openkcm/krypton/internal/worker"
"github.com/openkcm/krypton/pkg/api/agents"
"github.com/openkcm/krypton/pkg/api/v1/proto/admin"
"github.com/openkcm/krypton/pkg/api/v1/proto/agents"
"github.com/openkcm/krypton/pkg/store"
storesql "github.com/openkcm/krypton/pkg/store/sql"
)
Expand All @@ -35,13 +36,6 @@ func main() {
_, err := strconv.Atoi(srvPort)
handleErr(err, "invalid SERVER_PORT value")

grpcPort := os.Getenv("GRPC_PORT")
if grpcPort == "" {
grpcPort = "9090"
}
_, err = strconv.Atoi(grpcPort)
handleErr(err, "invalid GRPC_PORT value")

dsn := os.Getenv("DATABASE_URL")
if dsn == "" {
log.Fatal("DATABASE_URL environment variable is required")
Expand All @@ -66,28 +60,32 @@ func main() {
grpcServer := grpc.NewServer()
admin.RegisterServiceServer(grpcServer, admin.NewService(tenantStore))

lis, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", ":"+grpcPort)
// gRPC server setup for agent API
agents.RegisterAgentServiceServer(grpcServer, agents.NewAgentService(agentStore, *cfg))

lis, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", ":"+srvPort)
handleErr(err, "failed to listen on gRPC port")

go func() {
log.Printf("gRPC server listening on :%s", grpcPort)
log.Printf("gRPC server listening on :%s", srvPort)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve gRPC: %v", err)
}
}()
defer grpcServer.GracefulStop()

// HTTP API server setup (for agents API where http client is still used in tests)
mux := agents.NewServerMux(nil, agentStore, *cfg)

// worker initialization
wrkr := initAgentWorker(agentStore)
go wrkr.Start(context.Background())
defer wrkr.Stop()

log.Printf("HTTP server listening on :%s", srvPort)
err = http.ListenAndServe(":"+srvPort, mux)
handleErr(err, "failed to start server")
// graceful shutdown on SIGINT/SIGTERM
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

<-signalChan
log.Println("Received shutdown signal, stopping server...")
wrkr.Stop()
log.Println("Shutting down gracefully...")
}

// For simplicity, we hardcode a sample root configuration here.
Expand All @@ -108,7 +106,7 @@ func loadConfig() *config.RootConfig {
"K0": {
Vault: spec.VaultSpec{
Name: "root-hsm-vault",
Type: "aws-kms",
Type: spec.VaultTypeInMemory,
},
},
"K1": {
Expand Down Expand Up @@ -142,7 +140,7 @@ func loadConfig() *config.RootConfig {
"K2": {
Vault: spec.VaultSpec{
Name: "aws-vault",
Type: "aws-kms",
Type: spec.VaultTypeInMemory,
},
ParentKeyProvider: &spec.ParentKeyProviderRef{
AgentName: "root",
Expand All @@ -151,7 +149,7 @@ func loadConfig() *config.RootConfig {
"K3": {
Vault: spec.VaultSpec{
Name: "aws-dek-vault",
Type: "aws-kms",
Type: spec.VaultTypeInMemory,
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type AddressType string
const (
// AddressTypeHTTP represents HTTP/HTTPS transport.
AddressTypeHTTP AddressType = "http"
// AddressTypeGRPC represents gRPC transport.
AddressTypeGRPC AddressType = "grpc"
)

// Address represents a network address for inter-service communication.
Expand Down
41 changes: 0 additions & 41 deletions internal/krhttp/client.go

This file was deleted.

56 changes: 0 additions & 56 deletions internal/krhttp/client_test.go

This file was deleted.

Loading
Loading