Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ dev: postgres root
proto-gen:
./scripts/proto-gen.sh "api-specs/v1/proto/agents"
./scripts/proto-gen.sh "api-specs/v1/proto/admin"
./scripts/proto-gen.sh "api-specs/v1/proto"
$(MAKE) go-format

.PHONY: go-format
go-format:
goimports -w .
gofmt -s -w .

2 changes: 1 addition & 1 deletion api-specs/v1/proto/admin/admin.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
syntax = "proto3";


package proto;
package krypton.v1.admin;

option go_package = "github.com/openkcm/krypton/pkg/api/v1/proto/admin";

Expand Down
4 changes: 2 additions & 2 deletions api-specs/v1/proto/agents/agents.proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
syntax = "proto3";


package proto;
package krypton.v1.agents;

option go_package = "github.com/openkcm/krypton/pkg/api/v1/proto/agents";

service AgentService {
service Service {
rpc Register(RegisterAgentRequest) returns (RegisterAgentResponse) {}
rpc SendHeartbeat(SendHeartbeatRequest) returns (SendHeartbeatResponse) {}
rpc Deregister(DeregisterAgentRequest) returns (DeregisterAgentResponse) {}
Expand Down
18 changes: 18 additions & 0 deletions api-specs/v1/proto/errors.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";


package krypton.v1;

option go_package = "github.com/openkcm/krypton/pkg/api/v1/proto";


message ErrorDetails {
Code code = 1;
}


enum Code {
ERROR_CODE_UNSPECIFIED = 0;
ERROR_CODE_ABORT = 1;
ERROR_CODE_RETRY = 2;
}
50 changes: 39 additions & 11 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,80 @@ import (
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/openkcm/krypton/internal/config"
"github.com/openkcm/krypton/internal/spec"
"github.com/openkcm/krypton/internal/worker"
"github.com/openkcm/krypton/pkg/api/agents"
"github.com/openkcm/krypton/pkg/api/v1/proto/agents"
)

// This is a simple agent that registers itself with the root server,
// sends periodic heartbeats, and deregisters on shutdown.
func main() {
ctx := context.Background()

agentID := os.Getenv("AGENT_ID")
if agentID == "" {
agentID = uuid.New().String()
}

cfg := loadConfig()

agentClient, err := agents.NewClient(cfg.KryptonRoot.Address.URL, cfg.Name, agentID)
handleErr(err, "failed to create agent client")
conn, err := grpc.NewClient(
cfg.KryptonRoot.Address.URL,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
handleErr(err, "failed to connect to root server")
defer conn.Close()

agentsCli := agents.NewServiceClient(conn)

// Example usage: register the agent
registerResp, err := agentClient.Register(context.Background(), agents.RegisterRequest{})
log.Printf("Registering agent %s with ID %s", cfg.Name, agentID)
reg, err := agentsCli.Register(ctx, &agents.RegisterAgentRequest{
AgentName: cfg.Name,
InstanceId: agentID,
})
handleErr(err, "failed to register agent")

keepAliveInterval := time.Duration(registerResp.Config.KeepAlive) * time.Second
agentCfg, err := agents.UnmarshalAgentConfig(reg.GetConfig())
handleErr(err, "failed to unmarshal agent config")

keepAliveInterval := time.Duration(agentCfg.KeepAlive) * time.Second
wrkr, err := worker.New(keepAliveInterval, func(ctx context.Context) error {
_, err := agentClient.SendHeartbeat(ctx, agents.SendHeartbeatRequest{})
log.Printf("Sending heartbeat for agent %s (ID: %s)", cfg.Name, agentID)
_, err := agentsCli.SendHeartbeat(ctx, &agents.SendHeartbeatRequest{
InstanceId: agentID,
AgentName: cfg.Name,
})
return err
})

handleErr(err, "failed to create heartbeat worker")
go wrkr.Start(context.Background())
go wrkr.Start(ctx)

// graceful shutdown on SIGINT/SIGTERM
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

<-signalChan
fmt.Println("Received termination signal, shutting down...")
wrkr.Stop()

_, err = agentClient.Deregister(context.Background(), agents.DeregisterRequest{})
log.Println("Deregistering agent...")
dCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = agentsCli.Deregister(dCtx, &agents.DeregisterAgentRequest{
InstanceId: agentID,
AgentName: cfg.Name,
})
if err != nil {
log.Printf("failed to deregister agent: %v", err)
}

fmt.Println("Received termination signal, shutting down...")
log.Println("Agent shutdown complete")
}

func handleErr(err error, msg string) {
Expand All @@ -80,8 +108,8 @@ func loadConfig() *config.AgentBootstrapConfig {
Role: spec.DefaultRole,
KryptonRoot: config.KryptonRoot{
Address: config.Address{
Type: config.AddressTypeHTTP,
URL: "http://localhost:" + port,
Type: config.AddressTypeGRPC,
URL: "localhost:" + port,
},
},
}
Expand Down
40 changes: 19 additions & 21 deletions cmd/root/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"errors"
"log"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"google.golang.org/grpc"
Expand All @@ -19,8 +20,8 @@ import (
"github.com/openkcm/krypton/internal/core"
"github.com/openkcm/krypton/internal/spec"
"github.com/openkcm/krypton/internal/worker"
"github.com/openkcm/krypton/pkg/api/agents"
"github.com/openkcm/krypton/pkg/api/v1/proto/admin"
"github.com/openkcm/krypton/pkg/api/v1/proto/agents"
"github.com/openkcm/krypton/pkg/store"
storesql "github.com/openkcm/krypton/pkg/store/sql"
)
Expand All @@ -35,13 +36,6 @@ func main() {
_, err := strconv.Atoi(srvPort)
handleErr(err, "invalid SERVER_PORT value")

grpcPort := os.Getenv("GRPC_PORT")
if grpcPort == "" {
grpcPort = "9090"
}
_, err = strconv.Atoi(grpcPort)
handleErr(err, "invalid GRPC_PORT value")

dsn := os.Getenv("DATABASE_URL")
if dsn == "" {
log.Fatal("DATABASE_URL environment variable is required")
Expand All @@ -66,28 +60,32 @@ func main() {
grpcServer := grpc.NewServer()
admin.RegisterServiceServer(grpcServer, admin.NewService(tenantStore))

lis, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", ":"+grpcPort)
// gRPC server setup for agent API
agents.RegisterServiceServer(grpcServer, agents.NewAgentService(agentStore, *cfg))

lis, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", ":"+srvPort)
handleErr(err, "failed to listen on gRPC port")

go func() {
log.Printf("gRPC server listening on :%s", grpcPort)
log.Printf("gRPC server listening on :%s", srvPort)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve gRPC: %v", err)
}
}()
defer grpcServer.GracefulStop()

// HTTP API server setup (for agents API where http client is still used in tests)
mux := agents.NewServerMux(nil, agentStore, *cfg)

// worker initialization
wrkr := initAgentWorker(agentStore)
go wrkr.Start(context.Background())
defer wrkr.Stop()

log.Printf("HTTP server listening on :%s", srvPort)
err = http.ListenAndServe(":"+srvPort, mux)
handleErr(err, "failed to start server")
// graceful shutdown on SIGINT/SIGTERM
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

<-signalChan
log.Println("Received shutdown signal, stopping server...")
wrkr.Stop()
log.Println("Shutting down gracefully...")
}

// For simplicity, we hardcode a sample root configuration here.
Expand All @@ -108,7 +106,7 @@ func loadConfig() *config.RootConfig {
"K0": {
Vault: spec.VaultSpec{
Name: "root-hsm-vault",
Type: "aws-kms",
Type: spec.VaultTypeInMemory,
},
},
"K1": {
Expand Down Expand Up @@ -142,7 +140,7 @@ func loadConfig() *config.RootConfig {
"K2": {
Vault: spec.VaultSpec{
Name: "aws-vault",
Type: "aws-kms",
Type: spec.VaultTypeInMemory,
},
ParentKeyProvider: &spec.ParentKeyProviderRef{
AgentName: "root",
Expand All @@ -151,7 +149,7 @@ func loadConfig() *config.RootConfig {
"K3": {
Vault: spec.VaultSpec{
Name: "aws-dek-vault",
Type: "aws-kms",
Type: spec.VaultTypeInMemory,
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type AddressType string
const (
// AddressTypeHTTP represents HTTP/HTTPS transport.
AddressTypeHTTP AddressType = "http"
// AddressTypeGRPC represents gRPC transport.
AddressTypeGRPC AddressType = "grpc"
)

// Address represents a network address for inter-service communication.
Expand Down
41 changes: 0 additions & 41 deletions internal/krhttp/client.go

This file was deleted.

56 changes: 0 additions & 56 deletions internal/krhttp/client_test.go

This file was deleted.

Loading
Loading