diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 2ede8638c..1499f098e 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -179,6 +179,7 @@ def stream(self, func, *args, **kwargs): # We want to ensure we are returning within that timeout. disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False + deserialize = kwargs.pop('deserialize', False) while True: resp = func(*args, **kwargs) try: @@ -186,7 +187,11 @@ def stream(self, func, *args, **kwargs): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log if watch_arg == "watch": - event = self.unmarshal_event(line, return_type) + if deserialize: + event = self.unmarshal_event(line, return_type) + else: + # Only do basic JSON parsing, no deserialize + event = json.loads(line) if isinstance(event, dict) \ and event['type'] == 'ERROR': obj = event['raw_object'] diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index f3880de7c..7fb4fb64d 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -576,5 +576,44 @@ def test_pod_log_empty_lines(self): self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) + def test_watch_with_deserialize_param(self): + """test watch.stream() deserialize param""" + # prepare test data + test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}' + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock(return_value=[test_json + '\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # test case with deserialize=True + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=True): + self.assertEqual("ADDED", e['type']) + # Verify that the object is deserialized correctly + self.assertTrue(hasattr(e['object'], 'metadata')) + self.assertEqual("test1", e['object'].metadata.name) + self.assertEqual("1", e['object'].metadata.resource_version) + # Verify that the original object is saved + self.assertEqual(json.loads(test_json)['object'], e['raw_object']) + + # test case with deserialize=False + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=False): + self.assertEqual("ADDED", e['type']) + # The validation object remains in the original dictionary format + self.assertIsInstance(e['object'], dict) + self.assertEqual("test1", e['object']['metadata']['name']) + self.assertEqual("1", e['object']['metadata']['resourceVersion']) + + # verify the api is called twice + fake_api.get_namespaces.assert_has_calls([ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True) + ]) + if __name__ == '__main__': unittest.main()