39 changes: 30 additions & 9 deletions tpot-analyzer/src/data/blob_importer.py
@@ -101,14 +101,15 @@ def list_archives(self) -> List[str]:
logger.info(f"Found {len(usernames)} usernames in account table (will attempt import for each)")
return usernames

def fetch_archive(self, username: str) -> Optional[Dict]:
def fetch_archive(self, username: str) -> Optional[tuple[Dict, Optional[datetime]]]:
"""Fetch archive JSON from blob storage.

Args:
username: Twitter handle (will be lowercased)
Comment on lines +104 to 108

P1: Adapt bulk import to new fetch_archive return tuple

Changing fetch_archive to return (archive_dict, upload_timestamp) means callers must unpack the tuple. import_all_archives() still calls fetch_archive() and immediately treats the result as a dict (archive.get(...)), so the bulk import loop will now raise AttributeError: 'tuple' object has no attribute 'get' before any archive is processed. Update the caller to unpack the tuple or revert the return type to keep batch imports working.

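A minimal sketch of the caller-side fix, mirroring the unpacking that import_archive() already does below; the loop body shown here is assumed, not taken from this diff:

```python
# Hypothetical excerpt of import_all_archives(); the real loop body is not shown in this diff.
for username in self.list_archives():
    result = self.fetch_archive(username)
    if not result:
        continue
    archive, upload_timestamp = result          # unpack the (dict, timestamp) tuple
    account_data = archive.get("account", [])   # archive is a plain dict again, so .get() works
```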


Returns:
Archive dict or None if not found
Tuple of (archive_dict, upload_timestamp) or None if not found
upload_timestamp is extracted from Last-Modified header if available
"""
username_lower = username.lower()
url = f"{self.base_url}/storage/v1/object/public/archives/{username_lower}/archive.json"
@@ -124,7 +124,19 @@ def fetch_archive(self, username: str) -> Optional[Dict]:
logger.warning(f"Archive not found for '{username}' at {url}")
return None
response.raise_for_status()
return response.json()

# Extract upload timestamp from Last-Modified header
upload_timestamp = None
last_modified = response.headers.get("Last-Modified")
if last_modified:
try:
from email.utils import parsedate_to_datetime
upload_timestamp = parsedate_to_datetime(last_modified)
logger.debug(f"Archive for '{username}' last modified: {upload_timestamp}")
except Exception as e:
logger.warning(f"Failed to parse Last-Modified header: {e}")

return response.json(), upload_timestamp
except httpx.HTTPError as e:
logger.error(f"Failed to fetch archive for '{username}': {e}")
return None
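For reference, email.utils.parsedate_to_datetime converts the HTTP-date value of Last-Modified into a timezone-aware datetime; a standalone sketch with an illustrative header value:

```python
from email.utils import parsedate_to_datetime

last_modified = "Tue, 21 Oct 2025 07:28:00 GMT"  # illustrative Last-Modified value
upload_timestamp = parsedate_to_datetime(last_modified)
print(upload_timestamp)              # 2025-10-21 07:28:00+00:00
print(upload_timestamp.isoformat())  # 2025-10-21T07:28:00+00:00
```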
@@ -146,10 +159,12 @@ def import_archive(
Returns:
Metadata about the import, or None if archive not found
"""
archive = self.fetch_archive(username)
if not archive:
result = self.fetch_archive(username)
if not result:
return None

archive, upload_timestamp = result

# Extract account info
account_data = archive.get("account", [])
if not account_data or len(account_data) == 0:
@@ -210,15 +225,17 @@ def import_archive(
source_account_id=account_id,
target_account_ids=following_ids,
edge_type="following",
merge_strategy=merge_strategy
merge_strategy=merge_strategy,
upload_timestamp=upload_timestamp
)

# Import follower edges
self._import_edges(
source_account_id=account_id,
target_account_ids=follower_ids,
edge_type="follower",
merge_strategy=merge_strategy
merge_strategy=merge_strategy,
upload_timestamp=upload_timestamp
)

return ArchiveMetadata(
@@ -237,7 +254,8 @@ def _import_edges(
source_account_id: str,
target_account_ids: List[str],
edge_type: str, # "following" or "follower"
merge_strategy: str
merge_strategy: str,
upload_timestamp: Optional[datetime] = None
):
"""Import edges into archive staging tables.

@@ -246,6 +264,7 @@ def _import_edges(
target_account_ids: List of account IDs in the relationship
edge_type: "following" (accounts source follows) or "follower" (accounts following source)
merge_strategy: Reserved for future use (currently always imports to staging)
upload_timestamp: Actual upload/modification time from archive metadata (HTTP Last-Modified header)

Directionality:
- "following": source_account → target_account (source follows target)
@@ -255,6 +274,8 @@
logger.debug(f"Skipping {edge_type} import (shadow_only mode)")
return

# Use actual upload timestamp if available, otherwise fall back to current time
uploaded_at = (upload_timestamp or datetime.utcnow()).isoformat()
now = datetime.utcnow().isoformat()

# Choose target table based on edge type
@@ -286,7 +307,7 @@ def _import_edges(
"""), {
"account_id": account_id,
"related_id": related_id,
"uploaded_at": now, # TODO: Get actual upload timestamp from archive metadata
"uploaded_at": uploaded_at, # Actual upload timestamp from HTTP Last-Modified header
"imported_at": now
})
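A small sketch of the timestamp fallback introduced above, exercising both branches (the fixed datetime is illustrative):

```python
from datetime import datetime
from typing import Optional

def resolve_uploaded_at(upload_timestamp: Optional[datetime]) -> str:
    # Mirrors the fallback above: prefer the Last-Modified time, else the import time.
    return (upload_timestamp or datetime.utcnow()).isoformat()

print(resolve_uploaded_at(datetime(2025, 10, 21, 7, 28)))  # 2025-10-21T07:28:00
print(resolve_uploaded_at(None))                           # current UTC time, e.g. 2025-10-21T09:15:42.123456
```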

22 changes: 14 additions & 8 deletions tpot-analyzer/src/graph/gpu_capability.py
@@ -42,11 +42,12 @@ def __str__(self) -> str:
return "GPU disabled (CPU mode)"


def _check_nvidia_smi() -> tuple[bool, Optional[str], Optional[str], Optional[str]]:
def _check_nvidia_smi() -> tuple[bool, int, Optional[str], Optional[str], Optional[str]]:
"""Check NVIDIA GPU via nvidia-smi command.

Returns:
(has_gpu, gpu_name, cuda_version, driver_version)
(has_gpu, gpu_count, gpu_name, cuda_version, driver_version)
gpu_name is from the first GPU if multiple are detected
"""
try:
result = subprocess.run(
@@ -59,16 +60,18 @@ def _check_nvidia_smi() -> tuple[bool, Optional[str], Optional[str], Optional[str]]:
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split('\n')
if lines:
gpu_count = len(lines)
# Use first GPU's info for reporting
parts = lines[0].split(',')
gpu_name = parts[0].strip() if len(parts) > 0 else None
driver_version = parts[1].strip() if len(parts) > 1 else None
cuda_version = parts[2].strip() if len(parts) > 2 else None
return True, gpu_name, cuda_version, driver_version
return True, gpu_count, gpu_name, cuda_version, driver_version

except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
logger.debug(f"nvidia-smi check failed: {e}")

return False, None, None, None
return False, 0, None, None, None
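To illustrate the new per-GPU counting, a standalone sketch with a hypothetical two-GPU nvidia-smi CSV output (the column order name, driver, CUDA is assumed to match the query used above, which this hunk does not show):

```python
# Hypothetical `nvidia-smi --format=csv,noheader` output for a two-GPU host
stdout = (
    "NVIDIA A100-SXM4-80GB, 550.54.14, 12.4\n"
    "NVIDIA A100-SXM4-80GB, 550.54.14, 12.4\n"
)

lines = stdout.strip().split('\n')
gpu_count = len(lines)               # 2
parts = lines[0].split(',')          # first GPU is used for reporting
gpu_name = parts[0].strip()          # 'NVIDIA A100-SXM4-80GB'
driver_version = parts[1].strip()    # '550.54.14'
cuda_version = parts[2].strip()      # '12.4'
print(gpu_count, gpu_name, driver_version, cuda_version)
```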


def _check_numba_cuda() -> bool:
@@ -128,7 +131,7 @@ def detect_gpu_capability(force_cpu: bool = False) -> GpuCapability:
)

# Check CUDA availability
cuda_via_smi, gpu_name, cuda_version, driver_version = _check_nvidia_smi()
cuda_via_smi, gpu_count, gpu_name, cuda_version, driver_version = _check_nvidia_smi()
cuda_via_numba = _check_numba_cuda()

cuda_available = cuda_via_smi or cuda_via_numba
@@ -156,20 +159,23 @@ def detect_gpu_capability(force_cpu: bool = False) -> GpuCapability:
return GpuCapability(
cuda_available=True,
cugraph_available=False,
gpu_count=1 if cuda_via_smi else 0,
gpu_count=gpu_count if cuda_via_smi else 0,
gpu_name=gpu_name,
cuda_version=cuda_version,
driver_version=driver_version,
enabled=True
)

# Success - GPU fully available
logger.info(f"GPU metrics enabled: {gpu_name} (CUDA {cuda_version}, Driver {driver_version})")
gpu_info = f"GPU metrics enabled: {gpu_name} (CUDA {cuda_version}, Driver {driver_version})"
if gpu_count > 1:
gpu_info += f" - {gpu_count} GPUs detected"
logger.info(gpu_info)

return GpuCapability(
cuda_available=True,
cugraph_available=True,
gpu_count=1, # TODO: Detect multiple GPUs if needed
gpu_count=gpu_count,
gpu_name=gpu_name,
cuda_version=cuda_version,
driver_version=driver_version,
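A quick check of the new log message construction on a multi-GPU host (the GPU name and versions are illustrative):

```python
gpu_name, cuda_version, driver_version, gpu_count = "NVIDIA A100-SXM4-80GB", "12.4", "550.54.14", 2

gpu_info = f"GPU metrics enabled: {gpu_name} (CUDA {cuda_version}, Driver {driver_version})"
if gpu_count > 1:
    gpu_info += f" - {gpu_count} GPUs detected"

print(gpu_info)
# GPU metrics enabled: NVIDIA A100-SXM4-80GB (CUDA 12.4, Driver 550.54.14) - 2 GPUs detected
```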