Skip to content
This repository was archived by the owner on Jul 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions db/migrations/20241212075345_validator_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,25 @@ CREATE TABLE chunk (
signature TEXT -- Signature of the DHT entry by the validator
);

-- Table for mapping piece_hash to piece metadata and miner_id
-- Table for mapping piece_hash to piece metadata and miner_uids
CREATE TABLE piece (
piece_hash TEXT PRIMARY KEY, -- Piece ID
validator_id INTEGER, -- ID of the validator
miner_id TEXT, -- IDs of the miner in a JSON Array
miner_uids TEXT, -- IDs of the miner in a JSON Array
chunk_idx INTEGER, -- Index of the chunk in the file
piece_idx INTEGER, -- Index of the piece in the chunk
piece_type INTEGER CHECK (piece_type IN (0, 1)), -- Type of the piece (0: data, 1: parity)
tag TEXT, -- APDP Tag of the piece
signature TEXT -- Signature of the DHT entry by the miner storing the piece
);

CREATE TABLE piece_challenge (
miner_uids TEXT, -- IDs of the miner in a JSON Array
piece_hash TEXT, -- Piece ID
challenge_timestamp TEXT, -- Timestamp of the challenge
tag TEXT -- APDP Tag of the piece
);

-- Table for miner stats --
CREATE TABLE miner_stats (
miner_uid INTEGER PRIMARY KEY,
Expand All @@ -64,3 +71,6 @@ DROP TABLE IF EXISTS chunk;

-- Drop the `tracker` table
DROP TABLE IF EXISTS tracker;

-- Drop the `piece_challenge` table
DROP TABLE IF EXISTS piece_challenge;
2 changes: 1 addition & 1 deletion docs/challenge.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ The Challenge System operates asynchronously to support distributed and scalable
The validator selects a miner and a specific data piece to challenge.

```python
await self.challenge_miner(miner_id, piece_id, tag)
await self.challenge_miner(miner_uids, piece_id, tag)
```

- **Process:**
Expand Down
87 changes: 55 additions & 32 deletions storb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from storb.dht.chunk_dht import ChunkDHTValue
from storb.dht.piece_dht import PieceDHTValue
from storb.dht.tracker_dht import TrackerDHTValue
from storb.protocol import PieceChallenge


@asynccontextmanager
Expand Down Expand Up @@ -229,28 +230,26 @@ async def delete_chunk_entry(conn: aiosqlite.Connection, chunk_hash: str):

async def set_piece_entry(conn: aiosqlite.Connection, entry: PieceDHTValue):
query = """
INSERT INTO piece (piece_hash, validator_id, miner_id, chunk_idx, piece_idx, piece_type, tag, signature)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO piece (piece_hash, validator_id, miner_uids, chunk_idx, piece_idx, piece_type, signature)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(piece_hash)
DO UPDATE SET
validator_id = excluded.validator_id,
miner_id = excluded.miner_id,
miner_uids = excluded.miner_uids,
chunk_idx = excluded.chunk_idx,
piece_idx = excluded.piece_idx,
piece_type = excluded.piece_type,
tag = excluded.tag,
signature = excluded.signature
"""
await conn.execute(
query,
(
entry.piece_hash,
entry.validator_id,
entry.miner_id,
entry.miner_uids,
entry.chunk_idx,
entry.piece_idx,
entry.piece_type,
entry.tag,
entry.signature,
),
)
Expand All @@ -270,12 +269,11 @@ async def get_piece_entry(
return PieceDHTValue(
piece_hash=row[0],
validator_id=row[1],
miner_id=row[2],
miner_uids=row[2],
chunk_idx=row[3],
piece_idx=row[4],
piece_type=row[5],
tag=row[6],
signature=row[7],
signature=row[6],
)
return None # Entry not found

Expand All @@ -289,43 +287,68 @@ async def delete_piece_entry(conn: aiosqlite.Connection, piece_hash: str):
await conn.commit()


async def get_random_piece(
conn: aiosqlite.Connection, validator_id: int
) -> PieceDHTValue | None:
"""Randomly selects a piece from the `piece` table for a given validator.
async def set_piece_challenge_entry(conn: aiosqlite.Connection, entry: PieceChallenge):
query = """
INSERT INTO piece_challenge (miner_uids, piece_hash, challenge_timestamp, tag)
VALUES (?, ?, ?, ?)
"""
await conn.execute(
query,
(
entry.miner_uids,
entry.piece_hash,
entry.challenge_timestamp,
entry.tag,
),
)
await conn.commit()


async def get_piece_challenge_entry(conn: aiosqlite.Connection, piece_id: str):
query = """
SELECT * FROM piece_challenge
WHERE piece_id = ?
"""
async with conn.execute(query, (piece_id,)) as cursor:
row = await cursor.fetchone()
if row:
return PieceChallenge(
miner_uids=row[0],
piece_hash=row[1],
challenge_timestamp=row[2],
tag=row[3],
)
return None


async def get_random_piece_challenge_entry(
conn: aiosqlite.Connection,
) -> PieceChallenge | None:
"""Randomly selects a piece from the `piece`_challenge table.

Parameters
----------
conn : aiosqlite.Connection
The database connection.
validator_id : int
The validator ID to query pieces for.

Returns
-------
PieceEntry or None
A random PieceEntry object if a piece is found, or None if the table is empty.
PieceChallenge or None
A random PieceChallenge object if a piece is found, or None if the table is empty.
"""

query = """
SELECT * FROM piece
WHERE validator_id = ?
SELECT * FROM piece_challenge
ORDER BY RANDOM()
LIMIT 1
"""

async with conn.execute(query, (validator_id,)) as cursor:
async with conn.execute(query) as cursor:
row = await cursor.fetchone()
if row:
miner_ids = [int(i) for i in row[2].split(",")]
return PieceDHTValue(
piece_hash=row[0],
validator_id=row[1],
miner_id=miner_ids,
chunk_idx=row[3],
piece_idx=row[4],
piece_type=row[5],
tag=row[6],
signature=row[7],
return PieceChallenge(
miner_uids=row[0],
piece_hash=row[1],
challenge_timestamp=row[2],
tag=row[3],
)
return None # No pieces found
return None
3 changes: 1 addition & 2 deletions storb/dht/piece_dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
class PieceDHTValue(BaseModel):
piece_hash: str
validator_id: int
miner_id: set[int] | str
miner_uids: set[int] | str
chunk_idx: int
piece_idx: int
piece_type: PieceType
tag: str
signature: str

def to_dict(self) -> dict:
Expand Down
10 changes: 4 additions & 6 deletions storb/dht/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,14 @@ async def _db_write_data(self, namespace: str, key: str, value: bytes):

case "piece":
val = PieceDHTValue.model_validate_json(value)
miner_ids = ",".join(str(i) for i in val.miner_id)
miner_uids = ",".join(str(i) for i in val.miner_uids)
entry = PieceDHTValue(
piece_hash=key,
validator_id=val.validator_id,
miner_id=miner_ids,
miner_uids=miner_uids,
chunk_idx=val.chunk_idx,
piece_idx=val.piece_idx,
piece_type=val.piece_type,
tag=val.tag,
signature=val.signature,
)
logger.debug(f"flushing piece entry {entry} to disk")
Expand Down Expand Up @@ -342,16 +341,15 @@ async def _db_read_data(self, key: bytes) -> DHTValue:
entry = await db.get_piece_entry(conn, db_key)
if entry is None:
return None
miner_ids = [int(i) for i in entry.miner_id.split(",")]
miner_uids = [int(i) for i in entry.miner_uids.split(",")]
return (
PieceDHTValue(
piece_hash=entry.piece_hash,
validator_id=entry.validator_id,
miner_id=miner_ids,
miner_uids=miner_uids,
chunk_idx=entry.chunk_idx,
piece_idx=entry.piece_idx,
piece_type=entry.piece_type,
tag=entry.tag,
signature=entry.signature,
)
.model_dump_json()
Expand Down
Loading