Skip to content

Commit 88807fa

Browse files
authored
Merge pull request #3727 from ad-astra-video/av/add-byoc-streaming
BYOC: add streaming
2 parents 5629cca + 229effe commit 88807fa

15 files changed

+4508
-292
lines changed

CHANGELOG_PENDING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@
2323
* [#3777](https://github.com/livepeer/go-livepeer/pull/3777) docker: Forcefully SIGKILL runners after timeout (@pwilczynskiclearcode)
2424
* [#3779](https://github.com/livepeer/go-livepeer/pull/3779) worker: Fix orphaned containers on node shutdown (@victorges)
2525
* [#3781](https://github.com/livepeer/go-livepeer/pull/3781) worker/docker: Destroy containers from watch routines (@victorges)
26+
* [#3727](https://github.com/livepeer/go-livepeer/pull/3727) BYOC: add streaming for BYOC pipelines using trickle (@ad-astra-video)
2627

2728
#### CLI

common/testutil.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ func (s *StubServerStream) Send(n *net.NotifySegment) error {
8282
func IgnoreRoutines() []goleak.Option {
8383
// goleak works by making list of all running goroutines and reporting error if it finds any
8484
// this list tells goleak to ignore these goroutines - we're not interested in these particular goroutines
85+
// following added for job_stream_tests, believe related to open connections on trickle server that are cleaned up periodically
86+
// net/http.(*persistConn).mapRoundTripError
87+
// net/http.(*persistConn).readLoop
88+
// net/http.(*persistConn).writeLoop
89+
// io.(*pipe).read
8590
funcs2ignore := []string{"github.com/golang/glog.(*loggingT).flushDaemon", "go.opencensus.io/stats/view.(*worker).start",
8691
"github.com/rjeczalik/notify.(*recursiveTree).dispatch", "github.com/rjeczalik/notify._Cfunc_CFRunLoopRun", "github.com/ethereum/go-ethereum/metrics.(*meterArbiter).tick",
8792
"github.com/ethereum/go-ethereum/consensus/ethash.(*Ethash).remote", "github.com/ethereum/go-ethereum/core.(*txSenderCacher).cache",
@@ -93,6 +98,12 @@ func IgnoreRoutines() []goleak.Option {
9398
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup",
9499
"internal/synctest.Run",
95100
"testing/synctest.testingSynctestTest",
101+
"github.com/livepeer/go-livepeer/server.startTrickleSubscribe.func2",
102+
"net/http.(*persistConn).mapRoundTripError",
103+
"net/http.(*persistConn).readLoop",
104+
"net/http.(*persistConn).writeLoop",
105+
"io.(*pipe).read",
106+
"github.com/livepeer/go-livepeer/media.gatherIncomingTracks",
96107
}
97108
ignoreAnywhereFuncs := []string{
98109
// glog’s file flusher often has syscall/os.* on top
@@ -104,7 +115,6 @@ func IgnoreRoutines() []goleak.Option {
104115
res = append(res, goleak.IgnoreTopFunction(f))
105116
}
106117
for _, f := range ignoreAnywhereFuncs {
107-
// ignore if these function signatures appear anywhere in the call stack
108118
res = append(res, goleak.IgnoreAnyFunction(f))
109119
}
110120
return res

core/accounting.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ func (b *Balance) Balance() *big.Rat {
6666

6767
// AddressBalances holds credit balances for ETH addresses
6868
type AddressBalances struct {
69-
balances map[ethcommon.Address]*Balances
70-
mtx sync.Mutex
71-
ttl time.Duration
69+
balances map[ethcommon.Address]*Balances
70+
mtx sync.Mutex
71+
sharedBalMtx sync.Mutex
72+
ttl time.Duration
7273
}
7374

7475
// NewAddressBalances creates a new AddressBalances instance
@@ -99,6 +100,47 @@ func (a *AddressBalances) Balance(addr ethcommon.Address, id ManifestID) *big.Ra
99100
return a.balancesForAddr(addr).Balance(id)
100101
}
101102

103+
// compares expected balance with current balance and updates accordingly with the expected balance being the target
104+
// returns the difference and if minimum balance was covered
105+
// also returns if balance was reset to zero because expected was zero
106+
func (a *AddressBalances) CompareAndUpdateBalance(addr ethcommon.Address, id ManifestID, expected *big.Rat, minimumBal *big.Rat) (*big.Rat, *big.Rat, bool, bool) {
107+
a.sharedBalMtx.Lock()
108+
defer a.sharedBalMtx.Unlock()
109+
current := a.balancesForAddr(addr).Balance(id)
110+
if current == nil {
111+
//create a balance of 1 to start tracking
112+
a.Debit(addr, id, big.NewRat(0, 1))
113+
current = a.balancesForAddr(addr).Balance(id)
114+
}
115+
if expected == nil {
116+
expected = big.NewRat(0, 1)
117+
}
118+
diff := new(big.Rat).Sub(expected, current)
119+
120+
if diff.Sign() > 0 {
121+
a.Credit(addr, id, diff)
122+
} else {
123+
a.Debit(addr, id, new(big.Rat).Abs(diff))
124+
}
125+
126+
var resetToZero bool
127+
if expected.Sign() == 0 {
128+
a.Debit(addr, id, current)
129+
130+
resetToZero = true
131+
}
132+
133+
//get updated balance after changes
134+
current = a.balancesForAddr(addr).Balance(id)
135+
136+
var minimumBalCovered bool
137+
if current.Cmp(minimumBal) >= 0 {
138+
minimumBalCovered = true
139+
}
140+
141+
return current, diff, minimumBalCovered, resetToZero
142+
}
143+
102144
// StopCleanup stops the cleanup loop for all balances
103145
func (a *AddressBalances) StopCleanup() {
104146
a.mtx.Lock()

core/accounting_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,97 @@ func TestBalancesCleanup(t *testing.T) {
265265
// Now balance for mid1 should be cleaned as well
266266
assert.Nil(b.Balance(mid1))
267267
}
268+
269+
func TestAddressBalances_CompareAndUpdateBalance(t *testing.T) {
270+
addr := ethcommon.BytesToAddress([]byte("foo"))
271+
mid := ManifestID("some manifestID")
272+
balances := NewAddressBalances(1 * time.Minute)
273+
defer balances.StopCleanup()
274+
275+
assert := assert.New(t)
276+
277+
// Test 1: Balance doesn't exist - should initialize to 1 and then update to expected
278+
expected := big.NewRat(10, 1)
279+
minimumBal := big.NewRat(5, 1)
280+
current, diff, minimumBalCovered, resetToZero := balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
281+
282+
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
283+
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be expected - initial (10 - 1)")
284+
assert.True(minimumBalCovered, "Minimum balance should be covered when going from 1 to 10")
285+
assert.False(resetToZero, "Should not be reset to zero")
286+
287+
// Test 2: Expected > Current (Credit scenario)
288+
expected = big.NewRat(20, 1)
289+
minimumBal = big.NewRat(15, 1)
290+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
291+
292+
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
293+
assert.Zero(big.NewRat(10, 1).Cmp(diff), "Diff should be 20 - 10 = 10")
294+
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing threshold")
295+
assert.False(resetToZero, "Should not be reset to zero")
296+
297+
// Test 3: Expected < Current (Debit scenario)
298+
expected = big.NewRat(5, 1)
299+
minimumBal = big.NewRat(3, 1)
300+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
301+
302+
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
303+
assert.Zero(big.NewRat(-15, 1).Cmp(diff), "Diff should be 5 - 20 = -15")
304+
assert.True(minimumBalCovered, "Minimum balance should still be covered")
305+
assert.False(resetToZero, "Should not be reset to zero")
306+
307+
// Test 4: Expected == Current (No change)
308+
expected = big.NewRat(5, 1)
309+
minimumBal = big.NewRat(3, 1)
310+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
311+
312+
assert.Zero(expected.Cmp(current), "Balance should remain the same")
313+
assert.Zero(big.NewRat(0, 1).Cmp(diff), "Diff should be 0")
314+
assert.True(minimumBalCovered, "Minimum balance should still be covered")
315+
assert.False(resetToZero, "Should not be reset to zero")
316+
317+
// Test 5: Reset to zero (current > 0, expected = 0)
318+
balances.Credit(addr, mid, big.NewRat(5, 1)) // Set current to 10
319+
expected = big.NewRat(0, 1)
320+
minimumBal = big.NewRat(3, 1)
321+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
322+
323+
assert.Zero(expected.Cmp(current), "Balance should be reset to zero")
324+
assert.Zero(big.NewRat(-10, 1).Cmp(diff), "Diff should be 0 - 10 = -10")
325+
assert.False(minimumBalCovered, "Minimum balance should not be covered when resetting to zero")
326+
assert.True(resetToZero, "Should be marked as reset to zero")
327+
328+
// Test 6: Minimum balance covered threshold - just below to just above
329+
expected = big.NewRat(2, 1)
330+
minimumBal = big.NewRat(5, 1)
331+
balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal) // Set to 2
332+
333+
expected = big.NewRat(5, 1)
334+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
335+
336+
assert.Zero(expected.Cmp(current), "Balance should be updated to 5")
337+
assert.Zero(big.NewRat(3, 1).Cmp(diff), "Diff should be 5 - 2 = 3")
338+
assert.True(minimumBalCovered, "Minimum balance should be covered when crossing from below to at threshold")
339+
assert.False(resetToZero, "Should not be reset to zero")
340+
341+
// Test 7: Minimum balance not covered - already above threshold
342+
expected = big.NewRat(10, 1)
343+
minimumBal = big.NewRat(5, 1)
344+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
345+
346+
assert.Zero(expected.Cmp(current), "Balance should be updated to 10")
347+
assert.Zero(big.NewRat(5, 1).Cmp(diff), "Diff should be 10 - 5 = 5")
348+
assert.True(minimumBalCovered, "Minimum balance should still be covered")
349+
assert.False(resetToZero, "Should not be reset to zero")
350+
351+
// Test 8: Negative balance handling
352+
balances.Debit(addr, mid, big.NewRat(20, 1)) // Force negative: 10 - 20 = -10
353+
expected = big.NewRat(5, 1)
354+
minimumBal = big.NewRat(3, 1)
355+
current, diff, minimumBalCovered, resetToZero = balances.CompareAndUpdateBalance(addr, mid, expected, minimumBal)
356+
357+
assert.Zero(expected.Cmp(current), "Balance should be updated to expected value")
358+
assert.Zero(big.NewRat(15, 1).Cmp(diff), "Diff should be 5 - (-10) = 15")
359+
assert.True(minimumBalCovered, "Minimum balance should be covered when going from negative to positive above minimum")
360+
assert.False(resetToZero, "Should not be reset to zero")
361+
}

core/ai_orchestrator.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,8 +1163,8 @@ func (orch *orchestrator) CheckExternalCapabilityCapacity(extCapability string)
11631163
func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string) error {
11641164
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
11651165
if ok {
1166-
cap.mu.Lock()
1167-
defer cap.mu.Unlock()
1166+
cap.Mu.Lock()
1167+
defer cap.Mu.Unlock()
11681168

11691169
cap.Load++
11701170
return nil
@@ -1176,8 +1176,8 @@ func (orch *orchestrator) ReserveExternalCapabilityCapacity(extCapability string
11761176
func (orch *orchestrator) FreeExternalCapabilityCapacity(extCapability string) error {
11771177
cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]
11781178
if ok {
1179-
cap.mu.Lock()
1180-
defer cap.mu.Unlock()
1179+
cap.Mu.Lock()
1180+
defer cap.Mu.Unlock()
11811181

11821182
cap.Load--
11831183
return nil
@@ -1200,6 +1200,12 @@ func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability s
12001200
return nil, err
12011201
}
12021202

1203+
//ensure price numerator and denominator can be int64
1204+
jobPrice, err = common.PriceToInt64(jobPrice)
1205+
if err != nil {
1206+
return nil, fmt.Errorf("invalid job price: %w", err)
1207+
}
1208+
12031209
return &net.PriceInfo{
12041210
PricePerUnit: jobPrice.Num().Int64(),
12051211
PixelsPerUnit: jobPrice.Denom().Int64(),

0 commit comments

Comments
 (0)