From 84c60d0ef89d6c2d0cb1fd19f994c4df9a6da7f2 Mon Sep 17 00:00:00 2001 From: Isabel Andrade Date: Fri, 6 May 2022 14:25:26 +0100 Subject: [PATCH] Support module CRUD operations in device client This change adds all the CRUD module operations to the HTTP device client. Note that the `Module` model definitions are the same as for the `iotservice` client and they have been reused instead of redefining them. Ideally, this model definitions should be in the common package but that would break backward compatibility. --- common/message.go | 44 ------- iotdevice/client.go | 46 ++++++++ iotdevice/client_test.go | 129 ++++++++++++++++++++ iotdevice/transport/http/http.go | 164 ++++++++++++++++++++++++-- iotdevice/transport/http/http_test.go | 130 ++++++++++++++++++++ iotdevice/transport/mqtt/mqtt.go | 28 +++++ iotdevice/transport/transport.go | 6 + 7 files changed, 495 insertions(+), 52 deletions(-) create mode 100644 iotdevice/client_test.go create mode 100644 iotdevice/transport/http/http_test.go diff --git a/common/message.go b/common/message.go index 04f3359..b3d2c63 100644 --- a/common/message.go +++ b/common/message.go @@ -60,47 +60,3 @@ type ConnectionAuthMethod struct { Type string `json:"type"` Issuer string `json:"issuer"` } - -// ModuleIdentity - an entity to create or update a module identity device-to-cloud -type ModuleIdentity struct { - // ModuleId - The unique identifier of the module. - ModuleId string `json:"moduleId"` - - // DeviceId - The unique identifier of the device. - DeviceId string `json:"deviceId"` - - // Authentication - The authentication mechanism used by the module when connecting to the service and edge hub. - Authentication struct { - Type string `json:"type"` - SymmetricKey struct { - PrimaryKey string `json:"primaryKey"` - SecondaryKey string `json:"secondaryKey"` - } `json:"symmetricKey"` - X509Thumbprint struct { - PrimaryThumbprint string `json:"primaryThumbprint"` - SecondaryThumbprint string `json:"secondaryThumbprint"` - } `json:"x509Thumbprint"` - } `json:"authentication"` - - // ManagedBy - Identifies who manages this module. For instance, this value is "IotEdge" if the edge runtime owns this module. - // If not specified, will be null. - ManagedBy string `json:"managedBy"` - - // LastActivityTime - The date and time the device last connected, received, or sent a message. - LastActivityTime string `json:"lastActivityTime"` - - // CloudToDeviceMessageCount - The number of cloud-to-module messages currently queued to be sent to the module. - CloudToDeviceMessageCount int `json:"cloudToDeviceMessageCount"` - - // ConnectionState - The connection state of the device. - ConnectionState string `json:"connectionState"` - - // ConnectionStateUpdatedTime - The date and time the connection state was last updated - ConnectionStateUpdatedTime string `json:"connectionStateUpdatedTime"` - - // Etag - The string representing a weak ETag for the module identity, as per RFC7232. - Etag string `json:"etag"` - - // GenerationId - The IoT Hub generated, case-sensitive string up to 128 characters long. This value is used to distinguish modules with the same moduleId, when they have been deleted and re-created. - GenerationId string `json:"generationId"` -} diff --git a/iotdevice/client.go b/iotdevice/client.go index ea8faf6..28a1f27 100644 --- a/iotdevice/client.go +++ b/iotdevice/client.go @@ -14,6 +14,7 @@ import ( "github.com/amenzhinsky/iothub/common" "github.com/amenzhinsky/iothub/iotdevice/transport" + "github.com/amenzhinsky/iothub/iotservice" "github.com/amenzhinsky/iothub/logger" ) @@ -390,3 +391,48 @@ func (c *Client) UploadFile(ctx context.Context, blobName string, file io.Reader return err } + +// ListModules list all the registered modules on the device. +func (c *Client) ListModules(ctx context.Context) ([]*iotservice.Module, error) { + if err := c.checkConnection(ctx); err != nil { + return nil, err + } + + return c.tr.ListModules(ctx) +} + +// CreateModule Creates adds the given module to the registry. +func (c *Client) CreateModule(ctx context.Context, m *iotservice.Module) (*iotservice.Module, error) { + if err := c.checkConnection(ctx); err != nil { + return nil, err + } + + return c.tr.CreateModule(ctx, m) +} + +// GetModule retrieves the named module. +func (c *Client) GetModule(ctx context.Context, moduleID string) (*iotservice.Module, error) { + if err := c.checkConnection(ctx); err != nil { + return nil, err + } + + return c.tr.GetModule(ctx, moduleID) +} + +// UpdateModule updates the given module. +func (c *Client) UpdateModule(ctx context.Context, m *iotservice.Module) (*iotservice.Module, error) { + if err := c.checkConnection(ctx); err != nil { + return nil, err + } + + return c.tr.UpdateModule(ctx, m) +} + +// DeleteModule removes the named device module. +func (c *Client) DeleteModule(ctx context.Context, m *iotservice.Module) error { + if err := c.checkConnection(ctx); err != nil { + return err + } + + return c.tr.DeleteModule(ctx, m) +} diff --git a/iotdevice/client_test.go b/iotdevice/client_test.go new file mode 100644 index 0000000..a22fab0 --- /dev/null +++ b/iotdevice/client_test.go @@ -0,0 +1,129 @@ +package iotdevice + +import ( + "context" + "os" + "strconv" + "testing" + "time" + + "github.com/amenzhinsky/iothub/iotdevice/transport/http" + "github.com/amenzhinsky/iothub/iotservice" +) + +var testRunID = strconv.Itoa(int(time.Now().Unix())) + +func newServiceClient(t *testing.T) *iotservice.Client { + t.Helper() + cs := os.Getenv("TEST_IOTHUB_SERVICE_CONNECTION_STRING") + if cs == "" { + t.Fatal("$TEST_IOTHUB_SERVICE_CONNECTION_STRING is empty") + } + c, err := iotservice.NewFromConnectionString(cs) + if err != nil { + t.Fatal(err) + } + return c +} + +func newDevice(t *testing.T, c *iotservice.Client) *iotservice.Device { + t.Helper() + + device := &iotservice.Device{ + DeviceID: "test-device-" + testRunID, + } + device, err := c.CreateDevice(context.Background(), device) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + device.ETag = "" + if err := c.DeleteDevice(context.Background(), device); err != nil { + t.Fatal(err) + } + }) + return device +} + +func newDeviceClient(t *testing.T) *Client { + t.Helper() + sc := newServiceClient(t) + device := newDevice(t, sc) + + dcs, err := sc.DeviceConnectionString(device, false) + if err != nil { + t.Fatal(err) + } + + dc, err := NewFromConnectionString(http.New(), dcs) + if err != nil { + t.Fatal(err) + } + + if err := dc.Connect(context.Background()); err != nil { + t.Fatal(err) + } + + return dc +} + +func newModule(t *testing.T, c *Client) *iotservice.Module { + module := &iotservice.Module{ + DeviceID: c.DeviceID(), + ModuleID: "test-module-" + testRunID, + ManagedBy: "admin", + } + module, err := c.CreateModule(context.Background(), module) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + module.ETag = "" + if err := c.DeleteModule(context.Background(), module); err != nil { + t.Fatal(err) + } + }) + return module +} + +func TestListModules(t *testing.T) { + c := newDeviceClient(t) + module := newModule(t, c) + modules, err := c.ListModules(context.Background()) + if err != nil { + t.Fatal(err) + } + + if len(modules) != 1 { + t.Errorf("module count = %d, want 1", len(modules)) + } + + if modules[0].ModuleID != module.ModuleID { + t.Errorf("moduleID = %s, want %s", modules[0].ModuleID, module.ModuleID) + } +} + +func TestGetModule(t *testing.T) { + c := newDeviceClient(t) + module := newModule(t, c) + if _, err := c.GetModule( + context.Background(), module.ModuleID, + ); err != nil { + t.Fatal(err) + } +} + +func TestUpdateModule(t *testing.T) { + c := newDeviceClient(t) + module := newModule(t, c) + module.Authentication.Type = iotservice.AuthSAS + updatedModule, err := c.UpdateModule(context.Background(), module) + + if err != nil { + t.Fatal(err) + } + + if updatedModule.Authentication.Type != iotservice.AuthSAS { + t.Errorf("authentication type = `%s`, want `%s`", updatedModule.Authentication.Type, iotservice.AuthSAS) + } +} diff --git a/iotdevice/transport/http/http.go b/iotdevice/transport/http/http.go index 88f1a5c..543c46c 100644 --- a/iotdevice/transport/http/http.go +++ b/iotdevice/transport/http/http.go @@ -14,9 +14,12 @@ import ( "github.com/amenzhinsky/iothub/common" "github.com/amenzhinsky/iothub/iotdevice/transport" + "github.com/amenzhinsky/iothub/iotservice" "github.com/amenzhinsky/iothub/logger" ) +const API_VERSION = "2020-09-30" + var ( ErrNotImplemented = errors.New("not implemented") DefaultSASTTL = 30 * time.Second @@ -124,21 +127,166 @@ func (tr *Transport) UpdateTwinProperties(ctx context.Context, payload []byte) ( return 0, ErrNotImplemented } -//CreateOrUpdateModuleIdentity Creates or updates the module identity for a device in the IoT Hub. -//Notice the method is PUT and overrides previous data. -func (tr *Transport) CreateOrUpdateModuleIdentity(ctx context.Context, identity *common.ModuleIdentity) error { - target, err := url.Parse(fmt.Sprintf("https://%s/devices/%s/modules/$edgeAgent?api-version=2020-03-13", tr.creds.GetHostName(), url.PathEscape(tr.creds.GetDeviceID()))) - requestPayloadBytes, err := json.Marshal(&identity) +// ListModules list all the registered modules on the device. +func (tr *Transport) ListModules(ctx context.Context) ([]*iotservice.Module, error) { + target, err := url.Parse( + fmt.Sprintf( + "https://%s/devices/%s/modules?api-version=%s", + tr.creds.GetHostName(), + url.PathEscape(tr.creds.GetDeviceID()), + API_VERSION, + ), + ) if err != nil { - return err + return nil, err + } + + resp, err := tr.getTokenAndSendRequest(http.MethodGet, target, nil, nil) + if err != nil { + return nil, err + } + + err = tr.handleErrorResponse(resp) + if err != nil { + return nil, err + } + + var res []*iotservice.Module + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, err + } + + return res, nil +} + +// CreateModule Creates adds the given module to the registry. +func (tr *Transport) CreateModule(ctx context.Context, m *iotservice.Module) (*iotservice.Module, error) { + target, err := url.Parse( + fmt.Sprintf( + "https://%s/devices/%s/modules/%s?api-version=%s", + tr.creds.GetHostName(), + url.PathEscape(tr.creds.GetDeviceID()), url.PathEscape(m.ModuleID), + API_VERSION, + ), + ) + if err != nil { + return nil, err + } + requestPayloadBytes, err := json.Marshal(&m) + if err != nil { + return nil, err + } + + resp, err := tr.getTokenAndSendRequest(http.MethodPut, target, requestPayloadBytes, map[string]string{}) + if err != nil { + return nil, err + } + + err = tr.handleErrorResponse(resp) + if err != nil { + return nil, err + } + + var res iotservice.Module + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, err + } + + return &res, nil +} + +// GetModule retrieves the named module. +func (tr *Transport) GetModule(ctx context.Context, moduleID string) (*iotservice.Module, error) { + target, err := url.Parse( + fmt.Sprintf( + "https://%s/devices/%s/modules/%s?api-version=%s", + tr.creds.GetHostName(), + url.PathEscape(tr.creds.GetDeviceID()), url.PathEscape(moduleID), + API_VERSION, + ), + ) + if err != nil { + return nil, err + } + + resp, err := tr.getTokenAndSendRequest(http.MethodGet, target, nil, nil) + if err != nil { + return nil, err + } + + err = tr.handleErrorResponse(resp) + if err != nil { + return nil, err + } + + var res iotservice.Module + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, err + } + + return &res, nil +} + +// UpdateModule updates the given module. +func (tr *Transport) UpdateModule(ctx context.Context, m *iotservice.Module) (*iotservice.Module, error) { + target, err := url.Parse( + fmt.Sprintf( + "https://%s/devices/%s/modules/%s?api-version=%s", + tr.creds.GetHostName(), + url.PathEscape(m.DeviceID), url.PathEscape(m.ModuleID), + API_VERSION, + ), + ) + if err != nil { + return nil, err + } + requestPayloadBytes, err := json.Marshal(&m) + if err != nil { + return nil, err + } + + resp, err := tr.getTokenAndSendRequest(http.MethodPut, target, requestPayloadBytes, ifMatchHeader(m.ETag)) + if err != nil { + return nil, err } - resp, err := tr.getTokenAndSendRequest(http.MethodPut, target, requestPayloadBytes, map[string]string{"If-Match": "*"}) + var res iotservice.Module + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, err + } + + return &res, nil +} + +// DeleteModule removes the named device module. +func (tr *Transport) DeleteModule(ctx context.Context, m *iotservice.Module) error { + target, err := url.Parse( + fmt.Sprintf( + "https://%s/devices/%s/modules/%s?api-version=%s", + tr.creds.GetHostName(), + url.PathEscape(m.DeviceID), url.PathEscape(m.ModuleID), + API_VERSION, + ), + ) if err != nil { return err } - return tr.handleErrorResponse(resp) + _, err = tr.getTokenAndSendRequest(http.MethodDelete, target, nil, ifMatchHeader(m.ETag)) + return err +} + +func ifMatchHeader(etag string) map[string]string { + if etag == "" { + etag = "*" + } else { + etag = `"` + etag + `"` + } + return map[string]string{"If-Match": etag} } func (tr *Transport) handleErrorResponse(resp *http.Response) error { diff --git a/iotdevice/transport/http/http_test.go b/iotdevice/transport/http/http_test.go new file mode 100644 index 0000000..e6a2ef0 --- /dev/null +++ b/iotdevice/transport/http/http_test.go @@ -0,0 +1,130 @@ +package http + +import ( + "context" + "os" + "strconv" + "testing" + "time" + + "github.com/amenzhinsky/iothub/iotdevice" + "github.com/amenzhinsky/iothub/iotservice" +) + +var testRunID = strconv.Itoa(int(time.Now().Unix())) + +func newClient(t *testing.T) *iotservice.Client { + t.Helper() + cs := os.Getenv("TEST_IOTHUB_SERVICE_CONNECTION_STRING") + if cs == "" { + t.Fatal("$TEST_IOTHUB_SERVICE_CONNECTION_STRING is empty") + } + c, err := iotservice.NewFromConnectionString(cs) + if err != nil { + t.Fatal(err) + } + return c +} + +func newDevice(t *testing.T, c *iotservice.Client) *iotservice.Device { + t.Helper() + + device := &iotservice.Device{ + DeviceID: "test-device-" + testRunID, + } + device, err := c.CreateDevice(context.Background(), device) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + device.ETag = "" + if err := c.DeleteDevice(context.Background(), device); err != nil { + t.Fatal(err) + } + }) + return device +} + +func newTransport(t *testing.T) *Transport { + t.Helper() + + sc := newClient(t) + device := newDevice(t, sc) + + dcs, err := sc.DeviceConnectionString(device, false) + if err != nil { + t.Fatal(err) + } + + creds, err := iotdevice.ParseConnectionString(dcs) + if err != nil { + t.Fatal(err) + } + tr := New() + if err := tr.Connect(context.Background(), creds); err != nil { + t.Fatal(err) + } + + return tr +} + +func newModule(t *testing.T, tr *Transport) *iotservice.Module { + module := &iotservice.Module{ + DeviceID: tr.creds.GetDeviceID(), + ModuleID: "test-module-" + testRunID, + ManagedBy: "admin", + } + module, err := tr.CreateModule(context.Background(), module) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + module.ETag = "" + if err := tr.DeleteModule(context.Background(), module); err != nil { + t.Fatal(err) + } + }) + return module +} + +func TestListModules(t *testing.T) { + tr := newTransport(t) + module := newModule(t, tr) + modules, err := tr.ListModules(context.Background()) + if err != nil { + t.Fatal(err) + } + + if len(modules) != 1 { + t.Errorf("module count = %d, want 1", len(modules)) + } + + if modules[0].ModuleID != module.ModuleID { + t.Errorf("moduleID = %s, want %s", modules[0].ModuleID, module.ModuleID) + } +} + +func TestGetModule(t *testing.T) { + tr := newTransport(t) + module := newModule(t, tr) + if _, err := tr.GetModule( + context.Background(), module.ModuleID, + ); err != nil { + t.Fatal(err) + } +} + +func TestUpdateModule(t *testing.T) { + tr := newTransport(t) + module := newModule(t, tr) + module.Authentication.Type = iotservice.AuthSAS + updatedModule, err := tr.UpdateModule(context.Background(), module) + + if err != nil { + t.Fatal(err) + } + + if updatedModule.Authentication.Type != iotservice.AuthSAS { + t.Errorf("authentication type = `%s`, want `%s`", updatedModule.Authentication.Type, iotservice.AuthSAS) + } +} diff --git a/iotdevice/transport/mqtt/mqtt.go b/iotdevice/transport/mqtt/mqtt.go index d60d722..2bc5dca 100644 --- a/iotdevice/transport/mqtt/mqtt.go +++ b/iotdevice/transport/mqtt/mqtt.go @@ -15,10 +15,13 @@ import ( "github.com/amenzhinsky/iothub/common" "github.com/amenzhinsky/iothub/iotdevice/transport" + "github.com/amenzhinsky/iothub/iotservice" "github.com/amenzhinsky/iothub/logger" mqtt "github.com/eclipse/paho.mqtt.golang" ) +var ErrNotImplemented = errors.New("not implemented") + // DefaultQoS is the default quality of service value. const DefaultQoS = 1 @@ -582,3 +585,28 @@ func (tr *Transport) UploadToBlob(ctx context.Context, sasURI string, file io.Re func (tr *Transport) NotifyUploadComplete(ctx context.Context, correlationID string, success bool, statusCode int, statusDescription string) error { return fmt.Errorf("unavailable in the MQTT transport") } + +// ListModules list all the registered modules on the device. +func (tr *Transport) ListModules(ctx context.Context) ([]*iotservice.Module, error) { + return nil, ErrNotImplemented +} + +// CreateModule Creates adds the given module to the registry. +func (tr *Transport) CreateModule(ctx context.Context, m *iotservice.Module) (*iotservice.Module, error) { + return nil, ErrNotImplemented +} + +// GetModule retrieves the named module. +func (tr *Transport) GetModule(ctx context.Context, moduleID string) (*iotservice.Module, error) { + return nil, ErrNotImplemented +} + +// UpdateModule updates the given module. +func (tr *Transport) UpdateModule(ctx context.Context, m *iotservice.Module) (*iotservice.Module, error) { + return nil, ErrNotImplemented +} + +// DeleteModule removes the named device module. +func (tr *Transport) DeleteModule(ctx context.Context, m *iotservice.Module) error { + return ErrNotImplemented +} diff --git a/iotdevice/transport/transport.go b/iotdevice/transport/transport.go index c9fb1c1..0e2f52f 100644 --- a/iotdevice/transport/transport.go +++ b/iotdevice/transport/transport.go @@ -7,6 +7,7 @@ import ( "time" "github.com/amenzhinsky/iothub/common" + "github.com/amenzhinsky/iothub/iotservice" "github.com/amenzhinsky/iothub/logger" ) @@ -23,6 +24,11 @@ type Transport interface { GetBlobSharedAccessSignature(ctx context.Context, blobName string) (string, string, error) UploadToBlob(ctx context.Context, sasURI string, file io.Reader, size int64) error NotifyUploadComplete(ctx context.Context, correlationID string, success bool, statusCode int, statusDescription string) error + ListModules(ctx context.Context) ([]*iotservice.Module, error) + CreateModule(ctx context.Context, module *iotservice.Module) (*iotservice.Module, error) + GetModule(ctx context.Context, moduleID string) (*iotservice.Module, error) + UpdateModule(ctx context.Context, module *iotservice.Module) (*iotservice.Module, error) + DeleteModule(ctx context.Context, module *iotservice.Module) error Close() error }