Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions app/jobs/cleanup_sub_directory_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# frozen_string_literal: true

# Walks the Carrierwave uploaded-file staging directory
# (<uploads>/<tenant>/hyrax/uploaded_file/file/) and deletes files whose
# corresponding Hyrax::UploadedFile record shows they have been ingested
# (file_set_uri is present) and are old enough, or that are orphaned and
# very old.
class CleanupSubDirectoryJob < ApplicationJob
non_tenant_job

# Avoid ApplicationJob's retry_on(StandardError) burning 5 attempts
# and swallowing the error when the retry block is present.
discard_on ArgumentError do |_job, error|
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about discard_on, I like that!

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right?! i thought that was pretty neato myself!

logger.warn(error.message)
end

# Must stay aligned with CleanupUploadFilesJob::CARRIERWAVE_SUBDIR (site/banner_images, etc. must never match).
INGEST_STAGING_SUFFIX = File.join(File::SEPARATOR, "hyrax", "uploaded_file", "file").freeze

attr_reader :delete_ingested_after_days, :delete_all_after_days, :directory, :tenant, :files_checked, :files_deleted

def perform(delete_ingested_after_days:, directory:, delete_all_after_days: 730, tenant: nil)
assert_ingest_staging_directory!(directory)

@directory = directory
@delete_ingested_after_days = delete_ingested_after_days
@delete_all_after_days = delete_all_after_days
@tenant = tenant
@files_checked = 0
@files_deleted = 0
process_upload_directories
delete_empty_directories
logger.info("Completed #{directory}: checked #{@files_checked}, deleted #{@files_deleted}")
end

private

def assert_ingest_staging_directory!(dir)
path = File.expand_path(dir.to_s)
return if path.end_with?(INGEST_STAGING_SUFFIX)

relative = File.join("hyrax", "uploaded_file", "file")
raise ArgumentError,
"CleanupSubDirectoryJob only accepts Hyrax ingest staging paths ending in #{relative.inspect} " \
"(e.g. tenant upload root + #{relative}); got #{dir.inspect}"
end

def process_upload_directories
Dir.glob("#{directory}/*").each do |upload_dir|
next unless File.directory?(upload_dir)

uploaded_file_id = File.basename(upload_dir)
process_upload_dir(upload_dir, uploaded_file_id)
end
end

def process_upload_dir(upload_dir, uploaded_file_id)
Dir.glob("#{upload_dir}/*").each do |path|
next unless File.file?(path)
next unless should_be_deleted?(path, uploaded_file_id)

File.delete(path)
@files_deleted += 1
logger.info("Checked #{@files_checked}, deleted #{@files_deleted} files") if (@files_deleted % 100).zero?
end
end

def delete_empty_directories
Dir.glob("#{directory}/*").select { |path| File.directory?(path) }.each do |dir|
FileUtils.rmdir(dir)
rescue Errno::ENOTEMPTY
next
end

logger.info("Completed empty directory cleanup for #{directory}")
end

def should_be_deleted?(path, uploaded_file_id)
return true if very_old?(path)

ingested_and_old_enough?(path, uploaded_file_id)
end

def ingested_and_old_enough?(path, uploaded_file_id)
file_older_than?(path, delete_ingested_after_days) && ingested?(uploaded_file_id)
end

def very_old?(path)
file_older_than?(path, delete_all_after_days)
end

def file_older_than?(path, days)
File.mtime(path) < (Time.zone.now - days.to_i.days)
end

def ingested?(uploaded_file_id)
@files_checked += 1
record = if tenant.present?
Apartment::Tenant.switch(tenant) { Hyrax::UploadedFile.find_by(id: uploaded_file_id) }
else
Hyrax::UploadedFile.find_by(id: uploaded_file_id)
end

return false if record.nil?

record.file_set_uri.present?
end
end
40 changes: 40 additions & 0 deletions app/jobs/cleanup_upload_files_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

# Kicks off a CleanupSubDirectoryJob for the Carrierwave uploaded-file staging
# directory under the tenant's upload root.
class CleanupUploadFilesJob < ApplicationJob
non_tenant_job

CARRIERWAVE_SUBDIR = File.join("hyrax", "uploaded_file", "file").freeze

attr_reader :uploads_path, :tenant
def perform(delete_ingested_after_days:, uploads_path:, delete_all_after_days: 730, tenant: nil)
@uploads_path = uploads_path
@tenant = tenant
carrierwave_dir = File.join(uploads_path, CARRIERWAVE_SUBDIR)

