progressWriter data race #4011

Open · alexcb wants to merge 1 commit into master from acb/progress-race

Conversation

@alexcb (Collaborator) commented Jul 11, 2023

The progress writer can be used from multiple threads and requires locking.
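
For context, the locking approach amounts to guarding the shared meta map with a reader/writer mutex, roughly like the following sketch (the mu field name is hypothetical and the actual patch may structure it differently):

type Progress struct {
	mu   sync.RWMutex // hypothetical field: serializes access to meta
	meta map[string]interface{}
	// ... other fields elided
}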

@alexcb (Collaborator, Author) commented Jul 11, 2023

Before the fix, the race detector reports:

    sandbox.go:288: WARNING: DATA RACE
    sandbox.go:288: Write at 0x00c0006505e0 by goroutine 298:
    sandbox.go:288:   github.com/moby/buildkit/util/pull/pullprogress.trackProgress()
    sandbox.go:288:       /src/util/pull/pullprogress/progress.go:113 +0x544
    sandbox.go:288:   github.com/moby/buildkit/util/pull/pullprogress.(*ProviderWithProgress).ReaderAt.func1()
    sandbox.go:288:       /src/util/pull/pullprogress/progress.go:36 +0x12b
    sandbox.go:288: 
    sandbox.go:288: Previous read at 0x00c0006505e0 by goroutine 299:
    sandbox.go:288:   github.com/moby/buildkit/util/pull/pullprogress.trackProgress.func1()
    sandbox.go:288:       /src/util/pull/pullprogress/progress.go:97 +0x39
    sandbox.go:288: 
    sandbox.go:288: Goroutine 298 (running) created at:
    sandbox.go:288:   github.com/moby/buildkit/util/pull/pullprogress.(*ProviderWithProgress).ReaderAt()
    sandbox.go:288:       /src/util/pull/pullprogress/progress.go:36 +0x388
    sandbox.go:288:   github.com/moby/buildkit/util/contentutil.(*localFetcher).Fetch()
    sandbox.go:288:       /src/util/contentutil/copy.go:30 +0x104
    sandbox.go:288:   github.com/moby/buildkit/util/resolver/limited.(*fetcher).Fetch()
    sandbox.go:288:       /src/util/resolver/limited/group.go:113 +0x184
    sandbox.go:288:   github.com/containerd/containerd/remotes.Fetch()
    sandbox.go:288:       /src/vendor/github.com/containerd/containerd/remotes/handlers.go:147 +0x617
    sandbox.go:288:   github.com/containerd/containerd/remotes.FetchHandler.func1()
    sandbox.go:288:       /src/vendor/github.com/containerd/containerd/remotes/handlers.go:104 +0x377
    sandbox.go:288:   github.com/moby/buildkit/util/resolver/retryhandler.New.func1()
    sandbox.go:288:       /src/util/resolver/retryhandler/retry.go:25 +0xc4
    sandbox.go:288:   github.com/moby/buildkit/util/contentutil.Copy()
    sandbox.go:288:       /src/util/contentutil/copy.go:19 +0x1a8
    sandbox.go:288:   github.com/moby/buildkit/cache.lazyRefProvider.Unlazy.func1()
    sandbox.go:288:       /src/cache/remote.go:335 +0x64e
    sandbox.go:288:   github.com/moby/buildkit/util/flightcontrol.(*call[...]).run()
    sandbox.go:288:       /src/util/flightcontrol/flightcontrol.go:121 +0x133
    sandbox.go:288:   github.com/moby/buildkit/util/flightcontrol.(*call[...]).wait.func1()
    sandbox.go:288:       /src/util/flightcontrol/flightcontrol.go:165 +0x47
    sandbox.go:288:   sync.(*Once).doSlow()
    sandbox.go:288:       /usr/local/go/src/sync/once.go:74 +0x101
    sandbox.go:288:   sync.(*Once).Do()
    sandbox.go:288:       /usr/local/go/src/sync/once.go:65 +0x46
    sandbox.go:288:   github.com/moby/buildkit/util/flightcontrol.(*call[...]).wait.func2()
    sandbox.go:288:       /src/util/flightcontrol/flightcontrol.go:165 +0x47
    sandbox.go:288: 
    sandbox.go:288: Goroutine 299 (finished) created at:
    sandbox.go:288:   github.com/moby/buildkit/util/pull/pullprogress.trackProgress()
    sandbox.go:288:       /src/util/pull/pullprogress/progress.go:96 +0x231
    sandbox.go:288:   github.com/moby/buildkit/util/pull/pullprogress.(*ProviderWithProgress).ReaderAt.func1()
    sandbox.go:288:       /src/util/pull/pullprogress/progress.go:36 +0x12b

@alexcb alexcb marked this pull request as ready for review July 12, 2023 00:02
@tonistiigi (Member) left a comment

So what exactly is causing the race here? Is it p.meta or pw.meta? If it is pw.meta, where is it getting modified?

the progress writer can be used from multiple threads and requires
locking.

Signed-off-by: Alex Couture-Beil <[email protected]>
@alexcb alexcb force-pushed the acb/progress-race branch from 85b57be to c2757d8 Compare July 18, 2023 20:45
@alexcb (Collaborator, Author) commented Jul 18, 2023

I had a closer look and discovered I had left the RLock() call out of the Meta function.

So what exactly is causing the race here? Is it p.meta or pw.meta? If it is pw.meta, where is it getting modified?

It's p.meta that's being referenced by multiple threads; here are the details, which are reproducible by running:

CGO_ENABLED=1 GOBUILDFLAGS="-race" TESTFLAGS="-v -test.parallel=1 -test.run=TestIntegration/TestBridgeNetworkingDNSNoRootless/worker=containerd" TESTPKGS=./client ./hack/test integration

==================
WARNING: DATA RACE


Read at 0x00c0000ac4f8 by goroutine 81:
  github.com/moby/buildkit/util/progress.(*Progress).Meta()
      /src/util/progress/progress.go:274 +0x10e4
  github.com/moby/buildkit/solver.(*Job).Status()
      /src/solver/progress.go:41 +0x10d0
  github.com/moby/buildkit/solver/llbsolver.(*Solver).Status()
      /src/solver/llbsolver/solver.go:891 +0x152
  github.com/moby/buildkit/control.(*Controller).Status.func1()
      /src/control/control.go:465 +0xa9
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /src/vendor/golang.org/x/sync/errgroup/errgroup.go:75 +0x82

Here is the location of the read, from /src/util/progress/progress.go:274:

func (p *Progress) Meta(key string) (interface{}, bool) {
	v, ok := p.meta[key] // this is being read from
	return v, ok
}
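
Under the mutex approach, the guarded read would look something like this (a sketch only, assuming the hypothetical mu field on Progress mentioned above):

func (p *Progress) Meta(key string) (interface{}, bool) {
	p.mu.RLock() // hypothetical lock; this is the RLock() that was left out
	defer p.mu.RUnlock()
	v, ok := p.meta[key]
	return v, ok
}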


Previous write at 0x00c0000ac4f8 by goroutine 150:
  github.com/moby/buildkit/util/progress.(*progressWriter).WriteRawProgress()
      /src/util/progress/progress.go:252 +0x144
  github.com/moby/buildkit/util/progress.(*MultiWriter).writeRawProgress()
      /src/util/progress/multiwriter.go:93 +0x27b
  github.com/moby/buildkit/util/progress.(*MultiWriter).WriteRawProgress()
      /src/util/progress/multiwriter.go:85 +0x188
  github.com/moby/buildkit/util/flightcontrol.(*progressState).run()
      /src/util/flightcontrol/flightcontrol.go:312 +0x305
  github.com/moby/buildkit/util/flightcontrol.newCall[...].func1()
      /src/util/flightcontrol/flightcontrol.go:112 +0x58

Meanwhile, a different thread called WriteRawProgress:

func (pw *progressWriter) WriteRawProgress(p *Progress) error {
	meta := p.meta
	if len(pw.meta) > 0 {
		meta = map[string]interface{}{}
		for k, v := range p.meta {
			meta[k] = v
		}
		for k, v := range pw.meta {
			if _, ok := meta[k]; !ok {
				meta[k] = v
			}
		}
	}
	p.meta = meta                // <---- a write occurs here
	return pw.writeRawProgress(p)
}
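
And the write side would need the corresponding exclusive lock, something like the following (again a sketch with the hypothetical mu field, not the actual patch):

func (pw *progressWriter) WriteRawProgress(p *Progress) error {
	p.mu.Lock() // hypothetical lock; pairs with the RLock in Meta
	meta := p.meta
	if len(pw.meta) > 0 {
		meta = map[string]interface{}{}
		for k, v := range p.meta {
			meta[k] = v
		}
		for k, v := range pw.meta {
			if _, ok := meta[k]; !ok {
				meta[k] = v
			}
		}
	}
	p.meta = meta // the racy write, now published under the lock
	p.mu.Unlock()
	return pw.writeRawProgress(p)
}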


///////////////////////////////////////////////////////////

The reading thread is created at:

Goroutine 81 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /src/vendor/golang.org/x/sync/errgroup/errgroup.go:72 +0x12e
  github.com/moby/buildkit/control.(*Controller).Status()
      /src/control/control.go:464 +0x284
  github.com/moby/buildkit/api/services/control._Control_Status_Handler()
      /src/api/services/control/control.pb.go:2448 +0xf8
  github.com/moby/buildkit/util/grpcerrors.StreamServerInterceptor()
      /src/util/grpcerrors/intercept.go:33 +0x86
  github.com/grpc-ecosystem/go-grpc-middleware.ChainStreamServer.func1.1.1()
      /src/vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go:49 +0x82
  go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc.StreamServerInterceptor.func1()
      /src/vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go:435 +0x60d
  github.com/grpc-ecosystem/go-grpc-middleware.ChainStreamServer.func1.1.1()
      /src/vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go:49 +0x82
  github.com/grpc-ecosystem/go-grpc-middleware.ChainStreamServer.func1()
      /src/vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go:58 +0x11d
  google.golang.org/grpc.(*Server).processStreamingRPC()
      /src/vendor/google.golang.org/grpc/server.go:1627 +0x1f95
  google.golang.org/grpc.(*Server).handleStream()
      /src/vendor/google.golang.org/grpc/server.go:1708 +0xfae
  google.golang.org/grpc.(*Server).serveStreams.func1.2()
      /src/vendor/google.golang.org/grpc/server.go:965 +0xec


func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Control_StatusServer) error {
	if err := sendTimestampHeader(stream); err != nil {
		return err
	}
	ch := make(chan *client.SolveStatus, 8)

	eg, ctx := errgroup.WithContext(stream.Context())
	eg.Go(func() error {
		return c.solver.Status(ctx, req.Ref, ch)  // <------- here is where the reading thread is created
	})

/////////////////////

And the writing thread is created at:

Goroutine 150 (running) created at:
  github.com/moby/buildkit/util/flightcontrol.newCall[...]()
      /src/util/flightcontrol/flightcontrol.go:112 +0x53c
  github.com/moby/buildkit/util/flightcontrol.(*Group[...]).do()
      /src/util/flightcontrol/flightcontrol.go:67 +0x1b4
  github.com/moby/buildkit/util/flightcontrol.(*Group[...]).Do()
      /src/util/flightcontrol/flightcontrol.go:37 +0xab
  github.com/moby/buildkit/source/containerimage.(*puller).CacheKey()
      /src/source/containerimage/pull.go:258 +0x8c4
  github.com/moby/buildkit/solver/llbsolver/ops.(*SourceOp).CacheMap()
      /src/solver/llbsolver/ops/source.go:82 +0xbb
  github.com/moby/buildkit/solver.(*sharedOp).CacheMap.func2()
      /src/solver/jobs.go:832 +0x854
  github.com/moby/buildkit/util/flightcontrol.(*call[...]).run()
      /src/util/flightcontrol/flightcontrol.go:121 +0x141
  github.com/moby/buildkit/util/flightcontrol.(*call[...]).wait.func1()
      /src/util/flightcontrol/flightcontrol.go:165 +0x47
  sync.(*Once).doSlow()
      /usr/local/go/src/sync/once.go:74 +0x101
  sync.(*Once).Do()
      /usr/local/go/src/sync/once.go:65 +0x46
  github.com/moby/buildkit/util/flightcontrol.(*call[...]).wait.func2()
      /src/util/flightcontrol/flightcontrol.go:165 +0x47

func newCall[T any](fn func(ctx context.Context) (T, error)) *call[T] {
	c := &call[T]{
		fn:            fn,
		ready:         make(chan struct{}),
		cleaned:       make(chan struct{}),
		progressState: newProgressState(),
	}
	ctx := newContext(c) // newSharedContext
	pr, pctx, closeProgressWriter := progress.NewContext(context.Background())

	c.progressCtx = pctx
	c.ctx = ctx
	c.closeProgressWriter = closeProgressWriter

	go c.progressState.run(pr) // TODO: remove this, wrap writer instead  <------- writing thread created here

	return c
}


The run loop is:


func (ps *progressState) run(pr progress.Reader) {
	for {
		p, err := pr.Read(context.TODO())
		if err != nil {
			if err == io.EOF {
				ps.mu.Lock()
				ps.done = true
				ps.mu.Unlock()
				for _, w := range ps.writers {
					w.Close()
				}
			}
			return
		}
		ps.mu.Lock()
		for _, p := range p {
			for _, w := range ps.writers {
				w.WriteRawProgress(p)    // <---- this is where the Write gets called
			}
			ps.items[p.ID] = p
		}
		ps.mu.Unlock()
	}
}

This was quite the deep dive into the solver progress code, but here is the chain of events:

In func (pw *progressWriter) writeRawProgress(p *Progress) error from util/progress/progress.go, once writeRawProgress is called, it places the Progress in a dirty map:

pw.reader.dirty[p.ID] = p

Note that ps.mu in the run loop above only guards progressState's own fields; the *Progress values themselves escape into this dirty map. The Status thread then calls func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error), which returns the contents of that dirty map, and eventually p.Meta("vertex") is called on one of those entries, triggering a read with no synchronization against the write in WriteRawProgress.
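
To see the pattern in isolation, here is a distilled, standalone reproduction of the same class of race (hypothetical example code, not buildkit's actual types); go run -race flags it immediately:

package main

import "sync"

// progress mimics the shape of the shared struct: a map field read by one
// goroutine while another goroutine replaces it.
type progress struct {
	meta map[string]interface{}
}

func main() {
	p := &progress{meta: map[string]interface{}{"vertex": "v1"}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // reader, analogous to Progress.Meta
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_, _ = p.meta["vertex"]
		}
	}()
	go func() { // writer, analogous to progressWriter.WriteRawProgress
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			p.meta = map[string]interface{}{"vertex": "v2"}
		}
	}()
	wg.Wait()
}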

@jsternberg (Collaborator) left a comment

I think it would probably be better to avoid adding a mutex. Based on the stack traces and the explanation given, it would be better to update that section of code to create a new Progress struct with the new metadata rather than modify the existing one.

Something like:

func (p *Progress) WithMeta(meta map[string]interface{}) *Progress {
    newP := *p
    newP.meta = meta
    return &newP
}

While a mutex would solve it, it seems like an error that one writer in a MultiWriter can modify the meta of the Progress object seen by the other, unrelated writers. Duplicating the object and replacing the meta map would resolve the race condition while also preserving the meta for the other writers.
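
For illustration, WriteRawProgress could then hand writeRawProgress a copy instead of mutating the shared struct, building on the WithMeta helper above (a sketch of the direction, not a final patch):

func (pw *progressWriter) WriteRawProgress(p *Progress) error {
	if len(pw.meta) > 0 {
		meta := map[string]interface{}{}
		for k, v := range p.meta {
			meta[k] = v
		}
		for k, v := range pw.meta {
			if _, ok := meta[k]; !ok {
				meta[k] = v
			}
		}
		p = p.WithMeta(meta) // shallow copy; the caller's Progress is never mutated
	}
	return pw.writeRawProgress(p)
}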
