Skip to content

Commit a91d98e

Browse files
committed
impl SRS /api/v1/clients api.
1 parent 0b7fbe7 commit a91d98e

File tree

6 files changed

+233
-11
lines changed

6 files changed

+233
-11
lines changed

proxy/api.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,23 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
8282
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
8383
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
8484
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
85-
apiError(ctx, w, r, err)
85+
apiError(ctx, w, r, err, http.StatusInternalServerError)
8686
}
8787
})
8888

8989
// The WebRTC WHEP API handler.
9090
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
9191
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
9292
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
93-
apiError(ctx, w, r, err)
93+
apiError(ctx, w, r, err, http.StatusInternalServerError)
9494
}
9595
})
9696

97+
logger.Df(ctx, "Proxy /api/ to srs")
98+
mux.HandleFunc("/api/", func(w http.ResponseWriter, r *http.Request) {
99+
srsLoadBalancer.ProxyHTTPAPI(ctx, w, r)
100+
})
101+
97102
// Run HTTP API server.
98103
v.wg.Add(1)
99104
go func() {
@@ -239,7 +244,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
239244
logger.Df(ctx, "Register SRS media server, %+v", server)
240245
return nil
241246
}(); err != nil {
242-
apiError(ctx, w, r, err)
247+
apiError(ctx, w, r, err, http.StatusInternalServerError)
243248
}
244249

245250
type Response struct {

proxy/http.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request)
198198
ctx := logger.WithContext(v.ctx)
199199

200200
if err := v.serve(ctx, w, r); err != nil {
201-
apiError(ctx, w, r, err)
201+
apiError(ctx, w, r, err, http.StatusInternalServerError)
202202
} else {
203203
logger.Df(ctx, "HTTP client done")
204204
}
@@ -318,7 +318,7 @@ func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
318318
defer r.Body.Close()
319319

320320
if err := v.serve(v.ctx, w, r); err != nil {
321-
apiError(v.ctx, w, r, err)
321+
apiError(v.ctx, w, r, err, http.StatusInternalServerError)
322322
} else {
323323
logger.Df(v.ctx, "HLS client %v for %v with %v done",
324324
v.SRSProxyBackendHLSID, v.StreamURL, r.URL.Path)

proxy/srs-api-proxy.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (c) 2024 Winlin
2+
//
3+
// SPDX-License-Identifier: MIT
4+
package main
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"io"
10+
"net/http"
11+
"srs-proxy/errors"
12+
"srs-proxy/logger"
13+
"strings"
14+
)
15+
16+
type SrsClient struct {
17+
Id string `json:"id"`
18+
Vhost string `json:"vhost"`
19+
Stream string `json:"stream"`
20+
Ip string `json:"ip"`
21+
PageUrl string `json:"pageUrl"`
22+
SwfUrl string `json:"swfUrl"`
23+
TcUrl string `json:"tcUrl"`
24+
Url string `json:"url"`
25+
Name string `json:"name"`
26+
Type string `json:"type"`
27+
Publish bool `json:"publish"`
28+
Alive float32 `json:"alive"`
29+
SendBytes int `json:"send_bytes"`
30+
RecvBytes int `json:"recv_bytes"`
31+
}
32+
33+
type SrsClientResponse struct {
34+
Code int `json:"code"`
35+
Server string `json:"server"`
36+
Service string `json:"service"`
37+
Pid string `json:"pid"`
38+
Client SrsClient `json:"client"`
39+
}
40+
41+
type SrsClientsResponse struct {
42+
Code int `json:"code"`
43+
Server string `json:"server"`
44+
Service string `json:"service"`
45+
Pid string `json:"pid"`
46+
Clients []SrsClient `json:"clients"`
47+
}
48+
49+
type SrsClientDeleteResponse struct {
50+
Code int `json:"code"`
51+
}
52+
53+
type SrsApiProxy struct {
54+
}
55+
56+
func (v *SrsApiProxy) proxySrsAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
57+
if strings.HasPrefix(r.URL.Path, "/api/v1/clients") {
58+
return proxySrsClientsAPI(ctx, servers, w, r)
59+
} else if strings.HasPrefix(r.URL.Path, "/api/v1/streams") {
60+
return proxySrsStreamsAPI(ctx, servers, w, r)
61+
}
62+
return nil
63+
}
64+
65+
// handle srs clients api /api/v1/clients
66+
func proxySrsClientsAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
67+
defer r.Body.Close()
68+
69+
clientId := ""
70+
if strings.HasPrefix(r.URL.Path, "/api/v1/clients/") {
71+
clientId = r.URL.Path[len("/api/v1/clients/"):]
72+
}
73+
logger.Df(ctx, "%v %v clientId=%v", r.Method, r.URL.Path, clientId)
74+
75+
body, err := io.ReadAll(r.Body)
76+
if err != nil {
77+
apiError(ctx, w, r, err, http.StatusInternalServerError)
78+
return errors.Wrapf(err, "read request body err")
79+
}
80+
81+
switch r.Method {
82+
case http.MethodDelete:
83+
for _, server := range servers {
84+
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
85+
logger.Df(ctx, "response %v", string(ret))
86+
var res SrsClientDeleteResponse
87+
if err := json.Unmarshal(ret, &res); err == nil && res.Code == 0 {
88+
apiResponse(ctx, w, r, res)
89+
return nil
90+
}
91+
}
92+
}
93+
94+
err := errors.Errorf("clientId %v not found in server", clientId)
95+
apiError(ctx, w, r, err, http.StatusNotFound)
96+
return err
97+
case http.MethodGet:
98+
if len(clientId) > 0 {
99+
for _, server := range servers {
100+
var client SrsClientResponse
101+
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
102+
if err := json.Unmarshal(ret, &client); err == nil && client.Code == 0 {
103+
apiResponse(ctx, w, r, client)
104+
return nil
105+
}
106+
}
107+
}
108+
} else { // get all clients
109+
var clients SrsClientsResponse
110+
for _, server := range servers {
111+
var res SrsClientsResponse
112+
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
113+
if err := json.Unmarshal(ret, &res); err == nil && res.Code == 0 {
114+
clients.Clients = append(clients.Clients, res.Clients...)
115+
}
116+
}
117+
}
118+
119+
apiResponse(ctx, w, r, clients)
120+
return nil
121+
}
122+
default:
123+
logger.Df(ctx, "/api/v1/clients %v", r.Method)
124+
}
125+
return nil
126+
}
127+
128+
func proxySrsStreamsAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
129+
return nil
130+
}

