Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Notification implementation (#47)
Browse files Browse the repository at this point in the history
* notification implementation: Initial changes

* TDD notification: tested basic functionality with memory storage

* attending review comments and fixes to the sse

* autoincrement ID

* supporting query parameters and only the difference on update notifications

* adding leveldb support to store the older events

* taking storage dsn from config and starting the id from 0

* improved logging

* Update NOTICE

* remove redundant MIT license text

* addressing the reveiw comments

* reverting print verb

* eventType moved to wot package

* minor refactoring

* renaming the storage to eventQueue

* renaming the local variable to avoid confusion

Co-authored-by: Shreekantha Devasya <[email protected]>
  • Loading branch information
farshidtz and Shreekantha Devasya authored May 25, 2021
1 parent 47fa092 commit eb51e51
Show file tree
Hide file tree
Showing 13 changed files with 612 additions and 21 deletions.
4 changes: 4 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ the corresponding developers and organisations:
* https://github.com/oleksandr/bonjour by Oleksandr Lobunets (MIT License)
* https://github.com/syndtr/goleveldb by Suryandaru Triandana (MIT License)
* https://github.com/dgrijalva/jwt-go by Dave Grijalva (MIT License)

Moreover, it includes portions of source code from the following developers and organizations:

* https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c by Ismael Celis (MIT License)
2 changes: 2 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type CatalogController interface {
cleanExpired()

Stop()

AddSubscriber(listener EventListener)
}

// Storage interface
Expand Down
34 changes: 26 additions & 8 deletions catalog/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
var controllerExpiryCleanupInterval = 60 * time.Second // to be modified in unit tests

type Controller struct {
storage Storage
storage Storage
listeners eventHandler
}

func NewController(storage Storage) (CatalogController, error) {
Expand All @@ -36,6 +37,10 @@ func NewController(storage Storage) (CatalogController, error) {
return &c, nil
}

func (c *Controller) AddSubscriber(listener EventListener) {
c.listeners = append(c.listeners, listener)
}

func (c *Controller) add(td ThingDescription) (string, error) {
id, ok := td[wot.KeyThingID].(string)
if !ok || id == "" {
Expand Down Expand Up @@ -66,6 +71,8 @@ func (c *Controller) add(td ThingDescription) (string, error) {
return "", err
}

go c.listeners.created(td)

return id, nil
}

Expand Down Expand Up @@ -94,7 +101,7 @@ func (c *Controller) update(id string, td ThingDescription) error {
return err
}
if len(results) != 0 {
return &ValidationError{results}
return &ValidationError{ValidationErrors: results}
}

now := time.Now().UTC()
Expand All @@ -112,6 +119,8 @@ func (c *Controller) update(id string, td ThingDescription) error {
return err
}

go c.listeners.updated(oldTD, td)

return nil
}

Expand Down Expand Up @@ -169,15 +178,24 @@ func (c *Controller) patch(id string, td ThingDescription) error {
return err
}

go c.listeners.updated(oldTD, td)

return nil
}

func (c *Controller) delete(id string) error {
err := c.storage.delete(id)
oldTD, err := c.storage.get(id)
if err != nil {
return err
}

err = c.storage.delete(id)
if err != nil {
return err
}

go c.listeners.deleted(oldTD)

return nil
}

Expand Down Expand Up @@ -235,7 +253,7 @@ func (c *Controller) filterJSONPath(path string, page, perPage int) ([]interface
// filter results with jsonpath
b, err = jsonpath.Get(b, path)
if err != nil {
return nil, 0, &BadRequestError{fmt.Sprintf("error evaluating jsonpath: %s", err)}
return nil, 0, &BadRequestError{S: fmt.Sprintf("error evaluating jsonpath: %s", err)}
}

// de-serialize the filtered results
Expand All @@ -248,7 +266,7 @@ func (c *Controller) filterJSONPath(path string, page, perPage int) ([]interface
// paginate
offset, limit, err := utils.GetPagingAttr(len(results), page, perPage, MaxPerPage)
if err != nil {
return nil, 0, &BadRequestError{fmt.Sprintf("unable to paginate: %s", err)}
return nil, 0, &BadRequestError{S: fmt.Sprintf("unable to paginate: %s", err)}
}
// return the requested page
return results[offset : offset+limit], len(results), nil
Expand Down Expand Up @@ -302,7 +320,7 @@ func (c *Controller) filterXPath(path string, page, perPage int) ([]interface{},
// filter with xpath
nodes, err := xpath.QueryAll(doc, path)
if err != nil {
return nil, 0, &BadRequestError{fmt.Sprintf("error filtering input with xpath: %s", err)}
return nil, 0, &BadRequestError{S: fmt.Sprintf("error filtering input with xpath: %s", err)}
}
for _, n := range nodes {
results = append(results, getObjectFromXPathNode(n))
Expand All @@ -311,7 +329,7 @@ func (c *Controller) filterXPath(path string, page, perPage int) ([]interface{},
// paginate
offset, limit, err := utils.GetPagingAttr(len(results), page, perPage, MaxPerPage)
if err != nil {
return nil, 0, &BadRequestError{fmt.Sprintf("unable to paginate: %s", err)}
return nil, 0, &BadRequestError{S: fmt.Sprintf("unable to paginate: %s", err)}
}
// return the requested page
return results[offset : offset+limit], len(results), nil
Expand All @@ -333,7 +351,7 @@ func (c *Controller) filterXPathBytes(path string) ([]byte, error) {
// filter with xpath
nodes, err := xpath.QueryAll(doc, path)
if err != nil {
return nil, &BadRequestError{fmt.Sprintf("error filtering input with xpath: %s", err)}
return nil, &BadRequestError{S: fmt.Sprintf("error filtering input with xpath: %s", err)}
}
results := make([]interface{}, len(nodes))
for i := range nodes {
Expand Down
14 changes: 7 additions & 7 deletions catalog/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@ import (
)

// Not Found
type NotFoundError struct{ s string }
type NotFoundError struct{ S string }

func (e *NotFoundError) Error() string { return e.s }
func (e *NotFoundError) Error() string { return e.S }

// Conflict (non-unique id, assignment to read-only data)
type ConflictError struct{ s string }
type ConflictError struct{ S string }

func (e *ConflictError) Error() string { return e.s }
func (e *ConflictError) Error() string { return e.S }

// Bad Request
type BadRequestError struct{ s string }
type BadRequestError struct{ S string }

func (e *BadRequestError) Error() string { return e.s }
func (e *BadRequestError) Error() string { return e.S }

// Validation error (HTTP Bad Request)
type ValidationError struct {
validationErrors []wot.ValidationError
ValidationErrors []wot.ValidationError
}

func (e *ValidationError) Error() string { return "validation errors" }
Expand Down
41 changes: 41 additions & 0 deletions catalog/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package catalog

// EventListener interface that listens to TDD events.
type EventListener interface {
CreateHandler(new ThingDescription) error
UpdateHandler(old ThingDescription, new ThingDescription) error
DeleteHandler(old ThingDescription) error
}

// eventHandler implements sequential fav-out/fan-in of events from registry
type eventHandler []EventListener

func (h eventHandler) created(new ThingDescription) error {
for i := range h {
err := h[i].CreateHandler(new)
if err != nil {
return err
}
}
return nil
}

func (h eventHandler) updated(old ThingDescription, new ThingDescription) error {
for i := range h {
err := h[i].UpdateHandler(old, new)
if err != nil {
return err
}
}
return nil
}

func (h eventHandler) deleted(old ThingDescription) error {
for i := range h {
err := h[i].DeleteHandler(old)
if err != nil {
return err
}
}
return nil
}
8 changes: 4 additions & 4 deletions catalog/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (a *HTTPAPI) Post(w http.ResponseWriter, req *http.Request) {
ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error())
return
case *ValidationError:
ValidationErrorResponse(w, err.(*ValidationError).validationErrors)
ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors)
return
default:
ErrorResponse(w, http.StatusInternalServerError, "Error creating the registration:", err.Error())
Expand Down Expand Up @@ -137,7 +137,7 @@ func (a *HTTPAPI) Put(w http.ResponseWriter, req *http.Request) {
ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error())
return
case *ValidationError:
ValidationErrorResponse(w, err.(*ValidationError).validationErrors)
ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors)
return
default:
ErrorResponse(w, http.StatusInternalServerError, "Error creating the registration:", err.Error())
Expand All @@ -151,7 +151,7 @@ func (a *HTTPAPI) Put(w http.ResponseWriter, req *http.Request) {
ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error())
return
case *ValidationError:
ValidationErrorResponse(w, err.(*ValidationError).validationErrors)
ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors)
return
default:
ErrorResponse(w, http.StatusInternalServerError, "Error updating the registration:", err.Error())
Expand Down Expand Up @@ -196,7 +196,7 @@ func (a *HTTPAPI) Patch(w http.ResponseWriter, req *http.Request) {
ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error())
return
case *ValidationError:
ValidationErrorResponse(w, err.(*ValidationError).validationErrors)
ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors)
return
default:
ErrorResponse(w, http.StatusInternalServerError, "Error updating the registration:", err.Error())
Expand Down
27 changes: 25 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/linksmart/go-sec/auth/keycloak/validator"
"github.com/linksmart/go-sec/auth/validator"
"github.com/linksmart/thing-directory/catalog"
"github.com/linksmart/thing-directory/notification"
"github.com/linksmart/thing-directory/wot"
"github.com/rs/cors"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -104,10 +105,29 @@ func main() {
// Create catalog API object
api := catalog.NewHTTPAPI(controller, Version)

nRouter, err := setupHTTPRouter(&config.HTTP, api)
// Start notification
var eventQueue notification.EventQueue
switch config.Storage.Type {
case catalog.BackendLevelDB:
eventQueue, err = notification.NewLevelDBEventQueue(config.Storage.DSN+"/sse", nil, 1000)
if err != nil {
panic("Failed to start LevelDB storage for SSE events:" + err.Error())
}
defer eventQueue.Close()
default:
panic("Could not create SSE storage. Unsupported type:" + config.Storage.Type)
}
notificationController := notification.NewController(eventQueue)
notifAPI := notification.NewSSEAPI(notificationController, Version)
defer notificationController.Stop()

controller.AddSubscriber(notificationController)

nRouter, err := setupHTTPRouter(&config.HTTP, api, notifAPI)
if err != nil {
panic(err)
}

// Start listener
addr := fmt.Sprintf("%s:%d", config.HTTP.BindAddr, config.HTTP.BindPort)
listener, err := net.Listen("tcp", addr)
Expand Down Expand Up @@ -158,7 +178,7 @@ func main() {
log.Println("Shutting down...")
}

func setupHTTPRouter(config *HTTPConfig, api *catalog.HTTPAPI) (*negroni.Negroni, error) {
func setupHTTPRouter(config *HTTPConfig, api *catalog.HTTPAPI, notifAPI *notification.SSEAPI) (*negroni.Negroni, error) {

corsHandler := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
Expand Down Expand Up @@ -216,6 +236,9 @@ func setupHTTPRouter(config *HTTPConfig, api *catalog.HTTPAPI) (*negroni.Negroni
// TD validation
r.get("/validation", commonHandlers.ThenFunc(api.GetValidation))

//TD notification
r.get("/events", commonHandlers.ThenFunc(notifAPI.SubscribeEvent))

logger := negroni.NewLogger()
logFlags := log.LstdFlags
if evalEnv(EnvDisableLogTime) {
Expand Down
Loading

0 comments on commit eb51e51

Please sign in to comment.