Skip to content
Open
105 changes: 94 additions & 11 deletions onion_message/onion_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,28 @@
"context"
"errors"
"fmt"
"sync"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/msgmux"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/record"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/tlv"
)

const (
// defaultOnionMessageQueueSize is the default size of the onion
// message queue.
defaultOnionMessageQueueSize = 50

// defaultMinRedThreshold is the default minimum threshold for the Random
// Early Drop queue. It is set to 80% of the queue size.
defaultMinRedThreshold = 40
)

var (
// ErrBadMessage is returned when we can't process an onion message.
ErrBadMessage = errors.New("onion message processing failed")
Expand Down Expand Up @@ -108,6 +120,16 @@

// MsgSender sends a onion message to the target peer.
MsgSender OnionMessageSender

// onionMsgQueue is a queue that contains incoming onion messages.
onionMsgQueue *queue.BackpressureQueue[msgmux.PeerMsg]

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run macOS itest

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run windows itest

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres, backend=bitcoind dbbackend=postgres)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-etcd, backend=bitcoind dbbackend=etcd)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-rpcpolling, backend="bitcoind rpcpolling")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-cover)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql, backend=bitcoind dbbackend=postgres nativesql=true)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite=nativesql-experiment, backend=bitcoind dbbackend=sqlite nativesql=tru...

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql-experiment, backend=bitcoind dbbackend=postgres nativesql...

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind-notxindex, backend="bitcoind notxindex")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (btcd, backend=btcd cover=1)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite, backend=bitcoind dbbackend=sqlite)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (neutrino, backend=neutrino cover=1)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind, backend=bitcoind cover=1)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite-nativesql, backend=bitcoind dbbackend=sqlite nativesql=true)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.BackpressureQueue

Check failure on line 125 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.BackpressureQueue

// quit is a channel that is closed when the endpoint is shutting down.
quit chan struct{}

// wg is a wait group that is used to wait for the message handler to
// exit.
wg sync.WaitGroup
}

// A compile-time check to ensure OnionEndpoint implements the Endpoint
Expand All @@ -116,8 +138,17 @@

// NewOnionEndpoint creates a new OnionEndpoint with the given options.
func NewOnionEndpoint(opts ...OnionEndpointOption) *OnionEndpoint {
// By default, we will drop onion messages if the queue is full.
dropPredicate := queue.RandomEarlyDrop[msgmux.PeerMsg](

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run macOS itest

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run windows itest

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres, backend=bitcoind dbbackend=postgres)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-etcd, backend=bitcoind dbbackend=etcd)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-rpcpolling, backend="bitcoind rpcpolling")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-cover)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql, backend=bitcoind dbbackend=postgres nativesql=true)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite=nativesql-experiment, backend=bitcoind dbbackend=sqlite nativesql=tru...

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql-experiment, backend=bitcoind dbbackend=postgres nativesql...

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind-notxindex, backend="bitcoind notxindex")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (btcd, backend=btcd cover=1)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite, backend=bitcoind dbbackend=sqlite)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (neutrino, backend=neutrino cover=1)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind, backend=bitcoind cover=1)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite-nativesql, backend=bitcoind dbbackend=sqlite nativesql=true)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.RandomEarlyDrop

Check failure on line 142 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.RandomEarlyDrop
defaultMinRedThreshold, defaultOnionMessageQueueSize,
)

o := &OnionEndpoint{
onionMessageServer: nil,
onionMsgQueue: queue.NewBackpressureQueue[msgmux.PeerMsg](

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run macOS itest

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run windows itest

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres, backend=bitcoind dbbackend=postgres)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-etcd, backend=bitcoind dbbackend=etcd)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-rpcpolling, backend="bitcoind rpcpolling")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-cover)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql, backend=bitcoind dbbackend=postgres nativesql=true)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite=nativesql-experiment, backend=bitcoind dbbackend=sqlite nativesql=tru...

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql-experiment, backend=bitcoind dbbackend=postgres nativesql...

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind-notxindex, backend="bitcoind notxindex")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (btcd, backend=btcd cover=1)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite, backend=bitcoind dbbackend=sqlite)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (neutrino, backend=neutrino cover=1)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind, backend=bitcoind cover=1)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite-nativesql, backend=bitcoind dbbackend=sqlite nativesql=true)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.NewBackpressureQueue

Check failure on line 148 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.NewBackpressureQueue
defaultOnionMessageQueueSize, dropPredicate,
),
quit: make(chan struct{}),
}
for _, opt := range opts {
opt(o)
Expand All @@ -137,16 +168,76 @@
return ok
}

