Skip to content
Merged
4 changes: 4 additions & 0 deletions app/controllers/pending_duplicate_merges_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def create
else
redirect_back_or_to transactions_path, alert: "Could not merge transactions"
end
rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotDestroyed,
ActiveRecord::Deadlocked, ActiveRecord::LockWaitTimeout => e
Rails.logger.error("Failed to manually merge pending transaction: #{e.message}")
redirect_back_or_to transactions_path, alert: t("transactions.merge_duplicate.failure")
end

private
Expand Down
4 changes: 3 additions & 1 deletion app/controllers/transactions_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ def merge_duplicate
end

redirect_to transactions_path
rescue ActiveRecord::RecordNotDestroyed, ActiveRecord::RecordInvalid => e
rescue ActiveRecord::RecordNotFound, ActiveRecord::RecordInvalid,
ActiveRecord::RecordNotDestroyed, ActiveRecord::Deadlocked,
ActiveRecord::LockWaitTimeout => e
Rails.logger.error("Failed to merge duplicate transaction #{params[:id]}: #{e.message}")
flash[:alert] = t("transactions.merge_duplicate.failure")
redirect_to transactions_path
Expand Down
31 changes: 31 additions & 0 deletions app/models/enable_banking_account/transactions/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,45 @@ def process
Rails.logger.info "EnableBankingAccount::Transactions::Processor - Processing #{total_count} transactions for enable_banking_account #{enable_banking_account.id}"

imported_count = 0
skipped_count = 0
failed_count = 0
errors = []

shared_adapter = if enable_banking_account.current_account.present?
Account::ProviderImportAdapter.new(enable_banking_account.current_account)
end

# Pre-fetch external_ids that were manually merged and must not be re-imported.
# One query per sync; O(1) Set lookup per transaction — avoids N+1.
# manual_merge is stored as an Array (current) or Hash (legacy); both are handled.
excluded_ids = if enable_banking_account.current_account
Transaction.joins(:entry)
.where(entries: { account_id: enable_banking_account.current_account.id })
.where("transactions.extra ? 'manual_merge'")
.pluck(:extra)
.flat_map { |extra|
mm = extra["manual_merge"]
case mm
when Array then mm.filter_map { |r| r["merged_from_external_id"] }
when Hash then [ mm["merged_from_external_id"] ].compact
else []
end
}
.to_set
else
Set.new
end

enable_banking_account.raw_transactions_payload.each_with_index do |transaction_data, index|
begin
ext_id = EnableBankingEntry::Processor.compute_external_id(transaction_data)

if ext_id && excluded_ids.include?(ext_id)
Rails.logger.info("EnableBankingAccount::Transactions::Processor - Skipping re-import of manually merged pending transaction: #{ext_id}")
skipped_count += 1
next
end

result = EnableBankingEntry::Processor.new(
transaction_data,
enable_banking_account: enable_banking_account,
Expand Down Expand Up @@ -56,6 +86,7 @@ def process
success: failed_count == 0,
total: total_count,
imported: imported_count,
skipped: skipped_count,
failed: failed_count,
errors: errors
}
Expand Down
22 changes: 16 additions & 6 deletions app/models/enable_banking_entry/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,25 @@ class EnableBankingEntry::Processor
# transaction_amount: { amount, currency },
# creditor_name, debtor_name, remittance_information, ...
# }
def self.compute_external_id(raw_transaction_data)
data = raw_transaction_data.with_indifferent_access
id = data[:transaction_id].presence || data[:entry_reference].presence
id ? "enable_banking_#{id}" : nil
end
Comment thread
CrossDrain marked this conversation as resolved.

def initialize(enable_banking_transaction, enable_banking_account:, import_adapter: nil)
@enable_banking_transaction = enable_banking_transaction
@enable_banking_account = enable_banking_account
@import_adapter = import_adapter
end

def process
# Cache a safe diagnostic id upfront — used in all logging paths so rescue
# blocks never call the potentially-raising private external_id method.
safe_id = self.class.compute_external_id(@enable_banking_transaction) || "unknown"