proxy/srs.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
package main
55

66
import (
7+
"bytes"
78
"context"
89
"encoding/json"
910
"fmt"
11+
"io"
1012
"math/rand"
13+
"net/http"
1114
"os"
1215
"strconv"
1316
"strings"
@@ -97,6 +100,28 @@ func (v *SRSServer) Format(f fmt.State, c rune) {
97100
}
98101
}
99102

103+
func (v *SRSServer) ApiRequest(ctx context.Context, r *http.Request, body []byte) ([]byte, error) {
104+
url := "http://" + v.IP + ":" + v.API[0] + r.URL.Path
105+
if r.URL.RawQuery != "" {
106+
url += "?" + r.URL.RawQuery
107+
}
108+
109+
if req, err := http.NewRequestWithContext(ctx, r.Method, url, bytes.NewReader(body)); err != nil {
110+
return nil, errors.Wrapf(err, "create request to %v", url)
111+
} else if res, err := http.DefaultClient.Do(req); err != nil {
112+
return nil, errors.Wrapf(err, "send request to %v", url)
113+
} else {
114+
defer res.Body.Close()
115+
if ret, err := io.ReadAll(res.Body); err != nil {
116+
return nil, errors.Wrapf(err, "read http respose error")
117+
} else if !isHttpStatusOK(res.StatusCode) {
118+
return ret, errors.Errorf("http response status code %v", res.StatusCode)
119+
} else {
120+
return ret, nil
121+
}
122+
}
123+
}
124+
100125
func NewSRSServer(opts ...func(*SRSServer)) *SRSServer {
101126
v := &SRSServer{}
102127
for _, opt := range opts {
@@ -158,13 +183,16 @@ type SRSLoadBalancer interface {
158183
StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error
159184
// Load the WebRTC streaming by ufrag, the ICE username.
160185
LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error)
186+
// proxy http api to srs
187+
ProxyHTTPAPI(ctx context.Context, w http.ResponseWriter, r *http.Request) error
161188
}
162189

163190
// srsLoadBalancer is the global SRS load balancer.
164191
var srsLoadBalancer SRSLoadBalancer
165192

166193
// srsMemoryLoadBalancer stores state in memory.
167194
type srsMemoryLoadBalancer struct {
195+
*SrsApiProxy
168196
// All available SRS servers, key is server ID.
169197
servers sync.Map[string, *SRSServer]
170198
// The picked server to servce client by specified stream URL, key is stream url.
@@ -287,7 +315,17 @@ func (v *srsMemoryLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag str
287315
}
288316
}
289317

318+
func (v *srsMemoryLoadBalancer) ProxyHTTPAPI(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
319+
services := make([]*SRSServer, v.servers.Size())
320+
v.servers.Range(func(_ string, value *SRSServer) bool {
321+
services = append(services, value)
322+
return true
323+
})
324+
return v.proxySrsAPI(ctx, services, w, r)
325+
}
326+
290327
type srsRedisLoadBalancer struct {
328+
*SrsApiProxy
291329
// The redis client sdk.
292330
rdb *redis.Client
293331
}
@@ -528,6 +566,40 @@ func (v *srsRedisLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag stri
528566
return &actual, nil
529567
}
530568

569+
func (v *srsRedisLoadBalancer) ProxyHTTPAPI(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
570+
defer r.Body.Close()
571+
// Query all servers from redis, in json string.
572+
var serverKeys []string
573+
if b, err := v.rdb.Get(ctx, v.redisKeyServers()).Bytes(); err == nil {
574+
if err := json.Unmarshal(b, &serverKeys); err != nil {
575+
return errors.Wrapf(err, "unmarshal key=%v servers %v", v.redisKeyServers(), string(b))
576+
}
577+
}
578+
579+
// No server found, failed.
580+
if len(serverKeys) == 0 {
581+
err := errors.New("servers empty")
582+
apiError(ctx, w, r, err, http.StatusInternalServerError)
583+
return err
584+
}
585+
586+
// TODO get all SRSServer
587+
var srsServers []*SRSServer
588+
589+
for _, key := range serverKeys {
590+
var server SRSServer
591+
if b, err := v.rdb.Get(ctx, key).Bytes(); err == nil {
592+
if err := json.Unmarshal(b, &server); err != nil {
593+
return errors.Wrapf(err, "unmarshal servers %v, %v", key, string(b))
594+
}
595+
srsServers = append(srsServers, &server)
596+
logger.Df(ctx, "srsServer: %v", server)
597+
}
598+
}
599+
600+
return v.proxySrsAPI(ctx, srsServers, w, r)
601+
}
602+
531603
func (v *srsRedisLoadBalancer) redisKeyUfrag(ufrag string) string {
532604
return fmt.Sprintf("srs-proxy-ufrag:%v", ufrag)
533605
}
@@ -549,5 +621,5 @@ func (v *srsRedisLoadBalancer) redisKeyServer(serverID string) string {
549621
}
550622

551623
func (v *srsRedisLoadBalancer) redisKeyServers() string {
552-
return fmt.Sprintf("srs-proxy-all-servers")
624+
return "srs-proxy-all-servers"
553625
}

proxy/sync/map.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,13 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
4343
func (m *Map[K, V]) Store(key K, value V) {
4444
m.m.Store(key, value)
4545
}
46+
47+
func (m *Map[K, V]) Size() uint32 {
48+
size := uint32(0)
49+
m.m.Range(func(_, _ any) bool {
50+
size++
51+
return true
52+
})
53+
54+
return size
55+
}

proxy/utils.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func apiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, da
3232

3333
b, err := json.Marshal(data)
3434
if err != nil {
35-
apiError(ctx, w, r, errors.Wrapf(err, "marshal %v %v", reflect.TypeOf(data), data))
35+
apiError(ctx, w, r, errors.Wrapf(err, "marshal %v %v", reflect.TypeOf(data), data), http.StatusInternalServerError)
3636
return
3737
}
3838

@@ -41,10 +41,10 @@ func apiResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, da
4141
w.Write(b)
4242
}
4343

