Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 北极星旁路功能插件设计——支持 Local Plugin 以及 Remote Plugin #1139

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/metrics"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/pluggable"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/common/version"
config_center "github.com/polarismesh/polaris/config"
Expand Down Expand Up @@ -98,6 +99,12 @@ func Start(configFilePath string) {
metrics.InitMetrics()
eventhub.InitEventHub()

// 加载可插拔插件
if err = pluggable.Discovery(ctx); err != nil {
fmt.Printf("[ERROR] discover pluggable plugin fail: %+v", err)
return
}

// 设置插件配置
plugin.SetPluginConfig(&cfg.Plugin)

Expand Down
115 changes: 115 additions & 0 deletions common/pluggable/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package pluggable

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/polaris-contrib/polaris-server-remote-plugin-common/api"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// SetupTimeout is the timeout for setting up a connection.
const SetupTimeout = 5 * time.Second

// GRPCConnectionDialer defines the function to dial a grpc connection.
type GRPCConnectionDialer func(ctx context.Context, name string) (*grpc.ClientConn, error)

// SocketDialContext dials a gRPC connection using a socket.
func SocketDialContext(ctx context.Context, socket string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
unixSock := "unix://" + socket

// disable TLS as default when using socket
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

dialCtx, cancel := context.WithTimeout(ctx, SetupTimeout)
defer cancel()

grpcConn, err := grpc.DialContext(dialCtx, unixSock, opts...)
if err != nil {
return nil, err
}

return grpcConn, nil
}

// GRPCPluginClient defines the interface for a gRPC plugin client,
// polaris server will call the plugin client's Ping method to check if the plugin is alive.
type GRPCPluginClient interface {
// Ping checks if the plugin is alive.
Ping(ctx context.Context, in *api.PingRequest, opts ...grpc.CallOption) (*api.PongResponse, error)
}

// GRPCConnector defines the connector for a gRPC plugin.
type GRPCConnector struct {
// pluginClient is the client that is used to communicate with the plugin, exposed for plugin logic layer.
pluginClient GRPCPluginClient
// dialer use to dial a grpc connection.
dialer GRPCConnectionDialer
// conn is the grpc client connection.
conn *grpc.ClientConn
// clientFactory is the factory to create a grpc client.
clientFactory func(grpc.ClientConnInterface) GRPCPluginClient
}

// NewGRPCConnectorWithDialer creates a new grpc connector for the given client factory and dialer.
func NewGRPCConnectorWithDialer(
dialer GRPCConnectionDialer, factory func(grpc.ClientConnInterface) GRPCPluginClient) *GRPCConnector {
return &GRPCConnector{
dialer: dialer,
clientFactory: factory,
}
}

// Dial init a grpc connection to the plugin server and create a grpc client.
func (g *GRPCConnector) Dial(ctx context.Context, name string) error {
conn, err := g.dialer(ctx, name)
if err != nil {
return errors.Wrapf(err, "unable to open GRPC connection using the dialer")
}

g.conn = conn
g.pluginClient = g.clientFactory(conn)
return nil
}

// PluginClient returns the grpc client.
func (g *GRPCConnector) PluginClient() GRPCPluginClient {
return g.pluginClient
}

// socketDialer returns a GRPCConnectionDialer that dials a grpc connection using a socket.
func socketDialer(socket string, opts ...grpc.DialOption) GRPCConnectionDialer {
return func(ctx context.Context, name string) (*grpc.ClientConn, error) {
return SocketDialContext(ctx, socket, opts...)
}
}

// Ping checks if the plugin is alive.
func (g *GRPCConnector) Ping(ctx context.Context) error {
_, err := g.pluginClient.Ping(ctx, &api.PingRequest{}, grpc.WaitForReady(true))
return err
}

// Close closes the underlying gRPC connection.
func (g *GRPCConnector) Close() error {
return g.conn.Close()
}
180 changes: 180 additions & 0 deletions common/pluggable/pluggable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package pluggable

import (
"context"
"os"
"path/filepath"
"strings"

"github.com/jhump/protoreflect/grpcreflect"
"github.com/pkg/errors"
"google.golang.org/grpc"
_ "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
reflectV1Alpha "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

"github.com/polarismesh/polaris/common/log"
)

const (
// envVarPolarisPluggableFolder
envVarPolarisPluggableFolder string = "POLARIS_PLUGGABLE_SOCKETS_FOLDER"
// defaultPolarisPluggablePath
defaultPolarisPluggablePath = "/tmp/polaris-pluggable-sockets"
)

// onFinishedCallback is a callback to be called when a plugin is finished.
type onFinishedCallback func(name string, dialer GRPCConnectionDialer)

// onFinished
var onFinished = make(map[string]onFinishedCallback)

// AddOnFinished adds a callback to be called when a plugin is finished.
func AddOnFinished(serviceDesc string, cb onFinishedCallback) {
_, ok := onFinished[serviceDesc]
if ok {
log.Fatalf("onFinished callback for %s already exists", serviceDesc)
}
onFinished[serviceDesc] = cb
}

// Discovery discovers all the plugins.
func Discovery(ctx context.Context) error {
services, err := discovery(ctx)
if err != nil {
return err
}
finished(services)
return nil
}

// finished calls the onFinished callback for the given services.
func finished(services []*pluginService) {
for _, svc := range services {
cb, ok := onFinished[svc.protoRef]
if !ok {
continue
}

cb(svc.name, svc.dialer)
log.Infof("discovered pluggable component service: %s", svc.protoRef)
}
}

// pluginService is a plugin service.
type pluginService struct {
name string
protoRef string
dialer GRPCConnectionDialer
}

// discovery discovers all the plugins.
func discovery(ctx context.Context) ([]*pluginService, error) {
sockFolder := socketFolder()
files, err := pluginFiles(sockFolder)
if err != nil {
return nil, err
}

var services []*pluginService
for _, dirEntry := range files {
if dirEntry.IsDir() {
continue
}

var discoveredServices []*pluginService
discoveredServices, err = trySingleSocket(ctx, dirEntry, sockFolder)

// skip non-socket files.
if err == errNotSocket {
continue
}

// return error if any other error occurs.
if err != nil {
return nil, err
}

services = append(services, discoveredServices...)
}
return services, nil
}

// trySingleSocket tries to discover plugins in a single socket.
func trySingleSocket(ctx context.Context, entry os.DirEntry, socketsFolder string) ([]*pluginService, error) {
socket, err := socketName(entry)
if err != nil {
return nil, err
}

socketFullPath := filepath.Join(socketsFolder, socket)
reflectClient, cleanup, err := dialServerReflection(ctx, socketFullPath)
if err != nil {
return nil, err
}
defer cleanup()

services, err := reflectClient.ListServices()
if err != nil {
return nil, errors.Wrapf(err, "unable to list plugin: %s's services", socket)
}

socketNameWithoutExt := strings.Trim(socket, filepath.Ext(socket))
dialer := socketDialer(socketFullPath, grpc.WithBlock(), grpc.FailOnNonTempDialError(true))

var pluginServices []*pluginService
for _, svc := range services {
pluginServices = append(pluginServices, &pluginService{
protoRef: svc,
dialer: dialer,
name: socketNameWithoutExt,
})
}

return pluginServices, nil
}

// dialServerReflection dials the server reflection service, returning the client and a cleanup function.
func dialServerReflection(ctx context.Context, socket string) (*grpcreflect.Client, func(), error) {
conn, err := SocketDialContext(ctx, socket, grpc.WithBlock())
if err != nil {
return nil, nil, err
}

reflectClient := grpcreflect.NewClientV1Alpha(ctx, reflectV1Alpha.NewServerReflectionClient(conn))
return reflectClient, reflectionConnectionCleanup(conn, reflectClient), nil
}

// reflectionConnectionCleanup closes the reflection connection.
func reflectionConnectionCleanup(conn *grpc.ClientConn, client *grpcreflect.Client) func() {
return func() {
client.Reset()
if err := conn.Close(); err != nil {
log.Errorf("error closing grpc reflection connection: %v", err)
}
}
}

// socketFolder returns the socket folder path specified by the environment variable.
func socketFolder() string {
if value, ok := os.LookupEnv(envVarPolarisPluggableFolder); ok {
return value
}
return defaultPolarisPluggablePath
}
72 changes: 72 additions & 0 deletions common/pluggable/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package pluggable

import (
"os"

"github.com/pkg/errors"

"github.com/polarismesh/polaris/common/log"
)

var (
// errNotSocket is returned when the file is not a socket.
errNotSocket = errors.New("not a socket")
)

// pluginFiles returns the plugin files in the socket folder.
func pluginFiles(sockFolder string) ([]os.DirEntry, error) {
_, err := os.Stat(sockFolder)
if os.IsNotExist(err) {
log.Infof("socket folder %s does not exist, skip plugin discovery", sockFolder)
return nil, nil
}

if err != nil {
log.Errorf("failed to stat socket folder %s: %v", sockFolder, err)
return nil, err
}

var files []os.DirEntry
files, err = os.ReadDir(sockFolder)
if err != nil {
return nil, errors.Wrapf(err, "failed to read socket folder %s", sockFolder)
}

return files, nil
}

// socketName returns true if the file is a socket.
func socketName(entry os.DirEntry) (string, error) {
if entry.IsDir() {
return "", errNotSocket
}

f, err := entry.Info()
if err != nil {
return "", err
}

// skip non-socket files.
if f.Mode()&os.ModeSocket == 0 {
return "", errNotSocket
}

return entry.Name(), nil
}
Loading