unless account.present?
Rails.logger.warn "EnableBankingEntry::Processor - No linked account for enable_banking_account #{enable_banking_account.id}, skipping transaction #{external_id}"
Rails.logger.warn "EnableBankingEntry::Processor - No linked account for enable_banking_account #{enable_banking_account.id}, skipping transaction #{safe_id}"
return nil
end

Expand All @@ -35,13 +45,13 @@ def process
extra: extra
)
rescue ArgumentError => e
Rails.logger.error "EnableBankingEntry::Processor - Validation error for transaction #{external_id}: #{e.message}"
Rails.logger.error "EnableBankingEntry::Processor - Validation error for transaction #{safe_id}: #{e.message}"
raise
rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotSaved => e
Rails.logger.error "EnableBankingEntry::Processor - Failed to save transaction #{external_id}: #{e.message}"
Rails.logger.error "EnableBankingEntry::Processor - Failed to save transaction #{safe_id}: #{e.message}"
raise StandardError.new("Failed to import transaction: #{e.message}")
rescue => e
Rails.logger.error "EnableBankingEntry::Processor - Unexpected error processing transaction #{external_id}: #{e.class} - #{e.message}"
Rails.logger.error "EnableBankingEntry::Processor - Unexpected error processing transaction #{safe_id}: #{e.class} - #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise StandardError.new("Unexpected error importing transaction: #{e.message}")
end
Expand All @@ -64,9 +74,9 @@ def data
end

def external_id
id = data[:transaction_id].presence || data[:entry_reference].presence
id = self.class.compute_external_id(data)
raise ArgumentError, "Enable Banking transaction missing required field 'transaction_id'" unless id
"enable_banking_#{id}"
id
end

def name
Expand Down
42 changes: 18 additions & 24 deletions app/models/entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,23 @@ class Entry < ApplicationRecord
# Pending transaction scopes - check Transaction.extra for provider pending flags
# Works with any provider that stores pending status in extra["provider_name"]["pending"]
scope :pending, -> {
conditions = Transaction::PENDING_PROVIDERS.map { |p| "(transactions.extra -> '#{p}' ->> 'pending')::boolean = true" }
joins("INNER JOIN transactions ON transactions.id = entries.entryable_id AND entries.entryable_type = 'Transaction'")
.where(<<~SQL.squish)
(transactions.extra -> 'simplefin' ->> 'pending')::boolean = true
OR (transactions.extra -> 'plaid' ->> 'pending')::boolean = true
OR (transactions.extra -> 'lunchflow' ->> 'pending')::boolean = true
SQL
.where(conditions.join(" OR "))
}

