diff --git a/tpot-analyzer/src/data/blob_importer.py b/tpot-analyzer/src/data/blob_importer.py
index 2377c05..1cd00b7 100644
--- a/tpot-analyzer/src/data/blob_importer.py
+++ b/tpot-analyzer/src/data/blob_importer.py
@@ -101,14 +101,15 @@ def list_archives(self) -> List[str]:
         logger.info(f"Found {len(usernames)} usernames in account table (will attempt import for each)")
         return usernames
 
-    def fetch_archive(self, username: str) -> Optional[Dict]:
+    def fetch_archive(self, username: str) -> Optional[tuple[Dict, Optional[datetime]]]:
         """Fetch archive JSON from blob storage.
 
         Args:
             username: Twitter handle (will be lowercased)
 
         Returns:
-            Archive dict or None if not found
+            Tuple of (archive_dict, upload_timestamp) or None if not found
+            upload_timestamp is extracted from Last-Modified header if available
         """
         username_lower = username.lower()
         url = f"{self.base_url}/storage/v1/object/public/archives/{username_lower}/archive.json"
@@ -124,7 +125,19 @@ def fetch_archive(self, username: str) -> Optional[Dict]:
                 logger.warning(f"Archive not found for '{username}' at {url}")
                 return None
             response.raise_for_status()
-            return response.json()
+
+            # Extract upload timestamp from Last-Modified header
+            upload_timestamp = None
+            last_modified = response.headers.get("Last-Modified")
+            if last_modified:
+                try:
+                    from email.utils import parsedate_to_datetime
+                    upload_timestamp = parsedate_to_datetime(last_modified)
+                    logger.debug(f"Archive for '{username}' last modified: {upload_timestamp}")
+                except Exception as e:
+                    logger.warning(f"Failed to parse Last-Modified header: {e}")
+
+            return response.json(), upload_timestamp
         except httpx.HTTPError as e:
             logger.error(f"Failed to fetch archive for '{username}': {e}")
             return None
@@ -146,10 +159,12 @@ def import_archive(
         Returns:
             Metadata about the import, or None if archive not found
         """
-        archive = self.fetch_archive(username)
-        if not archive:
+        result = self.fetch_archive(username)
+        if not result:
             return None
 
+        archive, upload_timestamp = result
+
         # Extract account info
         account_data = archive.get("account", [])
         if not account_data or len(account_data) == 0:
@@ -210,7 +225,8 @@ def import_archive(
             source_account_id=account_id,
             target_account_ids=following_ids,
             edge_type="following",
-            merge_strategy=merge_strategy
+            merge_strategy=merge_strategy,
+            upload_timestamp=upload_timestamp
         )
 
         # Import follower edges
@@ -218,7 +234,8 @@ def import_archive(
             source_account_id=account_id,
             target_account_ids=follower_ids,
             edge_type="follower",
-            merge_strategy=merge_strategy
+            merge_strategy=merge_strategy,
+            upload_timestamp=upload_timestamp
         )
 
         return ArchiveMetadata(
@@ -237,7 +254,8 @@ def _import_edges(
         source_account_id: str,
         target_account_ids: List[str],
         edge_type: str,  # "following" or "follower"
-        merge_strategy: str
+        merge_strategy: str,
+        upload_timestamp: Optional[datetime] = None
     ):
         """Import edges into archive staging tables.
 
@@ -246,6 +264,7 @@ def _import_edges(
             target_account_ids: List of account IDs in the relationship
             edge_type: "following" (accounts source follows) or "follower" (accounts following source)
             merge_strategy: Reserved for future use (currently always imports to staging)
+            upload_timestamp: Actual upload/modification time from archive metadata (HTTP Last-Modified header)
 
         Directionality:
             - "following": source_account → target_account (source follows target)
@@ -255,6 +274,8 @@ def _import_edges(
             logger.debug(f"Skipping {edge_type} import (shadow_only mode)")
             return
 
+        # Use actual upload timestamp if available, otherwise fall back to current time
+        uploaded_at = (upload_timestamp or datetime.utcnow()).isoformat()
         now = datetime.utcnow().isoformat()
 
         # Choose target table based on edge type
@@ -286,7 +307,7 @@ def _import_edges(
             """), {
                 "account_id": account_id,
                 "related_id": related_id,
-                "uploaded_at": now,  # TODO: Get actual upload timestamp from archive metadata
+                "uploaded_at": uploaded_at,  # Actual upload timestamp from HTTP Last-Modified header
                 "imported_at": now
             })
 
@@ -356,9 +377,9 @@ def import_all_archives(
             logger.info(f"[{i}/{len(usernames)}] Processing '{username}'...")
 
             # Get account_id first to check if already imported
-            archive = None
+            result = None
             try:
-                archive = self.fetch_archive(username)
+                result = self.fetch_archive(username)
             except httpx.HTTPStatusError as e:
                 if e.response.status_code == 400:
                     logger.warning(f"Archive not found for '{username}' (400 Bad Request)")
@@ -371,10 +392,13 @@ def import_all_archives(
                 logger.error(f"Failed to fetch '{username}': {e}")
                 continue
 
-            if not archive:
+            if not result:
                 logger.warning(f"No archive data for '{username}'")
                 continue
 
+            # Unpack tuple (archive_dict, upload_timestamp)
+            archive, upload_timestamp = result
+
             # Extract account_id for skip check
             account_data = archive.get("account", [])
             if not account_data or len(account_data) == 0:
diff --git a/tpot-analyzer/src/graph/gpu_capability.py b/tpot-analyzer/src/graph/gpu_capability.py
index ce167fd..953fb2c 100644
--- a/tpot-analyzer/src/graph/gpu_capability.py
+++ b/tpot-analyzer/src/graph/gpu_capability.py
@@ -42,11 +42,12 @@ def __str__(self) -> str:
         return "GPU disabled (CPU mode)"
 
 
-def _check_nvidia_smi() -> tuple[bool, Optional[str], Optional[str], Optional[str]]:
+def _check_nvidia_smi() -> tuple[bool, int, Optional[str], Optional[str], Optional[str]]:
     """Check NVIDIA GPU via nvidia-smi command.
 
     Returns:
-        (has_gpu, gpu_name, cuda_version, driver_version)
+        (has_gpu, gpu_count, gpu_name, cuda_version, driver_version)
+        gpu_name is from the first GPU if multiple are detected
     """
     try:
         result = subprocess.run(
@@ -59,16 +60,18 @@ def _check_nvidia_smi() -> tuple[bool, Optional[str], Optional[str], Optional[st
         if result.returncode == 0 and result.stdout.strip():
             lines = result.stdout.strip().split('\n')
             if lines:
+                gpu_count = len(lines)
+                # Use first GPU's info for reporting
                 parts = lines[0].split(',')
                 gpu_name = parts[0].strip() if len(parts) > 0 else None
                 driver_version = parts[1].strip() if len(parts) > 1 else None
                 cuda_version = parts[2].strip() if len(parts) > 2 else None
-                return True, gpu_name, cuda_version, driver_version
+                return True, gpu_count, gpu_name, cuda_version, driver_version
 
     except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
         logger.debug(f"nvidia-smi check failed: {e}")
 
-    return False, None, None, None
+    return False, 0, None, None, None
 
 
 def _check_numba_cuda() -> bool:
@@ -128,7 +131,7 @@ def detect_gpu_capability(force_cpu: bool = False) -> GpuCapability:
         )
 
     # Check CUDA availability
-    cuda_via_smi, gpu_name, cuda_version, driver_version = _check_nvidia_smi()
+    cuda_via_smi, gpu_count, gpu_name, cuda_version, driver_version = _check_nvidia_smi()
     cuda_via_numba = _check_numba_cuda()
     cuda_available = cuda_via_smi or cuda_via_numba
 
@@ -156,7 +159,7 @@ def detect_gpu_capability(force_cpu: bool = False) -> GpuCapability:
         return GpuCapability(
             cuda_available=True,
             cugraph_available=False,
-            gpu_count=1 if cuda_via_smi else 0,
+            gpu_count=gpu_count if cuda_via_smi else 0,
             gpu_name=gpu_name,
             cuda_version=cuda_version,
             driver_version=driver_version,
@@ -164,12 +167,15 @@ def detect_gpu_capability(force_cpu: bool = False) -> GpuCapability:
 
 
     # Success - GPU fully available
-    logger.info(f"GPU metrics enabled: {gpu_name} (CUDA {cuda_version}, Driver {driver_version})")
+    gpu_info = f"GPU metrics enabled: {gpu_name} (CUDA {cuda_version}, Driver {driver_version})"
+    if gpu_count > 1:
+        gpu_info += f" - {gpu_count} GPUs detected"
+    logger.info(gpu_info)
 
     return GpuCapability(
         cuda_available=True,
         cugraph_available=True,
-        gpu_count=1,  # TODO: Detect multiple GPUs if needed
+        gpu_count=gpu_count,
        gpu_name=gpu_name,
        cuda_version=cuda_version,
        driver_version=driver_version,
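Note (illustration only, not part of the patch): the Last-Modified handling added to fetch_archive can be exercised on its own with just the standard library. In the sketch below, parse_last_modified is a hypothetical helper rather than a function in blob_importer.py; when the header is missing or unparseable, the importer falls back to datetime.utcnow() for uploaded_at.

from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_last_modified(header_value: Optional[str]) -> Optional[datetime]:
    """Parse an HTTP Last-Modified header (RFC 7231 date) into a datetime.

    Returns None when the header is absent or malformed, which is the case
    where the importer falls back to the current time.
    """
    if not header_value:
        return None
    try:
        return parsedate_to_datetime(header_value)
    except (TypeError, ValueError):
        return None


if __name__ == "__main__":
    ts = parse_last_modified("Wed, 02 Oct 2024 14:30:00 GMT")
    print(ts)              # 2024-10-02 14:30:00+00:00 (timezone-aware)
    print(ts.isoformat())  # the string form stored as uploaded_at
    print(parse_last_modified(None))  # None -> importer uses datetime.utcnow()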
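Similarly, the multi-GPU counting added to _check_nvidia_smi reduces to counting CSV lines and reading the first line's fields. The parsing-only sketch below reflects that assumption: parse_nvidia_smi_csv and the sample output are illustrative, and the real field order depends on the query flags passed to nvidia-smi, which are not shown in this diff.

from typing import Optional, Tuple


def parse_nvidia_smi_csv(stdout: str) -> Tuple[bool, int, Optional[str], Optional[str], Optional[str]]:
    """Mirror the parsing in _check_nvidia_smi: one CSV line per GPU.

    Returns (has_gpu, gpu_count, gpu_name, cuda_version, driver_version),
    where name/driver/CUDA come from the first GPU only.
    """
    if not stdout.strip():
        return False, 0, None, None, None
    lines = stdout.strip().split('\n')
    parts = lines[0].split(',')
    gpu_name = parts[0].strip() if len(parts) > 0 else None
    driver_version = parts[1].strip() if len(parts) > 1 else None
    cuda_version = parts[2].strip() if len(parts) > 2 else None
    return True, len(lines), gpu_name, cuda_version, driver_version


if __name__ == "__main__":
    # Hypothetical two-GPU output in 'name, driver_version, cuda_version' order.
    sample = "NVIDIA RTX A6000, 550.54.14, 12.4\nNVIDIA RTX A6000, 550.54.14, 12.4"
    print(parse_nvidia_smi_csv(sample))
    # (True, 2, 'NVIDIA RTX A6000', '12.4', '550.54.14')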