Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a few bugs in revalidate #16400

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 107 additions & 111 deletions src/lib/network_pool/indexed_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -678,132 +678,128 @@ let revalidate :
-> [ `Entire_pool | `Subset of Account_id.Set.t ]
-> (Account_id.t -> Account.t)
-> t * Transaction_hash.User_command_with_valid_signature.t Sequence.t =
fun t ~logger scope f ->
fun t ~logger scope get_account_by_id ->
let requires_revalidation =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to senders_to_validate or some such?

match scope with
| `Entire_pool ->
Fn.const true
t.all_by_sender
| `Subset subset ->
Set.mem subset
(* intersection of scope and all_by_sender *)
Account_id.Map.merge t.all_by_sender
(Account_id.Set.to_map subset ~f:(const ()))
~f:(fun ~key:_ -> function `Both (v, ()) -> Some v | _ -> None)
in
Map.fold t.all_by_sender ~init:(t, Sequence.empty)
Map.fold requires_revalidation ~init:(t, Sequence.empty)
~f:(fun
~key:sender
~data:(queue, currency_reserved)
((t', dropped_acc) as acc)
->
if not (requires_revalidation sender) then acc
let account : Account.t = get_account_by_id sender in
let current_balance =
Currency.Balance.to_amount
(Account.liquid_balance_at_slot
~global_slot:(global_slot_since_genesis t.config)
account )
in
[%log debug]
"Revalidating account $account in transaction pool ($account_nonce, \
$account_balance)"
~metadata:
[ ("account", `String (Sexp.to_string @@ Account_id.sexp_of_t sender))
; ("account_nonce", `Int (Account_nonce.to_int account.nonce))
; ( "account_balance"
, `String (Currency.Amount.to_mina_string current_balance) )
] ;
let first_cmd = F_sequence.head_exn queue in
let first_nonce =
first_cmd |> Transaction_hash.User_command_with_valid_signature.command
|> User_command.applicable_at_nonce
in
if
not
( Account.has_permission_to_send account
&& Account.has_permission_to_increment_nonce account )
then (
[%log debug] "Account no longer has permission to send; dropping queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else if Account_nonce.(account.nonce < first_nonce) then (
[%log debug]
"Current account nonce precedes first nonce in queue; dropping queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else
let account : Account.t = f sender in
let current_balance =
Currency.Balance.to_amount
(Account.liquid_balance_at_slot
~global_slot:(global_slot_since_genesis t.config)
account )
(* current_nonce >= first_nonce *)
let first_applicable_nonce_index =
F_sequence.findi queue ~f:(fun cmd' ->
let nonce =
Transaction_hash.User_command_with_valid_signature.command cmd'
|> User_command.applicable_at_nonce
in
Account_nonce.equal nonce account.nonce )
|> Option.value ~default:(F_sequence.length queue)
in
[%log debug]
"Revalidating account $account in transaction pool ($account_nonce, \
$account_balance)"
~metadata:
[ ( "account"
, `String (Sexp.to_string @@ Account_id.sexp_of_t sender) )
; ("account_nonce", `Int (Account_nonce.to_int account.nonce))
; ( "account_balance"
, `String (Currency.Amount.to_mina_string current_balance) )
] ;
let first_cmd = F_sequence.head_exn queue in
let first_nonce =
first_cmd
|> Transaction_hash.User_command_with_valid_signature.command
|> User_command.applicable_at_nonce
"Current account nonce succeeds first nonce in queue; splitting \
queue at $index"
~metadata:[ ("index", `Int first_applicable_nonce_index) ] ;
let drop_queue, keep_queue =
F_sequence.split_at queue first_applicable_nonce_index
in
if
not
( Account.has_permission_to_send account
&& Account.has_permission_to_increment_nonce account )
then (
[%log debug]
"Account no longer has permission to send; dropping queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else if Account_nonce.(account.nonce < first_nonce) then (
[%log debug]
"Current account nonce precedes first nonce in queue; dropping \
queue" ;
let dropped, t'' = remove_with_dependents_exn' t first_cmd in
(t'', Sequence.append dropped_acc dropped) )
else
(* current_nonce >= first_nonce *)
let first_applicable_nonce_index =
F_sequence.findi queue ~f:(fun cmd' ->
let nonce =
Transaction_hash.User_command_with_valid_signature.command
cmd'
|> User_command.applicable_at_nonce
in
Account_nonce.equal nonce account.nonce )
|> Option.value ~default:(F_sequence.length queue)
in
[%log debug]
"Current account nonce succeeds first nonce in queue; splitting \
queue at $index"
~metadata:[ ("index", `Int first_applicable_nonce_index) ] ;
let drop_queue, keep_queue =
F_sequence.split_at queue first_applicable_nonce_index
in
let currency_reserved' =
F_sequence.foldl
(fun c cmd ->
Option.value_exn
Currency.Amount.(c - Option.value_exn (currency_consumed cmd))
)
currency_reserved drop_queue
in
let keep_queue', currency_reserved'', dropped_for_balance =
drop_until_sufficient_balance
(keep_queue, currency_reserved')
current_balance
in
let to_drop =
Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance
in
match Sequence.next to_drop with
| None ->
acc
| Some (head, tail) ->
let t'' =
Sequence.fold tail
~init:
(remove_all_by_fee_and_hash_and_expiration_exn
(remove_applicable_exn t' head)
head )
~f:remove_all_by_fee_and_hash_and_expiration_exn
in
let t''' =
match F_sequence.uncons keep_queue' with
| None ->
{ t'' with
all_by_sender = Map.remove t''.all_by_sender sender
}
| Some (first_kept, _) ->
let first_kept_unchecked =
Transaction_hash.User_command_with_valid_signature.command
let currency_reserved' =
F_sequence.foldl
(fun c cmd ->
Option.value_exn
Currency.Amount.(c - Option.value_exn (currency_consumed cmd))
)
currency_reserved drop_queue
in
let keep_queue', currency_reserved'', dropped_for_balance =
drop_until_sufficient_balance
(keep_queue, currency_reserved')
current_balance
in
let to_drop =
Sequence.append (F_sequence.to_seq drop_queue) dropped_for_balance
in
match Sequence.next to_drop with
| None ->
acc
| Some (head, tail) ->
let t'' =
Sequence.fold tail
~init:
(remove_all_by_fee_and_hash_and_expiration_exn
(remove_applicable_exn t' head)
head )
~f:remove_all_by_fee_and_hash_and_expiration_exn
in
let t''' =
match F_sequence.uncons keep_queue' with
| None ->
{ t'' with
all_by_sender = Map.remove t''.all_by_sender sender
}
| Some (first_kept, _) ->
let first_kept_unchecked =
Transaction_hash.User_command_with_valid_signature.command
first_kept
in
{ t'' with
all_by_sender =
Map.set t''.all_by_sender ~key:sender
~data:(keep_queue', currency_reserved'')
; applicable_by_fee =
Map_set.insert
( module Transaction_hash
.User_command_with_valid_signature )
t''.applicable_by_fee
(User_command.fee_per_wu first_kept_unchecked)
first_kept
in
{ t'' with
all_by_sender =
Map.set t''.all_by_sender ~key:sender
~data:(keep_queue', currency_reserved'')
; applicable_by_fee =
Map_set.insert
( module Transaction_hash
.User_command_with_valid_signature )
t''.applicable_by_fee
(User_command.fee_per_wu first_kept_unchecked)
first_kept
}
in
(t''', Sequence.append dropped_acc to_drop) )
}
in
(t''', Sequence.append dropped_acc to_drop) )

let expired_by_global_slot (t : t) :
Transaction_hash.User_command_with_valid_signature.t Sequence.t =
Expand Down