Skip to content
This repository was archived by the owner on Jul 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions storb/validator/challenge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import uuid
from datetime import UTC, datetime, timedelta

import storb.db as db
from storb import protocol
from storb.challenge import APDPTag, Proof
from storb.util.logging import get_logger
from storb.util.message_signing import sign_message
from storb.util.query import Payload

logger = get_logger(__name__)


class ValiChallengeMixin:
async def challenge_miner(self, miner_id: int, piece_id: str, tag: str):
"""Challenge the miners to verify they are storing the pieces

Parameters
----------
miner_id : int
The ID of the miner to challenge
piece_id : str
The ID of the piece to challenge the miner for
tag : str
The tag of the piece to challenge the miner for
"""

logger.debug(f"Challenging miner {miner_id} for piece {piece_id}")
# Create the challenge message
challenge = self.challenge.issue_challenge(tag)
try:
signature = sign_message(challenge, self.keypair)
except Exception as e:
logger.error(f"Failed to sign challenge: {e}")
return

challenge_deadline: datetime = datetime.now(UTC) + timedelta(minutes=15)
challenge_deadline = challenge_deadline.isoformat()

challenge_message = protocol.NewChallenge(
challenge_id=uuid.uuid4().hex,
piece_id=piece_id,
validator_id=self.uid,
miner_id=miner_id,
challenge_deadline=challenge_deadline,
public_key=self.challenge.key.rsa.public_key().public_numbers().n,
public_exponent=self.challenge.key.rsa.public_key().public_numbers().e,
challenge=challenge,
signature=signature,
)

logger.debug(f"Challenge message: {challenge_message}")
# Send the challenge to the miner
miner_hotkey = list(self.metagraph.nodes.keys())[miner_id]
if miner_hotkey is None:
logger.error(f"Miner {miner_id} not found in metagraph")
return

payload = Payload(
data=challenge_message,
file=None,
time_elapsed=0,
)
logger.info(
f"Sent challenge {challenge_message.challenge_id} to miner {miner_id} for piece {piece_id}"
)

async with db.get_db_connection(db_dir=self.db_dir) as conn:
miner_stats = await db.get_miner_stats(
conn=conn, miner_uid=challenge_message.miner_id
)
miner_stats["challenge_attempts"] += 1
await db.update_stats(
conn=conn, miner_uid=challenge_message.miner_id, stats=miner_stats
)

logger.debug(f"PRF KEY: {payload.data.challenge.prf_key}")
_, response = await self.query_miner(
miner_hotkey, "/challenge", payload, method="POST"
)

if response is None:
logger.error(f"Failed to challenge miner {miner_id}")
return

self.challenges[challenge_message.challenge_id] = challenge_message

logger.debug(f"Received response from miner {miner_id}, response: {response}")
logger.info(f"Successfully challenged miner {miner_id}")

async def remove_expired_challenges(self):
"""
Remove expired challenges from the `self.challenges` dictionary

Returns:
None
"""
now = datetime.now(UTC).isoformat()
keys_to_remove = []

for key, challenge in self.challenges.items():
if challenge.challenge_deadline < now:
keys_to_remove.append(key)

if not keys_to_remove:
return

for key in keys_to_remove:
challenge = self.challenges[key]
del self.challenges[key]

async def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool:
"""Verify the challenge proof from the miner

Parameters
----------
challenge_request : protocol.ProofResponse
The challenge proof from the miner

Returns
-------
bool
True if the proof is valid, False otherwise
"""

logger.debug(f"Verifying challenge proof: {challenge_request.challenge_id}")

challenge: protocol.NewChallenge = self.challenges.get(
challenge_request.challenge_id
)

if not challenge:
logger.error(f"Challenge {challenge_request.challenge_id} not found")
return False

async with db.get_db_connection(db_dir=self.db_dir) as conn:
miner_stats = await db.get_miner_stats(conn, challenge.miner_id)

proof = challenge_request.proof
try:
proof = Proof.model_validate(proof)
except Exception as e:
logger.error(f"Invalid proof: {e}")
return False

if challenge.challenge_deadline < datetime.now(UTC).isoformat():
logger.error(f"Challenge {challenge_request.challenge_id} has expired")
return False

tag = challenge.challenge.tag
tag = APDPTag.model_validate(tag)
logger.debug(f"Tag: {tag}")

logger.debug(
f"Proof: {proof} \n Challenge: {challenge.challenge} \n tag: {tag} \n n: {challenge.public_key} \n e: {challenge.public_exponent}"
)

