Skip to content

Commit

Permalink
Merge pull request #105 from Wafris/make-http-calls-concurrent
Browse files Browse the repository at this point in the history
Make http calls concurrent
  • Loading branch information
rmcastil authored Nov 29, 2024
2 parents 6662279 + fe167f7 commit 1737b72
Showing 1 changed file with 44 additions and 36 deletions.
80 changes: 44 additions & 36 deletions lib/wafris.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,25 @@ def send_upsync_requests(requests_array)

url_and_api_key = @configuration.upsync_url + "/" + @configuration.api_key

LogSuppressor.puts_log("[Upsync] Beginning request thread...")
current_time = Time.now
response = HTTParty.post(
url_and_api_key,
body: body,
headers: headers,
timeout: 10
timeout: 5
)

if response.code == 200
@configuration.upsync_status = "Complete"
else
LogSuppressor.puts_log("Upsync Error. HTTP Response: #{response.code}")
LogSuppressor.puts_log("[Upsync] Error. HTTP Response: #{response.code}")
end
rescue HTTParty::Error => e
LogSuppressor.puts_log("Upsync Error. Failed to send upsync requests: #{e.message}")
LogSuppressor.puts_log("[Upsync] Thread Error. Failed to send upsync requests: #{e.message}")
ensure
elapsed_time = Time.now - current_time
LogSuppressor.puts_log("[Upsync] request thread complete in #{elapsed_time.round(2)} seconds.")
end

# This method is used to queue upsync requests. It takes in several parameters including:
Expand All @@ -226,7 +231,7 @@ def queue_upsync_request(request, treatment, category, rule)
@configuration.upsync_queue = []
@configuration.last_upsync_timestamp = Time.now.to_i

send_upsync_requests(requests_array)
Thread.new { send_upsync_requests(requests_array) }
end

@configuration.upsync_status = "Enabled"
Expand Down Expand Up @@ -255,10 +260,10 @@ def downsync_db(db_rule_category, current_filename = nil)
begin
lockfile = File.open(lockfile_path, File::RDWR | File::CREAT | File::EXCL)
rescue Errno::EEXIST
LogSuppressor.puts_log("[Wafris][Downsync] Lockfile already exists, skipping downsync.")
LogSuppressor.puts_log("[Downsync][downsync_db] Lockfile already exists, skipping downsync.")
return
rescue Exception => e
LogSuppressor.puts_log("[Wafris][Downsync] Error creating lockfile: #{e.message}")
LogSuppressor.puts_log("[Downsync] Error creating lockfile: #{e.message}")
end

begin
Expand All @@ -280,24 +285,27 @@ def downsync_db(db_rule_category, current_filename = nil)
# puts "Downloading from #{@configuration.downsync_url}/#{db_rule_category}/#{@configuration.api_key}?current_version=#{current_filename}&process_id=#{Process.pid}"
uri = "#{@configuration.downsync_url}/#{db_rule_category}/#{@configuration.api_key}?#{data.to_query}"

LogSuppressor.puts_log("[Downsync] Beginning request thread for #{db_rule_category}...")
current_time = Time.now

response = HTTParty.get(
uri,
follow_redirects: true, # Enable following redirects
max_redirects: 2 # Maximum number of redirects to follow
max_redirects: 2, # Maximum number of redirects to follow
timeout: 30
)

# TODO: What to do if timeout
# TODO: What to do if error

if response.code == 401
@configuration.upsync_status = "Disabled"
LogSuppressor.puts_log("[Wafris][Downsync] Unauthorized: Bad or missing API key")
LogSuppressor.puts_log("[Wafris][Downsync] API Key: #{@configuration.api_key}")
LogSuppressor.puts_log("[Downsync] Unauthorized: Bad or missing API key")
LogSuppressor.puts_log("[Downsync] API Key: #{@configuration.api_key}")
filename = current_filename

elsif response.code == 304
@configuration.upsync_status = "Enabled"
LogSuppressor.puts_log("[Wafris][Downsync] No new rules to download")

filename = current_filename

Expand Down Expand Up @@ -332,12 +340,12 @@ def downsync_db(db_rule_category, current_filename = nil)
# DB file is bad or empty so keep using whatever we have now
else
filename = old_file_name
LogSuppressor.puts_log("[Wafris][Downsync] DB Error - No tables exist in the db file #{@configuration.db_file_path}/#{filename}")
LogSuppressor.puts_log("[Downsync] DB Error - No tables exist in the db file #{@configuration.db_file_path}/#{filename}")
end

end
rescue => e
LogSuppressor.puts_log("[Wafris][Downsync] Error downloading rules: #{e.message}")
LogSuppressor.puts_log("[Downsync] Error downloading rules: #{e.message}")