44-
func apiError(ctx context.Context, w http.ResponseWriter, r *http.Request, err error) {
44+
func apiError(ctx context.Context, w http.ResponseWriter, r *http.Request, err error, code int) {
4545
logger.Wf(ctx, "HTTP API error %+v", err)
4646
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
47-
w.WriteHeader(http.StatusInternalServerError)
47+
w.WriteHeader(code)
4848
fmt.Fprintln(w, fmt.Sprintf("%v", err))
4949
}
5050

@@ -69,6 +69,10 @@ func apiCORS(ctx context.Context, w http.ResponseWriter, r *http.Request) bool {
6969
return false
7070
}
7171

72+
func isHttpStatusOK(v int) bool {
73+
return v >= 200 && v < 300
74+
}
75+
7276
func parseGracefullyQuitTimeout() (time.Duration, error) {
7377
if t, err := time.ParseDuration(envGraceQuitTimeout()); err != nil {
7478
return 0, errors.Wrapf(err, "parse duration %v", envGraceQuitTimeout())
@@ -250,8 +254,9 @@ func parseSRTStreamID(sid string) (host, resource string, err error) {
250254
}
251255

252256
// parseListenEndpoint parse the listen endpoint as:
253-
// port The tcp listen port, like 1935.
254-
// protocol://ip:port The listen endpoint, like tcp://:1935 or tcp://0.0.0.0:1935
257+
//
258+
// port The tcp listen port, like 1935.
259+
// protocol://ip:port The listen endpoint, like tcp://:1935 or tcp://0.0.0.0:1935
255260
func parseListenEndpoint(ep string) (protocol string, ip net.IP, port uint16, err error) {
256261
// If no colon in ep, it's port in string.
257262
if !strings.Contains(ep, ":") {

0 commit comments

Comments
 (0)