Skip to content
Open
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ release/deployment/helm-chart/umbrella/Chart.lock

**/kitex_remote_config.json
.coda/
.coco
backend/script/errorx/.env
.cursor/
AGENTS.md
3 changes: 3 additions & 0 deletions backend/infra/platestwrite/latest_write_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,7 @@ const (
ResourceTypeTarget ResourceType = "eval_target"
ResourceTypeTargetVersion ResourceType = "eval_target_version"
ResourceTypeEvaluator ResourceType = "evaluator"

ResourceTypeExptInsightAnalysisRecord ResourceType = "expt_insight_analysis_record"
ResourceTypeExptInsightAnalysisFeedback ResourceType = "expt_insight_analysis_feedback"
)
15 changes: 13 additions & 2 deletions backend/modules/evaluation/application/experiment_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,12 +1129,21 @@ func (e *experimentApplication) InsightAnalysisExperiment(ctx context.Context, r
if err != nil {
return nil, err
}

var startTime, endTime *int64
if got.StartAt != nil {
startTime = gptr.Of(got.StartAt.UnixMilli())
}
if got.EndAt != nil {
endTime = gptr.Of(got.EndAt.UnixMilli())
}

recordID, err := e.CreateAnalysisRecord(ctx, &entity.ExptInsightAnalysisRecord{
SpaceID: req.GetWorkspaceID(),
ExptID: req.GetExptID(),
CreatedBy: session.UserID,
Status: entity.InsightAnalysisStatus_Running,
}, session)
}, session, gptr.Indirect(startTime), gptr.Indirect(endTime))
if err != nil {
return nil, err
}
Expand All @@ -1151,7 +1160,6 @@ func (e *experimentApplication) ListExptInsightAnalysisRecord(ctx context.Contex
UserID: strconv.FormatInt(gptr.Indirect(req.Session.UserID), 10),
}
}

