Skip to content

Commit 03abb4a

Browse files
authored
[fix][evaluation] kill expt (#285)
- fix(evaluation): panic
- fix(evaluation): BatchGetExperimentResult with sorted turn results
- fix(evaluation): BatchGetExperimentResult panic
- fix(evaluation): kill expt
- fix(evaluation): kill expt
- fix(evaluation): ut
- fix(evaluation): expt terminating for time cost
- fix(evaluation): ut
1 parent 280027e commit 03abb4a

38 files changed

+1481
-235
lines changed

backend/kitex_gen/coze/loop/evaluation/domain/expt/expt.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/modules/evaluation/application/experiment_app.go

Lines changed: 35 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -7,9 +7,11 @@ import (
77
"context"
88
"fmt"
99
"strconv"
10+
"time"
1011

1112
"github.com/bytedance/gg/gptr"
1213

14+
"github.com/coze-dev/coze-loop/backend/infra/backoff"
1315
"github.com/coze-dev/coze-loop/backend/infra/idgen"
1416
"github.com/coze-dev/coze-loop/backend/kitex_gen/base"
1517
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/evaluation"
@@ -28,6 +30,7 @@ import (
2830
"github.com/coze-dev/coze-loop/backend/modules/evaluation/pkg/errno"
2931
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
3032
"github.com/coze-dev/coze-loop/backend/pkg/json"
33+
"github.com/coze-dev/coze-loop/backend/pkg/lang/goroutine"
3134
"github.com/coze-dev/coze-loop/backend/pkg/lang/maps"
3235
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
3336
"github.com/coze-dev/coze-loop/backend/pkg/lang/slices"
@@ -514,27 +517,49 @@ func (e *experimentApplication) RetryExperiment(ctx context.Context, req *expt.R
514517

515518
func (e *experimentApplication) KillExperiment(ctx context.Context, req *expt.KillExperimentRequest) (r *expt.KillExperimentResponse, err error) {
516519
session := entity.NewSession(ctx)
520+
logs.CtxInfo(ctx, "KillExperiment receive req, expt_id: %v, user_id: %v", req.GetExptID(), session.UserID)
517521

518522
got, err := e.manager.Get(ctx, req.GetExptID(), req.GetWorkspaceID(), session)
519523
if err != nil {
520524
return nil, err
521525
}
522526

523-
err = e.auth.AuthorizationWithoutSPI(ctx, &rpc.AuthorizationWithoutSPIParam{
524-
ObjectID: strconv.FormatInt(req.GetExptID(), 10),
525-
SpaceID: req.GetWorkspaceID(),
526-
ActionObjects: []*rpc.ActionObject{{Action: gptr.Of(consts.Run), EntityType: gptr.Of(rpc.AuthEntityType_EvaluationExperiment)}},
527-
OwnerID: gptr.Of(got.CreatedBy),
528-
ResourceSpaceID: req.GetWorkspaceID(),
529-
})
530-
if err != nil {
531-
return nil, err
527+
if got.Status != entity.ExptStatus_Processing {
528+
return nil, errorx.NewByCode(errno.TerminateNonRunningExperimentErrorCode)
532529
}
533530

534-
if err := e.manager.CompleteExpt(ctx, req.GetExptID(), req.GetWorkspaceID(), session, entity.WithStatus(entity.ExptStatus_Terminated)); err != nil {
531+
if !e.configer.GetMaintainerUserIDs(ctx)[session.UserID] {
532+
if err := e.auth.AuthorizationWithoutSPI(ctx, &rpc.AuthorizationWithoutSPIParam{
533+
ObjectID: strconv.FormatInt(req.GetExptID(), 10),
534+
SpaceID: req.GetWorkspaceID(),
535+
ActionObjects: []*rpc.ActionObject{{Action: gptr.Of(consts.Run), EntityType: gptr.Of(rpc.AuthEntityType_EvaluationExperiment)}},
536+
OwnerID: gptr.Of(got.CreatedBy),
537+
ResourceSpaceID: req.GetWorkspaceID(),
538+
}); err != nil {
539+
return nil, err
540+
}
541+
}
542+
543+
if err := e.manager.SetExptTerminating(ctx, req.GetExptID(), got.LatestRunID, req.GetWorkspaceID(), session); err != nil {
535544
return nil, err
536545
}
537546

547+
kill := func(ctx context.Context, exptID, exptRunID, spaceID int64, session *entity.Session) error {
548+
if err := e.manager.CompleteRun(ctx, exptID, exptRunID, spaceID, session, entity.WithStatus(entity.ExptStatus_Terminated)); err != nil {
549+
return err
550+
}
551+
return e.manager.CompleteExpt(ctx, exptID, spaceID, session,
552+
entity.WithStatus(entity.ExptStatus_Terminated), entity.WithCompleteInterval(time.Second), entity.NoAggrCalculate())
553+
}
554+
555+
goroutine.Go(ctx, func() {
556+
if err := backoff.RetryWithElapsedTime(ctx, time.Minute*3, func() error {
557+
return kill(ctx, req.GetExptID(), got.LatestRunID, req.GetWorkspaceID(), session)
558+
}); err != nil {
559+
logs.CtxInfo(ctx, "kill expt failed, expt_id: %v, err: %v", req.GetExptID(), err)
560+
}
561+
})
562+
538563
return &expt.KillExperimentResponse{BaseResp: base.NewBaseResp()}, nil
539564
}
540565

backend/modules/evaluation/application/experiment_app_test.go

Lines changed: 156 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ import (
1818
repo_mocks "github.com/coze-dev/coze-loop/backend/modules/evaluation/domain/repo/mocks"
1919

2020
idgenmock "github.com/coze-dev/coze-loop/backend/infra/idgen/mocks"
21+
"github.com/coze-dev/coze-loop/backend/infra/middleware/session"
2122
"github.com/coze-dev/coze-loop/backend/kitex_gen/base"
2223
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/data/domain/tag"
2324
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/evaluation/domain/common"
@@ -1814,11 +1815,13 @@ func TestExperimentApplication_KillExperiment(t *testing.T) {
18141815
// Create mock objects
18151816
mockManager := servicemocks.NewMockIExptManager(ctrl)
18161817
mockAuth := rpcmocks.NewMockIAuthProvider(ctrl)
1818+
mockConfiger := componentMocks.NewMockIConfiger(ctrl)
18171819

18181820
// Test data
18191821
validWorkspaceID := int64(123)
18201822
validExptID := int64(456)
18211823
validUserID := int64(789)
1824+
validRunID := int64(999)
18221825

18231826
tests := []struct {
18241827
name string
@@ -1828,19 +1831,59 @@ func TestExperimentApplication_KillExperiment(t *testing.T) {
18281831
wantErr bool
18291832
}{
18301833
{
1831-
name: "successfully terminate experiment",
1834+
name: "successfully terminate experiment with maintainer permission",
18321835
req: &exptpb.KillExperimentRequest{
18331836
WorkspaceID: gptr.Of(validWorkspaceID),
18341837
ExptID: gptr.Of(validExptID),
18351838
},
18361839
mockSetup: func() {
18371840
// 获取实验信息
18381841
mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(&entity.Experiment{
1839-
ID: validExptID,
1840-
SpaceID: validWorkspaceID,
1841-
CreatedBy: strconv.FormatInt(validUserID, 10),
1842+
ID: validExptID,
1843+
SpaceID: validWorkspaceID,
1844+
CreatedBy: strconv.FormatInt(validUserID, 10),
1845+
LatestRunID: validRunID,
1846+
Status: entity.ExptStatus_Processing,
1847+
}, nil)
1848+
1849+
// Maintainer权限检查 - 用户是maintainer
1850+
mockConfiger.EXPECT().GetMaintainerUserIDs(gomock.Any()).Return(map[string]bool{
1851+
strconv.FormatInt(validUserID, 10): true,
1852+
})
1853+
1854+
// 设置终止中状态(实现中同步执行)
1855+
mockManager.EXPECT().SetExptTerminating(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any()).Return(nil)
1856+
1857+
// 异步终止:允许在后台调用,不校验调用次数
1858+
mockManager.EXPECT().CompleteRun(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
1859+
mockManager.EXPECT().CompleteExpt(gomock.Any(), validExptID, validWorkspaceID, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
1860+
},
1861+
wantResp: &exptpb.KillExperimentResponse{
1862+
BaseResp: base.NewBaseResp(),
1863+
},
1864+
wantErr: false,
1865+
},
1866+
{
1867+
name: "successfully terminate experiment with regular permission",
1868+
req: &exptpb.KillExperimentRequest{
1869+
WorkspaceID: gptr.Of(validWorkspaceID),
1870+
ExptID: gptr.Of(validExptID),
1871+
},
1872+
mockSetup: func() {
1873+
// 获取实验信息
1874+
mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(&entity.Experiment{
1875+
ID: validExptID,
1876+
SpaceID: validWorkspaceID,
1877+
CreatedBy: strconv.FormatInt(validUserID, 10),
1878+
LatestRunID: validRunID,
1879+
Status: entity.ExptStatus_Processing,
18421880
}, nil)
18431881

1882+
// Maintainer权限检查 - 用户不是maintainer
1883+
mockConfiger.EXPECT().GetMaintainerUserIDs(gomock.Any()).Return(map[string]bool{
1884+
"other_user": true,
1885+
})
1886+
18441887
// 权限验证
18451888
mockAuth.EXPECT().AuthorizationWithoutSPI(gomock.Any(), &rpc.AuthorizationWithoutSPIParam{
18461889
ObjectID: strconv.FormatInt(validExptID, 10),
@@ -1850,19 +1893,12 @@ func TestExperimentApplication_KillExperiment(t *testing.T) {
18501893
ResourceSpaceID: validWorkspaceID,
18511894
}).Return(nil)
18521895

1853-
// 终止实验
1854-
mockManager.EXPECT().CompleteExpt(gomock.Any(), validExptID, validWorkspaceID, gomock.Any(), gomock.Any()).DoAndReturn(
1855-
func(ctx context.Context, exptID, spaceID int64, session *entity.Session, opts ...entity.CompleteExptOptionFn) error {
1856-
// 验证传入的 opts 是否包含正确的状态设置
1857-
opt := &entity.CompleteExptOption{}
1858-
for _, fn := range opts {
1859-
fn(opt)
1860-
}
1861-
if opt.Status != entity.ExptStatus_Terminated {
1862-
t.Errorf("expected status %v, got %v", entity.ExptStatus_Terminated, opt.Status)
1863-
}
1864-
return nil
1865-
})
1896+
// 设置终止中状态(实现中同步执行)
1897+
mockManager.EXPECT().SetExptTerminating(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any()).Return(nil)
1898+
1899+
// 异步终止:允许在后台调用,不校验调用次数
1900+
mockManager.EXPECT().CompleteRun(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
1901+
mockManager.EXPECT().CompleteExpt(gomock.Any(), validExptID, validWorkspaceID, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
18661902
},
18671903
wantResp: &exptpb.KillExperimentResponse{
18681904
BaseResp: base.NewBaseResp(),
@@ -1881,6 +1917,102 @@ func TestExperimentApplication_KillExperiment(t *testing.T) {
18811917
wantResp: nil,
18821918
wantErr: true,
18831919
},
1920+
{
1921+
name: "permission validation failed for regular user",
1922+
req: &exptpb.KillExperimentRequest{
1923+
WorkspaceID: gptr.Of(validWorkspaceID),
1924+
ExptID: gptr.Of(validExptID),
1925+
},
1926+
mockSetup: func() {
1927+
// 获取实验信息
1928+
mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(&entity.Experiment{
1929+
ID: validExptID,
1930+
SpaceID: validWorkspaceID,
1931+
CreatedBy: strconv.FormatInt(validUserID, 10),
1932+
LatestRunID: validRunID,
1933+
Status: entity.ExptStatus_Processing,
1934+
}, nil)
1935+
1936+
// Maintainer权限检查 - 用户不是maintainer
1937+
mockConfiger.EXPECT().GetMaintainerUserIDs(gomock.Any()).Return(map[string]bool{
1938+
"other_user": true,
1939+
})
1940+
1941+
// 权限验证失败
1942+
mockAuth.EXPECT().AuthorizationWithoutSPI(gomock.Any(), &rpc.AuthorizationWithoutSPIParam{
1943+
ObjectID: strconv.FormatInt(validExptID, 10),
1944+
SpaceID: validWorkspaceID,
1945+
ActionObjects: []*rpc.ActionObject{{Action: gptr.Of(consts.Run), EntityType: gptr.Of(rpc.AuthEntityType_EvaluationExperiment)}},
1946+
OwnerID: gptr.Of(strconv.FormatInt(validUserID, 10)),
1947+
ResourceSpaceID: validWorkspaceID,
1948+
}).Return(errorx.NewByCode(errno.CommonNoPermissionCode))
1949+
},
1950+
wantResp: nil,
1951+
wantErr: true,
1952+
},
1953+
{
1954+
name: "complete run failed",
1955+
req: &exptpb.KillExperimentRequest{
1956+
WorkspaceID: gptr.Of(validWorkspaceID),
1957+
ExptID: gptr.Of(validExptID),
1958+
},
1959+
mockSetup: func() {
1960+
// 获取实验信息
1961+
mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(&entity.Experiment{
1962+
ID: validExptID,
1963+
SpaceID: validWorkspaceID,
1964+
CreatedBy: strconv.FormatInt(validUserID, 10),
1965+
LatestRunID: validRunID,
1966+
Status: entity.ExptStatus_Processing,
1967+
}, nil)
1968+
1969+
// Maintainer权限检查 - 用户是maintainer
1970+
mockConfiger.EXPECT().GetMaintainerUserIDs(gomock.Any()).Return(map[string]bool{
1971+
strconv.FormatInt(validUserID, 10): true,
1972+
})
1973+
1974+
// 设置终止中状态
1975+
mockManager.EXPECT().SetExptTerminating(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any()).Return(nil)
1976+
1977+
// 异步终止运行失败:允许后台调用
1978+
mockManager.EXPECT().CompleteRun(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any(), gomock.Any()).Return(
1979+
errorx.NewByCode(errno.CommonInternalErrorCode)).AnyTimes()
1980+
},
1981+
wantResp: &exptpb.KillExperimentResponse{BaseResp: base.NewBaseResp()},
1982+
wantErr: false,
1983+
},
1984+
{
1985+
name: "complete experiment failed",
1986+
req: &exptpb.KillExperimentRequest{
1987+
WorkspaceID: gptr.Of(validWorkspaceID),
1988+
ExptID: gptr.Of(validExptID),
1989+
},
1990+
mockSetup: func() {
1991+
// 获取实验信息
1992+
mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(&entity.Experiment{
1993+
ID: validExptID,
1994+
SpaceID: validWorkspaceID,
1995+
CreatedBy: strconv.FormatInt(validUserID, 10),
1996+
LatestRunID: validRunID,
1997+
Status: entity.ExptStatus_Processing,
1998+
}, nil)
1999+
2000+
// Maintainer权限检查 - 用户是maintainer
2001+
mockConfiger.EXPECT().GetMaintainerUserIDs(gomock.Any()).Return(map[string]bool{
2002+
strconv.FormatInt(validUserID, 10): true,
2003+
})
2004+
2005+
// 设置终止中状态
2006+
mockManager.EXPECT().SetExptTerminating(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any()).Return(nil)
2007+
2008+
// 异步终止
2009+
mockManager.EXPECT().CompleteRun(gomock.Any(), validExptID, validRunID, validWorkspaceID, gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
2010+
mockManager.EXPECT().CompleteExpt(gomock.Any(), validExptID, validWorkspaceID, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(
2011+
errorx.NewByCode(errno.CommonInternalErrorCode)).AnyTimes()
2012+
},
2013+
wantResp: &exptpb.KillExperimentResponse{BaseResp: base.NewBaseResp()},
2014+
wantErr: false,
2015+
},
18842016
}
18852017

18862018
for _, tt := range tests {
@@ -1896,7 +2028,7 @@ func TestExperimentApplication_KillExperiment(t *testing.T) {
18962028
nil, // scheduler
18972029
nil, // recordEval
18982030
nil,
1899-
nil, // configer
2031+
mockConfiger, // configer
19002032
mockAuth,
19012033
nil, // userInfoService
19022034
nil, // evalTargetService
@@ -1907,8 +2039,13 @@ func TestExperimentApplication_KillExperiment(t *testing.T) {
19072039
nil,
19082040
)
19092041

2042+
// 设置 context 中的 UserID,这样 entity.NewSession 才能获取到 UserID
2043+
ctx := session.WithCtxUser(context.Background(), &session.User{
2044+
ID: strconv.FormatInt(validUserID, 10),
2045+
})
2046+
19102047
// 执行测试
1911-
gotResp, err := app.KillExperiment(context.Background(), tt.req)
2048+
gotResp, err := app.KillExperiment(ctx, tt.req)
19122049

19132050
// 验证结果
19142051
if tt.wantErr {

backend/modules/evaluation/domain/component/conf.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,4 +18,5 @@ type IConfiger interface {
1818
GetExptTurnResultFilterBmqProducerCfg(ctx context.Context) *entity.BmqProducerCfg
1919
GetCKDBName(ctx context.Context) *entity.CKDBConfig
2020
GetExptExportWhiteList(ctx context.Context) *entity.ExptExportWhiteList
21+
GetMaintainerUserIDs(ctx context.Context) map[string]bool
2122
}

backend/modules/evaluation/domain/component/mocks/expt_configer.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/modules/evaluation/domain/entity/common.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -355,3 +355,7 @@ func StorageProviderFromString(s string) (StorageProvider, error) {
355355
}
356356

357357
func StorageProviderPtr(v StorageProvider) *StorageProvider { return &v }
358+
359+
type SystemMaintainerConf struct {
360+
UserIDs []string `json:"user_ids" mapstructure:"user_ids"`
361+
}

backend/modules/evaluation/domain/entity/expt.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ const (
3535
ExptStatus_Terminated ExptStatus = 13
3636
// System terminated
3737
ExptStatus_SystemTerminated ExptStatus = 14
38+
ExptStatus_Terminating ExptStatus = 15
3839

3940
// 流式执行完成,不再接收新的请求
4041
ExptStatus_Draining ExptStatus = 21
@@ -265,8 +266,6 @@ type ExptCalculateStats struct {
265266
SuccessItemCnt int
266267
ProcessingItemCnt int
267268
TerminatedItemCnt int
268-
269-
IncompleteTurnIDs []*ItemTurnID
270269
}
271270

272271
type ItemTurnID struct {

0 commit comments

Comments
 (0)