Skip to content

Commit

Permalink
implementation for summary of result
Browse files Browse the repository at this point in the history
Signed-off-by: Avinal Kumar <[email protected]>
  • Loading branch information
avinal committed Nov 20, 2023
1 parent 98af257 commit dd1fd69
Show file tree
Hide file tree
Showing 8 changed files with 1,066 additions and 633 deletions.
208 changes: 208 additions & 0 deletions pkg/api/server/v1alpha2/lister/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package lister

import (
"context"
"strings"

"github.com/google/cel-go/cel"
tdb "github.com/tektoncd/results/pkg/api/server/db"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"gorm.io/gorm"
)

const (
durationQuery = "(data->'status'->>'completionTime')::TIMESTAMP WITH TIME ZONE - (data->'status'->>'startTime')::TIMESTAMP WITH TIME ZONE"
statusQuery = "(data->'status'->'conditions'->0->>'reason')"
groupQuery = "(data->'status'->>'completionTime')::TIMESTAMP WITH TIME ZONE"
)

type summaryRequest interface {
GetParent() string
GetFilter() string
GetGroupBy() string
GetSummary() string
}

// Aggregator contains the query builders for filters and aggregate functions for summary
type Aggregator struct {
queryBuilders []queryBuilder
aggregators []aggregateFunc
}

// buildQuery applies filters
func (a *Aggregator) buildQuery(ctx context.Context, db *gorm.DB) (*gorm.DB, error) {
var err error
db = db.WithContext(ctx)

for _, builder := range a.queryBuilders {
db, err = builder.build(db)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
return db, err
}

// Aggregate function runs the aggregation tasks and returns Summary
func (a *Aggregator) Aggregate(ctx context.Context, db *gorm.DB) (*pb.Summary, error) {
var err error
summary := make([]map[string]interface{}, 0)
db = db.Model(&tdb.Record{})
db, err = a.buildQuery(ctx, db)
if err != nil {
return nil, err
}

db = a.applyAggregateFunc(ctx, db)
db.Scan(&summary)

sm, err := ToSummary(summary)
if err != nil {
return nil, err
}
return sm, nil
}

// ToSummary converts the array of summary map to Summary proto
func ToSummary(summary []map[string]interface{}) (*pb.Summary, error) {
var data []*structpb.Struct
for _, s := range summary {
m := make(map[string]*structpb.Value)
for sk, sv := range s {
pbValue, err := structpb.NewValue(sv)
if err != nil {
return nil, err
}
m[sk] = pbValue
}
data = append(data, &structpb.Struct{Fields: m})
}

return &pb.Summary{
Summary: data,
}, nil
}

var validGroups = map[string]struct{}{
"year": {},
"month": {},
"week": {},
"day": {},
"hour": {},
"minute": {},
}

func newAggregator(env *cel.Env, aggregateObjectRequest summaryRequest, clauses ...equalityClause) (*Aggregator, error) {
filter := &filter{
env: env,
expr: strings.TrimSpace(aggregateObjectRequest.GetFilter()),
equalityClauses: clauses,
}

summary := strings.Split(strings.TrimSpace(aggregateObjectRequest.GetSummary()), ",")
if len(summary) == 0 {
// make total default
summary = append(summary, "total")
}

aggregators, err := getAggregateFunc(summary)
if err != nil {
return nil, err
}

group := strings.TrimSpace(aggregateObjectRequest.GetGroupBy())
if group != "" {
_, ok := validGroups[group]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "group_by does not recognize %s", group)
}
aggregators = append(aggregators, groupBy(group, groupQuery))
}

return &Aggregator{
aggregators: aggregators,
queryBuilders: []queryBuilder{
filter,
},
}, nil
}

func getDuration(fn, query, value string) aggregateFunc {
return func(db *gorm.DB) *gorm.DB {
return db.Select(db.Statement.Selects, fn+"("+query+")::INTERVAL as "+value)
}
}

func groupBy(group, query string) aggregateFunc {
return func(db *gorm.DB) *gorm.DB {
groupSelect := "EXTRACT(EPOCH FROM DATE_TRUNC('" + group + "', " + query + ")) AS group_value"
return db.Select(db.Statement.Selects, groupSelect).Group("group_value")
}
}

func getStatus(query, reason string) aggregateFunc {
return func(db *gorm.DB) *gorm.DB {
statusSelect := ""
if reason == "Unknown" {
statusSelect = "COUNT(CASE WHEN " + query + " NOT IN ('Failed', 'Succeeded', 'Cancelled') THEN 1 END) AS unknown"
} else {
statusSelect = "COUNT(CASE WHEN " + query + " = '" + reason + "' THEN 1 END) AS " + strings.ToLower(reason)
}
return db.Select(db.Statement.Selects, statusSelect)
}
}

