Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
63c21cb
New dependencies
millioner Nov 28, 2025
b2e3ee9
First steps for DSperse integration. Doesn't work yet even close. Sti…
millioner Dec 2, 2025
bf36769
dslices files handling for models
millioner Dec 3, 2025
150659d
Model metadata fix
millioner Dec 3, 2025
e5f2349
Adding dslice requests to the queue and processing them
millioner Dec 3, 2025
3f5c1c1
Request generation fixes
millioner Dec 4, 2025
cc8d79b
Move DSperseManager to use it by miner
millioner Dec 4, 2025
2110cfa
Proving slices on miner side
millioner Dec 8, 2025
672920d
Verify dslice proofs in validator side
millioner Dec 10, 2025
bf901c6
Cleanup completed DSperse run
millioner Dec 10, 2025
50f142b
Generate random input for DSLice request
millioner Dec 10, 2025
14a9396
Log request type to console
millioner Dec 10, 2025
1a9bafb
Removing dsperse run fix
millioner Dec 10, 2025
2d23514
Option to disable metrics logging
millioner Dec 10, 2025
80fbdee
Requests rescheduling and a lot of refactoring
millioner Dec 11, 2025
99f954e
Upgrade urllib3 to 2.6.2
millioner Dec 12, 2025
ecd5fea
Small fixes suggested by copilot
millioner Dec 12, 2025
c3cd0bd
Requests reskedjuling fixes :stuck_out_tongue_winking_eye:
millioner Dec 13, 2025
cb10e72
Check proof inputs before verifying
millioner Dec 13, 2025
0e92ec3
Getting slice settings fix
millioner Dec 13, 2025
59883d8
DSperse cleanup on exit
millioner Dec 13, 2025
f19be4f
Actual urls for slices
millioner Dec 13, 2025
6897792
Compile DSlices if needed during pre-flight stage
millioner Dec 13, 2025
8b6e359
Tiniest fix ever
millioner Dec 13, 2025
6042d08
Remove EZKL models
millioner Dec 15, 2025
c1b09b2
Pull DSperse from main branch
millioner Dec 15, 2025
0a0eab3
Merge branch 'testnet' into dsperse-intergration
HudsonGraeme Dec 15, 2025
2f7b5e4
Some fixes suggested by AI wisdom :kneeling_man:
millioner Dec 15, 2025
c19b98d
More fixes suggested by AI wisdom :kneeling_man: :kneeling_person: :k…
millioner Dec 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
}
},
// Install ezkl cli, done here instead of the Dockerfile to test new versions without rebuilding the image.
"onCreateCommand": "curl https://raw.githubusercontent.com/zkonduit/ezkl/main/install_ezkl_cli.sh | bash -s v19.0.7",
"onCreateCommand": "curl https://raw.githubusercontent.com/zkonduit/ezkl/main/install_ezkl_cli.sh | bash -s v22.2.1",
"postCreateCommand": "uv tool install bittensor-cli",
"remoteEnv": {
"PATH": "${containerEnv:PATH}:/home/vscode/.ezkl"
Expand Down
25 changes: 25 additions & 0 deletions neurons/_miner/miner_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
SINGLE_PROOF_OF_WEIGHTS_MODEL_ID,
)
from deployment_layer.circuit_store import circuit_store
from execution_layer.dsperse_manager import DSperseManager
from execution_layer.generic_input import GenericInput
from execution_layer.verified_model_session import VerifiedModelSession
from protocol import (
Competition,
DSliceProofGenerationDataModel,
ProofOfWeightsDataModel,
QueryForCapacities,
QueryZkProof,
Expand All @@ -48,6 +50,7 @@ def __init__(self):
self.configure()
self.check_register(should_exit=True)
self.auto_update = AutoUpdate()
self.dsperse_manager = DSperseManager()
self.log_batch = []
self.shuffled_uids = None
self.last_shuffle_epoch = -1
Expand Down Expand Up @@ -80,6 +83,10 @@ def start_server(self) -> bool:
self.server.register_route(
path=f"/{QueryForCapacities.name}", endpoint=self.handleCapacityRequest
)
self.server.register_route(
path=f"/{DSliceProofGenerationDataModel.name}",
endpoint=self.handleDSliceRequest,
)
self.server.start()

existing_miner = self.metagraph.axons[self.subnet_uid]
Expand Down Expand Up @@ -466,6 +473,24 @@ def handleCompetitionRequest(self, data: Competition) -> JSONResponse:
status_code=500,
)

