Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"dnet-p2p @ file://${PROJECT_ROOT}/lib/dnet-p2p/bindings/py",
"rich>=13.0.0",
"psutil>=5.9.0",
"pytest>=8.4.2",
]

[project.optional-dependencies]
Expand All @@ -56,6 +57,7 @@ dev = [
[project.scripts]
dnet-api = "cli.api:main"
dnet-shard = "cli.shard:main"
dnet-netcfg = "dnet.core.network.netcfg:main"

[build-system]
requires = ["uv_build>=0.8.17,<0.9.0"]
Expand All @@ -79,6 +81,7 @@ markers = [
"core: tests for core memory/cache/utils not tied to api/shard",
"e2e: integration tests requiring live servers or multiple components",
"integration: model catalog integration tests for CI (manual trigger)",
"netcfg: test low-level network configuration executable"
]

[tool.ruff]
Expand Down
1 change: 1 addition & 0 deletions src/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def update_tui_model_info(
inference_manager=inference_manager,
model_manager=model_manager,
node_id=node_id,
tui=tui,
)

tui.update_status("Starting Servers...")
Expand Down
98 changes: 98 additions & 0 deletions src/dnet/api/http_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Optional, Any, List
import httpx
import asyncio
import os
from hypercorn import Config
Expand All @@ -7,6 +8,7 @@
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from dnet.utils.model import get_model_config_json
from dnet.core.network.create import topo_to_network
from distilp.profiler import profile_model
from dnet.utils.logger import logger
from .models import (
Expand All @@ -25,6 +27,8 @@
from .inference import InferenceManager
from .model_manager import ModelManager
from dnet_p2p import DnetDeviceProperties
import sys
import getpass


class HTTPServer:
Expand All @@ -35,6 +39,7 @@ def __init__(
inference_manager: InferenceManager,
model_manager: ModelManager,
node_id: str,
tui: Optional["DnetTUI"] = None,
):
self.http_port = http_port
self.cluster_manager = cluster_manager
Expand All @@ -43,6 +48,7 @@ def __init__(
self.node_id = node_id
self.app = FastAPI()
self.http_server: Optional[asyncio.Task] = None
self.tui = tui

async def start(self, shutdown_trigger: Any = lambda: asyncio.Future()) -> None:
await self._setup_routes()
Expand Down Expand Up @@ -251,6 +257,89 @@ async def get_topology(self) -> TopologyInfo:
)
return topo

async def _create_subnetworks(
self,
topology: TopologyInfo,
netcfg_password: Optional[str] = None,
netcfg_persist: Optional[bool] = False,
netcfg_user: Optional[str] = None,
) -> None:
net_topo = topo_to_network(topology)
shards = self.cluster_manager.shards

# Forward subnetwork config to shards for execution
async with httpx.AsyncClient(trust_env=False) as client:
for instance, plan in net_topo.nodes.items():
shard = shards.get(instance)
if not shard or shard.is_manager:
continue
url = f"http://{shard.local_ip}:{shard.server_port}/register"
payload = {"mappings": [r.model_dump(exclude_none=True) for r in plan.routes]}

try:
res = await client.post(url, json=payload, timeout=10.0)
except Exception as e:
logger.warning("Network config failed on %s: %r", instance, e)
continue
logger.error(res)
if res.status_code == 200:
continue

if res.status_code == 403:
password = netcfg_password
persist = bool(netcfg_persist or False)
user = netcfg_user
if not password and getattr(self, "tui", None) is not None:
persist = await self.tui.prompt_yes_no(
f"Shard {instance} needs sudo. Persist NOPASSWD for future runs?",
default=False,
)
if persist:
default_user = getpass.getuser()
user = await self.tui.prompt_text(
"User to grant NOPASSWD", default=default_user
)
password = await self.tui.prompt_password(
f"Enter admin password for {instance}: "
)

if password:
body = {
"mappings": payload["mappings"],
"password": password,
"persist": persist,
"user": user,
}
try:
res2 = await client.post(
f"http://{shard.local_ip}:{shard.server_port}/register_with_password",
json=body,
timeout=60.0,
)
logger.error("=====================")
logger.error(res2)
if res2.status_code == 200:
continue
if res2.status_code == 401:
logger.warning("Wrong password for %s", instance)
continue
except Exception as e:
logger.warning(
"Privileged apply failed on %s: %r", instance, e
)
else:
logger.warning(
"Network config skipped on %s (password not provided)",
instance,
)
else:
logger.warning(
"Network config skipped on %s (status %s): %s",
instance,
res.status_code,
(res.text or "")[:120],
)

async def prepare_topology(self, req: PrepareTopologyRequest) -> TopologyInfo:
try:
if not self.model_manager.is_model_available(req.model):
Expand Down Expand Up @@ -292,6 +381,15 @@ async def prepare_topology(self, req: PrepareTopologyRequest) -> TopologyInfo:
profiles, model_profile, req.model, num_layers, req.kv_bits
)
self.cluster_manager.current_topology = topology

await self._create_subnetworks(
topology,
netcfg_password=req.netcfg_password,
netcfg_persist=req.netcfg_persist,
netcfg_user=req.netcfg_user,
)
logger.warning("AWAIT NETWORK DONE")

return topology
except Exception as e:
logger.exception("Error in prepare_topology: %s", e)
Expand Down
10 changes: 10 additions & 0 deletions src/dnet/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,16 @@ class PrepareTopologyRequest(BaseModel):
max_batch_exp: int = Field(
default=2, description="Max batch size as power of 2 exponent"
)
# Optional network config credentials for privileged apply on shards
netcfg_password: Optional[str] = Field(
default=None, description="Admin password on shards for network config"
)
netcfg_persist: Optional[bool] = Field(
default=False, description="Persist sudoers rule for future runs"
)
netcfg_user: Optional[str] = Field(
default=None, description="User to grant NOPASSWD to (when persisting)"
)


class ManualDevice(BaseModel):
Expand Down
169 changes: 169 additions & 0 deletions src/dnet/core/network/create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@

import ipaddress
from pydantic import BaseModel, Field
from typing import List, Optional, Literal, Dict, Tuple
from dnet.core.types.topology import TopologyInfo
from dnet_p2p import DnetDeviceProperties
from dnet.utils.logger import logger
from dnet.core.types.topology import (
TopologyInfo as TypesTopologyInfo,
)

# Globals
TB_MDP_COST = 10
ETH_MDP_COST = 20
ETH_FDP_COST = 100
WIFI_FDB_COST = 200

MDP_IPV4_BASE = ipaddress.IPv4Address("10.101.0.0")
FDP_IPV4_BASE = ipaddress.IPv4Address("10.101.128.0")
AP_IPV4_BASE = ipaddress.IPv4Address("10.101.160.0")

MDP_INC = 0
FDP_INV = 0
AP_INV = 0

# Models
# NOTE: for now the FDP route is empty
class NetworkRoute(BaseModel):
node: str = Field(..., description="Name of neighbor node")
mdp_self_ipv4: str = Field( ..., description="Primary path self IPv4")
mdp_ipv4: str = Field( ..., description="Primary path IPv4")
mdp_interface: str = Field( ..., description="inet for primary path")
mdp_cost: int = Field(..., description="Routing cost for primary path")
fdp_ipv4: str = Field(..., description="Fallback path IPv4")
fdp_interface: str = Field( ..., description="inet for fallback path")
fdp_cost: int = Field(..., description="Routing cost for fallback path")

class NodeNetworkPlan(BaseModel):
instance: str = Field(..., description="")
mgmt_ipv4: str = Field(..., description="Management IPv4")
routes: List[NetworkRoute] = Field(
default_factory=list, description="Per-neighbor route plans"
)
tb_bridges: List[str] = Field(
default_factory=list,
description="per‑port Thunderbolt bridge interface names",
)

# L2/L3 topology for orchestration network.
class NetworkTopologyPlan(BaseModel):
nodes: Dict[str, NodeNetworkPlan] = Field(
default_factory=dict, description="Per‑node network plans"
)

def _get_tb_link(a: DnetDeviceProperties, b: DnetDeviceProperties):
if not a.thunderbolt and not b.thunderbolt:
return (False, None, None)
for b_host, b_connected in b.thunderbolt.instances:
for b_con in b_connected:
if hasattr(a.thunderbolt, "instances"):
for a_host, _ in a.thunderbolt.instances:
if b_con.uuid == a_host.uuid:
return (True, a_host.uuid, b_host.uuid)
return (False, None, None)

def topo_to_network(topo: TypesTopologyInfo) -> NetworkTopologyPlan:
_reset_addr_inc()
nodes: Dict[str, NodeNetworkPlan] = {}
dev_by_name = {d.instance: d for d in topo.devices}
br_count: Dict[str, int] = {}
tb_uuid_to_bridge: Dict[str, Dict[str, str]] = {}
links: set[tuple[str, str]] = set()

for name, dev in dev_by_name.items():
nodes[name] = NodeNetworkPlan(
instance=name,
mgmt_ipv4="",
routes=[],
tb_bridges=[],
)

for ass in topo.assignments:
if not ass.next_instance: continue
link = tuple(sorted([ass.instance, ass.next_instance]))
links.add(link)

for a_name, b_name in links:
a = dev_by_name.get(a_name)
b = dev_by_name.get(b_name)
connected, uuid_a, uuid_b = _get_tb_link(a, b)
if not connected or not uuid_a or not uuid_b:
# TODO: Fallback to ethernet or wifi
continue

# map bridges to connections
if a_name not in tb_uuid_to_bridge: tb_uuid_to_bridge[a_name] = {}
if uuid_a not in tb_uuid_to_bridge[a_name]:
idx = br_count.get(a_name, 0) + 1
br_count[a_name] = idx
br_a = f"bridge{idx}"
tb_uuid_to_bridge[a_name][uuid_a] = br_a
if br_a not in nodes[a_name].tb_bridges:
nodes[a_name].tb_bridges.append(br_a)
else:
br_a = tb_uuid_to_bridge[a_name][uuid_a]

if b_name not in tb_uuid_to_bridge: tb_uuid_to_bridge[b_name] = {}
if uuid_b not in tb_uuid_to_bridge[b_name]:
idx = br_count.get(b_name, 0) + 1
br_count[b_name] = idx
br_b = f"bridge{idx}"
tb_uuid_to_bridge[b_name][uuid_b] = br_b
if br_b not in nodes[b_name].tb_bridges:
nodes[b_name].tb_bridges.append(br_b)
else:
br_b = tb_uuid_to_bridge[b_name][uuid_b]

ip_a, ip_b = _alloc_mdp_link()
nodes[a_name].routes.append(
NetworkRoute(
node=b_name,
mdp_self_ipv4=ip_a,
mdp_ipv4=ip_b,
mdp_interface=br_a,
mdp_cost=TB_MDP_COST,
fdp_ipv4="",
fdp_interface="",
fdp_cost=0,
)
)
nodes[b_name].routes.append(
NetworkRoute(
node=a_name,
mdp_self_ipv4=ip_b,
mdp_ipv4=ip_a,
mdp_interface=br_b,
mdp_cost=TB_MDP_COST,
fdp_ipv4="",
fdp_interface="",
fdp_cost=0,
)
)
return NetworkTopologyPlan(nodes=nodes)

def _reset_addr_inc():
global MDP_INC
global FDP_INC
global AP_INC
MDP_INC = 0
FDP_INC = 0
AP_INC = 0

def _alloc_mdp_link() -> Tuple[str, str]:
global MDP_INC
root = int(MDP_IPV4_BASE) + MDP_INC
MDP_INC += 2
return f"{ipaddress.IPv4Address(root)}", f"{ipaddress.IPv4Address(root+1)}"

def _alloc_fdp_link() -> Tuple[str, str]:
global FDP_INC
root = int(FDP_IPV4_BASE) + FDP_INC
FDP_INC += 2
return f"{ipaddress.IPv4Address(root)}", f"{ipaddress.IPv4Address(root+1)}"

def _alloc_ap_link() -> Tuple[str, str]:
global AP_INC;
root = int(AP_IPV4_BASE) + AP_INC
AP_INC += 2
return f"{ipaddress.IPv4Address(root)}", f"{ipaddress.IPv4Address(root+1)}"
Loading
Loading