Skip to content

Commit

Permalink
implementation for summary of list of records
Browse files Browse the repository at this point in the history
- returns aggregations: total, statuses, avg, max, min and total
  duration
- Grouped aggregations on namespace, pipeline and time duration

Signed-off-by: Avinal Kumar <[email protected]>
Co-authored-by: Khurram Baig <[email protected]>
  • Loading branch information
avinal and khrm committed Dec 6, 2023
1 parent 0d441f4 commit 50b33b7
Show file tree
Hide file tree
Showing 9 changed files with 843 additions and 756 deletions.
238 changes: 238 additions & 0 deletions pkg/api/server/v1alpha2/lister/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package lister

import (
"context"
"fmt"
"strings"

"github.com/google/cel-go/cel"
tdb "github.com/tektoncd/results/pkg/api/server/db"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"gorm.io/gorm"
)

// SQL fragments used to assemble the summary query. Each one reads from a
// JSON-valued `data` column using PostgreSQL-style JSON operators and casts.
// NOTE(review): presumably `data` holds the serialized run object stored in a
// record - confirm against the records table schema.
const (
// durationQuery is the status completionTime minus the status startTime, both cast to timestamps with time zone.
durationQuery = "(data->'status'->>'completionTime')::TIMESTAMP WITH TIME ZONE - (data->'status'->>'startTime')::TIMESTAMP WITH TIME ZONE"
// statusQuery is the reason of the first entry in status.conditions.
statusQuery = "(data->'status'->'conditions'->0->>'reason')"
// groupByTimeQuery is metadata.creationTimestamp cast to a timestamp with time zone; it is the default time field for time-based grouping.
groupByTimeQuery = "(data->'metadata'->>'creationTimestamp')::TIMESTAMP WITH TIME ZONE"
// groupByParentQuery is metadata.namespace, used when grouping by "namespace".
groupByParentQuery = "data->'metadata'->>'namespace'"
// groupByPipelineQuery is the value of the tekton.dev/pipeline label, used when grouping by "pipeline".
groupByPipelineQuery = "data->'metadata'->'labels'->>'tekton.dev/pipeline'"
)

// summaryRequest is the set of request getters accepted by newAggregator.
type summaryRequest interface {
// GetParent returns the parent of the request (not read by newAggregator itself).
GetParent() string
// GetFilter returns the filter expression handed to the filter query builder.
GetFilter() string
// GetGroupBy returns the grouping key, optionally followed by a time field (e.g. "hour startTime").
GetGroupBy() string
// GetSummary returns the comma-separated list of aggregation names to compute.
GetSummary() string
}

// Aggregator contains the query builders for filters and aggregate functions for summary
type Aggregator struct {
// queryBuilders are applied first, in order, by buildQuery to restrict the query.
queryBuilders []queryBuilder
// aggregators are applied afterwards, in order, by applyAggregateFunc to add
// the aggregate selections and the optional grouping.
aggregators []aggregateFunc
}

// buildQuery binds ctx to db and runs every registered query builder over the
// query in order. The first builder failure is reported as an InvalidArgument
// status error and no query is returned.
func (a *Aggregator) buildQuery(ctx context.Context, db *gorm.DB) (*gorm.DB, error) {
	query := db.WithContext(ctx)
	for _, qb := range a.queryBuilders {
		next, err := qb.build(query)
		if err != nil {
			return nil, status.Error(codes.InvalidArgument, err.Error())
		}
		query = next
	}
	return query, nil
}

// Aggregate runs the aggregation against the tdb.Record model and returns the
// result as a RecordListSummary.
//
// The configured query builders are applied first (see buildQuery), then the
// aggregate selections, and finally the query is executed and every returned
// row is converted with ToSummary.
//
// Errors from building the filters are returned unchanged. A failure while
// executing the query is returned as an Internal status error; previously the
// result of Scan was discarded, so a failed query produced an empty summary
// with a nil error.
func (a *Aggregator) Aggregate(ctx context.Context, db *gorm.DB) (*pb.RecordListSummary, error) {
	db, err := a.buildQuery(ctx, db.Model(&tdb.Record{}))
	if err != nil {
		return nil, err
	}

	// Start from an empty, non-nil slice so Scan appends rows into it.
	rows := make([]map[string]interface{}, 0)
	if err := a.applyAggregateFunc(ctx, db).Scan(&rows).Error; err != nil {
		return nil, status.Errorf(codes.Internal, "failed to aggregate records: %v", err)
	}

	return ToSummary(rows)
}

