Skip to content

Commit 87e869c

Browse files
committed
fix: update heartbeat method using threads
1 parent b0f48a3 commit 87e869c

File tree

6 files changed

+55
-43
lines changed

6 files changed

+55
-43
lines changed

requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
aiohttp >= 3.8.4
22
backoff == 2.2.1
33
boto3 >= 1.26.142
4-
importlib-metadata == 6.6.0
54
pillow >= 9.3.0
65
py-cpuinfo == 9.0.0
76
python-dotenv >= 0.21.0
Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Enables heartbeats."""
22

33
import os
4+
import time
45
import threading
56

67
import requests
@@ -12,36 +13,47 @@
1213
_session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"})
1314

1415

15-
def _send_ping(ping_params=None):
16-
if PING_URL not in [None, 'PING_URL_NOT_SET']:
17-
try:
18-
result = _session.get(
19-
PING_URL,
20-
params=ping_params,
21-
timeout=int(PING_INTERVAL / 1000)
22-
)
23-
24-
log.info(f"Heartbeat Sent URL: {PING_URL} Status: {result.status_code}")
25-
log.info(f"Heartbeat Sent Interval: {PING_INTERVAL}ms Params: {ping_params}")
26-
27-
except Exception as err: # pylint: disable=broad-except
28-
log.error(f"Heartbeat Failed URL: {PING_URL} Params: {ping_params}")
29-
log.error(f"Heartbeat Fail Error: {err}")
30-
31-
32-
def start_ping():
33-
"""
34-
Pings the heartbeat endpoint at the specified interval.
35-
"""
36-
job_id = get_current_job_id()
37-
38-
ping_params = {
39-
'job_id': job_id,
40-
} if job_id is not None else None
41-
42-
_send_ping(ping_params)
43-
44-
log.debug(f"Scheduling next heartbeat in {PING_INTERVAL}ms")
45-
heartbeat_thread = threading.Timer(int(PING_INTERVAL / 1000), start_ping)
46-
heartbeat_thread.daemon = True
47-
heartbeat_thread.start()
16+
class HeartbeatSender:
    ''' Sends heartbeats to the Runpod server from a background daemon thread. '''

    def __init__(self):
        # Daemon thread, so the heartbeat loop never blocks interpreter exit.
        self._thread = threading.Thread(target=self._run, daemon=True)

        # A Thread object can be started only once. Instances of this class
        # are shared at module level and start_ping() is reached from
        # constructors, so repeated calls have to be tolerated.
        self._start_lock = threading.Lock()
        self._started = False

    def start_ping(self):
        '''
        Starts the heartbeat thread.

        Safe to call repeatedly: only the first call starts the thread, every
        later call returns without doing anything. Without this guard a second
        call would raise RuntimeError from Thread.start().
        '''
        with self._start_lock:
            if self._started:
                return

            self._thread.start()
            # Flag is set only after a successful start so a failed start
            # can be retried by the caller.
            self._started = True

    @staticmethod
    def _interval_seconds(interval_ms):
        '''
        Converts a millisecond interval into whole seconds, never below one.

        Truncating a sub-second interval would give 0, which turns the
        heartbeat loop into a busy loop and is not a valid request timeout.
        '''
        return max(1, int(interval_ms / 1000))

    def _run(self):
        '''
        Sends heartbeats to the Runpod server.

        Loops forever: one ping, then a sleep of the ping interval.
        '''
        while True:
            self._send_ping()
            # PING_INTERVAL is expressed in milliseconds (see the log message
            # in _send_ping); it is defined outside this class.
            time.sleep(self._interval_seconds(PING_INTERVAL))

    def _send_ping(self):
        '''
        Sends a heartbeat to the Runpod server.

        The current job id, when there is one, is sent as the `job_id` query
        parameter. Nothing is sent while PING_URL is unset. Request failures
        are logged and swallowed so that the heartbeat loop keeps running.
        '''
        job_id = get_current_job_id()

        ping_params = {
            'job_id': job_id,
        } if job_id is not None else None

        if PING_URL in [None, 'PING_URL_NOT_SET']:
            return

        try:
            result = _session.get(
                PING_URL,
                params=ping_params,
                # Give up on a ping before the next one is due.
                timeout=self._interval_seconds(PING_INTERVAL)
            )

            log.info(f"Heartbeat Sent  URL: {PING_URL}  Status: {result.status_code}")
            log.info(f"Heartbeat Sent  Interval: {PING_INTERVAL}ms  Params: {ping_params}")

        except Exception as err:  # pylint: disable=broad-except
            # Best effort: a failed heartbeat must not kill the thread.
            log.error(f"Heartbeat Failed  URL: {PING_URL}  Params: {ping_params}")
            log.error(f"Heartbeat Fail  Error: {err}")

runpod/serverless/modules/rp_fastapi.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99

1010
from .job import run_job
1111
from .worker_state import set_job_id
12-
from .heartbeat import start_ping
12+
from .heartbeat import HeartbeatSender
13+
14+
15+
heartbeat = HeartbeatSender()
1316

1417

1518
class Job(BaseModel):
@@ -29,7 +32,7 @@ def __init__(self, handler=None):
2932
3. Sets the handler for processing jobs.
3033
'''
3134
# Start the heartbeat thread.
32-
start_ping()
35+
heartbeat.start_ping()
3336

3437
# Set the handler for processing jobs.
3538
self.config = {"handler": handler}

runpod/serverless/utils/rp_download.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,12 @@
1313
from typing import List, Union
1414
from urllib.parse import urlparse
1515
from email import message_from_string
16-
from importlib.metadata import version
1716
from concurrent.futures import ThreadPoolExecutor
1817

1918
import backoff
2019
import requests
2120

22-
HEADERS = {
23-
"User-Agent": f"runpod-python/{version('runpod-python')} (https://runpod.io; support@runpod.io)"
24-
}
21+
HEADERS = {"User-Agent": "runpod-python/0.0.0 (https://runpod.io; support@runpod.io)"}
2522

2623

2724
def calculate_chunk_size(file_size: int) -> int:

runpod/serverless/work_loop.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@
99
import aiohttp
1010

1111
import runpod.serverless.modules.logging as log
12-
from .modules.heartbeat import start_ping
12+
from .modules.heartbeat import HeartbeatSender
1313
from .modules.job import get_job, run_job, send_result
1414
from .modules.worker_state import REF_COUNT_ZERO, set_job_id
1515
from .utils import rp_debugger
1616

1717

1818
_TIMEOUT = aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2)
1919

20+
heartbeat = HeartbeatSender()
21+
2022

2123
def _get_auth_header() -> dict:
2224
'''
@@ -40,7 +42,7 @@ async def start_worker(config):
4042

4143
async with aiohttp.ClientSession(headers=auth_header, timeout=_TIMEOUT) as session:
4244

43-
start_ping()
45+
heartbeat.start_ping()
4446

4547
while True:
4648
job = await get_job(session, config)

setup.cfg

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ install_requires =
2929
aiohttp >= 3.8.3
3030
backoff >= 2.2.1
3131
boto3 >= 1.26.142
32-
importlib-metadata >= 6.6.0
3332
pillow >= 9.3.0
3433
py-cpuinfo >= 9.0.0
3534
python-dotenv >= 0.21.0

0 commit comments

Comments
 (0)