// SendMessage processes the incoming onion message.
// It returns true if the message was successfully processed.
// SendMessage processes the incoming onion message. It returns true if the
// message was successfully processed.
func (o *OnionEndpoint) SendMessage(ctx context.Context,
msg msgmux.PeerMsg) bool {

onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
_, ok := msg.Message.(*lnwire.OnionMessage)
if !ok {
return false
}

err := o.onionMsgQueue.Enqueue(ctx, msg)
if err != nil {
if errors.Is(err, queue.ErrQueueFullAndDropped) {

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run macOS itest

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run windows itest

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres, backend=bitcoind dbbackend=postgres)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.ErrQueueFullAndDropped) (typecheck)

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Lint code

undefined: queue.ErrQueueFullAndDropped (typecheck)

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-etcd, backend=bitcoind dbbackend=etcd)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-rpcpolling, backend="bitcoind rpcpolling")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-cover)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql, backend=bitcoind dbbackend=postgres nativesql=true)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite=nativesql-experiment, backend=bitcoind dbbackend=sqlite nativesql=tru...

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-postgres-nativesql-experiment, backend=bitcoind dbbackend=postgres nativesql...

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind-notxindex, backend="bitcoind notxindex")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (btcd, backend=btcd cover=1)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite, backend=bitcoind dbbackend=sqlite)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (neutrino, backend=neutrino cover=1)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run basic itests (bitcoind, backend=bitcoind cover=1)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run itests (bitcoind-sqlite-nativesql, backend=bitcoind dbbackend=sqlite nativesql=true)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_sqlite")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_sqlite")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_etcd")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="test_db_postgres")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit tags="kvdb_postgres")

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.ErrQueueFullAndDropped

Check failure on line 183 in onion_message/onion_endpoint.go

View workflow job for this annotation

GitHub Actions / Run unit tests (unit-race)

undefined: queue.ErrQueueFullAndDropped
log.Warnf("Onion message queue full, dropping message")
} else {
log.Errorf("Failed to enqueue onion message: %v", err)
}
}

return true
}

// Start starts the onion message handler.
func (o *OnionEndpoint) Start() {
o.wg.Add(1)
go o.messageHandler()
}

// Stop stops the onion message handler.
func (o *OnionEndpoint) Stop() {
close(o.quit)
o.wg.Wait()
}

// messageHandler is the main goroutine that processes onion messages from the
// queue.
func (o *OnionEndpoint) messageHandler() {
defer o.wg.Done()

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-o.quit
cancel()
}()

for {
result := o.onionMsgQueue.Dequeue(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be worked into the actor system as mentioned in other PRs.


if result.IsErr() {
if errors.Is(result.Err(), context.Canceled) {
return
}

log.Errorf("OnionEndpoint Dequeue failed: %v",
result.Err())
continue
}

result.WhenOk(o.processMessage)
}
}

// processMessage processes an onion message.
func (o *OnionEndpoint) processMessage(msg msgmux.PeerMsg) {
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
if !ok {
// This should not happen as we check it before enqueueing.
return
}

peer := msg.PeerPub.SerializeCompressed()
log.Debugf("OnionEndpoint received OnionMessage from peer %x: "+
"BlindingPoint=%v, OnionPacket[:10]=%10x...", peer,
Expand Down Expand Up @@ -176,15 +267,7 @@
// Send the update to any subscribers.
if sendErr := o.onionMessageServer.SendUpdate(update); sendErr != nil {
log.Errorf("Failed to send onion message update: %v", sendErr)
return false
}

// If we failed to handle the onion message, we return false.
if err != nil {
return false
}

return true
}

// handleOnionMessage decodes and processes an onion message.
Expand Down
15 changes: 12 additions & 3 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/msgmux"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/onion_message"

Check failure on line 48 in peer/brontide.go

View workflow job for this annotation

GitHub Actions / Lint code

could not import github.com/lightningnetwork/lnd/onion_message (-: # github.com/lightningnetwork/lnd/onion_message
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/protofsm"
"github.com/lightningnetwork/lnd/queue"
Expand Down Expand Up @@ -646,6 +646,9 @@

// log is a peer-specific logging instance.
log btclog.Logger

// onionMessageEndpoint is the endpoint that handles onion messages.
onionMessageEndpoint *onion_message.OnionEndpoint
}

// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer
Expand Down Expand Up @@ -899,17 +902,18 @@
return fmt.Errorf("unable to load channels: %w", err)
}

onionMessageEndpoint := onion_message.NewOnionEndpoint(
p.onionMessageEndpoint = onion_message.NewOnionEndpoint(
onion_message.WithMessageServer(p.cfg.OnionMessageServer),
onion_message.WithOnionProcessor(p.cfg.Sphinx),
onion_message.WithMessageSender(p.cfg.OnionMsgSender),
)
p.onionMessageEndpoint.Start()

// We register the onion message endpoint with the message router.
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
_ = r.UnregisterEndpoint(onionMessageEndpoint.Name())
_ = r.UnregisterEndpoint(p.onionMessageEndpoint.Name())

return r.RegisterEndpoint(onionMessageEndpoint)
return r.RegisterEndpoint(p.onionMessageEndpoint)
})
if err != nil {
return fmt.Errorf("unable to register endpoint for onion "+
Expand Down Expand Up @@ -1656,6 +1660,11 @@
// Stop PingManager before closing TCP connection.
p.pingManager.Stop()

// Stop the onion message endpoint if we have one.
if p.onionMessageEndpoint != nil {
p.onionMessageEndpoint.Stop()
}

// Ensure that the TCP connection is properly closed before continuing.
p.cfg.Conn.Close()

Expand Down
Loading