Skip to content

Commit 7fc1455

Browse files
authored
Merge pull request #139 from mahmednabil109/Group_tasks_by_project
Add tag endpoint to admin server
2 parents 8e681c5 + 8387dc9 commit 7fc1455

File tree

7 files changed

+543
-22
lines changed

7 files changed

+543
-22
lines changed

cmds/admin_server/job/rdb/rdb.go

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package rdb
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/google/go-safeweb/safesql"
7+
adminServerJob "github.com/linuxboot/contest/cmds/admin_server/job"
8+
"github.com/linuxboot/contest/pkg/xcontext"
9+
)
10+
11+
var (
12+
tagsStmt = safesql.New(`SELECT tag, COUNT(tag) FROM job_tags WHERE tag REGEXP CONCAT('.*',?,'.*') GROUP BY tag`)
13+
jobStmt = safesql.New(`SELECT t.job_id, r.reporter_name, r.report_time, r.data FROM job_tags t LEFT JOIN final_reports r ON t.job_id = r.job_id WHERE t.tag = ?`)
14+
)
15+
16+
// SQL defines a struct that wraps a db connection to job sql database
17+
type Storage struct {
18+
db *safesql.DB
19+
}
20+
21+
func New(dbURI, driveName string) (*Storage, error) {
22+
23+
if driveName == "" {
24+
driveName = "mysql"
25+
}
26+
27+
db, err := safesql.Open(driveName, dbURI)
28+
if err != nil {
29+
return nil, fmt.Errorf("error while initializing the db: %w", err)
30+
}
31+
if err := db.Ping(); err != nil {
32+
db.Close()
33+
return nil, fmt.Errorf("error while connecting to the db: %w", err)
34+
}
35+
36+
return &Storage{
37+
db: &db,
38+
}, nil
39+
}
40+
41+
// GetTags returns tags that has a tag matches tagPattern
42+
func (r *Storage) GetTags(ctx xcontext.Context, tagPattern string) ([]adminServerJob.Tag, error) {
43+
var resultErr error
44+
res := []adminServerJob.Tag{}
45+
doneChan := make(chan struct{})
46+
47+
go func(doneChan chan<- struct{}) {
48+
defer func() {
49+
doneChan <- struct{}{}
50+
}()
51+
52+
rows, err := r.db.Query(tagsStmt, tagPattern)
53+
if err != nil {
54+
resultErr = fmt.Errorf("error while listing projects with tag like %s (sql: %q): %w", tagPattern, tagsStmt, err)
55+
return
56+
}
57+
defer func() {
58+
err = rows.Close()
59+
if err != nil {
60+
ctx.Errorf("error while closing the rows reader: %w", err)
61+
}
62+
}()
63+
64+
for rows.Next() {
65+
if rows.Err() != nil {
66+
resultErr = fmt.Errorf("error while reading the rows from query result: %w", err)
67+
return
68+
}
69+
70+
var tag adminServerJob.Tag
71+
if err := rows.Scan(&tag.Name, &tag.JobsCount); err != nil {
72+
resultErr = fmt.Errorf("error while scaning query result (sql: %q): %w", tagsStmt, err)
73+
return
74+
}
75+
res = append(res, tag)
76+
}
77+
}(doneChan)
78+
79+
for {
80+
select {
81+
case <-doneChan:
82+
return res, resultErr
83+
case <-ctx.Done():
84+
return nil, ctx.Err()
85+
}
86+
}
87+
}
88+
89+
// GetJobs returns jobs with final report if exists that are under a given tagName
90+
func (r *Storage) GetJobs(ctx xcontext.Context, tagName string) ([]adminServerJob.Job, error) {
91+
var resultErr error
92+
res := []adminServerJob.Job{}
93+
doneChan := make(chan struct{})
94+
95+
go func(doneChan chan<- struct{}) {
96+
defer func() {
97+
doneChan <- struct{}{}
98+
}()
99+
100+
rows, err := r.db.Query(jobStmt, tagName)
101+
if err != nil {
102+
resultErr = fmt.Errorf("error while listing jobs with tag %s (sql: %q): %w", tagName, jobStmt, err)
103+
return
104+
}
105+
defer func() {
106+
err = rows.Close()
107+
if err != nil {
108+
ctx.Errorf("error while closing the rows reader: %w", err)
109+
}
110+
}()
111+
112+
for rows.Next() {
113+
if rows.Err() != nil {
114+
resultErr = fmt.Errorf("error while reading the rows from query result: %w", err)
115+
return
116+
}
117+
118+
var job adminServerJob.Job
119+
if err := rows.Scan(&job.JobID, &job.ReporterName, &job.ReportTime, &job.Data); err != nil {
120+
resultErr = fmt.Errorf("error while scaning the job (sql: %q): %w", jobStmt, err)
121+
return
122+
}
123+
res = append(res, job)
124+
}
125+
}(doneChan)
126+
127+
for {
128+
select {
129+
case <-doneChan:
130+
return res, resultErr
131+
case <-ctx.Done():
132+
return nil, ctx.Err()
133+
}
134+
}
135+
}
136+
137+
func (r *Storage) Close() error {
138+
return r.db.Close()
139+
}