// ToSummary turns the scanned result rows into a RecordListSummary. Every row
// becomes one structpb.Struct whose fields are the row's columns; the first
// column value that structpb.NewValue cannot represent aborts the conversion
// and its error is returned.
func ToSummary(summary []map[string]interface{}) (*pb.RecordListSummary, error) {
	var rows []*structpb.Struct
	for _, row := range summary {
		fields := make(map[string]*structpb.Value, len(row))
		for column, raw := range row {
			value, err := structpb.NewValue(raw)
			if err != nil {
				return nil, err
			}
			fields[column] = value
		}
		rows = append(rows, &structpb.Struct{Fields: fields})
	}

	result := &pb.RecordListSummary{Summary: rows}
	return result, nil
}

// validGroups lists the keys accepted as the first token of a group_by
// expression. The value tells checkAndBuildGroupQuery whether the key is a
// time unit (true) or a non-time field (false).
var validGroups = map[string]bool{
"year": true,
"month": true,
"week": true,
"day": true,
"hour": true,
"minute": true,
"pipeline": false,
"namespace": false,
}

// newAggregator builds an Aggregator from a summary request.
//
// The request's filter expression, together with the given equality clauses,
// becomes the single query builder. The request's summary field is a
// comma-separated list of aggregation names (see summaryFuncs); surrounding
// whitespace is ignored and empty entries are skipped. When no name remains,
// the aggregator defaults to "total". A non-empty group_by adds a grouping
// step after the aggregate selections.
//
// An unknown aggregation name or an invalid group_by expression yields an
// InvalidArgument status error.
func newAggregator(env *cel.Env, aggregateObjectRequest summaryRequest, clauses ...equalityClause) (*Aggregator, error) {
	// Named recordFilter rather than filter so the filter type is not shadowed.
	recordFilter := &filter{
		env:             env,
		expr:            strings.TrimSpace(aggregateObjectRequest.GetFilter()),
		equalityClauses: clauses,
	}

	// strings.Split returns [""] for an empty input, never an empty slice, so
	// the names are collected explicitly to make the "total" default reachable.
	var summary []string
	for _, name := range strings.Split(aggregateObjectRequest.GetSummary(), ",") {
		if name = strings.TrimSpace(name); name != "" {
			summary = append(summary, name)
		}
	}
	if len(summary) == 0 {
		summary = append(summary, "total")
	}

	aggregators, err := getAggregateFunc(summary)
	if err != nil {
		return nil, err
	}

	if group := strings.TrimSpace(aggregateObjectRequest.GetGroupBy()); group != "" {
		groupQuery, err := checkAndBuildGroupQuery(group)
		if err != nil {
			return nil, err
		}
		aggregators = append(aggregators, groupBy(groupQuery))
	}

	return &Aggregator{
		aggregators:   aggregators,
		queryBuilders: []queryBuilder{recordFilter},
	}, nil
}

// getDuration returns an aggregateFunc that appends the selection
// "fn(query)::INTERVAL as value" to the columns already selected on the query.
func getDuration(fn, query, value string) aggregateFunc {
	selectExpr := fn + "(" + query + ")::INTERVAL as " + value
	return func(tx *gorm.DB) *gorm.DB {
		return tx.Select(tx.Statement.Selects, selectExpr)
	}
}

// groupBy returns an aggregateFunc that appends groupSelect to the columns
// already selected and groups the query by the "group_value" alias, which
// groupSelect is expected to define.
func groupBy(groupSelect string) aggregateFunc {
	return func(tx *gorm.DB) *gorm.DB {
		withGroupColumn := tx.Select(tx.Statement.Selects, groupSelect)
		return withGroupColumn.Group("group_value")
	}
}

func checkAndBuildGroupQuery(query string) (string, error) {
parts := strings.Split(strings.TrimSpace(query), " ")
isTime, ok := validGroups[parts[0]]
if !ok {
return "", status.Errorf(codes.InvalidArgument, "group_by does not recognize %s", query)
}
switch {
case isTime && len(parts) == 1:
return fmt.Sprintf("EXTRACT(EPOCH FROM DATE_TRUNC('%s', %s)) AS group_value", parts[0], groupByTimeQuery), nil
case isTime && len(parts) == 2:
if parts[1] != "completionTime" && parts[1] != "startTime" {
return "", status.Errorf(codes.InvalidArgument, "group_by does not recognize %s", parts[1])
}
return fmt.Sprintf("EXTRACT(EPOCH FROM DATE_TRUNC('%s', (data->'status'->>'%s')::TIMESTAMP WITH TIME ZONE)) AS group_value", parts[0], parts[1]), nil
case !isTime && len(parts) == 1:
switch parts[0] {
case "namespace":
return fmt.Sprintf("%s AS group_value", groupByParentQuery), nil
case "pipeline":
return fmt.Sprintf("%s AS group_value", groupByPipelineQuery), nil
}
default:
return "", status.Errorf(codes.InvalidArgument, "group_by does not recognize %s", query)
}
return "", nil
}

