Skip to content

Commit 5cd19b4

Browse files
authored
Merge pull request #58 from osint-dev-team/develop
Email breaches module. Redis cache. Publisher/Consumer refactoring.
2 parents 4417844 + 6767d76 commit 5cd19b4

14 files changed

+382
-54
lines changed

docker-compose.yml

+15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@ services:
2121
- postgres:/data/postgres
2222
networks:
2323
- postgres
24+
redis:
25+
container_name: osint-framework-redis
26+
image: redis:alpine
27+
healthcheck:
28+
test: redis-cli ping
29+
interval: 30s
30+
timeout: 5s
31+
retries: 5
32+
networks:
33+
- redis
34+
restart: unless-stopped
2435
rabbitmq:
2536
container_name: osint-framework-rabbitmq
2637
image: rabbitmq:alpine
@@ -76,6 +87,7 @@ services:
7687
POSTGRES_PORT: ${POSTGRES_PORT:-5432}
7788
RABBITMQ_HOST: ${RABBITMQ_HOST:-osint-framework-rabbitmq}
7889
RABBITMQ_PORT: ${RABBITMQ_PORT:-5672}
90+
REDIS_HOST: ${REDIS_HOST:-osint-framework-redis}
7991
LOG_HANDLER: ${LOG_HANDLER:-stream}
8092
build:
8193
context: .
@@ -91,10 +103,13 @@ services:
91103
networks:
92104
- postgres
93105
- rabbitmq
106+
- redis
94107
networks:
95108
postgres:
96109
driver: bridge
97110
rabbitmq:
98111
driver: bridge
112+
redis:
113+
driver: bridge
99114
volumes:
100115
postgres:

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pycares==3.1.1
2424
pycparser==2.20
2525
Pygments==2.6.1
2626
PyYAML==5.3.1
27+
redis==3.5.3
2728
requests==2.24.0
2829
rich==5.1.2
2930
selenium==3.141.0

server.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from src.server.handlers.task_spawner import TaskSpawner
2323
from src.server.structures.response import ServerResponse
2424
from src.server.structures.task import TaskItem
25+
from src.server.structures.task import TaskStatus
26+
from src.cache.redis import RedisCache
2527

2628
# Set logging level for Tornado Server
2729
tornado.log.access_log.setLevel(DEBUG)
@@ -32,6 +34,9 @@
3234
# Initialize publisher
3335
publisher = Publisher()
3436

37+
# Initialize redis
38+
redis = RedisCache()
39+
3540

3641
class BaseHandler(RequestHandler, ABC):
3742
"""
@@ -170,12 +175,26 @@ def get(self) -> None:
170175
"""
171176
try:
172177
task_id = self.get_argument("task_id", default=None)
173-
results = json_encode(TaskCrud.get_results(task_id))
178+
redis_cache = redis.get(task_id)
179+
# If cache is available - write cache as response
180+
if redis_cache:
181+
logger.info(msg=f"Redis cache is available, task '{task_id}'")
182+
return self.write(redis_cache)
183+
# If cache is not available - get results from the database
184+
db_results = TaskCrud.get_results(task_id)
185+
json_results = dumps(db_results, default=str)
186+
# If status is 'pending' (in progress), skip cache saving, write database results
187+
if db_results.get("task", {}).get("status", "") == TaskStatus.PENDING:
188+
logger.info(msg=f"Status of the task '{task_id}' is '{TaskStatus.PENDING}', skip Redis cache saving")
189+
return self.write(json_results)
190+
# If status is 'error' or 'success' (finished in any way), save the cache and write database results
191+
redis.set(key=task_id, value=json_results)
192+
logger.info(msg=f"Save results to Redis cache, task '{task_id}'")
193+
self.write(json_results)
174194
except Exception as get_results_error:
175195
return self.error(
176196
msg=f"Unexpected error at getting results: {str(get_results_error)}"
177197
)
178-
self.write(results)
179198

180199

181200
class HealthCheckHandler(BaseHandler, ABC):
@@ -219,7 +238,7 @@ def make_app() -> Application:
219238

220239
# Init rabbitmq queue polling
221240
polling = tornado.ioloop.PeriodicCallback(
222-
lambda: publisher.process_data_events(), 1000
241+
lambda: publisher.process_data_events(), callback_time=1.000
223242
)
224243
polling.start()
225244

