diff --git a/NOTICE b/NOTICE index db12921..1b2b741 100644 --- a/NOTICE +++ b/NOTICE @@ -19,3 +19,7 @@ the corresponding developers and organisations: * https://github.com/oleksandr/bonjour by Oleksandr Lobunets (MIT License) * https://github.com/syndtr/goleveldb by Suryandaru Triandana (MIT License) * https://github.com/dgrijalva/jwt-go by Dave Grijalva (MIT License) + +Moreover, it includes portions of source code from the following developers and organizations: + +* https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c by Ismael Celis (MIT License) diff --git a/catalog/catalog.go b/catalog/catalog.go index 71685c8..9e6306f 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -48,6 +48,8 @@ type CatalogController interface { cleanExpired() Stop() + + AddSubscriber(listener EventListener) } // Storage interface diff --git a/catalog/controller.go b/catalog/controller.go index d593e3a..5e0d3e4 100644 --- a/catalog/controller.go +++ b/catalog/controller.go @@ -23,7 +23,8 @@ import ( var controllerExpiryCleanupInterval = 60 * time.Second // to be modified in unit tests type Controller struct { - storage Storage + storage Storage + listeners eventHandler } func NewController(storage Storage) (CatalogController, error) { @@ -36,6 +37,10 @@ func NewController(storage Storage) (CatalogController, error) { return &c, nil } +func (c *Controller) AddSubscriber(listener EventListener) { + c.listeners = append(c.listeners, listener) +} + func (c *Controller) add(td ThingDescription) (string, error) { id, ok := td[wot.KeyThingID].(string) if !ok || id == "" { @@ -66,6 +71,8 @@ func (c *Controller) add(td ThingDescription) (string, error) { return "", err } + go c.listeners.created(td) + return id, nil } @@ -94,7 +101,7 @@ func (c *Controller) update(id string, td ThingDescription) error { return err } if len(results) != 0 { - return &ValidationError{results} + return &ValidationError{ValidationErrors: results} } now := time.Now().UTC() @@ -112,6 +119,8 @@ func (c *Controller) update(id string, td ThingDescription) error { return err } + go c.listeners.updated(oldTD, td) + return nil } @@ -169,15 +178,24 @@ func (c *Controller) patch(id string, td ThingDescription) error { return err } + go c.listeners.updated(oldTD, td) + return nil } func (c *Controller) delete(id string) error { - err := c.storage.delete(id) + oldTD, err := c.storage.get(id) + if err != nil { + return err + } + + err = c.storage.delete(id) if err != nil { return err } + go c.listeners.deleted(oldTD) + return nil } @@ -235,7 +253,7 @@ func (c *Controller) filterJSONPath(path string, page, perPage int) ([]interface // filter results with jsonpath b, err = jsonpath.Get(b, path) if err != nil { - return nil, 0, &BadRequestError{fmt.Sprintf("error evaluating jsonpath: %s", err)} + return nil, 0, &BadRequestError{S: fmt.Sprintf("error evaluating jsonpath: %s", err)} } // de-serialize the filtered results @@ -248,7 +266,7 @@ func (c *Controller) filterJSONPath(path string, page, perPage int) ([]interface // paginate offset, limit, err := utils.GetPagingAttr(len(results), page, perPage, MaxPerPage) if err != nil { - return nil, 0, &BadRequestError{fmt.Sprintf("unable to paginate: %s", err)} + return nil, 0, &BadRequestError{S: fmt.Sprintf("unable to paginate: %s", err)} } // return the requested page return results[offset : offset+limit], len(results), nil @@ -302,7 +320,7 @@ func (c *Controller) filterXPath(path string, page, perPage int) ([]interface{}, // filter with xpath nodes, err := xpath.QueryAll(doc, path) if err != nil { - return nil, 0, &BadRequestError{fmt.Sprintf("error filtering input with xpath: %s", err)} + return nil, 0, &BadRequestError{S: fmt.Sprintf("error filtering input with xpath: %s", err)} } for _, n := range nodes { results = append(results, getObjectFromXPathNode(n)) @@ -311,7 +329,7 @@ func (c *Controller) filterXPath(path string, page, perPage int) ([]interface{}, // paginate offset, limit, err := utils.GetPagingAttr(len(results), page, perPage, MaxPerPage) if err != nil { - return nil, 0, &BadRequestError{fmt.Sprintf("unable to paginate: %s", err)} + return nil, 0, &BadRequestError{S: fmt.Sprintf("unable to paginate: %s", err)} } // return the requested page return results[offset : offset+limit], len(results), nil @@ -333,7 +351,7 @@ func (c *Controller) filterXPathBytes(path string) ([]byte, error) { // filter with xpath nodes, err := xpath.QueryAll(doc, path) if err != nil { - return nil, &BadRequestError{fmt.Sprintf("error filtering input with xpath: %s", err)} + return nil, &BadRequestError{S: fmt.Sprintf("error filtering input with xpath: %s", err)} } results := make([]interface{}, len(nodes)) for i := range nodes { diff --git a/catalog/errors.go b/catalog/errors.go index 8b866a2..8ae8879 100644 --- a/catalog/errors.go +++ b/catalog/errors.go @@ -13,23 +13,23 @@ import ( ) // Not Found -type NotFoundError struct{ s string } +type NotFoundError struct{ S string } -func (e *NotFoundError) Error() string { return e.s } +func (e *NotFoundError) Error() string { return e.S } // Conflict (non-unique id, assignment to read-only data) -type ConflictError struct{ s string } +type ConflictError struct{ S string } -func (e *ConflictError) Error() string { return e.s } +func (e *ConflictError) Error() string { return e.S } // Bad Request -type BadRequestError struct{ s string } +type BadRequestError struct{ S string } -func (e *BadRequestError) Error() string { return e.s } +func (e *BadRequestError) Error() string { return e.S } // Validation error (HTTP Bad Request) type ValidationError struct { - validationErrors []wot.ValidationError + ValidationErrors []wot.ValidationError } func (e *ValidationError) Error() string { return "validation errors" } diff --git a/catalog/events.go b/catalog/events.go new file mode 100644 index 0000000..366e8d2 --- /dev/null +++ b/catalog/events.go @@ -0,0 +1,41 @@ +package catalog + +// EventListener interface that listens to TDD events. +type EventListener interface { + CreateHandler(new ThingDescription) error + UpdateHandler(old ThingDescription, new ThingDescription) error + DeleteHandler(old ThingDescription) error +} + +// eventHandler implements sequential fav-out/fan-in of events from registry +type eventHandler []EventListener + +func (h eventHandler) created(new ThingDescription) error { + for i := range h { + err := h[i].CreateHandler(new) + if err != nil { + return err + } + } + return nil +} + +func (h eventHandler) updated(old ThingDescription, new ThingDescription) error { + for i := range h { + err := h[i].UpdateHandler(old, new) + if err != nil { + return err + } + } + return nil +} + +func (h eventHandler) deleted(old ThingDescription) error { + for i := range h { + err := h[i].DeleteHandler(old) + if err != nil { + return err + } + } + return nil +} diff --git a/catalog/http.go b/catalog/http.go index 50f0062..0d3cd72 100644 --- a/catalog/http.go +++ b/catalog/http.go @@ -83,7 +83,7 @@ func (a *HTTPAPI) Post(w http.ResponseWriter, req *http.Request) { ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error()) return case *ValidationError: - ValidationErrorResponse(w, err.(*ValidationError).validationErrors) + ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors) return default: ErrorResponse(w, http.StatusInternalServerError, "Error creating the registration:", err.Error()) @@ -137,7 +137,7 @@ func (a *HTTPAPI) Put(w http.ResponseWriter, req *http.Request) { ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error()) return case *ValidationError: - ValidationErrorResponse(w, err.(*ValidationError).validationErrors) + ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors) return default: ErrorResponse(w, http.StatusInternalServerError, "Error creating the registration:", err.Error()) @@ -151,7 +151,7 @@ func (a *HTTPAPI) Put(w http.ResponseWriter, req *http.Request) { ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error()) return case *ValidationError: - ValidationErrorResponse(w, err.(*ValidationError).validationErrors) + ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors) return default: ErrorResponse(w, http.StatusInternalServerError, "Error updating the registration:", err.Error()) @@ -196,7 +196,7 @@ func (a *HTTPAPI) Patch(w http.ResponseWriter, req *http.Request) { ErrorResponse(w, http.StatusBadRequest, "Invalid registration:", err.Error()) return case *ValidationError: - ValidationErrorResponse(w, err.(*ValidationError).validationErrors) + ValidationErrorResponse(w, err.(*ValidationError).ValidationErrors) return default: ErrorResponse(w, http.StatusInternalServerError, "Error updating the registration:", err.Error()) diff --git a/main.go b/main.go index 037f82c..15aa9fd 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( _ "github.com/linksmart/go-sec/auth/keycloak/validator" "github.com/linksmart/go-sec/auth/validator" "github.com/linksmart/thing-directory/catalog" + "github.com/linksmart/thing-directory/notification" "github.com/linksmart/thing-directory/wot" "github.com/rs/cors" uuid "github.com/satori/go.uuid" @@ -104,10 +105,29 @@ func main() { // Create catalog API object api := catalog.NewHTTPAPI(controller, Version) - nRouter, err := setupHTTPRouter(&config.HTTP, api) + // Start notification + var eventQueue notification.EventQueue + switch config.Storage.Type { + case catalog.BackendLevelDB: + eventQueue, err = notification.NewLevelDBEventQueue(config.Storage.DSN+"/sse", nil, 1000) + if err != nil { + panic("Failed to start LevelDB storage for SSE events:" + err.Error()) + } + defer eventQueue.Close() + default: + panic("Could not create SSE storage. Unsupported type:" + config.Storage.Type) + } + notificationController := notification.NewController(eventQueue) + notifAPI := notification.NewSSEAPI(notificationController, Version) + defer notificationController.Stop() + + controller.AddSubscriber(notificationController) + + nRouter, err := setupHTTPRouter(&config.HTTP, api, notifAPI) if err != nil { panic(err) } + // Start listener addr := fmt.Sprintf("%s:%d", config.HTTP.BindAddr, config.HTTP.BindPort) listener, err := net.Listen("tcp", addr) @@ -158,7 +178,7 @@ func main() { log.Println("Shutting down...") } -func setupHTTPRouter(config *HTTPConfig, api *catalog.HTTPAPI) (*negroni.Negroni, error) { +func setupHTTPRouter(config *HTTPConfig, api *catalog.HTTPAPI, notifAPI *notification.SSEAPI) (*negroni.Negroni, error) { corsHandler := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, @@ -216,6 +236,9 @@ func setupHTTPRouter(config *HTTPConfig, api *catalog.HTTPAPI) (*negroni.Negroni // TD validation r.get("/validation", commonHandlers.ThenFunc(api.GetValidation)) + //TD notification + r.get("/events", commonHandlers.ThenFunc(notifAPI.SubscribeEvent)) + logger := negroni.NewLogger() logFlags := log.LstdFlags if evalEnv(EnvDisableLogTime) { diff --git a/notification/controller.go b/notification/controller.go new file mode 100644 index 0000000..cbbc514 --- /dev/null +++ b/notification/controller.go @@ -0,0 +1,184 @@ +package notification + +import ( + "encoding/json" + "fmt" + "log" + + jsonpatch "github.com/evanphx/json-patch/v5" + "github.com/linksmart/thing-directory/catalog" + "github.com/linksmart/thing-directory/wot" +) + +type Controller struct { + s EventQueue + // Events are pushed to this channel by the main events-gathering routine + Notifier chan Event + + // New client connections + subscribingClients chan subscriber + + // Closed client connections + unsubscribingClients chan chan Event + + // Client connections registry + activeClients map[chan Event]subscriber + + // shutdown + shutdown chan bool +} + +type subscriber struct { + client chan Event + eventTypes []wot.EventType + full bool + lastEventID string +} + +func NewController(s EventQueue) *Controller { + c := &Controller{ + s: s, + Notifier: make(chan Event, 1), + subscribingClients: make(chan subscriber), + unsubscribingClients: make(chan chan Event), + activeClients: make(map[chan Event]subscriber), + shutdown: make(chan bool), + } + go c.handler() + return c +} + +func (c *Controller) subscribe(client chan Event, eventTypes []wot.EventType, full bool, lastEventID string) error { + s := subscriber{client: client, + eventTypes: eventTypes, + full: full, + lastEventID: lastEventID, + } + c.subscribingClients <- s + return nil +} + +func (c *Controller) unsubscribe(client chan Event) error { + c.unsubscribingClients <- client + return nil +} + +func (c *Controller) storeAndNotify(event Event) error { + var err error + event.ID, err = c.s.getNewID() + if err != nil { + return fmt.Errorf("error generating ID : %v", err) + } + + // Notify + c.Notifier <- event + + // Store + err = c.s.addRotate(event) + if err != nil { + return fmt.Errorf("error storing the notification : %v", err) + } + + return nil +} + +func (c *Controller) Stop() { + c.shutdown <- true +} + +func (c *Controller) CreateHandler(new catalog.ThingDescription) error { + event := Event{ + Type: wot.EventTypeCreate, + Data: new, + } + + err := c.storeAndNotify(event) + return err +} + +func (c *Controller) UpdateHandler(old catalog.ThingDescription, new catalog.ThingDescription) error { + oldJson, err := json.Marshal(old) + if err != nil { + return fmt.Errorf("error marshalling old TD") + } + newJson, err := json.Marshal(new) + if err != nil { + return fmt.Errorf("error marshalling new TD") + } + patch, err := jsonpatch.CreateMergePatch(oldJson, newJson) + if err != nil { + return fmt.Errorf("error merging new TD") + } + var td catalog.ThingDescription + if err := json.Unmarshal(patch, &td); err != nil { + return fmt.Errorf("error unmarshalling the patch TD") + } + td[wot.KeyThingID] = old[wot.KeyThingID] + event := Event{ + Type: wot.EventTypeUpdate, + Data: td, + } + err = c.storeAndNotify(event) + return err +} + +func (c *Controller) DeleteHandler(old catalog.ThingDescription) error { + deleted := catalog.ThingDescription{ + wot.KeyThingID: old[wot.KeyThingID], + } + event := Event{ + Type: wot.EventTypeDelete, + Data: deleted, + } + err := c.storeAndNotify(event) + return err +} + +func (c *Controller) handler() { +loop: + for { + select { + case s := <-c.subscribingClients: + c.activeClients[s.client] = s + log.Printf("New subscription. %d active clients", len(c.activeClients)) + + // Send the missed events + if s.lastEventID != "" { + missedEvents, err := c.s.getAllAfter(s.lastEventID) + if err != nil { + log.Printf("error getting the events after ID %s: %s", s.lastEventID, err) + continue loop + } + for _, event := range missedEvents { + sendToSubscriber(s, event) + } + } + case clientChan := <-c.unsubscribingClients: + delete(c.activeClients, clientChan) + close(clientChan) + log.Printf("Unsubscribed. %d active clients", len(c.activeClients)) + case event := <-c.Notifier: + for _, s := range c.activeClients { + sendToSubscriber(s, event) + } + case <-c.shutdown: + log.Println("Shutting down notification controller") + break loop + } + } + +} + +func sendToSubscriber(s subscriber, event Event) { + for _, eventType := range s.eventTypes { + // Send the notification if the type matches + if eventType == event.Type { + toSend := event + if !s.full { + toSend.Data = catalog.ThingDescription{wot.KeyThingID: toSend.Data[wot.KeyThingID]} + } + s.client <- toSend + break + } + } +} diff --git a/notification/event_type.go b/notification/event_type.go new file mode 100644 index 0000000..4306c87 --- /dev/null +++ b/notification/event_type.go @@ -0,0 +1 @@ +package notification diff --git a/notification/ldb_eventqueue.go b/notification/ldb_eventqueue.go new file mode 100644 index 0000000..4455ff3 --- /dev/null +++ b/notification/ldb_eventqueue.go @@ -0,0 +1,156 @@ +package notification + +import ( + "encoding/binary" + "encoding/json" + "flag" + "fmt" + "log" + "net/url" + "strconv" + "sync" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +// LevelDB storage +type LevelDBEventQueue struct { + db *leveldb.DB + wg sync.WaitGroup + latestID uint64 + capacity uint64 +} + +func NewLevelDBEventQueue(dsn string, opts *opt.Options, capacity uint64) (EventQueue, error) { + url, err := url.Parse(dsn) + if err != nil { + return nil, err + } + + // Open the database file + db, err := leveldb.OpenFile(url.Path, opts) + if err != nil { + return nil, err + } + + ldbEventQueue := &LevelDBEventQueue{db: db, capacity: capacity} + ldbEventQueue.latestID, err = ldbEventQueue.fetchLatestID() + if err != nil { + return nil, fmt.Errorf("error fetching the latest ID from storage: %w", err) + } + return ldbEventQueue, nil +} + +func (s *LevelDBEventQueue) addRotate(event Event) error { + s.wg.Add(1) + defer s.wg.Done() + + // add new data + bytes, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("error marshalling event: %w", err) + } + uintID, err := strconv.ParseUint(event.ID, 16, 64) + if err != nil { + return fmt.Errorf("error parsing event ID: %w", err) + } + batch := new(leveldb.Batch) + + batch.Put(uint64ToByte(uintID), bytes) + + // cleanup the older data + if s.latestID > s.capacity { + cleanBefore := s.latestID - s.capacity + 1 // adding 1 as Range is is not inclusive the limit. + iter := s.db.NewIterator(&util.Range{Limit: uint64ToByte(cleanBefore)}, nil) + for iter.Next() { + // log.Println("deleting older entry: ", byteToUint64(iter.Key())) + batch.Delete(iter.Key()) + } + iter.Release() + err = iter.Error() + if err != nil { + return err + } + } + err = s.db.Write(batch, nil) + if err != nil { + return fmt.Errorf("error cleaning up: %w", err) + } + return nil +} + +func (s *LevelDBEventQueue) getAllAfter(id string) ([]Event, error) { + intID, err := strconv.ParseUint(id, 16, 64) + if err != nil { + return nil, fmt.Errorf("error parsing latest ID: %w", err) + } + + // start from the last missing event. + // If the leveldb does not have the requested ID, + // then the iterator starts with oldest available entry + iter := s.db.NewIterator(&util.Range{Start: uint64ToByte(intID + 1)}, nil) + var events []Event + for iter.Next() { + var event Event + err = json.Unmarshal(iter.Value(), &event) + if err != nil { + iter.Release() + return nil, fmt.Errorf("error unmarshalling event: %w", err) + } + events = append(events, event) + } + iter.Release() + err = iter.Error() + if err != nil { + return nil, err + } + return events, nil +} + +func (s *LevelDBEventQueue) getNewID() (string, error) { + s.latestID += 1 + return strconv.FormatUint(s.latestID, 16), nil +} + +func (s *LevelDBEventQueue) Close() { + s.wg.Wait() + err := s.db.Close() + if err != nil { + log.Printf("Error closing SSE storage: %s", err) + } + if flag.Lookup("test.v") == nil { + log.Println("Closed SSE leveldb.") + } +} + +func (s *LevelDBEventQueue) fetchLatestID() (uint64, error) { + var latestID uint64 + s.wg.Add(1) + defer s.wg.Done() + iter := s.db.NewIterator(nil, nil) + exists := iter.Last() + if exists { + latestID = byteToUint64(iter.Key()) + } else { + // Start from 0 + latestID = 0 + } + iter.Release() + err := iter.Error() + if err != nil { + return 0, err + } + return latestID, nil +} + +//byte to unint64 conversion functions and vice versa +func byteToUint64(input []byte) uint64 { + return binary.BigEndian.Uint64(input) +} +func uint64ToByte(input uint64) []byte { + output := make([]byte, 8) + binary.BigEndian.PutUint64(output, input) + return output +} diff --git a/notification/notification.go b/notification/notification.go new file mode 100644 index 0000000..4aa764b --- /dev/null +++ b/notification/notification.go @@ -0,0 +1,41 @@ +package notification + +import ( + "github.com/linksmart/thing-directory/catalog" + "github.com/linksmart/thing-directory/wot" +) + +type Event struct { + ID string `json:"id"` + Type wot.EventType `json:"event"` + Data catalog.ThingDescription `json:"data"` +} + +// NotificationController interface +type NotificationController interface { + // subscribe to the events. the caller will get events through the channel 'client' starting from 'lastEventID' + subscribe(client chan Event, eventTypes []wot.EventType, full bool, lastEventID string) error + + // unsubscribe and close the channel 'client' + unsubscribe(client chan Event) error + + // Stop the controller + Stop() + + catalog.EventListener +} + +// EventQueue interface +type EventQueue interface { + //addRotate adds new and delete the old event if the event queue is full + addRotate(event Event) error + + // getAllAfter gets the events after the event ID + getAllAfter(id string) ([]Event, error) + + // getNewID creates a new ID for the event + getNewID() (string, error) + + // Close all the resources acquired by the queue implementation + Close() +} diff --git a/notification/sse.go b/notification/sse.go new file mode 100644 index 0000000..3ede991 --- /dev/null +++ b/notification/sse.go @@ -0,0 +1,106 @@ +package notification + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + + "github.com/linksmart/thing-directory/catalog" + "github.com/linksmart/thing-directory/wot" +) + +const ( + QueryParamType = "type" + QueryParamFull = "full" + HeaderLastEventID = "Last-Event-ID" +) + +type SSEAPI struct { + controller NotificationController + contentType string +} + +func NewSSEAPI(controller NotificationController, version string) *SSEAPI { + contentType := "text/event-stream" + if version != "" { + contentType += ";version=" + version + } + return &SSEAPI{ + controller: controller, + contentType: contentType, + } + +} + +func (a *SSEAPI) SubscribeEvent(w http.ResponseWriter, req *http.Request) { + eventTypes, full, err := parseQueryParameters(req) + if err != nil { + catalog.ErrorResponse(w, http.StatusBadRequest, err) + } + flusher, ok := w.(http.Flusher) + if !ok { + catalog.ErrorResponse(w, http.StatusInternalServerError, "Streaming unsupported") + return + } + w.Header().Set("Content-Type", a.contentType) + + messageChan := make(chan Event) + + lastEventID := req.Header.Get(HeaderLastEventID) + a.controller.subscribe(messageChan, eventTypes, full, lastEventID) + + go func() { + <-req.Context().Done() + // unsubscribe to events and close the messageChan + a.controller.unsubscribe(messageChan) + }() + + for event := range messageChan { + //data, err := json.MarshalIndent(event.Data, "data: ", "") + data, err := json.Marshal(event.Data) + if err != nil { + log.Printf("error marshaling event %v: %s", event, err) + } + fmt.Fprintf(w, "event: %s\n", event.Type) + fmt.Fprintf(w, "id: %s\n", event.ID) + fmt.Fprintf(w, "data: %s\n\n", data) + + flusher.Flush() + } +} + +func parseQueryParameters(req *http.Request) ([]wot.EventType, bool, error) { + + full := false + req.ParseForm() + + // Parse full or partial events + if strings.EqualFold(req.Form.Get(QueryParamFull), "true") { + full = true + } + + // Parse event type to be subscribed to + queriedTypes := req.Form[QueryParamType] + if queriedTypes == nil { + return []wot.EventType{wot.EventTypeCreate, wot.EventTypeUpdate, wot.EventTypeDelete}, full, nil + } + + var eventTypes []wot.EventType +loopQueriedTypes: + for _, v := range queriedTypes { + eventType := wot.EventType(v) + if !eventType.IsValid() { + return nil, false, fmt.Errorf("invalid type parameter") + } + for _, existing := range eventTypes { + if existing == eventType { + continue loopQueriedTypes + } + } + eventTypes = append(eventTypes, eventType) + } + + return eventTypes, full, nil +} diff --git a/wot/discovery.go b/wot/discovery.go index 022a1ac..89684f2 100644 --- a/wot/discovery.go +++ b/wot/discovery.go @@ -19,6 +19,10 @@ const ( KeyThingRegistrationModified = "modified" KeyThingRegistrationExpires = "expires" KeyThingRegistrationTTL = "ttl" + // TD event types + EventTypeCreate = "create" + EventTypeUpdate = "update" + EventTypeDelete = "delete" ) type EnrichedTD struct { @@ -35,3 +39,14 @@ type ThingRegistration struct { Retrieved *time.Time `json:"retrieved,omitempty"` TTL *float64 `json:"ttl,omitempty"` } + +type EventType string + +func (e EventType) IsValid() bool { + switch e { + case EventTypeCreate, EventTypeUpdate, EventTypeDelete: + return true + default: + return false + } +}