diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 981d52749f..7a6f13869f 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -1092,7 +1092,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { } } - if n.NodeType == core.BroadcasterNode { + if n.NodeType == core.BroadcasterNode || n.NodeType == core.RemoteSignerNode { maxEV, _ := new(big.Rat).SetString(*cfg.MaxTicketEV) if maxEV == nil { panic(fmt.Errorf("-maxTicketEV must be a valid rational number, but %v provided. Restart the node with a valid value for -maxTicketEV", *cfg.MaxTicketEV)) diff --git a/core/accounting.go b/core/accounting.go index ff3cb0b520..213389e933 100644 --- a/core/accounting.go +++ b/core/accounting.go @@ -31,6 +31,11 @@ func (b *Balance) Credit(amount *big.Rat) { b.balances.Credit(b.addr, b.manifestID, amount) } +// Reserve zeroes the balance and returns the current balance +func (b *Balance) Reserve() *big.Rat { + return b.balances.Reserve(b.addr, b.manifestID) +} + // StageUpdate prepares a balance update by reserving the current balance and returning the number of tickets // to send with a payment, the new credit represented by the payment and the existing credit (i.e reserved balance) func (b *Balance) StageUpdate(minCredit, ev *big.Rat) (int, *big.Rat, *big.Rat) { diff --git a/pm/sender.go b/pm/sender.go index ebeb09e2c6..1f446a540a 100644 --- a/pm/sender.go +++ b/pm/sender.go @@ -22,6 +22,9 @@ type Sender interface { // for creating new tickets StartSession(ticketParams TicketParams) string + // StartSessionWithNonce is like StartSession with a non-default nonce + StartSessionWithNonce(ticketParams TicketParams, nonce uint32) string + // CleanupSession deletes session from the internal map CleanupSession(sessionID string) @@ -33,6 +36,9 @@ type Sender interface { // EV returns the ticket EV for a session EV(sessionID string) (*big.Rat, error) + + // Nonce returns the current nonce for a session + Nonce(sessionID string) (uint32, error) } type session struct { @@ -75,6 +81,17 @@ func (s *sender) StartSession(ticketParams TicketParams) string { return sessionID } +func (s *sender) StartSessionWithNonce(ticketParams TicketParams, nonce uint32) string { + sessionID := ticketParams.RecipientRandHash.Hex() + + s.sessions.Store(sessionID, &session{ + ticketParams: ticketParams, + senderNonce: nonce, + }) + + return sessionID +} + // EV returns the ticket EV for a session func (s *sender) EV(sessionID string) (*big.Rat, error) { session, err := s.loadSession(sessionID) @@ -85,6 +102,14 @@ func (s *sender) EV(sessionID string) (*big.Rat, error) { return ticketEV(session.ticketParams.FaceValue, session.ticketParams.WinProb), nil } +func (s *sender) Nonce(sessionID string) (uint32, error) { + session, err := s.loadSession(sessionID) + if err != nil { + return 0, err + } + return session.senderNonce, nil +} + func (s *sender) CleanupSession(sessionID string) { s.sessions.Delete(sessionID) } diff --git a/pm/stub.go b/pm/stub.go index 33ba0352d1..a1424bd743 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -511,6 +511,11 @@ func (m *MockSender) StartSession(ticketParams TicketParams) string { return args.String(0) } +func (m *MockSender) StartSessionWithNonce(ticketParams TicketParams, nonce uint32) string { + args := m.Called(ticketParams, nonce) + return args.String(0) +} + // CleanupSession deletes session from the internal ma func (m *MockSender) CleanupSession(sessionID string) { m.Called(sessionID) @@ -545,3 +550,8 @@ func (m *MockSender) ValidateTicketParams(ticketParams *TicketParams) error { args := m.Called(ticketParams) return args.Error(0) } + +func (m *MockSender) Nonce(sessionID string) (uint32, error) { + args := m.Called(sessionID) + return uint32(args.Int(0)), args.Error(1) +} diff --git a/server/ai_live_video.go b/server/ai_live_video.go index 0dbcb09e70..c78ab51259 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -82,7 +82,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara priceInfo := sess.OrchestratorInfo.PriceInfo var paymentProcessor *LivePaymentProcessor if priceInfo != nil && priceInfo.PricePerUnit != 0 { - paymentSender := livePaymentSender{} + paymentSender := params.liveParams.paymentSender sendPaymentFunc := func(inPixels int64) error { return paymentSender.SendPayment(context.Background(), &SegmentInfoSender{ sess: sess.BroadcastSession, diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 7ce096187f..1640c160f3 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -692,7 +692,7 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi var err error for { perOrchCtx, perOrchCancel := context.WithCancelCause(ctx) - params.liveParams = newParams(params.liveParams, perOrchCancel) + params.liveParams = newLiveParams(params, perOrchCancel) var resp interface{} resp, err = processAIRequest(perOrchCtx, params, req) if err != nil { @@ -758,7 +758,8 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi <-firstProcessed } -func newParams(params *liveRequestParams, cancelOrch context.CancelCauseFunc) *liveRequestParams { +func newLiveParams(aiParams aiRequestParams, cancelOrch context.CancelCauseFunc) *liveRequestParams { + params := aiParams.liveParams return &liveRequestParams{ segmentReader: params.segmentReader, rtmpOutputs: params.rtmpOutputs, @@ -775,8 +776,15 @@ func newParams(params *liveRequestParams, cancelOrch context.CancelCauseFunc) *l orchestrator: params.orchestrator, startTime: time.Now(), kickOrch: cancelOrch, + paymentSender: choosePaymentSender(aiParams), } +} +func choosePaymentSender(params aiRequestParams) LivePaymentSender { + if hasRemoteSigner(params) { + return NewRemotePaymentSender(params.node) + } + return &livePaymentSender{} } func startProcessing(ctx context.Context, params aiRequestParams, res interface{}) error { diff --git a/server/ai_process.go b/server/ai_process.go index cc50b380dd..9ca2455486 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -104,6 +104,7 @@ type liveRequestParams struct { pipeline string orchestrator string + paymentSender LivePaymentSender paymentProcessInterval time.Duration outSegmentTimeout time.Duration @@ -1051,28 +1052,54 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A params.liveParams.sess = sess params.liveParams.startTime = time.Now() - // Live Video should not reuse the existing session balance, because it could lead to not sending the init - // payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side. - // It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side. - clearSessionBalance(sess.BroadcastSession, core.RandomManifestID()) + var paymentHeaders worker.RequestEditorFn + if hasRemoteSigner(params) { + rpp, ok := params.liveParams.paymentSender.(*remotePaymentSender) + if !ok { + return nil, errors.New("remote sender was not the correct type") + } + res, err := rpp.RequestPayment(ctx, &SegmentInfoSender{ + sess: sess.BroadcastSession, + }) + if err != nil { + return nil, err + } + paymentHeaders = func(_ context.Context, req *http.Request) error { + req.Header.Set(segmentHeader, res.SegCreds) + req.Header.Set(paymentHeader, res.Payment) + req.Header.Set("Authorization", protoVerAIWorker) + return nil + } + } else { - client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) - if err != nil { - if monitor.Enabled { - monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo) + // Live Video should not reuse the existing session balance, because it could lead to not sending the init + // payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side. + // It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side. + clearSessionBalance(sess.BroadcastSession, core.RandomManifestID()) + + var ( + balUpdate *BalanceUpdate + err error + ) + paymentHeaders, balUpdate, err = prepareAIPayment(ctx, sess, initPixelsToPay) + if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo) + } + return nil, err } - return nil, err + defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) } - paymentHeaders, balUpdate, err := prepareAIPayment(ctx, sess, initPixelsToPay) + + // Send request to orchestrator + client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { if monitor.Enabled { monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo) } return nil, err } - defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) - // Send request to orchestrator reqTimeout := 5 * time.Second reqCtx, cancel := context.WithTimeout(ctx, reqTimeout) defer cancel() @@ -1669,3 +1696,7 @@ func encodeReqMetadata(metadata map[string]string) string { metadataBytes, _ := json.Marshal(metadata) return string(metadataBytes) } + +func hasRemoteSigner(params aiRequestParams) bool { + return params.node != nil && params.node.RemoteSignerAddr != nil +} diff --git a/server/live_payment.go b/server/live_payment.go index 2d159be1db..17aca99261 100644 --- a/server/live_payment.go +++ b/server/live_payment.go @@ -1,11 +1,16 @@ package server import ( + "bytes" "context" + "encoding/json" + "errors" "fmt" "io" "math/big" "net/http" + "net/url" + "sync" "time" ethcommon "github.com/ethereum/go-ethereum/common" @@ -25,6 +30,7 @@ type SegmentInfoSender struct { inPixels int64 priceInfo *net.PriceInfo mid string + callCount int } type SegmentInfoReceiver struct { @@ -164,7 +170,149 @@ func (r *livePaymentReceiver) AccountPayment( return fmt.Errorf("insufficient balance, mid=%s, fee=%s, balance=%s", segmentInfo.sessionID, fee.FloatString(0), balanceStr) } r.orchestrator.DebitFees(segmentInfo.sender, core.ManifestID(segmentInfo.sessionID), segmentInfo.priceInfo, segmentInfo.inPixels) - clog.V(common.DEBUG).Infof(ctx, "Accounted payment for sessionID=%s, fee=%s", segmentInfo.sessionID, fee.FloatString(0)) + balance = r.orchestrator.Balance(segmentInfo.sender, core.ManifestID(segmentInfo.sessionID)) + clog.V(common.DEBUG).Infof(ctx, "Accounted payment for sessionID=%s, fee=%s balance=%s", segmentInfo.sessionID, fee.FloatString(0), balance.FloatString(0)) + return nil +} + +// Delegate ticket generation to a remote signer service and then forward the +// payment on to the orchestrator. Return intermediate payment state as a blob +type remotePaymentSender struct { + node *core.LivepeerNode + client *http.Client + + // access to all fields below must be protected by the mutex mu + mu sync.Mutex + state RemotePaymentStateSig +} + +func NewRemotePaymentSender(node *core.LivepeerNode) *remotePaymentSender { + return &remotePaymentSender{ + node: node, + client: &http.Client{ + Timeout: paymentRequestTimeout, + }, + } +} + +func (r *remotePaymentSender) RequestPayment(ctx context.Context, segmentInfo *SegmentInfoSender) (*RemotePaymentResponse, error) { + if r == nil || r.node == nil || r.node.RemoteSignerAddr == nil { + return nil, fmt.Errorf("remote signer not configured") + } + + sess := segmentInfo.sess + if sess == nil || sess.OrchestratorInfo == nil || sess.OrchestratorInfo.PriceInfo == nil { + return nil, fmt.Errorf("missing session or OrchestratorInfo") + } + + // Marshal OrchestratorInfo + oInfoBytes, err := proto.Marshal(&net.PaymentResult{Info: sess.OrchestratorInfo}) + if err != nil { + return nil, fmt.Errorf("error marshaling OrchestratorInfo for remote signer: %w", err) + } + + r.mu.Lock() + state := r.state + r.mu.Unlock() + + // Build remote payment request + reqPayload := RemotePaymentRequest{ + ManifestID: segmentInfo.mid, + Orchestrator: oInfoBytes, + State: state, + Type: RemoteType_LiveVideoToVideo, + } + + body, err := json.Marshal(reqPayload) + if err != nil { + return nil, fmt.Errorf("error marshaling request payload for remote signer: %w", err) + } + + remoteURL := r.node.RemoteSignerAddr.ResolveReference(&url.URL{Path: "/generate-live-payment"}) + httpReq, err := http.NewRequestWithContext(ctx, "POST", remoteURL.String(), bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + resp, err := r.client.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("failed to call remote signer: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == HTTPStatusRefreshSession { + if segmentInfo.callCount > 3 { + return nil, errors.New("too many consecutive session refreshes") + } + if err := refreshSession(ctx, sess, true); err != nil { + return nil, fmt.Errorf("could not refresh session for remote signer: %w", err) + } + segmentInfo.callCount += 1 + return r.RequestPayment(ctx, segmentInfo) + } + + if resp.StatusCode != http.StatusOK { + data, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("remote signer returned status %d: %s", resp.StatusCode, string(data)) + } + + var rp RemotePaymentResponse + if err := json.NewDecoder(resp.Body).Decode(&rp); err != nil { + return nil, fmt.Errorf("failed to decode remote signer response: %w", err) + } + + // Cache updated state blob and signature + r.mu.Lock() + r.state = rp.State + r.mu.Unlock() + + return &rp, nil +} + +// SendPayment via remote signer: request tickets + seg creds from remote signer +// and then forward them to the orchestrator. +func (r *remotePaymentSender) SendPayment(ctx context.Context, segmentInfo *SegmentInfoSender) error { + rp, err := r.RequestPayment(ctx, segmentInfo) + if err != nil { + return err + } + + // Forward payment + segment credentials to orchestrator + url := segmentInfo.sess.OrchestratorInfo.Transcoder + req, err := http.NewRequestWithContext(ctx, "POST", url+"/payment", nil) + if err != nil { + clog.Errorf(ctx, "Could not generate payment request to orch=%s", url) + return err + } + req.Header.Set(paymentHeader, rp.Payment) + req.Header.Set(segmentHeader, rp.SegCreds) + + resp, err := sendReqWithTimeout(req, paymentRequestTimeout) + if err != nil { + clog.Errorf(ctx, "Could not send payment to orch=%s err=%q", url, err) + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + clog.Errorf(ctx, "Orchestrator did not accept payment status=%d", resp.StatusCode) + return fmt.Errorf("orchestrator did not accept payment, status=%d", resp.StatusCode) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + clog.Errorf(ctx, "Could not read response from orchestrator=%s err=%q", url, err) + return err + } + + // Update session to refresh ticket params from the response + var pr net.PaymentResult + err = proto.Unmarshal(data, &pr) + if err != nil { + clog.Errorf(ctx, "Could not unmarshal response from orchestrator=%s err=%q", url, err) + return err + } + updateSession(segmentInfo.sess, &ReceivedTranscodeResult{Info: pr.Info}) + return nil } diff --git a/server/live_payment_test.go b/server/live_payment_test.go index bd4620a95c..1a67ac5458 100644 --- a/server/live_payment_test.go +++ b/server/live_payment_test.go @@ -2,6 +2,14 @@ package server import ( "context" + "encoding/json" + "math/big" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + ethcommon "github.com/ethereum/go-ethereum/common" "github.com/golang/protobuf/proto" "github.com/livepeer/go-livepeer/core" @@ -9,10 +17,6 @@ import ( "github.com/livepeer/go-livepeer/pm" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "math/big" - "net/http" - "testing" - "time" ) func TestSendPayment(t *testing.T) { @@ -140,3 +144,82 @@ func TestAccountPayment(t *testing.T) { }) } } + +// TestRemoteLivePaymentSender_BasicFlow verifies that remoteLivePaymentSender forwards +// remote-generated payments to the orchestrator. +func TestRemoteLivePaymentSender_BasicFlow(t *testing.T) { + require := require.New(t) + + // Stub Orchestrator + ts, mux := stubTLSServer() + defer ts.Close() + tr := &net.PaymentResult{ + Info: &net.OrchestratorInfo{ + Transcoder: ts.URL, + PriceInfo: &net.PriceInfo{PricePerUnit: 7, PixelsPerUnit: 7}, + TicketParams: &net.TicketParams{ExpirationBlock: big.NewInt(100).Bytes()}, + AuthToken: stubAuthToken, + }, + } + buf, err := proto.Marshal(tr) + require.Nil(err) + + mux.HandleFunc("/payment", func(w http.ResponseWriter, r *http.Request) { + // Expect payment + segCreds headers from remote signer + if r.Header.Get(paymentHeader) == "" { + t.Errorf("missing payment header") + } + if r.Header.Get(segmentHeader) == "" { + t.Errorf("missing segment header") + } + w.WriteHeader(http.StatusOK) + w.Write(buf) + }) + + // Stub remote signer + remoteResp := RemotePaymentResponse{ + Payment: "dummy-payment", + SegCreds: "dummy-segcreds", + State: HexBytes([]byte{0x01}), + // Signature value is not validated by the sender + StateSignature: HexBytes([]byte{0x02}), + } + remoteBody, err := json.Marshal(remoteResp) + require.Nil(err) + + remoteTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write(remoteBody) + })) + defer remoteTS.Close() + + remoteURL, err := url.Parse(remoteTS.URL) + require.Nil(err) + + // Stub session + sess := StubBroadcastSession(ts.URL) + sess.Sender = nil + sess.Balances = nil + + // Livepeer node with RemoteSignerAddr configured + node, _ := core.NewLivepeerNode(nil, "", nil) + node.RemoteSignerAddr = remoteURL + + // Create remote payment sender and segment info + paymentSender := NewRemotePaymentProcessor(node) + segmentInfo := &SegmentInfoSender{ + sess: sess, + inPixels: 1000000, + priceInfo: &net.PriceInfo{ + PricePerUnit: 1, + PixelsPerUnit: 1, + }, + mid: "mid1", + } + + // when + err = paymentSender.SendPayment(context.TODO(), segmentInfo) + + // then + require.Nil(err) +} diff --git a/server/remote_signer.go b/server/remote_signer.go index bc4b682656..0445c7d53f 100644 --- a/server/remote_signer.go +++ b/server/remote_signer.go @@ -3,17 +3,29 @@ package server import ( "encoding/hex" "encoding/json" + "errors" "fmt" "io" + "math/big" "net/http" "net/url" + "sync" "time" + ethcommon "github.com/ethereum/go-ethereum/common" "github.com/golang/glog" + "github.com/golang/protobuf/proto" "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/core" + lpcrypto "github.com/livepeer/go-livepeer/crypto" + "github.com/livepeer/go-livepeer/monitor" + "github.com/livepeer/go-livepeer/net" + "github.com/livepeer/lpms/stream" ) +const HTTPStatusRefreshSession = 480 +const RemoteType_LiveVideoToVideo = "lv2v" + // SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) { ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID())) @@ -56,6 +68,7 @@ func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Re func StartRemoteSignerServer(ls *LivepeerServer, bind string) error { // Register the remote signer endpoint ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo)) + ls.HTTPMux.Handle("POST /generate-live-payment", http.HandlerFunc(ls.GenerateLivePayment)) // Start the HTTP server glog.Info("Starting Remote Signer server on ", bind) @@ -108,6 +121,316 @@ type OrchInfoSigResponse struct { Signature HexBytes `json:"signature"` } +// State required for remote ticket creation. +// Treated as an opaque, signed blob by the gateway. +type RemotePaymentState struct { + StateID string + PMSessionID string + LastUpdate time.Time + OrchestratorAddress ethcommon.Address + SenderNonce uint32 + Balance string +} + +type RemotePaymentStateSig struct { + State []byte + Sig []byte +} + +// RemotePaymentRequest is sent by the gateway to the remote signer to request a batch of tickets. +// TODO length limits for string / byte fields +type RemotePaymentRequest struct { + // State is an opaque, signed blob previously returned by the remote signer. + State RemotePaymentStateSig `json:"state,omitempty"` + + // protobuf bytes of net.PaymentResult containing OrchestratorInfo. Required + Orchestrator []byte `json:"orchestrator"` + + // Set if an ID is needed to tie into orch accounting for a session. Optional + ManifestID string + + // Number of pixels to generate a ticket for. Required if `type` is not set. + InPixels int64 `json:"inPixels"` + + // Job type to automatically calculate payments. Valid values: `lv2v`. Optional. + Type string `json:"type"` +} + +// Returned by the remote signer and includes a new payment plus updated state. +type RemotePaymentResponse struct { + Payment string `json:"payment"` + SegCreds string `json:"segCreds,omitempty"` + State RemotePaymentStateSig `json:"state"` +} + +// Signs the serialized state with the remote signer's Ethereum key. +func signState(ls *LivepeerServer, stateBytes []byte) ([]byte, error) { + if ls == nil || ls.LivepeerNode == nil || ls.LivepeerNode.Eth == nil { + return nil, fmt.Errorf("ethereum client not configured for remote signer") + } + sig, err := ls.LivepeerNode.Eth.Sign(stateBytes) + if err != nil { + return nil, err + } + return sig, nil +} + +// verifyStateSignature verifies that sig is a valid signature over stateBytes produced +// by the remote signer's Ethereum account. +func verifyStateSignature(ls *LivepeerServer, stateBytes []byte, sig []byte) error { + if ls == nil || ls.LivepeerNode == nil || ls.LivepeerNode.Eth == nil { + return fmt.Errorf("ethereum client not configured for remote signer") + } + addr := ls.LivepeerNode.Eth.Account().Address + if !lpcrypto.VerifySig(addr, stateBytes, sig) { + return fmt.Errorf("invalid state signature") + } + return nil +} + +// GenerateLivePayment handles remote generation of a payment for live streams. +func (ls *LivepeerServer) GenerateLivePayment(w http.ResponseWriter, r *http.Request) { + ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID())) + remoteAddr := getRemoteAddr(r) + clog.Info(ctx, "Live payment request", "ip", remoteAddr) + + // TODO avoid using the global Balances; keep balance changes request-local + if ls.LivepeerNode.Balances == nil || ls.LivepeerNode.Sender == nil { + err := fmt.Errorf("LivepeerNode missing balances or sender") + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + balances, sender := ls.LivepeerNode.Balances, ls.LivepeerNode.Sender + + var req RemotePaymentRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + clog.Errorf(ctx, "Failed to decode RemotePaymentRequest err=%q", err) + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + + if len(req.Orchestrator) == 0 { + err := fmt.Errorf("missing orchestrator") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + + var info net.PaymentResult + if err := proto.Unmarshal(req.Orchestrator, &info); err != nil { + clog.Errorf(ctx, "Failed to unmarshal orch info err=%q", err) + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + if info.Info == nil { + err := errors.New("Missing orch info") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + oInfo := info.Info // OrchestratorInfo + priceInfo := oInfo.PriceInfo + if priceInfo == nil || priceInfo.PricePerUnit == 0 { + err := fmt.Errorf("missing or zero priceInfo") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + if oInfo.TicketParams == nil { + err := fmt.Errorf("missing ticketParams in OrchestratorInfo") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + + orchAddr := ethcommon.BytesToAddress(oInfo.Address) + + // Load or initialize state + var ( + state *RemotePaymentState + err error + ) + reqState, reqSig := req.State.State, req.State.Sig + if len(reqState) != 0 || len(reqSig) != 0 { + if err := verifyStateSignature(ls, reqState, reqSig); err != nil { + err = errors.New("invalid sig") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + if err := json.Unmarshal(reqState, &state); err != nil { + err = errors.New("invalid state") + respondJsonError(ctx, w, err, http.StatusBadRequest) + } + if state.OrchestratorAddress != orchAddr { + err := fmt.Errorf("orchestratorAddress mismatch") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + } else { + state = &RemotePaymentState{ + StateID: string(core.RandomManifestID()), + OrchestratorAddress: orchAddr, + } + } + + stateID := core.ManifestID(state.StateID) + clog.AddVal(ctx, "state_id", state.StateID) + + manifestID := req.ManifestID + if manifestID == "" { + manifestID = string(core.RandomManifestID()) + } + ctx = clog.AddVal(ctx, "manifest_id", manifestID) + + streamParams := &core.StreamParameters{ + // Embedded within genSegCreds, may be used by orch for payment accounting + ManifestID: core.ManifestID(manifestID), + } + + pmParams := pmTicketParams(oInfo.TicketParams) + if pmParams == nil { + err := fmt.Errorf("failed to derive ticket params from OrchestratorInfo") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + sessionBalance := core.NewBalance(orchAddr, stateID, balances) + + // Restore balance + oldBal := &big.Rat{} + if state.Balance != "" { + if _, ok := oldBal.SetString(state.Balance); ok { + // Reset existing balance for this stream and apply saved value + sessionBalance.Reserve() + sessionBalance.Credit(oldBal) + } + } + + // Reset nonce if session has been refreshed. + sessionID := pmParams.RecipientRandHash.Hex() + nonce := state.SenderNonce + if state.PMSessionID != sessionID { + nonce = 0 + } + + sess := &BroadcastSession{ + Broadcaster: core.NewBroadcaster(ls.LivepeerNode), + Params: streamParams, + Sender: sender, + Balances: balances, + Balance: sessionBalance, + lock: &sync.RWMutex{}, + OrchestratorInfo: oInfo, + CleanupSession: sender.CleanupSession, + PMSessionID: sender.StartSessionWithNonce(*pmParams, nonce), + InitialPrice: oInfo.PriceInfo, + } + defer sess.CleanupSession(sess.PMSessionID) + + if should, err := shouldRefreshSession(ctx, sess); err == nil && should { + err := errors.New("refresh session for remote signer") + respondJsonError(ctx, w, err, HTTPStatusRefreshSession) + return + } else if err != nil { + err = fmt.Errorf("remote signer could not check whether to refresh session: %w", err) + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + + pixels := req.InPixels + if req.Type == RemoteType_LiveVideoToVideo { + info := defaultSegInfo + now := time.Now() + lastUpdate := state.LastUpdate + if lastUpdate.IsZero() { + // preload with 60 seconds of data by default + lastUpdate = now.Add(-60 * time.Second) + } + pixelsPerSec := float64(info.Height) * float64(info.Width) * float64(info.FPS) + secSinceLastProcessed := now.Sub(lastUpdate).Seconds() + pixels = int64(pixelsPerSec * secSinceLastProcessed) + } else if req.Type != "" { + err = errors.New("invalid job type") + respondJsonError(ctx, w, err, http.StatusBadRequest) + } + if pixels <= 0 { + err = errors.New("missing pixels or job type") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + + // Compute required fee + fee := calculateFee(pixels, priceInfo) + + // Create balance update + balUpdate, err := newBalanceUpdate(sess, fee) + if err != nil { + err = fmt.Errorf("Failed to update balance: %w", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + balUpdate.Debit = fee + balUpdate.Status = ReceivedChange + + // Generate payment tickets + payment, err := genPayment(ctx, sess, balUpdate.NumTickets) + if err != nil { + clog.Errorf(ctx, "Could not create payment err=%q", err) + if monitor.Enabled { + monitor.PaymentCreateError(ctx) + } + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + // Generate segment credentials with an empty segment + segCreds, err := genSegCreds(sess, &stream.HLSSegment{}, nil, false) + if err != nil { + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + // Complete balance update and set state to new balance + completeBalanceUpdate(sess, balUpdate) // Updates sessionBalance internally + newBal := sessionBalance.Balance() + if newBal == nil { + err = errors.New("zero balance?") + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + state.Balance = newBal.RatString() + state.LastUpdate = time.Now() + state.PMSessionID = sess.PMSessionID + state.SenderNonce, err = sender.Nonce(sess.PMSessionID) + if err != nil { + err = fmt.Errorf("remote signer failed to retrieve nonce: %w", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + // Encode and sign updated state + stateBytes, err := json.Marshal(state) + if err != nil { + clog.Errorf(ctx, "Failed to encode updated RemotePaymentState err=%q", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + stateSig, err := signState(ls, stateBytes) + if err != nil { + clog.Errorf(ctx, "Could not sign state err=%q", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + clog.Info(ctx, "Signed", "numTickets", balUpdate.NumTickets, "nonce", state.SenderNonce, "fee", fee.FloatString(0), "sessionId", oInfo.AuthToken.SessionId, "pmSessionId", sess.PMSessionID, "oldBalance", oldBal.FloatString(0), "newBalance", newBal.FloatString(0)) + + // Return payment (tickets), creds and signed state + resp := RemotePaymentResponse{ + Payment: payment, + SegCreds: segCreds, + State: RemotePaymentStateSig{State: stateBytes, Sig: stateSig}, + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) +} + // Calls the remote signer service to get a signature for GetOrchInfo func GetOrchInfoSig(remoteSignerHost *url.URL) (*OrchInfoSigResponse, error) {