Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions app/jobs/cleanup_sub_directory_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

# Walks the Carrierwave uploaded-file staging directory
# (<uploads>/<tenant>/hyrax/uploaded_file/file/) and deletes files whose
# corresponding Hyrax::UploadedFile record shows they have been ingested
# (file_set_uri is present) and are old enough, or that are orphaned and
# very old.
class CleanupSubDirectoryJob < ApplicationJob
non_tenant_job

attr_reader :delete_ingested_after_days, :delete_all_after_days, :directory, :tenant, :files_checked, :files_deleted

def perform(delete_ingested_after_days:, directory:, delete_all_after_days: 730, tenant: nil)
@directory = directory
@delete_ingested_after_days = delete_ingested_after_days
@delete_all_after_days = delete_all_after_days
@tenant = tenant
@files_checked = 0
@files_deleted = 0
process_upload_directories
delete_empty_directories
logger.info("Completed #{directory}: checked #{@files_checked}, deleted #{@files_deleted}")
end

private

def process_upload_directories
Dir.glob("#{directory}/*").each do |upload_dir|
next unless File.directory?(upload_dir)

uploaded_file_id = File.basename(upload_dir)
process_upload_dir(upload_dir, uploaded_file_id)
end
end

def process_upload_dir(upload_dir, uploaded_file_id)
Dir.glob("#{upload_dir}/*").each do |path|
next unless File.file?(path)
next unless should_be_deleted?(path, uploaded_file_id)

File.delete(path)
@files_deleted += 1
logger.info("Checked #{@files_checked}, deleted #{@files_deleted} files") if (@files_deleted % 100).zero?
end
end

def delete_empty_directories
Dir.glob("#{directory}/*").select { |path| File.directory?(path) }.each do |dir|
FileUtils.rmdir(dir)
rescue Errno::ENOTEMPTY
next
end

logger.info("Completed empty directory cleanup for #{directory}")
end

def should_be_deleted?(path, uploaded_file_id)
return true if very_old?(path)

ingested_and_old_enough?(path, uploaded_file_id)
end

def ingested_and_old_enough?(path, uploaded_file_id)
file_older_than?(path, delete_ingested_after_days) && ingested?(uploaded_file_id)
end

def very_old?(path)
file_older_than?(path, delete_all_after_days)
end

def file_older_than?(path, days)
File.mtime(path) < (Time.zone.now - days.to_i.days)
end

def ingested?(uploaded_file_id)
@files_checked += 1
record = if tenant.present?
Apartment::Tenant.switch(tenant) { Hyrax::UploadedFile.find_by(id: uploaded_file_id) }
else
Hyrax::UploadedFile.find_by(id: uploaded_file_id)
end

return false if record.nil?

record.file_set_uri.present?
end
end
40 changes: 40 additions & 0 deletions app/jobs/cleanup_upload_files_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

# Kicks off a CleanupSubDirectoryJob for the Carrierwave uploaded-file staging
# directory under the tenant's upload root.
class CleanupUploadFilesJob < ApplicationJob
non_tenant_job

CARRIERWAVE_SUBDIR = File.join("hyrax", "uploaded_file", "file").freeze

attr_reader :uploads_path, :tenant
def perform(delete_ingested_after_days:, uploads_path:, delete_all_after_days: 730, tenant: nil)
@uploads_path = uploads_path
@tenant = tenant
carrierwave_dir = File.join(uploads_path, CARRIERWAVE_SUBDIR)

unless Dir.exist?(carrierwave_dir)
logger.info("No Carrierwave directory at #{carrierwave_dir}; nothing to clean")
return
end

logger.info(message(carrierwave_dir, delete_ingested_after_days, delete_all_after_days))

CleanupSubDirectoryJob.perform_later(
delete_ingested_after_days: delete_ingested_after_days,
directory: carrierwave_dir,
delete_all_after_days: delete_all_after_days,
tenant: tenant
)
end

private

def message(carrierwave_dir, delete_ingested_after_days, delete_all_after_days)
<<~MESSAGE
Starting cleanup of #{carrierwave_dir}:
delete ingested after #{delete_ingested_after_days} days,
delete all files after #{delete_all_after_days} days.
MESSAGE
end
end
38 changes: 38 additions & 0 deletions lib/tasks/uploads_cleanup.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

