Skip to content

Commit bccd483

Browse files
authored
feat: support max number of concurrent resource operation executions (#1292)
1 parent 0b7337c commit bccd483

File tree

7 files changed

+111
-4
lines changed

7 files changed

+111
-4
lines changed

Diff for: pkg/apis/api.kusion.io/v1/types.go

+10
Original file line numberDiff line numberDiff line change
@@ -734,3 +734,13 @@ type Release struct {
734734
// ModifiedTime is the time that the Release is modified.
735735
ModifiedTime time.Time `yaml:"modifiedTime" json:"modifiedTime"`
736736
}
737+
738+
const (
739+
// Environment variable for maximum number of concurrent resource executions,
740+
// including preview, apply and destroy.
741+
// Note that the maximum number should be between 1 to 100.
742+
MaxConcurrentEnvVar = "KUSION_EXEC_MAX_CONCURRENT"
743+
744+
// The default maximum number of concurrent resource executions for Kusion is 10.
745+
DefaultMaxConcurrent = 10
746+
)

Diff for: pkg/engine/operation/apply.go

+5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta
6262
return nil, s
6363
}
6464

65+
// Update the operation semaphore.
66+
if err := o.UpdateSemaphore(); err != nil {
67+
return nil, v1.NewErrorStatus(err)
68+
}
69+
6570
// 1. init & build Indexes
6671
priorState := req.Release.State
6772
priorStateResourceIndex := priorState.Resources.Index()

Diff for: pkg/engine/operation/apply_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"kusionstack.io/kusion/pkg/engine/runtime"
1818
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
1919
"kusionstack.io/kusion/pkg/engine/runtime/kubernetes"
20-
"kusionstack.io/kusion/pkg/infra/util/semaphore"
2120
)
2221

2322
func TestApplyOperation_Apply(t *testing.T) {
@@ -33,7 +32,6 @@ func TestApplyOperation_Apply(t *testing.T) {
3332
msgCh chan models.Message
3433
release *apiv1.Release
3534
lock *sync.Mutex
36-
sem *semaphore.Semaphore
3735
}
3836
type args struct {
3937
applyRequest *ApplyRequest
@@ -113,7 +111,6 @@ func TestApplyOperation_Apply(t *testing.T) {
113111
releaseStorage: &storages.LocalStorage{},
114112
runtimeMap: map[apiv1.Type]runtime.Runtime{runtime.Kubernetes: &kubernetes.KubernetesRuntime{}},
115113
msgCh: make(chan models.Message, 5),
116-
sem: semaphore.New(10),
117114
},
118115
args: args{applyRequest: &ApplyRequest{
119116
Request: models.Request{
@@ -141,7 +138,6 @@ func TestApplyOperation_Apply(t *testing.T) {
141138
MsgCh: tc.fields.msgCh,
142139
Release: tc.fields.release,
143140
Lock: tc.fields.lock,
144-
Sem: tc.fields.sem,
145141
}
146142
ao := &ApplyOperation{
147143
Operation: *o,

Diff for: pkg/engine/operation/destroy.go

+5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ func (do *DestroyOperation) Destroy(req *DestroyRequest) (rsp *DestroyResponse,
3737
return nil, s
3838
}
3939

40+
// Update the operation semaphore.
41+
if err := o.UpdateSemaphore(); err != nil {
42+
return nil, v1.NewErrorStatus(err)
43+
}
44+
4045
// 1. init & build Indexes
4146
priorState := req.Release.State
4247
priorStateResourceIndex := priorState.Resources.Index()

Diff for: pkg/engine/operation/models/operation_context.go

+21
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package models
22

33
import (
44
"fmt"
5+
"os"
6+
"strconv"
57
"sync"
68
"time"
79

@@ -124,3 +126,22 @@ func (o *Operation) UpdateReleaseState() error {
124126
log.Infof("update release succeeded, project %s, workspace %s, revision %d", o.Release.Project, o.Release.Workspace, o.Release.Revision)
125127
return nil
126128
}
129+
130+
// Update the operation semaphore with the maximum number of concurrent resource executions.
131+
func (o *Operation) UpdateSemaphore() error {
132+
v := os.Getenv(apiv1.MaxConcurrentEnvVar)
133+
if v != "" {
134+
maxConcurrent, err := strconv.Atoi(v)
135+
if err != nil {
136+
return err
137+
}
138+
if maxConcurrent < 1 || maxConcurrent > 100 {
139+
return fmt.Errorf("invalid value for maximum number of concurrent resource executions: %d", maxConcurrent)
140+
}
141+
o.Sem = semaphore.New(int64(maxConcurrent))
142+
} else {
143+
o.Sem = semaphore.New(int64(apiv1.DefaultMaxConcurrent))
144+
}
145+
146+
return nil
147+
}
+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package models
2+
3+
import (
4+
"errors"
5+
"os"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
10+
"kusionstack.io/kusion/pkg/infra/util/semaphore"
11+
)
12+
13+
func TestOperation_UpdateSemaphore(t *testing.T) {
14+
original := os.Getenv(apiv1.MaxConcurrentEnvVar)
15+
defer os.Setenv(apiv1.MaxConcurrentEnvVar, original)
16+
17+
testcases := []struct {
18+
name string
19+
env string
20+
expectedErr error
21+
expectedVal int64
22+
}{
23+
{
24+
name: "Invalid Env Type",
25+
env: "not-a-number",
26+
expectedErr: errors.New("invalid syntax"),
27+
},
28+
{
29+
name: "Invalid Value (less than 0)",
30+
env: "-1",
31+
expectedErr: errors.New("invalid value"),
32+
},
33+
{
34+
name: "Invalid Value (larger than 100)",
35+
env: "200",
36+
expectedErr: errors.New("invalid value"),
37+
},
38+
{
39+
name: "Default Value",
40+
env: "",
41+
expectedErr: nil,
42+
expectedVal: int64(apiv1.DefaultMaxConcurrent),
43+
},
44+
{
45+
name: "Customized Value",
46+
env: "50",
47+
expectedErr: nil,
48+
expectedVal: int64(50),
49+
},
50+
}
51+
52+
for _, tc := range testcases {
53+
t.Run(tc.name, func(t *testing.T) {
54+
op := &Operation{}
55+
os.Setenv(apiv1.MaxConcurrentEnvVar, tc.env)
56+
err := op.UpdateSemaphore()
57+
if tc.expectedErr != nil {
58+
assert.ErrorContains(t, err, tc.expectedErr.Error())
59+
} else {
60+
assert.Nil(t, err)
61+
assert.Equal(t, *semaphore.New(tc.expectedVal), *op.Sem)
62+
}
63+
})
64+
}
65+
}

Diff for: pkg/engine/operation/preview.go

+5
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ func (po *PreviewOperation) Preview(req *PreviewRequest) (rsp *PreviewResponse,
5454
return nil, s
5555
}
5656

57+
// Update the operation semaphore.
58+
if err := o.UpdateSemaphore(); err != nil {
59+
return nil, v1.NewErrorStatus(err)
60+
}
61+
5762
// 1. init & build Indexes
5863
priorState := req.State
5964

0 commit comments

Comments
 (0)