Stress test response times when clients become more chatty #1378

Open
Flix6x opened this issue Mar 18, 2025 · 1 comment

@Flix6x
Contributor

Flix6x commented Mar 18, 2025

Report your findings here.

@victorgarcia98
Contributor

Introduction

In both tests we run 1000 async workers sending HTTP requests at 1 Hz and 1000 async workers sending WebSocket messages at 1 Hz. The first test targets endpoints that don't use the database, while the second involves a couple of DB queries to test how it would behave in production.
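
One caveat on the 1 Hz figure: in the script below each worker sleeps for one second after its response arrives, so the per-worker rate is 1 / (1 + RTT) rather than exactly 1 Hz. A minimal sketch of the resulting aggregate load (the function name `offered_rate` is made up for illustration and is not part of the script):

```python
def offered_rate(n_workers: int, sleep_s: float, rtt_s: float) -> float:
    """Aggregate requests per second for closed-loop workers.

    Each worker sends one request, waits rtt_s for the response,
    then sleeps sleep_s before sending the next one.
    """
    return n_workers / (sleep_s + rtt_s)


# Upper bound: instant responses, 1000 workers sleeping 1 s between requests.
print(offered_rate(1000, 1.0, 0.0))   # 1000.0
# With a 250 ms round trip the same workers only generate 800 requests/s.
print(offered_rate(1000, 1.0, 0.25))  # 800.0
```

So when the server slows down, the load generated by this kind of client drops as well, which is worth keeping in mind when reading the results.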

Test I. No DB

Command

$ python stress-test.py --api_url http://localhost:5000/api/ops/ping --ws_url ws://localhost:5000/ping1 --n_api 1000 --n_ws 1000 --filename results-no-db.csv 

Results

Image

Test II. DB

Command

$ python stress-test.py --api_url http://localhost:5000/api/v3_0/sensors/1 --ws_url ws://localhost:5000/ping2 --n_api 1000 --n_ws 1000 --filename results-db.csv

Results

Image

Conclusions

Test I shows that if we don't carefully balance the load between the two services, one can take over the other. This would probably also happen with a fast HTTP endpoint under heavy use.
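
One way to check from the CSV whether one service is taking over is to look at the share of completed requests per second that belong to each kind. This is only a sketch: it assumes the row layout written by the script below, `(timestamp, kind, worker id, delta)` with kind being `"API"` or `"WS"`, and the function names are made up for illustration.

```python
from collections import Counter


def completions_per_second(rows):
    """Count completed requests per (whole second, kind) bucket."""
    counts = Counter()
    for timestamp, kind, _worker, _delta in rows:
        counts[(int(float(timestamp)), kind)] += 1
    return counts


def api_share_by_second(rows):
    """For each second, the fraction of completed requests that were API calls.

    Assumes every row has kind "API" or "WS".
    """
    counts = completions_per_second(rows)
    seconds = sorted({second for second, _kind in counts})
    shares = {}
    for second in seconds:
        api = counts[(second, "API")]
        ws = counts[(second, "WS")]
        shares[second] = api / (api + ws)
    return shares
```

With equal worker counts and rates, a share that drifts far from 0.5 for many seconds in a row would suggest that one kind of client is being served at the expense of the other.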

If we run into high-frequency WebSocket communication, we have some workarounds:

  1. Create a microservice that deals only with WebSockets.
  2. Try to use eventlet or gevent (example here).

Code

import argparse
import asyncio
import csv
import random
import time

import websockets
from aiohttp import ClientSession

email = "[email protected]"
password = "admin"

result = []

async def make_request(session, url, method="GET", token: str | None = None, json_payload: dict | None = None) -> dict:
    # Send the auth token, if any, in the Authorization header.
    headers = {"Authorization": token} if token else {}

    async with session.request(url=url, method=method, headers=headers, json=json_payload) as response:
        return await response.json()

async def worker_api(i, session, api_url, token):
    await asyncio.sleep(random.random()*10)
    while True:
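        # perf_counter() is monotonic, so the delta is not affected by wall-clock adjustments.
        start = time.perf_counter()
        await make_request(session=session, url=api_url, token=token, method="GET")
        delta = round(time.perf_counter() - start, 3)
        # Row format: (completion timestamp, kind, worker id, round-trip seconds).
        result.append(
            (time.time(), "API", i, delta)
        )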
        print(f"API {i}: {delta}s")
        await asyncio.sleep(1)
    
async def get_api_key(session, email, password) -> str:
    response = await make_request(
        session=session, method="POST", url="http://localhost:5000/api/requestAuthToken", json_payload={"email" : email, "password" : password}
    )
    print(response)
    return response["auth_token"]

async def stress_test_api(api_url, n_api):
    async with ClientSession() as session:
        token = await get_api_key(session, email, password)
        tasks = [worker_api(i, session, api_url, token=token) for i in range(n_api)]
        # Workers loop forever, so this only returns if one of them raises.
        await asyncio.gather(*tasks)

async def worker_ws(i, url):
    await asyncio.sleep(random.random()*10)
    async with websockets.connect(url) as ws:
        while True:
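            # perf_counter() is monotonic, so the delta is not affected by wall-clock adjustments.
            start = time.perf_counter()
            await ws.send(f"Hello from {i}!")
            await ws.recv()
            delta = round(time.perf_counter() - start, 3)
            # Row format: (completion timestamp, kind, worker id, round-trip seconds).
            result.append(
                (time.time(), "WS", i, delta)
            )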
            print(f"WS {i}: RTT {delta}s")
            await asyncio.sleep(1)
                
async def stress_test_ws(ws_url, n_ws):
    tasks = [worker_ws(i, ws_url) for i in range(n_ws)]
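    # Workers loop forever, so gather() only returns once every connection has failed or closed.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    failed = sum(isinstance(outcome, BaseException) for outcome in outcomes)
    print(f"WebSocket test ended: {failed} of {len(outcomes)} workers stopped with an error")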

async def main(api_url, ws_url, n_api, n_ws):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(stress_test_api(api_url, n_api))
        tg.create_task(stress_test_ws(ws_url, n_ws))

if __name__ == "__main__":
    # Example:
    #     python stress-test.py --api_url http://localhost:5000/api/v3_0/sensors\?include_public_assets\=true --ws_url ws://localhost:5000/ping2 --n_api 1000 --n_ws 1000
    parser = argparse.ArgumentParser(description="Stress test an API and WebSocket endpoint in parallel.")
    parser.add_argument("--api_url", type=str, required=True, help="API endpoint URL")
    parser.add_argument("--ws_url", type=str, required=True, help="WebSocket endpoint URL")
    parser.add_argument("--n_api", type=int, required=False, default=1, help="Number of concurrent API workers")
    parser.add_argument("--n_ws", type=int, required=False, default=1, help="Number of concurrent WebSocket connections")
    parser.add_argument("--filename", type=str, required=False, default="results.csv", help="Output CSV filename")
    
    args = parser.parse_args()
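    def save_results(rows, filename):
        # One CSV row per request: (timestamp, kind, worker id, round-trip seconds).
        with open(filename, mode="w", newline="") as file:
            writer = csv.writer(file)
            writer.writerows(rows)

    try:
        asyncio.run(main(args.api_url, args.ws_url, args.n_api, args.n_ws))
    except KeyboardInterrupt:
        # Workers loop forever, so Ctrl+C is the normal way to end a run.
        pass
    finally:
        # Save whatever was collected, also when the run ends with an error.
        print("Saving results...")
        save_results(result, args.filename)
        print("Done!")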
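
The CSV written by the script can be summarized per kind of client with a few lines of standard-library Python. This is a sketch rather than the analysis behind the plots above: the function names are made up for illustration, and the percentile uses the simple nearest-rank method.

```python
import csv
import math
from collections import defaultdict


def percentile(sorted_values, p):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(p / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize(rows):
    """Group (timestamp, kind, worker id, delta) rows by kind and summarize delta."""
    by_kind = defaultdict(list)
    for _timestamp, kind, _worker, delta in rows:
        by_kind[kind].append(float(delta))

    summary = {}
    for kind, deltas in by_kind.items():
        deltas.sort()
        summary[kind] = {
            "count": len(deltas),
            "mean": sum(deltas) / len(deltas),
            "p50": percentile(deltas, 50),
            "p95": percentile(deltas, 95),
            "max": deltas[-1],
        }
    return summary


def summarize_file(filename):
    """Read a results CSV as written by the stress test and summarize it."""
    with open(filename, newline="") as file:
        return summarize(csv.reader(file))
```

For example, `summarize_file("results-no-db.csv")` returns one entry for `"API"` and one for `"WS"`, which makes it easy to compare the two tests side by side.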
