diff --git a/app/jobs/cleanup_sub_directory_job.rb b/app/jobs/cleanup_sub_directory_job.rb new file mode 100644 index 0000000000..3692e6fe83 --- /dev/null +++ b/app/jobs/cleanup_sub_directory_job.rb @@ -0,0 +1,108 @@ +# frozen_string_literal: true + +# Walks the Carrierwave uploaded-file staging directory +# (//hyrax/uploaded_file/file/) and deletes files whose +# corresponding Hyrax::UploadedFile record shows they have been ingested +# (file_set_uri is present) and are old enough, or that are orphaned and +# very old. +class CleanupSubDirectoryJob < ApplicationJob + non_tenant_job + + # Avoid ApplicationJob's retry_on(StandardError) burning 5 attempts + # and swallowing the error when the retry block is present. + discard_on ArgumentError do |_job, error| + logger.warn(error.message) + end + + # Must stay aligned with CleanupUploadFilesJob::CARRIERWAVE_SUBDIR (site/banner_images, etc. must never match). + INGEST_STAGING_SUFFIX = File.join(File::SEPARATOR, "hyrax", "uploaded_file", "file").freeze + + attr_reader :delete_ingested_after_days, :delete_all_after_days, :directory, :tenant, :files_checked, :files_deleted + + def perform(delete_ingested_after_days:, directory:, delete_all_after_days: 730, tenant: nil) + assert_ingest_staging_directory!(directory) + + @directory = directory + @delete_ingested_after_days = delete_ingested_after_days + @delete_all_after_days = delete_all_after_days + @tenant = tenant + @files_checked = 0 + @files_deleted = 0 + process_upload_directories + delete_empty_directories + logger.info("Completed #{directory}: checked #{@files_checked}, deleted #{@files_deleted}") + end + + private + + def assert_ingest_staging_directory!(dir) + path = File.expand_path(dir.to_s) + return if path.end_with?(INGEST_STAGING_SUFFIX) + + relative = File.join("hyrax", "uploaded_file", "file") + raise ArgumentError, + "CleanupSubDirectoryJob only accepts Hyrax ingest staging paths ending in #{relative.inspect} " \ + "(e.g. tenant upload root + #{relative}); got #{dir.inspect}" + end + + def process_upload_directories + Dir.glob("#{directory}/*").each do |upload_dir| + next unless File.directory?(upload_dir) + + uploaded_file_id = File.basename(upload_dir) + process_upload_dir(upload_dir, uploaded_file_id) + end + end + + def process_upload_dir(upload_dir, uploaded_file_id) + Dir.glob("#{upload_dir}/*").each do |path| + next unless File.file?(path) + next unless should_be_deleted?(path, uploaded_file_id) + + File.delete(path) + @files_deleted += 1 + logger.info("Checked #{@files_checked}, deleted #{@files_deleted} files") if (@files_deleted % 100).zero? + end + end + + def delete_empty_directories + Dir.glob("#{directory}/*").select { |path| File.directory?(path) }.each do |dir| + FileUtils.rmdir(dir) + rescue Errno::ENOTEMPTY + next + end + + logger.info("Completed empty directory cleanup for #{directory}") + end + + def should_be_deleted?(path, uploaded_file_id) + return true if very_old?(path) + + ingested_and_old_enough?(path, uploaded_file_id) + end + + def ingested_and_old_enough?(path, uploaded_file_id) + file_older_than?(path, delete_ingested_after_days) && ingested?(uploaded_file_id) + end + + def very_old?(path) + file_older_than?(path, delete_all_after_days) + end + + def file_older_than?(path, days) + File.mtime(path) < (Time.zone.now - days.to_i.days) + end + + def ingested?(uploaded_file_id) + @files_checked += 1 + record = if tenant.present? + Apartment::Tenant.switch(tenant) { Hyrax::UploadedFile.find_by(id: uploaded_file_id) } + else + Hyrax::UploadedFile.find_by(id: uploaded_file_id) + end + + return false if record.nil? + + record.file_set_uri.present? + end +end diff --git a/app/jobs/cleanup_upload_files_job.rb b/app/jobs/cleanup_upload_files_job.rb new file mode 100644 index 0000000000..dc47cb7125 --- /dev/null +++ b/app/jobs/cleanup_upload_files_job.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +# Kicks off a CleanupSubDirectoryJob for the Carrierwave uploaded-file staging +# directory under the tenant's upload root. +class CleanupUploadFilesJob < ApplicationJob + non_tenant_job + + CARRIERWAVE_SUBDIR = File.join("hyrax", "uploaded_file", "file").freeze + + attr_reader :uploads_path, :tenant + def perform(delete_ingested_after_days:, uploads_path:, delete_all_after_days: 730, tenant: nil) + @uploads_path = uploads_path + @tenant = tenant + carrierwave_dir = File.join(uploads_path, CARRIERWAVE_SUBDIR) + + unless Dir.exist?(carrierwave_dir) + logger.info("No Carrierwave directory at #{carrierwave_dir}; nothing to clean") + return + end + + logger.info(message(carrierwave_dir, delete_ingested_after_days, delete_all_after_days)) + + CleanupSubDirectoryJob.perform_later( + delete_ingested_after_days: delete_ingested_after_days, + directory: carrierwave_dir, + delete_all_after_days: delete_all_after_days, + tenant: tenant + ) + end + + private + + def message(carrierwave_dir, delete_ingested_after_days, delete_all_after_days) + <<~MESSAGE + Starting cleanup of #{carrierwave_dir}: + delete ingested after #{delete_ingested_after_days} days, + delete all files after #{delete_all_after_days} days. + MESSAGE + end +end diff --git a/lib/tasks/uploads_cleanup.rake b/lib/tasks/uploads_cleanup.rake new file mode 100644 index 0000000000..47ed9d6a10 --- /dev/null +++ b/lib/tasks/uploads_cleanup.rake @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +# Scheduled via cron / Kubernetes CronJob (or similar). Uses ActiveJob only—works with Sidekiq, GoodJob, +# or any other queue adapter; no GoodJob-specific APIs. + +namespace :hyku do + desc "Enqueue per-tenant Hyrax upload staging cleanup (DELETE_INGESTED_AFTER_DAYS, DELETE_ALL_AFTER_DAYS); skips S3 tenants" + task cleanup_uploads: :environment do + ingested = ENV.fetch("DELETE_INGESTED_AFTER_DAYS", "180").to_i + delete_all = ENV.fetch("DELETE_ALL_AFTER_DAYS", "730").to_i + + Account.find_each do |account| + Apartment::Tenant.switch(account.tenant) do + if Site.account&.s3_bucket.present? + puts "Skipping #{account.tenant}: S3 uploads (no local staging tree)" + next + end + + uploads_path = Hyrax.config.upload_path.call + uploads_path = uploads_path.to_s + + unless Dir.exist?(uploads_path) + puts "Skipping #{account.tenant}: #{uploads_path} does not exist" + next + end + + puts "Enqueueing cleanup for #{account.tenant} → #{uploads_path}" + + CleanupUploadFilesJob.perform_later( + delete_ingested_after_days: ingested, + uploads_path: uploads_path, + delete_all_after_days: delete_all, + tenant: account.tenant + ) + end + end + end +end diff --git a/spec/jobs/cleanup_sub_directory_job_spec.rb b/spec/jobs/cleanup_sub_directory_job_spec.rb new file mode 100644 index 0000000000..d0a185be71 --- /dev/null +++ b/spec/jobs/cleanup_sub_directory_job_spec.rb @@ -0,0 +1,176 @@ +# frozen_string_literal: true + +RSpec.describe CleanupSubDirectoryJob do + let(:old_time) { Time.zone.now - 1.year } + let(:very_old_time) { Time.zone.now - 3.years } + let(:new_time) { Time.zone.now - 1.week } + let(:tenant) { 'tenant-abc' } + let(:base_dir) { '/app/samvera/uploads/tenant-abc/hyrax/uploaded_file/file' } + + describe 'ingest staging directory guard' do + # Call #perform directly so ArgumentError is not intercepted by ApplicationJob's retry_on(StandardError), + # which (with a block) does not re-raise after the final attempt — perform_now would then not fail the example. + it 'raises when given a site banner directory (ticket #842)' do + banner = '/app/samvera/uploads/56e0eb81-c2d5-4d5d-9171-b251bf7299a4/site/banner_images/1' + expect do + described_class.new.perform(delete_ingested_after_days: 180, directory: banner, tenant: tenant) + end.to raise_error(ArgumentError, %r{only accepts Hyrax ingest staging paths}) + end + + it 'raises when given the tenant upload root' do + root = '/app/samvera/uploads/tenant-abc' + expect do + described_class.new.perform(delete_ingested_after_days: 180, directory: root, tenant: tenant) + end.to raise_error(ArgumentError, %r{only accepts Hyrax ingest staging paths}) + end + + it 'raises when given hyrax/uploaded_file without the final file segment' do + partial = '/app/samvera/uploads/tenant-abc/hyrax/uploaded_file' + expect do + described_class.new.perform(delete_ingested_after_days: 180, directory: partial, tenant: tenant) + end.to raise_error(ArgumentError, %r{only accepts Hyrax ingest staging paths}) + end + + it 'accepts a valid staging path with a trailing slash' do + allow(Apartment::Tenant).to receive(:switch).with(tenant).and_yield + allow(Dir).to receive(:glob).and_return([]) + allow(FileUtils).to receive(:rmdir) + + expect do + described_class.new.perform(delete_ingested_after_days: 180, directory: "#{base_dir}/", tenant: tenant) + end.not_to raise_error + end + end + + let(:file_1) { "#{base_dir}/1/document.pdf" } + let(:file_2) { "#{base_dir}/2/image.jpg" } + let(:file_3) { "#{base_dir}/3/recent.txt" } + let(:file_4) { "#{base_dir}/4/not_a_file" } + let(:file_5) { "#{base_dir}/5/orphan.pdf" } + let(:file_6) { "#{base_dir}/6/ancient.pdf" } + + let(:filenames) do + { '1' => 'document.pdf', '2' => 'image.jpg', '3' => 'recent.txt', + '4' => 'not_a_file', '5' => 'orphan.pdf', '6' => 'ancient.pdf' } + end + + before do + allow(Apartment::Tenant).to receive(:switch).with(tenant).and_yield + + allow(Dir).to receive(:glob) do |pattern| + case pattern + when "#{base_dir}/*" + %w[1 2 3 4 5 6].map { |id| "#{base_dir}/#{id}" } + when /\A#{Regexp.escape(base_dir)}\/(\d+)\/\*\z/ + id = Regexp.last_match(1) + ["#{base_dir}/#{id}/#{filenames[id]}"] + else + [] + end + end + + allow(File).to receive(:directory?) do |path| + %w[1 2 3 4 5 6].any? { |id| path == "#{base_dir}/#{id}" } + end + + allow(File).to receive(:file?) do |path| + path != file_4 && [file_1, file_2, file_3, file_5, file_6].include?(path) + end + + allow(File).to receive(:mtime).with(file_1).and_return(old_time) + allow(File).to receive(:mtime).with(file_2).and_return(old_time) + allow(File).to receive(:mtime).with(file_3).and_return(new_time) + allow(File).to receive(:mtime).with(file_5).and_return(old_time) + allow(File).to receive(:mtime).with(file_6).and_return(very_old_time) + + allow(File).to receive(:delete) + allow(FileUtils).to receive(:rmdir) + + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '1') + .and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'http://fcrepo/rest/abc')) + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '2') + .and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'some-uuid')) + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '3') + .and_return(instance_double(Hyrax::UploadedFile, file_set_uri: 'some-uuid')) + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '5') + .and_return(instance_double(Hyrax::UploadedFile, file_set_uri: nil)) + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '6') + .and_return(instance_double(Hyrax::UploadedFile, file_set_uri: nil)) + end + + it 'deletes old files that have been ingested (file_set_uri present)' do + expect(File).to receive(:delete).with(file_1) + expect(File).to receive(:delete).with(file_2) + described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) + end + + it 'does not delete files newer than delete_ingested_after_days even if ingested' do + expect(File).not_to receive(:delete).with(file_3) + described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) + end + + it 'does not delete entries that are not files' do + expect(File).not_to receive(:delete).with(file_4) + described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) + end + + it 'does not delete old files without file_set_uri (orphaned but not old enough)' do + expect(File).not_to receive(:delete).with(file_5) + described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) + end + + it 'deletes orphaned files older than delete_all_after_days' do + expect(File).to receive(:delete).with(file_6) + described_class.perform_now(delete_ingested_after_days: 180, + directory: base_dir, + delete_all_after_days: 730, + tenant: tenant) + end + + it 'uses configurable delete_all_after_days threshold' do + expect(File).to receive(:delete).with(file_5) + described_class.perform_now(delete_ingested_after_days: 180, + directory: base_dir, + delete_all_after_days: 300, + tenant: tenant) + end + + it 'deletes files when no UploadedFile DB record exists and file is very old' do + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '6').and_return(nil) + expect(File).to receive(:delete).with(file_6) + described_class.perform_now(delete_ingested_after_days: 180, + directory: base_dir, + delete_all_after_days: 730, + tenant: tenant) + end + + it 'does not delete files when no UploadedFile DB record exists but file is not very old' do + allow(Hyrax::UploadedFile).to receive(:find_by).with(id: '5').and_return(nil) + expect(File).not_to receive(:delete).with(file_5) + described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) + end + + describe 'cleaning up empty directories' do + before do + allow(File).to receive(:directory?).with("#{base_dir}/1").and_return(true) + allow(File).to receive(:directory?).with("#{base_dir}/2").and_return(true) + allow(File).to receive(:directory?).with("#{base_dir}/3").and_return(true) + allow(FileUtils).to receive(:rmdir) + .with("#{base_dir}/2") + .and_raise(Errno::ENOTEMPTY) + end + + it 'attempts to remove empty upload ID directories' do + expect(FileUtils).to receive(:rmdir).with("#{base_dir}/1") + expect(FileUtils).to receive(:rmdir).with("#{base_dir}/2") + expect(FileUtils).to receive(:rmdir).with("#{base_dir}/3") + + described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) + end + + it 'continues when a directory is not empty' do + expect { described_class.perform_now(delete_ingested_after_days: 180, directory: base_dir, tenant: tenant) } + .not_to raise_error + end + end +end diff --git a/spec/jobs/cleanup_upload_files_job_spec.rb b/spec/jobs/cleanup_upload_files_job_spec.rb new file mode 100644 index 0000000000..d662a23115 --- /dev/null +++ b/spec/jobs/cleanup_upload_files_job_spec.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +RSpec.describe CleanupUploadFilesJob do + before do + ActiveJob::Base.queue_adapter = :test + end + + after do + clear_enqueued_jobs + end + + let(:uploads_path) { '/app/samvera/uploads/tenant-abc' } + let(:tenant) { 'tenant-abc' } + let(:carrierwave_dir) { "#{uploads_path}/hyrax/uploaded_file/file" } + + context 'when the Carrierwave directory exists' do + before do + allow(Dir).to receive(:exist?).and_call_original + allow(Dir).to receive(:exist?).with(carrierwave_dir).and_return(true) + end + + it 'spawns a CleanupSubDirectoryJob for the Carrierwave directory' do + expect do + described_class.perform_now(delete_ingested_after_days: 180, uploads_path: uploads_path, tenant: tenant) + end.to have_enqueued_job(CleanupSubDirectoryJob) + .with( + delete_ingested_after_days: 180, + directory: carrierwave_dir, + delete_all_after_days: 730, + tenant: tenant + ) + end + + it 'passes delete_all_after_days parameter to child job' do + expect do + described_class.perform_now(delete_ingested_after_days: 180, + uploads_path: uploads_path, + delete_all_after_days: 365, + tenant: tenant) + end.to have_enqueued_job(CleanupSubDirectoryJob) + .with( + delete_ingested_after_days: 180, + directory: carrierwave_dir, + delete_all_after_days: 365, + tenant: tenant + ) + end + + it 'uses default delete_all_after_days of 730 when not specified' do + expect do + described_class.perform_now(delete_ingested_after_days: 180, uploads_path: uploads_path, tenant: tenant) + end.to have_enqueued_job(CleanupSubDirectoryJob) + .with( + delete_ingested_after_days: 180, + directory: carrierwave_dir, + delete_all_after_days: 730, + tenant: tenant + ) + end + end + + context 'when the Carrierwave directory does not exist' do + before do + allow(Dir).to receive(:exist?).and_call_original + allow(Dir).to receive(:exist?).with(carrierwave_dir).and_return(false) + end + + it 'does not spawn any child jobs' do + expect do + described_class.perform_now(delete_ingested_after_days: 180, uploads_path: uploads_path, tenant: tenant) + end.not_to have_enqueued_job(CleanupSubDirectoryJob) + end + end +end