func getCount(query, countName string) aggregateFunc {
return func(db *gorm.DB) *gorm.DB {
return db.Select(db.Statement.Selects, "COUNT("+query+") AS "+countName)
}
}

type aggregateFunc func(db *gorm.DB) *gorm.DB

var summaryFuncs = map[string]aggregateFunc{
"total": getCount("*", "total"),
"avg_duration": getDuration("AVG", durationQuery, "avg_duration"),
"max_duration": getDuration("MAX", durationQuery, "max_duration"),
"total_duration": getDuration("SUM", durationQuery, "total_duration"),
"min_duration": getDuration("MIN", durationQuery, "min_duration"),
"succeeded": getStatus(statusQuery, "Succeeded"),
"failed": getStatus(statusQuery, "Failed"),
"cancelled": getStatus(statusQuery, "Cancelled"),
"completed": getStatus(statusQuery, "Completed"),
"unknown": getStatus(statusQuery, "Unknown"),
}

func getAggregateFunc(queries []string) ([]aggregateFunc, error) {
fns := make([]aggregateFunc, 0, len(queries))
for _, q := range queries {
fn, ok := summaryFuncs[q]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "invalid aggregate query: %s", q)
}
fns = append(fns, fn)
}
return fns, nil
}

func (a *Aggregator) applyAggregateFunc(ctx context.Context, db *gorm.DB) *gorm.DB {
db = db.WithContext(ctx)
for _, fn := range a.aggregators {
db = fn(db)
}
return db
}

// OfRecordList returns a new Aggregator for Record List Summary Request
func OfRecordList(env *cel.Env, resultParent, resultName string, request *pb.RecordListSummaryRequest) (*Aggregator, error) {
return newAggregator(env, request, equalityClause{
columnName: "parent",
value: resultParent,
}, equalityClause{
columnName: "result_name",
value: resultName,
})
}
40 changes: 40 additions & 0 deletions pkg/api/server/v1alpha2/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package server

import (
"context"

"github.com/tektoncd/results/pkg/api/server/v1alpha2/auth"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/lister"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/result"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// GetRecordListSummary returns the summary and aggregation for a given list of records
func (s *Server) GetRecordListSummary(ctx context.Context, req *pb.RecordListSummaryRequest) (*pb.Summary, error) {
if req.GetParent() == "" {
return nil, status.Error(codes.InvalidArgument, "parent missing")
}

parent, resultName, err := result.ParseName(req.GetParent())
if err != nil {
return nil, err
}

if err := s.auth.Check(ctx, parent, auth.ResourceRecords, auth.PermissionGet); err != nil {
return nil, err
}

recordAggregator, err := lister.OfRecordList(s.recordsEnv, parent, resultName, req)
if err != nil {
return nil, err
}

agg, err := recordAggregator.Aggregate(ctx, s.db)
if err != nil {
return nil, err
}

return agg, nil
}
22 changes: 20 additions & 2 deletions proto/v1alpha2/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,17 @@ service Results {
};
}

rpc GetResultsListSummary(ResultListSummaryRequest) returns (Summary) {
rpc GetResultListSummary(ResultListSummaryRequest) returns (Summary) {
option (google.api.http) = {
get: "/apis/results.tekton.dev/v1alpha2/parents/{parent=*}/results/summary"
};
}

rpc GetRecordListSummary(RecordListSummaryRequest) returns (Summary) {
option (google.api.http) = {
get: "/apis/results.tekton.dev/v1alpha2/parents/{parent=*/results/*}/records/summary"
};
}
}

service Logs {
Expand Down Expand Up @@ -183,8 +189,20 @@ message ResultListSummaryRequest {
}];

string filter = 2;
string summary = 3;
string group_by = 4;
}

message RecordListSummaryRequest {
string parent = 1 [
(google.api.field_behavior) = REQUIRED,
(google.api.resource_reference) = {
child_type: "tekton.results.v1alpha2/Record"
}];

string group_by = 3;
string filter = 2;
string summary = 3;
string group_by = 4;
}

message ListResultsRequest {
Expand Down
15 changes: 3 additions & 12 deletions proto/v1alpha2/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tekton.results.v1alpha2;
import "google/api/field_behavior.proto";
import "google/api/resource.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

option go_package = "github.com/tektoncd/results/proto/v1alpha2/results_go_proto";

Expand Down Expand Up @@ -178,17 +179,7 @@ message LogSummary {
}

message Summary {
// The query that was used to generate this summary
string query = 1;

// The criteria for grouping
string group_by = 2;

// The aggregated results of the query. It is a map of the form "group-name" : <map-of-aggregations>
// for a non grouped response it is "default": <map-of-aggregations>
map<string, Aggregations> data = 3;
repeated google.protobuf.Struct summary = 1;
}

message Aggregations {
map<string, string> aggregations = 1;
}

Loading

0 comments on commit dd1fd69

Please sign in to comment.