unless Dir.exist?(carrierwave_dir)
logger.info("No Carrierwave directory at #{carrierwave_dir}; nothing to clean")
return
end

logger.info(message(carrierwave_dir, delete_ingested_after_days, delete_all_after_days))

CleanupSubDirectoryJob.perform_later(
delete_ingested_after_days: delete_ingested_after_days,
directory: carrierwave_dir,
delete_all_after_days: delete_all_after_days,
tenant: tenant
)
end

private

def message(carrierwave_dir, delete_ingested_after_days, delete_all_after_days)
<<~MESSAGE
Starting cleanup of #{carrierwave_dir}:
delete ingested after #{delete_ingested_after_days} days,
delete all files after #{delete_all_after_days} days.
MESSAGE
end
end
38 changes: 38 additions & 0 deletions lib/tasks/uploads_cleanup.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

# Scheduled via cron / Kubernetes CronJob (or similar). Uses ActiveJob only—works with Sidekiq, GoodJob,
# or any other queue adapter; no GoodJob-specific APIs.

namespace :hyku do
desc "Enqueue per-tenant Hyrax upload staging cleanup (DELETE_INGESTED_AFTER_DAYS, DELETE_ALL_AFTER_DAYS); skips S3 tenants"
task cleanup_uploads: :environment do
ingested = ENV.fetch("DELETE_INGESTED_AFTER_DAYS", "180").to_i
delete_all = ENV.fetch("DELETE_ALL_AFTER_DAYS", "730").to_i

Account.find_each do |account|
Apartment::Tenant.switch(account.tenant) do
if Site.account&.s3_bucket.present?
puts "Skipping #{account.tenant}: S3 uploads (no local staging tree)"
next
end

uploads_path = Hyrax.config.upload_path.call
uploads_path = uploads_path.to_s

unless Dir.exist?(uploads_path)
puts "Skipping #{account.tenant}: #{uploads_path} does not exist"
next
end

puts "Enqueueing cleanup for #{account.tenant} → #{uploads_path}"

CleanupUploadFilesJob.perform_later(
delete_ingested_after_days: ingested,
uploads_path: uploads_path,
delete_all_after_days: delete_all,
tenant: account.tenant
)
end
end
end
end
176 changes: 176 additions & 0 deletions spec/jobs/cleanup_sub_directory_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# frozen_string_literal: true

RSpec.describe CleanupSubDirectoryJob do
let(:old_time) { Time.zone.now - 1.year }
let(:very_old_time) { Time.zone.now - 3.years }
let(:new_time) { Time.zone.now - 1.week }
let(:tenant) { 'tenant-abc' }
let(:base_dir) { '/app/samvera/uploads/tenant-abc/hyrax/uploaded_file/file' }

describe 'ingest staging directory guard' do
# Call #perform directly so ArgumentError is not intercepted by ApplicationJob's retry_on(StandardError),
# which (with a block) does not re-raise after the final attempt — perform_now would then not fail the example.
it 'raises when given a site banner directory (ticket #842)' do
banner = '/app/samvera/uploads/56e0eb81-c2d5-4d5d-9171-b251bf7299a4/site/banner_images/1'
expect do
described_class.new.perform(delete_ingested_after_days: 180, directory: banner, tenant: tenant)
end.to raise_error(ArgumentError, %r{only accepts Hyrax ingest staging paths})
end

it 'raises when given the tenant upload root' do
root = '/app/samvera/uploads/tenant-abc'
expect do
described_class.new.perform(delete_ingested_after_days: 180, directory: root, tenant: tenant)
end.to raise_error(ArgumentError, %r{only accepts Hyrax ingest staging paths})
end

it 'raises when given hyrax/uploaded_file without the final file segment' do
partial = '/app/samvera/uploads/tenant-abc/hyrax/uploaded_file'
expect do
described_class.new.perform(delete_ingested_after_days: 180, directory: partial, tenant: tenant)
end.to raise_error(ArgumentError, %r{only accepts Hyrax ingest staging paths})
end

it 'accepts a valid staging path with a trailing slash' do
allow(Apartment::Tenant).to receive(:switch).with(tenant).and_yield
allow(Dir).to receive(:glob).and_return([])
allow(FileUtils).to receive(:rmdir)

expect do
described_class.new.perform(delete_ingested_after_days: 180, directory: "#{base_dir}/", tenant: tenant)
end.not_to raise_error
end
end

