Merged
Changes from 7 commits
benchmark/benchmain/main.go (2 changes: 1 addition & 1 deletion)
@@ -537,7 +537,7 @@ func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient,
// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
// request and response sizes.
func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
- if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
+ if err := benchmark.DoUnaryCall(context.TODO(), client, reqSize, respSize); err != nil {
logger.Fatalf("DoUnaryCall failed: %v", err)
}
}
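Note: benchmain's unaryCaller has no surrounding request scope, so context.TODO() fills the new parameter. A caller that owns a lifetime can pass a real context instead; a minimal sketch under that assumption (timedUnaryCaller and its duration parameter are illustrative, not part of this change; benchmark, testgrpc, and logger are the identifiers already used in benchmain/main.go):

// Hypothetical variant of unaryCaller that bounds the call with a deadline
// instead of passing context.TODO().
func timedUnaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int, d time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	if err := benchmark.DoUnaryCall(ctx, client, reqSize, respSize); err != nil {
		logger.Fatalf("DoUnaryCall failed: %v", err)
	}
}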
benchmark/benchmark.go (6 changes: 3 additions & 3 deletions)
@@ -276,15 +276,15 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
}
}

- // DoUnaryCall performs a unary RPC with given stub and request and response sizes.
- func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
+ // DoUnaryCall performs a unary RPC with given context, stub and request and response sizes.
+ func DoUnaryCall(ctx context.Context, tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
- if _, err := tc.UnaryCall(context.Background(), req); err != nil {
+ if _, err := tc.UnaryCall(ctx, req); err != nil {
return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
}
return nil
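With the context threaded through, DoUnaryCall no longer pins every call to context.Background(); callers decide the lifetime. A minimal external-caller sketch (conn is an assumed, already-dialled connection; import paths follow grpc-go's layout):

package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/benchmark"
	testgrpc "google.golang.org/grpc/interop/grpc_testing"
)

// runOnce issues a single unary benchmark call with 1 KiB request and
// response payloads, abandoned after five seconds if the server stalls.
func runOnce(conn *grpc.ClientConn) error {
	client := testgrpc.NewBenchmarkServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return benchmark.DoUnaryCall(ctx, client, 1024, 1024)
}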
benchmark/worker/benchmark_client.go (50 changes: 22 additions & 28 deletions)
@@ -73,7 +73,6 @@ func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {

type benchmarkClient struct {
closeConns func()
- stop chan bool
lastResetTime time.Time
histogramOptions stats.HistogramOptions
lockingHistograms []lockingHistogram
@@ -168,7 +167,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error
}, nil
}

- func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
+ func performRPCs(ctx context.Context, config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
// Read payload size and type from config.
var (
payloadReqSize, payloadRespSize int
@@ -212,17 +211,17 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc

switch config.RpcType {
case testpb.RpcType_UNARY:
- bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
+ bc.unaryLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
case testpb.RpcType_STREAMING:
- bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
+ bc.streamingLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
default:
return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
}

return nil
}

- func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
+ func startBenchmarkClient(ctx context.Context, config *testpb.ClientConfig) (*benchmarkClient, error) {
printClientConfig(config)

// Set running environment like how many cores to use.
@@ -243,13 +242,12 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
},
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),

- stop: make(chan bool),
lastResetTime: time.Now(),
closeConns: closeConns,
rusageLastReset: syscall.GetRusage(),
}

- if err = performRPCs(config, conns, bc); err != nil {
+ if err = performRPCs(ctx, config, conns, bc); err != nil {
// Close all connections if performRPCs failed.
closeConns()
return nil, err
@@ -258,14 +256,17 @@
return bc, nil
}

- func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
+ func (bc *benchmarkClient) unaryLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
for ic, conn := range conns {
client := testgrpc.NewBenchmarkServiceClient(conn)
// For each connection, create rpcCountPerConn goroutines to do rpc.
for j := range rpcCountPerConn {
// Create histogram for each goroutine.
idx := ic*rpcCountPerConn + j
bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
+ if ctx.Err() != nil {
+ return
+ }
// Start goroutine on the created mutex and histogram.
go func(idx int) {
// TODO: do warm up if necessary.
@@ -274,31 +275,25 @@ func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn i
// before starting benchmark.
if poissonLambda == nil { // Closed loop.
for {
- select {
- case <-bc.stop:
- return
- default:
- }
start := time.Now()
- if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
- continue
+ if err := benchmark.DoUnaryCall(ctx, client, reqSize, respSize); err != nil {
+ return
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
}
} else { // Open loop.
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / *poissonLambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
- bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
+ bc.poissonUnary(ctx, client, idx, reqSize, respSize, *poissonLambda)
})
}

}(idx)
}
}
}

- func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
+ func (bc *benchmarkClient) streamingLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
if payloadType == "bytebuf" {
doRPC = benchmark.DoByteBufStreamingRoundTrip
@@ -309,13 +304,16 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
// For each connection, create rpcCountPerConn goroutines to do rpc.
for j := 0; j < rpcCountPerConn; j++ {
c := testgrpc.NewBenchmarkServiceClient(conn)
- stream, err := c.StreamingCall(context.Background())
+ stream, err := c.StreamingCall(ctx)
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
}
idx := ic*rpcCountPerConn + j
bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
if poissonLambda == nil { // Closed loop.
+ if stream.Context().Err() != nil {
+ return
+ }
// Start goroutine on the created mutex and histogram.
go func(idx int) {
// TODO: do warm up if necessary.
@@ -329,11 +327,6 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
- select {
- case <-bc.stop:
- return
- default:
- }
}
}(idx)
} else { // Open loop.
@@ -346,24 +339,26 @@
}
}

- func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
+ func (bc *benchmarkClient) poissonUnary(ctx context.Context, client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
go func() {
start := time.Now()
- if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
+
+ if err := benchmark.DoUnaryCall(ctx, client, reqSize, respSize); err != nil {
return
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
}()
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / lambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
- bc.poissonUnary(client, idx, reqSize, respSize, lambda)
+ bc.poissonUnary(ctx, client, idx, reqSize, respSize, lambda)
})
}

func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
go func() {
start := time.Now()

if err := doRPC(stream, reqSize, respSize); err != nil {
return
}
@@ -430,6 +425,5 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
}

func (bc *benchmarkClient) shutdown() {
- close(bc.stop)
bc.closeConns()
}
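The per-iteration select on bc.stop is gone from both loops: cancellation now rides the context into the RPC itself, so a cancelled context makes the in-flight call return an error and the goroutine exit. The resulting closed-loop shape, as a standalone sketch (doWork stands in for benchmark.DoUnaryCall or the streaming round trip; record stands in for the histogram update):

// closedLoop runs RPCs back to back until one fails, which includes the
// case where ctx is cancelled and the RPC returns a context error.
func closedLoop(ctx context.Context, doWork func(context.Context) error, record func(time.Duration)) {
	for {
		start := time.Now()
		if err := doWork(ctx); err != nil {
			return
		}
		record(time.Since(start))
	}
}

Note this also changes error handling: the old unary loop continued past a failed call, while the new one treats any error, not just cancellation, as a signal to stop.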
benchmark/worker/main.go (5 changes: 4 additions & 1 deletion)
@@ -163,7 +163,10 @@ func (s *workerServer) RunClient(stream testgrpc.WorkerService_RunClientServer)
logger.Infof("client setup received when client already exists, shutting down the existing client")
bc.shutdown()
}
- bc, err = startBenchmarkClient(t.Setup)
+
+ ctx, cancel := context.WithCancel(stream.Context())
+ defer cancel()
+ bc, err = startBenchmarkClient(ctx, t.Setup)
if err != nil {
return err
}
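Deriving the client's context from stream.Context() ties every benchmark goroutine to the RunClient stream: when the stream ends or the handler returns, the deferred cancel fires and in-flight RPCs fail. A small sketch of what the loops then observe (req is an assumed prepared SimpleRequest; status and codes are the usual google.golang.org/grpc/status and google.golang.org/grpc/codes packages):

// observeCancel shows the error a loop sees once the context is cancelled:
// gRPC surfaces the cancellation as a status with codes.Canceled.
func observeCancel(ctx context.Context, client testgrpc.BenchmarkServiceClient, req *testpb.SimpleRequest) {
	ctx, cancel := context.WithCancel(ctx)
	cancel() // simulate the RunClient stream going away
	if _, err := client.UnaryCall(ctx, req); err != nil {
		fmt.Println(status.Code(err)) // prints "Canceled"
	}
}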