scope :excluding_pending, -> {
# For non-Transaction entries (Trade, Valuation), always include
# For Transaction entries, exclude if pending flag is true
# For Transaction entries, exclude if any provider marks it pending
inner = Transaction::PENDING_PROVIDERS
.map { |p| "(t.extra -> '#{p}' ->> 'pending')::boolean = true" }
.join(" OR ")
where(<<~SQL.squish)
entries.entryable_type != 'Transaction'
OR NOT EXISTS (
SELECT 1 FROM transactions t
WHERE t.id = entries.entryable_id
AND (
(t.extra -> 'simplefin' ->> 'pending')::boolean = true
OR (t.extra -> 'plaid' ->> 'pending')::boolean = true
OR (t.extra -> 'lunchflow' ->> 'pending')::boolean = true
)
AND (#{inner})
)
SQL
}
Expand Down Expand Up @@ -148,6 +144,10 @@ def self.auto_exclude_stale_pending(account:, days: 8)
def self.reconcile_pending_duplicates(account: nil, dry_run: false, date_window: 8, amount_tolerance: 0.25)
stats = { checked: 0, reconciled: 0, details: [] }

not_pending_sql = Transaction::PENDING_PROVIDERS
.map { |p| "(transactions.extra -> '#{p}' ->> 'pending')::boolean IS NOT TRUE" }
.join(" AND ")

# Get pending entries to check
scope = Entry.pending.where(excluded: false)
scope = scope.where(account: account) if account
Expand All @@ -164,11 +164,7 @@ def self.reconcile_pending_duplicates(account: nil, dry_run: false, date_window:
.where(currency: pending_entry.currency)
.where(amount: pending_entry.amount)
.where(date: pending_entry.date..(pending_entry.date + date_window.days)) # Posted must be ON or AFTER pending date
.where(<<~SQL.squish)
(transactions.extra -> 'simplefin' ->> 'pending')::boolean IS NOT TRUE
AND (transactions.extra -> 'plaid' ->> 'pending')::boolean IS NOT TRUE
AND (transactions.extra -> 'lunchflow' ->> 'pending')::boolean IS NOT TRUE
SQL
.where(not_pending_sql)
.limit(2) # Only need to know if 0, 1, or 2+ candidates
.to_a # Load limited records to avoid COUNT(*) on .size

Expand Down Expand Up @@ -211,11 +207,7 @@ def self.reconcile_pending_duplicates(account: nil, dry_run: false, date_window:
.where(currency: pending_entry.currency)
.where(date: pending_entry.date..(pending_entry.date + fuzzy_date_window.days)) # Posted ON or AFTER pending
.where("ABS(entries.amount) BETWEEN ? AND ?", min_amount, max_amount)
.where(<<~SQL.squish)
(transactions.extra -> 'simplefin' ->> 'pending')::boolean IS NOT TRUE
AND (transactions.extra -> 'plaid' ->> 'pending')::boolean IS NOT TRUE
AND (transactions.extra -> 'lunchflow' ->> 'pending')::boolean IS NOT TRUE
SQL
.where(not_pending_sql)

# Match by name similarity (first 3 words)
name_words = pending_entry.name.downcase.gsub(/[^a-z0-9\s]/, "").split.first(3).join(" ")
Expand Down Expand Up @@ -253,10 +245,12 @@ def self.reconcile_pending_duplicates(account: nil, dry_run: false, date_window:
pending_transaction.update!(
extra: existing_extra.merge(
"potential_posted_match" => {
"entry_id" => fuzzy_match.id,
"reason" => "fuzzy_amount_match",
"entry_id" => fuzzy_match.id,
"reason" => "fuzzy_amount_match",
"posted_amount" => fuzzy_match.amount.to_s,
"detected_at" => Date.current.to_s
"confidence" => "medium",
"dismissed" => false,
"detected_at" => Date.current.to_s
}
)
)
Expand Down
102 changes: 93 additions & 9 deletions app/models/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def pending?
PENDING_PROVIDERS.any? do |provider|
ActiveModel::Type::Boolean.new.cast(extra_data.dig(provider, "pending"))
end
rescue
rescue StandardError
false
end

Expand Down Expand Up @@ -176,22 +176,106 @@ def potential_duplicate_dismissed?
potential_posted_match_data&.dig("dismissed") == true
end

# Merge this pending transaction with its suggested posted match
# This DELETES the pending entry since the posted version is canonical
# Merge this pending transaction with its suggested posted match.
# The pending entry is destroyed; the posted entry survives with attributes inherited from both sides.
# Attribute inheritance: Date + Category from pending, Name + Merchant from posted (booked).
def merge_with_duplicate!
return false unless pending?
return false unless has_potential_duplicate?

posted_entry = potential_duplicate_entry
return false unless posted_entry

pending_entry_id = entry.id
pending_entry_name = entry.name
pending_entry = entry

# Delete this pending entry completely (no need to keep it around)
entry.destroy!
# Guard: cross-account merges are never valid
if posted_entry.account_id != pending_entry.account_id
Rails.logger.warn("merge_with_duplicate! rejected: posted_entry #{posted_entry.id} belongs to different account than pending entry #{pending_entry.id}")
return false
end

Rails.logger.info("User merged pending entry #{pending_entry_id} (#{pending_entry_name}) with posted entry #{posted_entry.id}")
true
pending_entry_id = pending_entry.id
merge_succeeded = false

ApplicationRecord.transaction(requires_new: true) do
# Row-level locks prevent concurrent merges on the same pair of entries.
# If a concurrent request already destroyed the pending entry, lock! raises
# RecordNotFound — treat that as an idempotent success.
begin
pending_entry.lock!
rescue ActiveRecord::RecordNotFound
Rails.logger.info("Pending entry #{pending_entry_id} already destroyed (concurrent merge), skipping")
return true
end

begin
posted_entry.lock!
rescue ActiveRecord::RecordNotFound
Rails.logger.info("Posted entry #{posted_entry.id} deleted concurrently; aborting merge")
raise ActiveRecord::Rollback
end

# Capture after lock! (which reloads) to guarantee DB-fresh values and avoid
# stale in-memory cached associations (e.g., loaded via touch: true).
external_id = pending_entry.external_id
pending_entry_date = pending_entry.date

# Batch all changes to the surviving posted Transaction into a single update!
# to avoid firing after_save callbacks twice on the same row.
# Lock the Transaction row so concurrent merges into the same posted entry
# cannot race on reading/writing extra (e.g., the manual_merge array).
posted_tx = posted_entry.entryable
posted_tx.lock! if posted_tx.is_a?(Transaction)
if posted_tx.is_a?(Transaction)
tx_attrs = {}

# Merge metadata — always written so the sync engine can skip re-importing.
# Stored as an array so multiple pending entries merged into the same posted
# transaction each preserve their external_id for future sync exclusion.
# Legacy records written as a plain Hash are migrated to a single-element array
# on first append, maintaining backward compatibility.
if external_id.present?
new_record = {
"merged_from_entry_id" => pending_entry_id,
"merged_from_external_id" => external_id,
"merged_at" => Time.current.iso8601,
"source" => pending_entry.source
}
prior = case posted_tx.extra["manual_merge"]
when Array then posted_tx.extra["manual_merge"]
when Hash then [ posted_tx.extra["manual_merge"] ]
else []
end
tx_attrs[:extra] = posted_tx.extra.merge("manual_merge" => prior + [ new_record ])
end

# Attribute inheritance — only when the posted entry is not already user-protected.
unless posted_entry.protected_from_sync?
pending_transaction = pending_entry.entryable
if pending_transaction.is_a?(Transaction) && pending_transaction.category_id.present?
tx_attrs[:category_id] = pending_transaction.category_id
end
end

posted_tx.update!(tx_attrs) if tx_attrs.any?
Comment thread
CrossDrain marked this conversation as resolved.
end

# Date inheritance on the Entry row — separate from the Transaction update above.
unless posted_entry.protected_from_sync?
# Date: pending dates reflect actual transaction initiation time
posted_entry.update!(date: pending_entry_date) if posted_entry.date != pending_entry_date
# Name + Merchant intentionally NOT inherited — booked values are canonical
end

# Lock the posted entry so future syncs cannot overwrite the merged state
posted_entry.mark_user_modified!

Rails.logger.info("User merged pending entry #{pending_entry_id} (ext: #{external_id}) into posted entry #{posted_entry.id}")
pending_entry.destroy!
merge_succeeded = true
end

merge_succeeded
end

# Dismiss the duplicate suggestion - user says these are NOT the same transaction
Expand Down
11 changes: 11 additions & 0 deletions db/migrate/20260508120000_add_manual_merge_ext_id_index.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class AddManualMergeExtIdIndex < ActiveRecord::Migration[7.2]
disable_ddl_transaction!

def change
add_index :transactions,
Arel.sql("(extra->'manual_merge'->>'merged_from_external_id')"),
name: "idx_transactions_manual_merge_ext_id",
where: "(extra ? 'manual_merge')",
algorithm: :concurrently
end
end
Loading