# This gets set even if the API key is bad or other issues
# to prevent hammering the distribution server on every request
Expand All @@ -353,50 +361,55 @@ def downsync_db(db_rule_category, current_filename = nil)
# Ensure the lockfile is removed after operations
lockfile.close
File.delete(lockfile_path)

elapsed_time = Time.now - current_time
LogSuppressor.puts_log("[Downsync] request thread complete in #{elapsed_time.round(2)} seconds for #{db_rule_category}.")
end

filename
end

def sync_interval(db_rule_category)
if db_rule_category == "custom_rules"
@configuration.downsync_custom_rules_interval
else
@configuration.downsync_data_subscriptions_interval
end
end

# Returns the current database file,
# if the file is older than the interval, it will download the latest db
# if the file doesn't exist, it will download the latest db
# if the lockfile exists, it will return the current db
def current_db(db_rule_category)
interval = if db_rule_category == "custom_rules"
@configuration.downsync_custom_rules_interval
else
@configuration.downsync_data_subscriptions_interval
end

# Checks for existing current modfile, which contains the current db filename
if File.exist?("#{@configuration.db_file_path}/#{db_rule_category}.modfile")

LogSuppressor.puts_log("[Wafris][Downsync] Modfile exists, skipping downsync")
LogSuppressor.puts_log("[Downsync] Modfile exists, skipping downsync")

# Get last Modified Time and current database file name
last_db_synctime = File.mtime("#{@configuration.db_file_path}/#{db_rule_category}.modfile").to_i
returned_db = File.read("#{@configuration.db_file_path}/#{db_rule_category}.modfile").strip

LogSuppressor.puts_log("[Wafris][Downsync] Modfile Last Modified Time: #{last_db_synctime}")
LogSuppressor.puts_log("[Wafris][Downsync] DB in Modfile: #{returned_db}")
LogSuppressor.puts_log("[Downsync] Modfile Last Modified Time: #{last_db_synctime}")
LogSuppressor.puts_log("[Downsync] DB in Modfile: #{returned_db}")

# Check if the db file is older than the interval
if (Time.now.to_i - last_db_synctime) > interval
if (Time.now.to_i - last_db_synctime) > sync_interval(db_rule_category)

LogSuppressor.puts_log("[Wafris][Downsync] DB is older than the interval")
LogSuppressor.puts_log("[Downsync] DB is older than the interval")

# Make sure that another process isn't already downloading the rules
if !File.exist?("#{@configuration.db_file_path}/#{db_rule_category}.lockfile")
returned_db = downsync_db(db_rule_category, returned_db)
Thread.new { downsync_db(db_rule_category, returned_db) }
end

returned_db

# Current db is up to date
else

LogSuppressor.puts_log("[Wafris][Downsync] DB is up to date")
LogSuppressor.puts_log("[Downsync] DB is up to date")

returned_db = File.read("#{@configuration.db_file_path}/#{db_rule_category}.modfile").strip

Expand All @@ -413,24 +426,18 @@ def current_db(db_rule_category)
# No modfile exists, so download the latest db
else

LogSuppressor.puts_log("[Wafris][Downsync] No modfile exists, downloading latest db")
LogSuppressor.puts_log("[Downsync] No modfile exists, downloading latest #{db_rule_category} db")

# Make sure that another process isn't already downloading the rules
if File.exist?("#{@configuration.db_file_path}/#{db_rule_category}.lockfile")
LogSuppressor.puts_log("[Wafris][Downsync] Lockfile exists, skipping downsync")
LogSuppressor.puts_log("[Downsync][current_db] Lockfile exists, skipping downsync")
# Lockfile exists, but no modfile with a db filename
nil
else

LogSuppressor.puts_log("[Wafris][Downsync] No modfile exists, downloading latest db")
# No modfile exists, so download the latest db
returned_db = downsync_db(db_rule_category, nil)
Thread.new { downsync_db(db_rule_category, nil) }

if returned_db.nil?
nil
else
returned_db
end
nil
end
end
end
Expand All @@ -444,11 +451,12 @@ def evaluate(request)

return "Passed" if @configuration.api_key.nil?

# Now current_db can return the actual db, nil, or a future object
rules_db_filename = current_db("custom_rules")
data_subscriptions_db_filename = current_db("data_subscriptions")

# Checks to see if the filenames are present before loading the db
if rules_db_filename.to_s.strip != "" && data_subscriptions_db_filename.strip.to_s.strip != ""
if rules_db_filename.to_s.strip != "" && data_subscriptions_db_filename.to_s.strip != ""

rules_db = SQLite3::Database.new "#{@configuration.db_file_path}/#{rules_db_filename}"
data_subscriptions_db =
Expand Down

0 comments on commit 1737b72

Please sign in to comment.