cmds/admin_server/job/storage.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package job
2+
3+
import (
4+
"time"
5+
6+
"github.com/linuxboot/contest/pkg/types"
7+
"github.com/linuxboot/contest/pkg/xcontext"
8+
)
9+
10+
// DB wraps a job database
11+
type Storage interface {
12+
GetTags(ctx xcontext.Context, tagPattern string) ([]Tag, error)
13+
GetJobs(ctx xcontext.Context, projectName string) ([]Job, error)
14+
}
15+
16+
// Tag contains metadata about jobs under a given tag
17+
type Tag struct {
18+
Name string
19+
// number of jobs with under this tag
20+
JobsCount uint
21+
}
22+
23+
// Job contains final report data about that job_id
24+
type Job struct {
25+
JobID types.JobID
26+
// fields for the final report of the job if it exists.
27+
ReporterName *string
28+
ReportTime *time.Time
29+
Success *bool
30+
Data *string
31+
}

cmds/admin_server/main.go

+20-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"syscall"
1010
"time"
1111

12+
// this import registers mysql driver for safesql to use
13+
_ "github.com/go-sql-driver/mysql"
14+
"github.com/linuxboot/contest/cmds/admin_server/job/rdb"
1215
"github.com/linuxboot/contest/cmds/admin_server/server"
1316
mongoStorage "github.com/linuxboot/contest/cmds/admin_server/storage/mongo"
1417
"github.com/linuxboot/contest/pkg/logging"
@@ -18,18 +21,20 @@ import (
1821
)
1922

2023
var (
21-
flagSet *flag.FlagSet
22-
flagPort *int
23-
flagDBURI *string
24-
flagTLSCert *string
25-
flagTLSKey *string
26-
flagLogLevel *string
24+
flagSet *flag.FlagSet
25+
flagPort *int
26+
flagDBURI *string
27+
flagContestDBURI *string
28+
flagTLSCert *string
29+
flagTLSKey *string
30+
flagLogLevel *string
2731
)
2832

2933
func initFlags(cmd string) {
3034
flagSet = flag.NewFlagSet(cmd, flag.ContinueOnError)
3135
flagPort = flagSet.Int("port", 8000, "Port to init the admin server on")
3236
flagDBURI = flagSet.String("dbURI", "mongodb://localhost:27017", "Database URI")
37+
flagContestDBURI = flagSet.String("contestdbURI", "contest:contest@tcp(localhost:3306)/contest_integ?parseTime=true", "Contest Database URI")
3338
flagTLSCert = flagSet.String("tlsCert", "", "Path to the tls cert file")
3439
flagTLSKey = flagSet.String("tlsKey", "", "Path to the tls key file")
3540
flagLogLevel = flagSet.String("logLevel", "debug", "A log level, possible values: debug, info, warning, error, panic, fatal")
@@ -72,6 +77,14 @@ func main() {
7277
defer cancel()
7378
defer storage.Close(closeCtx)
7479

80+
var jobStorage *rdb.Storage
81+
ctx.Debugf("init contest db connection %v \n", *flagContestDBURI)
82+
jobStorage, err = rdb.New(*flagContestDBURI, "mysql")
83+
if err != nil {
84+
exitWithError(err, 1)
85+
}
86+
defer jobStorage.Close()
87+
7588
go func() {
7689
<-sigs
7790
cancel()
@@ -88,7 +101,7 @@ func main() {
88101
}
89102
}
90103

91-
if err := server.Serve(ctx, *flagPort, storage, nil, tlsConfig); err != nil {
104+
if err := server.Serve(ctx, *flagPort, storage, jobStorage, nil, tlsConfig); err != nil {
92105
exitWithError(fmt.Errorf("server err: %w", err), 1)
93106
}
94107
}

0 commit comments

Comments
 (0)