-
Notifications
You must be signed in to change notification settings - Fork 204
BYOC: add streaming #3727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BYOC: add streaming #3727
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3727 +/- ##
===================================================
+ Coverage 31.67056% 33.04596% +1.37540%
===================================================
Files 159 160 +1
Lines 38951 40250 +1299
===================================================
+ Hits 12336 13301 +965
- Misses 25722 25906 +184
- Partials 893 1043 +150
... and 4 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
fdc7fc4 to
34e53b9
Compare
e862a20 to
33fda51
Compare
…able live payments
server/job_stream.go
Outdated
|
|
||
| //start payment monitoring | ||
| go func() { | ||
| stream, _ := h.node.ExternalCapabilities.Streams[orchJob.Req.ID] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this accesses ExternalCapabilities.Streams map directly without holding the capm mutex. any concurrent map access will throw panics when streams are added/removed unless we acquire the lock first. I think we should follow a pattern similar to how StreamExists() is used - adding a helper GetStream(streamID) could be useful for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just noticed that we're re-using this - we should probably re-capture each time instead of re-using stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in e9a4c9d
server/job_stream.go
Outdated
| } | ||
|
|
||
| clog.Infof(ctx, "Insufficient balance, stopping stream %s for sender %s", orchJob.Req.ID, orchJob.Sender) | ||
| _, exists := h.node.ExternalCapabilities.Streams[orchJob.Req.ID] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see previous
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in e9a4c9d
server/job_stream.go
Outdated
|
|
||
| //check if stream still exists | ||
| // if not, send stop to worker and exit monitoring | ||
| stream, exists := h.node.ExternalCapabilities.Streams[orchJob.Req.ID] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here too ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in e9a4c9d
| priceInfo := sess.OrchestratorInfo.PriceInfo | ||
| var paymentProcessor *LivePaymentProcessor | ||
| if priceInfo != nil && priceInfo.PricePerUnit != 0 { | ||
| if priceInfo != nil && priceInfo.PricePerUnit != 0 && sess.OrchestratorInfo.AuthToken != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add comment documenting why we're using sess.OrchestratorInfo.AuthToken check here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See 17357af
2b02214 to
db1df66
Compare
d9aac9a to
421ba70
Compare
|
Pending passing checks/tests 👍 Code LGTM. |
What does this pull request do? Explain your changes. (required)
Adds configurable streaming for BYOC entrypoint to go-livepeer. Uses trickle protocol to handle streaming for similar entrypoints and outputs from go-livepeer as live-video-to-video.
Streams can be any or a mix of the following:
Control and Events channels are created for every stream.
Streams are created with a POST request to
/ai/stream/startthat will start the stream and reserve the capacity with an Orchestrator that is providing the BYOC capability. If video ingress is enabled, the client should then start a stream with WHIP or RTMP to the provided ingress URLs provided in the response. URLs for egress video, data, updates (control) and events are also included in the response as well as the stream_id. The stream_id is an integral part of the URLs provided to interact with the stream and is combined with a provided stream name in the /ai/stream/start request.Streams are stopped with a POST request to
/ai/stream/stop. Orchestrators and Gateways track payment balance and the Gateway adjusts to the Orchestrators provided balance in new JobTokens provided at each payment interval every minute. Orchestrators will shutdown a stream when payment balance is zero.Specific updates (required)
job_stream.goandjob_stream_test.gojob_rpc.goto reuse stream setup where made sensecommon/testutil.go.How did you test each of these updates (required)
Used
byoc-streamto test end to end: https://github.com/ad-astra-video/livepeer-app-pipelines/tree/main/byoc-streamAdded tests to
job_stream_test.goand some additional tests tojob_rpc_test.go.Does this pull request close any open issues?
Checklist:
makeruns successfully./test.shpass