Skip to content

Commit

Permalink
feat/refactor(puller): update the schema_path in the processed request
Browse files Browse the repository at this point in the history
* feat/refactor(puller): update the schema_path in the processed request

Also some cleanup around adding the model disk size
And some minor refactors

* chore: review comment and make fmt
  • Loading branch information
tjohnson31415 authored and njhill committed Nov 20, 2021
1 parent dba7e38 commit 0434a42
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 84 deletions.
65 changes: 37 additions & 28 deletions model-serving-puller/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/go-logr/logr"
"google.golang.org/grpc/status"

"github.com/kserve/modelmesh-runtime-adapter/internal/modelschema"
"github.com/kserve/modelmesh-runtime-adapter/internal/proto/mmesh"
"github.com/kserve/modelmesh-runtime-adapter/internal/util"
)
Expand Down Expand Up @@ -69,16 +70,17 @@ func NewPullerFromConfig(log logr.Logger, config *PullerConfiguration) *Puller {
return s
}

// processLoadModelRequest is for use in an mmesh ModelRuntimeServer that embeds the puller
// ProcessLoadModelRequest is for use in an mmesh serving runtime that embeds the puller
//
// The input request is modified in place and also returned. The path is
// rewritten to a local file path and the size of the model on disk is added to
// the model metadata.
// The input request is modified in place and also returned.
// After pulling the model files, changes to the request are:
// - rewrite ModelPath to a local filesystem path
// - rewrite ModelKey["schema_path"] to a local filesystem path
// - add the size of the model on disk to ModelKey["disk_size_bytes"]
func (s *Puller) ProcessLoadModelRequest(req *mmesh.LoadModelRequest) (*mmesh.LoadModelRequest, error) {
// parse json
var modelKey map[string]interface{}
parseErr := json.Unmarshal([]byte(req.ModelKey), &modelKey)
if parseErr != nil {
if parseErr := json.Unmarshal([]byte(req.ModelKey), &modelKey); parseErr != nil {
return nil, fmt.Errorf("Invalid modelKey in LoadModelRequest. ModelKey value '%s' is not valid JSON: %s", req.ModelKey, parseErr)
}
schemaPath, ok := modelKey[jsonAttrModelSchemaPath].(string)
Expand Down Expand Up @@ -118,9 +120,32 @@ func (s *Puller) ProcessLoadModelRequest(req *mmesh.LoadModelRequest) (*mmesh.Lo
if pullerErr != nil {
return nil, status.Errorf(status.Code(pullerErr), "Failed to pull model from storage due to error: %s", pullerErr)
}
// update the request

// update the model path
req.ModelPath = localPath
req = AddModelDiskSize(req, s.Log)

// update the model key to add the schema path
if schemaPath != "" {
schemaFullPath, joinErr := util.SecureJoin(s.PullerConfig.RootModelDir, req.ModelId, modelschema.ModelSchemaFile)
if joinErr != nil {
return nil, fmt.Errorf("Error joining paths '%s', '%s', and '%s': %w", s.PullerConfig.RootModelDir, req.ModelId, modelschema.ModelSchemaFile, joinErr)
}
modelKey[jsonAttrModelSchemaPath] = schemaFullPath
}

// update the model key to add the disk size
if size, err1 := getModelDiskSize(localPath); err1 != nil {
s.Log.Info("Model disk size will not be included in the LoadModelRequest due to error", "model_key", modelKey, "error", err1)
} else {
modelKey[jsonAttrModelKeyDiskSizeBytes] = size
}

// rewrite the ModelKey JSON with any updates that have been made
modelKeyBytes, err := json.Marshal(modelKey)
if err != nil {
return nil, fmt.Errorf("Error serializing ModelKey back to JSON: %w", err)
}
req.ModelKey = string(modelKeyBytes)

return req, nil
}
Expand Down Expand Up @@ -158,19 +183,12 @@ func (s *Puller) CleanCache() {
}
}

func AddModelDiskSize(req *mmesh.LoadModelRequest, log logr.Logger) *mmesh.LoadModelRequest {
var modelKey map[string]interface{}
err := json.Unmarshal([]byte(req.ModelKey), &modelKey)
if err != nil {
log.Info("ModelDiskSize will not be included in the LoadModelRequest as LoadModelRequest.ModelKey value is not valid JSON", "size", jsonAttrModelKeyDiskSizeBytes, "model_key", req.ModelKey, "error", err)
return req
}

func getModelDiskSize(modelPath string) (int64, error) {
// This walks the local filesystem and accumulates the size of the model
// It would be more efficient to accumulate the size as the files are downloaded,
// but this would require refactoring because the s3 download iterator does not return a size.
var size int64
err = filepath.Walk(req.ModelPath, func(_ string, info os.FileInfo, err error) error {
err := filepath.Walk(modelPath, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand All @@ -180,19 +198,10 @@ func AddModelDiskSize(req *mmesh.LoadModelRequest, log logr.Logger) *mmesh.LoadM
return nil
})
if err != nil {
log.Info("ModelDiskSize will not be included in the LoadModelRequest due to error getting the disk size", "size", jsonAttrModelKeyDiskSizeBytes, "path", req.ModelPath, "error", err)
return req
return size, fmt.Errorf("Error computing model's disk size: %w", err)
}

modelKey[jsonAttrModelKeyDiskSizeBytes] = size
modelKeyBytes, err := json.Marshal(modelKey)
if err != nil {
log.Info("ModelDiskSize will not be included in the LoadModelRequest as failure in marshalling to JSON", "size", jsonAttrModelKeyDiskSizeBytes, "model_key", modelKey, "error", err)
return req
}
req.ModelKey = string(modelKeyBytes)

return req
return size, nil
}

func (p *Puller) CleanupModel(modelID string) error {
Expand Down
82 changes: 80 additions & 2 deletions model-serving-puller/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func Test_DownloadFromCOS_Success(t *testing.T) {
assert.Nil(t, err)
}

func Test_ProcessLoadModelRequest_Success(t *testing.T) {
func Test_ProcessLoadModelRequest_Success_SingleFileModel(t *testing.T) {
p, mockDownloader, mockCtrl := newPullerWithMock(t)
defer mockCtrl.Finish()

Expand All @@ -241,7 +241,7 @@ func Test_ProcessLoadModelRequest_Success(t *testing.T) {
ModelId: "testmodel",
ModelPath: filepath.Join(p.PullerConfig.RootModelDir, "testmodel/model.zip"),
ModelType: "tensorflow",
ModelKey: `{"bucket":"bucket1","disk_size_bytes":0,"storage_key":"myStorage"}`,
ModelKey: `{"bucket":"bucket1","disk_size_bytes":0,"storage_key":"myStorage","storage_params":{"bucket":"bucket1"}}`,
}

mockDownloader.EXPECT().ListObjectsUnderPrefix("bucket1", "model.zip").Return([]string{"model.zip"}, nil).Times(1)
Expand All @@ -252,6 +252,64 @@ func Test_ProcessLoadModelRequest_Success(t *testing.T) {
assert.Nil(t, err)
}

// Test_ProcessLoadModelRequest_Success_MultiFileModel verifies that a request
// whose ModelPath is a prefix containing several objects is rewritten to the
// local model directory and that disk_size_bytes/storage_params are added to
// the ModelKey JSON.
func Test_ProcessLoadModelRequest_Success_MultiFileModel(t *testing.T) {
	p, mockDownloader, mockCtrl := newPullerWithMock(t)
	defer mockCtrl.Finish()

	req := &mmesh.LoadModelRequest{
		ModelId:   "testmodel",
		ModelPath: "path/to/model",
		ModelType: "tensorflow",
		ModelKey:  `{"storage_key": "myStorage", "bucket": "bucket1"}`,
	}

	// Expected rewrite: ModelPath becomes the local directory for the model
	// and ModelKey is re-serialized with the added fields.
	want := &mmesh.LoadModelRequest{
		ModelId:   "testmodel",
		ModelPath: filepath.Join(p.PullerConfig.RootModelDir, "testmodel"),
		ModelType: "tensorflow",
		ModelKey:  `{"bucket":"bucket1","disk_size_bytes":0,"storage_key":"myStorage","storage_params":{"bucket":"bucket1"}}`,
	}

	// The prefix listing returns multiple objects; a single iterator download
	// pulls them all.
	mockDownloader.EXPECT().ListObjectsUnderPrefix("bucket1", "path/to/model").Return([]string{"path/to/model/model.zip", "path/to/model/metadata.txt", "path/to/model/model/data"}, nil).Times(1)
	mockDownloader.EXPECT().DownloadWithIterator(gomock.Any(), gomock.Any()).Return(nil).Times(1)

	got, err := p.ProcessLoadModelRequest(req)
	assert.Nil(t, err)
	assert.Equal(t, want, got)
}

// Test_ProcessLoadModelRequest_SuccessWithSchema verifies that when the
// incoming ModelKey carries a schema_path, both the model file and the schema
// are pulled, and schema_path is rewritten to the local schema file location.
func Test_ProcessLoadModelRequest_SuccessWithSchema(t *testing.T) {
	p, mockDownloader, mockCtrl := newPullerWithMock(t)
	defer mockCtrl.Finish()

	req := &mmesh.LoadModelRequest{
		ModelId:   "testmodel",
		ModelPath: "model.zip",
		ModelType: "tensorflow",
		ModelKey:  `{"storage_key": "myStorage", "bucket": "bucket1", "schema_path": "my_schema"}`,
	}

	// schema_path in the rewritten ModelKey should point at the local
	// _schema.json under the model's directory.
	wantSchemaPath := filepath.Join(p.PullerConfig.RootModelDir, "testmodel/_schema.json")
	want := &mmesh.LoadModelRequest{
		ModelId:   "testmodel",
		ModelPath: filepath.Join(p.PullerConfig.RootModelDir, "testmodel/model.zip"),
		ModelType: "tensorflow",
		ModelKey:  fmt.Sprintf(`{"bucket":"bucket1","disk_size_bytes":0,"schema_path":"%s","storage_key":"myStorage","storage_params":{"bucket":"bucket1"}}`, wantSchemaPath),
	}

	// One list+download pair for the model file...
	mockDownloader.EXPECT().ListObjectsUnderPrefix("bucket1", "model.zip").Return([]string{"model.zip"}, nil).Times(1)
	mockDownloader.EXPECT().DownloadWithIterator(gomock.Any(), gomock.Any()).Return(nil).Times(1)
	// ...and another for the schema.
	mockDownloader.EXPECT().ListObjectsUnderPrefix("bucket1", "my_schema").Return([]string{"my_schema"}, nil).Times(1)
	mockDownloader.EXPECT().DownloadWithIterator(gomock.Any(), gomock.Any()).Return(nil).Times(1)

	got, err := p.ProcessLoadModelRequest(req)
	assert.Nil(t, err)
	assert.Equal(t, want, got)
}

func Test_ProcessLoadModelRequest_SuccessWithStorageParams(t *testing.T) {
p, mockDownloader, mockCtrl := newPullerWithMock(t)
defer mockCtrl.Finish()
Expand Down Expand Up @@ -328,3 +386,23 @@ func Test_ProcessLoadModelRequest_FailMissingStorageKey(t *testing.T) {
assert.Nil(t, returnRequest)
assert.EqualError(t, err, expectedError)
}

// Test_getModelDiskSize verifies that getModelDiskSize returns the total
// on-disk size of a model, whether pointed at a single file or at a
// directory tree whose file sizes must be accumulated.
func Test_getModelDiskSize(t *testing.T) {
	var diskSizeTests = []struct {
		modelPath    string
		expectedSize int64
	}{
		{"testModelSize/1/airbnb.model.lr.zip", 15259},
		{"testModelSize/1", 15259},
		{"testModelSize/2", 39375276},
	}

	for _, tt := range diskSizeTests {
		// Name each subtest after its model path so a failure identifies
		// which fixture is involved (anonymous t.Run("") produced
		// indistinguishable subtest names).
		t.Run(tt.modelPath, func(t *testing.T) {
			fullPath := filepath.Join(RootModelDir, tt.modelPath)
			diskSize, err := getModelDiskSize(fullPath)
			assert.NoError(t, err)
			assert.EqualValues(t, tt.expectedSize, diskSize)
		})
	}
}
4 changes: 0 additions & 4 deletions model-serving-puller/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,3 @@ func (s *PullerServer) RuntimeStatus(ctx context.Context, req *mmesh.RuntimeStat
func (s *PullerServer) CleanCache() {
s.puller.CleanCache()
}

func addModelDiskSize(req *mmesh.LoadModelRequest, log logr.Logger) *mmesh.LoadModelRequest {
return puller.AddModelDiskSize(req, log)
}
51 changes: 1 addition & 50 deletions model-serving-puller/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package server

import (
"context"
"encoding/json"
"path/filepath"
"testing"
"time"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/kserve/modelmesh-runtime-adapter/internal/proto/mmesh"
"github.com/kserve/modelmesh-runtime-adapter/model-serving-puller/generated/mocks"
. "github.com/kserve/modelmesh-runtime-adapter/model-serving-puller/puller"
"github.com/stretchr/testify/assert"

"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand Down Expand Up @@ -101,7 +99,7 @@ func TestLoadModel(t *testing.T) {
ModelId: tt.modelID,
ModelPath: filepath.Join(s.puller.PullerConfig.RootModelDir, tt.outputModelPath),
ModelType: "tensorflow",
ModelKey: `{"bucket":"bucket1","disk_size_bytes":0,"storage_key":"myStorage"}`,
ModelKey: `{"bucket":"bucket1","disk_size_bytes":0,"storage_key":"myStorage","storage_params":{"bucket":"bucket1"}}`,
}

// Assert s.LoadModel calls the s3 Download and then the model runtime LoadModel rpc
Expand All @@ -124,50 +122,3 @@ func TestLoadModel(t *testing.T) {
})
}
}

// TestAddModelDiskSize checks that addModelDiskSize computes the on-disk size
// of the model at ModelPath and injects it into the request's ModelKey JSON,
// while leaving every other request field and pre-existing key entry intact.
func TestAddModelDiskSize(t *testing.T) {
	diskSizeTests := []struct {
		modelPath    string
		expectedSize int64
	}{
		{"testModelSize/1/airbnb.model.lr.zip", 15259},
		{"testModelSize/1", 15259},
		{"testModelSize/2", 39375276},
	}

	for _, tt := range diskSizeTests {
		t.Run("", func(t *testing.T) {
			reqBefore := &mmesh.LoadModelRequest{
				ModelId:   filepath.Base(filepath.Dir(tt.modelPath)),
				ModelPath: filepath.Join(RootModelDir, tt.modelPath),
				ModelType: "tensorflow",
				ModelKey:  `{"storage_key": "myStorage", "bucket": "bucket1", "modelType": "tensorflow"}`,
			}

			// Parse the input key up front and sanity-check its contents.
			var keyBefore map[string]interface{}
			if err := json.Unmarshal([]byte(reqBefore.ModelKey), &keyBefore); err != nil {
				t.Fatal("Error unmarshalling modelKeyBefore JSON", err)
			}
			assert.Equal(t, "myStorage", keyBefore["storage_key"])
			assert.Equal(t, "bucket1", keyBefore["bucket"])
			assert.Equal(t, "tensorflow", keyBefore["modelType"])

			logger := zap.New(zap.UseDevMode(true))
			reqAfter := addModelDiskSize(reqBefore, logger)

			// Non-ModelKey fields must pass through unchanged.
			assert.Equal(t, reqBefore.ModelId, reqAfter.ModelId)
			assert.Equal(t, reqBefore.ModelPath, reqAfter.ModelPath)
			assert.Equal(t, reqBefore.ModelType, reqAfter.ModelType)

			var keyAfter map[string]interface{}
			if err := json.Unmarshal([]byte(reqAfter.ModelKey), &keyAfter); err != nil {
				t.Fatal("Error unmarshalling modelKeyAfter JSON", err)
			}

			// Pre-existing entries survive and the disk size is added.
			assert.Equal(t, keyBefore["storage_key"], keyAfter["storage_key"])
			assert.Equal(t, keyBefore["bucket"], keyAfter["bucket"])
			assert.Equal(t, keyBefore["modelType"], keyAfter["modelType"])
			assert.EqualValues(t, tt.expectedSize, keyAfter["disk_size_bytes"])
		})
	}
}

0 comments on commit 0434a42

Please sign in to comment.