# Scheduled via cron / Kubernetes CronJob (or similar). Uses ActiveJob only—works with Sidekiq, GoodJob,
# or any other queue adapter; no GoodJob-specific APIs.

namespace :hyku do
desc "Enqueue per-tenant Hyrax upload staging cleanup (DELETE_INGESTED_AFTER_DAYS, DELETE_ALL_AFTER_DAYS); skips S3 tenants"
task cleanup_uploads: :environment do
ingested = ENV.fetch("DELETE_INGESTED_AFTER_DAYS", "180").to_i
delete_all = ENV.fetch("DELETE_ALL_AFTER_DAYS", "730").to_i

Account.find_each do |account|
Apartment::Tenant.switch(account.tenant) do
if Site.account&.s3_bucket.present?
puts "Skipping #{account.tenant}: S3 uploads (no local staging tree)"
next
end

uploads_path = Hyrax.config.upload_path.call
uploads_path = uploads_path.to_s

unless Dir.exist?(uploads_path)
puts "Skipping #{account.tenant}: #{uploads_path} does not exist"
next
end

puts "Enqueueing cleanup for #{account.tenant} → #{uploads_path}"

CleanupUploadFilesJob.perform_later(
delete_ingested_after_days: ingested,
uploads_path: uploads_path,
delete_all_after_days: delete_all,
tenant: account.tenant
)
end
end
end
end
141 changes: 141 additions & 0 deletions spec/jobs/cleanup_sub_directory_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# frozen_string_literal: true

RSpec.describe CleanupSubDirectoryJob do
let(:old_time) { Time.zone.now - 1.year }
let(:very_old_time) { Time.zone.now - 3.years }
let(:new_time) { Time.zone.now - 1.week }
let(:tenant) { 'tenant-abc' }
let(:base_dir) { '/app/samvera/uploads/tenant-abc/hyrax/uploaded_file/file' }

let(:file_1) { "#{base_dir}/1/document.pdf" }
let(:file_2) { "#{base_dir}/2/image.jpg" }
let(:file_3) { "#{base_dir}/3/recent.txt" }
let(:file_4) { "#{base_dir}/4/not_a_file" }
let(:file_5) { "#{base_dir}/5/orphan.pdf" }
let(:file_6) { "#{base_dir}/6/ancient.pdf" }

let(:filenames) do
{ '1' => 'document.pdf', '2' => 'image.jpg', '3' => 'recent.txt',
'4' => 'not_a_file', '5' => 'orphan.pdf', '6' => 'ancient.pdf' }
end

before do
allow(Apartment::Tenant).to receive(:switch).with(tenant).and_yield

allow(Dir).to receive(:glob) do |pattern|
case pattern
when "#{base_dir}/*"
%w[1 2 3 4 5 6].map { |id| "#{base_dir}/#{id}" }
when /\A#{Regexp.escape(base_dir)}\/(\d+)\/\*\z/
id = Regexp.last_match(1)
["#{base_dir}/#{id}/#{filenames[id]}"]
else
[]
end
end

allow(File).to receive(:directory?) do |path|
%w[1 2 3 4 5 6].any? { |id| path == "#{base_dir}/#{id}" }
end

allow(File).to receive(:file?) do |path|
path != file_4 && [file_1, file_2, file_3, file_5, file_6].include?(path)
end

allow(File).to receive(:mtime).with(file_1).and_return(old_time)
allow(File).to receive(:mtime).with(file_2).and_return(old_time)
allow(File).to receive(:mtime).with(file_3).and_return(new_time)
allow(File).to receive(:mtime).with(file_5).and_return(old_time)
allow(File).to receive(:mtime).with(file_6).and_return(very_old_time)

allow(File).to receive(:delete)
allow(FileUtils).to receive(:rmdir)

allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '1')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'http://fcrepo/rest/abc'))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '2')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'some-uuid'))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '3')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'some-uuid'))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '5')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: nil))
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '6')
.and_return(instance_double(Hyrax::UploadedFile, file_set_uri: nil))
end

it 'deletes old files that have been ingested (file_set_uri present)' do
expect(File).to receive(:delete).with(file_1)
expect(File).to receive(:delete).with(file_2)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'does not delete files newer than delete_ingested_after_days even if ingested' do
expect(File).not_to receive(:delete).with(file_3)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'does not delete entries that are not files' do
expect(File).not_to receive(:delete).with(file_4)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'does not delete old files without file_set_uri (orphaned but not old enough)' do
expect(File).not_to receive(:delete).with(file_5)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'deletes orphaned files older than delete_all_after_days' do
expect(File).to receive(:delete).with(file_6)
described_class.perform_now(delete_ingested_after_days: 180,
directory: base_dir,
delete_all_after_days: 730,
tenant: tenant)
end

