Skip to content

Commit

Permalink
adds conc to deal with concurrency weirdness
Browse files Browse the repository at this point in the history
  • Loading branch information
lsh-0 committed Aug 25, 2023
1 parent 96e552c commit 4ce9e52
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 39 deletions.
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ module validate-article-json

go 1.20

require github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/sourcegraph/conc v0.3.0
github.com/tidwall/gjson v1.15.0
)

require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/tidwall/gjson v1.15.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
)
23 changes: 15 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw=
github.com/tidwall/gjson v1.15.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
86 changes: 60 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"strings"
"time"

"github.com/sourcegraph/conc/pool"

"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/tidwall/gjson"
)
Expand Down Expand Up @@ -105,8 +107,8 @@ func read_article_data(article_json_path string) (string, interface{}) {
func validate(schema Foo, article interface{}) (bool, time.Duration) {
start := time.Now()
err := schema.Schema.Validate(article)
t := time.Now()
elapsed := t.Sub(start)
end := time.Now()
elapsed := end.Sub(start)
if err != nil {
return false, elapsed
}
Expand All @@ -125,7 +127,13 @@ func path_is_dir(path string) bool {
return fi.Mode().IsDir()
}

func validate_article(article_json_path string) (bool, int64) {
// Result is the outcome of validating a single article JSON file, as
// returned by validate_article.
type Result struct {
// Path is the article JSON path that was given to validate_article.
Path string
// Elapsed is the time spent validating, in milliseconds.
Elapsed int64
// Success is the validation outcome; main counts results where this is
// false as failures.
Success bool
}

func validate_article(article_json_path string) Result {

// read article data and determine schema to use
schema_key, article := read_article_data(article_json_path)
Expand All @@ -146,14 +154,23 @@ func validate_article(article_json_path string) (bool, int64) {
} else {
println(fmt.Sprintf(msg, schema.Label, "invalid", elapsed_ms, fname))
}
return success, elapsed_ms
return Result{
Path: article_json_path,
Elapsed: elapsed_ms,
Success: success,
}
}

func worker(id int, jobs <-chan string, results chan<- int64) {
for path := range jobs {
_, ms_elapsed := validate_article(path)
results <- ms_elapsed
// format_ms renders a duration given in milliseconds as a short
// human-readable string, using the largest unit that applies:
//
//	ms > 60000  -> whole minutes, e.g. "2m"
//	ms > 1000   -> whole seconds, e.g. "59s"
//	otherwise   -> milliseconds,  e.g. "999ms"
//
// Values are truncated by integer division, not rounded.
//
// The thresholds must be tested largest-first: checking "ms > 1000" before
// "ms > 60000" makes the minutes branch unreachable, because every value
// above 60000 is also above 1000.
func format_ms(ms int64) string {
	switch {
	case ms > 60000:
		// minutes
		return fmt.Sprintf("%dm", (ms/1000)/60)
	case ms > 1000:
		// seconds
		return fmt.Sprintf("%ds", ms/1000)
	default:
		return fmt.Sprintf("%dms", ms)
	}
}

func main() {
Expand All @@ -178,35 +195,52 @@ func main() {
}
}

jobs := make(chan string, sample_size)
results := make(chan int64) // time taken in ms
// filter directories from path listing
file_list := []string{}
for _, path := range path_list[:sample_size] {
if !path.IsDir() {
file_list = append(file_list, input_path+path.Name())
}
}

// init worker pool
num_workers := 10 // todo: set to num cpus
for w := 1; w <= num_workers; w++ {
go worker(w, jobs, results)
p := pool.NewWithResults[Result]().WithMaxGoroutines(num_workers)

start_time := time.Now()
for _, file := range file_list {
file := file
p.Go(func() Result {
return validate_article(file)
})
}
result_list := p.Wait()
end_time := time.Now()
wall_time_ms := end_time.Sub(start_time).Milliseconds()

// send jobs to workers
for _, path := range path_list[:sample_size] {
if !path.IsDir() {
jobs <- input_path + path.Name()
}
var cpu_time_ms int64
for _, result := range result_list {
cpu_time_ms = cpu_time_ms + result.Elapsed
}

// there is a long pause here.
// perhaps we should be tallying results/printing progress as we go?
failures := []Result{}
for _, result := range result_list {
if !result.Success {
failures = append(failures, result)
}
}

var total_ms int64
for i := 0; i < sample_size; i++ {
// total_ms = total_ms + ms
total_ms = total_ms + <-results
println("")
println(fmt.Sprintf("articles:%d, failures:%d, workers:%d, wall-time:%s, cpu-time:%s, average:%dms", sample_size, len(failures), num_workers, format_ms(wall_time_ms), format_ms(cpu_time_ms), (cpu_time_ms / int64(sample_size))))

if len(failures) > 0 {
os.Exit(1)
}
println(fmt.Sprintf("total: %dms average: %dms", total_ms, (total_ms / int64(sample_size))))

} else {
// assume file or a link pointing to a file, validate single
validate_article(input_path)
result := validate_article(input_path)
if !result.Success {
os.Exit(1)
}
}
}

0 comments on commit 4ce9e52

Please sign in to comment.