diff --git a/.rubocop.yml b/.rubocop.yml index 1d2341687..d313b5ab5 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,6 +1,6 @@ inherit_gem: rubocop-rails-omakase: rubocop.yml - + Layout/IndentationWidth: Enabled: true @@ -12,4 +12,8 @@ Layout/IndentationConsistency: Enabled: true Layout/SpaceInsidePercentLiteralDelimiters: - Enabled: true \ No newline at end of file + Enabled: true + +Layout/SpaceInsideArrayLiteralBrackets: + Exclude: + - "db/schema.rb" diff --git a/app/controllers/api/v1/import_sessions_controller.rb b/app/controllers/api/v1/import_sessions_controller.rb new file mode 100644 index 000000000..99c1c96b3 --- /dev/null +++ b/app/controllers/api/v1/import_sessions_controller.rb @@ -0,0 +1,166 @@ +# frozen_string_literal: true + +class Api::V1::ImportSessionsController < Api::V1::BaseController + before_action :ensure_read_scope, only: [ :show ] + before_action :ensure_write_scope, only: [ :create, :create_chunk, :publish ] + before_action :set_import_session, only: [ :show, :create_chunk, :publish ] + + def create + @import_session = ImportSession.create_or_find_for!( + family: Current.family, + import_type: params[:type].to_s, + client_session_id: params[:client_session_id].presence, + expected_chunks: expected_chunks_param + ) + + render :show, status: :created + rescue ImportSession::ConflictError => e + render_import_session_conflict(e.message) + rescue ActiveRecord::RecordInvalid => e + render json: { + error: "validation_failed", + message: "Import session could not be created", + errors: e.record.errors.full_messages + }, status: :unprocessable_entity + end + + def show + render :show + end + + def create_chunk + content, filename, content_type = sure_import_upload_attributes + return unless content + + @import_session.attach_chunk!( + sequence: sequence_param, + client_chunk_id: params[:client_chunk_id].presence, + content: content, + filename: filename, + content_type: content_type + ) + + @import_session.reload + render :show, status: :created + rescue ImportSession::ConflictError => e + render_import_session_conflict(e.message) + rescue ActiveRecord::RecordInvalid => e + render json: { + error: "validation_failed", + message: "Import chunk could not be created", + errors: e.record.errors.full_messages + }, status: :unprocessable_entity + end + + def publish + @import_session.publish_later + @import_session.reload + render :show, status: :accepted + rescue Import::MaxRowCountExceededError + render json: { + error: "max_row_count_exceeded", + message: "Import session has too many rows to publish." + }, status: :unprocessable_entity + rescue ImportSession::ConflictError => e + render_import_session_conflict(e.message) + end + + private + def set_import_session + @import_session = Current.family.import_sessions.find(params[:id]) + end + + def ensure_read_scope + authorize_scope!(:read) + end + + def ensure_write_scope + authorize_scope!(:write) + end + + def expected_chunks_param + return if params[:expected_chunks].blank? + + params[:expected_chunks].to_i + end + + def sequence_param + raise ActionController::ParameterMissing.new(:sequence) if params[:sequence].blank? + + params[:sequence].to_i + end + + def sure_import_upload_attributes + if params[:file].present? + sure_import_file_upload_attributes(params[:file]) + elsif params[:raw_file_content].present? + sure_import_raw_content_attributes(params[:raw_file_content].to_s) + else + render json: { + error: "missing_content", + message: "Provide a Sure NDJSON file or raw_file_content." + }, status: :unprocessable_entity + nil + end + end + + def sure_import_file_upload_attributes(file) + if file.size > SureImport.max_ndjson_size + render json: { + error: "file_too_large", + message: "File is too large. Maximum size is #{SureImport.max_ndjson_size / 1.megabyte}MB." + }, status: :unprocessable_entity + return + end + + extension = File.extname(file.original_filename.to_s).downcase + unless SureImport::ALLOWED_NDJSON_CONTENT_TYPES.include?(file.content_type) || extension.in?(%w[.ndjson .json]) + render json: { + error: "invalid_file_type", + message: "Invalid file type. Please upload a Sure NDJSON file." + }, status: :unprocessable_entity + return + end + + sure_import_validated_attributes( + content: file.read, + filename: file.original_filename.presence || "sure-import.ndjson", + content_type: file.content_type.presence || "application/x-ndjson" + ) + end + + def sure_import_raw_content_attributes(content) + if content.bytesize > SureImport.max_ndjson_size + render json: { + error: "content_too_large", + message: "Content is too large. Maximum size is #{SureImport.max_ndjson_size / 1.megabyte}MB." + }, status: :unprocessable_entity + return + end + + sure_import_validated_attributes( + content: content, + filename: "sure-import.ndjson", + content_type: "application/x-ndjson" + ) + end + + def sure_import_validated_attributes(content:, filename:, content_type:) + unless SureImport.valid_ndjson_first_line?(content) + render json: { + error: "invalid_ndjson", + message: "Invalid Sure NDJSON content." + }, status: :unprocessable_entity + return + end + + [ content, filename, content_type ] + end + + def render_import_session_conflict(message) + render json: { + error: "import_session_conflict", + message: message + }, status: :conflict + end +end diff --git a/app/jobs/import_session_job.rb b/app/jobs/import_session_job.rb new file mode 100644 index 000000000..de7f0e377 --- /dev/null +++ b/app/jobs/import_session_job.rb @@ -0,0 +1,12 @@ +class ImportSessionJob < ApplicationJob + queue_as :high_priority + + def perform(import_session) + raise ArgumentError, "ImportSessionJob requires an import_session" if import_session.nil? + + Rails.logger.info("ImportSessionJob started import_session_id=#{import_session.id}") + import_session.publish + import_session.reload + Rails.logger.info("ImportSessionJob finished import_session_id=#{import_session.id} status=#{import_session.status}") + end +end diff --git a/app/models/family.rb b/app/models/family.rb index 168773701..15bfe999a 100644 --- a/app/models/family.rb +++ b/app/models/family.rb @@ -27,6 +27,8 @@ class Family < ApplicationRecord has_many :invitations, dependent: :destroy has_many :imports, dependent: :destroy + has_many :import_sessions, dependent: :destroy + has_many :import_source_mappings, dependent: :destroy has_many :family_exports, dependent: :destroy has_many :account_statements, dependent: :destroy diff --git a/app/models/family/data_importer.rb b/app/models/family/data_importer.rb index 8ee468dc8..7cbb8364e 100644 --- a/app/models/family/data_importer.rb +++ b/app/models/family/data_importer.rb @@ -1,12 +1,73 @@ require "set" class Family::DataImporter + MissingReferenceError = Class.new(StandardError) do + attr_reader :code, :details + + def initialize(record_type:, source_type:, source_id:) + @code = "missing_source_reference" + @details = { + record_type: record_type, + source_type: source_type, + source_id: source_id + } + + super("#{record_type} references missing #{source_type} source id #{source_id}") + end + end + + InvalidRecordError = Class.new(StandardError) do + attr_reader :code, :details + + def initialize(record_type:, field:, value:) + @code = "invalid_import_record" + @details = { + record_type: record_type, + field: field, + value: value + } + + super("#{record_type} has invalid #{field}: #{value.inspect}") + end + end + SUPPORTED_TYPES = %w[Account Balance Category Tag Merchant RecurringTransaction Transaction Transfer RejectedTransfer Trade Holding Valuation Budget BudgetCategory Rule].freeze ACCOUNTABLE_TYPES = Accountable::TYPES.freeze - - def initialize(family, ndjson_content) + MAPPING_TYPES = { + accounts: "Account", + categories: "Category", + tags: "Tag", + merchants: "Merchant", + recurring_transactions: "RecurringTransaction", + transactions: "Transaction", + budgets: "Budget", + securities: "Security", + rules: "Rule" + }.freeze + SUMMARY_KEYS = { + "Account" => "accounts", + "Balance" => "balances", + "Category" => "categories", + "Tag" => "tags", + "Merchant" => "merchants", + "RecurringTransaction" => "recurring_transactions", + "Transaction" => "transactions", + "Transfer" => "transfers", + "RejectedTransfer" => "rejected_transfers", + "Trade" => "trades", + "Holding" => "holdings", + "Valuation" => "valuations", + "Budget" => "budgets", + "BudgetCategory" => "budget_categories", + "Rule" => "rules" + }.freeze + + def initialize(family, ndjson_content, import_session: nil, import: nil) @family = family @ndjson_content = ndjson_content + @import_session = import_session + @import = import + @strict_references = import_session.present? @id_mappings = { accounts: {}, categories: {}, @@ -15,11 +76,13 @@ def initialize(family, ndjson_content) recurring_transactions: {}, transactions: {}, budgets: {}, - securities: {} + securities: {}, + rules: {} } @security_cache = {} @created_accounts = [] @created_entries = [] + @summary = Hash.new { |hash, key| hash[key] = empty_summary_bucket } end def import! @@ -46,7 +109,7 @@ def import! import_rules(records["Rule"] || []) end - { accounts: @created_accounts, entries: @created_entries } + { accounts: @created_accounts, entries: @created_entries, summary: compact_summary } end private @@ -71,6 +134,124 @@ def parse_ndjson records end + def empty_summary_bucket + { "created" => 0, "updated" => 0, "skipped" => 0, "failed" => 0 } + end + + def compact_summary + @summary.select { |_entity_type, counts| counts.values.any?(&:positive?) } + end + + def increment_summary(record_type, status) + @summary[SUMMARY_KEYS.fetch(record_type)].tap do |counts| + counts[status.to_s] = counts.fetch(status.to_s, 0) + 1 + end + end + + def map_source!(mapping_key, source_id, target) + return if source_id.blank? || target.blank? + + @id_mappings[mapping_key][source_id] = target.id + return unless @import_session + + source_type = MAPPING_TYPES.fetch(mapping_key) + mapping = @import_session.source_mappings.find_or_initialize_by( + family: @family, + source_type: source_type, + source_id: source_id + ) + mapping.target = target + mapping.save! + end + + def mapped_id(mapping_key, old_id, record_type:, required: true) + if old_id.blank? + missing_reference(record_type, mapping_key, "(blank)") if required + return + end + + return @id_mappings[mapping_key][old_id] if @id_mappings[mapping_key].key?(old_id) + + source_type = MAPPING_TYPES.fetch(mapping_key) + mapping = @import_session&.source_mappings&.find_by(source_type: source_type, source_id: old_id) + + if mapping + @id_mappings[mapping_key][old_id] = mapping.target_id + return mapping.target_id + end + + if required && @strict_references + raise MissingReferenceError.new( + record_type: record_type, + source_type: source_type, + source_id: old_id + ) + end + + nil + end + + def mapped_record(mapping_key, old_id, scope, record_type:) + target_id = mapped_id(mapping_key, old_id, record_type: record_type, required: false) + return if target_id.blank? + + scope.find_by(id: target_id) + end + + def missing_reference(record_type, mapping_key, old_id) + if @strict_references + increment_summary(record_type, :failed) + raise MissingReferenceError.new( + record_type: record_type, + source_type: MAPPING_TYPES.fetch(mapping_key), + source_id: old_id + ) + end + + increment_summary(record_type, :skipped) + nil + end + + def require_source_id!(record_type, source_id) + return if source_id.present? || !@strict_references + + increment_summary(record_type, :failed) + raise MissingReferenceError.new( + record_type: record_type, + source_type: record_type, + source_id: "(blank)" + ) + end + + def invalid_record!(record_type, field, value) + if @strict_references + increment_summary(record_type, :failed) + raise InvalidRecordError.new(record_type: record_type, field: field, value: value) + end + + increment_summary(record_type, :skipped) + nil + end + + def session_entry_source + return unless @import_session + + "sure_import_session:#{@import_session.id}" + end + + def session_entry_external_id(record_type, source_id) + return if @import_session.blank? || source_id.blank? + + "#{record_type}:#{source_id}" + end + + def session_imported_entry(account, record_type, source_id) + external_id = session_entry_external_id(record_type, source_id) + return if external_id.blank? + + account.entries.find_by(source: session_entry_source, external_id: external_id) + end + def import_accounts(records) records.each do |record| data = record["data"] @@ -78,28 +259,40 @@ def import_accounts(records) accountable_data = data["accountable"] || {} accountable_type = data["accountable_type"] - # Skip if accountable type is not valid - next unless ACCOUNTABLE_TYPES.include?(accountable_type) + require_source_id!("Account", old_id) - # Build accountable - accountable_class = accountable_type.constantize - accountable = accountable_class.new - accountable.subtype = accountable_data["subtype"] if accountable.respond_to?(:subtype=) && accountable_data["subtype"] + unless ACCOUNTABLE_TYPES.include?(accountable_type) + invalid_record!("Account", "accountable_type", accountable_type) + next + end - # Copy any other accountable attributes - safe_accountable_attrs = %w[subtype locked_attributes] - safe_accountable_attrs.each do |attr| - if accountable.respond_to?("#{attr}=") && accountable_data[attr].present? - accountable.send("#{attr}=", accountable_data[attr]) + account = mapped_record(:accounts, old_id, @family.accounts, record_type: "Account") + created = account.blank? + + if account + accountable = account.accountable + else + # Build accountable + accountable_class = accountable_type.constantize + accountable = accountable_class.new + accountable.subtype = accountable_data["subtype"] if accountable.respond_to?(:subtype=) && accountable_data["subtype"] + + # Copy any other accountable attributes + safe_accountable_attrs = %w[subtype locked_attributes] + safe_accountable_attrs.each do |attr| + if accountable.respond_to?("#{attr}=") && accountable_data[attr].present? + accountable.send("#{attr}=", accountable_data[attr]) + end end + + account = @family.accounts.build(accountable: accountable) end - account = @family.accounts.build( + account.assign_attributes( name: data["name"], balance: data["balance"].to_d, cash_balance: data["cash_balance"]&.to_d || data["balance"].to_d, currency: data["currency"] || @family.currency, - accountable: accountable, subtype: data["subtype"], institution_name: data["institution_name"], institution_domain: data["institution_domain"], @@ -111,7 +304,7 @@ def import_accounts(records) # Set opening balance if we have a historical balance and the import # does not provide an explicit opening-anchor valuation for this account. - if data["balance"].present? && !@imported_opening_anchor_account_ids.include?(old_id) + if created && data["balance"].present? && !@imported_opening_anchor_account_ids.include?(old_id) manager = Account::OpeningBalanceManager.new(account) result = manager.set_opening_balance( balance: data["balance"].to_d, @@ -120,8 +313,9 @@ def import_accounts(records) log_failed_opening_balance_import(account, old_id, result) unless result.success? end - @id_mappings[:accounts][old_id] = account.id - @created_accounts << account + map_source!(:accounts, old_id, account) + @created_accounts << account if created + increment_summary("Account", created ? :created : :updated) end end @@ -132,16 +326,23 @@ def importable_account_status(status) def import_balances(records) records.each do |record| data = record["data"] || {} - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Balance") balance_date = parse_import_date(data["date"]) - next if new_account_id.blank? || balance_date.blank? || data["balance"].blank? + next if new_account_id.blank? + + if balance_date.blank? || data["balance"].blank? + increment_summary("Balance", :skipped) + next + end account = @family.accounts.find(new_account_id) currency = data["currency"].presence || account.currency balance = account.balances.find_or_initialize_by(date: balance_date, currency: currency) + created = balance.new_record? balance.assign_attributes(imported_balance_attributes(data)) balance.save! + increment_summary("Balance", created ? :created : :updated) end end @@ -181,10 +382,16 @@ def import_categories(records) old_id = data["id"] parent_id = data["parent_id"] + require_source_id!("Category", old_id) + # Store parent relationship for second pass parent_mappings[old_id] = parent_id if parent_id.present? - category = @family.categories.build( + category = mapped_record(:categories, old_id, @family.categories, record_type: "Category") + created = category.blank? + category ||= @family.categories.build + + category.assign_attributes( name: data["name"], color: data["color"] || Category::UNCATEGORIZED_COLOR, classification_unused: data["classification_unused"] || data["classification"] || "expense", @@ -192,13 +399,14 @@ def import_categories(records) ) category.save! - @id_mappings[:categories][old_id] = category.id + map_source!(:categories, old_id, category) + increment_summary("Category", created ? :created : :updated) end # Second pass: establish parent relationships parent_mappings.each do |old_id, old_parent_id| - new_id = @id_mappings[:categories][old_id] - new_parent_id = @id_mappings[:categories][old_parent_id] + new_id = mapped_id(:categories, old_id, record_type: "Category") + new_parent_id = mapped_id(:categories, old_parent_id, record_type: "Category") next unless new_id && new_parent_id @@ -212,13 +420,23 @@ def import_tags(records) data = record["data"] old_id = data["id"] - tag = @family.tags.build( + require_source_id!("Tag", old_id) + + tag = mapped_record(:tags, old_id, @family.tags, record_type: "Tag") + created = tag.blank? + tag ||= @family.tags.build + color = data["color"] || tag.color + # Keep replayed session imports deterministic when the source omits a color. + color ||= Tag::COLORS.first if created + + tag.assign_attributes( name: data["name"], - color: data["color"] || Tag::COLORS.sample + color: color ) tag.save! - @id_mappings[:tags][old_id] = tag.id + map_source!(:tags, old_id, tag) + increment_summary("Tag", created ? :created : :updated) end end @@ -227,14 +445,21 @@ def import_merchants(records) data = record["data"] old_id = data["id"] - merchant = @family.merchants.build( + require_source_id!("Merchant", old_id) + + merchant = mapped_record(:merchants, old_id, @family.merchants, record_type: "Merchant") + created = merchant.blank? + merchant ||= @family.merchants.build + + merchant.assign_attributes( name: data["name"], color: data["color"], logo_url: data["logo_url"] ) merchant.save! - @id_mappings[:merchants][old_id] = merchant.id + map_source!(:merchants, old_id, merchant) + increment_summary("Merchant", created ? :created : :updated) end end @@ -243,10 +468,20 @@ def import_recurring_transactions(records) data = record["data"] old_id = data["id"] - new_account_id = remap_optional_id(:accounts, data["account_id"]) + require_source_id!("RecurringTransaction", old_id) + + recurring_transaction = mapped_record( + :recurring_transactions, + old_id, + @family.recurring_transactions, + record_type: "RecurringTransaction" + ) + created = recurring_transaction.blank? + + new_account_id = remap_optional_id(:accounts, data["account_id"], record_type: "RecurringTransaction") next if data["account_id"].present? && new_account_id.blank? - new_merchant_id = remap_optional_id(:merchants, data["merchant_id"]) + new_merchant_id = remap_optional_id(:merchants, data["merchant_id"], record_type: "RecurringTransaction") next if data["merchant_id"].present? && new_merchant_id.blank? expected_day_of_month = recurring_expected_day_for(data["expected_day_of_month"]) @@ -255,7 +490,8 @@ def import_recurring_transactions(records) next_expected_date = parse_import_date(data["next_expected_date"]) next unless last_occurrence_date && next_expected_date - recurring_transaction = @family.recurring_transactions.build( + recurring_transaction ||= @family.recurring_transactions.build + recurring_transaction.assign_attributes( account_id: new_account_id, merchant_id: new_merchant_id, amount: data["amount"].to_d, @@ -273,14 +509,15 @@ def import_recurring_transactions(records) ) recurring_transaction.save! - @id_mappings[:recurring_transactions][old_id] = recurring_transaction.id + map_source!(:recurring_transactions, old_id, recurring_transaction) + increment_summary("RecurringTransaction", created ? :created : :updated) end end - def remap_optional_id(mapping_key, old_id) + def remap_optional_id(mapping_key, old_id, record_type:) return if old_id.blank? - @id_mappings[mapping_key][old_id] + mapped_id(mapping_key, old_id, record_type: record_type) end def recurring_transaction_status_for(status) @@ -305,8 +542,10 @@ def import_transactions(records) data = record["data"] old_id = data["id"] + require_source_id!("Transaction", old_id) + # Map account ID - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Transaction") next unless new_account_id account = @family.accounts.find(new_account_id) @@ -314,78 +553,93 @@ def import_transactions(records) # Map category ID (optional) new_category_id = nil if data["category_id"].present? - new_category_id = @id_mappings[:categories][data["category_id"]] + new_category_id = mapped_id(:categories, data["category_id"], record_type: "Transaction") end # Map merchant ID (optional) new_merchant_id = nil if data["merchant_id"].present? - new_merchant_id = @id_mappings[:merchants][data["merchant_id"]] + new_merchant_id = mapped_id(:merchants, data["merchant_id"], record_type: "Transaction") end # Map tag IDs (optional) new_tag_ids = [] if data["tag_ids"].present? - new_tag_ids = Array(data["tag_ids"]).map { |old_tag_id| @id_mappings[:tags][old_tag_id] }.compact + new_tag_ids = Array(data["tag_ids"]).map do |old_tag_id| + mapped_id(:tags, old_tag_id, record_type: "Transaction") + end.compact end - transaction = Transaction.new( + entry = session_imported_entry(account, "Transaction", old_id) + transaction = entry&.entryable if entry&.entryable.is_a?(Transaction) + created = transaction.blank? + + transaction ||= Transaction.new + transaction.assign_attributes( category_id: new_category_id, merchant_id: new_merchant_id, kind: data["kind"] || "standard" ) - entry = Entry.new( + entry ||= Entry.new(entryable: transaction) + entry.assign_attributes( account: account, date: Date.parse(data["date"].to_s), amount: data["amount"].to_d, name: data["name"] || "Imported transaction", currency: data["currency"] || account.currency, notes: data["notes"], - excluded: data["excluded"] || false, - entryable: transaction + excluded: data["excluded"] || false ) + if @import_session + entry.external_id = session_entry_external_id("Transaction", old_id) + entry.source = session_entry_source + end entry.save! # Add tags through the tagging association + transaction.taggings.destroy_all unless created new_tag_ids.each do |tag_id| transaction.taggings.create!(tag_id: tag_id) end - @created_entries << entry - @id_mappings[:transactions][old_id] = transaction.id + @created_entries << entry if created + map_source!(:transactions, old_id, transaction) + increment_summary("Transaction", created ? :created : :updated) end end def import_transfers(records) records.each do |record| data = record["data"] - inflow_transaction_id = @id_mappings[:transactions][data["inflow_transaction_id"]] - outflow_transaction_id = @id_mappings[:transactions][data["outflow_transaction_id"]] + inflow_transaction_id = mapped_id(:transactions, data["inflow_transaction_id"], record_type: "Transfer") + outflow_transaction_id = mapped_id(:transactions, data["outflow_transaction_id"], record_type: "Transfer") next unless inflow_transaction_id && outflow_transaction_id - Transfer.find_or_create_by!( + transfer = Transfer.find_or_create_by!( inflow_transaction_id: inflow_transaction_id, outflow_transaction_id: outflow_transaction_id ) do |transfer| transfer.status = transfer_status_for(data["status"]) transfer.notes = data["notes"] end + increment_summary("Transfer", transfer.previously_new_record? ? :created : :updated) end end def import_rejected_transfers(records) records.each do |record| data = record["data"] - inflow_transaction_id = @id_mappings[:transactions][data["inflow_transaction_id"]] - outflow_transaction_id = @id_mappings[:transactions][data["outflow_transaction_id"]] + inflow_transaction_id = mapped_id(:transactions, data["inflow_transaction_id"], record_type: "RejectedTransfer") + outflow_transaction_id = mapped_id(:transactions, data["outflow_transaction_id"], record_type: "RejectedTransfer") next unless inflow_transaction_id && outflow_transaction_id - RejectedTransfer.find_or_create_by!( + rejected_transfer = RejectedTransfer.find_or_create_by!( inflow_transaction_id: inflow_transaction_id, outflow_transaction_id: outflow_transaction_id ) + increment_summary("RejectedTransfer", rejected_transfer.previously_new_record? ? :created : :updated) end end @@ -400,9 +654,12 @@ def transfer_status_for(status) def import_trades(records) records.each do |record| data = record["data"] + old_id = data["id"] + + require_source_id!("Trade", old_id) # Map account ID - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Trade") next unless new_account_id account = @family.accounts.find(new_account_id) @@ -419,34 +676,47 @@ def import_trades(records) exchange_operating_mic: data["exchange_operating_mic"] ) - trade = Trade.new( + entry = session_imported_entry(account, "Trade", old_id) + trade = entry&.entryable if entry&.entryable.is_a?(Trade) + created = trade.blank? + + trade ||= Trade.new + trade.assign_attributes( security: security, qty: data["qty"].to_d, price: data["price"].to_d, currency: data["currency"] || account.currency ) - entry = Entry.new( + entry ||= Entry.new(entryable: trade) + entry.assign_attributes( account: account, date: Date.parse(data["date"].to_s), amount: data["amount"].to_d, name: "#{data["qty"].to_d >= 0 ? 'Buy' : 'Sell'} #{ticker}", - currency: data["currency"] || account.currency, - entryable: trade + currency: data["currency"] || account.currency ) + if @import_session + entry.external_id = session_entry_external_id("Trade", old_id) + entry.source = session_entry_source + end entry.save! - @created_entries << entry + @created_entries << entry if created + increment_summary("Trade", created ? :created : :updated) end end def import_holdings(records) - accounts_by_id = @family.accounts.where(id: records.filter_map { |record| @id_mappings[:accounts][record.dig("data", "account_id")] }).index_by(&:id) + account_ids = records.filter_map do |record| + mapped_id(:accounts, record.dig("data", "account_id"), record_type: "Holding", required: false) + end + accounts_by_id = @family.accounts.where(id: account_ids).index_by(&:id) records.each do |record| data = record["data"] - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Holding") next unless new_account_id account = accounts_by_id[new_account_id] @@ -481,33 +751,46 @@ def import_holdings(records) security_locked: truthy?(data["security_locked"]) || false } - upsert_imported_holding!(account, security, holding_date, holding_currency, holding_attributes) + created = upsert_imported_holding!(account, security, holding_date, holding_currency, holding_attributes) + increment_summary("Holding", created ? :created : :updated) end end def import_valuations(records) records.each do |record| data = record["data"] + old_id = data["id"] + + require_source_id!("Valuation", old_id) # Map account ID - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Valuation") next unless new_account_id account = @family.accounts.find(new_account_id) - valuation = Valuation.new(kind: valuation_kind_for(data["kind"])) + entry = session_imported_entry(account, "Valuation", old_id) + valuation = entry&.entryable if entry&.entryable.is_a?(Valuation) + created = valuation.blank? + valuation ||= Valuation.new + valuation.kind = valuation_kind_for(data["kind"]) - entry = Entry.new( + entry ||= Entry.new(entryable: valuation) + entry.assign_attributes( account: account, date: Date.parse(data["date"].to_s), amount: data["amount"].to_d, name: data["name"] || "Valuation", - currency: data["currency"] || account.currency, - entryable: valuation + currency: data["currency"] || account.currency ) + if @import_session + entry.external_id = session_entry_external_id("Valuation", old_id) + entry.source = session_entry_source + end entry.save! - @created_entries << entry + @created_entries << entry if created + increment_summary("Valuation", created ? :created : :updated) end end @@ -573,7 +856,13 @@ def import_budgets(records) data = record["data"] old_id = data["id"] - budget = @family.budgets.build( + require_source_id!("Budget", old_id) + + budget = mapped_record(:budgets, old_id, @family.budgets, record_type: "Budget") + created = budget.blank? + budget ||= @family.budgets.build + + budget.assign_attributes( start_date: Date.parse(data["start_date"].to_s), end_date: Date.parse(data["end_date"].to_s), budgeted_spending: data["budgeted_spending"]&.to_d, @@ -582,7 +871,8 @@ def import_budgets(records) ) budget.save! - @id_mappings[:budgets][old_id] = budget.id + map_source!(:budgets, old_id, budget) + increment_summary("Budget", created ? :created : :updated) end end @@ -591,36 +881,49 @@ def import_budget_categories(records) data = record["data"] # Map budget ID - new_budget_id = @id_mappings[:budgets][data["budget_id"]] + new_budget_id = mapped_id(:budgets, data["budget_id"], record_type: "BudgetCategory") next unless new_budget_id # Map category ID - new_category_id = @id_mappings[:categories][data["category_id"]] + new_category_id = mapped_id(:categories, data["category_id"], record_type: "BudgetCategory") next unless new_category_id budget = @family.budgets.find(new_budget_id) - budget_category = budget.budget_categories.build( + budget_category = budget.budget_categories.find_or_initialize_by(category_id: new_category_id) + created = budget_category.new_record? + budget_category.assign_attributes( category_id: new_category_id, budgeted_spending: data["budgeted_spending"].to_d, currency: data["currency"] || budget.currency ) budget_category.save! + increment_summary("BudgetCategory", created ? :created : :updated) end end def import_rules(records) records.each do |record| data = record["data"] + old_id = data["id"] + + require_source_id!("Rule", old_id) - rule = @family.rules.build( + rule = mapped_record(:rules, old_id, @family.rules, record_type: "Rule") + created = rule.blank? + rule ||= @family.rules.build + + rule.assign_attributes( name: data["name"], resource_type: data["resource_type"] || "transaction", active: data["active"] || false, effective_date: data["effective_date"].present? ? Date.parse(data["effective_date"].to_s) : nil ) + rule.conditions.destroy_all unless created + rule.actions.destroy_all unless created + # Build conditions (data["conditions"] || []).each do |condition_data| build_rule_condition(rule, condition_data) @@ -632,6 +935,8 @@ def import_rules(records) end rule.save! + map_source!(:rules, old_id, rule) + increment_summary("Rule", created ? :created : :updated) end end @@ -768,8 +1073,9 @@ def find_or_create_security(ticker, currency, old_security_id: nil, **attributes return security end - if old_security_id.present? && @id_mappings[:securities][old_security_id] - security = Security.find(@id_mappings[:securities][old_security_id]) + mapped_security_id = mapped_id(:securities, old_security_id, record_type: "Security", required: false) + if old_security_id.present? && mapped_security_id + security = Security.find(mapped_security_id) apply_security_metadata(security, normalized_ticker, attributes) @security_cache[cache_key] = security return security @@ -779,7 +1085,7 @@ def find_or_create_security(ticker, currency, old_security_id: nil, **attributes apply_security_metadata(security, normalized_ticker, attributes) @security_cache[cache_key] = security - @id_mappings[:securities][old_security_id] = security.id if old_security_id.present? + map_source!(:securities, old_security_id, security) if old_security_id.present? security end @@ -824,6 +1130,7 @@ def assign_if_blank_or_placeholder(record, attribute, value, placeholder:) def upsert_imported_holding!(account, security, date, currency, attributes) holding = account.holdings.find_or_initialize_by(security: security, date: date, currency: currency) + created = holding.new_record? holding.assign_attributes(attributes) begin @@ -831,7 +1138,10 @@ def upsert_imported_holding!(account, security, date, currency, attributes) rescue ActiveRecord::RecordNotUnique existing = account.holdings.find_by!(security: security, date: date, currency: currency) existing.update!(attributes) + created = false end + + created end def security_kind_for(value) diff --git a/app/models/import.rb b/app/models/import.rb index 2f1256ff4..22b4513d1 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -33,11 +33,13 @@ def self.max_csv_size belongs_to :family belongs_to :account, optional: true + belongs_to :import_session, optional: true before_validation :set_default_number_format before_validation :ensure_utf8_encoding scope :ordered, -> { order(created_at: :desc) } + scope :ordered_by_sequence, -> { order(:sequence, :created_at) } enum :status, { pending: "pending", @@ -53,6 +55,7 @@ def self.max_csv_size validates :col_sep, inclusion: { in: SEPARATORS.map(&:last) } validates :signage_convention, inclusion: { in: SIGNAGE_CONVENTIONS }, allow_nil: true validates :number_format, presence: true, inclusion: { in: NUMBER_FORMATS.keys } + validates :client_chunk_id, length: { maximum: 255 }, allow_blank: true validate :custom_column_import_requires_identifier validates :rows_to_skip, numericality: { only_integer: true, greater_than_or_equal_to: 0 } validate :account_belongs_to_family diff --git a/app/models/import_session.rb b/app/models/import_session.rb new file mode 100644 index 000000000..75e26b480 --- /dev/null +++ b/app/models/import_session.rb @@ -0,0 +1,389 @@ +require "digest" + +class ImportSession < ApplicationRecord + ConflictError = Class.new(StandardError) + + IMPORT_TYPES = %w[SureImport].freeze + STATUSES = %w[pending importing complete failed].freeze + + belongs_to :family + has_many :imports, -> { order(:sequence, :created_at) }, dependent: :destroy + has_many :source_mappings, + class_name: "ImportSourceMapping", + dependent: :destroy + + enum :status, { + pending: "pending", + importing: "importing", + complete: "complete", + failed: "failed" + }, validate: true, default: "pending" + + validates :import_type, inclusion: { in: IMPORT_TYPES } + validates :client_session_id, uniqueness: { scope: :family_id }, allow_blank: true + validates :client_session_id, length: { maximum: 255 }, allow_blank: true + validates :expected_chunks, + numericality: { only_integer: true, greater_than: 0 }, + allow_nil: true + + def self.create_or_find_for!(family:, import_type:, client_session_id:, expected_chunks:) + import_type = import_type.presence || "SureImport" + expected_chunks = expected_chunks.present? ? expected_chunks.to_i : nil + unless IMPORT_TYPES.include?(import_type) + session = new(import_type: import_type) + session.errors.add(:import_type, "must be SureImport") + raise ActiveRecord::RecordInvalid.new(session) + end + + if client_session_id.present? + session = family.import_sessions.find_or_initialize_by(client_session_id: client_session_id) + if session.persisted? && + expected_chunks.present? && + session.expected_chunks.present? && + session.expected_chunks != expected_chunks + raise ConflictError, "client_session_id already exists with a different expected_chunks value" + end + else + session = family.import_sessions.build + end + + session.import_type = import_type + session.expected_chunks ||= expected_chunks + session.save! + session + rescue ActiveRecord::RecordNotUnique + raise unless client_session_id.present? + + existing = family.import_sessions.find_by(client_session_id: client_session_id) + raise unless existing + + if expected_chunks.present? && + existing.expected_chunks.present? && + existing.expected_chunks != expected_chunks + raise ConflictError, "client_session_id already exists with a different expected_chunks value" + end + if expected_chunks.present? && existing.expected_chunks.nil? + existing.update!(expected_chunks: expected_chunks) + end + + existing + end + + def attach_chunk!(sequence:, content:, filename:, content_type:, client_chunk_id: nil) + sequence = sequence.to_i + raise ConflictError, "sequence must be a positive integer" unless sequence.positive? + + checksum = Digest::SHA256.hexdigest(content) + normalized_client_chunk_id = client_chunk_id.presence + chunk_needs_finalization = false + + chunk = with_lock do + raise ConflictError, "cannot add chunks after publishing starts" unless pending? || failed? + + existing = existing_chunk_for!( + sequence: sequence, + client_chunk_id: normalized_client_chunk_id, + checksum: checksum + ) + + if existing + chunk_needs_finalization = prepare_existing_chunk_for_retry!( + existing, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) + existing + else + chunk_needs_finalization = true + chunk = create_chunk!( + sequence: sequence, + client_chunk_id: normalized_client_chunk_id, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) + end + end + + finalize_chunk_for_retry!(chunk, checksum) if chunk_needs_finalization + chunk + rescue ActiveRecord::RecordNotUnique + imports.reset + existing = existing_chunk_for!( + sequence: sequence, + client_chunk_id: normalized_client_chunk_id, + checksum: checksum + ) + return prepare_and_finalize_existing_chunk!( + existing, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) if existing + + raise ConflictError, "chunk already exists with different content" + end + + def create_chunk!(sequence:, client_chunk_id:, checksum:, content:, filename:, content_type:) + imports.create!( + family: family, + type: "SureImport", + sequence: sequence, + client_chunk_id: client_chunk_id, + checksum: checksum + ).tap do |import| + import.ndjson_file.attach( + io: StringIO.new(content), + filename: filename, + content_type: content_type + ) + end + end + private :create_chunk! + + def publish_later + previous_status = nil + should_enqueue = false + + with_lock do + return if complete? || importing? + + raise Import::MaxRowCountExceededError if row_count_exceeded? + raise ConflictError, "import session has no chunks" unless imports.exists? + validate_expected_chunk_sequences! + + previous_status = status + update!(status: :importing, error_details: {}) + should_enqueue = true + end + + return unless should_enqueue + + begin + ImportSessionJob.perform_later(self) + rescue => error + with_lock do + reload + if importing? + update!(status: previous_status, error_details: enqueue_error_details) + end + end + raise + end + end + + def publish + return unless prepare_for_publish! + + Rails.logger.info("ImportSession publish started import_session_id=#{id}") + + imports.ordered_by_sequence.each do |import| + process_chunk!(import) + end + + update!(status: :complete, summary: aggregate_chunk_summaries, error_details: {}) + enqueue_family_sync + Rails.logger.info("ImportSession publish completed import_session_id=#{id}") + rescue => error + update!( + status: :failed, + error_details: error_details_for(error), + summary: aggregate_chunk_summaries + ) + Rails.logger.error("ImportSession publish failed import_session_id=#{id} exception=#{error.class}") + end + + def aggregate_chunk_summaries + imports.each_with_object({}) do |import, totals| + merge_summary!(totals, import.summary || {}) + end + end + + private + def prepare_for_publish! + with_lock do + return false if complete? + + raise Import::MaxRowCountExceededError if row_count_exceeded? + raise ConflictError, "import session has no chunks" unless imports.exists? + validate_expected_chunk_sequences! + + update!(status: :importing, error_details: {}) unless importing? + true + end + end + + def enqueue_family_sync + family.sync_later + rescue => error + update!(error_details: sync_enqueue_error_details) + Rails.logger.error( + "ImportSession family sync enqueue failed import_session_id=#{id} exception=#{error.class}" + ) + end + + def existing_chunk_for!(sequence:, client_chunk_id:, checksum:) + sequence_match = imports.find_by(sequence: sequence) + client_chunk_match = imports.find_by(client_chunk_id: client_chunk_id) if client_chunk_id.present? + + if sequence_match && client_chunk_match && sequence_match.id != client_chunk_match.id + raise ConflictError, "sequence and client_chunk_id refer to different chunks" + end + + existing = sequence_match || client_chunk_match + return unless existing + + if existing.sequence != sequence + raise ConflictError, "client_chunk_id already exists with a different sequence" + end + + if client_chunk_id.present? && existing.client_chunk_id.present? && existing.client_chunk_id != client_chunk_id + raise ConflictError, "sequence already exists with a different client_chunk_id" + end + + raise ConflictError, "chunk already exists with different content" unless existing.checksum == checksum + + existing + end + + def prepare_and_finalize_existing_chunk!(chunk, checksum:, content:, filename:, content_type:) + needs_finalization = with_lock do + prepare_existing_chunk_for_retry!( + chunk.reload, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) + end + + finalize_chunk_for_retry!(chunk, checksum) if needs_finalization + chunk + end + + def prepare_existing_chunk_for_retry!(chunk, checksum:, content:, filename:, content_type:) + return false if chunk_ready_for_retry?(chunk, checksum) + return true if chunk.ndjson_file.attached? && chunk_content_checksum(chunk) == checksum + + chunk.ndjson_file.attach( + io: StringIO.new(content), + filename: filename, + content_type: content_type + ) + true + end + + def finalize_chunk_for_retry!(chunk, checksum) + chunk.sync_ndjson_rows_count! + chunk.reload + return chunk if chunk_ready_for_retry?(chunk, checksum) + + raise ConflictError, "chunk already exists but is incomplete" + rescue ActiveStorage::FileNotFoundError + raise ConflictError, "chunk already exists but is incomplete" + end + + def chunk_ready_for_retry?(chunk, checksum) + chunk.ndjson_file.attached? && + chunk.rows_count.to_i.positive? && + chunk_content_checksum(chunk) == checksum + end + + def chunk_content_checksum(chunk) + Digest::SHA256.hexdigest(chunk.ndjson_file.download) + rescue ActiveStorage::FileNotFoundError + nil + end + + def process_chunk!(import) + return if import.complete? + + import.update!(status: :importing, error: nil, error_details: {}) + result = import.import!(import_session: self) + import.update!(status: :complete, summary: result.fetch(:summary, {}), error_details: {}) + rescue => error + import.update!( + status: :failed, + error: error.message, + error_details: error_details_for(error), + summary: failed_summary_for(error) + ) + raise + end + + def row_count_exceeded? + imports.sum(:rows_count) > SureImport.max_row_count + end + + def validate_expected_chunk_sequences! + return if expected_chunks.blank? + + expected_sequences = (1..expected_chunks).to_a + actual_sequences = imports.pluck(:sequence).sort + return if actual_sequences == expected_sequences + + missing_sequences = expected_sequences - actual_sequences + unexpected_sequences = actual_sequences - expected_sequences + details = [] + details << "missing sequences: #{missing_sequences.join(', ')}" if missing_sequences.any? + details << "unexpected sequences: #{unexpected_sequences.join(', ')}" if unexpected_sequences.any? + + raise ConflictError, "import session chunks do not match expected sequences (#{details.join('; ')})" + end + + def error_details_for(error) + details = { + "code" => error.respond_to?(:code) ? error.code : "import_failed", + "message" => error.message + } + + if error.respond_to?(:details) + details.merge!(error.details.stringify_keys) + end + + details + end + + def enqueue_error_details + { + "code" => "import_enqueue_failed", + "message" => "Import session could not be queued." + } + end + + def sync_enqueue_error_details + { + "code" => "family_sync_enqueue_failed", + "message" => "Family sync could not be queued after import completion." + } + end + + def merge_summary!(totals, summary) + summary.each do |entity_type, counts| + next unless counts.respond_to?(:each) + + totals[entity_type] ||= {} + counts.each do |status, count| + totals[entity_type][status] = totals[entity_type].fetch(status, 0) + count.to_i + end + end + end + + def failed_summary_for(error) + record_type = error_details_for(error)["record_type"] + return {} if record_type.blank? + + { + record_type.to_s.underscore.pluralize => { + "created" => 0, + "updated" => 0, + "skipped" => 0, + "failed" => 1 + } + } + end +end diff --git a/app/models/import_source_mapping.rb b/app/models/import_source_mapping.rb new file mode 100644 index 000000000..d9bdc673d --- /dev/null +++ b/app/models/import_source_mapping.rb @@ -0,0 +1,10 @@ +class ImportSourceMapping < ApplicationRecord + belongs_to :family + belongs_to :import_session + belongs_to :target, polymorphic: true + + validates :source_type, :source_id, :target_type, :target_id, presence: true + validates :source_type, length: { maximum: 64 } + validates :source_id, length: { maximum: 255 } + validates :source_id, uniqueness: { scope: [ :import_session_id, :source_type ] } +end diff --git a/app/models/sure_import.rb b/app/models/sure_import.rb index 23815437a..9d0ca81c5 100644 --- a/app/models/sure_import.rb +++ b/app/models/sure_import.rb @@ -104,12 +104,25 @@ def dry_run self.class.dry_run_totals_from_ndjson(ndjson_blob_string) end - def import! - importer = Family::DataImporter.new(family, ndjson_blob_string) + def import!(import_session: nil) + importer = Family::DataImporter.new(family, ndjson_blob_string, import_session: import_session, import: self) result = importer.import! - result[:accounts].each { |account| accounts << account } - result[:entries].each { |entry| entries << entry } + Import.transaction do + result[:accounts].each { |account| account.save! if account.new_record? } + result[:entries].each { |entry| entry.save! if entry.new_record? } + + account_ids = result[:accounts].filter_map(&:id) + entry_ids = result[:entries].filter_map(&:id) + existing_account_ids = accounts.where(id: account_ids).pluck(:id) + existing_entry_ids = entries.where(id: entry_ids).pluck(:id) + + accounts.concat(result[:accounts].reject { |account| existing_account_ids.include?(account.id) }) + entries.concat(result[:entries].reject { |entry| existing_entry_ids.include?(entry.id) }) + update!(summary: result[:summary]) if has_attribute?(:summary) + end + + result end def uploaded? diff --git a/app/views/api/v1/import_sessions/show.json.jbuilder b/app/views/api/v1/import_sessions/show.json.jbuilder new file mode 100644 index 000000000..b54ca33b6 --- /dev/null +++ b/app/views/api/v1/import_sessions/show.json.jbuilder @@ -0,0 +1,28 @@ +chunks = @import_session.imports.ordered_by_sequence.to_a + +json.data do + json.id @import_session.id + json.type @import_session.import_type + json.status @import_session.status + json.client_session_id @import_session.client_session_id + json.expected_chunks @import_session.expected_chunks + json.chunks_count chunks.size + json.summary @import_session.summary || {} + json.error @import_session.error_details.presence + json.created_at @import_session.created_at + json.updated_at @import_session.updated_at + + json.chunks do + json.array! chunks do |import| + json.id import.id + json.sequence import.sequence + json.client_chunk_id import.client_chunk_id + json.status import.status + json.rows_count import.rows_count + json.summary import.summary || {} + json.error import.error_details.presence + json.created_at import.created_at + json.updated_at import.updated_at + end + end +end diff --git a/config/routes.rb b/config/routes.rb index 5f4aa337a..53e574c8a 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -514,6 +514,10 @@ post :preflight, on: :collection get :rows, on: :member end + resources :import_sessions, only: [ :show, :create ] do + post :chunks, on: :member, action: :create_chunk + post :publish, on: :member + end resource :usage, only: [ :show ], controller: :usage resource :balance_sheet, only: [ :show ], controller: :balance_sheet resource :family_settings, only: [ :show ], controller: :family_settings diff --git a/db/migrate/20260513013000_create_import_sessions.rb b/db/migrate/20260513013000_create_import_sessions.rb new file mode 100644 index 000000000..037aa33e3 --- /dev/null +++ b/db/migrate/20260513013000_create_import_sessions.rb @@ -0,0 +1,72 @@ +class CreateImportSessions < ActiveRecord::Migration[7.2] + def change + create_table :import_sessions, id: :uuid do |t| + t.references :family, null: false, foreign_key: true, type: :uuid + t.string :import_type, null: false, default: "SureImport" + t.string :status, null: false, default: "pending" + t.string :client_session_id, limit: 255 + t.integer :expected_chunks + t.jsonb :summary, null: false, default: {} + t.jsonb :error_details, null: false, default: {} + + t.timestamps + + t.index [ :family_id, :client_session_id ], + unique: true, + where: "client_session_id IS NOT NULL", + name: "idx_import_sessions_on_family_client_session" + t.index [ :family_id, :status ] + t.index [ :id, :family_id ], + unique: true, + name: "idx_import_sessions_on_id_family" + end + + create_table :import_source_mappings, id: :uuid do |t| + t.references :family, null: false, foreign_key: true, type: :uuid + t.references :import_session, null: false, type: :uuid + t.string :source_type, null: false, limit: 64 + t.string :source_id, null: false, limit: 255 + t.references :target, + polymorphic: true, + null: false, + type: :uuid, + index: { name: "idx_import_source_mappings_on_target" } + + t.timestamps + + t.index [ :import_session_id, :source_type, :source_id ], + unique: true, + name: "index_import_source_mappings_on_session_type_and_source" + t.index [ :family_id, :source_type, :source_id ], + name: "idx_import_source_mappings_on_family_source" + end + + add_foreign_key :import_source_mappings, + :import_sessions, + column: [ :import_session_id, :family_id ], + primary_key: [ :id, :family_id ], + on_delete: :cascade, + name: "fk_import_source_mappings_session_family" + + add_reference :imports, + :import_session, + type: :uuid, + foreign_key: { on_delete: :cascade } + add_column :imports, :sequence, :integer + add_column :imports, :client_chunk_id, :string, limit: 255 + add_column :imports, :checksum, :string, limit: 64 + add_column :imports, :summary, :jsonb, null: false, default: {} + add_column :imports, :error_details, :jsonb, null: false, default: {} + + add_index :imports, + [ :import_session_id, :sequence ], + unique: true, + where: "import_session_id IS NOT NULL AND sequence IS NOT NULL", + name: "idx_imports_on_session_sequence" + add_index :imports, + [ :import_session_id, :client_chunk_id ], + unique: true, + where: "import_session_id IS NOT NULL AND client_chunk_id IS NOT NULL", + name: "idx_imports_on_session_client_chunk" + end +end diff --git a/db/schema.rb b/db/schema.rb index 943e4dfe3..20f43a597 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.2].define(version: 2026_05_12_211200) do +ActiveRecord::Schema[7.2].define(version: 2026_05_13_013000) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -858,6 +858,38 @@ t.check_constraint "source_row_number > 0", name: "chk_import_rows_source_row_number_positive" end + create_table "import_sessions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.uuid "family_id", null: false + t.string "import_type", default: "SureImport", null: false + t.string "status", default: "pending", null: false + t.string "client_session_id", limit: 255 + t.integer "expected_chunks" + t.jsonb "summary", default: {}, null: false + t.jsonb "error_details", default: {}, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["family_id", "client_session_id"], name: "idx_import_sessions_on_family_client_session", unique: true, where: "(client_session_id IS NOT NULL)" + t.index ["family_id", "status"], name: "index_import_sessions_on_family_id_and_status" + t.index ["family_id"], name: "index_import_sessions_on_family_id" + t.index ["id", "family_id"], name: "idx_import_sessions_on_id_family", unique: true + end + + create_table "import_source_mappings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.uuid "family_id", null: false + t.uuid "import_session_id", null: false + t.string "source_type", limit: 64, null: false + t.string "source_id", limit: 255, null: false + t.string "target_type", null: false + t.uuid "target_id", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["family_id", "source_type", "source_id"], name: "idx_import_source_mappings_on_family_source" + t.index ["family_id"], name: "index_import_source_mappings_on_family_id" + t.index ["import_session_id", "source_type", "source_id"], name: "index_import_source_mappings_on_session_type_and_source", unique: true + t.index ["import_session_id"], name: "index_import_source_mappings_on_import_session_id" + t.index ["target_type", "target_id"], name: "idx_import_source_mappings_on_target" + end + create_table "imports", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.jsonb "column_mappings" t.string "status" @@ -894,7 +926,16 @@ t.text "ai_summary" t.string "document_type" t.jsonb "extracted_data" + t.uuid "import_session_id" + t.integer "sequence" + t.string "client_chunk_id", limit: 255 + t.string "checksum", limit: 64 + t.jsonb "summary", default: {}, null: false + t.jsonb "error_details", default: {}, null: false t.index ["family_id"], name: "index_imports_on_family_id" + t.index ["import_session_id", "client_chunk_id"], name: "idx_imports_on_session_client_chunk", unique: true, where: "((import_session_id IS NOT NULL) AND (client_chunk_id IS NOT NULL))" + t.index ["import_session_id", "sequence"], name: "idx_imports_on_session_sequence", unique: true, where: "((import_session_id IS NOT NULL) AND (sequence IS NOT NULL))" + t.index ["import_session_id"], name: "index_imports_on_import_session_id" end create_table "indexa_capital_accounts", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| @@ -1897,7 +1938,11 @@ add_foreign_key "impersonation_sessions", "users", column: "impersonated_id" add_foreign_key "impersonation_sessions", "users", column: "impersonator_id" add_foreign_key "import_rows", "imports" + add_foreign_key "import_sessions", "families" + add_foreign_key "import_source_mappings", "families" + add_foreign_key "import_source_mappings", "import_sessions", column: ["import_session_id", "family_id"], primary_key: ["id", "family_id"], name: "fk_import_source_mappings_session_family", on_delete: :cascade add_foreign_key "imports", "families" + add_foreign_key "imports", "import_sessions", on_delete: :cascade add_foreign_key "indexa_capital_accounts", "indexa_capital_items" add_foreign_key "indexa_capital_items", "families" add_foreign_key "invitations", "families" diff --git a/docs/api/openapi.yaml b/docs/api/openapi.yaml index ebc4c4425..7fa393dd9 100644 --- a/docs/api/openapi.yaml +++ b/docs/api/openapi.yaml @@ -2015,6 +2015,115 @@ components: properties: data: "$ref": "#/components/schemas/ImportDetail" + ImportSessionChunk: + type: object + required: + - id + - sequence + - status + - rows_count + - summary + - created_at + - updated_at + properties: + id: + type: string + format: uuid + sequence: + type: integer + minimum: 1 + client_chunk_id: + type: string + nullable: true + status: + type: string + enum: + - pending + - importing + - complete + - failed + rows_count: + type: integer + minimum: 0 + summary: + type: object + additionalProperties: + type: object + additionalProperties: + type: integer + error: + type: object + nullable: true + additionalProperties: true + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + ImportSession: + type: object + required: + - id + - type + - status + - chunks_count + - summary + - chunks + - created_at + - updated_at + properties: + id: + type: string + format: uuid + type: + type: string + enum: + - SureImport + status: + type: string + enum: + - pending + - importing + - complete + - failed + client_session_id: + type: string + nullable: true + expected_chunks: + type: integer + nullable: true + minimum: 1 + chunks_count: + type: integer + minimum: 0 + summary: + type: object + additionalProperties: + type: object + additionalProperties: + type: integer + error: + type: object + nullable: true + additionalProperties: true + chunks: + type: array + items: + "$ref": "#/components/schemas/ImportSessionChunk" + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + ImportSessionResponse: + type: object + required: + - data + properties: + data: + "$ref": "#/components/schemas/ImportSession" ProviderConnectionInstitution: type: object required: @@ -4437,6 +4546,261 @@ paths: application/json: schema: "$ref": "#/components/schemas/ErrorResponse" + "/api/v1/import_sessions": + post: + summary: Create import session + description: Create or idempotently retrieve a multi-file SureImport session + keyed by client_session_id. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + parameters: [] + responses: + '201': + description: import session created + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '409': + description: client session conflict + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '422': + description: validation error + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + requestBody: + content: + application/json: + schema: + type: object + properties: + type: + type: string + enum: + - SureImport + description: Import session type. Only SureImport is supported. + client_session_id: + type: string + nullable: true + description: Client-provided idempotency key for the full import + session. + expected_chunks: + type: integer + minimum: 1 + nullable: true + description: Expected number of ordered chunks before publish is + allowed. + "/api/v1/import_sessions/{id}": + parameters: + - name: id + in: path + required: true + description: Import session ID + schema: + type: string + get: + summary: Retrieve import session + description: Retrieve import session status, chunk status, per-entity summary + counts, and safe error details. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + responses: + '200': + description: import session retrieved + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '404': + description: import session not found + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + "/api/v1/import_sessions/{id}/chunks": + parameters: + - name: id + in: path + required: true + description: Import session ID + schema: + type: string + post: + summary: Upload import session chunk + description: Attach an ordered Sure NDJSON chunk to an import session. Chunks + are idempotent by sequence and client_chunk_id with content verification. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + parameters: [] + responses: + '201': + description: chunk uploaded + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '409': + description: chunk conflict + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '404': + description: import session not found + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '422': + description: missing or invalid content + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + requestBody: + content: + application/json: + schema: + type: object + properties: + sequence: + type: integer + minimum: 1 + description: One-based chunk sequence. Earlier dependency chunks + must have lower sequence numbers. + client_chunk_id: + type: string + nullable: true + description: Client-provided idempotency key for this chunk. + raw_file_content: + type: string + description: Raw Sure NDJSON content. Each chunk is limited to 10MB. + file: + type: string + format: binary + description: Multipart Sure NDJSON file upload. Each chunk is limited + to 10MB. + multipart/form-data: + schema: + type: object + properties: + sequence: + type: integer + minimum: 1 + description: One-based chunk sequence. Earlier dependency chunks + must have lower sequence numbers. + client_chunk_id: + type: string + nullable: true + description: Client-provided idempotency key for this chunk. + raw_file_content: + type: string + description: Raw Sure NDJSON content. Each chunk is limited to 10MB. + file: + type: string + format: binary + description: Multipart Sure NDJSON file upload. Each chunk is limited + to 10MB. + "/api/v1/import_sessions/{id}/publish": + parameters: + - name: id + in: path + required: true + description: Import session ID + schema: + type: string + post: + summary: Publish import session + description: Queue ordered chunk processing for a SureImport session. Later + chunks can reference source IDs mapped by earlier chunks. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + responses: + '202': + description: import session publish queued + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '422': + description: max_row_count_exceeded + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '409': + description: missing expected chunks + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '404': + description: import session not found + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" "/api/v1/imports": get: summary: List imports diff --git a/spec/requests/api/v1/import_sessions_spec.rb b/spec/requests/api/v1/import_sessions_spec.rb new file mode 100644 index 000000000..b8308a32e --- /dev/null +++ b/spec/requests/api/v1/import_sessions_spec.rb @@ -0,0 +1,381 @@ +# frozen_string_literal: true + +require 'swagger_helper' + +RSpec.describe 'API V1 Import Sessions', type: :request do + let(:user) { users(:empty) } + let(:family) { user.family } + + let(:api_key) { api_keys(:active_key) } + let(:api_key_without_write_scope) { api_keys(:one) } + let(:api_key_without_read_scope) { api_keys(:expired_key) } + + let(:'X-Api-Key') { api_key.plain_key } + + let(:entity_ndjson) do + { + type: 'Account', + data: { + id: 'docs-account-1', + name: 'Docs Checking', + balance: '100.00', + currency: 'USD', + accountable_type: 'Depository' + } + }.to_json + end + + let(:transaction_ndjson) do + { + type: 'Transaction', + data: { + id: 'docs-transaction-1', + account_id: 'docs-account-1', + date: '2024-01-15', + amount: '-12.34', + currency: 'USD', + name: 'Docs Transaction' + } + }.to_json + end + + path '/api/v1/import_sessions' do + post 'Create import session' do + description 'Create or idempotently retrieve a multi-file SureImport session keyed by client_session_id.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + consumes 'application/json' + produces 'application/json' + + parameter name: :body, in: :body, required: false, schema: { + type: :object, + properties: { + type: { + type: :string, + enum: %w[SureImport], + description: 'Import session type. Only SureImport is supported.' + }, + client_session_id: { + type: :string, + nullable: true, + description: 'Client-provided idempotency key for the full import session.' + }, + expected_chunks: { + type: :integer, + minimum: 1, + nullable: true, + description: 'Expected number of ordered chunks before publish is allowed.' + } + } + } + + response '201', 'import session created' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + let(:body) do + { + type: 'SureImport', + client_session_id: 'docs-session-1', + expected_chunks: 2 + } + end + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + let(:body) { { type: 'SureImport' } } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_write_scope.plain_key } + let(:body) { { type: 'SureImport' } } + + run_test! + end + + response '409', 'client session conflict' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + family.import_sessions.create!( + client_session_id: 'docs-session-conflict', + expected_chunks: 1 + ) + end + + let(:body) do + { + type: 'SureImport', + client_session_id: 'docs-session-conflict', + expected_chunks: 2 + } + end + + run_test! + end + + response '422', 'validation error' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:body) { { type: 'TransactionImport' } } + + run_test! + end + end + end + + path '/api/v1/import_sessions/{id}' do + parameter name: :id, in: :path, type: :string, required: true, description: 'Import session ID' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 1) } + + get 'Retrieve import session' do + description 'Retrieve import session status, chunk status, per-entity summary counts, and safe error details.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + produces 'application/json' + + let(:id) { import_session.id } + + response '200', 'import session retrieved' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_read_scope.plain_key } + + run_test! + end + + response '404', 'import session not found' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:id) { SecureRandom.uuid } + + run_test! + end + end + end + + path '/api/v1/import_sessions/{id}/chunks' do + parameter name: :id, in: :path, type: :string, required: true, description: 'Import session ID' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 2) } + let(:id) { import_session.id } + + post 'Upload import session chunk' do + description 'Attach an ordered Sure NDJSON chunk to an import session. Chunks are idempotent by sequence and client_chunk_id with content verification.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + consumes 'application/json', 'multipart/form-data' + produces 'application/json' + + parameter name: :body, in: :body, required: false, schema: { + type: :object, + properties: { + sequence: { + type: :integer, + minimum: 1, + description: 'One-based chunk sequence. Earlier dependency chunks must have lower sequence numbers.' + }, + client_chunk_id: { + type: :string, + nullable: true, + description: 'Client-provided idempotency key for this chunk.' + }, + raw_file_content: { + type: :string, + description: 'Raw Sure NDJSON content. Each chunk is limited to 10MB.' + }, + file: { + type: :string, + format: :binary, + description: 'Multipart Sure NDJSON file upload. Each chunk is limited to 10MB.' + } + } + } + + response '201', 'chunk uploaded' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + let(:body) do + { + sequence: 1, + client_chunk_id: 'docs-entities', + raw_file_content: entity_ndjson + } + end + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + let(:body) { { sequence: 1, raw_file_content: entity_ndjson } } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_write_scope.plain_key } + let(:body) { { sequence: 1, raw_file_content: entity_ndjson } } + + run_test! + end + + response '409', 'chunk conflict' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + let(:body) do + { + sequence: 1, + client_chunk_id: 'docs-entities', + raw_file_content: transaction_ndjson + } + end + + run_test! + end + + response '404', 'import session not found' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:id) { SecureRandom.uuid } + let(:body) { { sequence: 1, raw_file_content: entity_ndjson } } + + run_test! + end + + response '422', 'missing or invalid content' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:body) { { sequence: 1 } } + + run_test! + end + end + end + + path '/api/v1/import_sessions/{id}/publish' do + parameter name: :id, in: :path, type: :string, required: true, description: 'Import session ID' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 1) } + let(:id) { import_session.id } + + post 'Publish import session' do + description 'Queue ordered chunk processing for a SureImport session. Later chunks can reference source IDs mapped by earlier chunks.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + produces 'application/json' + + response '202', 'import session publish queued' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_write_scope.plain_key } + + run_test! + end + + response '422', 'max_row_count_exceeded' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + import_session.imports.update_all(rows_count: SureImport.max_row_count + 1) + end + + run_test! + end + + response '409', 'missing expected chunks' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 2) } + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + run_test! + end + + response '404', 'import session not found' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:id) { SecureRandom.uuid } + + run_test! + end + end + end +end diff --git a/spec/swagger_helper.rb b/spec/swagger_helper.rb index d27ad818f..e7ca61e79 100644 --- a/spec/swagger_helper.rb +++ b/spec/swagger_helper.rb @@ -1118,6 +1118,68 @@ data: { '$ref' => '#/components/schemas/ImportDetail' } } }, + ImportSessionChunk: { + type: :object, + required: %w[id sequence status rows_count summary created_at updated_at], + properties: { + id: { type: :string, format: :uuid }, + sequence: { type: :integer, minimum: 1 }, + client_chunk_id: { type: :string, nullable: true }, + status: { type: :string, enum: %w[pending importing complete failed] }, + rows_count: { type: :integer, minimum: 0 }, + summary: { + type: :object, + additionalProperties: { + type: :object, + additionalProperties: { type: :integer } + } + }, + error: { + type: :object, + nullable: true, + additionalProperties: true + }, + created_at: { type: :string, format: :'date-time' }, + updated_at: { type: :string, format: :'date-time' } + } + }, + ImportSession: { + type: :object, + required: %w[id type status chunks_count summary chunks created_at updated_at], + properties: { + id: { type: :string, format: :uuid }, + type: { type: :string, enum: %w[SureImport] }, + status: { type: :string, enum: %w[pending importing complete failed] }, + client_session_id: { type: :string, nullable: true }, + expected_chunks: { type: :integer, nullable: true, minimum: 1 }, + chunks_count: { type: :integer, minimum: 0 }, + summary: { + type: :object, + additionalProperties: { + type: :object, + additionalProperties: { type: :integer } + } + }, + error: { + type: :object, + nullable: true, + additionalProperties: true + }, + chunks: { + type: :array, + items: { '$ref' => '#/components/schemas/ImportSessionChunk' } + }, + created_at: { type: :string, format: :'date-time' }, + updated_at: { type: :string, format: :'date-time' } + } + }, + ImportSessionResponse: { + type: :object, + required: %w[data], + properties: { + data: { '$ref' => '#/components/schemas/ImportSession' } + } + }, ProviderConnectionInstitution: { type: :object, required: %w[name], diff --git a/test/controllers/api/v1/import_sessions_controller_test.rb b/test/controllers/api/v1/import_sessions_controller_test.rb new file mode 100644 index 000000000..c3bfe9e16 --- /dev/null +++ b/test/controllers/api/v1/import_sessions_controller_test.rb @@ -0,0 +1,260 @@ +# frozen_string_literal: true + +require "test_helper" + +class Api::V1::ImportSessionsControllerTest < ActionDispatch::IntegrationTest + include ActiveJob::TestHelper + + setup do + @user = users(:family_admin) + @family = @user.family + @api_key = api_keys(:active_key) + @read_only_api_key = api_keys(:one) + + Redis.new.del("api_rate_limit:#{@api_key.id}") + Redis.new.del("api_rate_limit:#{@read_only_api_key.id}") + end + + test "creates an idempotent Sure import session" do + assert_difference("ImportSession.count", 1) do + post api_v1_import_sessions_url, + params: { + type: "SureImport", + client_session_id: "client-session-1", + expected_chunks: 2 + }, + headers: api_headers(@api_key) + end + + assert_response :created + first_id = JSON.parse(response.body).dig("data", "id") + + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { + type: "SureImport", + client_session_id: "client-session-1", + expected_chunks: 2 + }, + headers: api_headers(@api_key) + end + + assert_response :created + assert_equal first_id, JSON.parse(response.body).dig("data", "id") + end + + test "rejects unsupported import session types" do + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { type: "TransactionImport" }, + headers: api_headers(@api_key) + end + + assert_response :unprocessable_entity + assert_equal "validation_failed", JSON.parse(response.body)["error"] + end + + test "requires authentication for session creation" do + post api_v1_import_sessions_url, params: { type: "SureImport" } + + assert_response :unauthorized + assert_equal "unauthorized", JSON.parse(response.body)["error"] + end + + test "uploads ordered chunks and publishes a full-fidelity transaction import" do + session = build_import_session + + post chunks_api_v1_import_session_url(session), + params: { + sequence: 1, + client_chunk_id: "entities", + raw_file_content: build_ndjson(entity_records) + }, + headers: api_headers(@api_key) + + assert_response :created + assert_equal 1, JSON.parse(response.body).dig("data", "chunks_count") + + post chunks_api_v1_import_session_url(session), + params: { + sequence: 2, + client_chunk_id: "transactions", + raw_file_content: build_ndjson(transaction_records) + }, + headers: api_headers(@api_key) + + assert_response :created + + perform_enqueued_jobs do + post publish_api_v1_import_session_url(session), headers: api_headers(@api_key) + end + + assert_response :accepted + session.reload + assert session.complete? + assert_equal 1, session.summary.dig("transactions", "created") + + entry = @family.accounts.find_by!(name: "API Session Checking").entries.find_by!(name: "API Grocery Run") + transaction = entry.entryable + assert_equal "API Groceries", transaction.category.name + assert_equal "API Market", transaction.merchant.name + assert_equal [ "API Weekly" ], transaction.tags.map(&:name) + end + + test "rejects replayed chunk with different content" do + session = build_import_session + params = { + sequence: 1, + client_chunk_id: "entities", + raw_file_content: build_ndjson(entity_records) + } + + post chunks_api_v1_import_session_url(session), params: params, headers: api_headers(@api_key) + assert_response :created + + post chunks_api_v1_import_session_url(session), + params: params.merge(raw_file_content: build_ndjson(transaction_records)), + headers: api_headers(@api_key) + + assert_response :conflict + assert_equal "import_session_conflict", JSON.parse(response.body)["error"] + end + + test "requires chunk sequence" do + session = build_import_session + + post chunks_api_v1_import_session_url(session), + params: { raw_file_content: build_ndjson(entity_records) }, + headers: api_headers(@api_key) + + assert_response :bad_request + assert_equal "bad_request", JSON.parse(response.body)["error"] + end + + test "shows import session with read scope" do + session = build_import_session + + get api_v1_import_session_url(session), headers: api_headers(@read_only_api_key) + + assert_response :success + data = JSON.parse(response.body)["data"] + assert_equal session.id, data["id"] + assert_equal "SureImport", data["type"] + end + + test "shows chunks in sequence order" do + session = build_import_session + session.imports.create!( + family: @family, + type: "SureImport", + sequence: 2, + checksum: Digest::SHA256.hexdigest("two") + ) + session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + checksum: Digest::SHA256.hexdigest("one") + ) + + get api_v1_import_session_url(session), headers: api_headers(@api_key) + + assert_response :success + assert_equal [ 1, 2 ], JSON.parse(response.body).dig("data", "chunks").map { |chunk| chunk["sequence"] } + end + + test "requires write scope for session mutation" do + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { type: "SureImport" }, + headers: api_headers(@read_only_api_key) + end + + assert_response :forbidden + assert_equal "insufficient_scope", JSON.parse(response.body)["error"] + end + + test "rejects publishing a session with no chunks" do + session = @family.import_sessions.create! + + post publish_api_v1_import_session_url(session), headers: api_headers(@api_key) + + assert_response :conflict + assert_equal "import_session_conflict", JSON.parse(response.body)["error"] + end + + test "does not expose another family's import session" do + other_family = Family.create!(name: "Other Family", currency: "USD", locale: "en") + other_session = other_family.import_sessions.create! + + get api_v1_import_session_url(other_session), headers: api_headers(@api_key) + + assert_response :not_found + end + + private + def build_import_session + @family.import_sessions.create!(expected_chunks: 2) + end + + def entity_records + [ + { + type: "Account", + data: { + id: "api-acct-1", + name: "API Session Checking", + balance: "100.00", + currency: "USD", + accountable_type: "Depository" + } + }, + { + type: "Category", + data: { + id: "api-cat-1", + name: "API Groceries", + color: "#407706", + classification: "expense" + } + }, + { + type: "Merchant", + data: { + id: "api-merchant-1", + name: "API Market" + } + }, + { + type: "Tag", + data: { + id: "api-tag-1", + name: "API Weekly" + } + } + ] + end + + def transaction_records + [ + { + type: "Transaction", + data: { + id: "api-txn-1", + account_id: "api-acct-1", + category_id: "api-cat-1", + merchant_id: "api-merchant-1", + tag_ids: [ "api-tag-1" ], + date: "2024-01-15", + amount: "-12.34", + currency: "USD", + name: "API Grocery Run" + } + } + ] + end + + def build_ndjson(records) + records.map(&:to_json).join("\n") + end +end diff --git a/test/jobs/import_session_job_test.rb b/test/jobs/import_session_job_test.rb new file mode 100644 index 000000000..9b9849496 --- /dev/null +++ b/test/jobs/import_session_job_test.rb @@ -0,0 +1,19 @@ +require "test_helper" + +class ImportSessionJobTest < ActiveJob::TestCase + test "raises when import session is missing" do + error = assert_raises(ArgumentError) do + ImportSessionJob.perform_now(nil) + end + + assert_equal "ImportSessionJob requires an import_session", error.message + end + + test "publishes import session" do + import_session = families(:empty).import_sessions.create! + + import_session.expects(:publish).once + + ImportSessionJob.perform_now(import_session) + end +end diff --git a/test/models/family/data_importer_test.rb b/test/models/family/data_importer_test.rb index c6c75dda1..16d4068a0 100644 --- a/test/models/family/data_importer_test.rb +++ b/test/models/family/data_importer_test.rb @@ -125,6 +125,26 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_equal 1, balance.flows_factor end + test "counts skipped balance rows with blank account references once" do + ndjson = build_ndjson([ + { + type: "Balance", + data: { + id: "balance-1", + account_id: "", + date: "2024-01-31", + balance: "1200.00", + currency: "USD" + } + } + ]) + + result = Family::DataImporter.new(@family, ndjson).import! + + assert_equal 1, result.dig(:summary, "balances", "skipped") + assert_not Balance.exists?(date: Date.iso8601("2024-01-31"), currency: "USD", balance: BigDecimal("1200.00")) + end + test "imports duplicate raw balance records idempotently by account date and currency" do balance_record = { type: "Balance", @@ -406,6 +426,23 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_equal "#FF0000", tag.color end + test "imports tags with deterministic fallback color when source omits color" do + ndjson = build_ndjson([ + { + type: "Tag", + data: { + id: "tag-1", + name: "Important" + } + } + ]) + + Family::DataImporter.new(@family, ndjson).import! + + tag = @family.tags.find_by!(name: "Important") + assert_equal Tag::COLORS.first, tag.color + end + test "imports merchants" do ndjson = build_ndjson([ { diff --git a/test/models/import_session_test.rb b/test/models/import_session_test.rb new file mode 100644 index 000000000..396557554 --- /dev/null +++ b/test/models/import_session_test.rb @@ -0,0 +1,509 @@ +require "test_helper" + +class ImportSessionTest < ActiveSupport::TestCase + setup do + @family = families(:empty) + end + + test "publishes ordered chunks with source mappings across files" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + client_chunk_id: "transactions", + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.complete? + account = @family.accounts.find_by!(name: "Session Checking") + entry = account.entries.find_by!(name: "Grocery Run") + transaction = entry.entryable + + assert_equal "Groceries", transaction.category.name + assert_equal "Market", transaction.merchant.name + assert_equal [ "Weekly" ], transaction.tags.map(&:name) + assert_equal "sure_import_session:#{session.id}", entry.source + assert_equal "Transaction:txn-1", entry.external_id + assert_equal 1, session.summary.dig("transactions", "created") + assert_equal 1, session.source_mappings.where(source_type: "Account", source_id: "acct-1").count + end + + test "publishing the same complete session does not duplicate imported transactions" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert_no_difference("Entry.count") do + session.publish + end + end + + test "republishing failed session skips complete chunks and retries failed chunks" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + complete_chunk = session.imports.find_by!(sequence: 1) + failed_chunk = session.imports.find_by!(sequence: 2) + complete_chunk.update!(status: :complete, summary: { "accounts" => { "created" => 1 } }, error_details: {}) + failed_chunk.update!(status: :failed, error: "transient failure", error_details: { "code" => "import_failed" }) + session.update!( + status: :failed, + summary: complete_chunk.summary, + error_details: { "code" => "import_failed", "message" => "transient failure" } + ) + processed_sequences = [] + + importer_factory = lambda do |_family, _content, import_session:, import:| + processed_sequences << import.sequence + flunk "completed chunk was reprocessed" if import.sequence == 1 + assert_equal session, import_session + + Object.new.tap do |importer| + importer.define_singleton_method(:import!) do + { + accounts: [], + entries: [], + summary: { "transactions" => { "created" => 1 } } + } + end + end + end + + Family::DataImporter.stub(:new, importer_factory) do + session.publish + end + + assert_equal [ 2 ], processed_sequences + assert complete_chunk.reload.complete? + assert failed_chunk.reload.complete? + assert session.reload.complete? + assert_equal 1, session.summary.dig("accounts", "created") + assert_equal 1, session.summary.dig("transactions", "created") + end + + test "publish keeps session complete and records safe error when family sync enqueue fails" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + Family.any_instance.stubs(:sync_later).raises(StandardError, "redis://secret.local/0") + session.publish + + assert session.reload.complete? + assert_equal "family_sync_enqueue_failed", session.error_details["code"] + assert_equal "Family sync could not be queued after import completion.", session.error_details["message"] + assert_no_match(/secret/, session.error_details.to_json) + end + + test "publish later requires the exact expected chunk sequences" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 3, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + error = assert_raises(ImportSession::ConflictError) do + session.publish_later + end + + expected_message = "import session chunks do not match expected sequences " \ + "(missing sequences: 2; unexpected sequences: 3)" + assert_equal expected_message, error.message + assert session.reload.pending? + end + + test "publish later restores status and records enqueue failures" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + ImportSessionJob.stub(:perform_later, ->(_import_session) { raise StandardError, "queue offline" }) do + error = assert_raises(StandardError) do + session.publish_later + end + + assert_equal "queue offline", error.message + end + + assert session.reload.pending? + assert_equal "import_enqueue_failed", session.error_details["code"] + assert_equal "Import session could not be queued.", session.error_details["message"] + end + + test "fails loudly when a later chunk references a missing source id" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.failed? + chunk = session.imports.first + assert chunk.failed? + assert_equal "missing_source_reference", chunk.error_details["code"] + assert_equal "acct-1", chunk.error_details["source_id"] + assert_equal 0, @family.entries.count + end + + test "chunk upload is idempotent by sequence and checksum" do + session = @family.import_sessions.create! + content = build_ndjson(entity_records) + + first = session.attach_chunk!( + sequence: 1, + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + second = session.attach_chunk!( + sequence: 1, + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + assert_equal first.id, second.id + assert_raises(ImportSession::ConflictError) do + session.attach_chunk!( + sequence: 1, + content: build_ndjson(transaction_records), + filename: "different.ndjson", + content_type: "application/x-ndjson" + ) + end + end + + test "chunk upload repairs incomplete existing chunk before accepting retry" do + session = @family.import_sessions.create! + content = build_ndjson(transaction_records) + chunk = session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + client_chunk_id: "entities", + checksum: Digest::SHA256.hexdigest(content) + ) + + result = session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + assert_equal chunk.id, result.id + assert result.reload.ndjson_file.attached? + assert_equal 1, result.rows_count + end + + test "chunk upload resyncs attached existing chunk before accepting retry" do + session = @family.import_sessions.create! + content = build_ndjson(transaction_records) + chunk = session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + client_chunk_id: "entities", + checksum: Digest::SHA256.hexdigest(content) + ) + chunk.ndjson_file.attach( + io: StringIO.new(content), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + result = session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + assert_equal chunk.id, result.id + assert_equal 1, result.rows_count + end + + test "chunk upload rejects inconsistent sequence and client chunk keys" do + session = @family.import_sessions.create! + session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + client_chunk_id: "transactions", + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + error = assert_raises(ImportSession::ConflictError) do + session.attach_chunk!( + sequence: 1, + client_chunk_id: "transactions", + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + end + + assert_equal "sequence and client_chunk_id refer to different chunks", error.message + end + + test "chunk upload treats duplicate insert races as idempotent retries" do + session = @family.import_sessions.create! + content = build_ndjson(entity_records) + existing = session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + client_chunk_id: "entities", + checksum: Digest::SHA256.hexdigest(content) + ) + existing.ndjson_file.attach( + io: StringIO.new(content), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + existing.sync_ndjson_rows_count! + lookup_count = 0 + + session.stub(:existing_chunk_for!, ->(**) { + lookup_count += 1 + lookup_count == 1 ? nil : existing + }) do + session.stub(:create_chunk!, ->(**) { raise ActiveRecord::RecordNotUnique, "duplicate chunk" }) do + assert_equal existing, session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + end + end + + assert_equal 2, lookup_count + end + + test "client session creation treats duplicate insert races as idempotent retries" do + existing = @family.import_sessions.create!(client_session_id: "race-session", expected_chunks: 2) + ImportSession.any_instance.stubs(:save!).raises(ActiveRecord::RecordNotUnique) + + session = ImportSession.create_or_find_for!( + family: @family, + import_type: "SureImport", + client_session_id: "race-session", + expected_chunks: 2 + ) + + assert_equal existing, session + end + + test "client session creation race backfills missing expected chunks" do + existing = @family.import_sessions.create!(client_session_id: "race-session") + racing_session = @family.import_sessions.build(client_session_id: "race-session") + racing_session.stubs(:save!).raises(ActiveRecord::RecordNotUnique) + + @family.import_sessions.stub(:find_or_initialize_by, racing_session) do + session = ImportSession.create_or_find_for!( + family: @family, + import_type: "SureImport", + client_session_id: "race-session", + expected_chunks: 2 + ) + + assert_equal existing, session + end + assert_equal 2, existing.reload.expected_chunks + end + + test "client session creation race preserves expected chunks conflict" do + @family.import_sessions.create!(client_session_id: "race-session", expected_chunks: 2) + ImportSession.any_instance.stubs(:save!).raises(ActiveRecord::RecordNotUnique) + + error = assert_raises(ImportSession::ConflictError) do + ImportSession.create_or_find_for!( + family: @family, + import_type: "SureImport", + client_session_id: "race-session", + expected_chunks: 3 + ) + end + + assert_equal "client_session_id already exists with a different expected_chunks value", error.message + end + + test "session mode rejects rule records without source ids" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson([ + { + type: "Rule", + data: { + name: "Missing Source Rule", + resource_type: "transaction", + active: true + } + } + ]), + filename: "rules.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.failed? + assert_equal 0, @family.rules.count + assert_equal "missing_source_reference", session.imports.first.error_details["code"] + assert_equal "Rule", session.imports.first.error_details["record_type"] + assert_equal "(blank)", session.imports.first.error_details["source_id"] + end + + test "client idempotency keys are bounded before indexed writes" do + session = @family.import_sessions.build(client_session_id: "x" * 256) + + assert_not session.valid? + assert_includes session.errors[:client_session_id], "is too long (maximum is 255 characters)" + + import = @family.imports.build(type: "SureImport", client_chunk_id: "x" * 256) + + assert_not import.valid? + assert_includes import.errors[:client_chunk_id], "is too long (maximum is 255 characters)" + + mapping = @family.import_source_mappings.build( + import_session: @family.import_sessions.build, + source_type: "x" * 65, + source_id: "x" * 256, + target_type: "Account", + target_id: SecureRandom.uuid + ) + + assert_not mapping.valid? + assert_includes mapping.errors[:source_type], "is too long (maximum is 64 characters)" + assert_includes mapping.errors[:source_id], "is too long (maximum is 255 characters)" + end + + private + def entity_records + [ + { + type: "Account", + data: { + id: "acct-1", + name: "Session Checking", + balance: "100.00", + currency: "USD", + accountable_type: "Depository", + accountable: { subtype: "checking" } + } + }, + { + type: "Category", + data: { + id: "cat-1", + name: "Groceries", + color: "#407706", + classification: "expense" + } + }, + { + type: "Merchant", + data: { + id: "merchant-1", + name: "Market", + color: "#111111" + } + }, + { + type: "Tag", + data: { + id: "tag-1", + name: "Weekly", + color: "#222222" + } + } + ] + end + + def transaction_records + [ + { + type: "Transaction", + data: { + id: "txn-1", + account_id: "acct-1", + category_id: "cat-1", + merchant_id: "merchant-1", + tag_ids: [ "tag-1" ], + date: "2024-01-15", + amount: "-12.34", + currency: "USD", + name: "Grocery Run" + } + } + ] + end + + def build_ndjson(records) + records.map(&:to_json).join("\n") + end +end