Skip to content

Commit

Permalink
Add support for CLI quality of life updates: (#95)
Browse files Browse the repository at this point in the history
* Add support for

--targets-file - File with list of targets

Default port settings

$SANSSHELL_PROXY env var

* We're allowed now to just have a proxy and no targets depending on action.

* Fix check to be correct

* Require at least one target and pass it even for changing proxy log level

* Change back to requiring target or proxy

* No use tracking eof state if we don't set it.

Otherwise we loop again and always see EOF and drop the errors we might have gotten.

* Allow no targets so you can still get a Conn to the proxy.

Won't work except for proxy only RPCs like log levels but this makes it avoid passing a needless target

* Fix a typo

* Remove need to set a target for a proxy only action
  • Loading branch information
sfc-gh-jchacon authored Mar 30, 2022
1 parent ae32ebc commit 43a843b
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 16 deletions.
5 changes: 2 additions & 3 deletions cmd/sanssh/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ const (
// Run takes the given context and RunState and executes the command passed in after parsing with flags.
// As this is intended to be called from main() it doesn't return errors and will instead exit on any errors.
func Run(ctx context.Context, rs RunState) {

// Bunch of flag sanity checking
if len(rs.Targets) == 0 {
fmt.Fprintln(os.Stderr, "Must set targets")
if len(rs.Targets) == 0 && rs.Proxy == "" {
fmt.Fprintln(os.Stderr, "Must set a target or a proxy")
os.Exit(1)
}
if len(rs.Targets) > 1 && rs.Proxy == "" {
Expand Down
61 changes: 55 additions & 6 deletions cmd/sanssh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package main

import (
"bufio"
"context"
"flag"
"fmt"
"log"
"os"
"strings"
"time"

Expand All @@ -43,15 +46,25 @@ import (
_ "github.com/Snowflake-Labs/sansshell/services/service/client"
)

const (
defaultProxyPort = 50043
defaultTargetPort = 50042
proxyEnv = "SANSSHELL_PROXY"
)

var (
defaultAddress = "localhost:50042"
defaultTimeout = 3 * time.Second
defaultTimeout = 30 * time.Second

proxyAddr = flag.String("proxy", "", "Address to contact for proxy to sansshell-server. If blank a direct connection to the first entry in --targets will be made")
proxyAddr = flag.String("proxy", "", fmt.Sprintf(
`Address (host[:port]) to contact for proxy to sansshell-server.
%s in the environment can also be set instead of setting this flag. The flag will take precedence.
If blank a direct connection to the first entry in --targets will be made.
If port is blank the default of %d will be used`, proxyEnv, defaultProxyPort))
timeout = flag.Duration("timeout", defaultTimeout, "How long to wait for the command to complete")
credSource = flag.String("credential-source", mtlsFlags.Name(), fmt.Sprintf("Method used to obtain mTLS credentials (one of [%s])", strings.Join(mtls.Loaders(), ",")))
outputsDir = flag.String("output-dir", "", "If set defines a directory to emit output/errors from commands. Files will be generated based on target as destination/0 destination/0.error, etc.")
justification = flag.String("justification", "", "If non-empty will add the key '"+rpcauth.ReqJustKey+"' to the outgoing context Metadata to be passed along to the server for possbile validation and logging.")
justification = flag.String("justification", "", "If non-empty will add the key '"+rpcauth.ReqJustKey+"' to the outgoing context Metadata to be passed along to the server for possible validation and logging.")
targetsFile = flag.String("targets-file", "", "If set read the targets list line by line (as host[:port]) from the indicated file instead of using --targets (error if both flags are used). A blank port acts the same as --targets")

// targets will be bound to --targets for sending a single request to N nodes.
targetsFlag util.StringSliceFlag
Expand All @@ -61,11 +74,11 @@ var (
)

func init() {
targetsFlag.Set(defaultAddress)
// Setup an empty slice so it can be deref'd below regardless of user input.
outputsFlag.Target = &[]string{}
targetsFlag.Target = &[]string{}

flag.Var(&targetsFlag, "targets", "List of targets (separated by commas) to apply RPC against. If --proxy is not set must be one entry only.")
flag.Var(&targetsFlag, "targets", fmt.Sprintf("List of targets (host[:port] separated by commas) to apply RPC against. If --proxy is not set must be one entry only. If port is blank the default of %d will be used", defaultTargetPort))
flag.Var(&outputsFlag, "outputs", `List of output destinations (separated by commas) to direct output into.
Use - to indicated stdout/stderr (default if nothing else is set). Using - does not have to be repeated per target.
Errors will be emitted to <destination>.error separately from command/execution output which will be in the destination file.
Expand All @@ -76,12 +89,48 @@ func init() {
subcommands.ImportantFlag("targets")
subcommands.ImportantFlag("outputs")
subcommands.ImportantFlag("output-dir")
subcommands.ImportantFlag("targets-file")
subcommands.ImportantFlag("justification")
}

func main() {
// If this is blank it'll remain blank which is fine
// as that means just talk to --targets[0] instead.
// If the flag itself was set that will override.
*proxyAddr = os.Getenv(proxyEnv)
flag.Parse()

// If we're given a --targets-file read it in and stuff into targetsFlag
// so it can be processed below as if it was set that way.
if *targetsFile != "" {
// Can't set both flags.
if len(*targetsFlag.Target) > 0 {
log.Fatal("can't set --targets-file and --targets at the same time")
}
f, err := os.Open(*targetsFile)
if err != nil {
log.Fatalf("can't open %s: %v", *targetsFile, err)
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
*targetsFlag.Target = append(*targetsFlag.Target, scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Fatalf("scanning error reading %s: %v", *targetsFile, err)
}
}

// Fixup proxy and targets flags if needed.
if !strings.Contains(*proxyAddr, ":") {
*proxyAddr = fmt.Sprintf("%s:%d", *proxyAddr, defaultProxyPort)
}
for i, t := range *targetsFlag.Target {
if !strings.Contains(t, ":") {
(*targetsFlag.Target)[i] = fmt.Sprintf("%s:%d", t, defaultTargetPort)
}
}

rs := client.RunState{
Proxy: *proxyAddr,
Targets: *targetsFlag.Target,
Expand Down
3 changes: 3 additions & 0 deletions proxy/protoc-gen-go-grpcproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ func generate(plugin *protogen.Plugin, file *protogen.File) {
g.P("}")
g.P("}")
g.P("ret = append(ret, typedResp)")
if clientOnly {
g.P("eof[r.Index] = true")
}
g.P("}")
if clientOnly {
g.P("}")
Expand Down
7 changes: 2 additions & 5 deletions proxy/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ func (p *proxyStream) RecvMsg(m interface{}) error {
if streamStatus.Code() != codes.OK {
closedErr = streamStatus.Err()
}

for _, id := range cl.StreamIds {
p.ids[id].Error = closedErr
p.ids[id].Resp = nil
Expand Down Expand Up @@ -538,10 +537,8 @@ func Dial(proxy string, targets []string, opts ...grpc.DialOption) (*Conn, error
// DialContext is the same as Dial except the context provided can be used to cancel or expire the pending connection.
// By default dial operations are non-blocking. See grpc.Dial for a complete explanation.
func DialContext(ctx context.Context, proxy string, targets []string, opts ...grpc.DialOption) (*Conn, error) {
if len(targets) == 0 {
return nil, status.Error(codes.InvalidArgument, "no targets passed")
}

// If there are no targets things will likely fail but this gives the ability to still send RPCs to the
// proxy itself.
dialTarget := proxy
ret := &Conn{}
if proxy == "" {
Expand Down
1 change: 0 additions & 1 deletion proxy/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func TestDial(t *testing.T) {
{
name: "proxy and no targets",
proxy: "proxy",
wantErr: true,
options: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())},
},
{
Expand Down
1 change: 1 addition & 0 deletions proxy/testdata/testservice_grpcproxy.pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (x *testServiceClientTestClientStreamClientProxy) CloseAndRecv() ([]*TestCl
}
}
ret = append(ret, typedResp)
eof[r.Index] = true
}
}
return ret, nil
Expand Down
2 changes: 1 addition & 1 deletion services/localfile/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (p *cpCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{
retCode := subcommands.ExitSuccess
for r := range respChan {
if r.Error != nil {
fmt.Fprintf(state.Err[r.Index], "immutable client error: %v\n", r.Error)
fmt.Fprintf(state.Err[r.Index], "Got error from target %s (%d) - %v\n", r.Target, r.Index, r.Error)
retCode = subcommands.ExitFailure
}
}
Expand Down
1 change: 1 addition & 0 deletions services/localfile/localfile_grpcproxy.pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (x *localFileClientWriteClientProxy) CloseAndRecv() ([]*WriteManyResponse,
}
}
ret = append(ret, typedResp)
eof[r.Index] = true
}
}
return ret, nil
Expand Down

0 comments on commit 43a843b

Please sign in to comment.