@@ -3,12 +3,15 @@ package cache
3
3
import (
4
4
"context"
5
5
"fmt"
6
- "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
7
6
"sort"
8
7
"strings"
8
+ "sync"
9
9
"testing"
10
10
"time"
11
11
12
+ "golang.org/x/sync/semaphore"
13
+ "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
14
+
12
15
"github.com/stretchr/testify/assert"
13
16
"github.com/stretchr/testify/require"
14
17
appsv1 "k8s.io/api/apps/v1"
71
74
)
72
75
73
76
func newCluster (t * testing.T , objs ... runtime.Object ) * clusterCache {
77
+ cache := newClusterWithOptions (t , []UpdateSettingsFunc {}, objs ... )
78
+
79
+ t .Cleanup (func () {
80
+ cache .Invalidate ()
81
+ })
82
+
83
+ return cache
84
+ }
85
+
86
+ func newClusterWithOptions (t * testing.T , opts []UpdateSettingsFunc , objs ... runtime.Object ) * clusterCache {
74
87
client := fake .NewSimpleDynamicClient (scheme .Scheme , objs ... )
75
88
reactor := client .ReactionChain [0 ]
76
89
client .PrependReactor ("list" , "*" , func (action testcore.Action ) (handled bool , ret runtime.Object , err error ) {
@@ -101,11 +114,14 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache {
101
114
Meta : metav1.APIResource {Namespaced : true },
102
115
}}
103
116
117
+ opts = append ([]UpdateSettingsFunc {
118
+ SetKubectl (& kubetest.MockKubectlCmd {APIResources : apiResources , DynamicClient : client }),
119
+ }, opts ... )
120
+
104
121
cache := NewClusterCache (
105
- & rest.Config {Host : "https://test" }, SetKubectl (& kubetest.MockKubectlCmd {APIResources : apiResources , DynamicClient : client }))
106
- t .Cleanup (func () {
107
- cache .Invalidate ()
108
- })
122
+ & rest.Config {Host : "https://test" },
123
+ opts ... ,
124
+ )
109
125
return cache
110
126
}
111
127
@@ -492,23 +508,23 @@ metadata:
492
508
func TestGetManagedLiveObjsFailedConversion (t * testing.T ) {
493
509
cronTabGroup := "stable.example.com"
494
510
495
- testCases := []struct {
496
- name string
497
- localConvertFails bool
511
+ testCases := []struct {
512
+ name string
513
+ localConvertFails bool
498
514
expectConvertToVersionCalled bool
499
- expectGetResourceCalled bool
515
+ expectGetResourceCalled bool
500
516
}{
501
517
{
502
- name : "local convert fails, so GetResource is called" ,
503
- localConvertFails : true ,
518
+ name : "local convert fails, so GetResource is called" ,
519
+ localConvertFails : true ,
504
520
expectConvertToVersionCalled : true ,
505
- expectGetResourceCalled : true ,
521
+ expectGetResourceCalled : true ,
506
522
},
507
523
{
508
- name : "local convert succeeds, so GetResource is not called" ,
509
- localConvertFails : false ,
524
+ name : "local convert succeeds, so GetResource is not called" ,
525
+ localConvertFails : false ,
510
526
expectConvertToVersionCalled : true ,
511
- expectGetResourceCalled : false ,
527
+ expectGetResourceCalled : false ,
512
528
},
513
529
}
514
530
@@ -557,7 +573,6 @@ metadata:
557
573
return testCronTab (), nil
558
574
})
559
575
560
-
561
576
managedObjs , err := cluster .GetManagedLiveObjs ([]* unstructured.Unstructured {targetDeploy }, func (r * Resource ) bool {
562
577
return true
563
578
})
@@ -816,25 +831,25 @@ func testPod() *corev1.Pod {
816
831
817
832
func testCRD () * apiextensions.CustomResourceDefinition {
818
833
return & apiextensions.CustomResourceDefinition {
819
- TypeMeta : metav1.TypeMeta {
834
+ TypeMeta : metav1.TypeMeta {
820
835
APIVersion : "apiextensions.k8s.io/v1" ,
821
836
},
822
837
ObjectMeta : metav1.ObjectMeta {
823
838
Name : "crontabs.stable.example.com" ,
824
839
},
825
- Spec : apiextensions.CustomResourceDefinitionSpec {
840
+ Spec : apiextensions.CustomResourceDefinitionSpec {
826
841
Group : "stable.example.com" ,
827
842
Versions : []apiextensions.CustomResourceDefinitionVersion {
828
843
{
829
- Name : "v1" ,
830
- Served : true ,
844
+ Name : "v1" ,
845
+ Served : true ,
831
846
Storage : true ,
832
847
Schema : & apiextensions.CustomResourceValidation {
833
848
OpenAPIV3Schema : & apiextensions.JSONSchemaProps {
834
849
Type : "object" ,
835
850
Properties : map [string ]apiextensions.JSONSchemaProps {
836
851
"cronSpec" : {Type : "string" },
837
- "image" : {Type : "string" },
852
+ "image" : {Type : "string" },
838
853
"replicas" : {Type : "integer" },
839
854
},
840
855
},
@@ -855,14 +870,14 @@ func testCRD() *apiextensions.CustomResourceDefinition {
855
870
func testCronTab () * unstructured.Unstructured {
856
871
return & unstructured.Unstructured {Object : map [string ]interface {}{
857
872
"apiVersion" : "stable.example.com/v1" ,
858
- "kind" : "CronTab" ,
873
+ "kind" : "CronTab" ,
859
874
"metadata" : map [string ]interface {}{
860
- "name" : "test-crontab" ,
875
+ "name" : "test-crontab" ,
861
876
"namespace" : "default" ,
862
877
},
863
878
"spec" : map [string ]interface {}{
864
879
"cronSpec" : "* * * * */5" ,
865
- "image" : "my-awesome-cron-image" ,
880
+ "image" : "my-awesome-cron-image" ,
866
881
},
867
882
}}
868
883
}
@@ -1006,3 +1021,76 @@ func TestIterateHierachy(t *testing.T) {
1006
1021
keys )
1007
1022
})
1008
1023
}
1024
+
1025
+ // TestDeadlock_startMissingWatches validates that starting watches will not create a deadlock
1026
+ // caused by using improper locking in various callback methods when there is a high load on the
1027
+ // system.
1028
+ func Test_watchEvents_Deadlock (t * testing.T ) {
1029
+ // deadlock lock is used to simulate a user function calling the cluster cache while holding a lock
1030
+ // and using this lock in callbacks such as OnPopulateResourceInfoHandler.
1031
+ deadlock := sync.RWMutex {}
1032
+
1033
+ hasDeadlock := false
1034
+ res1 := testPod ()
1035
+ res2 := testRS ()
1036
+
1037
+ cluster := newClusterWithOptions (t , []UpdateSettingsFunc {
1038
+ // Set low blocking semaphore
1039
+ SetListSemaphore (semaphore .NewWeighted (1 )),
1040
+ // Resync watches often to use the semaphore and trigger the rate limiting behavior
1041
+ SetResyncTimeout (500 * time .Millisecond ),
1042
+ // Use new resource handler to run code in the list callbacks
1043
+ SetPopulateResourceInfoHandler (func (un * unstructured.Unstructured , isRoot bool ) (info interface {}, cacheManifest bool ) {
1044
+ if un .GroupVersionKind ().GroupKind () == res1 .GroupVersionKind ().GroupKind () ||
1045
+ un .GroupVersionKind ().GroupKind () == res2 .GroupVersionKind ().GroupKind () {
1046
+ // Create a bottleneck for resources holding the semaphore
1047
+ time .Sleep (2 * time .Second )
1048
+ }
1049
+
1050
+ //// Uncommenting the following code will simulate a deadlock caused by client code holding a lock and
1051
+ //// trying to acquire the same lock in the event callback
1052
+ // deadlock.RLock()
1053
+ // defer deadlock.RUnlock()
1054
+
1055
+ return
1056
+ }),
1057
+ }, res1 , res2 , testDeploy ())
1058
+ defer func () {
1059
+ // Invalidate() is a blocking method and cannot be called safely in case of deadlock
1060
+ if ! hasDeadlock {
1061
+ cluster .Invalidate ()
1062
+ }
1063
+ }()
1064
+
1065
+ err := cluster .EnsureSynced ()
1066
+ require .NoError (t , err )
1067
+
1068
+ for i := 0 ; i < 2 ; i ++ {
1069
+ done := make (chan bool , 1 )
1070
+ go func () {
1071
+ // Stop the watches, so startMissingWatches will restart them
1072
+ cluster .stopWatching (res1 .GroupVersionKind ().GroupKind (), res1 .Namespace )
1073
+ cluster .stopWatching (res2 .GroupVersionKind ().GroupKind (), res2 .Namespace )
1074
+
1075
+ // calling startMissingWatches to simulate that a CRD event was received
1076
+ // TODO: how to simulate real watch events and test the full watchEvents function?
1077
+ err = runSynced (& cluster .lock , func () error {
1078
+ deadlock .Lock ()
1079
+ defer deadlock .Unlock ()
1080
+ return cluster .startMissingWatches ()
1081
+ })
1082
+ require .NoError (t , err )
1083
+ done <- true
1084
+ }()
1085
+ select {
1086
+ case v := <- done :
1087
+ require .True (t , v )
1088
+ case <- time .After (10 * time .Second ):
1089
+ hasDeadlock = true
1090
+ t .Errorf ("timeout reached on attempt %d. It is possible that a deadlock occured" , i )
1091
+ // Tip: to debug the deadlock, increase the timer to a value higher than X in "go test -timeout X"
1092
+ // This will make the test panic with the goroutines information
1093
+ t .FailNow ()
1094
+ }
1095
+ }
1096
+ }
0 commit comments