-
Notifications
You must be signed in to change notification settings - Fork 203
BYOC: add streaming #3727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
BYOC: add streaming #3727
Changes from 59 commits
a139b2f
b2507bf
8b7c61b
00fd90c
3a6d24a
a1323ee
642814d
5a07065
35ff4ac
aa2f0b8
3e166c5
699f778
bc624ee
9ac928d
ba8525c
33c05ae
e52582c
8a7ec3f
fa105de
5e8676b
b78405d
80b56ba
56def52
b480f78
469da95
c6cfec1
a1b9a57
9c51a2f
015c433
3f3b156
8bd0849
cbaba08
3bb3000
c4c9bd6
2455161
e2e23ed
f8198f8
9aacc0d
12f8895
8246825
358cea2
ec00c5e
85e5c25
9182ef8
55666ef
ddf5639
f79d2fe
30af123
d648a59
da54cbf
dcc98b1
5881ea9
6fb5308
1ca67c4
1c97122
a6c8d88
b1771b7
3d01cc3
9c94845
53d17d1
5962060
a71ba37
e232088
4c9bf64
e26e441
9aa7ec7
f68e81c
f8189f0
bb7decf
e9a4c9d
f7c2664
f01e843
db1df66
f8beda8
6776014
17357af
a7c70fb
421ba70
2eca26b
dc5a34f
59e5a46
a0e3b57
41bbe78
2c08005
bf2fec3
071ac17
229effe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,33 @@ | ||
| package core | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "math/big" | ||
|
|
||
| "sync" | ||
|
|
||
| ethcommon "github.com/ethereum/go-ethereum/common" | ||
| "github.com/golang/glog" | ||
| "github.com/livepeer/go-livepeer/net" | ||
| "github.com/livepeer/go-livepeer/trickle" | ||
| ) | ||
|
|
||
| type JobToken struct { | ||
| SenderAddress *JobSender `json:"sender_address,omitempty"` | ||
| TicketParams *net.TicketParams `json:"ticket_params,omitempty"` | ||
| Balance int64 `json:"balance,omitempty"` | ||
| Price *net.PriceInfo `json:"price,omitempty"` | ||
| ServiceAddr string `json:"service_addr,omitempty"` | ||
|
|
||
| LastNonce uint32 | ||
| } | ||
| type JobSender struct { | ||
| Addr string `json:"addr"` | ||
| Sig string `json:"sig"` | ||
| } | ||
|
|
||
| type ExternalCapability struct { | ||
| Name string `json:"name"` | ||
| Description string `json:"description"` | ||
|
|
@@ -25,13 +43,144 @@ type ExternalCapability struct { | |
| Load int | ||
| } | ||
|
|
||
| type StreamInfo struct { | ||
| StreamID string | ||
| Capability string | ||
|
|
||
| //Orchestrator fields | ||
| Sender ethcommon.Address | ||
| StreamRequest []byte | ||
| pubChannel *trickle.TrickleLocalPublisher | ||
| subChannel *trickle.TrickleLocalPublisher | ||
| controlChannel *trickle.TrickleLocalPublisher | ||
| eventsChannel *trickle.TrickleLocalPublisher | ||
| dataChannel *trickle.TrickleLocalPublisher | ||
| //Stream fields | ||
| JobParams string | ||
| StreamCtx context.Context | ||
| CancelStream context.CancelFunc | ||
|
|
||
| sdm sync.Mutex | ||
| } | ||
|
|
||
| func (sd *StreamInfo) IsActive() bool { | ||
| sd.sdm.Lock() | ||
| defer sd.sdm.Unlock() | ||
| if sd.StreamCtx.Err() != nil { | ||
| return false | ||
| } | ||
|
|
||
| if sd.controlChannel == nil { | ||
| return false | ||
| } | ||
|
|
||
| return true | ||
| } | ||
|
|
||
| func (sd *StreamInfo) UpdateParams(params string) { | ||
| sd.sdm.Lock() | ||
| defer sd.sdm.Unlock() | ||
| sd.JobParams = params | ||
| } | ||
|
|
||
| func (sd *StreamInfo) SetChannels(pub, sub, control, events, data *trickle.TrickleLocalPublisher) { | ||
| sd.sdm.Lock() | ||
| defer sd.sdm.Unlock() | ||
| sd.pubChannel = pub | ||
| sd.subChannel = sub | ||
| sd.controlChannel = control | ||
| sd.eventsChannel = events | ||
| sd.dataChannel = data | ||
| } | ||
|
|
||
| type ExternalCapabilities struct { | ||
| capm sync.Mutex | ||
| Capabilities map[string]*ExternalCapability | ||
| Streams map[string]*StreamInfo | ||
| } | ||
|
|
||
| func NewExternalCapabilities() *ExternalCapabilities { | ||
| return &ExternalCapabilities{Capabilities: make(map[string]*ExternalCapability)} | ||
| return &ExternalCapabilities{Capabilities: make(map[string]*ExternalCapability), | ||
| Streams: make(map[string]*StreamInfo), | ||
| } | ||
| } | ||
|
|
||
| func (extCaps *ExternalCapabilities) AddStream(streamID string, capability string, streamReq []byte) (*StreamInfo, error) { | ||
| extCaps.capm.Lock() | ||
| defer extCaps.capm.Unlock() | ||
| _, ok := extCaps.Streams[streamID] | ||
| if ok { | ||
| return nil, fmt.Errorf("stream already exists: %s", streamID) | ||
| } | ||
|
|
||
| //add to streams | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| stream := StreamInfo{ | ||
| StreamID: streamID, | ||
| Capability: capability, | ||
| StreamRequest: streamReq, | ||
| StreamCtx: ctx, | ||
| CancelStream: cancel, | ||
| } | ||
| extCaps.Streams[streamID] = &stream | ||
|
|
||
| //clean up when stream ends | ||
| go func() { | ||
| <-ctx.Done() | ||
|
|
||
| //orchestrator channels shutdown | ||
| if stream.pubChannel != nil { | ||
|
||
| if err := stream.pubChannel.Close(); err != nil { | ||
| glog.Errorf("error closing pubChannel for stream=%s: %v", streamID, err) | ||
| } | ||
| } | ||
| if stream.subChannel != nil { | ||
| if err := stream.subChannel.Close(); err != nil { | ||
| glog.Errorf("error closing subChannel for stream=%s: %v", streamID, err) | ||
| } | ||
| } | ||
| if stream.controlChannel != nil { | ||
| if err := stream.controlChannel.Close(); err != nil { | ||
| glog.Errorf("error closing controlChannel for stream=%s: %v", streamID, err) | ||
| } | ||
| } | ||
| if stream.eventsChannel != nil { | ||
| if err := stream.eventsChannel.Close(); err != nil { | ||
| glog.Errorf("error closing eventsChannel for stream=%s: %v", streamID, err) | ||
| } | ||
| } | ||
| if stream.dataChannel != nil { | ||
| if err := stream.dataChannel.Close(); err != nil { | ||
| glog.Errorf("error closing dataChannel for stream=%s: %v", streamID, err) | ||
| } | ||
| } | ||
| return | ||
| }() | ||
|
|
||
| return &stream, nil | ||
| } | ||
|
|
||
| func (extCaps *ExternalCapabilities) RemoveStream(streamID string) { | ||
| extCaps.capm.Lock() | ||
| defer extCaps.capm.Unlock() | ||
|
|
||
| streamInfo, ok := extCaps.Streams[streamID] | ||
| if ok { | ||
| //confirm stream context is canceled before deleting | ||
| if streamInfo.StreamCtx.Err() == nil { | ||
| streamInfo.CancelStream() | ||
| } | ||
| } | ||
|
|
||
| delete(extCaps.Streams, streamID) | ||
| } | ||
|
|
||
| func (extCaps *ExternalCapabilities) StreamExists(streamID string) bool { | ||
| extCaps.capm.Lock() | ||
| defer extCaps.capm.Unlock() | ||
|
|
||
| _, ok := extCaps.Streams[streamID] | ||
| return ok | ||
| } | ||
|
|
||
| func (extCaps *ExternalCapabilities) RemoveCapability(extCap string) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should/need to acquire the mutex sd.sdm.lock() here before reading or there is a possiblility of reading nil, old, or partial data
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in d73fe37