Skip to content
This repository was archived by the owner on Mar 26, 2020. It is now read-only.

Commit

Permalink
tracing: refactor tracing code in transaction
Browse files Browse the repository at this point in the history
Decouples tracing code from RunStepFuncLocally() func and
added a middleware which implements transaction.StepManager
to trace step funcs. This will provide a better trace
management and clean code.

Signed-off-by: Oshank Kumar <[email protected]>
  • Loading branch information
Oshank Kumar authored and kshlm committed Feb 20, 2019
1 parent f46dee6 commit a94bec2
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 45 deletions.
12 changes: 12 additions & 0 deletions glusterd2/middleware/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package middleware

import (
"net/http"

"go.opencensus.io/plugin/ochttp"
)

// Tracing is a http middleware to be use for trace incoming http.Request
func Tracing(next http.Handler) http.Handler {
return &ochttp.Handler{Handler: next}
}
19 changes: 8 additions & 11 deletions glusterd2/servers/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/justinas/alice"
log "github.com/sirupsen/logrus"
config "github.com/spf13/viper"
"go.opencensus.io/plugin/ochttp"
)

const (
Expand Down Expand Up @@ -92,17 +91,15 @@ func NewMuxed(m cmux.CMux) *GDRest {
gdutils.EnableProfiling(rest.Routes)
}

// Set Handler to opencensus HTTP handler to enable tracing
// Set chain of ordered middlewares
rest.server.Handler = &ochttp.Handler{
Handler: alice.New(
middleware.Recover,
middleware.Expvar,
middleware.ReqIDGenerator,
middleware.LogRequest,
middleware.Auth,
).Then(rest.Routes),
}
rest.server.Handler = alice.New(
middleware.Recover,
middleware.Tracing,
middleware.Expvar,
middleware.ReqIDGenerator,
middleware.LogRequest,
middleware.Auth,
).Then(rest.Routes)

return rest
}
Expand Down
47 changes: 26 additions & 21 deletions glusterd2/transaction/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod

var err error
if uuid.Equal(node, gdctx.MyUUID) {
err = RunStepFuncLocally(origCtx, stepName, ctx)
err = traceStep(RunStepFuncLocally)(origCtx, stepName, ctx)
} else {
// remote node
err = runStepOn(origCtx, stepName, node, ctx)
Expand All @@ -146,30 +146,35 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod
respCh <- stepPeerResp{node, err}
}