src/cache/__init__.py

Whitespace-only changes.

src/cache/redis.py

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#!/usr/bin/env python3
2+
3+
from redis import Redis
4+
from os import environ
5+
6+
7+
class DefaultValues:
    """Default connection settings for the Redis cache."""

    # Cache entry time-to-live: 24 hours, expressed in seconds
    REDIS_TIMEOUT = 60 * 60 * 24
    # Redis server host; overridable through the REDIS_HOST environment variable
    REDIS_HOST = environ.get("REDIS_HOST", default="localhost")
11+
12+
13+
class RedisCache:
    """
    Thin wrapper around a Redis connection used as a task-result cache.
    Values are stored with a time-to-live so stale results expire on their own.
    """

    def __init__(
        self,
        host: str = DefaultValues.REDIS_HOST,
        timeout: int = DefaultValues.REDIS_TIMEOUT,
    ):
        """
        Init the Redis cache connection
        :param host: redis server host
        :param timeout: default time-to-live for cached keys, in seconds
        """
        self.options = dict(timeout=timeout)
        self.redis = Redis(host=host)

    def get(self, key):
        """
        Return redis cache value
        :param key: key to get
        :return: cached value as bytes, or None when the key is missing
        """
        # GET already returns None for a missing key, so a single round trip
        # replaces the previous EXISTS+GET pair (which was also racy: the key
        # could expire between the two commands).
        return self.redis.get(key)

    def set(self, key, value, timeout=None) -> None:
        """
        Set redis cache value with a time-to-live
        :param key: key to set
        :param value: value to set
        :param timeout: time-to-live in seconds; falls back to the instance default
        :return: None
        """
        # SET with 'ex' stores the value and the TTL atomically, instead of a
        # separate EXPIRE call that could be skipped if the process died
        # between the two commands (leaving a key that never expires).
        self.redis.set(key, value, ex=timeout or self.options["timeout"])

    def delitem(self, key) -> None:
        """
        Delete cache value
        :param key: key to delete
        :return: None
        """
        self.redis.delete(key)

    def exists(self, key) -> bool:
        """
        Check if value exists
        :param key: key to check
        :return: True if the key is present, False otherwise
        """
        return bool(self.redis.exists(key))

src/db/crud.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -118,25 +118,28 @@ def update_task(task: TaskItem, db: Session = SessionLocal()) -> None:
118118

119119
@staticmethod
120120
@retry()
121-
def get_results(task_id: str, db: Session = SessionLocal()) -> list:
121+
def get_results(task_id: str, db: Session = SessionLocal()) -> dict:
122122
"""
123123
Return results
124124
:param task_id: task id to use
125125
:param db: database to use
126126
:return: dict
127127
"""
128+
# fmt: off
128129
try:
129-
results = (
130-
db.query(models.Result).filter(models.Result.owner_id == task_id).all()
131-
)
130+
db_results = db.query(models.Result).filter(models.Result.owner_id == task_id).all()
131+
db_task_status = db.query(models.Task).filter_by(task_id=task_id).first()
132132
except exc.DBAPIError as api_err:
133133
raise api_err from api_err
134134
except:
135-
return []
135+
return {}
136136
else:
137-
return [loads(str(data.result)) for data in results]
137+
results = [loads(str(data.result)) for data in db_results]
138+
task_status = object_as_dict(db_task_status)
139+
return {"task": task_status, "results": results}
138140
finally:
139141
db.close()
142+
# fmt: on
140143