let(:file_1) { "#{base_dir}/1/document.pdf" }
let(:file_2) { "#{base_dir}/2/image.jpg" }
let(:file_3) { "#{base_dir}/3/recent.txt" }
let(:file_4) { "#{base_dir}/4/not_a_file" }
let(:file_5) { "#{base_dir}/5/orphan.pdf" }
let(:file_6) { "#{base_dir}/6/ancient.pdf" }

let(:filenames) do
{ '1' => 'document.pdf', '2' => 'image.jpg', '3' => 'recent.txt',
'4' => 'not_a_file', '5' => 'orphan.pdf', '6' => 'ancient.pdf' }
end

before do
allow(Apartment::Tenant).to receive(:switch).with(tenant).and_yield

allow(Dir).to receive(:glob) do |pattern|
case pattern
when "#{base_dir}/*"
%w[1 2 3 4 5 6].map { |id| "#{base_dir}/#{id}" }
when /\A#{Regexp.escape(base_dir)}\/(\d+)\/\*\z/
id = Regexp.last_match(1)
["#{base_dir}/#{id}/#{filenames[id]}"]
else
[]
end
end

allow(File).to receive(:directory?) do |path|
%w[1 2 3 4 5 6].any? { |id| path == "#{base_dir}/#{id}" }
end

allow(File).to receive(:file?) do |path|
path != file_4 && [file_1, file_2, file_3, file_5, file_6].include?(path)
end

allow(File).to receive(:mtime).with(file_1).and_return(old_time)
allow(File).to receive(:mtime).with(file_2).and_return(old_time)
allow(File).to receive(:mtime).with(file_3).and_return(new_time)
allow(File).to receive(:mtime).with(file_5).and_return(old_time)
allow(File).to receive(:mtime).with(file_6).and_return(very_old_time)

allow(File).to receive(:delete)
allow(FileUtils).to receive(:rmdir)

allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '1')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'http://fcrepo/rest/abc'))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '2')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'some-uuid'))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '3')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'some-uuid'))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '5')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: nil))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '6')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: nil))
end

it 'deletes old files that have been ingested (file_set_uri present)' do
expect(File).to receive(:delete).with(file_1)
expect(File).to receive(:delete).with(file_2)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'does not delete files newer than delete_ingested_after_days even if ingested' do
expect(File).not_to receive(:delete).with(file_3)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'does not delete entries that are not files' do
expect(File).not_to receive(:delete).with(file_4)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'does not delete old files without file_set_uri (orphaned but not old enough)' do
expect(File).not_to receive(:delete).with(file_5)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'deletes orphaned files older than delete_all_after_days' do
expect(File).to receive(:delete).with(file_6)
described_class.perform_now(delete_ingested_after_days: 180,
directory: base_dir,
delete_all_after_days: 730,
tenant: tenant)
end

it 'uses configurable delete_all_after_days threshold' do
expect(File).to receive(:delete).with(file_5)
described_class.perform_now(delete_ingested_after_days: 180,
directory: base_dir,
delete_all_after_days: 300,
tenant: tenant)
end

it 'deletes files when no UploadedFile DB record exists and file is very old' do
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '6').and_return(nil)
expect(File).to receive(:delete).with(file_6)
described_class.perform_now(delete_ingested_after_days: 180,
directory: base_dir,
delete_all_after_days: 730,
tenant: tenant)
end

it 'does not delete files when no UploadedFile DB record exists but file is not very old' do
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '5').and_return(nil)
expect(File).not_to receive(:delete).with(file_5)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

describe 'cleaning up empty directories' do
before do
allow(File).to receive(:directory?).with("#{base_dir}/1").and_return(true)
allow(File).to receive(:directory?).with("#{base_dir}/2").and_return(true)
allow(File).to receive(:directory?).with("#{base_dir}/3").and_return(true)
allow(FileUtils).to receive(:rmdir)
.with("#{base_dir}/2")
.and_raise(Errno::ENOTEMPTY)
end

it 'attempts to remove empty upload ID directories' do
expect(FileUtils).to receive(:rmdir).with("#{base_dir}/1")
expect(FileUtils).to receive(:rmdir).with("#{base_dir}/2")
expect(FileUtils).to receive(:rmdir).with("#{base_dir}/3")

described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'continues when a directory is not empty' do
expect { described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) }
.not_to raise_error
end
end
end
Loading
Loading