// getStatus returns an aggregateFunc that appends a conditional COUNT over
// query to the columns already selected. For the reason "Others" it counts rows
// whose value is none of Failed, Succeeded, Cancelled or Running, aliased as
// "others"; for any other reason it counts rows equal to that reason, aliased
// as the lower-cased reason.
func getStatus(query, reason string) aggregateFunc {
	return func(tx *gorm.DB) *gorm.DB {
		var countExpr string
		switch reason {
		case "Others":
			countExpr = "COUNT(CASE WHEN " + query + " NOT IN ('Failed', 'Succeeded', 'Cancelled', 'Running') THEN 1 END) AS others"
		default:
			countExpr = "COUNT(CASE WHEN " + query + " = '" + reason + "' THEN 1 END) AS " + strings.ToLower(reason)
		}
		return tx.Select(tx.Statement.Selects, countExpr)
	}
}

// getCount returns an aggregateFunc that appends "COUNT(query) AS countName"
// to the columns already selected on the query.
func getCount(query, countName string) aggregateFunc {
	countExpr := "COUNT(" + query + ") AS " + countName
	return func(tx *gorm.DB) *gorm.DB {
		return tx.Select(tx.Statement.Selects, countExpr)
	}
}

// aggregateFunc takes the query built so far, adds one selection or grouping
// to it and returns the resulting query.
type aggregateFunc func(db *gorm.DB) *gorm.DB

// summaryFuncs maps every aggregation name accepted in a summary request to
// the aggregateFunc that selects it. The result column carries the same name
// as the map key.
var summaryFuncs = map[string]aggregateFunc{
// total counts all matching rows.
"total": getCount("*", "total"),
// Duration aggregates are computed over durationQuery (completionTime - startTime).
"avg_duration": getDuration("AVG", durationQuery, "avg_duration"),
"max_duration": getDuration("MAX", durationQuery, "max_duration"),
"total_duration": getDuration("SUM", durationQuery, "total_duration"),
"min_duration": getDuration("MIN", durationQuery, "min_duration"),
// Status aggregates count rows by the reason selected by statusQuery.
"succeeded": getStatus(statusQuery, "Succeeded"),
"failed": getStatus(statusQuery, "Failed"),
"cancelled": getStatus(statusQuery, "Cancelled"),
"running": getStatus(statusQuery, "Running"),
// others counts rows whose reason is none of the four above.
"others": getStatus(statusQuery, "Others"),
}

// getAggregateFunc looks up the aggregateFunc registered in summaryFuncs for
// each requested name, preserving the order of queries. The first unknown name
// yields an InvalidArgument status error.
func getAggregateFunc(queries []string) ([]aggregateFunc, error) {
	resolved := make([]aggregateFunc, len(queries))
	for i := range queries {
		fn, known := summaryFuncs[queries[i]]
		if !known {
			return nil, status.Errorf(codes.InvalidArgument, "invalid aggregate query: %s", queries[i])
		}
		resolved[i] = fn
	}
	return resolved, nil
}

// applyAggregateFunc binds ctx to db and threads the query through every
// configured aggregateFunc in order, returning the final query.
func (a *Aggregator) applyAggregateFunc(ctx context.Context, db *gorm.DB) *gorm.DB {
	query := db.WithContext(ctx)
	for i := range a.aggregators {
		query = a.aggregators[i](query)
	}
	return query
}

// OfRecordList builds the Aggregator for a record list summary request. The
// aggregation is restricted to rows whose "parent" column equals resultParent
// and whose "result_name" column equals resultName, in addition to the
// request's own filter.
func OfRecordList(env *cel.Env, resultParent, resultName string, request *pb.RecordListSummaryRequest) (*Aggregator, error) {
	byParent := equalityClause{columnName: "parent", value: resultParent}
	byResultName := equalityClause{columnName: "result_name", value: resultName}
	return newAggregator(env, request, byParent, byResultName)
}
64 changes: 64 additions & 0 deletions pkg/api/server/v1alpha2/lister/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package lister

import (
"testing"
)

