Skip to content

Commit

Permalink
Merge pull request #165 from mercari/fix-cel-plugin
Browse files Browse the repository at this point in the history
Fix cel plugin
  • Loading branch information
goccy authored May 7, 2024
2 parents c8a872e + 65602ab commit 712c202
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 32 deletions.
3 changes: 2 additions & 1 deletion _examples/15_cel_plugin/plugin/plugin_grpc_federation.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion _examples/18_load/plugin/plugin_grpc_federation.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions generator/code_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ func (g *CodeGenerator) Generate(file *resolver.File, enums []*resolver.Enum) ([
if err != nil {
return nil, err
}
return generateGoContent(tmpl, &File{
File: file,
enums: enums,
pkgMap: make(map[*resolver.GoPackage]struct{}),
})
return generateGoContent(tmpl, NewFile(file, enums))
}

type File struct {
Expand All @@ -46,6 +42,14 @@ type File struct {
pkgMap map[*resolver.GoPackage]struct{}
}

func NewFile(file *resolver.File, enums []*resolver.Enum) *File {
return &File{
File: file,
pkgMap: make(map[*resolver.GoPackage]struct{}),
enums: enums,
}
}

func (f *File) Version() string {
return grpcfed.Version
}
Expand Down
3 changes: 2 additions & 1 deletion generator/templates/plugin.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func Register{{ $pluginName }}(plug {{ $pluginName }}) {
}
encoded, err := grpcfed.EncodeCELPluginResponse(res)
if err != nil {
continue
fmt.Fprintf(os.Stderr, "fatal error: failed to encode cel plugin response: %s\n", err.Error())
os.Exit(1)
}
_, _ = os.Stdout.Write(append(encoded, '\n'))
}
Expand Down
94 changes: 70 additions & 24 deletions grpc/federation/cel/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,25 +113,33 @@ func (p *CELPlugin) CreateInstance(ctx context.Context, celRegistry *types.Regis
modCfg := wazero.NewModuleConfig().
WithStdin(stdinR).
WithStdout(stdoutW).
WithStderr(os.Stderr)

go p.wasmRuntime.InstantiateModule(ctx, p.mod, modCfg) //nolint: errcheck
WithStderr(os.Stderr).
WithArgs("plugin")

instanceModErrCh := make(chan error)
go func() {
_, err := p.wasmRuntime.InstantiateModule(ctx, p.mod, modCfg)
instanceModErrCh <- err
}()
return &CELPluginInstance{
name: p.cfg.Name,
functions: p.cfg.Functions,
celRegistry: celRegistry,
stdin: stdinW,
stdout: stdoutR,
name: p.cfg.Name,
functions: p.cfg.Functions,
celRegistry: celRegistry,
stdin: stdinW,
stdout: stdoutR,
instanceModErrCh: instanceModErrCh,
}
}

type CELPluginInstance struct {
ctx context.Context
name string
functions []*CELFunction
celRegistry *types.Registry
stdin *io.PipeWriter
stdout *io.PipeReader
ctx context.Context
name string
functions []*CELFunction
celRegistry *types.Registry
stdin *io.PipeWriter
stdout *io.PipeReader
instanceModErrCh chan error
closed bool
}

const PluginProtocolVersion = 1
Expand All @@ -148,7 +156,7 @@ var (
)

func (i *CELPluginInstance) ValidatePlugin(ctx context.Context) error {
if _, err := i.stdin.Write([]byte(versionCommand)); err != nil {
if err := i.write([]byte(versionCommand)); err != nil {
return fmt.Errorf("failed to send cel protocol version command: %w", err)
}
content, err := i.recvContent()
Expand Down Expand Up @@ -184,8 +192,27 @@ func (i *CELPluginInstance) ValidatePlugin(ctx context.Context) error {
return nil
}

func (i *CELPluginInstance) write(cmd []byte) error {
if i.closed {
return errors.New("grpc-federation: plugin has already been closed")
}

writeCh := make(chan error)
go func() {
_, err := i.stdin.Write(cmd)
writeCh <- err
}()
select {
case err := <-i.instanceModErrCh:
return err
case err := <-writeCh:
return err
}
}

func (i *CELPluginInstance) Close(ctx context.Context) error {
if _, err := i.stdin.Write([]byte(exitCommand)); err != nil {
defer func() { i.closed = true }()
if err := i.write([]byte(exitCommand)); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -252,7 +279,7 @@ func (i *CELPluginInstance) sendRequest(fn *CELFunction, md metadata.MD, args ..
if err != nil {
return err
}
if _, err := i.stdin.Write(append(encoded, '\n')); err != nil {
if err := i.write(append(encoded, '\n')); err != nil {
return err
}
return nil
Expand All @@ -275,15 +302,34 @@ func (i *CELPluginInstance) recvResponse(fn *CELFunction) ref.Val {
}

func (i *CELPluginInstance) recvContent() (string, error) {
reader := bufio.NewReader(i.stdout)
content, err := reader.ReadString('\n')
if err != nil {
return "", fmt.Errorf("grpc-federation: failed to receive response from wasm plugin: %w", err)
if i.closed {
return "", errors.New("grpc-federation: plugin has already been closed")
}

type readResult struct {
response string
err error
}
if content == "" {
return "", errors.New("grpc-federation: receive empty response from wasm plugin")
readCh := make(chan readResult)
go func() {
reader := bufio.NewReader(i.stdout)
content, err := reader.ReadString('\n')
if err != nil {
readCh <- readResult{err: fmt.Errorf("grpc-federation: failed to receive response from wasm plugin: %w", err)}
return
}
if content == "" {
readCh <- readResult{err: errors.New("grpc-federation: receive empty response from wasm plugin")}
return
}
readCh <- readResult{response: content}
}()
select {
case err := <-i.instanceModErrCh:
return "", err
case result := <-readCh:
return result.response, result.err
}
return content, nil
}

func (i *CELPluginInstance) refToCELPluginValue(typ *cel.Type, v ref.Val) (*plugin.CELPluginValue, error) {
Expand Down

0 comments on commit 712c202

Please sign in to comment.