Commit
articles are now read from disk into a buffer.
workers read article data from the buffer.
this is to remove disk IO as a potential bottleneck.
lsh-0 committed Oct 25, 2023
1 parent a6f0219 commit ab7f05e
Showing 1 changed file with 100 additions and 42 deletions.
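In short: a single feeder goroutine reads articles into a bounded channel and the validation workers drain it, so the workers never wait on disk. A minimal standalone sketch of that producer/consumer pattern (illustrative names only, not code from this commit):

	package main

	import (
		"fmt"
		"sync"
	)

	// feed fills a bounded channel ahead of the workers.
	func feed(items []string, buffer chan<- string) {
		for _, it := range items {
			buffer <- it // blocks when the buffer is full, capping memory use
		}
		close(buffer)
	}

	func main() {
		items := []string{"a.json", "b.json", "c.json", "d.json"}
		buffer := make(chan string, 2) // keep at most 2 items in memory

		go feed(items, buffer)

		var wg sync.WaitGroup
		for w := 0; w < 2; w++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for it := range buffer { // drain until the feeder closes the channel
					fmt.Println("validated", it)
				}
			}()
		}
		wg.Wait()
	}

The bounded channel is what caps memory: once the buffer is full, the feeder blocks until a worker takes an item, so reads stay just ahead of the validators.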
main.go: 142 changes (100 additions, 42 deletions)
@@ -17,6 +17,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/sourcegraph/conc/pool"
@@ -38,6 +39,22 @@ type Schema struct {
 	Schema *jsonschema.Schema
 }
 
+type Result struct {
+	Type     string
+	FileName string
+	Elapsed  int64
+	Success  bool
+	// these can get large. I recommend not accumulating them for large jobs with many problems.
+	Error error
+}
+
+type Article struct {
+	Type     string // POA or VOR
+	FileName string
+	Data     interface{} // unmarshalled json data
+
+}
+
 func configure_validator(schema_root string) map[string]Schema {
 	compiler := jsonschema.NewCompiler()
 	compiler.Draft = jsonschema.Draft4
@@ -76,7 +93,7 @@ func configure_validator(schema_root string) map[string]Schema {
 
 // ---
 
-func read_article_data(article_json_path string) (string, interface{}) {
+func read_article_data(article_json_path string) Article {
 	article_json_bytes, err := os.ReadFile(article_json_path)
 	panic_on_err(err, "reading bytes from path: "+article_json_path)
 
@@ -109,7 +126,11 @@ func read_article_data(article_json_path string) (string, interface{}) {
 	err = json.Unmarshal(raw, &article)
 	panic_on_err(err, "unmarshalling article section bytes")
 
-	return schema_key, article
+	return Article{
+		FileName: article_json_path,
+		Data:     article,
+		Type:     schema_key,
+	}
 }
 
 func validate(schema Schema, article interface{}) (time.Duration, error) {
@@ -132,30 +153,20 @@ func path_is_dir(path string) bool {
 	return fi.Mode().IsDir()
 }
 
-type Result struct {
-	Type     string
-	FileName string
-	Elapsed  int64
-	Success  bool
-	// these can get large. I recommend not accumulating them for large jobs with many problems.
-	Error error
-}
-
-func validate_article(schema_map map[string]Schema, article_json_path string, capture_error bool) Result {
+func validate_article(schema_map map[string]Schema, article Article, capture_error bool) Result {
 
 	// read article data and determine schema to use
-	schema_key, article := read_article_data(article_json_path)
-	schema, present := schema_map[schema_key]
+	schema, present := schema_map[article.Type]
 	if !present {
-		panic("schema not found: " + schema_key)
+		panic("schema not found: " + article.Type)
 	}
 
 	// validate!
-	elapsed, err := validate(schema, article)
+	elapsed, err := validate(schema, article.Data)
 
 	r := Result{
-		Type:     schema_key, // POA or VOR
-		FileName: filepath.Base(article_json_path),
+		Type:     article.Type, // POA or VOR
+		FileName: article.FileName,
 		Elapsed:  elapsed.Milliseconds(),
 		Success:  err == nil,
 	}
@@ -175,7 +186,6 @@ func (r Result) String() string {
 		return fmt.Sprintf(msg, r.Type, "valid", r.Elapsed, r.FileName)
 	}
 	return fmt.Sprintf(msg, r.Type, "invalid", r.Elapsed, r.FileName)
-
 }
 
 func format_ms(ms int64) string {
@@ -205,6 +215,65 @@ func die(b bool, msg string) {
 	}
 }
 
+// process `file_list` by `num_workers` number of goroutines
+func process_files_in_parallel(num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
+	worker_pool := pool.NewWithResults[Result]().WithMaxGoroutines(num_workers)
+	start_time := time.Now()
+	for _, file := range file_list {
+		file := file
+		worker_pool.Go(func() Result {
+			article := read_article_data(file)
+			result := validate_article(schema_map, article, capture_errors)
+			println(result.String())
+			return result
+		})
+	}
+	result_list := worker_pool.Wait()
+	end_time := time.Now()
+	return start_time, end_time, result_list
+}
+
+// keep a buffer of N files in memory at once to feed a pool of validators.
+// ensures disk I/O is not a factor in keeping the CPU busy.
+func process_files_with_feeder(num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
+	// read files from disk into buffer
+
+	// the number of articles to keep in memory at once.
+	buffer_size := 2000 // 1k articles is about ~1.5GiB of RAM
+	job_size := len(file_list)
+	if job_size < buffer_size {
+		buffer_size = job_size
+	}
+	article_chan := make(chan Article, buffer_size)
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func(article_chan chan Article, wg *sync.WaitGroup) {
+		defer wg.Done()
+		for _, file := range file_list {
+			article_chan <- read_article_data(file)
+		}
+		close(article_chan)
+	}(article_chan, &wg)
+
+	// process articles from `article_chan` until it's closed.
+
+	worker_pool := pool.NewWithResults[Result]().WithMaxGoroutines(num_workers)
+	start_time := time.Now()
+	for article := range article_chan {
+		article := article
+		worker_pool.Go(func() Result {
+			result := validate_article(schema_map, article, capture_errors)
+			println(result.String())
+			return result
+		})
+	}
+
+	wg.Wait()
+	result_list := worker_pool.Wait()
+	end_time := time.Now()
+	return start_time, end_time, result_list
+}
+
 func do() {
 	var err error
 	args := os.Args[1:]
@@ -233,7 +302,16 @@ func do() {
 		die(err != nil, "third argument (number of workers) is not an integer.")
 	}
 
-	if path_is_dir(input_path) {
+	if !path_is_dir(input_path) {
+		// validate single
+		capture_errors := true
+		article := read_article_data(input_path)
+		result := validate_article(schema_map, article, capture_errors)
+		if !result.Success {
+			long_validation_error(result.Error)
+			os.Exit(1)
+		}
+	} else {
 		// validate many
 		path_list, err := os.ReadDir(input_path)
 		panic_on_err(err, "reading contents of directory: "+input_path)
@@ -267,19 +345,8 @@ func do() {
 		sample_size = len(file_list)
 
 		capture_errors := false
-		worker_pool := pool.NewWithResults[Result]().WithMaxGoroutines(num_workers)
-
-		start_time := time.Now()
-		for _, file := range file_list {
-			file := file
-			worker_pool.Go(func() Result {
-				result := validate_article(schema_map, file, capture_errors)
-				println(result.String())
-				return result
-			})
-		}
-		result_list := worker_pool.Wait()
-		end_time := time.Now()
+		//start_time, end_time, result_list := process_files_in_parallel(num_workers, file_list, schema_map, capture_errors)
+		start_time, end_time, result_list := process_files_with_feeder(num_workers, file_list, schema_map, capture_errors)
 		wall_time_ms := end_time.Sub(start_time).Milliseconds()
 
 		var cpu_time_ms int64
@@ -308,15 +375,6 @@ func do() {
 			}
 			os.Exit(1)
 		}
-
-	} else {
-		// validate single
-		capture_errors := true
-		result := validate_article(schema_map, input_path, capture_errors)
-		if !result.Success {
-			long_validation_error(result.Error)
-			os.Exit(1)
-		}
 	}
 }
 
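For reference, the worker pool used throughout the diff is the results pool from github.com/sourcegraph/conc (pool.NewWithResults, WithMaxGoroutines, Go, and Wait, as they appear above). A minimal usage sketch of that API in isolation:

	package main

	import (
		"fmt"

		"github.com/sourcegraph/conc/pool"
	)

	func main() {
		// a pool that runs at most 4 tasks at once and collects their return values
		p := pool.NewWithResults[int]().WithMaxGoroutines(4)
		for i := 0; i < 10; i++ {
			i := i // capture the loop variable (needed before Go 1.22)
			p.Go(func() int {
				return i * i
			})
		}
		results := p.Wait() // blocks until every submitted task has finished
		fmt.Println(results)
	}

Wait collects one result per submitted task, which is why the diff's `validate_article` calls can simply return their Result from inside the pool.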