def handleDSliceRequest(self, data: DSliceProofGenerationDataModel) -> JSONResponse:
    """
    Handle DSlice proof generation requests from validators.

    Delegates proving of the requested DSperse slice to the miner's
    DSperseManager and returns its result as JSON.

    Args:
        data: Payload carrying the circuit id, slice number, run uid and
            the slice's inputs/outputs.

    Returns:
        JSONResponse: The proving result from DSperseManager, HTTP 200.
    """
    bt.logging.info(
        f"Handling DSlice slice proof generation request for slice_num={data.slice_num} run_uid={data.run_uid}"
    )

    # NOTE(review): no error path here — if prove_slice raises, the server
    # framework's default error handling applies; sibling handlers return
    # a 500 JSONResponse explicitly. Consider aligning. (Removed a stale
    # "implementation goes here" placeholder comment left after the
    # implementation was written.)
    result = self.dsperse_manager.prove_slice(
        circuit_id=data.circuit,
        slice_num=data.slice_num,
        inputs=data.inputs,
        outputs=data.outputs,
    )

    return JSONResponse(content=result, status_code=200)

def queryZkProof(self, data: QueryZkProof) -> JSONResponse:
"""
This function run proof generation of the model (with its output as well)
Expand Down
12 changes: 7 additions & 5 deletions neurons/_validator/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import bittensor as bt
from _validator.models.poc_rpc_request import ProofOfComputationRPCRequest
from _validator.models.pow_rpc_request import ProofOfWeightsRPCRequest
from _validator.models.base_rpc_request import QueuedRequestDataModel
import hashlib
from constants import (
MAX_SIGNATURE_LIFESPAN,
Expand Down Expand Up @@ -70,9 +71,10 @@ def _should_rate_limit(ip: str):
class ValidatorAPI:
def __init__(self, config: ValidatorConfig):
self.config = config
self.external_requests_queue: list[
ProofOfWeightsRPCRequest | ProofOfComputationRPCRequest
] = []
# a Queue of requests to be sent to miners
# consists of "real world requests" ProofOfWeightsRPCRequest and ProofOfComputationRPCRequest
# and a Request with one slice of a DSperse model (DSlice)
self.stacked_requests_queue: list[QueuedRequestDataModel] = []
self.ws_manager = WebSocketManager()
self.recent_requests: dict[str, int] = {}
self.validator_keys_cache = ValidatorKeysCache(config)
Expand Down Expand Up @@ -282,7 +284,7 @@ async def handle_proof_of_weights(
return InvalidParams(str(e))

self.pending_requests[external_request.hash] = asyncio.Event()
self.external_requests_queue.insert(0, external_request)
self.stacked_requests_queue.insert(0, external_request)
bt.logging.success(
f"External request with hash {external_request.hash} added to queue"
)
Expand Down Expand Up @@ -341,7 +343,7 @@ async def handle_proof_of_computation(
return InvalidParams(str(e))

self.pending_requests[external_request.hash] = asyncio.Event()
self.external_requests_queue.insert(0, external_request)
self.stacked_requests_queue.insert(0, external_request)
bt.logging.success(
f"External request with hash {external_request.hash} added to queue"
)
Expand Down
57 changes: 29 additions & 28 deletions neurons/_validator/core/request_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
from deployment_layer.circuit_store import circuit_store
from execution_layer.circuit import Circuit, CircuitType
from execution_layer.generic_input import GenericInput
from protocol import ProofOfWeightsDataModel, QueryZkProof
from protocol import (
ProofOfWeightsDataModel,
QueryZkProof,
DSliceProofGenerationDataModel,
)
from utils.wandb_logger import safe_log


Expand All @@ -34,25 +38,6 @@ def __init__(
self.api = api
self.hash_guard = HashGuard()

def prepare_requests(self, filtered_uids) -> list[Request]:
    """
    Build the request batch for the current validation step.

    External (real-world) requests take priority; when the external
    queue is empty, benchmark requests are generated instead.

    Args:
        filtered_uids (list): UIDs of the miners to query.

    Returns:
        list[Request]: The prepared requests (empty if no UIDs given).
    """
    if not filtered_uids:
        bt.logging.error("No UIDs to query")
        return []

    build = (
        self._prepare_real_world_requests
        if self.api.external_requests_queue
        else self._prepare_benchmark_requests
    )
    return build(filtered_uids)

def _check_and_create_request(
self,
uid: int,
Expand All @@ -64,7 +49,9 @@ def _check_and_create_request(
) -> Request | None:
"""Check hash and create request if valid."""
try:
if isinstance(request_data, ProofOfWeightsDataModel):
if isinstance(request_data, ProofOfWeightsDataModel) or isinstance(
request_data, DSliceProofGenerationDataModel
):
input_data = request_data.inputs
else:
input_data = request_data.query_input
Expand All @@ -91,25 +78,28 @@ def _check_and_create_request(
circuit=circuit,
request_type=request_type,
# 'inputs' are used for verification later on validator side:
# I suppose `RWR` passed here to prevent new data generation
inputs=GenericInput(RequestType.RWR, input_data),
request_hash=request_hash,
save=save,
)

def _prepare_real_world_requests(self, filtered_uids: list[int]) -> list[Request]:
external_request = self.api.external_requests_queue.pop()
def _prepare_queued_requests(self, filtered_uids: list[int]) -> list[Request]:
external_request = self.api.stacked_requests_queue.pop()
requests = []

for uid in filtered_uids:
try:
request_data, save = self.get_request_data(
RequestType.RWR, external_request.circuit, external_request
external_request.request_type,
external_request.circuit,
external_request,
)
request = self._check_and_create_request(
uid=uid,
request_data=request_data,
circuit=external_request.circuit,
request_type=RequestType.RWR,
request_type=external_request.request_type,
request_hash=external_request.hash,
save=save,
)
Expand Down Expand Up @@ -169,7 +159,7 @@ def get_request_data(
circuit.input_handler(request_type)
if request_type == RequestType.BENCHMARK
else circuit.input_handler(
RequestType.RWR,
request_type,
copy.deepcopy(request.inputs),
)
)
Expand Down Expand Up @@ -207,6 +197,17 @@ def get_request_data(
QueryZkProof(query_input=inputs, model_id=circuit.id, query_output=""),
False,
)
elif circuit.metadata.type == CircuitType.DSPERSE_PROOF_GENERATION:
return (
DSliceProofGenerationDataModel(
circuit=circuit.id,
inputs=request.inputs,
outputs=request.outputs,
slice_num=request.slice_num,
run_uid=request.run_uid,
),
False,
)

return (
ProofOfWeightsDataModel(
Expand All @@ -230,8 +231,8 @@ def prepare_single_request(self, uid: int) -> Request | None:
Returns:
Request | None: The prepared request, or None if preparation failed.
"""
if self.api.external_requests_queue:
requests = self._prepare_real_world_requests([uid])
if self.api.stacked_requests_queue:
requests = self._prepare_queued_requests([uid])
else:
requests = self._prepare_benchmark_requests([uid])

