6
6
import unittest
7
7
8
8
import kubernetes
9
-
9
+ from kubernetes .client .configuration import Configuration
10
+ import urllib3
10
11
11
12
class TestApiClient (unittest .TestCase ):
12
13
@@ -23,3 +24,28 @@ def test_atexit_closes_threadpool(self):
23
24
self .assertIsNotNone (client ._pool )
24
25
atexit ._run_exitfuncs ()
25
26
self .assertIsNone (client ._pool )
27
+
28
+ def test_rest_proxycare (self ):
29
+
30
+ pool = { 'proxy' : urllib3 .ProxyManager , 'direct' : urllib3 .PoolManager }
31
+
32
+ for dst , proxy , no_proxy , expected_pool in [
33
+ ( 'http://kube.local/' , None , None , pool ['direct' ]),
34
+ ( 'http://kube.local/' , 'http://proxy.local:8080/' , None , pool ['proxy' ]),
35
+ ( 'http://127.0.0.1:8080/' , 'http://proxy.local:8080/' , 'localhost,127.0.0.0/8,.local' , pool ['direct' ]),
36
+ ( 'http://kube.local/' , 'http://proxy.local:8080/' , 'localhost,127.0.0.0/8,.local' , pool ['direct' ]),
37
+ ( 'http://kube.others.com:1234/' ,'http://proxy.local:8080/' , 'localhost,127.0.0.0/8,.local' , pool ['proxy' ]),
38
+ ( 'http://kube.others.com:1234/' ,'http://proxy.local:8080/' , '*' , pool ['direct' ]),
39
+ ]:
40
+ # setup input
41
+ config = Configuration ()
42
+ setattr (config , 'host' , dst )
43
+ if proxy is not None :
44
+ setattr (config , 'proxy' , proxy )
45
+ if no_proxy is not None :
46
+ setattr (config , 'no_proxy' , no_proxy )
47
+ # setup done
48
+
49
+ # test
50
+ client = kubernetes .client .ApiClient (configuration = config )
51
+ self .assertEqual ( expected_pool , type (client .rest_client .pool_manager ) )
0 commit comments