Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit ed0b123

Browse files
author
Ulysses Souza
authored
Merge pull request #1901 from mat007/fix-races
Fix races
2 parents 9d03155 + 2d0d388 commit ed0b123

File tree

11 files changed

+50
-39
lines changed

11 files changed

+50
-39
lines changed

Diff for: kube/client/client.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,7 @@ func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOp
285285

286286
eg, ctx := errgroup.WithContext(ctx)
287287
for serviceName, servicePorts := range opts.Services {
288-
serviceName := serviceName
289-
servicePorts := servicePorts
288+
serviceName, servicePorts := serviceName, servicePorts
290289
pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
291290
if err != nil {
292291
return err

Diff for: pkg/compose/convergence.go

+23-7
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,19 @@ const (
5555
type convergence struct {
5656
service *composeService
5757
observedState map[string]Containers
58+
stateMutex sync.Mutex
59+
}
60+
61+
func (c *convergence) getObservedState(serviceName string) Containers {
62+
c.stateMutex.Lock()
63+
defer c.stateMutex.Unlock()
64+
return c.observedState[serviceName]
65+
}
66+
67+
func (c *convergence) setObservedState(serviceName string, containers Containers) {
68+
c.stateMutex.Lock()
69+
defer c.stateMutex.Unlock()
70+
c.observedState[serviceName] = containers
5871
}
5972

6073
func newConvergence(services []string, state Containers, s *composeService) *convergence {
@@ -97,7 +110,7 @@ var mu sync.Mutex
97110

98111
// updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
99112
func (c *convergence) updateProject(project *types.Project, service string) {
100-
containers := c.observedState[service]
113+
containers := c.getObservedState(service)
101114
container := containers[0]
102115

103116
// operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
@@ -148,7 +161,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
148161
if err != nil {
149162
return err
150163
}
151-
containers := c.observedState[service.Name]
164+
containers := c.getObservedState(service.Name)
152165
actual := len(containers)
153166
updated := make(Containers, expected)
154167

@@ -157,6 +170,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
157170
for i, container := range containers {
158171
if i > expected {
159172
// Scale Down
173+
container := container
160174
eg.Go(func() error {
161175
err := c.service.apiClient.ContainerStop(ctx, container.ID, timeout)
162176
if err != nil {
@@ -178,7 +192,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
178192
name := getContainerProgressName(container)
179193
diverged := container.Labels[api.ConfigHashLabel] != configHash
180194
if diverged || recreate == api.RecreateForce || service.Extensions[extLifecycle] == forceRecreate {
181-
i := i
195+
i, container := i, container
182196
eg.Go(func() error {
183197
recreated, err := c.service.recreateContainer(ctx, project, service, container, inherit, timeout)
184198
updated[i] = recreated
@@ -197,6 +211,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
197211
case ContainerExited:
198212
w.Event(progress.CreatedEvent(name))
199213
default:
214+
container := container
200215
eg.Go(func() error {
201216
return c.service.startContainer(ctx, container)
202217
})
@@ -212,16 +227,17 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
212227
// Scale UP
213228
number := next + i
214229
name := getContainerName(project.Name, service, number)
230+
i := i
215231
eg.Go(func() error {
216232
container, err := c.service.createContainer(ctx, project, service, name, number, false, true)
217-
updated[actual+i-1] = container
233+
updated[actual+i] = container
218234
return err
219235
})
220236
continue
221237
}
222238

223239
err = eg.Wait()
224-
c.observedState[service.Name] = updated
240+
c.setObservedState(service.Name, updated)
225241
return err
226242
}
227243

@@ -542,11 +558,11 @@ func (s *composeService) startService(ctx context.Context, project *types.Projec
542558

543559
w := progress.ContextWriter(ctx)
544560
eg, ctx := errgroup.WithContext(ctx)
545-
for _, c := range containers {
546-
container := c
561+
for _, container := range containers {
547562
if container.State == ContainerRunning {
548563
continue
549564
}
565+
container := container
550566
eg.Go(func() error {
551567
eventName := getContainerProgressName(container)
552568
w.Event(progress.StartingEvent(eventName))

Diff for: pkg/compose/cp.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,8 @@ func (s *composeService) Copy(ctx context.Context, project *types.Project, opts
8181
}
8282

8383
g := errgroup.Group{}
84-
for i := range containers {
85-
containerID := containers[i].ID
86-
84+
for _, container := range containers {
85+
containerID := container.ID
8786
g.Go(func() error {
8887
switch direction {
8988
case fromService:

Diff for: pkg/compose/dependencies.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -91,22 +91,22 @@ func visit(ctx context.Context, project *types.Project, traversalConfig graphTra
9191
// Note: this could be `graph.walk` or whatever
9292
func run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, traversalConfig graphTraversalConfig, fn func(context.Context, string) error) error {
9393
for _, node := range nodes {
94-
n := node
9594
// Don't start this service yet if all of its children have
9695
// not been started yet.
97-
if len(traversalConfig.filterAdjacentByStatusFn(graph, n.Service, traversalConfig.adjacentServiceStatusToSkip)) != 0 {
96+
if len(traversalConfig.filterAdjacentByStatusFn(graph, node.Service, traversalConfig.adjacentServiceStatusToSkip)) != 0 {
9897
continue
9998
}
10099

100+
node := node
101101
eg.Go(func() error {
102-
err := fn(ctx, n.Service)
102+
err := fn(ctx, node.Service)
103103
if err != nil {
104104
return err
105105
}
106106

107-
graph.UpdateStatus(n.Service, traversalConfig.targetServiceStatus)
107+
graph.UpdateStatus(node.Service, traversalConfig.targetServiceStatus)
108108

109-
return run(ctx, graph, eg, traversalConfig.adjacentNodesFn(n), traversalConfig, fn)
109+
return run(ctx, graph, eg, traversalConfig.adjacentNodesFn(node), traversalConfig, fn)
110110
})
111111
}
112112

Diff for: pkg/compose/down.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -217,17 +217,17 @@ func (s *composeService) stopContainers(ctx context.Context, w progress.Writer,
217217
func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration, volumes bool) error {
218218
eg, _ := errgroup.WithContext(ctx)
219219
for _, container := range containers {
220-
toDelete := container
220+
container := container
221221
eg.Go(func() error {
222-
eventName := getContainerProgressName(toDelete)
222+
eventName := getContainerProgressName(container)
223223
w.Event(progress.StoppingEvent(eventName))
224-
err := s.stopContainers(ctx, w, []moby.Container{toDelete}, timeout)
224+
err := s.stopContainers(ctx, w, []moby.Container{container}, timeout)
225225
if err != nil {
226226
w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
227227
return err
228228
}
229229
w.Event(progress.RemovingEvent(eventName))
230-
err = s.apiClient.ContainerRemove(ctx, toDelete.ID, moby.ContainerRemoveOptions{
230+
err = s.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{
231231
Force: true,
232232
RemoveVolumes: volumes,
233233
})

Diff for: pkg/compose/logs.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,20 @@ import (
2929
)
3030

3131
func (s *composeService) Logs(ctx context.Context, projectName string, consumer api.LogConsumer, options api.LogOptions) error {
32-
list, err := s.getContainers(ctx, projectName, oneOffExclude, true, options.Services...)
32+
containers, err := s.getContainers(ctx, projectName, oneOffExclude, true, options.Services...)
3333

3434
if err != nil {
3535
return err
3636
}
3737
eg, ctx := errgroup.WithContext(ctx)
38-
for _, c := range list {
39-
c := c
38+
for _, c := range containers {
4039
service := c.Labels[api.ServiceLabel]
4140
container, err := s.apiClient.ContainerInspect(ctx, c.ID)
4241
if err != nil {
4342
return err
4443
}
4544

45+
name := getContainerNameWithoutProject(c)
4646
eg.Go(func() error {
4747
r, err := s.apiClient.ContainerLogs(ctx, container.ID, types.ContainerLogsOptions{
4848
ShowStdout: true,
@@ -58,7 +58,6 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
5858
}
5959
defer r.Close() // nolint errcheck
6060

61-
name := getContainerNameWithoutProject(c)
6261
w := utils.GetWriter(func(line string) {
6362
consumer.Log(name, service, line)
6463
})

Diff for: pkg/compose/ps.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@ func (s *composeService) Ps(ctx context.Context, projectName string, options api
3838

3939
summary := make([]api.ContainerSummary, len(containers))
4040
eg, ctx := errgroup.WithContext(ctx)
41-
for i, c := range containers {
42-
container := c
43-
i := i
41+
for i, container := range containers {
42+
i, container := i, container
4443
eg.Go(func() error {
4544
var publishers []api.PortPublisher
4645
sort.Slice(container.Ports, func(i, j int) bool {

Diff for: pkg/compose/pull.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
5858
w := progress.ContextWriter(ctx)
5959
eg, ctx := errgroup.WithContext(ctx)
6060

61-
for _, srv := range project.Services {
62-
service := srv
61+
for _, service := range project.Services {
62+
service := service
6363
if service.Image == "" {
6464
w.Event(progress.Event{
6565
ID: service.Name,

Diff for: pkg/compose/remove.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt
7474
func (s *composeService) remove(ctx context.Context, containers Containers, options api.RemoveOptions) error {
7575
w := progress.ContextWriter(ctx)
7676
eg, ctx := errgroup.WithContext(ctx)
77-
for _, c := range containers {
78-
c := c
77+
for _, container := range containers {
78+
container := container
7979
eg.Go(func() error {
80-
eventName := getContainerProgressName(c)
80+
eventName := getContainerProgressName(container)
8181
w.Event(progress.RemovingEvent(eventName))
82-
err := s.apiClient.ContainerRemove(ctx, c.ID, moby.ContainerRemoveOptions{
82+
err := s.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{
8383
RemoveVolumes: options.Volumes,
8484
Force: options.Force,
8585
})

Diff for: pkg/compose/restart.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ func (s *composeService) restart(ctx context.Context, project *types.Project, op
4949
return nil
5050
}
5151
eg, ctx := errgroup.WithContext(ctx)
52-
for _, c := range observedState.filter(isService(service)) {
53-
container := c
52+
for _, container := range observedState.filter(isService(service)) {
53+
container := container
5454
eg.Go(func() error {
5555
eventName := getContainerProgressName(container)
5656
w.Event(progress.RestartingEvent(eventName))

Diff for: pkg/compose/top.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@ func (s *composeService) Top(ctx context.Context, projectName string, services [
3434
}
3535
summary := make([]api.ContainerProcSummary, len(containers))
3636
eg, ctx := errgroup.WithContext(ctx)
37-
for i, c := range containers {
38-
container := c
39-
i := i
37+
for i, container := range containers {
38+
i, container := i, container
4039
eg.Go(func() error {
4140
topContent, err := s.apiClient.ContainerTop(ctx, container.ID, []string{})
4241
if err != nil {

0 commit comments

Comments
 (0)