err = e.auth.Authorization(ctx, &rpc.AuthorizationParam{
ObjectID: strconv.FormatInt(req.WorkspaceID, 10),
SpaceID: req.WorkspaceID,
Expand All @@ -1160,6 +1168,9 @@ func (e *experimentApplication) ListExptInsightAnalysisRecord(ctx context.Contex
if err != nil {
return nil, err
}

// First record contains the upvote/downvote count info for display purpose,
// Other records' feedback is not necessary for this list api
records, total, err := e.ListAnalysisRecord(ctx, req.GetWorkspaceID(), req.GetExptID(), entity.NewPage(int(req.GetPageNumber()), int(req.GetPageSize())), session)
if err != nil {
return nil, err
Expand Down
17 changes: 13 additions & 4 deletions backend/modules/evaluation/application/experiment_app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"strconv"
"testing"
"time"

"github.com/bytedance/gg/gptr"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -3736,11 +3737,15 @@ func TestInsightAnalysisExperiment(t *testing.T) {

t.Run("成功创建洞察分析", func(t *testing.T) {
// Mock the manager.Get call
mockManager.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&entity.Experiment{CreatedBy: "test-user"}, nil)
mockManager.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&entity.Experiment{
CreatedBy: "test-user",
StartAt: &[]time.Time{time.Now()}[0],
EndAt: &[]time.Time{time.Now()}[0],
}, nil)
// Mock the auth.AuthorizationWithoutSPI call
mockAuth.EXPECT().AuthorizationWithoutSPI(gomock.Any(), gomock.Any()).Return(nil)
// Mock the CreateAnalysisRecord call
mockInsightService.EXPECT().CreateAnalysisRecord(gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(123), nil)
mockInsightService.EXPECT().CreateAnalysisRecord(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(123), nil)

_, err := app.InsightAnalysisExperiment(ctx, req)
assert.NoError(t, err)
Expand All @@ -3764,9 +3769,13 @@ func TestInsightAnalysisExperiment(t *testing.T) {
})

t.Run("创建分析记录失败", func(t *testing.T) {
mockManager.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&entity.Experiment{CreatedBy: "test-user"}, nil)
mockManager.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&entity.Experiment{
CreatedBy: "test-user",
StartAt: &[]time.Time{time.Now()}[0],
EndAt: &[]time.Time{time.Now()}[0],
}, nil)
mockAuth.EXPECT().AuthorizationWithoutSPI(gomock.Any(), gomock.Any()).Return(nil)
mockInsightService.EXPECT().CreateAnalysisRecord(gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), errors.New("create analysis record error"))
mockInsightService.EXPECT().CreateAnalysisRecord(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(int64(0), errors.New("create analysis record error"))

_, err := app.InsightAnalysisExperiment(ctx, req)
assert.Error(t, err)
Expand Down
10 changes: 5 additions & 5 deletions backend/modules/evaluation/application/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ import (

//go:generate mockgen -destination=mocks/trace_agent.go -package=mocks . IAgentAdapter
type IAgentAdapter interface {
CallTraceAgent(ctx context.Context, spaceID int64, url string) (int64, error)
CallTraceAgent(ctx context.Context, spaceID int64, url string, startTime, endTime int64) (int64, error)
GetReport(ctx context.Context, spaceID, reportID int64) (report string, status entity.ReportStatus, err error)
}
8 changes: 5 additions & 3 deletions backend/modules/evaluation/domain/entity/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ type ExportCSVEvent struct {
ExperimentID int64
SpaceID int64

Session *Session
ExportScene ExportScene
CreatedAt int64
Session *Session
ExportScene ExportScene
CreatedAt int64
ExptStartTime int64 // Unix Time
ExptEndTime int64 // Unix Time
}

type ExportScene int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ const (
InsightAnalysisStatus_Failed InsightAnalysisStatus = 3
)

const (
InsightAnalysisRunningTimeout = 2 * time.Hour
)

type ExptInsightAnalysisRecord struct {
ID int64
SpaceID int64
Expand Down
4 changes: 2 additions & 2 deletions backend/modules/evaluation/domain/service/insight_analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

type IExptInsightAnalysisService interface {
CreateAnalysisRecord(ctx context.Context, record *entity.ExptInsightAnalysisRecord, session *entity.Session) (int64, error)
GenAnalysisReport(ctx context.Context, spaceID, exptID, recordID, CreateAt int64) error
CreateAnalysisRecord(ctx context.Context, record *entity.ExptInsightAnalysisRecord, session *entity.Session, startTime, endTime int64) (int64, error)
GenAnalysisReport(ctx context.Context, spaceID, exptID, recordID, CreateAt, startTime, endTime int64) error
GetAnalysisRecordByID(ctx context.Context, spaceID, exptID, recordID int64, session *entity.Session) (*entity.ExptInsightAnalysisRecord, error)
ListAnalysisRecord(ctx context.Context, spaceID, exptID int64, page entity.Page, session *entity.Session) ([]*entity.ExptInsightAnalysisRecord, int64, error)
DeleteAnalysisRecord(ctx context.Context, spaceID, exptID, recordID int64) error
Expand Down
82 changes: 68 additions & 14 deletions backend/modules/evaluation/domain/service/insight_analysis_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,20 @@ func NewInsightAnalysisService(repo repo.IExptInsightAnalysisRecordRepo,
}
}

func (e ExptInsightAnalysisServiceImpl) CreateAnalysisRecord(ctx context.Context, record *entity.ExptInsightAnalysisRecord, session *entity.Session) (int64, error) {
func (e ExptInsightAnalysisServiceImpl) CreateAnalysisRecord(ctx context.Context, record *entity.ExptInsightAnalysisRecord, session *entity.Session, startTime, endTime int64) (int64, error) {
recordID, err := e.repo.CreateAnalysisRecord(ctx, record)
if err != nil {
return 0, err
}

exportEvent := &entity.ExportCSVEvent{
ExportID: recordID,
ExperimentID: record.ExptID,
SpaceID: record.SpaceID,
ExportScene: entity.ExportSceneInsightAnalysis,
CreatedAt: time.Now().Unix(),
ExportID: recordID,
ExperimentID: record.ExptID,
SpaceID: record.SpaceID,
ExportScene: entity.ExportSceneInsightAnalysis,
CreatedAt: time.Now().Unix(),
ExptStartTime: startTime,
ExptEndTime: endTime,
}
err = e.exptPublisher.PublishExptExportCSVEvent(ctx, exportEvent, gptr.Of(time.Second*3))
if err != nil {
Expand All @@ -76,7 +78,7 @@ func (e ExptInsightAnalysisServiceImpl) CreateAnalysisRecord(ctx context.Context
return recordID, nil
}

func (e ExptInsightAnalysisServiceImpl) GenAnalysisReport(ctx context.Context, spaceID, exptID, recordID, CreateAt int64) (err error) {
func (e ExptInsightAnalysisServiceImpl) GenAnalysisReport(ctx context.Context, spaceID, exptID, recordID, CreateAt, startTime, endTime int64) (err error) {
analysisRecord, err := e.repo.GetAnalysisRecordByID(ctx, spaceID, exptID, recordID)
if err != nil {
return err
Expand Down Expand Up @@ -125,7 +127,7 @@ func (e ExptInsightAnalysisServiceImpl) GenAnalysisReport(ctx context.Context, s
return err
}

reportID, err := e.agentAdapter.CallTraceAgent(ctx, spaceID, url)
reportID, err := e.agentAdapter.CallTraceAgent(ctx, spaceID, url, startTime, endTime)
if err != nil {
return err
}
Expand All @@ -135,11 +137,13 @@ func (e ExptInsightAnalysisServiceImpl) GenAnalysisReport(ctx context.Context, s

// 发送时间检查分析报告生成状态
exportEvent := &entity.ExportCSVEvent{
ExportID: recordID,
ExperimentID: exptID,
SpaceID: spaceID,
ExportScene: entity.ExportSceneInsightAnalysis,
CreatedAt: CreateAt,
ExportID: recordID,
ExperimentID: exptID,
SpaceID: spaceID,
ExportScene: entity.ExportSceneInsightAnalysis,
CreatedAt: CreateAt,
ExptStartTime: startTime,
ExptEndTime: endTime,
}
err = e.exptPublisher.PublishExptExportCSVEvent(ctx, exportEvent, gptr.Of(time.Minute*3))
if err != nil {
Expand All @@ -154,6 +158,14 @@ func (e ExptInsightAnalysisServiceImpl) checkAnalysisReportGenStatus(ctx context
if err != nil {
return err
}

// 超过3小时,未生成分析报告,认为是失败
if status == entity.ReportStatus_Running && record.CreatedAt.Add(entity.InsightAnalysisRunningTimeout).Unix() <= time.Now().Unix() {
record.Status = entity.InsightAnalysisStatus_Failed
logs.CtxWarn(ctx, "checkAnalysisReportGenStatus found timeout event, expt_id: %v, record_id: %v", record.ExptID, record.ID)
return e.repo.UpdateAnalysisRecord(ctx, record)
}

if status == entity.ReportStatus_Failed {
record.Status = entity.InsightAnalysisStatus_Failed
return e.repo.UpdateAnalysisRecord(ctx, record)
Expand Down Expand Up @@ -195,6 +207,15 @@ func (e ExptInsightAnalysisServiceImpl) GetAnalysisRecordByID(ctx context.Contex
return nil, err
}

if analysisRecord.Status == entity.InsightAnalysisStatus_Running && analysisRecord.CreatedAt.Add(entity.InsightAnalysisRunningTimeout).Unix() < time.Now().Unix() {
analysisRecord.Status = entity.InsightAnalysisStatus_Failed
err = e.repo.UpdateAnalysisRecord(ctx, analysisRecord)
if err != nil {
logs.CtxError(ctx, "GetAnalysisRecordByID: UpdateAnalysisRecord failed: %v", err)
}
return analysisRecord, err
}

if analysisRecord.Status == entity.InsightAnalysisStatus_Running ||
analysisRecord.Status == entity.InsightAnalysisStatus_Failed {
return analysisRecord, nil
Expand Down Expand Up @@ -254,7 +275,40 @@ func (e ExptInsightAnalysisServiceImpl) notifyAnalysisComplete(ctx context.Conte
}

func (e ExptInsightAnalysisServiceImpl) ListAnalysisRecord(ctx context.Context, spaceID, exptID int64, page entity.Page, session *entity.Session) ([]*entity.ExptInsightAnalysisRecord, int64, error) {
return e.repo.ListAnalysisRecord(ctx, spaceID, exptID, page)
analysisRecords, total, err := e.repo.ListAnalysisRecord(ctx, spaceID, exptID, page)
if err != nil {
return nil, 0, err
}
if total == 0 {
return analysisRecords, total, nil
}

firstAnalysisRecord := analysisRecords[0]

upvoteCount, downvoteCount, err := e.repo.CountFeedbackVote(ctx, spaceID, exptID, firstAnalysisRecord.ID)
if err != nil {
// side path, don't block the main flow
logs.CtxWarn(ctx, "CountFeedbackVote failed for space_id: %v, expt_id: %v, record_id: %v, err=%v", spaceID, exptID, firstAnalysisRecord.ID, err)
return analysisRecords, total, nil
}

curUserFeedbackVote, err := e.repo.GetFeedbackVoteByUser(ctx, spaceID, exptID, firstAnalysisRecord.ID, session.UserID)
if err != nil {
// side path, don't block the main flow
logs.CtxWarn(ctx, "GetFeedbackVoteByUser failed for space_id: %v, expt_id: %v, record_id: %v, err=%v", spaceID, exptID, firstAnalysisRecord.ID, err)
return analysisRecords, total, nil
}
firstAnalysisRecord.ExptInsightAnalysisFeedback = entity.ExptInsightAnalysisFeedback{
UpvoteCount: upvoteCount,
DownvoteCount: downvoteCount,
CurrentUserVoteType: entity.None,
}
firstAnalysisRecord.ExptInsightAnalysisFeedback.CurrentUserVoteType = entity.None
if curUserFeedbackVote != nil {
firstAnalysisRecord.ExptInsightAnalysisFeedback.CurrentUserVoteType = curUserFeedbackVote.VoteType
}

return analysisRecords, total, nil
}

func (e ExptInsightAnalysisServiceImpl) DeleteAnalysisRecord(ctx context.Context, spaceID, exptID, recordID int64) error {
Expand Down
Loading
Loading