From 92c526dc87734a80232fa529c67418df65119855 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Wed, 24 Sep 2025 20:19:35 +0200 Subject: [PATCH 1/3] Add `IgnoreSenderReserveRequirements` param --- cmd/livepeer/starter/flags.go | 1 + cmd/livepeer/starter/starter.go | 293 ++++++++++++++++---------------- pm/recipient.go | 37 ++-- pm/recipient_test.go | 15 ++ pm/sendermonitor.go | 22 +++ pm/stub.go | 6 + server/redeemer.go | 12 ++ 7 files changed, 233 insertions(+), 153 deletions(-) diff --git a/cmd/livepeer/starter/flags.go b/cmd/livepeer/starter/flags.go index ff3395623d..ba3737449e 100644 --- a/cmd/livepeer/starter/flags.go +++ b/cmd/livepeer/starter/flags.go @@ -100,6 +100,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig { cfg.MaxTotalEV = fs.String("maxTotalEV", *cfg.MaxTotalEV, "The maximum acceptable expected value for one PM payment") // Broadcaster deposit multiplier to determine max acceptable ticket faceValue cfg.DepositMultiplier = fs.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets") + cfg.IgnoreSenderReserveRequirements = fs.Bool("ignoreSenderReserveRequirements", *cfg.IgnoreSenderReserveRequirements, "Skip sender reserve validation and rely solely on broadcaster deposits covering ticket face value (unsafe)") // Orchestrator base pricing info cfg.PricePerUnit = fs.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr") // Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 57abfd4356..9ca246c487 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -80,112 +80,113 @@ const ( ) type LivepeerConfig struct { - Network *string - RtmpAddr *string - CliAddr *string - HttpAddr *string - ServiceAddr *string - Nodes *string - OrchAddr *string - VerifierURL *string - EthController *string - VerifierPath *string - LocalVerify *bool - HttpIngest *bool - Orchestrator *bool - Transcoder *bool - AIServiceRegistry *bool - AIWorker *bool - Gateway *bool - Broadcaster *bool - OrchSecret *string - TranscodingOptions *string - AIModels *string - MaxAttempts *int - SelectRandWeight *float64 - SelectStakeWeight *float64 - SelectPriceWeight *float64 - SelectPriceExpFactor *float64 - OrchPerfStatsURL *string - Region *string - MaxPricePerUnit *string - MaxPricePerCapability *string - IgnoreMaxPriceIfNeeded *bool - MinPerfScore *float64 - DiscoveryTimeout *time.Duration - ExtraNodes *int - MaxSessions *string - CurrentManifest *bool - Nvidia *string - Netint *string - HevcDecoding *bool - TestTranscoder *bool - GatewayHost *string - EthAcctAddr *string - EthPassword *string - EthKeystorePath *string - EthOrchAddr *string - EthUrl *string - TxTimeout *time.Duration - MaxTxReplacements *int - GasLimit *int - MinGasPrice *int64 - MaxGasPrice *int - InitializeRound *bool - InitializeRoundMaxDelay *time.Duration - TicketEV *string - MaxFaceValue *string - MaxTicketEV *string - MaxTotalEV *string - DepositMultiplier *int - PricePerUnit *string - PixelsPerUnit *string - PriceFeedAddr *string - AutoAdjustPrice *bool - PricePerGateway *string - PricePerBroadcaster *string - BlockPollingInterval *int - Redeemer *bool - RedeemerAddr *string - Reward *bool - Monitor *bool - MetricsPerStream *bool - MetricsExposeClientIP *bool - MetadataQueueUri *string - MetadataAmqpExchange *string - MetadataPublishTimeout *time.Duration - Datadir *string - AIModelsDir *string - Objectstore *string - Recordstore *string - FVfailGsBucket *string - FVfailGsKey *string - AuthWebhookURL *string - LiveAIAuthWebhookURL *string - LiveAITrickleHostForRunner *string - OrchWebhookURL *string - OrchBlacklist *string - OrchMinLivepeerVersion *string - TestOrchAvail *bool - AIRunnerImage *string - AIRunnerImageOverrides *string - AIVerboseLogs *bool - AIProcessingRetryTimeout *time.Duration - AIRunnerContainersPerGPU *int - AIMinRunnerVersion *string - KafkaBootstrapServers *string - KafkaUsername *string - KafkaPassword *string - KafkaGatewayTopic *string - MediaMTXApiPassword *string - LiveAIAuthApiKey *string - LiveAIHeartbeatURL *string - LiveAIHeartbeatHeaders *string - LiveAIHeartbeatInterval *time.Duration - LivePaymentInterval *time.Duration - LiveOutSegmentTimeout *time.Duration - LiveAICapRefreshModels *string - LiveAISaveNSegments *int + Network *string + RtmpAddr *string + CliAddr *string + HttpAddr *string + ServiceAddr *string + Nodes *string + OrchAddr *string + VerifierURL *string + EthController *string + VerifierPath *string + LocalVerify *bool + HttpIngest *bool + Orchestrator *bool + Transcoder *bool + AIServiceRegistry *bool + AIWorker *bool + Gateway *bool + Broadcaster *bool + OrchSecret *string + TranscodingOptions *string + AIModels *string + MaxAttempts *int + SelectRandWeight *float64 + SelectStakeWeight *float64 + SelectPriceWeight *float64 + SelectPriceExpFactor *float64 + OrchPerfStatsURL *string + Region *string + MaxPricePerUnit *string + MaxPricePerCapability *string + IgnoreMaxPriceIfNeeded *bool + MinPerfScore *float64 + DiscoveryTimeout *time.Duration + ExtraNodes *int + MaxSessions *string + CurrentManifest *bool + Nvidia *string + Netint *string + HevcDecoding *bool + TestTranscoder *bool + GatewayHost *string + EthAcctAddr *string + EthPassword *string + EthKeystorePath *string + EthOrchAddr *string + EthUrl *string + TxTimeout *time.Duration + MaxTxReplacements *int + GasLimit *int + MinGasPrice *int64 + MaxGasPrice *int + InitializeRound *bool + InitializeRoundMaxDelay *time.Duration + TicketEV *string + MaxFaceValue *string + MaxTicketEV *string + MaxTotalEV *string + DepositMultiplier *int + IgnoreSenderReserveRequirements *bool + PricePerUnit *string + PixelsPerUnit *string + PriceFeedAddr *string + AutoAdjustPrice *bool + PricePerGateway *string + PricePerBroadcaster *string + BlockPollingInterval *int + Redeemer *bool + RedeemerAddr *string + Reward *bool + Monitor *bool + MetricsPerStream *bool + MetricsExposeClientIP *bool + MetadataQueueUri *string + MetadataAmqpExchange *string + MetadataPublishTimeout *time.Duration + Datadir *string + AIModelsDir *string + Objectstore *string + Recordstore *string + FVfailGsBucket *string + FVfailGsKey *string + AuthWebhookURL *string + LiveAIAuthWebhookURL *string + LiveAITrickleHostForRunner *string + OrchWebhookURL *string + OrchBlacklist *string + OrchMinLivepeerVersion *string + TestOrchAvail *bool + AIRunnerImage *string + AIRunnerImageOverrides *string + AIVerboseLogs *bool + AIProcessingRetryTimeout *time.Duration + AIRunnerContainersPerGPU *int + AIMinRunnerVersion *string + KafkaBootstrapServers *string + KafkaUsername *string + KafkaPassword *string + KafkaGatewayTopic *string + MediaMTXApiPassword *string + LiveAIAuthApiKey *string + LiveAIHeartbeatURL *string + LiveAIHeartbeatHeaders *string + LiveAIHeartbeatInterval *time.Duration + LivePaymentInterval *time.Duration + LiveOutSegmentTimeout *time.Duration + LiveAICapRefreshModels *string + LiveAISaveNSegments *int } // DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process. @@ -263,6 +264,7 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultMaxPricePerUnit := "0" defaultMaxPricePerCapability := "" defaultIgnoreMaxPriceIfNeeded := false + defaultIgnoreSenderReserveRequirements := false defaultPixelsPerUnit := "1" defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet defaultAutoAdjustPrice := true @@ -361,40 +363,41 @@ func DefaultLivepeerConfig() LivepeerConfig { LiveAIHeartbeatInterval: &defaultLiveAIHeartbeatInterval, // Onchain: - EthAcctAddr: &defaultEthAcctAddr, - EthPassword: &defaultEthPassword, - EthKeystorePath: &defaultEthKeystorePath, - EthOrchAddr: &defaultEthOrchAddr, - EthUrl: &defaultEthUrl, - TxTimeout: &defaultTxTimeout, - MaxTxReplacements: &defaultMaxTxReplacements, - GasLimit: &defaultGasLimit, - MaxGasPrice: &defaultMaxGasPrice, - EthController: &defaultEthController, - InitializeRound: &defaultInitializeRound, - InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay, - TicketEV: &defaultTicketEV, - MaxFaceValue: &defaultMaxFaceValue, - MaxTicketEV: &defaultMaxTicketEV, - MaxTotalEV: &defaultMaxTotalEV, - DepositMultiplier: &defaultDepositMultiplier, - MaxPricePerUnit: &defaultMaxPricePerUnit, - MaxPricePerCapability: &defaultMaxPricePerCapability, - IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded, - PixelsPerUnit: &defaultPixelsPerUnit, - PriceFeedAddr: &defaultPriceFeedAddr, - AutoAdjustPrice: &defaultAutoAdjustPrice, - PricePerGateway: &defaultPricePerGateway, - PricePerBroadcaster: &defaultPricePerBroadcaster, - BlockPollingInterval: &defaultBlockPollingInterval, - Redeemer: &defaultRedeemer, - RedeemerAddr: &defaultRedeemerAddr, - Monitor: &defaultMonitor, - MetricsPerStream: &defaultMetricsPerStream, - MetricsExposeClientIP: &defaultMetricsExposeClientIP, - MetadataQueueUri: &defaultMetadataQueueUri, - MetadataAmqpExchange: &defaultMetadataAmqpExchange, - MetadataPublishTimeout: &defaultMetadataPublishTimeout, + EthAcctAddr: &defaultEthAcctAddr, + EthPassword: &defaultEthPassword, + EthKeystorePath: &defaultEthKeystorePath, + EthOrchAddr: &defaultEthOrchAddr, + EthUrl: &defaultEthUrl, + TxTimeout: &defaultTxTimeout, + MaxTxReplacements: &defaultMaxTxReplacements, + GasLimit: &defaultGasLimit, + MaxGasPrice: &defaultMaxGasPrice, + EthController: &defaultEthController, + InitializeRound: &defaultInitializeRound, + InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay, + TicketEV: &defaultTicketEV, + MaxFaceValue: &defaultMaxFaceValue, + MaxTicketEV: &defaultMaxTicketEV, + MaxTotalEV: &defaultMaxTotalEV, + DepositMultiplier: &defaultDepositMultiplier, + IgnoreSenderReserveRequirements: &defaultIgnoreSenderReserveRequirements, + MaxPricePerUnit: &defaultMaxPricePerUnit, + MaxPricePerCapability: &defaultMaxPricePerCapability, + IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded, + PixelsPerUnit: &defaultPixelsPerUnit, + PriceFeedAddr: &defaultPriceFeedAddr, + AutoAdjustPrice: &defaultAutoAdjustPrice, + PricePerGateway: &defaultPricePerGateway, + PricePerBroadcaster: &defaultPricePerBroadcaster, + BlockPollingInterval: &defaultBlockPollingInterval, + Redeemer: &defaultRedeemer, + RedeemerAddr: &defaultRedeemerAddr, + Monitor: &defaultMonitor, + MetricsPerStream: &defaultMetricsPerStream, + MetricsExposeClientIP: &defaultMetricsExposeClientIP, + MetadataQueueUri: &defaultMetadataQueueUri, + MetadataAmqpExchange: &defaultMetadataAmqpExchange, + MetadataPublishTimeout: &defaultMetadataPublishTimeout, // Ingest: HttpIngest: &defaultHttpIngest, @@ -1015,9 +1018,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { defer sm.Stop() tcfg := pm.TicketParamsConfig{ - EV: ev, - RedeemGas: redeemGas, - TxCostMultiplier: txCostMultiplier, + EV: ev, + RedeemGas: redeemGas, + TxCostMultiplier: txCostMultiplier, + IgnoreSenderReserveRequirements: *cfg.IgnoreSenderReserveRequirements, } n.Recipient, err = pm.NewRecipient( recipientAddr, @@ -1032,6 +1036,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Errorf("Error setting up PM recipient: %v", err) return } + if *cfg.IgnoreSenderReserveRequirements { + glog.Warning("Sender reserve requirements disabled; relying on broadcaster deposit to cover ticket face value. Double-spend protection is reduced.") + } mfv, _ := new(big.Int).SetString(*cfg.MaxFaceValue, 10) if mfv == nil { panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided. Restart the node with a different valid value for -maxFaceValue", *cfg.MaxFaceValue)) diff --git a/pm/recipient.go b/pm/recipient.go index 2199842489..6545ff3531 100644 --- a/pm/recipient.go +++ b/pm/recipient.go @@ -69,6 +69,9 @@ type TicketParamsConfig struct { // TxCostMultiplier is the desired multiplier of the transaction // cost for redemption TxCostMultiplier int + + // IgnoreSenderReserveRequirements instructs the recipient to skip sender reserve checks and accept tickets as long as the computed face value meets EV requirements + IgnoreSenderReserveRequirements bool } // GasPriceMonitor defines methods for monitoring gas prices @@ -271,16 +274,28 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { faceValue = new(big.Int).Mul(r.cfg.EV, evMultiplier) } - // Fetch current max float for sender - maxFloat, err := r.sm.MaxFloat(sender) - if err != nil { - return nil, err - } + var maxFloat *big.Int + if !r.cfg.IgnoreSenderReserveRequirements { + // Fetch current max float for sender + var err error + maxFloat, err = r.sm.MaxFloat(sender) + if err != nil { + return nil, err + } - if faceValue.Cmp(maxFloat) > 0 { - // If faceValue > maxFloat - // Set faceValue = maxFloat - faceValue = maxFloat + if faceValue.Cmp(maxFloat) > 0 { + // If faceValue > maxFloat + // Set faceValue = maxFloat + faceValue = maxFloat + } + } else { + deposit, err := r.sm.SenderDeposit(sender) + if err != nil { + return nil, err + } + if deposit.Cmp(faceValue) < 0 { + return nil, errInsufficientSenderReserve + } } if r.maxfacevalue.Cmp(big.NewInt(0)) > 0 { @@ -290,7 +305,9 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { } if monitor.Enabled { monitor.TicketFaceValue(sender.Hex(), faceValue) - monitor.MaxFloat(sender.Hex(), maxFloat) + if maxFloat != nil { + monitor.MaxFloat(sender.Hex(), maxFloat) + } } if faceValue.Cmp(r.cfg.EV) < 0 { return nil, errInsufficientSenderReserve diff --git a/pm/recipient_test.go b/pm/recipient_test.go index d84f5fab39..704eeeac72 100644 --- a/pm/recipient_test.go +++ b/pm/recipient_test.go @@ -530,6 +530,21 @@ func TestTicketParams(t *testing.T) { _, err = r.TicketParams(sender, big.NewRat(1, 1)) assert.EqualError(err, errInsufficientSenderReserve.Error()) + // Test ignoring sender reserve requirements bypasses maxFloat enforcement + cfg.IgnoreSenderReserveRequirements = true + sm.maxFloat = big.NewInt(100) + sm.deposit = new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil) + rIgnore := NewRecipientWithSecret(recipient, b, v, gm, sm, tm, secret, cfg) + paramsIgnore, err := rIgnore.TicketParams(sender, big.NewRat(1, 1)) + require.Nil(err) + assert.True(paramsIgnore.FaceValue.Cmp(sm.maxFloat) > 0) + + // Test ignoring sender reserve requirements errors if deposit < faceValue + sm.deposit = new(big.Int).Sub(paramsIgnore.FaceValue, big.NewInt(1)) + _, err = rIgnore.TicketParams(sender, big.NewRat(1, 1)) + assert.EqualError(err, errInsufficientSenderReserve.Error()) + cfg.IgnoreSenderReserveRequirements = false + // Test faceValue < txCostWithGasPrice(current gasPrice) and faceValue > txCostWithGasPrice(avg gasPrice) // Set current gasPrice higher than avg gasPrice gm.gasPrice = new(big.Int).Add(avgGasPrice, big.NewInt(1)) diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index 5ef5dcfedd..cf767dec55 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -41,6 +41,9 @@ type SenderMonitor interface { // MaxFloat returns a remote sender's max float MaxFloat(addr ethcommon.Address) (*big.Int, error) + // SenderDeposit returns a sender's deposit balance + SenderDeposit(addr ethcommon.Address) (*big.Int, error) + // ValidateSender checks whether a sender's unlock period ends the round after the next round ValidateSender(addr ethcommon.Address) error } @@ -162,6 +165,25 @@ func (sm *LocalSenderMonitor) MaxFloat(addr ethcommon.Address) (*big.Int, error) return sm.maxFloat(addr) } +// SenderDeposit returns the sender's current deposit balance +func (sm *LocalSenderMonitor) SenderDeposit(addr ethcommon.Address) (*big.Int, error) { + sm.mu.Lock() + defer sm.mu.Unlock() + + sm.ensureCache(addr) + + info, err := sm.smgr.GetSenderInfo(addr) + if err != nil { + return nil, err + } + + if info.Deposit == nil { + return big.NewInt(0), nil + } + + return new(big.Int).Set(info.Deposit), nil +} + // QueueTicket adds a ticket to the queue for a remote sender func (sm *LocalSenderMonitor) QueueTicket(ticket *SignedTicket) error { sm.mu.Lock() diff --git a/pm/stub.go b/pm/stub.go index 33ba0352d1..8c62e189bf 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -388,12 +388,14 @@ type stubSenderMonitor struct { maxFloatErr error validateSenderErr error shouldFail error + deposit *big.Int } func newStubSenderMonitor() *stubSenderMonitor { return &stubSenderMonitor{ maxFloat: big.NewInt(0), redeemable: make(chan *redemption), + deposit: big.NewInt(0), } } @@ -433,6 +435,10 @@ func (s *stubSenderMonitor) MaxFloat(_ ethcommon.Address) (*big.Int, error) { return s.maxFloat, nil } +func (s *stubSenderMonitor) SenderDeposit(_ ethcommon.Address) (*big.Int, error) { + return s.deposit, nil +} + func (s *stubSenderMonitor) ValidateSender(_ ethcommon.Address) error { return s.validateSenderErr } // MockRecipient is useful for testing components that depend on pm.Recipient diff --git a/server/redeemer.go b/server/redeemer.go index ccec858b43..13c4b1f79d 100644 --- a/server/redeemer.go +++ b/server/redeemer.go @@ -272,6 +272,18 @@ func (r *RedeemerClient) MaxFloat(sender ethcommon.Address) (*big.Int, error) { return mf, nil } +// SenderDeposit retrieves the sender's deposit directly from the local sender manager cache +func (r *RedeemerClient) SenderDeposit(sender ethcommon.Address) (*big.Int, error) { + info, err := r.sm.GetSenderInfo(sender) + if err != nil { + return nil, err + } + if info.Deposit == nil { + return big.NewInt(0), nil + } + return new(big.Int).Set(info.Deposit), nil +} + // ValidateSender checks whether a sender has not recently unlocked its deposit and reserve func (r *RedeemerClient) ValidateSender(sender ethcommon.Address) error { info, err := r.sm.GetSenderInfo(sender) From c6aa153b6401ea391f4867865106c7af357650d6 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Mon, 29 Sep 2025 16:18:30 +0200 Subject: [PATCH 2/3] Shorten param name --- cmd/livepeer/starter/flags.go | 2 +- cmd/livepeer/starter/starter.go | 296 ++++++++++++++++---------------- pm/recipient.go | 6 +- pm/recipient_test.go | 4 +- 4 files changed, 154 insertions(+), 154 deletions(-) diff --git a/cmd/livepeer/starter/flags.go b/cmd/livepeer/starter/flags.go index ba3737449e..bd11edcff6 100644 --- a/cmd/livepeer/starter/flags.go +++ b/cmd/livepeer/starter/flags.go @@ -100,7 +100,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig { cfg.MaxTotalEV = fs.String("maxTotalEV", *cfg.MaxTotalEV, "The maximum acceptable expected value for one PM payment") // Broadcaster deposit multiplier to determine max acceptable ticket faceValue cfg.DepositMultiplier = fs.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets") - cfg.IgnoreSenderReserveRequirements = fs.Bool("ignoreSenderReserveRequirements", *cfg.IgnoreSenderReserveRequirements, "Skip sender reserve validation and rely solely on broadcaster deposits covering ticket face value (unsafe)") + cfg.IgnoreSenderReserve = fs.Bool("IgnoreSenderReserve", *cfg.IgnoreSenderReserve, "Skip sender reserve validation and rely solely on broadcaster deposits covering ticket face value (unsafe)") // Orchestrator base pricing info cfg.PricePerUnit = fs.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr") // Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 9ca246c487..ed75cbc41f 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -80,113 +80,113 @@ const ( ) type LivepeerConfig struct { - Network *string - RtmpAddr *string - CliAddr *string - HttpAddr *string - ServiceAddr *string - Nodes *string - OrchAddr *string - VerifierURL *string - EthController *string - VerifierPath *string - LocalVerify *bool - HttpIngest *bool - Orchestrator *bool - Transcoder *bool - AIServiceRegistry *bool - AIWorker *bool - Gateway *bool - Broadcaster *bool - OrchSecret *string - TranscodingOptions *string - AIModels *string - MaxAttempts *int - SelectRandWeight *float64 - SelectStakeWeight *float64 - SelectPriceWeight *float64 - SelectPriceExpFactor *float64 - OrchPerfStatsURL *string - Region *string - MaxPricePerUnit *string - MaxPricePerCapability *string - IgnoreMaxPriceIfNeeded *bool - MinPerfScore *float64 - DiscoveryTimeout *time.Duration - ExtraNodes *int - MaxSessions *string - CurrentManifest *bool - Nvidia *string - Netint *string - HevcDecoding *bool - TestTranscoder *bool - GatewayHost *string - EthAcctAddr *string - EthPassword *string - EthKeystorePath *string - EthOrchAddr *string - EthUrl *string - TxTimeout *time.Duration - MaxTxReplacements *int - GasLimit *int - MinGasPrice *int64 - MaxGasPrice *int - InitializeRound *bool - InitializeRoundMaxDelay *time.Duration - TicketEV *string - MaxFaceValue *string - MaxTicketEV *string - MaxTotalEV *string - DepositMultiplier *int - IgnoreSenderReserveRequirements *bool - PricePerUnit *string - PixelsPerUnit *string - PriceFeedAddr *string - AutoAdjustPrice *bool - PricePerGateway *string - PricePerBroadcaster *string - BlockPollingInterval *int - Redeemer *bool - RedeemerAddr *string - Reward *bool - Monitor *bool - MetricsPerStream *bool - MetricsExposeClientIP *bool - MetadataQueueUri *string - MetadataAmqpExchange *string - MetadataPublishTimeout *time.Duration - Datadir *string - AIModelsDir *string - Objectstore *string - Recordstore *string - FVfailGsBucket *string - FVfailGsKey *string - AuthWebhookURL *string - LiveAIAuthWebhookURL *string - LiveAITrickleHostForRunner *string - OrchWebhookURL *string - OrchBlacklist *string - OrchMinLivepeerVersion *string - TestOrchAvail *bool - AIRunnerImage *string - AIRunnerImageOverrides *string - AIVerboseLogs *bool - AIProcessingRetryTimeout *time.Duration - AIRunnerContainersPerGPU *int - AIMinRunnerVersion *string - KafkaBootstrapServers *string - KafkaUsername *string - KafkaPassword *string - KafkaGatewayTopic *string - MediaMTXApiPassword *string - LiveAIAuthApiKey *string - LiveAIHeartbeatURL *string - LiveAIHeartbeatHeaders *string - LiveAIHeartbeatInterval *time.Duration - LivePaymentInterval *time.Duration - LiveOutSegmentTimeout *time.Duration - LiveAICapRefreshModels *string - LiveAISaveNSegments *int + Network *string + RtmpAddr *string + CliAddr *string + HttpAddr *string + ServiceAddr *string + Nodes *string + OrchAddr *string + VerifierURL *string + EthController *string + VerifierPath *string + LocalVerify *bool + HttpIngest *bool + Orchestrator *bool + Transcoder *bool + AIServiceRegistry *bool + AIWorker *bool + Gateway *bool + Broadcaster *bool + OrchSecret *string + TranscodingOptions *string + AIModels *string + MaxAttempts *int + SelectRandWeight *float64 + SelectStakeWeight *float64 + SelectPriceWeight *float64 + SelectPriceExpFactor *float64 + OrchPerfStatsURL *string + Region *string + MaxPricePerUnit *string + MaxPricePerCapability *string + IgnoreMaxPriceIfNeeded *bool + MinPerfScore *float64 + DiscoveryTimeout *time.Duration + ExtraNodes *int + MaxSessions *string + CurrentManifest *bool + Nvidia *string + Netint *string + HevcDecoding *bool + TestTranscoder *bool + GatewayHost *string + EthAcctAddr *string + EthPassword *string + EthKeystorePath *string + EthOrchAddr *string + EthUrl *string + TxTimeout *time.Duration + MaxTxReplacements *int + GasLimit *int + MinGasPrice *int64 + MaxGasPrice *int + InitializeRound *bool + InitializeRoundMaxDelay *time.Duration + TicketEV *string + MaxFaceValue *string + MaxTicketEV *string + MaxTotalEV *string + DepositMultiplier *int + IgnoreSenderReserve *bool + PricePerUnit *string + PixelsPerUnit *string + PriceFeedAddr *string + AutoAdjustPrice *bool + PricePerGateway *string + PricePerBroadcaster *string + BlockPollingInterval *int + Redeemer *bool + RedeemerAddr *string + Reward *bool + Monitor *bool + MetricsPerStream *bool + MetricsExposeClientIP *bool + MetadataQueueUri *string + MetadataAmqpExchange *string + MetadataPublishTimeout *time.Duration + Datadir *string + AIModelsDir *string + Objectstore *string + Recordstore *string + FVfailGsBucket *string + FVfailGsKey *string + AuthWebhookURL *string + LiveAIAuthWebhookURL *string + LiveAITrickleHostForRunner *string + OrchWebhookURL *string + OrchBlacklist *string + OrchMinLivepeerVersion *string + TestOrchAvail *bool + AIRunnerImage *string + AIRunnerImageOverrides *string + AIVerboseLogs *bool + AIProcessingRetryTimeout *time.Duration + AIRunnerContainersPerGPU *int + AIMinRunnerVersion *string + KafkaBootstrapServers *string + KafkaUsername *string + KafkaPassword *string + KafkaGatewayTopic *string + MediaMTXApiPassword *string + LiveAIAuthApiKey *string + LiveAIHeartbeatURL *string + LiveAIHeartbeatHeaders *string + LiveAIHeartbeatInterval *time.Duration + LivePaymentInterval *time.Duration + LiveOutSegmentTimeout *time.Duration + LiveAICapRefreshModels *string + LiveAISaveNSegments *int } // DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process. @@ -264,7 +264,7 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultMaxPricePerUnit := "0" defaultMaxPricePerCapability := "" defaultIgnoreMaxPriceIfNeeded := false - defaultIgnoreSenderReserveRequirements := false + defaultIgnoreSenderReserve := false defaultPixelsPerUnit := "1" defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet defaultAutoAdjustPrice := true @@ -363,41 +363,41 @@ func DefaultLivepeerConfig() LivepeerConfig { LiveAIHeartbeatInterval: &defaultLiveAIHeartbeatInterval, // Onchain: - EthAcctAddr: &defaultEthAcctAddr, - EthPassword: &defaultEthPassword, - EthKeystorePath: &defaultEthKeystorePath, - EthOrchAddr: &defaultEthOrchAddr, - EthUrl: &defaultEthUrl, - TxTimeout: &defaultTxTimeout, - MaxTxReplacements: &defaultMaxTxReplacements, - GasLimit: &defaultGasLimit, - MaxGasPrice: &defaultMaxGasPrice, - EthController: &defaultEthController, - InitializeRound: &defaultInitializeRound, - InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay, - TicketEV: &defaultTicketEV, - MaxFaceValue: &defaultMaxFaceValue, - MaxTicketEV: &defaultMaxTicketEV, - MaxTotalEV: &defaultMaxTotalEV, - DepositMultiplier: &defaultDepositMultiplier, - IgnoreSenderReserveRequirements: &defaultIgnoreSenderReserveRequirements, - MaxPricePerUnit: &defaultMaxPricePerUnit, - MaxPricePerCapability: &defaultMaxPricePerCapability, - IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded, - PixelsPerUnit: &defaultPixelsPerUnit, - PriceFeedAddr: &defaultPriceFeedAddr, - AutoAdjustPrice: &defaultAutoAdjustPrice, - PricePerGateway: &defaultPricePerGateway, - PricePerBroadcaster: &defaultPricePerBroadcaster, - BlockPollingInterval: &defaultBlockPollingInterval, - Redeemer: &defaultRedeemer, - RedeemerAddr: &defaultRedeemerAddr, - Monitor: &defaultMonitor, - MetricsPerStream: &defaultMetricsPerStream, - MetricsExposeClientIP: &defaultMetricsExposeClientIP, - MetadataQueueUri: &defaultMetadataQueueUri, - MetadataAmqpExchange: &defaultMetadataAmqpExchange, - MetadataPublishTimeout: &defaultMetadataPublishTimeout, + EthAcctAddr: &defaultEthAcctAddr, + EthPassword: &defaultEthPassword, + EthKeystorePath: &defaultEthKeystorePath, + EthOrchAddr: &defaultEthOrchAddr, + EthUrl: &defaultEthUrl, + TxTimeout: &defaultTxTimeout, + MaxTxReplacements: &defaultMaxTxReplacements, + GasLimit: &defaultGasLimit, + MaxGasPrice: &defaultMaxGasPrice, + EthController: &defaultEthController, + InitializeRound: &defaultInitializeRound, + InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay, + TicketEV: &defaultTicketEV, + MaxFaceValue: &defaultMaxFaceValue, + MaxTicketEV: &defaultMaxTicketEV, + MaxTotalEV: &defaultMaxTotalEV, + DepositMultiplier: &defaultDepositMultiplier, + IgnoreSenderReserve: &defaultIgnoreSenderReserve, + MaxPricePerUnit: &defaultMaxPricePerUnit, + MaxPricePerCapability: &defaultMaxPricePerCapability, + IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded, + PixelsPerUnit: &defaultPixelsPerUnit, + PriceFeedAddr: &defaultPriceFeedAddr, + AutoAdjustPrice: &defaultAutoAdjustPrice, + PricePerGateway: &defaultPricePerGateway, + PricePerBroadcaster: &defaultPricePerBroadcaster, + BlockPollingInterval: &defaultBlockPollingInterval, + Redeemer: &defaultRedeemer, + RedeemerAddr: &defaultRedeemerAddr, + Monitor: &defaultMonitor, + MetricsPerStream: &defaultMetricsPerStream, + MetricsExposeClientIP: &defaultMetricsExposeClientIP, + MetadataQueueUri: &defaultMetadataQueueUri, + MetadataAmqpExchange: &defaultMetadataAmqpExchange, + MetadataPublishTimeout: &defaultMetadataPublishTimeout, // Ingest: HttpIngest: &defaultHttpIngest, @@ -1018,10 +1018,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { defer sm.Stop() tcfg := pm.TicketParamsConfig{ - EV: ev, - RedeemGas: redeemGas, - TxCostMultiplier: txCostMultiplier, - IgnoreSenderReserveRequirements: *cfg.IgnoreSenderReserveRequirements, + EV: ev, + RedeemGas: redeemGas, + TxCostMultiplier: txCostMultiplier, + IgnoreSenderReserve: *cfg.IgnoreSenderReserve, } n.Recipient, err = pm.NewRecipient( recipientAddr, @@ -1036,7 +1036,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Errorf("Error setting up PM recipient: %v", err) return } - if *cfg.IgnoreSenderReserveRequirements { + if *cfg.IgnoreSenderReserve { glog.Warning("Sender reserve requirements disabled; relying on broadcaster deposit to cover ticket face value. Double-spend protection is reduced.") } mfv, _ := new(big.Int).SetString(*cfg.MaxFaceValue, 10) diff --git a/pm/recipient.go b/pm/recipient.go index 6545ff3531..491e6fbfd9 100644 --- a/pm/recipient.go +++ b/pm/recipient.go @@ -70,8 +70,8 @@ type TicketParamsConfig struct { // cost for redemption TxCostMultiplier int - // IgnoreSenderReserveRequirements instructs the recipient to skip sender reserve checks and accept tickets as long as the computed face value meets EV requirements - IgnoreSenderReserveRequirements bool + // IgnoreSenderReserve instructs the recipient to skip sender reserve checks and accept tickets as long as the computed face value meets EV requirements + IgnoreSenderReserve bool } // GasPriceMonitor defines methods for monitoring gas prices @@ -275,7 +275,7 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { } var maxFloat *big.Int - if !r.cfg.IgnoreSenderReserveRequirements { + if !r.cfg.IgnoreSenderReserve { // Fetch current max float for sender var err error maxFloat, err = r.sm.MaxFloat(sender) diff --git a/pm/recipient_test.go b/pm/recipient_test.go index 704eeeac72..4371e6fb72 100644 --- a/pm/recipient_test.go +++ b/pm/recipient_test.go @@ -531,7 +531,7 @@ func TestTicketParams(t *testing.T) { assert.EqualError(err, errInsufficientSenderReserve.Error()) // Test ignoring sender reserve requirements bypasses maxFloat enforcement - cfg.IgnoreSenderReserveRequirements = true + cfg.IgnoreSenderReserve = true sm.maxFloat = big.NewInt(100) sm.deposit = new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil) rIgnore := NewRecipientWithSecret(recipient, b, v, gm, sm, tm, secret, cfg) @@ -543,7 +543,7 @@ func TestTicketParams(t *testing.T) { sm.deposit = new(big.Int).Sub(paramsIgnore.FaceValue, big.NewInt(1)) _, err = rIgnore.TicketParams(sender, big.NewRat(1, 1)) assert.EqualError(err, errInsufficientSenderReserve.Error()) - cfg.IgnoreSenderReserveRequirements = false + cfg.IgnoreSenderReserve = false // Test faceValue < txCostWithGasPrice(current gasPrice) and faceValue > txCostWithGasPrice(avg gasPrice) // Set current gasPrice higher than avg gasPrice From 88bb692f738abb405ad02f2a4993bf928b64ca5f Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Mon, 29 Sep 2025 16:45:47 +0200 Subject: [PATCH 3/3] Take full deposit + total reserve - pending tickets into account --- pm/recipient.go | 6 +++--- pm/sendermonitor.go | 24 +++++++++++++++++------- pm/stub.go | 6 +++++- server/redeemer.go | 21 ++++++++++++++++----- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/pm/recipient.go b/pm/recipient.go index 491e6fbfd9..028c89411c 100644 --- a/pm/recipient.go +++ b/pm/recipient.go @@ -289,11 +289,11 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { faceValue = maxFloat } } else { - deposit, err := r.sm.SenderDeposit(sender) + available, err := r.sm.SenderFunds(sender) if err != nil { return nil, err } - if deposit.Cmp(faceValue) < 0 { + if available.Cmp(faceValue) < 0 { return nil, errInsufficientSenderReserve } } @@ -305,7 +305,7 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { } if monitor.Enabled { monitor.TicketFaceValue(sender.Hex(), faceValue) - if maxFloat != nil { + if !r.cfg.IgnoreSenderReserve && maxFloat != nil { monitor.MaxFloat(sender.Hex(), maxFloat) } } diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index cf767dec55..ebb82ff3c2 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -41,8 +41,8 @@ type SenderMonitor interface { // MaxFloat returns a remote sender's max float MaxFloat(addr ethcommon.Address) (*big.Int, error) - // SenderDeposit returns a sender's deposit balance - SenderDeposit(addr ethcommon.Address) (*big.Int, error) + // SenderFunds returns the broadcaster's spendable balance (deposit + total reserve - pending tickets) + SenderFunds(addr ethcommon.Address) (*big.Int, error) // ValidateSender checks whether a sender's unlock period ends the round after the next round ValidateSender(addr ethcommon.Address) error @@ -165,8 +165,8 @@ func (sm *LocalSenderMonitor) MaxFloat(addr ethcommon.Address) (*big.Int, error) return sm.maxFloat(addr) } -// SenderDeposit returns the sender's current deposit balance -func (sm *LocalSenderMonitor) SenderDeposit(addr ethcommon.Address) (*big.Int, error) { +// SenderFunds returns the sender's deposit plus total reserve minus pending tickets +func (sm *LocalSenderMonitor) SenderFunds(addr ethcommon.Address) (*big.Int, error) { sm.mu.Lock() defer sm.mu.Unlock() @@ -177,11 +177,21 @@ func (sm *LocalSenderMonitor) SenderDeposit(addr ethcommon.Address) (*big.Int, e return nil, err } - if info.Deposit == nil { - return big.NewInt(0), nil + totalReserve := new(big.Int).Set(info.Reserve.FundsRemaining) + if info.Reserve.ClaimedInCurrentRound != nil { + totalReserve.Add(totalReserve, info.Reserve.ClaimedInCurrentRound) + } + + available := new(big.Int).Set(totalReserve) + if info.Deposit != nil { + available.Add(available, info.Deposit) + } + available.Sub(available, sm.senders[addr].pendingAmount) + if available.Sign() < 0 { + available = big.NewInt(0) } - return new(big.Int).Set(info.Deposit), nil + return available, nil } // QueueTicket adds a ticket to the queue for a remote sender diff --git a/pm/stub.go b/pm/stub.go index 8c62e189bf..1c2c53bc3a 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -381,6 +381,7 @@ func (s *stubGasPriceMonitor) GasPrice() *big.Int { type stubSenderMonitor struct { maxFloat *big.Int + funds *big.Int redeemable chan *redemption queued []*SignedTicket acceptable bool @@ -435,7 +436,10 @@ func (s *stubSenderMonitor) MaxFloat(_ ethcommon.Address) (*big.Int, error) { return s.maxFloat, nil } -func (s *stubSenderMonitor) SenderDeposit(_ ethcommon.Address) (*big.Int, error) { +func (s *stubSenderMonitor) SenderFunds(_ ethcommon.Address) (*big.Int, error) { + if s.funds != nil { + return new(big.Int).Set(s.funds), nil + } return s.deposit, nil } diff --git a/server/redeemer.go b/server/redeemer.go index 13c4b1f79d..c24c2d4e27 100644 --- a/server/redeemer.go +++ b/server/redeemer.go @@ -272,16 +272,27 @@ func (r *RedeemerClient) MaxFloat(sender ethcommon.Address) (*big.Int, error) { return mf, nil } -// SenderDeposit retrieves the sender's deposit directly from the local sender manager cache -func (r *RedeemerClient) SenderDeposit(sender ethcommon.Address) (*big.Int, error) { +// SenderFunds retrieves the sender's spendable balance directly from the local sender manager cache +func (r *RedeemerClient) SenderFunds(sender ethcommon.Address) (*big.Int, error) { info, err := r.sm.GetSenderInfo(sender) if err != nil { return nil, err } - if info.Deposit == nil { - return big.NewInt(0), nil + + totalReserve := new(big.Int).Set(info.Reserve.FundsRemaining) + if info.Reserve.ClaimedInCurrentRound != nil { + totalReserve.Add(totalReserve, info.Reserve.ClaimedInCurrentRound) + } + + available := new(big.Int).Set(totalReserve) + if info.Deposit != nil { + available.Add(available, info.Deposit) + } + + if available.Sign() < 0 { + available = big.NewInt(0) } - return new(big.Int).Set(info.Deposit), nil + return available, nil } // ValidateSender checks whether a sender has not recently unlocked its deposit and reserve