it 'uses configurable delete_all_after_days threshold' do
expect(File).to receive(:delete).with(file_5)
described_class.perform_now(delete_ingested_after_days: 180,
directory: base_dir,
delete_all_after_days: 300,
tenant: tenant)
end

it 'deletes files when no UploadedFile DB record exists and file is very old' do
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '6').and_return(nil)
expect(File).to receive(:delete).with(file_6)
described_class.perform_now(delete_ingested_after_days: 180,
directory: base_dir,
delete_all_after_days: 730,
tenant: tenant)
end

it 'does not delete files when no UploadedFile DB record exists but file is not very old' do
allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '5').and_return(nil)
expect(File).not_to receive(:delete).with(file_5)
described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

describe 'cleaning up empty directories' do
before do
allow(File).to receive(:directory?).with("#{base_dir}/1").and_return(true)
allow(File).to receive(:directory?).with("#{base_dir}/2").and_return(true)
allow(File).to receive(:directory?).with("#{base_dir}/3").and_return(true)
allow(FileUtils).to receive(:rmdir)
.with("#{base_dir}/2")
.and_raise(Errno::ENOTEMPTY)
end

it 'attempts to remove empty upload ID directories' do
expect(FileUtils).to receive(:rmdir).with("#{base_dir}/1")
expect(FileUtils).to receive(:rmdir).with("#{base_dir}/2")
expect(FileUtils).to receive(:rmdir).with("#{base_dir}/3")

described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant)
end

it 'continues when a directory is not empty' do
expect { described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) }
.not_to raise_error
end
end
end
74 changes: 74 additions & 0 deletions spec/jobs/cleanup_upload_files_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# frozen_string_literal: true

RSpec.describe CleanupUploadFilesJob do
before do
ActiveJob::Base.queue_adapter = :test
end

after do
clear_enqueued_jobs
end

let(:uploads_path) { '/app/samvera/uploads/tenant-abc' }
let(:tenant) { 'tenant-abc' }
let(:carrierwave_dir) { "#{uploads_path}/hyrax/uploaded_file/file" }

context 'when the Carrierwave directory exists' do
before do
allow(Dir).to receive(:exist?).and_call_original
allow(Dir).to receive(:exist?).with(carrierwave_dir).and_return(true)
end

it 'spawns a CleanupSubDirectoryJob for the Carrierwave directory' do
expect do
described_class.perform_now(delete_ingested_after_days: 180, uploads_path: uploads_path, tenant: tenant)
end.to have_enqueued_job(CleanupSubDirectoryJob)
.with(
delete_ingested_after_days: 180,
directory: carrierwave_dir,
delete_all_after_days: 730,
tenant: tenant
)
end

it 'passes delete_all_after_days parameter to child job' do
expect do
described_class.perform_now(delete_ingested_after_days: 180,
uploads_path: uploads_path,
delete_all_after_days: 365,
tenant: tenant)
end.to have_enqueued_job(CleanupSubDirectoryJob)
.with(
delete_ingested_after_days: 180,
directory: carrierwave_dir,
delete_all_after_days: 365,
tenant: tenant
)
end

it 'uses default delete_all_after_days of 730 when not specified' do
expect do
described_class.perform_now(delete_ingested_after_days: 180, uploads_path: uploads_path, tenant: tenant)
end.to have_enqueued_job(CleanupSubDirectoryJob)
.with(
delete_ingested_after_days: 180,
directory: carrierwave_dir,
delete_all_after_days: 730,
tenant: tenant
)
end
end

context 'when the Carrierwave directory does not exist' do
before do
allow(Dir).to receive(:exist?).and_call_original
allow(Dir).to receive(:exist?).with(carrierwave_dir).and_return(false)
end

it 'does not spawn any child jobs' do
expect do
described_class.perform_now(delete_ingested_after_days: 180, uploads_path: uploads_path, tenant: tenant)
end.not_to have_enqueued_job(CleanupSubDirectoryJob)
end
end
end
Loading