diff --git a/src/lib/network_pool/indexed_pool.ml b/src/lib/network_pool/indexed_pool.ml index 733e53de6d6..22aa9e37bad 100644 --- a/src/lib/network_pool/indexed_pool.ml +++ b/src/lib/network_pool/indexed_pool.ml @@ -66,11 +66,8 @@ let config t = t.config (* Compute the total currency required from the sender to execute a command. Returns None in case of overflow. *) -let currency_consumed_unchecked : - constraint_constants:Genesis_constants.Constraint_constants.t - -> User_command.t - -> Currency.Amount.t option = - fun ~constraint_constants:_ cmd -> +let currency_consumed_unchecked : User_command.t -> Currency.Amount.t option = + fun cmd -> let fee_amt = Currency.Amount.of_fee @@ User_command.fee cmd in let open Currency.Amount in let amt = @@ -88,17 +85,14 @@ let currency_consumed_unchecked : in fee_amt + amt -let currency_consumed ~constraint_constants cmd = - currency_consumed_unchecked ~constraint_constants +let currency_consumed cmd = + currency_consumed_unchecked (Transaction_hash.User_command_with_valid_signature.command cmd) let currency_consumed' : - constraint_constants:Genesis_constants.Constraint_constants.t - -> User_command.t - -> (Currency.Amount.t, Command_error.t) Result.t = - fun ~constraint_constants cmd -> - cmd - |> currency_consumed_unchecked ~constraint_constants + User_command.t -> (Currency.Amount.t, Command_error.t) Result.t = + fun cmd -> + cmd |> currency_consumed_unchecked |> Result.of_option ~error:Command_error.Overflow module For_tests = struct @@ -180,7 +174,6 @@ module For_tests = struct nonce ; let consumed = currency_consumed_unchecked - ~constraint_constants:pool.config.constraint_constants (Transaction_hash.User_command_with_valid_signature.command cmd ) |> Option.value_exn @@ -568,14 +561,13 @@ let run : it. Called from revalidate and remove_lowest_fee, and when replacing transactions. *) let remove_with_dependents_exn : - constraint_constants:_ - -> Transaction_hash.User_command_with_valid_signature.t + Transaction_hash.User_command_with_valid_signature.t -> Sender_local_state.t ref -> ( Transaction_hash.User_command_with_valid_signature.t Sequence.t , Update.single , _ ) Writer_result.t = - fun ~constraint_constants (* ({ constraint_constants; _ } as t) *) cmd state -> + fun cmd state -> let unchecked = Transaction_hash.User_command_with_valid_signature.command cmd in @@ -604,7 +596,7 @@ let remove_with_dependents_exn : Option.value_exn (* safe because we check for overflow when we add commands. *) (let open Option.Let_syntax in - let%bind consumed = currency_consumed ~constraint_constants cmd' in + let%bind consumed = currency_consumed cmd' in Currency.Amount.(consumed + acc)) ) Currency.Amount.zero drop_queue in @@ -642,47 +634,158 @@ let run' t cmd x = x let remove_with_dependents_exn' t cmd = - match - run' t cmd - (remove_with_dependents_exn - ~constraint_constants:t.config.constraint_constants cmd ) - with + match run' t cmd (remove_with_dependents_exn cmd) with | Ok x -> x | Error _ -> failwith "remove_with_dependents_exn" (** Drop commands from the end of the queue until the total currency consumed is - <= the current balance. *) + <= the current balance. + + Returns the prefix of a queue, updated currency reserved and sequence of + dropped transactions in the same order they appear in queue. + *) let drop_until_sufficient_balance : - constraint_constants:Genesis_constants.Constraint_constants.t - -> Transaction_hash.User_command_with_valid_signature.t F_sequence.t + Transaction_hash.User_command_with_valid_signature.t F_sequence.t * Currency.Amount.t -> Currency.Amount.t -> Transaction_hash.User_command_with_valid_signature.t F_sequence.t * Currency.Amount.t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun ~constraint_constants (queue, currency_reserved) current_balance -> + fun (queue, currency_reserved) current_balance -> let rec go queue' currency_reserved' dropped_so_far = if Currency.Amount.(currency_reserved' <= current_balance) then (queue', currency_reserved', dropped_so_far) else - let daeh, liat = + let init, last = Option.value_exn ~message: "couldn't drop any more transactions when trying to preserve \ sufficient balance" (F_sequence.unsnoc queue') in - let consumed = - Option.value_exn (currency_consumed ~constraint_constants liat) - in - go daeh + let consumed = Option.value_exn (currency_consumed last) in + go init (Option.value_exn Currency.Amount.(currency_reserved' - consumed)) - (Sequence.append dropped_so_far @@ Sequence.singleton liat) + (Sequence.shift_right dropped_so_far last) in go queue currency_reserved Sequence.empty +let revalidate_by_sender ~logger ~global_slot ~sender ~sender_account t queue + currency_reserved = + let current_balance = + Currency.Balance.to_amount + (Account.liquid_balance_at_slot ~global_slot sender_account) + in + [%log debug] + "Revalidating account $account in transaction pool ($account_nonce, \ + $account_balance)" + ~metadata: + [ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender)) + ; ("account_nonce", `Int (Account_nonce.to_int sender_account.nonce)) + ; ( "account_balance" + , `String (Currency.Amount.to_mina_string current_balance) ) + ] ; + let first_cmd = F_sequence.head_exn queue in + let first_nonce = + first_cmd |> Transaction_hash.User_command_with_valid_signature.command + |> User_command.applicable_at_nonce + in + if + not + ( Account.has_permission_to_send sender_account + && Account.has_permission_to_increment_nonce sender_account ) + then ( + [%log debug] "Account no longer has permission to send; dropping queue" ; + let dropped, t_updated = remove_with_dependents_exn' t first_cmd in + (t_updated, dropped) ) + else if Account_nonce.(sender_account.nonce < first_nonce) then ( + [%log debug] + "Current account nonce precedes first nonce in queue; dropping queue" ; + let dropped, t_updated = remove_with_dependents_exn' t first_cmd in + (t_updated, dropped) ) + else + (* current_nonce >= first_nonce *) + let first_applicable_nonce_index = + F_sequence.findi queue ~f:(fun cmd' -> + let nonce = + Transaction_hash.User_command_with_valid_signature.command cmd' + |> User_command.applicable_at_nonce + in + Account_nonce.equal nonce sender_account.nonce ) + |> Option.value ~default:(F_sequence.length queue) + in + [%log debug] + "Current account nonce succeeds first nonce in queue; splitting queue at \ + $index" + ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; + let dropped_from_nonce, retained_by_nonce = + F_sequence.split_at queue first_applicable_nonce_index + in + let currency_reserved_partially_updated = + F_sequence.foldl + (fun c cmd -> + Option.value_exn + Currency.Amount.(c - Option.value_exn (currency_consumed cmd)) ) + currency_reserved dropped_from_nonce + in + (* NB: dropped_for_balance is ordered by nonce *) + let keep_queue, currency_reserved_updated, dropped_for_balance = + drop_until_sufficient_balance + (retained_by_nonce, currency_reserved_partially_updated) + current_balance + in + let keeping_prefix = F_sequence.is_empty dropped_from_nonce in + let keeping_suffix = Sequence.is_empty dropped_for_balance in + (* t with all_by_sender and applicable_by_fee fields updated *) + let t_partially_updated = + match F_sequence.uncons keep_queue with + | _ when keeping_prefix && keeping_suffix -> + (* Nothing dropped, nothing needs to be updated *) + t + | None -> + (* We drop the entire queue, first element needs to be removed from + applicable_by_fee *) + let t' = remove_applicable_exn t first_cmd in + { t' with all_by_sender = Map.remove t'.all_by_sender sender } + | Some _ when keeping_prefix -> + (* We drop only transactions from the end of queue, keeping + the head untouched, no need to update applicable_by_fee *) + { t with + all_by_sender = + Map.set t.all_by_sender ~key:sender + ~data:(keep_queue, currency_reserved_updated) + } + | Some (first_kept, _) -> + (* We need to replace old queue head with the new queue head + in applicable_by_fee *) + let first_kept_unchecked = + Transaction_hash.User_command_with_valid_signature.command + first_kept + in + let t' = remove_applicable_exn t first_cmd in + { t' with + all_by_sender = + Map.set t'.all_by_sender ~key:sender + ~data:(keep_queue, currency_reserved_updated) + ; applicable_by_fee = + Map_set.insert + (module Transaction_hash.User_command_with_valid_signature) + t'.applicable_by_fee + (User_command.fee_per_wu first_kept_unchecked) + first_kept + } + in + let to_drop = + Sequence.append (F_sequence.to_seq dropped_from_nonce) dropped_for_balance + in + let t_updated = + Sequence.fold ~init:t_partially_updated + ~f:remove_all_by_fee_and_hash_and_expiration_exn to_drop + in + (t_updated, to_drop) + (* Iterate over commands in the pool, removing them if they require too much currency or have too low of a nonce. An argument is provided to instruct which commands require revalidation. @@ -693,134 +796,23 @@ let revalidate : -> [ `Entire_pool | `Subset of Account_id.Set.t ] -> (Account_id.t -> Account.t) -> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t = - fun ({ config = { constraint_constants; _ }; _ } as t) ~logger scope f -> + fun t_initial ~logger scope get_account_by_id -> let requires_revalidation = match scope with | `Entire_pool -> - Fn.const true + t_initial.all_by_sender | `Subset subset -> - Set.mem subset + (* intersection of scope and all_by_sender *) + Account_id.Map.merge t_initial.all_by_sender + (Account_id.Set.to_map subset ~f:(const ())) + ~f:(fun ~key:_ -> function `Both (v, ()) -> Some v | _ -> None) in - Map.fold t.all_by_sender ~init:(t, Sequence.empty) - ~f:(fun - ~key:sender - ~data:(queue, currency_reserved) - ((t', dropped_acc) as acc) - -> - if not (requires_revalidation sender) then acc - else - let account : Account.t = f sender in - let current_balance = - Currency.Balance.to_amount - (Account.liquid_balance_at_slot - ~global_slot:(global_slot_since_genesis t.config) - account ) - in - [%log debug] - "Revalidating account $account in transaction pool ($account_nonce, \ - $account_balance)" - ~metadata: - [ ( "account" - , `String (Sexp.to_string @@ Account_id.sexp_of_t sender) ) - ; ("account_nonce", `Int (Account_nonce.to_int account.nonce)) - ; ( "account_balance" - , `String (Currency.Amount.to_mina_string current_balance) ) - ] ; - let first_cmd = F_sequence.head_exn queue in - let first_nonce = - first_cmd - |> Transaction_hash.User_command_with_valid_signature.command - |> User_command.applicable_at_nonce - in - if - not - ( Account.has_permission_to_send account - && Account.has_permission_to_increment_nonce account ) - then ( - [%log debug] - "Account no longer has permission to send; dropping queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) - else if Account_nonce.(account.nonce < first_nonce) then ( - [%log debug] - "Current account nonce precedes first nonce in queue; dropping \ - queue" ; - let dropped, t'' = remove_with_dependents_exn' t first_cmd in - (t'', Sequence.append dropped_acc dropped) ) - else - (* current_nonce >= first_nonce *) - let first_applicable_nonce_index = - F_sequence.findi queue ~f:(fun cmd' -> - let nonce = - Transaction_hash.User_command_with_valid_signature.command - cmd' - |> User_command.applicable_at_nonce - in - Account_nonce.equal nonce account.nonce ) - |> Option.value ~default:(F_sequence.length queue) - in - [%log debug] - "Current account nonce succeeds first nonce in queue; splitting \ - queue at $index" - ~metadata:[ ("index", `Int first_applicable_nonce_index) ] ; - let drop_queue, keep_queue = - F_sequence.split_at queue first_applicable_nonce_index - in - let currency_reserved' = - F_sequence.foldl - (fun c cmd -> - Option.value_exn - Currency.Amount.( - c - - Option.value_exn - (currency_consumed ~constraint_constants cmd)) ) - currency_reserved drop_queue - in - let keep_queue', currency_reserved'', dropped_for_balance = - drop_until_sufficient_balance ~constraint_constants - (keep_queue, currency_reserved') - current_balance - in - let to_drop = - Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance - in - match Sequence.next to_drop with - | None -> - acc - | Some (head, tail) -> - let t'' = - Sequence.fold tail - ~init: - (remove_all_by_fee_and_hash_and_expiration_exn - (remove_applicable_exn t' head) - head ) - ~f:remove_all_by_fee_and_hash_and_expiration_exn - in - let t''' = - match F_sequence.uncons keep_queue' with - | None -> - { t'' with - all_by_sender = Map.remove t''.all_by_sender sender - } - | Some (first_kept, _) -> - let first_kept_unchecked = - Transaction_hash.User_command_with_valid_signature.command - first_kept - in - { t'' with - all_by_sender = - Map.set t''.all_by_sender ~key:sender - ~data:(keep_queue', currency_reserved'') - ; applicable_by_fee = - Map_set.insert - ( module Transaction_hash - .User_command_with_valid_signature ) - t''.applicable_by_fee - (User_command.fee_per_wu first_kept_unchecked) - first_kept - } - in - (t''', Sequence.append dropped_acc to_drop) ) + let global_slot = global_slot_since_genesis t_initial.config in + Map.fold requires_revalidation ~init:(t_initial, Sequence.empty) + ~f:(fun ~key:sender ~data:(queue, currency_reserved) (t, dropped_acc) -> + Tuple2.map_snd ~f:(Sequence.append dropped_acc) + @@ revalidate_by_sender ~logger ~global_slot ~sender + ~sender_account:(get_account_by_id sender) t queue currency_reserved ) let expired_by_global_slot (t : t) : Transaction_hash.User_command_with_valid_signature.t Sequence.t = @@ -898,7 +890,7 @@ module Add_from_gossip_exn (M : Writer_result.S) = struct , Command_error.t ) M.t = fun ~config: - ( { constraint_constants + ( { constraint_constants = _ ; consensus_constants ; time_controller ; slot_tx_end @@ -929,9 +921,7 @@ module Add_from_gossip_exn (M : Writer_result.S) = struct Result.Let_syntax.( (* C5 *) let%bind () = check_expiry config unchecked in - let%bind consumed = - currency_consumed' ~constraint_constants unchecked - in + let%bind consumed = currency_consumed' unchecked in let%map () = (* TODO: Proper exchange rate mechanism. *) let fee_token = User_command.fee_token unchecked in @@ -1058,7 +1048,7 @@ module Add_from_gossip_exn (M : Writer_result.S) = struct (* C3 *) in let%bind dropped = - remove_with_dependents_exn ~constraint_constants + remove_with_dependents_exn (F_sequence.head_exn drop_queue) by_sender |> M.lift @@ -1159,7 +1149,7 @@ let add_from_backtrack : -> Transaction_hash.User_command_with_valid_signature.t -> (t, Command_error.t) Result.t = fun ( { config = - { constraint_constants + { constraint_constants = _ ; consensus_constants ; time_controller ; slot_tx_end @@ -1186,9 +1176,7 @@ let add_from_backtrack : let fee_payer = User_command.fee_payer unchecked in let fee_per_wu = User_command.fee_per_wu unchecked in let cmd_hash = Transaction_hash.User_command_with_valid_signature.hash cmd in - let consumed = - Option.value_exn (currency_consumed ~constraint_constants cmd) - in + let consumed = Option.value_exn (currency_consumed cmd) in match Map.find t.all_by_sender fee_payer with | None -> { all_by_sender = diff --git a/src/lib/network_pool/indexed_pool.mli b/src/lib/network_pool/indexed_pool.mli index d2a25ebb0eb..172bf1ed6c6 100644 --- a/src/lib/network_pool/indexed_pool.mli +++ b/src/lib/network_pool/indexed_pool.mli @@ -151,7 +151,6 @@ module For_tests : sig Account_id.Map.t val currency_consumed : - constraint_constants:Genesis_constants.Constraint_constants.t - -> Transaction_hash.User_command_with_valid_signature.t + Transaction_hash.User_command_with_valid_signature.t -> Currency.Amount.t option end diff --git a/src/lib/network_pool/test/indexed_pool_tests.ml b/src/lib/network_pool/test/indexed_pool_tests.ml index 042175ada96..c3c33586d45 100644 --- a/src/lib/network_pool/test/indexed_pool_tests.ml +++ b/src/lib/network_pool/test/indexed_pool_tests.ml @@ -35,8 +35,7 @@ let singleton_properties () = (Amount.of_nanomina_int_exn 500) in if - Option.value_exn (currency_consumed ~constraint_constants cmd) - |> Amount.to_nanomina_int > 500 + Option.value_exn (currency_consumed cmd) |> Amount.to_nanomina_int > 500 then match add_res with | Error (Insufficient_funds _) -> @@ -214,9 +213,7 @@ let replacement () = ~common:(fun c -> { c with fee = Amount.to_fee fee }) ~body:(fun b -> { b with amount }) in - let consumed = - Option.value_exn (currency_consumed ~constraint_constants cmd') - in + let consumed = Option.value_exn (currency_consumed cmd') in let%map rest = go (Account_nonce.succ current_nonce) @@ -293,7 +290,7 @@ let replacement () = ~f:(fun consumed_so_far cmd -> Option.value_exn Option.( - currency_consumed ~constraint_constants cmd + currency_consumed cmd >>= fun consumed -> Amount.(consumed + consumed_so_far)) ) in assert (Amount.(currency_consumed_pre_replace <= init_balance)) ; @@ -301,12 +298,9 @@ let replacement () = Option.value_exn (let open Option.Let_syntax in let%bind replaced_currency_consumed = - currency_consumed ~constraint_constants - @@ List.nth_exn setup_cmds replaced_idx - in - let%bind replacer_currency_consumed = - currency_consumed ~constraint_constants replace_cmd + currency_consumed @@ List.nth_exn setup_cmds replaced_idx in + let%bind replacer_currency_consumed = currency_consumed replace_cmd in let%bind a = Amount.(currency_consumed_pre_replace - replaced_currency_consumed) in