diff --git a/go.mod b/go.mod index 9dc93e6..ee4f261 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/SumeruCCTV/sumeru go 1.18 require ( + github.com/SumeruCCTV/go-onvif v1.0.7-0.20220710204443-43ab317ee846 github.com/SumeruCCTV/transcoder v1.2.0 github.com/bytedance/sonic v1.3.3 github.com/go-redis/redis/v8 v8.11.5 @@ -21,7 +22,9 @@ require ( github.com/andybalholm/brotli v1.0.4 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/chenzhuoyu/base64x v0.0.0-20220526154910-8bf9453eb81a // indirect + github.com/clbanning/mxj v1.8.4 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/uuid v1.3.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index b9c9b9b..79fe3ed 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= +github.com/SumeruCCTV/go-onvif v1.0.7-0.20220710204443-43ab317ee846 h1:hThJe79pI4K+NGBnY41oGHAyKf9HqsFUhDKiJm1Hd7I= +github.com/SumeruCCTV/go-onvif v1.0.7-0.20220710204443-43ab317ee846/go.mod h1:t0R1GtnbvjwyZ5KtvJkl5lkM8x8UCxFhhbBakw35sew= github.com/SumeruCCTV/transcoder v1.2.0 h1:B81Afhfjajc8gZcBX3xIPZUqgSZVU/1LJzacTZGdvDc= github.com/SumeruCCTV/transcoder v1.2.0/go.mod h1:Ux5TH3YT5/lNQ3blm+LkqlnO2ZAKZSovIA0GLmLg93Y= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= @@ -14,6 +16,8 @@ github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20220526154910-8bf9453eb81a h1:lmGPzuocwDxoPAMr9h16zoJY/USZR9jIh99nrmKk1uI= github.com/chenzhuoyu/base64x v0.0.0-20220526154910-8bf9453eb81a/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= +github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I= +github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -40,6 +44,9 @@ github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPh github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= diff --git a/service/camera/camera.go b/service/camera/camera.go index ef4f750..067adc0 100644 --- a/service/camera/camera.go +++ b/service/camera/camera.go @@ -2,10 +2,21 @@ package camera import ( "github.com/SumeruCCTV/sumeru/pkg/svcstat" - "github.com/SumeruCCTV/transcoder/ffmpeg" + "github.com/SumeruCCTV/sumeru/pkg/utils" + "github.com/SumeruCCTV/sumeru/service/database" + "github.com/SumeruCCTV/sumeru/service/database/models" ) type Service struct { + log *utils.Logger + database *database.Service + + // queue to add cameras + q chan *models.Camera + closed bool + + // map of currently active cameras by uuid + cameras map[string]Connector } func (Service) Name() string { @@ -13,15 +24,37 @@ func (Service) Name() string { } func (svc *Service) Start() error { - _ = ffmpeg.Config{} + svc.q = make(chan *models.Camera) + go svc.startQueue() + svc.cameras = make(map[string]Connector) return nil } func (svc *Service) Stop() error { + close(svc.q) return nil } func (svc *Service) Status() svcstat.Status { - return svcstat.StatusHealthy + if !svc.closed { + return svcstat.StatusHealthy + } + return svcstat.StatusUnhealthy +} + +func (svc *Service) AddCamera(cam *models.Camera) { + svc.q <- cam +} + +func (svc *Service) startQueue() { + svc.closed = false + for { + cam, ok := <-svc.q + if !ok { + svc.closed = true + return + } + go svc.accept(cam) + } } diff --git a/service/camera/connector.go b/service/camera/connector.go new file mode 100644 index 0000000..ed23f37 --- /dev/null +++ b/service/camera/connector.go @@ -0,0 +1,25 @@ +package camera + +import "github.com/SumeruCCTV/sumeru/service/database/models" + +type Connector interface { + TestConnection() error +} + +type ConnectorData struct { + uuid string + ipAddress string + port int + credentials models.CameraCredentials +} + +func NewConnector(cameraType models.CameraType, svc *Service, data *ConnectorData) Connector { + switch cameraType { + case models.CameraTypeONVIF: + return NewONVIFConnector(svc, data) + case models.CameraTypeRTSP: + return NewRTSPConnector(svc, data) + default: + return nil + } +} diff --git a/service/camera/handler.go b/service/camera/handler.go new file mode 100644 index 0000000..e45c5d3 --- /dev/null +++ b/service/camera/handler.go @@ -0,0 +1,33 @@ +package camera + +import ( + "github.com/SumeruCCTV/sumeru/service/database/models" + "go.uber.org/zap" +) + +func (svc *Service) accept(cam *models.Camera) { + svc.log.Debugw("adding camera", zap.String("uuid", cam.Uuid)) + c := NewConnector(cam.Type, svc, &ConnectorData{ + uuid: cam.Uuid, + ipAddress: cam.IPAddress, + port: cam.Port, + credentials: cam.Credentials, + }) + if c == nil { + return // invalid camera type? + } + if err := c.TestConnection(); err != nil { + svc.log.Debugw("failed to test connection for camera", zap.String("uuid", cam.Uuid), zap.Error(err)) + if cam.Status != models.CameraStatusInvalid { + if err := svc.database.UpdateCameraStatus(cam.Uuid, models.CameraStatusInvalid); err != nil { + svc.log.Warnw("failed to update camera status", zap.String("uuid", cam.Uuid), zap.Error(err)) + } + } + return + } + svc.cameras[cam.Uuid] = c + if err := svc.database.UpdateCameraStatus(cam.Uuid, models.CameraStatusDisconnected); err != nil { + svc.log.Warnw("failed to update camera status", zap.String("uuid", cam.Uuid), zap.Error(err)) + return + } +} diff --git a/service/camera/onvif.go b/service/camera/onvif.go new file mode 100644 index 0000000..680a64c --- /dev/null +++ b/service/camera/onvif.go @@ -0,0 +1,44 @@ +package camera + +import ( + "fmt" + "github.com/SumeruCCTV/go-onvif" + "github.com/SumeruCCTV/sumeru/pkg/utils" +) + +// maybe switch to https://github.com/videonext/onvif +// go-onvif is a mess + +const protocol = "http" + +type ONVIFConnector struct { + svc *Service + log *utils.Logger + + data *ConnectorData + rtsp *RTSPConnector +} + +func (c *ONVIFConnector) TestConnection() error { + dvc := c.device() + if _, err := dvc.GetInformation(); err != nil { + return err + } + return nil +} + +func (c *ONVIFConnector) device() *onvif.Device { + return &onvif.Device{ + XAddr: fmt.Sprintf("%s://%s:%d/onvif/services", protocol, c.data.ipAddress, c.data.port), + User: c.data.credentials.Username, + Password: c.data.credentials.Password, + } +} + +func NewONVIFConnector(svc *Service, data *ConnectorData) *ONVIFConnector { + return &ONVIFConnector{ + svc: svc, + log: svc.log.Named("onvif"), + data: data, + } +} diff --git a/service/camera/rtsp.go b/service/camera/rtsp.go new file mode 100644 index 0000000..983124b --- /dev/null +++ b/service/camera/rtsp.go @@ -0,0 +1,38 @@ +package camera + +import ( + "fmt" + "github.com/SumeruCCTV/sumeru/pkg/utils" + "github.com/SumeruCCTV/transcoder/ffmpeg" + "net" +) + +// TODO: see https://github.com/andrewlfw/joy4/tree/main/format/rtspv2 +// maybe use this instead of ffmpeg? + +type RTSPConnector struct { + svc *Service + log *utils.Logger + + data *ConnectorData +} + +func (c *RTSPConnector) TestConnection() error { + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", c.data.ipAddress, c.data.port)) + if err != nil { + return err + } + _ = conn.Close() + + _ = ffmpeg.Config{} + + return nil +} + +func NewRTSPConnector(svc *Service, data *ConnectorData) *RTSPConnector { + return &RTSPConnector{ + svc: svc, + log: svc.log.Named("rtsp"), + data: data, + } +} diff --git a/service/database/db/cameras.go b/service/database/db/cameras.go index 1d546dc..e9a14ac 100644 --- a/service/database/db/cameras.go +++ b/service/database/db/cameras.go @@ -4,15 +4,16 @@ import ( "github.com/SumeruCCTV/sumeru/service/database/models" ) -func (db *Database) AddCameraByUuid(accountUuid, cameraName, cameraAddr string, cameraPort int, cameraType models.CameraType) (*models.Camera, error) { - uuid := db.GenerateUuid() - camera := &models.Camera{ - Uuid: uuid, - Name: cameraName, - OwnerUuid: accountUuid, - IPAddress: cameraAddr, - Port: cameraPort, - Type: cameraType, - } +func (db *Database) AddCameraByUuid(camera *models.Camera) (*models.Camera, error) { + camera.Uuid = db.GenerateUuid() + camera.Status = models.CameraStatusInvalid return camera, db.Create(camera).Error } + +func (db *Database) UpdateCameraStatus(uuid string, status models.CameraStatus) error { + // should we create the model only once? + return db.Model(&models.Camera{}). + Where("uuid = ?", uuid). + Update("status", status). + Error +} diff --git a/service/database/models/cameras.go b/service/database/models/cameras.go index 0fca9f1..8add193 100644 --- a/service/database/models/cameras.go +++ b/service/database/models/cameras.go @@ -7,10 +7,11 @@ type Camera struct { OwnerUuid string `gorm:"uniqueIndex:idx_owner_uuid_name" json:"ownerUuid"` Name string `gorm:"uniqueIndex:idx_owner_uuid_name" json:"name"` - IPAddress string `gorm:"not null" json:"addr"` - Port int `gorm:"not null" json:"port"` - Type CameraType `gorm:"not null" json:"type"` - Status CameraStatus `gorm:"not null" json:"status"` + IPAddress string `gorm:"not null" json:"addr"` + Port int `gorm:"not null" json:"port"` + Type CameraType `gorm:"not null" json:"type"` + Status CameraStatus `gorm:"not null" json:"status"` + Credentials CameraCredentials `gorm:"not null;embedded;embeddedPrefix:camera_" json:"credentials"` CreatedAt int64 `json:"-"` } @@ -18,15 +19,19 @@ type Camera struct { type CameraType int const ( - CameraTypeUnknown CameraType = iota - CameraTypeONVIF + CameraTypeONVIF CameraType = iota CameraTypeRTSP ) type CameraStatus int const ( - CameraStatusInvalid CameraType = iota + CameraStatusInvalid CameraStatus = iota CameraStatusDisconnected CameraStatusConnected ) + +type CameraCredentials struct { + Username string `gorm:"not null" json:"username"` + Password string `gorm:"not null" json:"password"` +} diff --git a/service/web/routes/r_camera/post_camera.go b/service/web/routes/r_camera/post_camera.go index b0f9531..bd47cac 100644 --- a/service/web/routes/r_camera/post_camera.go +++ b/service/web/routes/r_camera/post_camera.go @@ -11,10 +11,11 @@ import ( func init() { type requestBody struct { - Name string `json:"name"` - Addr string `json:"addr"` - Port int `json:"port"` - Type models.CameraType `json:"type"` + Name string `json:"name"` + Addr string `json:"addr"` + Port int `json:"port"` + Type models.CameraType `json:"type"` + Credentials models.CameraCredentials `json:"credentials"` } type responseBody struct { @@ -41,6 +42,9 @@ func init() { if err = utils.ValidCameraType(body.Type, ctx); err != nil { return } + if err = utils.ValidBody(ctx, body.Credentials.Username, body.Credentials.Password); err != nil { + return + } return } @@ -55,7 +59,14 @@ func init() { if err != nil { return err } - camera, err := svc.DB().AddCameraByUuid(uuid, body.Name, body.Addr, body.Port, body.Type) + cam, err := svc.DB().AddCameraByUuid(&models.Camera{ + OwnerUuid: uuid, + Name: body.Name, + IPAddress: body.Addr, + Port: body.Port, + Type: body.Type, + Credentials: body.Credentials, + }) if err != nil { if errors.IsPgErr(err, errors.PgErrDuplicateEntry) { ctx.Status(fiber.StatusConflict) @@ -67,8 +78,9 @@ func init() { ).Errorf("error adding camera to db: %v", err) return errors.ErrorAddingCamera } + svc.CameraSvc().AddCamera(cam) ctx.Status(fiber.StatusCreated) - return ctx.JSON(responseBody{Uuid: camera.Uuid}) + return ctx.JSON(responseBody{Uuid: cam.Uuid}) }) }) } diff --git a/service/web/utils/validate.go b/service/web/utils/validate.go index 7e88db4..535813a 100644 --- a/service/web/utils/validate.go +++ b/service/web/utils/validate.go @@ -53,7 +53,8 @@ func ValidCameraPort(port int, ctx *fiber.Ctx) error { } func ValidCameraType(cameraType models.CameraType, ctx *fiber.Ctx) error { - if cameraType < models.CameraTypeUnknown || cameraType > models.CameraTypeRTSP { + // can be either ONVIF or RTSP + if cameraType < models.CameraTypeONVIF || cameraType > models.CameraTypeRTSP { ctx.Status(fiber.StatusBadRequest) return errors.InvalidCameraType } diff --git a/service/web/web.go b/service/web/web.go index e48a287..2d09feb 100644 --- a/service/web/web.go +++ b/service/web/web.go @@ -8,6 +8,7 @@ import ( "github.com/SumeruCCTV/sumeru/pkg/json" "github.com/SumeruCCTV/sumeru/pkg/svcstat" "github.com/SumeruCCTV/sumeru/pkg/utils" + "github.com/SumeruCCTV/sumeru/service/camera" "github.com/SumeruCCTV/sumeru/service/database" "github.com/bytedance/sonic" "github.com/gofiber/fiber/v2" @@ -17,16 +18,16 @@ import ( "github.com/gofiber/fiber/v2/middleware/recover" "github.com/gofiber/fiber/v2/middleware/requestid" "github.com/gofiber/helmet/v2" - "go.uber.org/atomic" ) type Service struct { cfg *config.Config log *utils.Logger database *database.Service + camera *camera.Service app *fiber.App - running atomic.Bool + running bool } func (Service) Name() string { @@ -66,9 +67,9 @@ func (svc *Service) Start() error { // todo: return error port := svc.cfg.Web.Port svc.log.Infof("Starting web server on port %d", port) - svc.running.Store(true) + svc.running = true _ = svc.app.Listen(fmt.Sprintf(":%d", port)) - svc.running.Store(false) + svc.running = false }() return nil } @@ -78,7 +79,7 @@ func (svc *Service) Stop() error { } func (svc *Service) Status() svcstat.Status { - if svc.app != nil && svc.database != nil && svc.running.Load() { + if svc.app != nil && svc.database != nil && svc.running { return svcstat.StatusHealthy } return svcstat.StatusUnhealthy @@ -92,6 +93,10 @@ func (svc *Service) DB() *database.Service { return svc.database } +func (svc *Service) CameraSvc() *camera.Service { + return svc.camera +} + var errorHandler = func(ctx *fiber.Ctx, _e error) error { if ctx.Response().StatusCode() == fiber.StatusOK { ctx.Status(fiber.StatusInternalServerError)