Draft
24 commits
63c21cb
New dependencies
millioner Nov 28, 2025
b2e3ee9
First steps for DSperse integration. Doesn't work yet even close. Sti…
millioner Dec 2, 2025
bf36769
DSlice files handling for models
millioner Dec 3, 2025
150659d
Model metadata fix
millioner Dec 3, 2025
e5f2349
Adding dslice requests to the queue and processing them
millioner Dec 3, 2025
3f5c1c1
Request generation fixes
millioner Dec 4, 2025
cc8d79b
Move DSperseManager so it can be used by the miner
millioner Dec 4, 2025
2110cfa
Proving slices on miner side
millioner Dec 8, 2025
672920d
Verify DSlice proofs on validator side
millioner Dec 10, 2025
bf901c6
Cleanup completed DSperse run
millioner Dec 10, 2025
50f142b
Generate random input for DSlice request
millioner Dec 10, 2025
14a9396
Log request type to console
millioner Dec 10, 2025
1a9bafb
Removing dsperse run fix
millioner Dec 10, 2025
2d23514
Option to disable metrics logging
millioner Dec 10, 2025
80fbdee
Requests rescheduling and a lot of refactoring
millioner Dec 11, 2025
99f954e
Upgrade urllib3 to 2.6.2
millioner Dec 12, 2025
ecd5fea
Small fixes suggested by copilot
millioner Dec 12, 2025
c3cd0bd
Requests rescheduling fixes :stuck_out_tongue_winking_eye:
millioner Dec 13, 2025
cb10e72
Check proof inputs before verifying
millioner Dec 13, 2025
0e92ec3
Getting slice settings fix
millioner Dec 13, 2025
59883d8
DSperse cleanup on exit
millioner Dec 13, 2025
f19be4f
Actual urls for slices
millioner Dec 13, 2025
6897792
Compile DSlices if needed during pre-flight stage
millioner Dec 13, 2025
8b6e359
Tiniest fix ever
millioner Dec 13, 2025
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
@@ -33,7 +33,7 @@
}
},
// Install ezkl cli, done here instead of the Dockerfile to test new versions without rebuilding the image.
"onCreateCommand": "curl https://raw.githubusercontent.com/zkonduit/ezkl/main/install_ezkl_cli.sh | bash -s v19.0.7",
"onCreateCommand": "curl https://raw.githubusercontent.com/zkonduit/ezkl/main/install_ezkl_cli.sh | bash -s v22.2.1",
"postCreateCommand": "uv tool install bittensor-cli",
"remoteEnv": {
"PATH": "${containerEnv:PATH}:/home/vscode/.ezkl"
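For reference, a minimal sketch (not part of this diff) of checking that the installed ezkl CLI matches the version pinned in onCreateCommand; it assumes the ezkl binary is on PATH and exposes a standard --version flag.

import subprocess

PINNED_EZKL_VERSION = "22.2.1"  # keep in sync with .devcontainer/devcontainer.json

def ezkl_version_matches() -> bool:
    # Assumes `ezkl --version` prints something like "ezkl 22.2.1".
    result = subprocess.run(
        ["ezkl", "--version"], capture_output=True, text=True, check=True
    )
    return PINNED_EZKL_VERSION in result.stdout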
12 changes: 7 additions & 5 deletions neurons/_validator/api/__init__.py
@@ -23,6 +23,7 @@
import bittensor as bt
from _validator.models.poc_rpc_request import ProofOfComputationRPCRequest
from _validator.models.pow_rpc_request import ProofOfWeightsRPCRequest
from _validator.models.base_rpc_request import QueuedRequestDataModel
import hashlib
from constants import (
MAX_SIGNATURE_LIFESPAN,
@@ -70,9 +71,10 @@ def _should_rate_limit(ip: str):
class ValidatorAPI:
def __init__(self, config: ValidatorConfig):
self.config = config
self.external_requests_queue: list[
ProofOfWeightsRPCRequest | ProofOfComputationRPCRequest
] = []
# A queue of requests waiting to be sent to miners.
# It holds "real world" requests (ProofOfWeightsRPCRequest and ProofOfComputationRPCRequest)
# as well as requests carrying a single slice of a DSperse model (DSlice).
self.stacked_requests_queue: list[QueuedRequestDataModel] = []
self.ws_manager = WebSocketManager()
self.recent_requests: dict[str, int] = {}
self.validator_keys_cache = ValidatorKeysCache(config)
@@ -282,7 +284,7 @@ async def handle_proof_of_weights(
return InvalidParams(str(e))

self.pending_requests[external_request.hash] = asyncio.Event()
self.external_requests_queue.insert(0, external_request)
self.stacked_requests_queue.insert(0, external_request)
bt.logging.success(
f"External request with hash {external_request.hash} added to queue"
)
@@ -341,7 +343,7 @@ async def handle_proof_of_computation(
return InvalidParams(str(e))

self.pending_requests[external_request.hash] = asyncio.Event()
self.external_requests_queue.insert(0, external_request)
self.stacked_requests_queue.insert(0, external_request)
bt.logging.success(
f"External request with hash {external_request.hash} added to queue"
)
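A minimal sketch (not part of this diff) of the queue discipline used above: new external requests are enqueued with insert(0, ...) and the pipeline dequeues with pop() from the tail, i.e. first-in, first-out.

queue: list[str] = []

queue.insert(0, "request-1")  # arrives first
queue.insert(0, "request-2")  # arrives second

assert queue.pop() == "request-1"  # the oldest request is dequeued first
assert queue.pop() == "request-2"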
59 changes: 59 additions & 0 deletions neurons/_validator/core/dsperse_manager.py
@@ -0,0 +1,59 @@
import random
import uuid

from bittensor import logging

from deployment_layer.circuit_store import circuit_store
from execution_layer.circuit import CircuitType, Circuit
from _validator.api import ValidatorAPI


class DSperseManager:
def __init__(self, api: ValidatorAPI):
self.api = api
self.circuits: list[Circuit] = [
circuit
for circuit in circuit_store.circuits.values()
if circuit.metadata.type == CircuitType.DSPERSE_PROOF_GENERATION
]
self.runs = {}

def generate_dslice_requests(self) -> list:
"""
Generate DSlice requests for DSperse models.
Each DSlice request corresponds to one slice of a DSperse model.
"""
if self.api.stacked_requests_queue or not self.circuits:
# requests are already queued, or there are no DSperse circuits; skip generating new DSlice requests
return []

circuit = random.choice(self.circuits)
run_uid = str(uuid.uuid4())
logging.info(
f"Generating DSlice requests for circuit {circuit.metadata.name}... Run UID: {run_uid}"
)

# TODO: ...
self.run_dsperse(circuit, run_uid)
dslice_requests = []
# Logic to create DSlice requests goes here
return dslice_requests

def run_dsperse(self, circuit: Circuit, run_uid: str) -> None:
return None

# # Create temporary folder for run metadata
# run_metadata_path = Path(tempfile.mkdtemp(prefix=f"dsperse_run_{run_uid}_"))
# save_metadata_path = run_metadata_path / "metadata.json"
# logging.info(f"Running DSperse model. Run metadata path: {run_metadata_path}")

# # Generate benchmarking input JSON
# input_json_path = run_metadata_path / "input.json"
# with open(input_json_path, "w") as f:
# json.dump(circuit.input_handler(RequestType.BENCHMARK).generate(), f)

# # init runner and run the sliced model
# runner = Runner(
# run_metadata_path=run_metadata_path, save_metadata_path=save_metadata_path
# )
# results = runner.run(input_json_path=input_json_path, slice_path=slices_path)
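Building the actual DSlice requests is still a TODO above; a hedged sketch of what generate_dslice_requests might eventually return once the commented-out Runner flow is wired in. The per-slice results layout and the build_dslice_requests helper are assumptions, not part of this PR.

from _validator.models.dslice_request import DSliceQueuedProofRequest


def build_dslice_requests(circuit, run_uid: str, results: dict) -> list[DSliceQueuedProofRequest]:
    # Hypothetical helper: assumes `results` maps slice numbers to dicts with
    # "inputs" and "outputs" produced by the DSperse runner.
    requests = []
    for slice_num, slice_data in results.items():
        requests.append(
            DSliceQueuedProofRequest(
                circuit=circuit,
                inputs=slice_data["inputs"],
                outputs=slice_data["outputs"],
                slice_num=str(slice_num),
                run_uid=run_uid,
            )
        )
    return requests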
27 changes: 6 additions & 21 deletions neurons/_validator/core/request_pipeline.py
@@ -34,25 +34,6 @@ def __init__(
self.api = api
self.hash_guard = HashGuard()

def prepare_requests(self, filtered_uids) -> list[Request]:
"""
Prepare requests for the current validation step.
This includes both regular benchmark requests and any external requests.

Args:
filtered_uids (list): List of UIDs to send requests to.

Returns:
list[Request]: List of prepared requests.
"""
if len(filtered_uids) == 0:
bt.logging.error("No UIDs to query")
return []

if self.api.external_requests_queue:
return self._prepare_real_world_requests(filtered_uids)
return self._prepare_benchmark_requests(filtered_uids)

def _check_and_create_request(
self,
uid: int,
@@ -91,13 +72,14 @@ def _check_and_create_request(
circuit=circuit,
request_type=request_type,
# 'inputs' are used for verification later on validator side:
# Presumably `RWR` is passed here to prevent regenerating input data
inputs=GenericInput(RequestType.RWR, input_data),
request_hash=request_hash,
save=save,
)

def _prepare_real_world_requests(self, filtered_uids: list[int]) -> list[Request]:
external_request = self.api.external_requests_queue.pop()
external_request = self.api.stacked_requests_queue.pop()
requests = []

for uid in filtered_uids:
@@ -207,6 +189,9 @@ def get_request_data(
QueryZkProof(query_input=inputs, model_id=circuit.id, query_output=""),
False,
)
elif circuit.metadata.type == CircuitType.DSPERSE_PROOF_GENERATION:
# TODO: Handle DSPERSE_PROOF_GENERATION request data preparation
pass

return (
ProofOfWeightsDataModel(
@@ -230,7 +215,7 @@ def prepare_single_request(self, uid: int) -> Request | None:
Returns:
Request | None: The prepared request, or None if preparation failed.
"""
if self.api.external_requests_queue:
if self.api.stacked_requests_queue:
requests = self._prepare_real_world_requests([uid])
else:
requests = self._prepare_benchmark_requests([uid])
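The DSPERSE_PROOF_GENERATION branch in get_request_data is left as a TODO above; a hedged sketch of one way it could be filled in, mirroring the existing PROOF_OF_COMPUTATION branch. The helper name is hypothetical and the branch is an assumption, not part of this PR.

def get_dsperse_request_data(circuit, inputs):
    # Hypothetical handler for the DSPERSE_PROOF_GENERATION TODO above (not part of this PR).
    # Mirrors the PROOF_OF_COMPUTATION branch: send the slice inputs unchanged and let the
    # validator verify the returned proof against the expected slice outputs later.
    # Assumes QueryZkProof is available as imported in request_pipeline.py.
    return (
        QueryZkProof(query_input=inputs, model_id=circuit.id, query_output=""),
        False,
    )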
5 changes: 5 additions & 0 deletions neurons/_validator/core/validator_loop.py
@@ -19,6 +19,7 @@
from _validator.competitions.competition import Competition
from _validator.config import ValidatorConfig
from _validator.core.capacity_manager import CapacityManager
from _validator.core.dsperse_manager import DSperseManager
from _validator.core.prometheus import (
log_error,
log_queue_metrics,
@@ -120,6 +121,7 @@ def __init__(self, config: ValidatorConfig):
self.request_pipeline = RequestPipeline(
self.config, self.score_manager, self.api
)
self.dsperse_manager = DSperseManager(self.api)

self.request_queue = asyncio.Queue()
self.active_tasks: dict[int, asyncio.Task] = {}
@@ -342,6 +344,9 @@ async def maintain_request_pool(self):
slots_available = self.current_concurrency - len(self.active_tasks)

if slots_available > 0:
# TODO: some conditions to trigger dsperse requests generation?
self.dsperse_manager.generate_dslice_requests()

available_uids = [
uid
for uid in self.queryable_uids
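The TODO above asks when DSlice generation should fire; a hedged sketch of one possible guard as a ValidatorLoop helper (an assumption, not part of this PR): only generate when there is spare capacity and nothing is already queued. Note that generate_dslice_requests already returns early when stacked_requests_queue is non-empty, so the extra check mainly avoids a redundant call.

def should_generate_dslices(self, slots_available: int) -> bool:
    # Hypothetical ValidatorLoop helper (not part of this PR): only generate DSlice
    # requests when there is spare capacity and no external requests are waiting.
    return slots_available > 0 and not self.api.stacked_requests_queue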
7 changes: 6 additions & 1 deletion neurons/_validator/models/base_rpc_request.py
@@ -3,7 +3,12 @@
from _validator.utils.api import hash_inputs


class RealWorldRequest(BaseModel):
class QueuedRequestDataModel(BaseModel):
"""
Base model for requests that are stacked in the validator's queue, waiting to be sent to miners.
At the moment this is either a Real World Request (RWR) or a request carrying one slice of a DSperse model (DSlice).
"""

circuit: Circuit
inputs: dict

12 changes: 12 additions & 0 deletions neurons/_validator/models/dslice_request.py
@@ -0,0 +1,12 @@
from _validator.models.base_rpc_request import QueuedRequestDataModel
from pydantic import Field


class DSliceQueuedProofRequest(QueuedRequestDataModel):
"""
Request for a DSperse slice.
"""

slice_num: str = Field(..., description="Number of the DSperse slice")
run_uid: str = Field(..., description="UID of the DSperse run")
outputs: dict = Field(..., description="Outputs of the DSperse slice")
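A minimal usage sketch of the new model (values are placeholders, not from this PR); `circuit` is assumed to be a Circuit whose metadata type is DSPERSE_PROOF_GENERATION.

import uuid
from _validator.models.dslice_request import DSliceQueuedProofRequest

request = DSliceQueuedProofRequest(
    circuit=circuit,
    inputs={"input_data": [[0.1, 0.2]]},   # placeholder inputs fed to this slice
    outputs={"output_data": [[0.3]]},      # placeholder outputs of this slice
    slice_num="0",
    run_uid=str(uuid.uuid4()),
)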
4 changes: 2 additions & 2 deletions neurons/_validator/models/poc_rpc_request.py
@@ -1,10 +1,10 @@
from _validator.models.base_rpc_request import RealWorldRequest
from _validator.models.base_rpc_request import QueuedRequestDataModel
from pydantic import Field
from deployment_layer.circuit_store import circuit_store
from execution_layer.circuit import CircuitType


class ProofOfComputationRPCRequest(RealWorldRequest):
class ProofOfComputationRPCRequest(QueuedRequestDataModel):
"""
Request for the Proof of Computation RPC method.
"""
4 changes: 2 additions & 2 deletions neurons/_validator/models/pow_rpc_request.py
@@ -1,10 +1,10 @@
from __future__ import annotations
from _validator.models.base_rpc_request import RealWorldRequest
from _validator.models.base_rpc_request import QueuedRequestDataModel
from pydantic import Field
from deployment_layer.circuit_store import circuit_store


class ProofOfWeightsRPCRequest(RealWorldRequest):
class ProofOfWeightsRPCRequest(QueuedRequestDataModel):
"""
Request for the Proof of Weights RPC method.
"""
17 changes: 17 additions & 0 deletions neurons/_validator/models/request_type.py
@@ -2,14 +2,31 @@


class RequestType(Enum):
"""
Enumeration of the types of requests the validator can send to miners.
- BENCHMARK: Requests with generated input data, used for benchmarking.
When the RWR queue is empty, the validator generates input data to keep miners busy.
- RWR: Real World Requests carrying actual input data for real-world inference.
The validator collects these from external users and stacks them in a queue to be sent to miners.
- DSLICE: A request for one slice of a sliced model. We use the DSperse app to slice large models
into smaller parts, and each slice is sent to a miner as a separate request of type DSLICE.
At the moment DSlice requests are stacked in the same queue as RWR requests and handled like
normal RWR requests, so a DSLICE request is effectively part of an RWR or BENCHMARK flow.
XXX: Not very elegant yet; to be improved later.
"""

BENCHMARK = "benchmark_request"
RWR = "real_world_request"
DSLICE = "dslice_request"

def __str__(self) -> str:
if self == RequestType.BENCHMARK:
return "Benchmark"
elif self == RequestType.RWR:
return "Real World Request"
elif self == RequestType.DSLICE:
return "DSperse Request (one slice)"
else:
raise ValueError(f"Unknown request type: {self}")

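A tiny sketch of the human-readable names produced by __str__ for each request type:

from _validator.models.request_type import RequestType

assert str(RequestType.BENCHMARK) == "Benchmark"
assert str(RequestType.RWR) == "Real World Request"
assert str(RequestType.DSLICE) == "DSperse Request (one slice)"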