File tree: 2 files changed, +34 -22 lines changed
Original file line number | Diff line number | Diff line change
1
1
'''enables heartbeats'''
2
2
3
- import asyncio
3
+ import time
4
+ import os
5
+
6
+ import requests
4
7
5
8
import runpod .serverless .modules .logging as log
6
9
from .worker_state import get_current_job_id , ping_url , ping_interval
7
10
11
+ # COUNTER = 0
12
+
8
13
9
- async def send_ping (session ):
14
+ def heartbeat_ping (session ):
10
15
'''
11
- sends the http request
16
+ Pings the heartbeat endpoint
12
17
'''
18
+ ping_params = None
19
+
13
20
try :
14
- ping_params = None
15
21
job_id = get_current_job_id ()
16
22
17
23
if job_id is not None :
18
24
ping_params = {
19
25
'job_id' : job_id ,
20
26
}
21
- await session .get (ping_url , params = ping_params ,
22
- timeout = int (ping_interval / 1000 ))
23
27
24
- log .debug (f'Heartbeat URL: { ping_url } Interval: { ping_interval } ms' )
25
- log .debug (f"Heartbeat Params: { ping_params } " )
28
+ session .get (
29
+ ping_url ,
30
+ params = ping_params ,
31
+ timeout = int (ping_interval / 1000 )
32
+ )
26
33
34
+ log .info (
35
+ f'Heartbeat sent to { ping_url } interval: { ping_interval } ms params: { ping_params } ' )
27
36
except Exception as err : # pylint: disable=broad-except
28
- log .warn ( f"Error while sending heartbeat: { err } " )
37
+ log .error ( err )
29
38
30
39
31
- async def heartbeat_ping ( session ):
40
+ def start_heartbeat ( ):
32
41
'''
33
42
manages heartbeat timing
34
43
'''
35
- if ping_url is not None :
36
- while True :
37
-
38
- asyncio .create_task (
39
- send_ping (session )
40
- )
41
44
42
- await asyncio .sleep (ping_interval / 1000 )
45
+ session = requests .Session ()
46
+ session .headers .update ({
47
+ "Authorization" : f"{ os .environ .get ('RUNPOD_AI_API_KEY' )} " })
48
+ while True :
49
+ heartbeat_ping (session )
50
+ time .sleep (ping_interval / 1000 )
Original file line number | Diff line number | Diff line change
5
5
6
6
import os
7
7
import json
8
- import asyncio
8
+ from threading import Thread
9
9
10
10
import aiohttp
11
11
12
12
import runpod .serverless .modules .logging as log
13
- from .modules .heartbeat import heartbeat_ping
13
+ from .modules .heartbeat import start_heartbeat
14
14
from .modules .job import get_job , run_job , send_result
15
15
from .modules .worker_state import set_job_id
16
16
@@ -27,7 +27,9 @@ async def start_worker(config):
27
27
28
28
async with aiohttp .ClientSession (headers = auth_header ) as session :
29
29
30
- asyncio .create_task (heartbeat_ping (session ))
30
+ heartbeat_thread = Thread (target = start_heartbeat , daemon = True )
31
+ heartbeat_thread .daemon = True
32
+ heartbeat_thread .start ()
31
33
32
34
while True :
33
35
# GET JOB
@@ -49,8 +51,10 @@ async def start_worker(config):
49
51
try :
50
52
job_data = json .dumps (job_result , ensure_ascii = False )
51
53
except Exception as err : # pylint: disable=broad-except
52
- log .error (f"Error while serializing job result { job ['id' ]} : { err } " )
53
- job_data = json .dumps ({"error" : "unable to serialize job output" })
54
+ log .error (
55
+ f"Error while serializing job result { job ['id' ]} : { err } " )
56
+ job_data = json .dumps (
57
+ {"error" : "unable to serialize job output" })
54
58
55
59
# SEND RESULTS
56
60
await send_result (session , job_data , job )
You can’t perform that action at this time.
0 commit comments