Skip to content

Commit 23eddac

Browse files
authored
Introduce plugin logpersister package (#5342)
* Introduce plugin logpersister package Signed-off-by: khanhtc1202 <[email protected]> * Add interface comments Signed-off-by: khanhtc1202 <[email protected]> --------- Signed-off-by: khanhtc1202 <[email protected]>
1 parent 519d3fc commit 23eddac

File tree

12 files changed

+1103
-127
lines changed

12 files changed

+1103
-127
lines changed

pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go

+50-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020

2121
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/cmd/piped/service"
22+
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
2223
"github.com/pipe-cd/pipecd/pkg/config"
2324
"github.com/pipe-cd/pipecd/pkg/crypto"
2425
"github.com/pipe-cd/pipecd/pkg/model"
@@ -30,19 +31,27 @@ import (
3031
type PluginAPI struct {
3132
service.PluginServiceServer
3233

33-
cfg *config.PipedSpec
34+
cfg *config.PipedSpec
35+
apiClient apiClient
36+
3437
Logger *zap.Logger
3538
}
3639

40+
type apiClient interface {
41+
ReportStageLogs(ctx context.Context, req *pipedservice.ReportStageLogsRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsResponse, error)
42+
ReportStageLogsFromLastCheckpoint(ctx context.Context, in *pipedservice.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsFromLastCheckpointResponse, error)
43+
}
44+
3745
// Register registers all handling of this service into the specified gRPC server.
3846
func (a *PluginAPI) Register(server *grpc.Server) {
3947
service.RegisterPluginServiceServer(server, a)
4048
}
4149

42-
func NewPluginAPI(cfg *config.PipedSpec, logger *zap.Logger) *PluginAPI {
50+
func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, logger *zap.Logger) *PluginAPI {
4351
return &PluginAPI{
44-
cfg: cfg,
45-
Logger: logger.Named("plugin-api"),
52+
cfg: cfg,
53+
apiClient: apiClient,
54+
Logger: logger.Named("plugin-api"),
4655
}
4756
}
4857

@@ -71,6 +80,43 @@ func (a *PluginAPI) DecryptSecret(ctx context.Context, req *service.DecryptSecre
7180
}, nil
7281
}
7382

83+
func (a *PluginAPI) ReportStageLogs(ctx context.Context, req *service.ReportStageLogsRequest) (*service.ReportStageLogsResponse, error) {
84+
_, err := a.apiClient.ReportStageLogs(ctx, &pipedservice.ReportStageLogsRequest{
85+
DeploymentId: req.DeploymentId,
86+
StageId: req.StageId,
87+
RetriedCount: req.RetriedCount,
88+
Blocks: req.Blocks,
89+
})
90+
if err != nil {
91+
a.Logger.Error("failed to report stage logs",
92+
zap.String("deploymentID", req.DeploymentId),
93+
zap.String("stageID", req.StageId),
94+
zap.Error(err))
95+
return nil, err
96+
}
97+
98+
return &service.ReportStageLogsResponse{}, nil
99+
}
100+
101+
func (a *PluginAPI) ReportStageLogsFromLastCheckpoint(ctx context.Context, req *service.ReportStageLogsFromLastCheckpointRequest) (*service.ReportStageLogsFromLastCheckpointResponse, error) {
102+
_, err := a.apiClient.ReportStageLogsFromLastCheckpoint(ctx, &pipedservice.ReportStageLogsFromLastCheckpointRequest{
103+
DeploymentId: req.DeploymentId,
104+
StageId: req.StageId,
105+
RetriedCount: req.RetriedCount,
106+
Blocks: req.Blocks,
107+
Completed: req.Completed,
108+
})
109+
if err != nil {
110+
a.Logger.Error("failed to report stage logs from last checkpoint",
111+
zap.String("deploymentID", req.DeploymentId),
112+
zap.String("stageID", req.StageId),
113+
zap.Error(err))
114+
return nil, err
115+
}
116+
117+
return &service.ReportStageLogsFromLastCheckpointResponse{}, nil
118+
}
119+
74120
func initializeSecretDecrypter(sm *config.SecretManagement) (crypto.Decrypter, error) {
75121
if sm == nil {
76122
return nil, nil

pkg/app/pipedv1/cmd/piped/piped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
304304
// Start running plugin service server.
305305
{
306306
var (
307-
service = grpcapi.NewPluginAPI(cfg, input.Logger)
307+
service = grpcapi.NewPluginAPI(cfg, apiClient, input.Logger)
308308
opts = []rpc.Option{
309309
rpc.WithPort(p.pluginServicePort),
310310
rpc.WithGracePeriod(p.gracePeriod),

0 commit comments

Comments
 (0)