
Commit 9610515
adds buffer size as argument
lsh-0 committed Oct 25, 2023
1 parent ab7f05e commit 9610515
Showing 1 changed file with 10 additions and 4 deletions.
main.go: 14 changes (10 additions & 4 deletions)
@@ -235,11 +235,9 @@ func process_files_in_parallel(num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
 
 // keep a buffer of N files in memory at once to feed a pool of validators.
 // ensures disk I/O is not a factor in keeping the CPU busy.
-func process_files_with_feeder(num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
+func process_files_with_feeder(buffer_size int, num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
 	// read files from disk into buffer
 
-	// the number of articles to keep in memory at once.
-	buffer_size := 2000 // 1k articles is about ~1.5GiB of RAM
 	job_size := len(file_list)
 	if job_size < buffer_size {
 		buffer_size = job_size
@@ -253,6 +251,7 @@ func process_files_with_feeder(num_workers int, file_list []string, schema_map map[string]Schema, capture_errors bool) (time.Time, time.Time, []Result) {
 			article_chan <- read_article_data(file)
 		}
 		close(article_chan)
+		println("(done reading files)")
 	}(article_chan, &wg)
 
 	// process articles from `article_chan` until it's closed.
@@ -302,6 +301,13 @@ func do() {
 		die(err != nil, "third argument (number of workers) is not an integer.")
 	}
 
+	// the number of articles to keep in memory at once.
+	buffer_size := 2000 // 1k articles is about ~1.5GiB of RAM
+	if len(args) > 4 {
+		buffer_size, err = strconv.Atoi(args[4])
+		die(err != nil, "fourth argument (buffer size) is not an integer.")
+	}
+
 	if !path_is_dir(input_path) {
 		// validate single
 		capture_errors := true
@@ -346,7 +352,7 @@ func do() {
 
 	capture_errors := false
 	//start_time, end_time, result_list := process_files_in_parallel(num_workers, file_list, schema_map, capture_errors)
-	start_time, end_time, result_list := process_files_with_feeder(num_workers, file_list, schema_map, capture_errors)
+	start_time, end_time, result_list := process_files_with_feeder(buffer_size, num_workers, file_list, schema_map, capture_errors)
 	wall_time_ms := end_time.Sub(start_time).Milliseconds()
 
 	var cpu_time_ms int64
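For readers skimming the diff, the pattern being parameterized here is a bounded producer/consumer: one goroutine reads files into a channel with capacity buffer_size while num_workers goroutines validate from it, so reading can run ahead of validation by at most buffer_size articles. Below is a minimal, self-contained sketch of that shape, with stand-in types and stand-in work; it illustrates the pattern and is not the repository's code.

package main

import (
	"fmt"
	"sync"
)

// sketch of the feeder pattern: one goroutine reads items into a
// bounded channel; a pool of workers drains it. the channel's
// capacity is what this commit exposes as the `buffer_size` argument.
func processWithFeeder(bufferSize, numWorkers int, fileList []string) []string {
	// clamp the buffer to the job size, as the diff does.
	if len(fileList) < bufferSize {
		bufferSize = len(fileList)
	}

	articleChan := make(chan string, bufferSize) // holds at most bufferSize items

	// feeder: reads "articles" and blocks whenever the buffer is full.
	go func() {
		for _, file := range fileList {
			articleChan <- "contents of " + file // stand-in for read_article_data
		}
		close(articleChan)
	}()

	// worker pool: consumes from the channel until it is closed.
	results := make([]string, 0, len(fileList))
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for article := range articleChan {
				validated := "validated: " + article // stand-in for validation
				mu.Lock()
				results = append(results, validated)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return results
}

func main() {
	files := []string{"a.xml", "b.xml", "c.xml"}
	for _, r := range processWithFeeder(2, 2, files) {
		fmt.Println(r)
	}
}

The buffered channel is what makes buffer_size a memory knob: per the comment in the diff, 1k articles cost roughly 1.5GiB of RAM, so the old hard-coded 2000 pinned about 3GiB regardless of machine, which this commit lets callers tune.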

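The command-line handling added in do() treats the buffer size as an optional fourth positional argument with a default of 2000. A minimal sketch of that parse-or-default shape, using plain error handling in place of the repository's die helper and a shifted argument position for brevity:

package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	args := os.Args

	// default mirrors the diff: 2000 articles in memory at once.
	bufferSize := 2000

	// the real code checks len(args) > 4 because buffer size is the
	// fourth positional argument; here it is the first, for brevity.
	if len(args) > 1 {
		n, err := strconv.Atoi(args[1])
		if err != nil {
			fmt.Fprintln(os.Stderr, "buffer size is not an integer.")
			os.Exit(1)
		}
		bufferSize = n
	}

	fmt.Println("buffer size:", bufferSize)
}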