// TestCheckAndBuildGroupQuery verifies the SQL selection produced for every
// supported group_by form, and that each rejection path (unknown key,
// unsupported time field, extra tokens, empty input) returns an error.
func TestCheckAndBuildGroupQuery(t *testing.T) {
	tests := []struct {
		name    string
		query   string
		want    string
		wantErr bool
	}{
		{
			name:    "valid time query with one part",
			query:   "minute",
			want:    "EXTRACT(EPOCH FROM DATE_TRUNC('minute', (data->'metadata'->>'creationTimestamp')::TIMESTAMP WITH TIME ZONE)) AS group_value",
			wantErr: false,
		},
		{
			name:    "valid time query with two parts and startTime",
			query:   "hour startTime",
			want:    "EXTRACT(EPOCH FROM DATE_TRUNC('hour', (data->'status'->>'startTime')::TIMESTAMP WITH TIME ZONE)) AS group_value",
			wantErr: false,
		},
		{
			name:    "valid time query with two parts and completionTime",
			query:   "minute completionTime",
			want:    "EXTRACT(EPOCH FROM DATE_TRUNC('minute', (data->'status'->>'completionTime')::TIMESTAMP WITH TIME ZONE)) AS group_value",
			wantErr: false,
		},
		{
			name:    "valid non-time query with parent",
			query:   "namespace",
			want:    "data->'metadata'->>'namespace' AS group_value",
			wantErr: false,
		},
		{
			name:    "valid non-time query with pipeline",
			query:   "pipeline",
			want:    "data->'metadata'->'labels'->>'tekton.dev/pipeline' AS group_value",
			wantErr: false,
		},
		{
			name:    "invalid query",
			query:   "invalid",
			want:    "",
			wantErr: true,
		},
		{
			name:    "time query with unsupported time field",
			query:   "hour creationTime",
			want:    "",
			wantErr: true,
		},
		{
			name:    "time query with too many parts",
			query:   "hour startTime extra",
			want:    "",
			wantErr: true,
		},
		{
			name:    "non-time query with extra part",
			query:   "namespace startTime",
			want:    "",
			wantErr: true,
		},
		{
			name:    "empty query",
			query:   "",
			want:    "",
			wantErr: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := checkAndBuildGroupQuery(tt.query)
			if (err != nil) != tt.wantErr {
				t.Errorf("checkAndBuildGroupQuery() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("checkAndBuildGroupQuery() = %v, want %v", got, tt.want)
			}
		})
	}
}
40 changes: 40 additions & 0 deletions pkg/api/server/v1alpha2/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package server

import (
"context"

"github.com/tektoncd/results/pkg/api/server/v1alpha2/auth"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/lister"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/result"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// GetRecordListSummary computes the requested aggregations over the records
// addressed by the request's parent.
//
// The parent must be non-empty (otherwise InvalidArgument is returned) and is
// parsed with result.ParseName. The caller must pass the auth check for the
// records resource with the get permission on the parsed parent. Errors from
// parsing, authorization, aggregator construction and aggregation are returned
// unchanged.
func (s *Server) GetRecordListSummary(ctx context.Context, req *pb.RecordListSummaryRequest) (*pb.RecordListSummary, error) {
	name := req.GetParent()
	if name == "" {
		return nil, status.Error(codes.InvalidArgument, "parent missing")
	}

	parent, resultName, err := result.ParseName(name)
	if err != nil {
		return nil, err
	}

	if err := s.auth.Check(ctx, parent, auth.ResourceRecords, auth.PermissionGet); err != nil {
		return nil, err
	}

	recordAggregator, err := lister.OfRecordList(s.recordsEnv, parent, resultName, req)
	if err != nil {
		return nil, err
	}

	return recordAggregator.Aggregate(ctx, s.db)
}
18 changes: 6 additions & 12 deletions proto/v1alpha2/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,9 @@ service Results {
};
}

rpc GetResultSummary(GetResultRequest) returns (Summary) {
rpc GetRecordListSummary(RecordListSummaryRequest) returns (RecordListSummary) {
option (google.api.http) = {
get: "/apis/results.tekton.dev/v1alpha2/parents/{name=*/results/*}/summary"
};
}

rpc GetResultsListSummary(ResultListSummaryRequest) returns (Summary) {
option (google.api.http) = {
get: "/apis/results.tekton.dev/v1alpha2/parents/{parent=*}/results/summary"
get: "/apis/results.tekton.dev/v1alpha2/parents/{parent=*/results/*}/records/summary"
};
}
}
Expand Down Expand Up @@ -175,16 +169,16 @@ message GetResultRequest {
}];
}

message ResultListSummaryRequest {
message RecordListSummaryRequest {
string parent = 1 [
(google.api.field_behavior) = REQUIRED,
(google.api.resource_reference) = {
child_type: "tekton.results.v1alpha2/Result"
child_type: "tekton.results.v1alpha2/Record"
}];

string filter = 2;

string group_by = 3;
string summary = 3;
string group_by = 4;
}

message ListResultsRequest {
Expand Down
Loading

0 comments on commit 50b33b7

Please sign in to comment.