@@ -55,7 +55,7 @@ def __init__(self):
55
55
self .queue = asyncio .Queue (maxsize = settings .PROXY_QUEUE_SIZE )
56
56
57
57
async def exec (self ):
58
- await asyncio .wait ( [
58
+ await asyncio .gather ( * [
59
59
self .producer (),
60
60
self .consumer (),
61
61
])
@@ -74,7 +74,7 @@ async def consumer(self):
74
74
i += 1
75
75
76
76
if tasks :
77
- await asyncio .wait ( tasks )
77
+ await asyncio .gather ( * tasks )
78
78
tasks .clear ()
79
79
except KeyboardInterrupt :
80
80
raise
@@ -91,7 +91,7 @@ async def producer(self):
91
91
collector_states = await db .execute (
92
92
CollectorState .select ().where (
93
93
CollectorState .last_processing_time < time .time () - CollectorState .processing_period
94
- )
94
+ ). limit ( settings . CONCURRENT_TASKS_COUNT )
95
95
)
96
96
97
97
tasks = [
@@ -100,7 +100,7 @@ async def producer(self):
100
100
]
101
101
102
102
if tasks :
103
- await asyncio .wait ( tasks )
103
+ await asyncio .gather ( * tasks )
104
104
tasks .clear ()
105
105
106
106
# check proxies
@@ -193,11 +193,11 @@ async def process_raw_proxies(self, proxies, collector_id):
193
193
for proxy in proxies :
194
194
tasks .append (self .process_raw_proxy (proxy , collector_id ))
195
195
if len (tasks ) > settings .CONCURRENT_TASKS_COUNT :
196
- await asyncio .wait ( tasks )
196
+ await asyncio .gather ( * tasks )
197
197
tasks .clear ()
198
198
199
199
if tasks :
200
- await asyncio .wait ( tasks )
200
+ await asyncio .gather ( * tasks )
201
201
202
202
async def process_raw_proxy (self , proxy , collector_id ):
203
203
self .logger .debug ("adding raw proxy \" {}\" to queue" .format (proxy ))
0 commit comments