// RunStepFuncLocally runs a step func on local node
func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error {

var err error
type runFunc func(origCtx context.Context, stepName string, ctx TxnCtx) error

func traceStep(f runFunc) runFunc {
return func(origCtx context.Context, stepName string, ctx TxnCtx) error {
if origCtx != nil {
_, span := trace.StartSpan(origCtx, stepName)
reqID := ctx.GetTxnReqID()
span.AddAttributes(
trace.StringAttribute("reqID", reqID),
)
defer span.End()
}

if origCtx != nil {
_, span := trace.StartSpan(origCtx, stepName)
reqID := ctx.GetTxnReqID()
span.AddAttributes(
trace.StringAttribute("reqID", reqID),
)
defer span.End()
return f(origCtx, stepName, ctx)
}
}

// RunStepFuncLocally runs a step func on local node
func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error {
stepFunc, ok := getStepFunc(stepName)
if ok {
if err = stepFunc(ctx); err == nil {
// if step function executes successfully, commit the
// results to the store
err = ctx.Commit()
}
} else {
err = ErrStepFuncNotFound
if !ok {
return ErrStepFuncNotFound
}

if err := stepFunc(ctx); err != nil {
return err
}

return err
// if step function executes successfully, commit the
// results to the store
return ctx.Commit()
}
9 changes: 1 addition & 8 deletions glusterd2/transactionv2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

const (
Expand All @@ -36,7 +35,7 @@ func NewEngine() *Engine {
engine := &Engine{
stop: make(chan struct{}),
selfNodeID: gdctx.MyUUID,
stepManager: newStepManager(),
stepManager: newTracingManager(newStepManager()),
txnManager: NewTxnManager(store.Store.Watcher),
}

Expand Down Expand Up @@ -101,12 +100,6 @@ func (txnEng *Engine) Execute(ctx context.Context, txn *Txn) {

txn.Ctx.Logger().WithField("state", status.State).Debug("received a transaction")

ctx, span := trace.StartSpanWithRemoteParent(ctx, "txnEng.Execute/", txn.TxnSpanCtx)
defer span.End()
span.AddAttributes(
trace.StringAttribute("reqID", txn.Ctx.GetTxnReqID()),
)

switch status.State {
case txnPending:
if err := txnEng.executor.Execute(ctx, txn); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions glusterd2/transactionv2/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ type Executor interface {
// NewExecutor returns an Executor instance
func NewExecutor() Executor {
e := &executorImpl{
txnManager: NewTxnManager(store.Store.Watcher),
stepManager: newStepManager(),
selfNodeID: gdctx.MyUUID,
txnManager: NewTxnManager(store.Store.Watcher),
selfNodeID: gdctx.MyUUID,
}

stepManager := newStepManager()
stepManager = newTracingManager(stepManager)
e.stepManager = stepManager
return e
}

Expand Down
75 changes: 75 additions & 0 deletions glusterd2/transactionv2/steptracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package transaction

import (
"context"
"fmt"

"github.com/gluster/glusterd2/glusterd2/transaction"

"go.opencensus.io/trace"
)

func newTracingManager(next StepManager) StepManager {
return &tracingManager{next}
}

type tracingManager struct {
next StepManager
}

// RunStep is a middleware which creates tracing span for step.DoFunc
func (t *tracingManager) RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) (err error) {
spanName := fmt.Sprintf("RunStep/%s", step.DoFunc)
ctx, span := trace.StartSpan(ctx, spanName)
defer span.End()

defer func() {
attrs := []trace.Attribute{
trace.StringAttribute("reqID", txnCtx.GetTxnReqID()),
}
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
}
span.AddAttributes(attrs...)
}()

return t.next.RunStep(ctx, step, txnCtx)
}

// RollBackStep is a middleware which creates tracing span for step.UndoFunc
func (t *tracingManager) RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) (err error) {
spanName := fmt.Sprintf("RollBackStep/%s", step.UndoFunc)
ctx, span := trace.StartSpan(ctx, spanName)
defer span.End()

defer func() {
attrs := []trace.Attribute{
trace.StringAttribute("reqID", txnCtx.GetTxnReqID()),
}
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
}
span.AddAttributes(attrs...)
}()

return t.next.RollBackStep(ctx, step, txnCtx)
}

// SyncStep is a middleware which creates tracing span for Sync steps
func (t *tracingManager) SyncStep(ctx context.Context, stepIndex int, txn *Txn) (err error) {
spanName := fmt.Sprintf("SyncStep/%s", txn.Steps[stepIndex].DoFunc)
ctx, span := trace.StartSpan(ctx, spanName)
defer span.End()

defer func() {
attrs := []trace.Attribute{
trace.StringAttribute("reqID", txn.Ctx.GetTxnReqID()),
}
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
}
span.AddAttributes(attrs...)
}()

return t.next.SyncStep(ctx, stepIndex, txn)
}
4 changes: 2 additions & 2 deletions glusterd2/transactionv2/tracingexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type tracingExecutor struct {

// Execute will record trace info for Execute operation.
func (t *tracingExecutor) Execute(ctx context.Context, txn *Txn) error {
ctx, span := trace.StartSpan(ctx, "txnEng.executor.Execute/")
ctx, span := trace.StartSpanWithRemoteParent(ctx, "txnEng.executor.Execute/", txn.TxnSpanCtx)
defer span.End()
span.AddAttributes(
trace.StringAttribute("reqID", txn.Ctx.GetTxnReqID()),
Expand All @@ -28,7 +28,7 @@ func (t *tracingExecutor) Execute(ctx context.Context, txn *Txn) error {

// Resume will record trace info for Resume operation.
func (t *tracingExecutor) Resume(ctx context.Context, txn *Txn) error {
ctx, span := trace.StartSpan(ctx, "txnEng.executor.Resume/")
ctx, span := trace.StartSpanWithRemoteParent(ctx, "txnEng.executor.Resume/", txn.TxnSpanCtx)
defer span.End()
span.AddAttributes(
trace.StringAttribute("reqID", txn.Ctx.GetTxnReqID()),
Expand Down

0 comments on commit a94bec2

Please sign in to comment.