if not self.challenge.verify_proof(
proof=proof,
challenge=challenge.challenge,
tag=tag,
n=challenge.public_key,
e=challenge.public_exponent,
):
logger.error(
f"Proof verification failed for challenge {challenge.challenge_id}"
)
return False

logger.info(
f"Proof verification successful for challenge {challenge.challenge_id}"
)

async with db.get_db_connection(db_dir=self.db_dir) as conn:
miner_stats["challenge_successes"] += 1
await db.update_stats(
conn=conn, miner_uid=challenge.miner_id, stats=miner_stats
)

# remove challenge from memory
del self.challenges[challenge_request.challenge_id]

return True
144 changes: 144 additions & 0 deletions storb/validator/dht.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from fastapi import HTTPException
from starlette.status import (
HTTP_404_NOT_FOUND,
HTTP_500_INTERNAL_SERVER_ERROR,
)

from storb import protocol
from storb.dht.piece_dht import PieceDHTValue
from storb.util.logging import get_logger
from storb.util.message_signing import (
PieceMessage,
TrackerMessage,
verify_message,
)

logger = get_logger(__name__)


class ValiDHTMixin:
async def get_infohash(self, infohash: str) -> protocol.GetMinersBase:
"""Retrieve all data associated with the provided infohash from the DHT.

This method looks up all piece IDs associated with the provided infohash
from the DHT. If no pieces are found, an HTTP 404 error is raised.
If the lookup for any piece's miner fails, an HTTP 500 error is raised.

Parameters
----------
infohash : str
The infohash of the file to retrieve the miners for.

Returns
-------
protocol.GetMinersBase
A GetMinersBase instance populated with the provided infohash, the associated
piece IDs, and the IDs of the miners that store those pieces.

Raises
------
HTTPException
- 404 if no pieces are found for the given infohash.
- 500 if any error occurs while retrieving miner information from the DHT.
"""

tracker_data = await self.dht.get_tracker_entry(infohash)
if tracker_data is None:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail="No tracker entry found for the given infohash",
)

try:
signature = tracker_data.signature
message = TrackerMessage(
infohash=tracker_data.infohash,
validator_id=tracker_data.validator_id,
filename=tracker_data.filename,
length=tracker_data.length,
chunk_size=tracker_data.chunk_size,
chunk_count=tracker_data.chunk_count,
chunk_hashes=tracker_data.chunk_hashes,
creation_timestamp=tracker_data.creation_timestamp,
)
logger.info(f"hotkeys: {self.metagraph}")
if not verify_message(
self.metagraph, message, signature, message.validator_id
):
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
detail="Signature verification failed for tracker entry",
)
except AttributeError:
logger.error(
f"Tracker entry {infohash} does not contain a signature attribute"
)
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Tracker entry {infohash} does not contain a signature attribute",
)

chunk_ids = tracker_data.chunk_hashes
chunks_metadatas = []
multi_piece_meta = []

for chunk_id in chunk_ids:
chunks_metadata = await self.dht.get_chunk_entry(chunk_id)
if chunks_metadata is None:
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get chunk for chunk_id {chunk_id}",
)
chunks_metadatas.append(chunks_metadata)
piece_ids = chunks_metadata.piece_hashes
pieces_metadata: list[PieceDHTValue] = []
for piece_id in piece_ids:
try:
piece = await self.dht.get_piece_entry(piece_id)
logger.info(f"piece: {piece}")
try:
signature = piece.signature
# create message object excluding the signature
message = PieceMessage(
piece_hash=piece_id,
chunk_idx=piece.chunk_idx,
piece_idx=piece.piece_idx,
piece_type=piece.piece_type,
)
# verify the signature
if not verify_message(
self.metagraph, message, signature, piece.validator_id
):
logger.error(
f"Signature verification failed for piece_id {piece_id}"
)
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Signature verification failed for piece_id {piece_id}",
)
pieces_metadata.append(piece)
except Exception as e:
logger.error(
f"Failed to verify signature for piece_id {piece_id}: {e}"
)
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to verify signature",
)
except Exception as e:
logger.error(f"Failed to get miner for piece_id {piece_id}: {e}")
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get miner",
)
multi_piece_meta.append(pieces_metadata)

response = protocol.GetMinersBase(
filename=tracker_data.filename,
infohash=infohash,
chunk_ids=chunk_ids,
chunks_metadata=chunks_metadatas,
pieces_metadata=multi_piece_meta,
)

return response
Loading