Skip to content

Commit 3f27e06

Browse files
authored
Fix: Related Content should be fetched during the fetch pipeline (#873)
* Fix: messages & pre check balance should be done on fetch pipeline * Fix: unit test missing file in db due to fetching adding the pin in db instead of processing * Refactor: `verify_message` renamed to `verify_and_fetch_message`. now fetch related content & verify permissions * fix: processing pipeline should still fetch the file if the fetch pipeline didn't * fix: remove useless change from test
1 parent 9630da1 commit 3f27e06

File tree

4 files changed

+34
-19
lines changed

4 files changed

+34
-19
lines changed

src/aleph/handlers/message_handler.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -379,11 +379,28 @@ async def insert_costs(
379379
insert_stmt = make_costs_upsert_query(costs)
380380
session.execute(insert_stmt)
381381

382-
async def verify_message(self, pending_message: PendingMessageDb) -> MessageDb:
382+
async def verify_and_fetch_message(
383+
self, session: DbSession, pending_message: PendingMessageDb
384+
) -> MessageDb:
383385
await self.verify_signature(pending_message=pending_message)
384386
validated_message = await self.fetch_pending_message(
385387
pending_message=pending_message
386388
)
389+
content_handler = self.get_content_handler(pending_message.type)
390+
391+
# Check Permissions before the fetch
392+
await content_handler.check_permissions(
393+
session=session, message=validated_message
394+
)
395+
396+
await content_handler.pre_check_balance(
397+
session=session, message=validated_message
398+
)
399+
400+
# Fetch related content like the IPFS associated file
401+
await content_handler.fetch_related_content(
402+
session=session, message=validated_message
403+
)
387404

388405
return validated_message
389406

@@ -432,14 +449,10 @@ async def process(
432449
)
433450

434451
# First check the message content and verify it
435-
message = await self.verify_message(pending_message=pending_message)
436-
437-
# Do a balance pre-check to avoid saving related data
452+
message = await self.verify_and_fetch_message(
453+
pending_message=pending_message, session=session
454+
)
438455
content_handler = self.get_content_handler(message.type)
439-
await content_handler.pre_check_balance(session=session, message=message)
440-
441-
# Fetch related content like the IPFS associated file
442-
await self.fetch_related_content(session=session, message=message)
443456

444457
await content_handler.check_dependencies(session=session, message=message)
445458
await content_handler.check_permissions(session=session, message=message)

src/aleph/jobs/fetch_pending_messages.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ def __init__(
5555
async def fetch_pending_message(self, pending_message: PendingMessageDb):
5656
with self.session_factory() as session:
5757
try:
58-
message = await self.message_handler.verify_message(
59-
pending_message=pending_message
58+
message = await self.message_handler.verify_and_fetch_message(
59+
pending_message=pending_message, session=session
6060
)
61+
6162
session.execute(
6263
make_pending_message_fetched_statement(
6364
pending_message, message.content

tests/message_processing/test_process_stores.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,15 @@ async def test_process_store_no_signature(
215215
)
216216
session.commit()
217217

218+
file_hash = "c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e"
219+
file_content = b"Hello Aleph.im"
220+
218221
storage_service = StorageService(
219-
storage_engine=MockStorageEngine(
220-
files={
221-
"c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e": b"Hello Aleph.im"
222-
}
223-
),
222+
storage_engine=MockStorageEngine(files={file_hash: file_content}),
224223
ipfs_service=mocker.AsyncMock(),
225224
node_cache=mocker.AsyncMock(),
226225
)
226+
227227
message_processor.message_handler.storage_service = storage_service
228228
storage_handler = message_processor.message_handler.content_handlers[
229229
MessageType.store
@@ -738,7 +738,6 @@ async def test_pre_check_balance_with_existing_costs(
738738
"aleph.storage.StorageService.get_hash_content",
739739
return_value=small_file_content,
740740
):
741-
742741
# Process first message to add existing costs
743742
await message_handler.process(
744743
session=session, pending_message=fixture_ipfs_store_message

tests/test_network.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ async def test_incoming_inline_content(
120120
reception_time=dt.datetime(2022, 1, 1),
121121
fetched=True,
122122
)
123-
124-
message = await message_handler.verify_message(pending_message=pending_message)
125-
assert message is not None
123+
with session_factory() as session:
124+
message = await message_handler.verify_and_fetch_message(
125+
pending_message=pending_message, session=session
126+
)
127+
assert message is not None

0 commit comments

Comments
 (0)