44import pytest
55import pubnub as pn
66
7+ from unittest .mock import patch
78from pubnub .exceptions import PubNubException
89from pubnub .models .consumer .common import PNStatus
910from pubnub .models .consumer .pubsub import PNPublishResult
@@ -108,27 +109,29 @@ async def test_publish_object_via_post(event_loop):
108109 filter_query_parameters = ['uuid' , 'seqn' , 'pnsdk' ])
109110@pytest .mark .asyncio
110111async def test_publish_mixed_via_get_encrypted (event_loop ):
111- pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
112- await asyncio .gather (
113- asyncio .ensure_future (assert_success_publish_get (pubnub , "hi" )),
114- asyncio .ensure_future (assert_success_publish_get (pubnub , 5 )),
115- asyncio .ensure_future (assert_success_publish_get (pubnub , True )),
116- asyncio .ensure_future (assert_success_publish_get (pubnub , ["hi" , "hi2" , "hi3" ])))
112+ with patch ("pubnub.crypto.PubNubCryptodome.get_initialization_vector" , return_value = "knightsofni12345" ):
113+ pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
114+ await asyncio .gather (
115+ asyncio .ensure_future (assert_success_publish_get (pubnub , "hi" )),
116+ asyncio .ensure_future (assert_success_publish_get (pubnub , 5 )),
117+ asyncio .ensure_future (assert_success_publish_get (pubnub , True )),
118+ asyncio .ensure_future (assert_success_publish_get (pubnub , ["hi" , "hi2" , "hi3" ])))
117119
118- pubnub .stop ()
120+ pubnub .stop ()
119121
120122
121123@pn_vcr .use_cassette (
122124 'tests/integrational/fixtures/asyncio/publish/object_via_get_encrypted.yaml' ,
123125 filter_query_parameters = ['uuid' , 'seqn' , 'pnsdk' ],
124- match_on = ['host' , 'method' , 'query' , 'object_in_path_with_decrypt' ]
126+ match_on = ['host' , 'method' , 'query' ]
125127)
126128@pytest .mark .asyncio
127129async def test_publish_object_via_get_encrypted (event_loop ):
128- pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
129- await asyncio .ensure_future (assert_success_publish_get (pubnub , {"name" : "Alex" , "online" : True }))
130+ with patch ("pubnub.crypto.PubNubCryptodome.get_initialization_vector" , return_value = "knightsofni12345" ):
131+ pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
132+ await asyncio .ensure_future (assert_success_publish_get (pubnub , {"name" : "Alex" , "online" : True }))
130133
131- pubnub .stop ()
134+ pubnub .stop ()
132135
133136
134137@pn_vcr .use_cassette (
@@ -138,28 +141,30 @@ async def test_publish_object_via_get_encrypted(event_loop):
138141)
139142@pytest .mark .asyncio
140143async def test_publish_mixed_via_post_encrypted (event_loop ):
141- pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
142- await asyncio .gather (
143- asyncio .ensure_future (assert_success_publish_post (pubnub , "hi" )),
144- asyncio .ensure_future (assert_success_publish_post (pubnub , 5 )),
145- asyncio .ensure_future (assert_success_publish_post (pubnub , True )),
146- asyncio .ensure_future (assert_success_publish_post (pubnub , ["hi" , "hi2" , "hi3" ]))
147- )
144+ with patch ("pubnub.crypto.PubNubCryptodome.get_initialization_vector" , return_value = "knightsofni12345" ):
145+ pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
146+ await asyncio .gather (
147+ asyncio .ensure_future (assert_success_publish_post (pubnub , "hi" )),
148+ asyncio .ensure_future (assert_success_publish_post (pubnub , 5 )),
149+ asyncio .ensure_future (assert_success_publish_post (pubnub , True )),
150+ asyncio .ensure_future (assert_success_publish_post (pubnub , ["hi" , "hi2" , "hi3" ]))
151+ )
148152
149- pubnub .stop ()
153+ pubnub .stop ()
150154
151155
152156@pn_vcr .use_cassette (
153157 'tests/integrational/fixtures/asyncio/publish/object_via_post_encrypted.yaml' ,
154158 filter_query_parameters = ['uuid' , 'seqn' , 'pnsdk' ],
155- match_on = ['method' , 'path' , 'query' , 'object_in_body_with_decrypt' ]
159+ match_on = ['method' , 'path' , 'query' ]
156160)
157161@pytest .mark .asyncio
158162async def test_publish_object_via_post_encrypted (event_loop ):
159- pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
160- await asyncio .ensure_future (assert_success_publish_post (pubnub , {"name" : "Alex" , "online" : True }))
163+ with patch ("pubnub.crypto.PubNubCryptodome.get_initialization_vector" , return_value = "knightsofni12345" ):
164+ pubnub = PubNubAsyncio (pnconf_enc_copy (), custom_event_loop = event_loop )
165+ await asyncio .ensure_future (assert_success_publish_post (pubnub , {"name" : "Alex" , "online" : True }))
161166
162- pubnub .stop ()
167+ pubnub .stop ()
163168
164169
165170@pytest .mark .asyncio
0 commit comments