From 307772b99e01fd15357ae6937dd96fd4a2c7fcb4 Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 17 Jul 2025 10:35:40 -0500 Subject: [PATCH 01/12] add data channel --- server/ai_http.go | 5 ++ server/ai_live_video.go | 121 ++++++++++++++++++++++++++++- server/ai_mediaserver.go | 163 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 287 insertions(+), 2 deletions(-) diff --git a/server/ai_http.go b/server/ai_http.go index 97a68a6530..d655aff191 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -145,6 +145,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { subUrl = pubUrl + "-out" controlUrl = pubUrl + "-control" eventsUrl = pubUrl + "-events" + //dataUrl = pubUrl + "-data" ) // Handle initial payment, the rest of the payments are done separately from the stream processing @@ -180,6 +181,8 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { controlPubCh.CreateChannel() eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json") eventsCh.CreateChannel() + dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/json") + dataCh.CreateChannel() // Start payment receiver which accounts the payments and stops the stream if the payment is insufficient priceInfo := payment.GetExpectedPrice() @@ -231,6 +234,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl) subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl) publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl) + //dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl) workerReq := worker.LiveVideoToVideoParams{ ModelId: req.ModelId, @@ -255,6 +259,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { subCh.Close() controlPubCh.Close() eventsCh.Close() + dataCh.Close() cancel() respondWithError(w, err.Error(), http.StatusInternalServerError) return diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 2acd3d40f2..276837a626 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -760,6 +760,125 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar }() } +func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) { + // subscribe to the outputs and send them into LPMS + subscriber := trickle.NewTrickleSubscriber(url.String()) + + // Set up output buffers + rbc := media.RingBufferConfig{BufferLen: 50_000_000} // 50 MB buffer + outWriter, err := media.NewRingBuffer(&rbc) + if err != nil { + stopProcessing(ctx, params, fmt.Errorf("ringbuffer init failed: %w", err)) + return + } + + // Store data segments for SSE endpoint + stream := params.liveParams.stream + dataStore := getDataStore(stream) + + // read segments from trickle subscription + go func() { + defer outWriter.Close() + defer removeDataStore(stream) // Clean up when done + + var err error + firstSegment := true + + retries := 0 + // we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length + const retryPause = 300 * time.Millisecond + const maxRetries = 5 + for { + select { + case <-ctx.Done(): + clog.Info(ctx, "trickle subscribe done") + return + default: + } + if !params.inputStreamExists() { + clog.Infof(ctx, "trickle subscribe stopping, input stream does not exist.") + break + } + var segment *http.Response + clog.V(8).Infof(ctx, "trickle subscribe read data await") + segment, err = subscriber.Read() + if 
err != nil { + if errors.Is(err, trickle.EOS) || errors.Is(err, trickle.StreamNotFoundErr) { + stopProcessing(ctx, params, fmt.Errorf("trickle subscribe stopping, stream not found, err=%w", err)) + return + } + var sequenceNonexistent *trickle.SequenceNonexistent + if errors.As(err, &sequenceNonexistent) { + // stream exists but segment doesn't, so skip to leading edge + subscriber.SetSeq(sequenceNonexistent.Latest) + } + // TODO if not EOS then signal a new orchestrator is needed + err = fmt.Errorf("trickle subscribe error reading: %w", err) + clog.Infof(ctx, "%s", err) + if retries > maxRetries { + stopProcessing(ctx, params, errors.New("trickle subscribe stopping, retries exceeded")) + return + } + retries++ + params.liveParams.sendErrorEvent(err) + time.Sleep(retryPause) + continue + } + retries = 0 + seq := trickle.GetSeq(segment) + clog.V(8).Infof(ctx, "trickle subscribe read data received seq=%d", seq) + copyStartTime := time.Now() + + // Read segment data and store it for SSE + body, err := io.ReadAll(segment.Body) + segment.Body.Close() + if err != nil { + clog.InfofErr(ctx, "trickle subscribe error reading segment body seq=%d", seq, err) + subscriber.SetSeq(seq) + retries++ + continue + } + + // Store the raw segment data for SSE endpoint + dataStore.Store(body) + + // Write to output buffer using the body data + n, err := outWriter.Write(body) + if err != nil { + if errors.Is(err, context.Canceled) { + clog.Info(ctx, "trickle subscribe stopping - context canceled") + return + } + + clog.InfofErr(ctx, "trickle subscribe error writing to output buffer seq=%d", seq, err) + subscriber.SetSeq(seq) + retries++ + continue + } + if firstSegment { + firstSegment = false + delayMs := time.Since(params.liveParams.startTime).Milliseconds() + if monitor.Enabled { + monitor.AIFirstSegmentDelay(delayMs, params.liveParams.sess.OrchestratorInfo) + monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{ + "type": "gateway_receive_first_data_segment", + "timestamp": time.Now().UnixMilli(), + "stream_id": params.liveParams.streamID, + "pipeline_id": params.liveParams.pipelineID, + "request_id": params.liveParams.requestID, + "orchestrator_info": map[string]interface{}{ + "address": params.liveParams.sess.Address(), + "url": params.liveParams.sess.Transcoder(), + }, + }) + } + } + + clog.V(8).Info(ctx, "trickle subscribe read data completed", "seq", seq, "bytes", humanize.Bytes(uint64(n)), "took", time.Since(copyStartTime)) + } + }() +} + func (a aiRequestParams) inputStreamExists() bool { if a.node == nil { return false @@ -795,7 +914,7 @@ const maxInflightSegments = 3 // If inflight max is hit, returns true, false otherwise. 
func (s *SlowOrchChecker) BeginSegment() (int, bool) { // Returns `false` if there are multiple segments in-flight - // this means the orchestrator is slow reading them + // this means the orchestrator is slow reading // If all-OK, returns `true` s.mu.Lock() defer s.mu.Unlock() diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index e65903525f..bd638bfd3b 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -14,6 +14,7 @@ import ( "os" "os/exec" "strings" + "sync" "time" "github.com/livepeer/go-livepeer/monitor" @@ -109,6 +110,9 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error { ls.HTTPMux.Handle("OPTIONS /live/video-to-video/{streamId}/status", ls.WithCode(http.StatusNoContent)) ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus()) + // Stream data SSE endpoint + ls.HTTPMux.Handle("/live/video-to-video/{stream}/data", ls.GetLiveVideoToVideoData()) + //API for dynamic capabilities ls.HTTPMux.Handle("/process/request/", ls.SubmitJob()) @@ -773,16 +777,21 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{ if err != nil { return fmt.Errorf("invalid events URL: %w", err) } + data, err := common.AppendHostname(strings.Replace(*resp.JSON200.EventsUrl, "-events", "-data", 1), host) + if err != nil { + return fmt.Errorf("invalid data URL: %w", err) + } if resp.JSON200.ManifestId != nil { ctx = clog.AddVal(ctx, "manifest_id", *resp.JSON200.ManifestId) params.liveParams.manifestID = *resp.JSON200.ManifestId } - clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events) + clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s data %s", pub, sub, control, events, data) startControlPublish(ctx, control, params) startTricklePublish(ctx, pub, params, params.liveParams.sess) startTrickleSubscribe(ctx, sub, params, params.liveParams.sess) startEventsSubscribe(ctx, events, params, params.liveParams.sess) + startDataSubscribe(ctx, data, params, params.liveParams.sess) return nil } @@ -1304,3 +1313,155 @@ func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler { }() }) } + +// DataSegmentStore stores data segments for SSE streaming +type DataSegmentStore struct { + streamID string + segments chan []byte + mu sync.RWMutex + closed bool +} + +func NewDataSegmentStore(streamID string) *DataSegmentStore { + return &DataSegmentStore{ + streamID: streamID, + segments: make(chan []byte, 100), // Buffer up to 100 segments + } +} + +func (d *DataSegmentStore) Store(data []byte) { + d.mu.RLock() + defer d.mu.RUnlock() + if d.closed { + return + } + select { + case d.segments <- data: + default: + // Channel is full, drop oldest segment + select { + case <-d.segments: + default: + } + select { + case d.segments <- data: + default: + } + } +} + +func (d *DataSegmentStore) Subscribe() <-chan []byte { + return d.segments +} + +func (d *DataSegmentStore) Close() { + d.mu.Lock() + defer d.mu.Unlock() + if !d.closed { + d.closed = true + close(d.segments) + } +} + +// Global store for data segments by stream ID +var dataStores = make(map[string]*DataSegmentStore) +var dataStoresMu sync.RWMutex + +func getDataStore(stream string) *DataSegmentStore { + dataStoresMu.RLock() + store, exists := dataStores[stream] + dataStoresMu.RUnlock() + if exists { + return store + } + + dataStoresMu.Lock() + defer dataStoresMu.Unlock() + // Double-check after acquiring write lock + if store, exists := dataStores[stream]; exists { + return store + } + + store 
= NewDataSegmentStore(stream) + dataStores[stream] = store + return store +} + +func removeDataStore(stream string) { + dataStoresMu.Lock() + defer dataStoresMu.Unlock() + if store, exists := dataStores[stream]; exists { + store.Close() + delete(dataStores, stream) + } +} + +// @Summary Get Live Stream Data +// @Param streamId path string true "Stream ID" +// @Success 200 +// @Router /live/video-to-video/{stream}/data [get] +func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + stream := r.PathValue("stream") + if stream == "" { + http.Error(w, "stream name is required", http.StatusBadRequest) + return + } + if r.Method == http.MethodOptions { + corsHeaders(w, r.Method) + w.WriteHeader(http.StatusNoContent) + return + } + + ctx := r.Context() + ctx = clog.AddVal(ctx, "stream", stream) + + // Get the data store for this stream + dataStore := getDataStore(stream) + if dataStore == nil { + http.Error(w, "Stream not found", http.StatusNoContent) + return + } + + // Set up SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // Get the subscription channel + dataChan := dataStore.Subscribe() + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + clog.Infof(ctx, "Starting SSE data stream for stream=%s", stream) + + // Send keep-alive ping initially + fmt.Fprintf(w, "event: ping\ndata: {\"type\":\"connected\"}\n\n") + flusher.Flush() + + // Stream data segments as SSE events + for { + select { + case <-ctx.Done(): + clog.Info(ctx, "SSE data stream client disconnected") + return + case data, ok := <-dataChan: + if !ok { + // Channel closed, stream ended + fmt.Fprintf(w, "event: end\ndata: {\"type\":\"stream_ended\"}\n\n") + flusher.Flush() + return + } + + // Send the segment data as a data event + fmt.Fprintf(w, "data: %s\n\n", string(data)) + flusher.Flush() + } + } + }) +} From 862051afa1f67097bd30bc831d0dcdb60f356f1d Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 22 Jul 2025 14:08:56 -0500 Subject: [PATCH 02/12] update to new trickle subscriber api --- server/ai_live_video.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 276837a626..f597a997dd 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -762,7 +762,14 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) { // subscribe to the outputs and send them into LPMS - subscriber := trickle.NewTrickleSubscriber(url.String()) + subscriber, err := trickle.NewTrickleSubscriber(trickle.TrickleSubscriberConfig{ + URL: url.String(), + Ctx: ctx, + }) + if err != nil { + clog.Infof(ctx, "Failed to create trickle subscriber: %s", err) + return + } // Set up output buffers rbc := media.RingBufferConfig{BufferLen: 50_000_000} // 50 MB buffer From e844fd0e353b5340a62c95ad1e8dfa00ab7865c1 Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 22 Jul 2025 14:09:11 -0500 Subject: [PATCH 03/12] update ai-runner bindings --- ai/worker/runner.gen.go | 177 ++++++++++++++++++++-------------------- 1 file changed, 90 insertions(+), 87 deletions(-) diff --git a/ai/worker/runner.gen.go b/ai/worker/runner.gen.go index 
68197ee283..b4c0826556 100644 --- a/ai/worker/runner.gen.go +++ b/ai/worker/runner.gen.go @@ -317,6 +317,9 @@ type LiveVideoToVideoParams struct { // ControlUrl URL for subscribing via Trickle protocol for updates in the live video-to-video generation params. ControlUrl *string `json:"control_url,omitempty"` + // DataUrl URL for publishing data via Trickle protocol for pipeline status and logs. + DataUrl *string `json:"data_url,omitempty"` + // EventsUrl URL for publishing events via Trickle protocol for pipeline status and logs. EventsUrl *string `json:"events_url,omitempty"` @@ -3160,93 +3163,93 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xdeW/ctrb/KoTeA5IAM97a3D4YuH84SxPj2onhpQtaYy5HOjPDWCJVkrI9zfN3f+Am", - "kRI1I7u229c7f2UscTnr7xySh8rXJGVFyShQKZL9r4lIF1Bg/fPg5PA954yr3xmIlJNSEkaTffUGgXqF", - "OIiSUQGoYBnkW8koKTkrgUsCeoxCzLvdzxdguxcgBJ6D6ieJzCHZT47FXP21LNUfQnJC58nd3Sjh8FtF", - "OGTJ/i961MumS01o3Y9Nv0Aqk7tRclBlhJ1aKruknAb0oxnjCKseaA4UOFatukzpFvpHnn+eJfu/fE3+", - "m8Ms2U/+a7uR5rYV5fYxZARfnB4ld5ejiCTsTJCZmbc63JrpfH4DniJMv2HZcjIHqhues3O4lYrcHi5C", - "ki7KnOHMUYNmJAckGZoCkhxT1XIKmZLJjPECy2Q/mRKK+TJp0ddV4igpQOIMS2xmneEqV/2/3iVtuRxk", - "GVE/cY6+sCki1ExGGLW0lFgIyNQfcgGoJCXkhIZ25OaK0aGUPSFZSEeHio/VfE7oHH2PU2cgh+9QpSZW", - "huLkUTorqac2TbPY1BxkxelEkgKExEUpQhokr6BDx6nug5o+ZvpFoBIk4VZuobOqLBlX1nSN8wrEPnoh", - "gEqgKbwYoRc3jGcvRkiZOTJEoSljOWCKXr5Qk79Q717McC7gxast9M5QhohA9vXLZrxXW64lKgBTgSjz", - "iNyys9l36vd4irXWmjae1CyX541k1sFAxzFidr/CPQ4LPIdzpv/p+se8IhmmKUxEinMI1PTd1uu2jt7T", - "lFUcz0FYS5E1hgAihX6R5kxAvkQ5oVeN8Sq9oZKzopTo5YLMF8Ct7lCBl4hDVqV2CPRbhXMil698uX2w", - "dKIzTWfNL62KKXDFL3EM9ni6GVsyRTmZLdENkYuOX/W7u5FfxNb1uJMVctztyvEdzDloYm4WJDVkNAhp", - "KCUClZVYaBHeYJ4J3YpQIgnOTZutNn1ovZhyxrFYAwkH6IidHqCXR+xmfIrpFTrIcCk1Mr2yisc0Q0QK", - "lDJuomOmvOwGyHwhteMaJrwAg97f4qLMYR99Rb8mOZZA5ThlVBChHG25nafFWFE3Ftlt/muyj3a3dkbo", - "14QCJ1/EdkluIR9jLsfu7d6dL4AjzdiT4WCHn4FQSGGOJbmGiTH+NUScN27yUrzS7lWRDNDNAkv1F9ym", - "eZUBmnFWRER8OKeMKwuaodAg0a/Vzs43Kdr1yf5kSUMnhrQY9VUxMX49KYHHeNhts/BJmxpiMwcIPkaU", - "wC17ASFVgQ5N4xPgHXIIlTA31qvpoTPgoFmT0Aotuzs7/fRkQBkRSse64xY6ZhzMb1SJCucKtQBrzLIQ", - "ZaHIsTKtJBI5uwGOairUMFmVa8+dLlW8ATqXiw5/rj0601THuPPFO8QqVtlkv04FnoFcTtIFpFeB8FTo", - "a0vvBLjCRBVIdTeku2lTFJIUGvdnbexSsFDlmUph2GwGVCgjYxwtMC9mVe6TeWZGfauJqYm10VpTC5B1", - "JXIG1i05phkrkMG3HlGoxlF5O10FUtjZ+p8euGYzk4o0aRouy5w0QY6D07HRzMsd9WY3CGRnbs4ONrfi", - "fukUaAJbJAEIIvv6DCCeIA8OmzXrjxY5HzFBrVUyFJb/EBr3T9nndS3drlPpwJzuB5IB66p01gLFf8QW", - "ZDOOCxAakAWkjGbavIM85FoN73P3fQ9uLXTYD+Z8/V10VtMSEYp0OBcDJv1oBo/NO9h26/iDzfg6fv6p", - "VmvIuH86UTDVejKt0iuQbSp2975rk3HhJlQq1qtNRZQSOS5YRaVSgBmzXm75CYXWmQmF6pWFWfWzULHT", - "9rwhea7AnlD9qqPCY9PsjSY6YMwP7YwImOBqPumB5Z29Tp5as6A7I5xlDRgHDJt0GX0MFh520cFBQDHN", - "ddrc29ckvDTlgIXjOwjxmoCDao76AX59+rL3+v9x9rLJK5wkbkjWst7dnb1vY3ioW94LDn/UY3dnvWeE", - "MaFjRYg5g3kBVB7QpVwQOt/rhpkpu41smqJcGxD6FmHO8RLNyTVQhAXCaMpu3RaA9TONiyPF/08///Qz", - "Mmjsc/uG3fauubuTHzq8F4b4hyI8FlcTQstKRvljN2MOguWVBjXVGOnGLabksiSp9kq9WMOo5HBNWCXU", - "j4ykujeR1q5GTVal/WL39uPtj+jlx3/++M+91//QJnl2cBxkksdq5kNN5l9u1VtUufJicTVhlawFuQIP", - "DlVuXcGokaCJKtzuCi5UAq4GNNuCuJiSeaWEaURvzEqMEJtJoOrPrEr1vh9ICdz2lAtMFeIQOs/BU0PA", - "laMcfTaUx8CDKqPKye8wSRnjmbgfeyUjVCLdk1AsQdQBtB63WVJgOgf0y85o99KaiO5t50VwW0IqTfMp", - "mAYchHqoHhn1ZaRQWMmoCCOWnQu9NTzEGPUn6zrDp9s96+VsZrmyimj5ws0COCDAqSUfEaU49PKn0c+v", - "GvQLEmndrE2Zl79rwnI8hTxC2JF+Xmc0AWmOml1EaEZSLX+smsKcs4pmtrWK9ztBkylOr/wmXXLNtCs2", - 
"xHM2J/Ie1mK6CVTRsfIAsWC5ynC0eZqxEKFCqqjPZopEjXH6fWTT+cjM3tXz0NjRiQkr4sdFWe+EPnDB", - "+cj7tI8DiJVhK3v4fuCaFPC71/9BG1iDpLnZyVqXcd5758g5Z8R/3y4qehXLe1L1QieoSpnaK3FzyNU9", - "P5Z2u6mb9OoBbKarR/VZDLc+Gl3XM/WM6V53BiYSCkXQnTdHPVY9kQ5jHUlKv6EizJOlEVREgh9OLt6y", - "oqwkHNJZ5Oz5uD6Ez0Biosz/w8kFSk0f/xi4K1QDXzXWxXMv/MWUMzSJ4he/bMDz1wIKxpeTGQcIOujH", - "6Hv1eEU3ySTOI/3O9fNoR0JbpOkH0e0AXAQ0fVJ/r91XUwKhpmVAZMiqk5EjyNNqS3lx9V5IkpPftYrW", - "qVhptmqaIyGxJEKSVDxQuc+ssWFqGCUejxNryX43T2LIyjc6nT+MoblvFEP5gIXwUJOIcRAlKLSWti1E", - "LObj+flJT42RejWwyMiAxfCCnLpeqFuQ887hjpk5QJy2/Oy0HtMNOz28/oBzkunhaq77WHHgvJKT9nge", - "khtOYjDuU9seIEY35tkN5trrrSwG1VEp/16J2POyUvmeqaSqC35OgjarmG8BksfZh7JCfSbnJ7uDjkNs", - "bu23P3HP1qFu2TSs5x01jPumE5HyCmWcSSzFIDVwwPlYRWutkFUgq8gSbtwHKqTt8y2lGLL/ulox7EfU", - "0kv4R8C5XLx1eXYoUTVcJeJp2UJ3RKaJS808yoBWhSL26PPBu8NPH5JR8vlfySh5f3r6+TQZJYfvjt77", - "hJ6ZqdaxbinyOfQYiPCnF433qp+MLVgi69wesbRXGX6eur7K0rc3UzaxLo+1tFy2+q2qrvROIu8lGJ3c", - "r5JL/7qgkYqu8lu7KGgn660EPcZBhNGjo+O3C0ZSiMWp3JRxDou4R0fHx6bUN7m7vBslM0KJWEw4YOGi", - "ibftUB+q6lbo1LSKbXbTDG6DzFA/iKd5ZvoHkty2Gj2PJ9FGVHE5HjfTh4JMGZVApc/EW/sotlnG8gDy", - "Tlm+Hu64aeRmCql2hMXJPoXfKhCR2ogC304kuwLaPqX7h78Lf4vOTZsVGhGDkx1fI36drx2m7egW3Hut", - "S4eX6N6J5ICLoJ+ucg0rVXAR3cqQUJTKVSsOrUPa73ynbRpFzkIlKyfhrs141+vMSvSvqERVv7JdguZ3", - "O1lbWlMrJbQSZwd9VtKgYMu6tU/cS8XWje787QQzSFfBKQcFioHz2EfRooghq0hnM2vNpHIOvYYh7QIX", - "xmxjK7DCTpDWlFfWI5302rpYidjedBGwKcoc9Iqtcd4GdtzLVV5rdtAi3c1e5KquenEZ6amX2P0do1t4", - "bphRhKfWTKH0PPHE5EeuQZ//2mPgE8yxsdouanOWTyqer9k0vzg90sFfVFNdqE/oHF0TjM45Sa/0URuT", - "LGW53ULP9ImKrRbJybUtGRlLNm7XxaBSE+cnA28NWeiCRy0WrpVt3oPosprmRCwUzaZvP+kuc3UJLaYZ", - "ytk8IO+9GaOHujmWcIOXE26wZv15hEqMDt+51PmD6Y5sd/RSUZUzc2RRVrxkAkRYOG97WHDrO6/FlMwG", - "0+NaK8L04bYijfF0AUJyLBkfQNdxPUb2B05rPuEC2ssKJBniFe3al3phfnxh0y30iUmSApK6tHpBBCIC", - "UVxAhtzkrnDKlaibXW8mF8ARZ5UEMdInKkSijIFAlElTA6pmwih6lGRqquAWp9I8eyleoQxKoJlAjIac", - "EOX1BVBpy01phgpdQDfVNUozMq84nuagLVb1/Ldxl38jzOeVK4EYtNSsMaCW9te7zrmkvXygG4ME7h2x", - "Ri4nWVyJIJB1ucZJw90pIQk1HCsftepllZwzc/al8hIlYjtMMKd51Od+pu8wKzf5j7Lx9dbs2kZl61AR", - "4vyesYqn4LNKaMqKkNV6DCSDcqCz+nmU4/bCOKAk1IMfQeIBYkAouddScTXyd1eO949HOtS4gsfV0z1h", - "gPGjomQuyKyKJ4PDyXDc/uzDcwDg2oOJcDFlOEY/rhcjyYb78X3DZ7O1sILd1SHysb34uZ14VUZ9jMWV", - "uJfvmr6u0K7HYf1yl7ZSOL4ZoYp6FU9NPZZAL03XV3V80QVc4XWvsJglLN9bu+/WGU+LIKr3lPG+fTwt", - "jxfCxOJMF0eY5ppuXe8UThlAtxl47TVwS5hwza1UL1u0r9Sv3jaMHOoX6oVTpsJXTEzNs3cXGU9ZJVtl", - "qbpfV+FUzG660/y4AOkqyM2EN1igWY7nc8gQFujT2fc/BuUmapjhJRRKE+qNqdLxy/3rGQeV7Ub9Wg2u", - "nNoUjTUspJiqLAynKQhh7orXx5QDnNi4rjCkaLH5+tTq6tPjxelRTJU62qhk3Fwp7aUy1Nhz89zmUjET", - "YfTxt511PYgYsvFsSkeG78mbQpC7VmVIbCfnabe+R47Hy7D3KmBQ7+0Nsr5dgL/PDfHHvKbUuX+94prS", - "5sr15sr13/fK9ev/6BvX6AxKrOWsrzyUZsdQl8DrzaIX//tCmYaoP1gyXTaF8Zsq1z/tXlUHvwfeq7IG", - "0wqxYQjtjbNnJUC66Au0ARc+ZB2gQuGJKAFfAUcZ5OQauFA6zhX450sEtyUHofWmwgSmWtWZ6gPpwlXr", - "KqPTtqoeZ7plSWSqPaezend/Kdm5qdUSVgLYdEv9ZcaP69Eb5Amvfw+hZFW0aJKy1SHCXDTS+zmrpurN", - "10J7CUwhYjBra+hylgYHjpgu7Xl/m8OvHZu+vPNjeNqqw2pOls1nzVpn0FEZ6gdNU00zOldP16Wuig8z", - "lW3pudaAur0fgAvrMK2z+yettRol183EXfixLx3snFaUAveMxFH9sBouN/XligE9Cd17X3b9Tqz5eMC6", - "pYy7aq/aBqupe5Y6tVdRrnDGELGm9MmS6stq9W6YjmFpxYlcnilSDJ8fz89P3gDmwOvvB+rAZx7Vgyyk", - "LJO7O121EysMP7DfDEnrz7zxiqKDw3on2N/6PSLXUCq0PThsTKi2u2Rna/ebrddKIqwEikuS7CffbO1u", - "7Sh1YbnQdG/rz4eNJRs7nCuZiCU89TfWvE/imWt8dkHKSmsOh5labbW/P2a3U9+wbNmq+DGJEeZyW2Um", - "Y/dpPKPndVYQ+9jZXahjlQbpB0ajmu29nZ0WFZ7Yt7/YeqxhJARraD13K7ep9H7IrMpR02yUfPuIJDQl", - "15H53+DMne6aeXefZ94Liiu5YJz8DpmeePeb55nY7dO/p1KtFM4ZQ0eYm4Kpb3dfPxf3TU6vocqEO0XC", - 
"3t6jktApf+8S0zRBdYn86+eyv0MqgVOcozPg18AdBR6O6rTER9BfLu8uR4moigLzpfuWJjpnyGVPeC4U", - "eLtYouD7dmyyUCyWY4oLGLNr4JxkGvoDdBgl2wtb0bztYHgOWgQhiPnl6MkTIkis7H0okNz5cnIDmbr/", - "kNO6qH0lq67E+8l5NRP9MS7dGIpNXcDdz555/ZR8eRXkD+PKkKi50atPFZTry9DxqHxQlvnS3YgOPjol", - "TIlJyZnKsrz1bCdMt74S9sRxOpjtmQN1WNO+idT9kXoToe4bocynZc4Zqr8vcM8QRULH8EFgQGau9/QM", - "DqxPzMOPyD2Pw/8ZiXnsgsfG6//i+fkGeh4MPQ9MjkngoT7wXNffj4wiz4fYVxPvlXS4r4w9DwaZ2Z4Z", - "hMLdpA38bJKOJ/D8+mt9D3N95xijZDsn1zAOi2DXLT+iCw+vqt6UN/pfQZYVp5AhoJn+UJaIQkS7PnEl", - "TDxcRz21zM+MEr3FmBvA2ADG4wGGMjMDFn8ENfK2ZxrkyIsBqYI+jq10yQdGOabzSkFYXe3QRYGj46dy", - "/OZC53M7u3d9cePfG/9+RP/W3nJvf84L48K2Wn+M7Qccx3v9Hm2/9Whrw/VtTUxXZPyRb0M+cdbfmfGZ", - "3Tysut84+sbRH8/Rnfc540Z7D/B70XWQUbKtIvSAo4cPraJtc8+3KYuMJ/VeMdwThfVuud3mlGHj9n8T", - "t9eFhn/gkEF67hc4uylZHLTVF3bx/4NM8/8aujvqbhNQNsWRmGZelWrwv0b2IIUpg3xSqAgqLZ8ZK8L/", - "w3SDFRuseHysqF3oYWBhu2u0qLxvtkdhwn43ul4JoOnS/ac4+taoFKj5rzGibt98efqJVwduok12sPH4", - "v4nHe19tv6erV7UzjJJtr3I9WkvV1JI/3aGZneJBhVRBZ6GlKbTsWv8HiKuefpuzKkNvWVFUlMil+4ZS", - "Yi9865ptsb+9nXHAxdh+oGkrt923UtVdX6PoGf9M6hSpb9h6IKHbbeOSbE9B4u1aeXeXd/8XAAD//5FW", - "1i+hfgAA", + "H4sIAAAAAAAC/+xdeW/ctrb/KoTeA+IAM97atO8ZuH84SxPj2onhpQtaw5cjnZlhLJEqSdme5vm7P3CT", + "SImakR3b7e2dvzKWuJz1dw7JQ+VLkrKiZBSoFMnel0Skcyiw/rl/fPCOc8bV7wxEykkpCaPJnnqDQL1C", + "HETJqABUsAzyzWSUlJyVwCUBPUYhZt3uZ3Ow3QsQAs9A9ZNE5pDsJUdipv5alOoPITmhs+TubpRw+L0i", + "HLJk71c96kXTpSa07scmnyGVyd0o2a8ywk4slV1STgL60ZRxhFUPNAMKHKtWXaZ0C/0jzz9Nk71fvyT/", + "zWGa7CX/tdVIc8uKcusIMoLPTw6Tu4tRRBJ2JsjMzJsdbs10Pr8BTxGmX7NscTkDqhuesTO4lYrcHi5C", + "ks7LnOHMUYOmJAckGZoAkhxT1XICmZLJlPECy2QvmRCK+SJp0ddV4igpQOIMS2xmneIqV/2/3CVtuexn", + "GVE/cY4+swki1ExGGLW0lFgIyNQfcg6oJCXkhIZ25OaK0aGUfUmykI4OFR+q2YzQGfoBp85ADt6iSk2s", + "DMXJo3RWUk9tmmaxqTnIitNLSQoQEhelCGmQvIIOHSe6D2r6mOnngUqQhFu5iU6rsmRcWdM1zisQe+iF", + "ACqBpvBihF7cMJ69GCFl5sgQhSaM5YAp2nihJn+h3r2Y4lzAi5eb6K2hDBGB7OuNZryXm64lKgBTgSjz", + "iNy0s9l36vd4grXWmjae1CyXZ41kVsFAxzFidr/EPQ4KPIMzpv/p+sesIhmmKVyKFOcQqOn7zVdtHb2j", + "Kas4noGwliJrDAFECv0izZmAfIFyQq8a41V6QyVnRSnRxpzM5sCt7lCBF4hDVqV2CPR7hXMiFy99ub23", + "dKJTTWfNL62KCXDFL3EM9ni6GVsyRTmZLtANkfOOX/W7u5FfxNb1uJdL5LjTleNbmHHQxNzMSWrIaBDS", + "UEoEKisx1yK8wTwTuhWhRBKcmzabbfrQajHljGOxAhL20SE72Ucbh+xmfILpFdrPcCk1Mr20isc0Q0QK", + "lDJuomOmvOwGyGwuteMaJrwAg97d4qLMYQ99Qb8lOZZA5ThlVBChHG2xlafFWFE3Ftlt/luyh3Y2t0fo", + "t4QCJ5/FVkluIR9jLsfu7e6dL4BDzdiT4WCHn4FQSGGGJbmGS2P8K4g4a9xkQ7zU7lWRDNDNHEv1F9ym", + "eZUBmnJWRER8MKOMKwuaotAg0W/V9vY3Kdrxyf5oSUPHhrQY9VVxafz6sgQe42GnzcJHbWqITR0g+BhR", + "ArfsBYRUBTowjY+Bd8ghVMLMWK+mh06Bg2ZNQiu07Gxv99OTAWVEKB3rjpvoiHEwv1ElKpwr1AKsMctC", + "lIUix8qkkkjk7AY4qqlQw2RVrj13slDxBuhMzjv8ufboVFMd484X7xCrWGaT/ToVeApycZnOIb0KhKdC", + "X1t6x8AVJqpAqrsh3U2bopCk0Lg/bWOXgoUqz1QKw6ZToEIZGeNojnkxrXKfzFMz6htNTE2sjdaaWoCs", + "K5FTsG7JMc1YgQy+9YhCNY7K2+kqkML25v/0wDWbmlSkSdNwWeakCXIcnI6NZja21ZudIJCdujk72NyK", + "+6VToAlskQQgiOyrM4B4gjw4bNasP1rkfMQEtVbJUFj+KjTun7LP61q6XaXSgTndjyQD1lXptAWK38UW", + "ZFOOCxAakAWkjGbavIM85FoN73P3Qw9uzXXYD+Z89X10VtMSEYp0OBcDJv1gBo/NO9h26/iDzfg6fv6p", + "VmvIuH86UTDV+nJSpVcg21Ts7H7fJuPcTahUrFebiiglclywikqlADNmvdzyEwqtMxMK1SsLs+pnoWKn", + "7XlD8lyBPaH6VUeFR6bZa010wJgf2hkRcImr2WUPLG/vdvLUmgXdGeEsa8A4YNiky+hDsPCwiw4OAopJ", + "rtPm3r4m4aUpBywc30GI1wTsVzPUD/Cr05fdV//G2cs6r3CSuCFZy3p3tne/jeGhbnkvOPxJj92d9Z4R", + "xoSOJSHmFGYFULlPF3JO6Gy3G2Ym7DayaYpybUDoW4Q5xws0I9dAERYIowm7dVsA1s80Lo4U/z//8vMv", + "yKCxz+1rdtu75u5OfuDwXhjiH4rwWFxdElpWMsofuxlzECyvNKipxkg3bjElFyVJtVfqxRpGJYdrwiqh", + "fmQk1b2JtHY1arIq7Rc7tx9uf0IbH/7x0z92X32nTfJ0/yjIJI/UzAeazL/cqreocuXF4uqSVbIW5BI8", + 
"OFC5dQWjRoImqnC7KzhXCbga0GwL4mJCZpUSphG9MSsxQmwqgao/syrV+34gJXDbU84xVYhD6CwHTw0B", + "V45y9MlQHgMPqowqJ3/AZcoYz8T92CsZoRLpnoRiCaIOoPW4zZIC0xmgX7dHOxfWRHRvOy+C2xJSaZpP", + "wDTgINRD9cioLyOFwkpGRRix7FzojeEhxqg/WdcZPt7uWi9nU8uVVUTLF27mwAEBTi35iCjFoY2fR7+8", + "bNAvSKR1szZlXv6uCcvxBPIIYYf6eZ3RBKQ5anYQoRlJtfyxagozziqa2dYq3m8HTSY4vfKbdMk10y7Z", + "EM/ZjMh7WIvpJlBFx8oDxJzlKsPR5mnGQoQKqaI+myoSNcbp95FN50Mze1fPQ2NHJyYsiR/nZb0T+sAF", + "5yPv0z4OIFaGrezh+4ErUsDvX/0HbWANkuZ6J2tVxnnvnSPnnBH/fTOv6FUs70nVC52gKmVqr8TNIVf3", + "/Fja7aZu0qsHsJmuHtVnMdz6aHRdz9QzpnvdGZhIKBRBd94c9Vj1RDqMdSQp/YaKME+WRlARCb4/Pn/D", + "irKScECnkbPno/oQPgOJiTL/98fnKDV9/GPgrlANfNVYF8+98GdTztAkip/9sgHPXwsoGF9cTjlA0EE/", + "Rj+ox0u6SSZxHul3pp9HOxLaIk0/iG4H4CKg6aP6e+W+mhIINS0DIkNWnYwcQZ5WW8qLq/dckpz8oVW0", + "SsVKs1XTHAmJJRGSpOKByn1mjQ1TwyjxeLy0lux38ySGrHyj0/nDGJr7RjGUD1gIDzWJGAdRgkJradtC", + "xGI+nJ0d99QYqVcDi4wMWAwvyKnrhboFOW8d7piZA8Rpy89O6zHdsNPD6484J5kerua6jxUHzks5aY/n", + "IbnhJAbjPrXtAWJ0Y57dYK693spiUB2V8u+liD0rK5XvmUqquuDnOGizjPkWIHmcvS8r1GdyfrI76DjE", + "5tZ++2P3bBXqlk3Det5Rw7hvOhEpL1HGqcRSDFIDB5yPVbTWClkGsoos4cZ9oELaPt9SiiH7r6sVw35E", + "Lb2EfwCcy/kbl2eHElXDVSKels11R2SauNTMowxoVShiDz/tvz34+D4ZJZ/+mYySdycnn06SUXLw9vCd", + "T+ipmWoV65Yin0OPgQh/etF4r/rJ2IIlss7tEUt7leHnqaurLH17M2UTq/JYS8tFq9+y6krvJPJegtHJ", + "/TK59K8LGqnoKr+Vi4J2st5K0GMcRBg9PDx6M2ckhVicyk0Z57CIe3h4dGRKfZO7i7tRMiWUiPklByxc", + "NPG2HepDVd0KnZhWsc1umsFtkBnqB/E0z0z/QJLbVqPn8STaiCoux6Nm+lCQKaMSqPSZeGMfxTbLWB5A", + "3gnLV8MdN43cTCHVjrA42SfwewUiUhtR4NtLya6Atk/pvvN34W/RmWmzRCNicLLja8Sv87XDtB3dgnuv", + "denwEt07kRxwEfTTVa5hpQouolsZEopSuWrFoXVI+73vtE2jyFmoZOVluGsz3vE6sxL9MypR1a9sl6D5", + "3Y5XltbUSgmtxNlBn5U0KNiybu0T91KxdaM7fzvBDNJVcMpBgWLgPPZRtChiyCrS2cxKM6mcQ69gSLvA", + "uTHb2AqssBOkNeWV9UgnvbYuliK2N10EbIoyB71ia5y3gR33cpnXmh20SHezF7msq15cRnrqJXZ/x+gW", + "nhtmFOGpNVMoPU88MfmRa9Dnv/YY+BhzbKy2i9qc5ZcVz1dsmp+fHOrgL6qJLtQndIauCUZnnKRX+qiN", + "SZay3G6hZ/pExVaL5OTaloyMJRu362JQqYnzk4E3hix0zqMWm2GJ70FyWU1yIuaKYtWzn2yXtbpkFtMM", + "5WwWkPZWjdBDF1wrn3kYZabv19H2zozRQ90MS7jBi0tuMHD1OYlK2A7eupT+vemObHe0oajKmTlKKSte", + "MgEiLOi3PSzo9p0jY0qmg+lxrRVh+tBdkcZ4OgchOZaMD6DrqB4j+4pTpI+4gPZyB0mGeEW7dq9emB+f", + "2WQTfWSSpICkLvmeE4GIQBQXkCE3uSvocqXzZjeeyTlwxFklQYz0SQ+RKGMgEGXS1KaqmTCKHnGZWi+4", + "xak0zzbES5RBCTQTiNGQE6LQqAAqbRkszVChC/smunZqSmYVx5MctMWqnv8ybvwvhPmscqUZg5bANTbV", + "0v5y1zkvtZcidGOQwL2j38ilKYt3EWS0Ltc4abhrJiShhmPlo1a9rJIzZs7kVL6kRGyHCeY0j/rcz/Qd", + "ZuUmL1M2vtqaXduobB1aQ5zfU1bxFHxWCU1ZEbJaj4FkUKZ0Wj+PctxesAeUhHrwI1s8cA0Icfdawi6P", + "SN0V7f3jpA6BrhBz+XTDA9+9A4wfrSVzQWZZPBkcTobj9icfngMA1x5MhIspwzH6cb0YSTbcj+8bPpst", + "jyXsLg+Rj+3Fz+3EyzL9IyyuxL181/R1BYA9DuuX4bSVwvHNCFXUq8Rq6sQE2jBdX9bxRReWhdfQwiKb", + "sKxw5X5gZzwtgqjeU8b79he1PF4IE4szXbRhmmu6dR1WOGUA3WbgldfTLWHCNbdSvWjRvlS/ejszUmxQ", + "qBdOmQpfMTG12N4daTxhlWyVy+p+XYVTMb3pTvPTHKSrbDcT3mCBpjmezSBDWKCPpz/8FJTBqGGGl3Yo", + "Tag3pnrIv4ZQzzionDjq12pw5dSmmK1hIcVUZWE4TUEIc4e9Pj4d4MTGdYUhRYvN16dWV58ez08OY6rU", + "0UYl4+aqay+Vocaem+c2l4qZCKOPvx2u61TEkA1xU9Iy/KzAFKjctSpWYjtMT7slP3I8XoS9lwGDem9v", + "tvXtTvx9bq4/5vWpzr3wJden1lfB11fB/75XwV/9R98ER6dQYi1nfRWjNDuGujRfbxa9+L8XyjRE/SGV", + "yaIp2F9X3/5p9706+D3wvpc1mFaIDUNob5w9LQHSeV+gDbjwIWsfFQpPRAn4CjjKICfXwIXSca7AP18g", + "uC05CK03FSYw1arOVB9I566KWBmdtlX1ONMtSyJT7Tmd1bv7S8nOTa2WsBLAplvqLzN+XI/eIE94LX0I", + "JcuiRZOULQ8R5gKU3s9ZNlVvvhbaS2AKEYNZWduXszQ4CMV0YesQ2hx+6dj0xZ0fw9NWfVhz4m0+t9Y6", + "G4/KUD9ommqa0Zl6uip1VXyYqWxLz7UG1BP+CFxYh2nVFDxpDdgouW4m7sKPfelg56SiFLhnJI7qh9WW", + "uakvlgzoSeje+7Krd2LNRw1WLWXcJwBU22A1dc8SrPYqyhX0GCJWlGRZUn1ZLd8N0zEsrTiRi1NFiuHz", + 
"w9nZ8WvAHHj9XUMd+MyjepC5lGVyd6eriWIF6/v2WyZp/fk5XlG0f1DvBPtbv4fkGkqFtvsHjQnVdpds", + "b+58s/m/SiKsBIpLkuwl32zubG4rdWE513Rv6c+ajSUbO5wrmYglPPW337xP9ZnrhXZBykprDgeZWm21", + "v4tmt1Nfs2zRqkQyiRHmcktlJmP3yT6j51VWEPsI212oY5UG6QdGo5rt3e3tFhWe2Lc+2zqxYSQEa2g9", + "dyu3qfR+yLTKUdNslHz7iCQ0peCR+V/jzJ3umnl3nmfec4orOWec/AGZnnjnm+eZ2O3Tv6NSrRTOGEOH", + "mJtCrm93Xj0X901Or6HKhDtFwu7uo5LQKcvvEtM0QXXp/qvnsr8DKoFTnKNT4NfAHQUejuq0xEfQXy/u", + "LkaJqIoC84X7xic6Y8hlT3gmFHi7WKLg+3ZsslAsFmOKCxiza+CcZBr6A3QYJVtzW2m95WB4BloEIYj5", + "ZfLJEyJIrBx/KJDc+XJyA5n7CCGndbH9UlZd6fmT82om+jou3RiKTV1Y3s+eef2UfHmV7Q/jypCoudGr", + "TxWU60va8ai8X5b5wt3UDj6GJUyJScmZyrK89WwnTLe+XvbEcTqY7ZkDdVhrv47U/ZF6HaHuG6HMJ2/O", + "GKq/e3DPEEVCx/BBYEBmrvf0DA6sTszDj9s9j8P/GYl57OLJ2uv/4vn5GnoeDD0PTI5J4KE+8FzX37WM", + "Is/72Ncc75V0uK+fPQ8GmdmeGYTC3aQ1/KyTjifw/Porgg9zfecYo2QrJ9cwDotgVy0/ogsPr6relDf6", + "X2eWFaeQIaCZ/oCXiEJEuz5xKUw8XEc9tczPjBK9xZhrwFgDxuMBhjIzAxZfgxp52zMNcuTFgFRBH8dW", + "uuQDoxzTWaUgrK526KLA4dFTOX5z0fS5nd27Vrn277V/P6J/a2+5tz/nhXFhW60/xvbDkuPdfo+236C0", + "teH6FimmSzL+yDcrnzjr78z4zG4eVt2vHX3t6I/n6M77nHGj3Qf4veg6yCjZUhF6wNHD+1bRtrnn25RF", + "xpN6rxjuicJ6t9xufcqwdvu/idvrQsOvOGSQnvsFzm5KFgdt9YVd/P+40/x/i+6OutsElE1xJKaZV6Ua", + "/G+WPUhhyiCfFCqCSstnxorw/1ZdY8UaKx4fK2oXehhY2O4aLSrvW/JRmLDfs65XAmiycP9Zj741KgVq", + "/suOqNs3X8R+4tWBm2idHaw9/m/i8d7X5O/p6lXtDKNky6tcj9ZSNbXkT3doZqd4UCFV0FloaQotu9b/", + "TeKqp9/krMrQG1YUFSVy4b6hlNgL37pmW+xtbWUccDG2H2jazG33zVR119coesY/lTpF6hu2Hkjodlu4", + "JFsTkHirVt7dxd3/BwAA//8lGRESOX8AAA==", } // GetSwagger returns the content of the embedded swagger specification file From 0ccb90a07d87621df14b61dae651793a40c72a6d Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 22 Jul 2025 14:27:51 -0500 Subject: [PATCH 04/12] remove Orch data url short circuit --- server/ai_http.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/ai_http.go b/server/ai_http.go index d655aff191..5d3b27f637 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -145,7 +145,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { subUrl = pubUrl + "-out" controlUrl = pubUrl + "-control" eventsUrl = pubUrl + "-events" - //dataUrl = pubUrl + "-data" + dataUrl = pubUrl + "-data" ) // Handle initial payment, the rest of the payments are done separately from the stream processing @@ -234,7 +234,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl) subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl) publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl) - //dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl) + dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl) workerReq := worker.LiveVideoToVideoParams{ ModelId: req.ModelId, @@ -242,6 +242,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { SubscribeUrl: subscribeUrlOverwrite, EventsUrl: &eventsUrlOverwrite, ControlUrl: &controlUrlOverwrite, + DataUrl: &dataUrlOverwrite, Params: req.Params, GatewayRequestId: &gatewayRequestID, ManifestId: &mid, From aef53af4e4489b8292e4c94b1737fbd9dbe334aa Mon Sep 17 00:00:00 2001 From: Brad P Date: Mon, 28 Jul 2025 16:42:32 -0500 Subject: [PATCH 05/12] move datastore items to separte file similar to status --- server/ai_mediaserver.go | 83 ------------------------------------ server/ai_pipeline_data.go | 87 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 83 deletions(-) create mode 100644 
server/ai_pipeline_data.go diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index bd638bfd3b..b63976c88d 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -14,7 +14,6 @@ import ( "os" "os/exec" "strings" - "sync" "time" "github.com/livepeer/go-livepeer/monitor" @@ -1314,88 +1313,6 @@ func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler { }) } -// DataSegmentStore stores data segments for SSE streaming -type DataSegmentStore struct { - streamID string - segments chan []byte - mu sync.RWMutex - closed bool -} - -func NewDataSegmentStore(streamID string) *DataSegmentStore { - return &DataSegmentStore{ - streamID: streamID, - segments: make(chan []byte, 100), // Buffer up to 100 segments - } -} - -func (d *DataSegmentStore) Store(data []byte) { - d.mu.RLock() - defer d.mu.RUnlock() - if d.closed { - return - } - select { - case d.segments <- data: - default: - // Channel is full, drop oldest segment - select { - case <-d.segments: - default: - } - select { - case d.segments <- data: - default: - } - } -} - -func (d *DataSegmentStore) Subscribe() <-chan []byte { - return d.segments -} - -func (d *DataSegmentStore) Close() { - d.mu.Lock() - defer d.mu.Unlock() - if !d.closed { - d.closed = true - close(d.segments) - } -} - -// Global store for data segments by stream ID -var dataStores = make(map[string]*DataSegmentStore) -var dataStoresMu sync.RWMutex - -func getDataStore(stream string) *DataSegmentStore { - dataStoresMu.RLock() - store, exists := dataStores[stream] - dataStoresMu.RUnlock() - if exists { - return store - } - - dataStoresMu.Lock() - defer dataStoresMu.Unlock() - // Double-check after acquiring write lock - if store, exists := dataStores[stream]; exists { - return store - } - - store = NewDataSegmentStore(stream) - dataStores[stream] = store - return store -} - -func removeDataStore(stream string) { - dataStoresMu.Lock() - defer dataStoresMu.Unlock() - if store, exists := dataStores[stream]; exists { - store.Close() - delete(dataStores, stream) - } -} - // @Summary Get Live Stream Data // @Param streamId path string true "Stream ID" // @Success 200 diff --git a/server/ai_pipeline_data.go b/server/ai_pipeline_data.go new file mode 100644 index 0000000000..01790e8ccc --- /dev/null +++ b/server/ai_pipeline_data.go @@ -0,0 +1,87 @@ +package server + +import ( + "sync" +) + +// DataSegmentStore stores data segments for SSE streaming +type DataSegmentStore struct { + streamID string + segments chan []byte + mu sync.RWMutex + closed bool +} + +func NewDataSegmentStore(streamID string) *DataSegmentStore { + return &DataSegmentStore{ + streamID: streamID, + segments: make(chan []byte, 100), // Buffer up to 100 segments + } +} + +func (d *DataSegmentStore) Store(data []byte) { + d.mu.RLock() + defer d.mu.RUnlock() + if d.closed { + return + } + select { + case d.segments <- data: + default: + // Channel is full, drop oldest segment + select { + case <-d.segments: + default: + } + select { + case d.segments <- data: + default: + } + } +} + +func (d *DataSegmentStore) Subscribe() <-chan []byte { + return d.segments +} + +func (d *DataSegmentStore) Close() { + d.mu.Lock() + defer d.mu.Unlock() + if !d.closed { + d.closed = true + close(d.segments) + } +} + +// Global store for data segments by stream ID +var dataStores = make(map[string]*DataSegmentStore) +var dataStoresMu sync.RWMutex + +func getDataStore(stream string) *DataSegmentStore { + dataStoresMu.RLock() + store, exists := dataStores[stream] + dataStoresMu.RUnlock() + if exists { + return 
store + } + + dataStoresMu.Lock() + defer dataStoresMu.Unlock() + // Double-check after acquiring write lock + if store, exists := dataStores[stream]; exists { + return store + } + + store = NewDataSegmentStore(stream) + dataStores[stream] = store + return store +} + +func removeDataStore(stream string) { + dataStoresMu.Lock() + defer dataStoresMu.Unlock() + if store, exists := dataStores[stream]; exists { + store.Close() + delete(dataStores, stream) + } +} From d2e848c8d78fad596d618afd44b6675ebce9dfaa Mon Sep 17 00:00:00 2001 From: Brad P Date: Fri, 1 Aug 2025 16:55:44 -0500 Subject: [PATCH 06/12] update to remove datastore and use ringbuffer reader --- core/livepeernode.go | 2 + server/ai_live_video.go | 16 +++---- server/ai_mediaserver.go | 65 +++++++++++++++++++--------- server/ai_pipeline_data.go | 87 -------------------------------------- 4 files changed, 54 insertions(+), 116 deletions(-) delete mode 100644 server/ai_pipeline_data.go diff --git a/core/livepeernode.go b/core/livepeernode.go index 99c28667ab..1eee7fec44 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -19,6 +19,7 @@ import ( "time" "github.com/golang/glog" + "github.com/livepeer/go-livepeer/media" "github.com/livepeer/go-livepeer/pm" "github.com/livepeer/go-livepeer/trickle" @@ -174,6 +175,7 @@ type LivePipeline struct { Pipeline string ControlPub *trickle.TricklePublisher StopControl func() + DataWriter *media.RingBuffer } // NewLivepeerNode creates a new Livepeer Node. Eth can be nil. diff --git a/server/ai_live_video.go b/server/ai_live_video.go index f597a997dd..4ba03416d2 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -774,19 +774,22 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam // Set up output buffers rbc := media.RingBufferConfig{BufferLen: 50_000_000} // 50 MB buffer outWriter, err := media.NewRingBuffer(&rbc) + //put the data buffer in live pipeline for clients to read from + pipeline := params.node.LivePipelines[params.liveParams.stream] + if pipeline == nil { + clog.Infof(ctx, "No live pipeline found for stream %s", params.liveParams.stream) + return + } + pipeline.DataWriter = outWriter + if err != nil { stopProcessing(ctx, params, fmt.Errorf("ringbuffer init failed: %w", err)) return } - // Store data segments for SSE endpoint - stream := params.liveParams.stream - dataStore := getDataStore(stream) - // read segments from trickle subscription go func() { defer outWriter.Close() - defer removeDataStore(stream) // Clean up when done var err error firstSegment := true @@ -846,9 +849,6 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam continue } - // Store the raw segment data for SSE endpoint - dataStore.Store(body) - // Write to output buffer using the body data n, err := outWriter.Write(body) if err != nil { diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index b63976c88d..770e31b106 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -3,6 +3,7 @@ package server import ( "bytes" "context" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -15,6 +16,7 @@ import ( "os/exec" "strings" "time" + "unicode/utf8" "github.com/livepeer/go-livepeer/monitor" @@ -1314,7 +1316,7 @@ func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler { } // @Summary Get Live Stream Data -// @Param streamId path string true "Stream ID" +// @Param stream path string true "Stream Key" // @Success 200 // @Router /live/video-to-video/{stream}/data [get] func (ls *LivepeerServer) 
GetLiveVideoToVideoData() http.Handler { @@ -1333,12 +1335,20 @@ func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { ctx := r.Context() ctx = clog.AddVal(ctx, "stream", stream) - // Get the data store for this stream - dataStore := getDataStore(stream) - if dataStore == nil { - http.Error(w, "Stream not found", http.StatusNoContent) + // Get the live pipeline for this stream + livePipeline, ok := ls.LivepeerNode.LivePipelines[stream] + if !ok { + http.Error(w, "Stream not found", http.StatusNotFound) + return + } + + // Get the data readerring buffer + if livePipeline.DataWriter == nil { + clog.Infof(ctx, "No data writer available for stream %s", stream) + http.Error(w, "Stream data not available", http.StatusServiceUnavailable) return } + dataReader := livePipeline.DataWriter.MakeReader() // Set up SSE headers w.Header().Set("Content-Type", "text/event-stream") @@ -1346,9 +1356,6 @@ func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") - // Get the subscription channel - dataChan := dataStore.Subscribe() - flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming not supported", http.StatusInternalServerError) @@ -1357,27 +1364,43 @@ func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { clog.Infof(ctx, "Starting SSE data stream for stream=%s", stream) - // Send keep-alive ping initially - fmt.Fprintf(w, "event: ping\ndata: {\"type\":\"connected\"}\n\n") - flusher.Flush() - - // Stream data segments as SSE events + // Listen for broadcast signals from ring buffer writes + // dataReader.Read() blocks on rb.cond.Wait() until startDataSubscribe broadcasts for { select { case <-ctx.Done(): clog.Info(ctx, "SSE data stream client disconnected") return - case data, ok := <-dataChan: - if !ok { - // Channel closed, stream ended - fmt.Fprintf(w, "event: end\ndata: {\"type\":\"stream_ended\"}\n\n") - flusher.Flush() + default: + // Listen for broadcast from ring buffer writer + buffer := make([]byte, 32*1024) // 32KB read buffer + n, err := dataReader.Read(buffer) + if err != nil { + if err == io.EOF { + // Stream ended + fmt.Fprintf(w, "event: end\ndata: {\"type\":\"stream_ended\"}\n\n") + flusher.Flush() + return + } + clog.Errorf(ctx, "Error reading from ring buffer: %v", err) return } - // Send the segment data as a data event - fmt.Fprintf(w, "data: %s\n\n", string(data)) - flusher.Flush() + if n > 0 { + // Broadcast received - forward segment data as SSE event + data := buffer[:n] + + // Check if data is valid UTF-8 text + if utf8.Valid(data) { + // Send as text string + fmt.Fprintf(w, "data: %s\n\n", string(data)) + } else { + // Send as base64 encoded binary data + encoded := base64.StdEncoding.EncodeToString(data) + fmt.Fprintf(w, "data: %s\n\n", encoded) + } + flusher.Flush() + } } } }) diff --git a/server/ai_pipeline_data.go b/server/ai_pipeline_data.go deleted file mode 100644 index 01790e8ccc..0000000000 --- a/server/ai_pipeline_data.go +++ /dev/null @@ -1,87 +0,0 @@ -package server - -import ( - "sync" -) - -// DataSegmentStore stores data segments for SSE streaming -type DataSegmentStore struct { - streamID string - segments chan []byte - mu sync.RWMutex - closed bool -} - -func NewDataSegmentStore(streamID string) *DataSegmentStore { - return &DataSegmentStore{ - streamID: streamID, - segments: make(chan []byte, 100), // Buffer up to 100 segments - } -} - -func (d *DataSegmentStore) Store(data []byte) { - d.mu.RLock() - defer d.mu.RUnlock() - 
if d.closed { - return - } - select { - case d.segments <- data: - default: - // Channel is full, drop oldest segment - select { - case <-d.segments: - default: - } - select { - case d.segments <- data: - default: - } - } -} - -func (d *DataSegmentStore) Subscribe() <-chan []byte { - return d.segments -} - -func (d *DataSegmentStore) Close() { - d.mu.Lock() - defer d.mu.Unlock() - if !d.closed { - d.closed = true - close(d.segments) - } -} - -// Global store for data segments by stream ID -var dataStores = make(map[string]*DataSegmentStore) -var dataStoresMu sync.RWMutex - -func getDataStore(stream string) *DataSegmentStore { - dataStoresMu.RLock() - store, exists := dataStores[stream] - dataStoresMu.RUnlock() - if exists { - return store - } - - dataStoresMu.Lock() - defer dataStoresMu.Unlock() - // Double-check after acquiring write lock - if store, exists := dataStores[stream]; exists { - return store - } - - store = NewDataSegmentStore(stream) - dataStores[stream] = store - return store -} - -func removeDataStore(stream string) { - dataStoresMu.Lock() - defer dataStoresMu.Unlock() - if store, exists := dataStores[stream]; exists { - store.Close() - delete(dataStores, stream) - } -} From b11658edc7d58d7a5f02f2640e7ef6f32acd2824 Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Thu, 14 Aug 2025 05:35:26 +0000 Subject: [PATCH 07/12] ai/live: Read JSONL from a time delimited data channel. --- core/livepeernode.go | 2 +- server/ai_live_video.go | 78 ++++++++++++++++++---------------------- server/ai_mediaserver.go | 38 ++++++-------------- server/ai_process.go | 1 + 4 files changed, 46 insertions(+), 73 deletions(-) diff --git a/core/livepeernode.go b/core/livepeernode.go index 1eee7fec44..0a7ebbce06 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -175,7 +175,7 @@ type LivePipeline struct { Pipeline string ControlPub *trickle.TricklePublisher StopControl func() - DataWriter *media.RingBuffer + DataWriter *media.SegmentWriter } // NewLivepeerNode creates a new Livepeer Node. Eth can be nil. 
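The switch above from *media.RingBuffer to *media.SegmentWriter means each JSONL message becomes its own segment rather than part of a raw byte stream. The sketch below is an illustration only, not part of the patch: it strings together the SegmentWriter calls exercised in the rest of this patch (NewSegmentWriter, Next and Write on the producer side; MakeReader, Next and io.ReadAll on the consumer side). The buffering and blocking semantics are assumed from that usage, not verified against the media package.

package main

import (
	"fmt"
	"io"

	"github.com/livepeer/go-livepeer/media"
)

func main() {
	dataWriter := media.NewSegmentWriter(5) // same argument as used in ai_mediaserver.go
	dataReader := dataWriter.MakeReader(media.SegmentReaderConfig{})

	// Producer side (mirrors startDataSubscribe): one segment per JSONL message.
	go func() {
		defer dataWriter.Close()
		for _, msg := range []string{`{"seq":0,"text":"hello"}`, `{"seq":1,"text":"world"}`} {
			w, err := dataWriter.Next()
			if err != nil {
				return
			}
			w.Write([]byte(msg))
		}
	}()

	// Consumer side (mirrors GetLiveVideoToVideoData): read each message back
	// out and frame it the way the SSE handler does.
	for {
		r, err := dataReader.Next()
		if err != nil {
			return // io.EOF once the writer is closed
		}
		msg, _ := io.ReadAll(r)
		fmt.Printf("data: %s\n\n", msg)
	}
}
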
diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 4ba03416d2..1a76dd2c59 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -1,6 +1,7 @@ package server import ( + "bufio" "context" "encoding/json" "errors" @@ -505,8 +506,9 @@ func registerControl(ctx context.Context, params aiRequestParams) { } params.node.LivePipelines[stream] = &core.LivePipeline{ - RequestID: params.liveParams.requestID, - Pipeline: params.liveParams.pipeline, + RequestID: params.liveParams.requestID, + Pipeline: params.liveParams.pipeline, + DataWriter: params.liveParams.dataWriter, } } @@ -767,29 +769,15 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam Ctx: ctx, }) if err != nil { - clog.Infof(ctx, "Failed to create trickle subscriber: %s", err) + clog.Infof(ctx, "Failed to create data subscriber: %s", err) return } - // Set up output buffers - rbc := media.RingBufferConfig{BufferLen: 50_000_000} // 50 MB buffer - outWriter, err := media.NewRingBuffer(&rbc) - //put the data buffer in live pipeline for clients to read from - pipeline := params.node.LivePipelines[params.liveParams.stream] - if pipeline == nil { - clog.Infof(ctx, "No live pipeline found for stream %s", params.liveParams.stream) - return - } - pipeline.DataWriter = outWriter - - if err != nil { - stopProcessing(ctx, params, fmt.Errorf("ringbuffer init failed: %w", err)) - return - } + dataWriter := params.liveParams.dataWriter // read segments from trickle subscription go func() { - defer outWriter.Close() + defer dataWriter.Close() var err error firstSegment := true @@ -801,20 +789,21 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam for { select { case <-ctx.Done(): - clog.Info(ctx, "trickle subscribe done") + clog.Info(ctx, "data subscribe done") return default: } if !params.inputStreamExists() { - clog.Infof(ctx, "trickle subscribe stopping, input stream does not exist.") + clog.Infof(ctx, "data subscribe stopping, input stream does not exist.") break } var segment *http.Response - clog.V(8).Infof(ctx, "trickle subscribe read data await") + readBytes, readMessages := 0, 0 + clog.V(8).Infof(ctx, "data subscribe await") segment, err = subscriber.Read() if err != nil { if errors.Is(err, trickle.EOS) || errors.Is(err, trickle.StreamNotFoundErr) { - stopProcessing(ctx, params, fmt.Errorf("trickle subscribe stopping, stream not found, err=%w", err)) + stopProcessing(ctx, params, fmt.Errorf("data subscribe stopping, stream not found, err=%w", err)) return } var sequenceNonexistent *trickle.SequenceNonexistent @@ -823,10 +812,10 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam subscriber.SetSeq(sequenceNonexistent.Latest) } // TODO if not EOS then signal a new orchestrator is needed - err = fmt.Errorf("trickle subscribe error reading: %w", err) + err = fmt.Errorf("data subscribe error reading: %w", err) clog.Infof(ctx, "%s", err) if retries > maxRetries { - stopProcessing(ctx, params, errors.New("trickle subscribe stopping, retries exceeded")) + stopProcessing(ctx, params, errors.New("data subscribe stopping, retries exceeded")) return } retries++ @@ -836,32 +825,33 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam } retries = 0 seq := trickle.GetSeq(segment) - clog.V(8).Infof(ctx, "trickle subscribe read data received seq=%d", seq) + clog.V(8).Infof(ctx, "data subscribe received seq=%d", seq) copyStartTime := time.Now() - // Read segment data and store it for SSE - body, err := 
io.ReadAll(segment.Body) - segment.Body.Close() - if err != nil { - clog.InfofErr(ctx, "trickle subscribe error reading segment body seq=%d", seq, err) - subscriber.SetSeq(seq) - retries++ - continue - } - - // Write to output buffer using the body data - n, err := outWriter.Write(body) - if err != nil { - if errors.Is(err, context.Canceled) { - clog.Info(ctx, "trickle subscribe stopping - context canceled") + defer segment.Body.Close() + scanner := bufio.NewScanner(segment.Body) + for scanner.Scan() { + writer, err := dataWriter.Next() + if err != nil { + if err != io.EOF { + stopProcessing(ctx, params, fmt.Errorf("data subscribe could not get next: %w", err)) + } return } - - clog.InfofErr(ctx, "trickle subscribe error writing to output buffer seq=%d", seq, err) + n, err := writer.Write(scanner.Bytes()) + if err != nil { + stopProcessing(ctx, params, fmt.Errorf("data subscribe could not write: %w", err)) + } + readBytes += n + readMessages += 1 + } + if err := scanner.Err(); err != nil { + clog.InfofErr(ctx, "data subscribe error reading seq=%d", seq, err) subscriber.SetSeq(seq) retries++ continue } + if firstSegment { firstSegment = false delayMs := time.Since(params.liveParams.startTime).Milliseconds() @@ -881,7 +871,7 @@ func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParam } } - clog.V(8).Info(ctx, "trickle subscribe read data completed", "seq", seq, "bytes", humanize.Bytes(uint64(n)), "took", time.Since(copyStartTime)) + clog.V(8).Info(ctx, "data subscribe read completed", "seq", seq, "bytes", humanize.Bytes(uint64(readBytes)), "messages", readMessages, "took", time.Since(copyStartTime)) } }() } diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 770e31b106..08f9cbcf5b 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -3,7 +3,6 @@ package server import ( "bytes" "context" - "encoding/base64" "encoding/json" "errors" "fmt" @@ -16,7 +15,6 @@ import ( "os/exec" "strings" "time" - "unicode/utf8" "github.com/livepeer/go-livepeer/monitor" @@ -112,7 +110,8 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error { ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus()) // Stream data SSE endpoint - ls.HTTPMux.Handle("/live/video-to-video/{stream}/data", ls.GetLiveVideoToVideoData()) + ls.HTTPMux.Handle("OPTIONS /live/video-to-video/{stream}/data", ls.WithCode(http.StatusNoContent)) + ls.HTTPMux.Handle("GET /live/video-to-video/{stream}/data", ls.GetLiveVideoToVideoData()) //API for dynamic capabilities ls.HTTPMux.Handle("/process/request/", ls.SubmitJob()) @@ -624,6 +623,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler { liveParams: &liveRequestParams{ segmentReader: ssr, + dataWriter: media.NewSegmentWriter(5), rtmpOutputs: rtmpOutputs, localRTMPPrefix: mediaMTXInputURL, stream: streamName, @@ -1089,6 +1089,7 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler { liveParams: &liveRequestParams{ segmentReader: ssr, + dataWriter: media.NewSegmentWriter(5), rtmpOutputs: rtmpOutputs, localRTMPPrefix: internalOutputHost, stream: streamName, @@ -1326,11 +1327,6 @@ func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { http.Error(w, "stream name is required", http.StatusBadRequest) return } - if r.Method == http.MethodOptions { - corsHeaders(w, r.Method) - w.WriteHeader(http.StatusNoContent) - return - } ctx := r.Context() ctx = clog.AddVal(ctx, "stream", stream) @@ -1348,7 +1344,7 @@ func (ls *LivepeerServer) 
GetLiveVideoToVideoData() http.Handler { http.Error(w, "Stream data not available", http.StatusServiceUnavailable) return } - dataReader := livePipeline.DataWriter.MakeReader() + dataReader := livePipeline.DataWriter.MakeReader(media.SegmentReaderConfig{}) // Set up SSE headers w.Header().Set("Content-Type", "text/event-stream") @@ -1372,13 +1368,11 @@ func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { clog.Info(ctx, "SSE data stream client disconnected") return default: - // Listen for broadcast from ring buffer writer - buffer := make([]byte, 32*1024) // 32KB read buffer - n, err := dataReader.Read(buffer) + reader, err := dataReader.Next() if err != nil { if err == io.EOF { // Stream ended - fmt.Fprintf(w, "event: end\ndata: {\"type\":\"stream_ended\"}\n\n") + fmt.Fprintf(w, `event: end\ndata: {"type":"stream_ended"}\n\n`) flusher.Flush() return } @@ -1386,21 +1380,9 @@ func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler { return } - if n > 0 { - // Broadcast received - forward segment data as SSE event - data := buffer[:n] - - // Check if data is valid UTF-8 text - if utf8.Valid(data) { - // Send as text string - fmt.Fprintf(w, "data: %s\n\n", string(data)) - } else { - // Send as base64 encoded binary data - encoded := base64.StdEncoding.EncodeToString(data) - fmt.Fprintf(w, "data: %s\n\n", encoded) - } - flusher.Flush() - } + data, err := io.ReadAll(reader) + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() } } }) diff --git a/server/ai_process.go b/server/ai_process.go index cc50b380dd..a0034e57a1 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -96,6 +96,7 @@ type aiRequestParams struct { // For live video pipelines type liveRequestParams struct { segmentReader *media.SwitchableSegmentReader + dataWriter *media.SegmentWriter stream string requestID string streamID string From b9fd1686b0f415229357b3450fe6edf6cd5c6948 Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 19 Aug 2025 18:03:30 -0500 Subject: [PATCH 08/12] copy dataWriter in newParams --- server/ai_live_video.go | 2 +- server/ai_mediaserver.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 1a76dd2c59..32ea93d2b0 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -763,7 +763,7 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar } func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) { - // subscribe to the outputs and send them into LPMS + // subscribe to the outputs subscriber, err := trickle.NewTrickleSubscriber(trickle.TrickleSubscriberConfig{ URL: url.String(), Ctx: ctx, diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 08f9cbcf5b..9ad0f036c6 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -740,6 +740,7 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi func newParams(params *liveRequestParams, cancelOrch context.CancelCauseFunc) *liveRequestParams { return &liveRequestParams{ segmentReader: params.segmentReader, + dataWriter: params.dataWriter, rtmpOutputs: params.rtmpOutputs, localRTMPPrefix: params.localRTMPPrefix, stream: params.stream, From ce670f291da98439f5b2ccfdae2aa0446d6ebb46 Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 09:35:24 -0500 Subject: [PATCH 09/12] update data channel mimetype --- server/ai_http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/ai_http.go 
b/server/ai_http.go index 5d3b27f637..bf8f0d9f19 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -181,7 +181,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler { controlPubCh.CreateChannel() eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json") eventsCh.CreateChannel() - dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/json") + dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl") dataCh.CreateChannel() // Start payment receiver which accounts the payments and stops the stream if the payment is insufficient From 12b87293419507704883188f48a252d88c649f0d Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 11:12:13 -0500 Subject: [PATCH 10/12] update code gen for data_url added to LiveVideoToVideoResponse --- ai/worker/runner.gen.go | 63 +++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/ai/worker/runner.gen.go b/ai/worker/runner.gen.go index b4c0826556..a47eb474d0 100644 --- a/ai/worker/runner.gen.go +++ b/ai/worker/runner.gen.go @@ -350,6 +350,9 @@ type LiveVideoToVideoResponse struct { // ControlUrl URL for updating the live video-to-video generation ControlUrl *string `json:"control_url,omitempty"` + // DataUrl URL for publishing data for pipeline + DataUrl *string `json:"data_url,omitempty"` + // EventsUrl URL for subscribing to events for pipeline status and logs EventsUrl *string `json:"events_url,omitempty"` @@ -3220,36 +3223,36 @@ var swaggerSpec = []string{ "xak0zzbES5RBCTQTiNGQE6LQqAAqbRkszVChC/smunZqSmYVx5MctMWqnv8ybvwvhPmscqUZg5bANTbV", "0v5y1zkvtZcidGOQwL2j38ilKYt3EWS0Ltc4abhrJiShhmPlo1a9rJIzZs7kVL6kRGyHCeY0j/rcz/Qd", "ZuUmL1M2vtqaXduobB1aQ5zfU1bxFHxWCU1ZEbJaj4FkUKZ0Wj+PctxesAeUhHrwI1s8cA0Icfdawi6P", - "SN0V7f3jpA6BrhBz+XTDA9+9A4wfrSVzQWZZPBkcTobj9icfngMA1x5MhIspwzH6cb0YSTbcj+8bPpst", - "jyXsLg+Rj+3Fz+3EyzL9IyyuxL181/R1BYA9DuuX4bSVwvHNCFXUq8Rq6sQE2jBdX9bxRReWhdfQwiKb", - "sKxw5X5gZzwtgqjeU8b79he1PF4IE4szXbRhmmu6dR1WOGUA3WbgldfTLWHCNbdSvWjRvlS/ejszUmxQ", - "qBdOmQpfMTG12N4daTxhlWyVy+p+XYVTMb3pTvPTHKSrbDcT3mCBpjmezSBDWKCPpz/8FJTBqGGGl3Yo", - "Tag3pnrIv4ZQzzionDjq12pw5dSmmK1hIcVUZWE4TUEIc4e9Pj4d4MTGdYUhRYvN16dWV58ez08OY6rU", - "0UYl4+aqay+Vocaem+c2l4qZCKOPvx2u61TEkA1xU9Iy/KzAFKjctSpWYjtMT7slP3I8XoS9lwGDem9v", - "tvXtTvx9bq4/5vWpzr3wJden1lfB11fB/75XwV/9R98ER6dQYi1nfRWjNDuGujRfbxa9+L8XyjRE/SGV", - "yaIp2F9X3/5p9706+D3wvpc1mFaIDUNob5w9LQHSeV+gDbjwIWsfFQpPRAn4CjjKICfXwIXSca7AP18g", - "uC05CK03FSYw1arOVB9I566KWBmdtlX1ONMtSyJT7Tmd1bv7S8nOTa2WsBLAplvqLzN+XI/eIE94LX0I", - "JcuiRZOULQ8R5gKU3s9ZNlVvvhbaS2AKEYNZWduXszQ4CMV0YesQ2hx+6dj0xZ0fw9NWfVhz4m0+t9Y6", - "G4/KUD9ommqa0Zl6uip1VXyYqWxLz7UG1BP+CFxYh2nVFDxpDdgouW4m7sKPfelg56SiFLhnJI7qh9WW", - "uakvlgzoSeje+7Krd2LNRw1WLWXcJwBU22A1dc8SrPYqyhX0GCJWlGRZUn1ZLd8N0zEsrTiRi1NFiuHz", - "w9nZ8WvAHHj9XUMd+MyjepC5lGVyd6eriWIF6/v2WyZp/fk5XlG0f1DvBPtbv4fkGkqFtvsHjQnVdpds", - "b+58s/m/SiKsBIpLkuwl32zubG4rdWE513Rv6c+ajSUbO5wrmYglPPW337xP9ZnrhXZBykprDgeZWm21", - "v4tmt1Nfs2zRqkQyiRHmcktlJmP3yT6j51VWEPsI212oY5UG6QdGo5rt3e3tFhWe2Lc+2zqxYSQEa2g9", - "dyu3qfR+yLTKUdNslHz7iCQ0peCR+V/jzJ3umnl3nmfec4orOWec/AGZnnjnm+eZ2O3Tv6NSrRTOGEOH", - "mJtCrm93Xj0X901Or6HKhDtFwu7uo5LQKcvvEtM0QXXp/qvnsr8DKoFTnKNT4NfAHQUejuq0xEfQXy/u", - "LkaJqIoC84X7xic6Y8hlT3gmFHi7WKLg+3ZsslAsFmOKCxiza+CcZBr6A3QYJVtzW2m95WB4BloEIYj5", - "ZfLJEyJIrBx/KJDc+XJyA5n7CCGndbH9UlZd6fmT82om+jou3RiKTV1Y3s+eef2UfHmV7Q/jypCoudGr", - "TxWU60va8ai8X5b5wt3UDj6GJUyJScmZyrK89WwnTLe+XvbEcTqY7ZkDdVhrv47U/ZF6HaHuG6HMJ2/O", - 
"GKq/e3DPEEVCx/BBYEBmrvf0DA6sTszDj9s9j8P/GYl57OLJ2uv/4vn5GnoeDD0PTI5J4KE+8FzX37WM", - "Is/72Ncc75V0uK+fPQ8GmdmeGYTC3aQ1/KyTjifw/Porgg9zfecYo2QrJ9cwDotgVy0/ogsPr6relDf6", - "X2eWFaeQIaCZ/oCXiEJEuz5xKUw8XEc9tczPjBK9xZhrwFgDxuMBhjIzAxZfgxp52zMNcuTFgFRBH8dW", - "uuQDoxzTWaUgrK526KLA4dFTOX5z0fS5nd27Vrn277V/P6J/a2+5tz/nhXFhW60/xvbDkuPdfo+236C0", - "teH6FimmSzL+yDcrnzjr78z4zG4eVt2vHX3t6I/n6M77nHGj3Qf4veg6yCjZUhF6wNHD+1bRtrnn25RF", - "xpN6rxjuicJ6t9xufcqwdvu/idvrQsOvOGSQnvsFzm5KFgdt9YVd/P+40/x/i+6OutsElE1xJKaZV6Ua", - "/G+WPUhhyiCfFCqCSstnxorw/1ZdY8UaKx4fK2oXehhY2O4aLSrvW/JRmLDfs65XAmiycP9Zj741KgVq", - "/suOqNs3X8R+4tWBm2idHaw9/m/i8d7X5O/p6lXtDKNky6tcj9ZSNbXkT3doZqd4UCFV0FloaQotu9b/", - "TeKqp9/krMrQG1YUFSVy4b6hlNgL37pmW+xtbWUccDG2H2jazG33zVR119coesY/lTpF6hu2Hkjodlu4", - "JFsTkHirVt7dxd3/BwAA//8lGRESOX8AAA==", + "SN0V7f3jpA6BrhBz+XTPFvj8OPJUIc3PDyRzYW1ZBBscwIZHik9+QAhChsYMIlwUGx4VHhc3kGTDkeO+", + "AbvZZFnC7vKg/Ni48dywsWxtcYTFlbgXWpi+ruSwByL8wp+2Uji+GaGKerVfTWWaQBum68s6oulStvDi", + "W1jWExYyrtyB7IynRRDVe8p4346mlscLYaJ/pstETHNNt678CqcMgoUZeOWFeEuYcM2tVC9atC/Vr95A", + "jZQ3FOqFU6ZCdExM9bd3KxtPWCVbBbq6X1fhVExvutP8NAfpaunNhDdYoGmOZzPIEBbo4+kPPwWFN2qY", + "4cUkShPqjalX8i8+1DMOKmCO+rUaXDm1KZ9rWEgxVXkfTlMQwtyarw9sBzixcV1hSNFi8/Wp1dWnx/OT", + "w5gqdbRR6b+5XNtLZaix5+a5zaViJsLo42/A68oYMWQL3hTRDD+dMCUxd60amdie1tMeAowcjxdh72XA", + "oN7bu3R9+yF/n7vyj3lhq3MTfcmFrfXl8/Xl87/v5fNX/9F3z9EplFjLWV/+KM0epb4MoLenXvzfC2Ua", + "ov50y2TRXBFY1/v+aTfMOvg98IaZNZhWiA1DaG+cPS0B0nlfoA248CFrHxUKT0QJ+Ao4yiAn18CF0nGu", + "wD9fILgtOQitNxUmMNWqzlQfSOeublkZnbZV9TjTLUsiU+05ndW7+0vJzk2tlrASwKZb6i8zflyP3iBP", + "eBF+CCXLokWTlC0PEebKld7PWTZVb74W2ktgChGDWVlNmLM0OHrFdGErH9ocfunY9MWdH8PTVkVac8Zu", + "PvDWOo2PylA/aJpqmtGZeroqdVV8mKlsS8+1BlQw/ghcWIdpVTE8adXZKLluJu7Cj33pYOekohS4ZySO", + "6odVs7mpL5YM6Eno3jvBq/d+zWcUVi1l3EcHVNtgNXXPoq/2KsqVEBkiVhSBWVJ9WS3fDdMxLK04kYtT", + "RYrh88PZ2fFrwBx4/SVFHfjMo3qQuZRlcnen65diJfL79uspaf3BO15RtH9Q7wT7W7+H5BpKhbb7B40J", + "1XaXbG/ufLP5v0oirASKS5LsJd9s7mxuK3VhOdd0b+kPqY0lGzucK5mIJTz11+a8jwOaC412QcpKaw4H", + "mVpttb/EZrdTX7Ns0ap9MokR5nJLZSZj95FAo+dVVhD77NtdqGOVBukHRqOa7d3t7RYVnti3PtvKtGEk", + "BGtoPXcrt6n0fsi0ylHTbJR8+4gkNMXnkflf48ydJ5t5d55n3nOKKzlnnPwBmZ5455vnmdjt07+jUq0U", + "zhhDh5ib0rFvd149F/dNTq+hyoQ7RcLu7qOS0LkI0CWmaYLqywKvnsv+DqgETnGOToFfA3cUeDiq0xIf", + "QX+9uLsYJaIqCswX7qui6Iwhlz3hmVDg7WKJgu/bsclCsViMKS5gzK6Bc5Jp6A/QYZRszW1t95aD4Rlo", + "EYQg5hfmJ0+IILELAEOB5M6XkxvI3IAIOa3L+5ey6ordn5xXM9HXcenGUGzqUvZ+9szrp+TLq6V/GFeG", + "RM2NXn2qoFxfC49H5f2yzBfubnjw+S1hilpKzlSW5a1nO2G69b20J47TwWzPHKjD6v51pO6P1OsIdd8I", + "ZT6yc8ZQ/aWFe4YoEjqGDwIDMnO9p2dwYHViHn5O73kc/s9IzGNXXdZe/xfPz9fQ82DoeWByTAIP9YHn", + "uv6SZhR53se+H3mvpMN9b+15MMjM9swgFO4mreFnnXQ8gefX3y18mOs7xxglWzm5hnFYdrtq+RFdeHh1", + "/Ka80f8etKw4hQwBzfQnw0QUItr1iUth4uE66qmefmaU6C3GXAPGGjAeDzCUmRmw+BrUyNueaZAjLwak", + "Cvo4ttIlHxjlmM4qBWF1tUMXBQ6Pnsrxm6utz+3s3kXOtX+v/fsR/Vt7y739OS+MC9tq/TG2n7Ic7/Z7", + "tP3qpa0N1/dWMV2S8Ue+kvnEWX9nxmd287Dqfu3oa0d/PEd33ueMG+0+wO9F10FGyZaK0AOOHt63irbN", + "zeKmLDKe1HvFcE8U1rvldutThrXb/03cXhcafsUhg/TcL3B2U7I4aKsv7OL/V6Hmf3h0t+LdJqBsiiMx", + "zbwq1eD/z+xBClMG+aRQEVRaPjNWhP+b6xor1ljx+FhRu9DDwMJ212hReV+vj8KE/YJ2vRJAk4X774H0", + "rVEpUPOfhETdvvkG9xOvDtxE6+xg7fF/E4/3vl9/T1evamcYJVte5Xq0lqqpJX+6QzM7xYMKqYLOQktT", + "aNm1/jcUVz39JmdVht6woqgokQv31abEXvjWNdtib2sr44CLsf0k1GZuu2+mqru+RtEz/qnUKVLfsPVA", + "QrfbwiXZmoDEW7Xy7i7u/j8AAP//9DvSdKt/AAA=", } // GetSwagger returns the content of the embedded swagger specification file From 0d2a6f85d5fea41393e6cd3a68fc632ae431ca5c Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 14:27:35 -0500 
Subject: [PATCH 11/12] update to use live video response DataUrl field

---
 server/ai_mediaserver.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go
index 9ad0f036c6..1009d33681 100644
--- a/server/ai_mediaserver.go
+++ b/server/ai_mediaserver.go
@@ -779,7 +779,7 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
 	if err != nil {
 		return fmt.Errorf("invalid events URL: %w", err)
 	}
-	data, err := common.AppendHostname(strings.Replace(*resp.JSON200.EventsUrl, "-events", "-data", 1), host)
+	data, err := common.AppendHostname(*resp.JSON200.DataUrl, host)
 	if err != nil {
 		return fmt.Errorf("invalid data URL: %w", err)
 	}

From 9f5b8fa3b4fa38d0d375e46ffd526b3721c1eb6c Mon Sep 17 00:00:00 2001
From: Brad P
Date: Wed, 20 Aug 2025 11:32:36 -0500
Subject: [PATCH 12/12] update to make data channel optional

---
 server/ai_http.go        | 45 ++++++++++++++++++++++++++++++++++++----
 server/ai_live_video.go  |  5 +++++
 server/ai_mediaserver.go | 43 +++++++++++++++++++++++++++++++-------
 3 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/server/ai_http.go b/server/ai_http.go
index bf8f0d9f19..b084ba4a16 100644
--- a/server/ai_http.go
+++ b/server/ai_http.go
@@ -148,6 +148,26 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 		dataUrl    = pubUrl + "-data"
 	)

+	// if data is not enabled, remove the url and do not start the data channel
+	if enableData, ok := (*req.Params)["enableData"]; ok {
+		if val, ok := enableData.(bool); ok {
+			// turn off the data channel if the request sets it to false
+			if !val {
+				dataUrl = ""
+			} else {
+				clog.Infof(ctx, "data channel is enabled")
+			}
+		} else {
+			clog.Warningf(ctx, "enableData is not a bool, got type %T", enableData)
+		}
+
+		// delete the param used for go-livepeer signaling
+		delete((*req.Params), "enableData")
+	} else {
+		// default to no data channel
+		dataUrl = ""
+	}
+
 	// Handle initial payment, the rest of the payments are done separately from the stream processing
 	// Note that this payment is debit from the balance and acts as a buffer for the AI Realtime Video processing
 	payment, err := getPayment(r.Header.Get(paymentHeader))
@@ -181,8 +201,13 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 	controlPubCh.CreateChannel()
 	eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json")
 	eventsCh.CreateChannel()
-	dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl")
-	dataCh.CreateChannel()
+
+	// optional channels
+	var dataCh *trickle.TrickleLocalPublisher
+	if dataUrl != "" {
+		dataCh = trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl")
+		dataCh.CreateChannel()
+	}

 	// Start payment receiver which accounts the payments and stops the stream if the payment is insufficient
 	priceInfo := payment.GetExpectedPrice()
@@ -203,6 +228,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 			subCh.Close()
 			eventsCh.Close()
 			controlPubCh.Close()
+			if dataCh != nil {
+				dataCh.Close()
+			}
 			cancel()
 		}
 		return err
@@ -230,11 +258,17 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 	}()

 	// Prepare request to worker
+	// required channels
 	controlUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, controlUrl)
 	eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl)
 	subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl)
 	publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl)
-	dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)
+
+	// optional channels
+	var dataUrlOverwrite string
+	if dataCh != nil {
+		dataUrlOverwrite = overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)
+	}

 	workerReq := worker.LiveVideoToVideoParams{
 		ModelId: req.ModelId,
@@ -260,7 +294,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 		subCh.Close()
 		controlPubCh.Close()
 		eventsCh.Close()
-		dataCh.Close()
+		if dataCh != nil {
+			dataCh.Close()
+		}
 		cancel()
 		respondWithError(w, err.Error(), http.StatusInternalServerError)
 		return
@@ -272,6 +308,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 		SubscribeUrl: subUrl,
 		ControlUrl:   &controlUrl,
 		EventsUrl:    &eventsUrl,
+		DataUrl:      &dataUrl,
 		RequestId:    &requestID,
 		ManifestId:   &mid,
 	})
diff --git a/server/ai_live_video.go b/server/ai_live_video.go
index 32ea93d2b0..1c02271d3a 100644
--- a/server/ai_live_video.go
+++ b/server/ai_live_video.go
@@ -763,6 +763,11 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar
 }

 func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) {
+	// only start DataSubscribe if enabled
+	if params.liveParams.dataWriter == nil {
+		return
+	}
+
 	// subscribe to the outputs
 	subscriber, err := trickle.NewTrickleSubscriber(trickle.TrickleSubscriberConfig{
 		URL: url.String(),
diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go
index 1009d33681..99db461019 100644
--- a/server/ai_mediaserver.go
+++ b/server/ai_mediaserver.go
@@ -623,7 +623,6 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
 		liveParams: &liveRequestParams{
 			segmentReader: ssr,

-			dataWriter:      media.NewSegmentWriter(5),
 			rtmpOutputs:     rtmpOutputs,
 			localRTMPPrefix: mediaMTXInputURL,
 			stream:          streamName,
@@ -638,6 +637,15 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
 		},
 	}

+	// create a dataWriter for the data channel if enabled
+	if enableData, ok := pipelineParams["enableData"]; ok {
+		if enableData == true || enableData == "true" {
+			params.liveParams.dataWriter = media.NewSegmentWriter(5)
+			pipelineParams["enableData"] = true
+			clog.Infof(ctx, "Data channel enabled for stream %s", streamName)
+		}
+	}
+
 	registerControl(ctx, params)

 	// Create a special parent context for orchestrator cancellation
@@ -763,6 +771,8 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
 	resp := res.(*worker.GenLiveVideoToVideoResponse)
 	host := params.liveParams.sess.Transcoder()
+
+	// required channels
 	pub, err := common.AppendHostname(resp.JSON200.PublishUrl, host)
 	if err != nil {
 		return fmt.Errorf("invalid publish URL: %w", err)
 	}
@@ -779,21 +789,30 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
 	if err != nil {
 		return fmt.Errorf("invalid events URL: %w", err)
 	}
-	data, err := common.AppendHostname(*resp.JSON200.DataUrl, host)
-	if err != nil {
-		return fmt.Errorf("invalid data URL: %w", err)
-	}
+
 	if resp.JSON200.ManifestId != nil {
 		ctx = clog.AddVal(ctx, "manifest_id", *resp.JSON200.ManifestId)
 		params.liveParams.manifestID = *resp.JSON200.ManifestId
 	}
-	clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s data %s", pub, sub, control, events, data)
+
+	clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events)

 	startControlPublish(ctx, control, params)
 	startTricklePublish(ctx, pub, params, params.liveParams.sess)
 	startTrickleSubscribe(ctx, sub, params, params.liveParams.sess)
 	startEventsSubscribe(ctx, events, params, params.liveParams.sess)
-	startDataSubscribe(ctx, data, params, params.liveParams.sess)
+
+	// optional channels
+	var data *url.URL
+	if resp.JSON200.DataUrl != nil && *resp.JSON200.DataUrl != "" {
+		data, err = common.AppendHostname(*resp.JSON200.DataUrl, host)
+		if err != nil {
+			return fmt.Errorf("invalid data URL: %w", err)
+		}
+		clog.V(common.VERBOSE).Infof(ctx, "data %s", data)
+		startDataSubscribe(ctx, data, params, params.liveParams.sess)
+	}
+

 	return nil
 }
@@ -1090,7 +1109,6 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
 		liveParams: &liveRequestParams{
 			segmentReader: ssr,

-			dataWriter:      media.NewSegmentWriter(5),
 			rtmpOutputs:     rtmpOutputs,
 			localRTMPPrefix: internalOutputHost,
 			stream:          streamName,
@@ -1106,6 +1124,15 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
 		},
 	}

+	// create a dataWriter for the data channel if enabled
+	if enableData, ok := pipelineParams["enableData"]; ok {
+		if enableData == true || enableData == "true" {
+			params.liveParams.dataWriter = media.NewSegmentWriter(5)
+			pipelineParams["enableData"] = true
+			clog.Infof(ctx, "Data channel enabled for stream %s", streamName)
+		}
+	}
+
 	registerControl(ctx, params)

 	req := worker.GenLiveVideoToVideoJSONRequestBody{