Expand Down
9 changes: 9 additions & 0 deletions neurons/_validator/core/validator_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
ONE_HOUR,
ONE_MINUTE,
)
from execution_layer.dsperse_manager import DSperseManager
from utils import AutoUpdate, clean_temp_files, with_rate_limit
from utils.gc_logging import gc_log_competition_metrics
from utils.gc_logging import log_responses as gc_log_responses
Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(self, config: ValidatorConfig):
self.request_pipeline = RequestPipeline(
self.config, self.score_manager, self.api
)
self.dsperse_manager = DSperseManager()

self.request_queue = asyncio.Queue()
self.active_tasks: dict[int, asyncio.Task] = {}
Expand Down Expand Up @@ -342,6 +344,13 @@ async def maintain_request_pool(self):
slots_available = self.current_concurrency - len(self.active_tasks)

if slots_available > 0:
if not self.api.stacked_requests_queue:
# Refill the stacked requests queue from DSperse manager if needed
for (
dslice_request
) in self.dsperse_manager.generate_dslice_requests():
self.api.stacked_requests_queue.insert(0, dslice_request)

available_uids = [
uid
for uid in self.queryable_uids
Expand Down
12 changes: 10 additions & 2 deletions neurons/_validator/models/base_rpc_request.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from pydantic import BaseModel
from execution_layer.circuit import Circuit

from _validator.models.request_type import RequestType
from _validator.utils.api import hash_inputs
from execution_layer.circuit import Circuit


class QueuedRequestDataModel(BaseModel):
"""
Base model for requests that are stacked in the validator's queue and waiting to be sent to miners.
At the moment, that's a Real World Request (RWR) or a Request with one slice of a DSperse model (DSlice).
"""

class RealWorldRequest(BaseModel):
circuit: Circuit
inputs: dict
request_type: RequestType = RequestType.RWR

model_config = {"arbitrary_types_allowed": True}

Expand Down
15 changes: 15 additions & 0 deletions neurons/_validator/models/dslice_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import Field

from _validator.models.base_rpc_request import QueuedRequestDataModel
from _validator.models.request_type import RequestType


class DSliceQueuedProofRequest(QueuedRequestDataModel):
    """
    Request for a DSperse slice.

    Queued on the validator side and later sent to a miner as a proof
    request for a single slice of a DSperse run. Circuit and input data
    come from the QueuedRequestDataModel base fields.
    """

    # Defaults to DSLICE so queue consumers can dispatch on request_type.
    request_type: RequestType = RequestType.DSLICE
    # NOTE(review): typed str despite the "num" name — presumably a slice
    # index serialized as a string; confirm against DSperseManager usage.
    slice_num: str = Field(..., description="Num of the DSperse slice")
    run_uid: str = Field(..., description="UID of the DSperse run")
    # Expected outputs of the slice, used for verification on the validator.
    outputs: dict = Field(..., description="Outputs of the DSperse slice")
4 changes: 2 additions & 2 deletions neurons/_validator/models/poc_rpc_request.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from _validator.models.base_rpc_request import RealWorldRequest
from _validator.models.base_rpc_request import QueuedRequestDataModel
from pydantic import Field
from deployment_layer.circuit_store import circuit_store
from execution_layer.circuit import CircuitType


class ProofOfComputationRPCRequest(RealWorldRequest):
class ProofOfComputationRPCRequest(QueuedRequestDataModel):
"""
Request for the Proof of Computation RPC method.
"""
Expand Down
4 changes: 2 additions & 2 deletions neurons/_validator/models/pow_rpc_request.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations
from _validator.models.base_rpc_request import RealWorldRequest
from _validator.models.base_rpc_request import QueuedRequestDataModel
from pydantic import Field
from deployment_layer.circuit_store import circuit_store


class ProofOfWeightsRPCRequest(RealWorldRequest):
class ProofOfWeightsRPCRequest(QueuedRequestDataModel):
"""
Request for the Proof of Weights RPC method.
"""
Expand Down
17 changes: 17 additions & 0 deletions neurons/_validator/models/request_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,31 @@


class RequestType(Enum):
    """
    Types of requests the validator can send to a miner.

    - BENCHMARK: requests with generated input data, used for benchmarking
      and to keep miners busy while the RWR queue is empty.
    - RWR: Real World Requests carrying actual input data for real-world
      inference; collected from external users and queued for miners.
    - DSLICE: one slice of a model that was split by the DSperse app.
      Each slice of a sliced model is sent to a miner as a separate
      request of this type. Currently DSlices share the same queue as
      RWR requests and are handled the same way.
      XXX: not very elegant, to be improved later.
    """

    BENCHMARK = "benchmark_request"
    RWR = "real_world_request"
    DSLICE = "dslice_request"

    def __str__(self) -> str:
        if self == RequestType.BENCHMARK:
            return "Benchmark"
        elif self == RequestType.RWR:
            return "Real World Request"
        # BUG FIX: previously compared against the nonexistent member
        # RequestType.DSPERSE, which raised AttributeError for DSLICE.
        elif self == RequestType.DSLICE:
            return "DSperse Request (one slice)"
        else:
            raise ValueError(f"Unknown request type: {self}")

Expand Down
9 changes: 9 additions & 0 deletions neurons/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ def init_config(role: Optional[str] = None):
default=None,
help="Custom location for storing models data (optional)",
)
parser.add_argument(
"--dsperse-run-dir",
default=None,
help="Custom location for storing dsperse run data (optional)",
)
if role == Roles.VALIDATOR:
# CLI arguments specific to the validator
_validator_config()
Expand Down Expand Up @@ -141,6 +146,10 @@ def init_config(role: Optional[str] = None):
else:
config.full_path_models = os.path.join(config.full_path, "models")

if not config.dsperse_run_dir:
config.dsperse_run_dir = os.path.join(config.full_path, "dsperse_runs")
os.makedirs(config.dsperse_run_dir, exist_ok=True)

if config.whitelisted_public_keys:
config.whitelisted_public_keys = config.whitelisted_public_keys.split(",")

Expand Down
Loading
Loading