141144
@staticmethod
142145
@retry()
@@ -164,7 +167,7 @@ def get_results_count(task_id: str, db: Session = SessionLocal()) -> int:
164167
@retry()
165168
def get_task(task_id: str, db: Session = SessionLocal()) -> dict:
166169
"""
167-
Return task results by UUID
170+
Return task status by UUID
168171
:param task_id: task id to use
169172
:param db: database to use
170173
:return: dict

src/queue/consumer.py

+36-24
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
from json import loads, dumps
44

5-
import pika
5+
from pika import BlockingConnection, ConnectionParameters
6+
from pika.adapters.blocking_connection import BlockingChannel
7+
from pika.spec import Basic, BasicProperties
68

79
from src.core.runner.manager import CaseManager
810
from src.core.utils.log import Logger
@@ -15,62 +17,72 @@
1517

1618
class Consumer:
1719
def __init__(
18-
self, host: str = Default.RABBITMQ_HOST, port: int = Default.RABBITMQ_PORT
20+
self,
21+
host: str = Default.RABBITMQ_HOST,
22+
port: int = Default.RABBITMQ_PORT,
23+
task_queue: str = Default.TASK_QUEUE
1924
):
2025
"""
2126
Init rabbitmq consumer
2227
:param host: rabbitmq host
2328
:param port: rabbitmq port
29+
:param task_queue: queue name
2430
"""
25-
self.queue = Default.QUEUE
26-
self.connection = pika.BlockingConnection(
27-
pika.ConnectionParameters(host=host, port=port,)
31+
self.connection = BlockingConnection(
32+
ConnectionParameters(host=host, port=port,)
2833
)
2934
self.channel = self.connection.channel()
30-
self.channel.queue_declare(queue=self.queue)
35+
self.channel.queue_declare(queue=task_queue)
36+
self.channel.basic_consume(
37+
queue=task_queue,
38+
on_message_callback=self.task_process,
39+
)
40+
3141
self.manager = CaseManager()
3242

33-
def callback(self, ch, method, properties, body) -> None:
43+
def task_process(
44+
self,
45+
channel: BlockingChannel,
46+
method: Basic.Deliver,
47+
properties: BasicProperties,
48+
body: bytes
49+
) -> None:
3450
"""
3551
Process the received task
36-
:param ch: channel
52+
:param channel: channel
3753
:param method: method
3854
:param properties: task properties
3955
:param body: task body
4056
:return: None
4157
"""
42-
raw_body = loads(body)
58+
raw_body = loads(body.decode(encoding="utf-8"))
4359
cases = raw_body.get("cases", {})
4460
task = TaskItem(**raw_body.get("task", {}))
4561

46-
done_tasks = 0
47-
cases_len = len(cases)
48-
for result in self.manager.multi_case_runner(cases=cases):
49-
done_tasks += 1
50-
TaskCrud.create_task_result(task, result or {})
51-
message = f"Done {done_tasks} out of {cases_len} cases"
52-
task.set_pending(message)
53-
logger.info(message)
54-
TaskCrud.update_task(task)
62+
try:
63+
results = list(self.manager.multi_case_runner(cases=cases))
64+
for result in results:
65+
TaskCrud.create_task_result(task, result or {})
66+
task.set_success(msg=f"Task done: {len(results)} out of {len(cases)} cases")
67+
except Exception as cases_err:
68+
task.set_error(msg=f"Task error: {str(cases_err)}")
5569

56-
task.set_success(msg=f"All cases done ({done_tasks} out of {cases_len})")
5770
TaskCrud.update_task(task)
5871
logger.info(msg=f"Done task {task.task_id}")
5972

60-
ch.basic_publish(
73+
channel.basic_publish(
6174
exchange="",
6275
routing_key=properties.reply_to,
63-
properties=pika.BasicProperties(correlation_id=properties.correlation_id),
64-
body=dumps(task.as_json()),
76+
properties=BasicProperties(correlation_id=properties.correlation_id),
77+
body=dumps(task.as_json()).encode(encoding="utf-8"),
6578
)
66-
ch.basic_ack(delivery_tag=method.delivery_tag)
79+
channel.basic_ack(delivery_tag=method.delivery_tag)
6780

6881
def start_consuming(self) -> None:
6982
"""
7083
Run consumer
7184
:return: None
7285
"""
73-
self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback)
7486
self.channel.start_consuming()
7587

7688
def __del__(self):

src/queue/defaults.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ class DefaultValues:
77
RABBITMQ_HOST = str(environ.get("RABBITMQ_HOST", default="localhost"))
88
RABBITMQ_PORT = int(environ.get("RABBITMQ_PORT", default=5672))
99

10-
QUEUE = "case_queue"
10+
TASK_QUEUE = "task_queue"
11+
RESPONSE_QUEUE = "response_queue"

0 commit comments

Comments
 (0)