diff --git a/README.md b/README.md index cf2e373..ec466a8 100644 --- a/README.md +++ b/README.md @@ -1,90 +1,143 @@ -# Vana Data Refinement Template +# Vana Data Refiner for Unwrapped Spotify Contribution Proofs -This repository serves as a template for creating Dockerized *data refinement instructions* that transform raw user data into normalized (and potentially anonymized) SQLite-compatible databases, so data in Vana can be querying by Vana's Query Engine. +This repository is a customized version of the Vana Data Refinement template, specifically designed to process and refine output data from the `unwrapped-proof-of-contribution` system. It transforms the JSON-based proof results into a normalized and queryable SQLite database, suitable for the Vana ecosystem. ## Overview -Here is an overview of the data refinement process on Vana. +This data refiner takes the `results.json` file generated by the Unwrapped Spotify proof of contribution system and transforms it into a structured SQLite database. The process involves: -![How Refinements Work](https://files.readme.io/25f8f6a4c8e785a72105d6eb012d09449f63ab5682d1f385120eaf5af871f9a2-image.png "How Refinements Work") +1. **Parsing Input**: Reading the `results.json` file, which contains the contributor's Spotify listening data (user profile, aggregate listening stats, played tracks, and top artists). +2. **Data Transformation**: Mapping the input JSON data to a predefined relational schema. This separates the different aspects of the contribution (user, listening stats, artists, played tracks, top-artist associations) into distinct but related tables. +3. **Database Creation**: Storing the transformed data in a libSQL (SQLite-compatible) database. +4. **Encryption**: Symmetrically encrypting the resulting SQLite database file using a key derived from the original file's encryption key. +5. **IPFS Upload (Optional)**: If configured, the encrypted database and its schema definition are uploaded to IPFS. -1. 
DLPs upload user-contributed data through their UI, and run proof-of-contribution against it. Afterwards, they call the refinement service to refine this data point. -1. The refinement service downloads the file from the Data Registry and decrypts it. -1. The refinement container, containing the instructions for data refinement (this repo), is executed - 1. The decrypted data is mounted to the container's `/input` directory - 1. The raw data points are transformed against a normalized SQLite database schema (specifically libSQL, a modern fork of SQLite) - 1. Optionally, PII (Personally Identifiable Information) is removed or masked - 1. The refined data is symmetrically encrypted with a derivative of the original file encryption key -1. The encrypted refined data is uploaded and pinned to a DLP-owned IPFS -1. The IPFS CID is written to the refinement container's `/output` directory -1. The CID of the file is added as a refinement under the original file in the Data Registry -1. Vana's Query Engine indexes that data point, aggregating it with all other data points of a given refiner. This allows SQL queries to run against all data of a particular refiner (schema). +The refined, encrypted database can then be registered with the Vana Data Registry, making the structured contribution data queryable by permitted entities within the Vana ecosystem. + +## Refined Database Schema + +The refinement process generates a SQLite database with the following main tables: + +* **`users`**: One row per contributor, keyed by a hashed user ID (`id_hash`), with optional `country` and `product` fields. +* **`user_listening_stats`**: Aggregate listening statistics per user (`total_minutes`, `track_count`, `unique_artists_count`, `activity_period_days`, first/last listen timestamps). Linked to `users`. +* **`artists`**: Spotify artist details (`name`, `popularity`, `genres`, `followers_total`, `primary_image_url`), keyed by Spotify artist ID. +* **`played_tracks`**: Individual listening events (`track_id`, `duration_ms`, `listened_at`). Linked to `users` and `artists`. +* **`user_top_artists`**: Per-user top-artist associations with `play_count` and `last_played_at`. Linked to `users` and `artists`. + +For detailed column information, refer to `refiner/models/refined.py`. ## Project Structure - `refiner/`: Contains the main refinement logic - `refine.py`: Core refinement implementation - - `config.py`: Environment variables and settings needed to run your refinement + - `config.py`: Environment variables and settings (customized for Unwrapped) - `__main__.py`: Entry point for the refinement execution - - `models/`: Pydantic and SQLAlchemy data models (for both unrefined and refined data) + - `models/`: Pydantic and SQLAlchemy data models + - `unrefined.py`: Pydantic models for the input `results.json` + - `refined.py`: SQLAlchemy models for the output SQLite database schema - `transformer/`: Data transformation logic + - `unwrapped_spotify_transformer.py`: Transforms Unwrapped Spotify data to the refined schema - `utils/`: Utility functions for encryption, IPFS upload, etc. -- `input/`: Contains raw data files to be refined +- `input/`: Directory where the input `results.json` (or similar) file should be placed - `output/`: Contains refined outputs: - - `schema.json`: Database schema definition - - `db.libsql`: SQLite database file - - `db.libsql.pgp`: Encrypted database file + - `schema.json`: SQLite schema definition (generated) + - `db.libsql`: SQLite database file (generated, unencrypted) + - `db.libsql.pgp`: Encrypted SQLite database file (generated) + - `output.json`: JSON file containing the `refinement_url` (IPFS or local file path) and schema details. - `Dockerfile`: Defines the container image for the refinement task - `requirements.txt`: Python package dependencies ## Getting Started -1. Fork this repository -1. Modify the config to match your environment, or add a .env file at the root. See below for defaults. -1. 
Update the schemas in `refiner/models/` to define your raw and normalized data models -1. Modify the refinement logic in `refiner/transformer/` to match your data structure -1. If needed, modify `refiner/refiner.py` with your file(s) that need to be refined -1. Build and test your refinement container +1. **Clone/Fork this Repository**: This repository contains the refiner logic tailored for Unwrapped. +2. **Input Data**: Place the `results.json` file (or a similarly structured JSON file) generated by the `unwrapped-proof-of-contribution` system into the `input/` directory. +3. **Environment Variables**: + * Create a `.env` file in the root of the project or set environment variables directly. + * The most important ones for local testing are `REFINEMENT_ENCRYPTION_KEY` (any string for testing) and optionally `PINATA_API_KEY` and `PINATA_API_SECRET` if you want to upload to IPFS via Pinata. + * See the "Environment Variables" section below for more details. +4. **Build and Test**: Follow the "Local Development" instructions. + +### Environment Variables + +Create a `.env` file in the project root or set these environment variables: -### Environment variables ```dotenv -# Local directories where inputs and outputs are found. When running on the refinement service, files will be mounted to the /input and /output directory of the container. +# Local directories where inputs and outputs are found. +# When running on the Vana refinement service, these will be /input and /output. INPUT_DIR=input OUTPUT_DIR=output -# This key is derived from the user file's original encryption key, automatically injected into the container by the refinement service. When developing locally, any string can be used here for testing. 
-REFINEMENT_ENCRYPTION_KEY=0x1234 - -# Required if using https://pinata.cloud (IPFS pinning service) -PINATA_API_KEY=xxx -PINATA_API_SECRET=yyy +# This key is derived from the user file's original encryption key, +# automatically injected into the container by the Vana refinement service. +# When developing locally, any non-empty string can be used here for testing. +REFINEMENT_ENCRYPTION_KEY=your_test_encryption_key_here_for_local_dev + +# Schema details (defaults are set in refiner/config.py for Unwrapped) +# SCHEMA_NAME="Unwrapped Spotify Contribution Proof" +# SCHEMA_VERSION="1.0.0" +# SCHEMA_DESCRIPTION="Schema for refined Unwrapped Spotify listening data contribution proofs." +# SCHEMA_DIALECT="sqlite" + +# Optional, required if using https://pinata.cloud (IPFS pinning service) +# If not provided, IPFS uploads will be skipped and output.refinement_url will be a local file:// path. +# PINATA_API_KEY=your_pinata_api_key +# PINATA_API_SECRET=your_pinata_api_secret ``` ## Local Development To run the refinement locally for testing: +**1. Install Dependencies:** ```bash -# With Python pip install --no-cache-dir -r requirements.txt +``` + +**2. Prepare Input:** +Place your `results.json` (or equivalent) from the `unwrapped-proof-of-contribution` output into the `input/` directory of this project. + +**3. Run with Python:** +Make sure your `.env` file is configured or environment variables are set. +```bash python -m refiner +``` + +**4. (Alternatively) Run with Docker:** + +First, build the Docker image: +```bash +docker build -t unwrapped-refiner . +``` + +Then, run the container. Remember to replace `your_test_encryption_key_here_for_local_dev` with an actual string if `REFINEMENT_ENCRYPTION_KEY` is not in your `.env` file or environment. +If using Pinata, also pass `PINATA_API_KEY` and `PINATA_API_SECRET`. -# Or with Docker -docker build -t refiner . 
+```bash +# Example without Pinata (output URL will be local file path) docker run \ --rm \ --volume $(pwd)/input:/input \ --volume $(pwd)/output:/output \ - --env PINATA_API_KEY=your_key \ - --env PINATA_API_SECRET=your_secret \ - refiner + --env REFINEMENT_ENCRYPTION_KEY="your_test_encryption_key_here_for_local_dev" \ + unwrapped-refiner + +# Example with Pinata +# docker run \ +# --rm \ +# --volume $(pwd)/input:/input \ +# --volume $(pwd)/output:/output \ +# --env REFINEMENT_ENCRYPTION_KEY="your_test_encryption_key_here_for_local_dev" \ +# --env PINATA_API_KEY="your_pinata_api_key" \ +# --env PINATA_API_SECRET="your_pinata_api_secret" \ +# unwrapped-refiner ``` +After execution, check the `output/` directory for `db.libsql` (the SQLite database), `db.libsql.pgp` (the encrypted database), `schema.json`, and `output.json`. + ## Contributing -If you have suggestions for improving this template, please open an issue or submit a pull request. +This refiner is specifically tailored for Unwrapped. +If you have suggestions for improving the Vana Data Refinement template itself, please refer to the [vana-data-refinement-template repository](https://github.com/vana-com/vana-data-refinement-template). For issues or improvements related to this Unwrapped-specific refiner, please open an issue or submit a pull request in this repository. 
## License -[MIT License](LICENSE) - +[MIT License](LICENSE) \ No newline at end of file diff --git a/input/user.zip b/input/user.zip deleted file mode 100644 index 501bf53..0000000 Binary files a/input/user.zip and /dev/null differ diff --git a/output/.gitkeep b/output/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/refiner/config.py b/refiner/config.py index 17acf04..5c17293 100644 --- a/refiner/config.py +++ b/refiner/config.py @@ -4,55 +4,86 @@ class Settings(BaseSettings): """Global settings configuration using environment variables""" - + INPUT_DIR: str = Field( default="/input", description="Directory containing input files to process" ) - + OUTPUT_DIR: str = Field( default="/output", description="Directory where output files will be written" ) - + REFINEMENT_ENCRYPTION_KEY: str = Field( default=None, description="Key to symmetrically encrypt the refinement. This is derived from the original file encryption key" ) - + SCHEMA_NAME: str = Field( - default="Google Drive Analytics", - description="Name of the schema" + default="Unwrapped Spotify Data", + description="Schema name for Unwrapped Spotify listening data" ) - + SCHEMA_VERSION: str = Field( - default="0.0.1", - description="Version of the schema" + default="0.1.3", + description="Version of the Unwrapped Spotify schema" ) - + SCHEMA_DESCRIPTION: str = Field( - default="Schema for the Google Drive DLP, representing some basic analytics of the Google user", - description="Description of the schema" + default="Refined schema for Spotify listening history and derived top artists. 
Artist details are enriched via Spotify Web API.", + description="Description of the Unwrapped Spotify schema" ) - + SCHEMA_DIALECT: str = Field( default="sqlite", description="Dialect of the schema" ) - + # Optional, required if using https://pinata.cloud (IPFS pinning service) PINATA_API_KEY: Optional[str] = Field( default=None, description="Pinata API key" ) - + PINATA_API_SECRET: Optional[str] = Field( default=None, description="Pinata API secret" ) - + + PINATA_API_GATEWAY: Optional[str] = Field( + default="https://gateway.pinata.cloud/ipfs", + description="Pinata API gateway URL. Note: This is the gateway to access, not the API endpoint for upload." + ) + + # Spotify Web API Credentials + SPOTIFY_CLIENT_ID: Optional[str] = Field( + default=None, + description="Spotify Web API Client ID" + ) + SPOTIFY_CLIENT_SECRET: Optional[str] = Field( + default=None, + description="Spotify Web API Client Secret" + ) + SPOTIFY_API_URL: str = Field( + default="https://api.spotify.com/v1", + description="Base URL for Spotify Web API" + ) + SPOTIFY_TOKEN_URL: str = Field( + default="https://accounts.spotify.com/api/token", + description="Token URL for Spotify Web API" + ) + SPOTIFY_MAX_IDS_PER_BATCH: int = Field( + default=50, + description="Max IDs for Spotify batch API calls (artists/tracks)" + ) + SPOTIFY_API_CALL_DELAY_SECONDS: float = Field( + default=0.1, # Slightly increased default for Spotify API + description="Delay in seconds between individual Spotify API calls." 
+ ) + class Config: env_file = ".env" case_sensitive = True -settings = Settings() \ No newline at end of file +settings = Settings() \ No newline at end of file diff --git a/refiner/models/offchain_schema.py b/refiner/models/offchain_schema.py index 4710e5b..1288abf 100644 --- a/refiner/models/offchain_schema.py +++ b/refiner/models/offchain_schema.py @@ -5,4 +5,4 @@ class OffChainSchema(BaseModel): version: str description: str dialect: str - schema: str \ No newline at end of file + schema_definition: str \ No newline at end of file diff --git a/refiner/models/output.py b/refiner/models/output.py index 98e9755..66fef68 100644 --- a/refiner/models/output.py +++ b/refiner/models/output.py @@ -5,4 +5,4 @@ class Output(BaseModel): refinement_url: Optional[str] = None - schema: Optional[OffChainSchema] = None \ No newline at end of file + output_schema: Optional[OffChainSchema] = None \ No newline at end of file diff --git a/refiner/models/refined.py b/refiner/models/refined.py index d06ce45..732da8b 100644 --- a/refiner/models/refined.py +++ b/refiner/models/refined.py @@ -1,41 +1,65 @@ from datetime import datetime -from sqlalchemy import Column, String, Integer, Float, ForeignKey, DateTime -from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, String, Integer, Float, DateTime, ForeignKey, Text, JSON from sqlalchemy.orm import relationship +from sqlalchemy.ext.declarative import declarative_base # Use this for SQLAlchemy Base -# Base model for SQLAlchemy Base = declarative_base() -# Define database models - the schema is generated using these -class UserRefined(Base): +class User(Base): __tablename__ = 'users' - - user_id = Column(String, primary_key=True) - email = Column(String, nullable=False, unique=True) + id_hash = Column(String, primary_key=True, index=True) + country = Column(String, nullable=True) + product = Column(String, nullable=True) + + listening_stats = relationship("UserListeningStats", back_populates="user", 
uselist=False, cascade="all, delete-orphan") + played_tracks = relationship("PlayedTrack", back_populates="user", cascade="all, delete-orphan") + top_artists_assoc = relationship("UserTopArtistAssoc", back_populates="user", cascade="all, delete-orphan") + +class UserListeningStats(Base): + __tablename__ = 'user_listening_stats' + id = Column(Integer, primary_key=True, autoincrement=True) + user_id_hash = Column(String, ForeignKey('users.id_hash'), nullable=False, index=True, unique=True) + total_minutes = Column(Integer, nullable=False) + track_count = Column(Integer, nullable=False) # From input stats, may not match count of PlayedTrack if some tracks are skipped + unique_artists_count = Column(Integer, nullable=False) # From input stats, may not match count of distinct resolved Artists + activity_period_days = Column(Integer, nullable=False) + first_listen_at = Column(DateTime, nullable=True) + last_listen_at = Column(DateTime, nullable=True) + refined_at = Column(DateTime, default=datetime.utcnow) + + user = relationship("User", back_populates="listening_stats") + +class Artist(Base): + __tablename__ = 'artists' + id = Column(String, primary_key=True, index=True) # Spotify artist ID name = Column(String, nullable=False) - locale = Column(String, nullable=False) - created_at = Column(DateTime, nullable=False) - - storage_metrics = relationship("StorageMetric", back_populates="user") - auth_sources = relationship("AuthSource", back_populates="user") - -class StorageMetric(Base): - __tablename__ = 'storage_metrics' - - metric_id = Column(Integer, primary_key=True, autoincrement=True) - user_id = Column(String, ForeignKey('users.user_id'), nullable=False) - percent_used = Column(Float, nullable=False) - recorded_at = Column(DateTime, nullable=False, default=datetime.utcnow) - - user = relationship("UserRefined", back_populates="storage_metrics") - -class AuthSource(Base): - __tablename__ = 'auth_sources' - - auth_id = Column(Integer, primary_key=True, 
autoincrement=True) - user_id = Column(String, ForeignKey('users.user_id'), nullable=False) - source = Column(String, nullable=False) - collection_date = Column(DateTime, nullable=False) - data_type = Column(String, nullable=False) - - user = relationship("UserRefined", back_populates="auth_sources") + popularity = Column(Integer, nullable=True) + genres = Column(JSON, nullable=True) # Storing as JSON array + followers_total = Column(Integer, nullable=True) + primary_image_url = Column(String, nullable=True) + + played_tracks = relationship("PlayedTrack", back_populates="artist") + top_artist_for_users_assoc = relationship("UserTopArtistAssoc", back_populates="artist") + +class PlayedTrack(Base): + __tablename__ = 'played_tracks' + id = Column(Integer, primary_key=True, autoincrement=True) + user_id_hash = Column(String, ForeignKey('users.id_hash'), nullable=False, index=True) + track_id = Column(String, nullable=False, index=True) + artist_id = Column(String, ForeignKey('artists.id'), nullable=False, index=True) + duration_ms = Column(Integer, nullable=False) + listened_at = Column(DateTime, nullable=False, index=True) + + user = relationship("User", back_populates="played_tracks") + artist = relationship("Artist", back_populates="played_tracks") + +class UserTopArtistAssoc(Base): + __tablename__ = 'user_top_artists' + id = Column(Integer, primary_key=True, autoincrement=True) + user_id_hash = Column(String, ForeignKey('users.id_hash'), nullable=False, index=True) + artist_id = Column(String, ForeignKey('artists.id'), nullable=False, index=True) + play_count = Column(Integer, nullable=False) + last_played_at = Column(DateTime, nullable=True) + + user = relationship("User", back_populates="top_artists_assoc") + artist = relationship("Artist", back_populates="top_artist_for_users_assoc") \ No newline at end of file diff --git a/refiner/models/unrefined.py b/refiner/models/unrefined.py index 4396bf4..09a74a5 100644 --- a/refiner/models/unrefined.py +++ 
b/refiner/models/unrefined.py @@ -1,23 +1,48 @@ -from typing import Optional -from pydantic import BaseModel +from typing import List, Optional +from pydantic import BaseModel, Field +class UnwrappedUser(BaseModel): + id_hash: str + country: Optional[str] = None + product: Optional[str] = None -class Profile(BaseModel): +class UnwrappedStats(BaseModel): + total_minutes: int + track_count: int + unique_artists_count: int + activity_period_days: int + first_listen: Optional[str] = None # ISO datetime string + last_listen: Optional[str] = None # ISO datetime string + +class UnwrappedPlayedTrack(BaseModel): + track_id: str + artist_id: str # Primary artist ID + duration_ms: int + listened_at: str # ISO datetime string + +class UnwrappedArtistImage(BaseModel): + url: str + height: Optional[int] = None + width: Optional[int] = None + +class UnwrappedArtistFollowers(BaseModel): + href: Optional[str] = None + total: int + +class UnwrappedTopArtist(BaseModel): + id: str # Artist ID name: str - locale: str + popularity: Optional[int] = None + play_count: Optional[int] = None # number of plays; matches the Integer column in refined.py + last_played: Optional[str] = None -class Storage(BaseModel): - percentUsed: float +class UnwrappedData(BaseModel): + user: UnwrappedUser + stats: UnwrappedStats + tracks: List[UnwrappedPlayedTrack] = Field(default_factory=list) + top_artists_medium_term: Optional[List[UnwrappedTopArtist]] = Field(default_factory=list) class Metadata(BaseModel): source: str collectionDate: str - dataType: str - -class User(BaseModel): - userId: str - email: str - timestamp: int - profile: Profile - storage: Optional[Storage] = None - metadata: Optional[Metadata] = None \ No newline at end of file + dataType: str \ No newline at end of file diff --git a/refiner/refine.py b/refiner/refine.py index c391f53..4a211dc 100644 --- a/refiner/refine.py +++ b/refiner/refine.py @@ -4,54 +4,151 @@ from refiner.models.offchain_schema import OffChainSchema from refiner.models.output import Output -from 
refiner.transformer.user_transformer import UserTransformer +from refiner.transformer.unwrapped_spotify_transformer import UnwrappedSpotifyTransformer from refiner.config import settings from refiner.utils.encrypt import encrypt_file from refiner.utils.ipfs import upload_file_to_ipfs, upload_json_to_ipfs +logger = logging.getLogger(__name__) + class Refiner: def __init__(self): self.db_path = os.path.join(settings.OUTPUT_DIR, 'db.libsql') + # Ensure output directory exists + os.makedirs(settings.OUTPUT_DIR, exist_ok=True) + def transform(self) -> Output: - """Transform all input files into the database.""" - logging.info("Starting data transformation") + """Transform input JSON file(s) into the refined, encrypted database.""" + logger.info("Starting data transformation for Unwrapped Spotify Data") output = Output() - # Iterate through files and transform data + total_models_generated_across_all_files = 0 + json_files_found_and_attempted = 0 + + # UnwrappedSpotifyTransformer (re)creates the database once, in its __init__, so + # data from every JSON file in input/ accumulates into a single database. Vana's + # model is one input -> one refined output, so a single results.json is expected. 
+ + transformer = UnwrappedSpotifyTransformer(self.db_path) # Initializes DB (deletes if exists) + for input_filename in os.listdir(settings.INPUT_DIR): - input_file = os.path.join(settings.INPUT_DIR, input_filename) - if os.path.splitext(input_file)[1].lower() == '.json': - with open(input_file, 'r') as f: - input_data = json.load(f) - - # Transform account data - transformer = UserTransformer(self.db_path) - transformer.process(input_data) - logging.info(f"Transformed {input_filename}") - - # Create a schema based on the SQLAlchemy schema - schema = OffChainSchema( - name=settings.SCHEMA_NAME, - version=settings.SCHEMA_VERSION, - description=settings.SCHEMA_DESCRIPTION, - dialect=settings.SCHEMA_DIALECT, - schema=transformer.get_schema() - ) - output.schema = schema - - # Upload the schema to IPFS - schema_file = os.path.join(settings.OUTPUT_DIR, 'schema.json') - with open(schema_file, 'w') as f: - json.dump(schema.model_dump(), f, indent=4) - schema_ipfs_hash = upload_json_to_ipfs(schema.model_dump()) - logging.info(f"Schema uploaded to IPFS with hash: {schema_ipfs_hash}") - - # Encrypt and upload the database to IPFS - encrypted_path = encrypt_file(settings.REFINEMENT_ENCRYPTION_KEY, self.db_path) - ipfs_hash = upload_file_to_ipfs(encrypted_path) - output.refinement_url = f"https://ipfs.vana.org/ipfs/{ipfs_hash}" + input_file_path = os.path.join(settings.INPUT_DIR, input_filename) + if os.path.isfile(input_file_path) and input_filename.lower().endswith('.json'): + json_files_found_and_attempted += 1 + logger.info(f"Processing input file: {input_filename}") + try: + with open(input_file_path, 'r') as f: + input_data = json.load(f) + except json.JSONDecodeError as e: + logger.error(f"Error decoding JSON from {input_filename}: {e}. Skipping this file.") + continue + except Exception as e: + logger.error(f"Error reading file {input_filename}: {e}. 
Skipping this file.") continue - logging.info("Data transformation completed successfully") + # Transform data from the current file; the shared transformer accumulates + # results from all processed JSON files into the database at self.db_path. + models_from_current_file = transformer.transform(input_data) + + if models_from_current_file: + try: + num_saved = transformer.save_models(models_from_current_file) + logger.info(f"Saved {num_saved} models from {input_filename} to the database.") + total_models_generated_across_all_files += num_saved + + # Generate and set the schema definition in the output object (only once) + if not output.output_schema and num_saved > 0: # Ensure schema is for a non-empty DB + schema_str = transformer.get_schema() + if schema_str: # Ensure schema string is not empty + schema_obj = OffChainSchema( + name=settings.SCHEMA_NAME, + version=settings.SCHEMA_VERSION, + description=settings.SCHEMA_DESCRIPTION, + dialect=settings.SCHEMA_DIALECT, + schema_definition=schema_str + ) + output.output_schema = schema_obj + + schema_file_path = os.path.join(settings.OUTPUT_DIR, 'schema.json') + with open(schema_file_path, 'w') as sf: + json.dump(schema_obj.model_dump(exclude_none=True), sf, indent=4) + logger.info(f"Schema definition saved to {schema_file_path}") + + if settings.PINATA_API_KEY and settings.PINATA_API_SECRET: + try: + schema_ipfs_hash = 
upload_json_to_ipfs(schema_obj.model_dump(exclude_none=True)) + logger.info(f"Schema uploaded to IPFS with hash: {schema_ipfs_hash}") + except Exception as e: + logger.error(f"Failed to upload schema to IPFS: {e}") + else: + logger.info("Pinata API Key/Secret not set. Skipping IPFS upload for schema.") + else: + logger.warning("Generated schema string is empty. Schema will not be included in output.") + except Exception as e: + # save_models raises on commit failure, so nothing from this file was counted. + logger.error(f"Failed to save models from {input_filename} to database: {e}. Continuing...") + else: + logger.info(f"No models generated from {input_filename}. Nothing to save for this file.") + + # After processing all JSON files + if json_files_found_and_attempted > 0 and total_models_generated_across_all_files == 0: + logger.error("Data refinement process completed, but no valid records were generated and saved from the input file(s).") + raise ValueError("No records refined from input JSON file(s). Halting process.") + elif json_files_found_and_attempted == 0: + # This case should ideally be caught by __main__.py before calling Refiner. + logger.warning("No JSON input files were found in the input directory to process.") + return output # Return empty output; no work was attempted, so no error is raised here + + # Proceed with encryption and IPFS upload only if models were generated and saved + if total_models_generated_across_all_files > 0: + if not os.path.exists(self.db_path): + logger.error(f"Database file {self.db_path} not found after processing, but models were expected. Cannot encrypt or upload.") + # This indicates a potential issue, perhaps DB initialization failed silently or was deleted. 
+ # Output will have no refinement_url. + else: + try: + encrypted_path = encrypt_file(settings.REFINEMENT_ENCRYPTION_KEY, self.db_path) + logger.info(f"Database encrypted to: {encrypted_path}") + + if settings.PINATA_API_KEY and settings.PINATA_API_SECRET: + try: + ipfs_hash = upload_file_to_ipfs(encrypted_path) + # Use the configured Pinata gateway for the URL + gateway_prefix = settings.PINATA_API_GATEWAY.rstrip('/') + output.refinement_url = f"{gateway_prefix}/{ipfs_hash}" + logger.info(f"Encrypted database uploaded to IPFS: {output.refinement_url}") + except Exception as e: + logger.error(f"Failed to upload refined database to IPFS: {e}") + output.refinement_url = f"file://{encrypted_path}" + else: + logger.info("Pinata API Key/Secret not set. Skipping IPFS upload for refined database.") + output.refinement_url = f"file://{encrypted_path}" + except Exception as e: + logger.error(f"Error during database encryption or IPFS upload preparation: {e}") + # output.refinement_url will remain None or be a local file path if encryption succeeded but upload failed. + else: + # This case (total_models_generated_across_all_files == 0) is handled by the ValueError above if JSONs were attempted. + # If no JSONs were attempted, this path is fine (empty output returned). + logger.info("No models were generated or saved, so no database to encrypt or upload.") + + + logger.info(f"Data transformation processing finished. 
Final output: {output.model_dump_json(indent=2, exclude_none=True)}") return output \ No newline at end of file diff --git a/refiner/transformer/base_transformer.py b/refiner/transformer/base_transformer.py index d55aad4..7b708b8 100644 --- a/refiner/transformer/base_transformer.py +++ b/refiner/transformer/base_transformer.py @@ -1,76 +1,123 @@ from typing import Dict, Any, List from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from refiner.models.refined import Base +from refiner.models.refined import Base # Assuming Base is correctly defined here import sqlite3 import os import logging +logger = logging.getLogger(__name__) + class DataTransformer: """ Base class for transforming JSON data into SQLAlchemy models. - Users should extend this class and override the transform method - to customize the transformation process for their specific data. + It initializes the database and provides methods for schema retrieval and saving models. """ - + def __init__(self, db_path: str): """Initialize the transformer with a database path.""" self.db_path = db_path self._initialize_database() - + def _initialize_database(self) -> None: """ Initialize or recreate the database and its tables. """ if os.path.exists(self.db_path): - os.remove(self.db_path) - logging.info(f"Deleted existing database at {self.db_path}") - - self.engine = create_engine(f'sqlite:///{self.db_path}') - Base.metadata.create_all(self.engine) - self.Session = sessionmaker(bind=self.engine) - + try: + os.remove(self.db_path) + logger.info(f"Deleted existing database at {self.db_path}") + except OSError as e: + logger.error(f"Error deleting existing database {self.db_path}: {e}") + # Depending on severity, you might want to raise this error + # or attempt to continue if the DB connection can still be made. + # For now, we'll proceed, and create_engine will handle it. 
+ + try: + self.engine = create_engine(f'sqlite:///{self.db_path}') + Base.metadata.create_all(self.engine) + self.Session = sessionmaker(bind=self.engine) + logger.info(f"Database initialized and tables created at {self.db_path}") + except Exception as e: + logger.error(f"Failed to initialize database or create tables at {self.db_path}: {e}") + raise # Re-raise to halt if DB cannot be set up + def transform(self, data: Dict[str, Any]) -> List[Base]: """ Transform JSON data into SQLAlchemy model instances. - + Subclasses must implement this method. + Args: data: Dictionary containing the JSON data - + Returns: List of SQLAlchemy model instances to be saved to the database """ raise NotImplementedError("Subclasses must implement transform method") - - def get_schema(self): - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - # Get all table definitions in order - schema = [] - for table in cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' ORDER BY name"): - schema.append(table[0] + ";") - - conn.close() - return "\n\n".join(schema) - def process(self, data: Dict[str, Any]) -> None: + def get_schema(self) -> str: """ - Process the data transformation and save to database. - If the database already exists, it will be deleted and recreated. - - Args: - data: Dictionary containing the JSON data + Retrieves the DDL schema for all tables and indexes in the SQLite database. """ + if not os.path.exists(self.db_path): + logger.warning(f"Database file {self.db_path} does not exist. 
Cannot retrieve schema.") + return "" + + conn = None + try: + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + schema_parts = [] + # Get table definitions + cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name") + for table_row in cursor.fetchall(): + if table_row[0]: # Check if sql is not None + schema_parts.append(table_row[0] + ";") + + # Get index definitions + cursor.execute("SELECT sql FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' ORDER BY tbl_name, name") # Order by table then index name + for index_row in cursor.fetchall(): + if index_row[0]: # Check if sql is not None + schema_parts.append(index_row[0] + ";") + + return "\n\n".join(schema_parts) + except sqlite3.Error as e: + logger.error(f"SQLite error while getting schema from {self.db_path}: {e}") + return "" # Return empty string on error + finally: + if conn: + conn.close() + + def save_models(self, models: List[Base]) -> int: + """ + Saves a list of SQLAlchemy model instances to the database. + Returns the number of models intended for commit. + Raises an exception if the commit fails. + """ + if not models: + return 0 + session = self.Session() try: - # Transform data into model instances - models = self.transform(data) - for model in models: - session.add(model) + # Add all models to the session. + # For SQLAlchemy 1.x, session.add_all(models) is okay. + # For SQLAlchemy 2.x, it's still fine. + # If there are complex relationships and specific order of operations is needed, + # individual session.add() calls might be required, but usually add_all handles it. 
+ session.add_all(models) session.commit() + logger.debug(f"Successfully committed {len(models)} models to the database.") + return len(models) except Exception as e: session.rollback() - raise e + logger.error(f"Error saving models to database: {e}") + # To aid debugging, you could log info about the models that failed + # for i, m_obj in enumerate(models): + # try: + # logger.debug(f"Model {i} type {type(m_obj)} content: {m_obj.__dict__}") + # except Exception as ie: + # logger.debug(f"Could not inspect model {i} type {type(m_obj)}: {ie}") + raise # Re-raise the exception to be handled by the caller finally: session.close() \ No newline at end of file diff --git a/refiner/transformer/unwrapped_spotify_transformer.py b/refiner/transformer/unwrapped_spotify_transformer.py new file mode 100644 index 0000000..f948f18 --- /dev/null +++ b/refiner/transformer/unwrapped_spotify_transformer.py @@ -0,0 +1,170 @@ +import logging +from typing import Dict, Any, List, Optional +from collections import defaultdict +from datetime import datetime + +from refiner.models.refined import Base, User, UserListeningStats, Artist, PlayedTrack, UserTopArtistAssoc +from refiner.transformer.base_transformer import DataTransformer +from refiner.models.unrefined import UnwrappedData +from refiner.utils.date import parse_timestamp +from refiner.config import settings +from refiner.utils.spotify_client import SpotifyAPIClient + +# Configure logging for this module +logger = logging.getLogger(__name__) + +class UnwrappedSpotifyTransformer(DataTransformer): + """ + Transformer for Unwrapped Spotify data, enriching artist info via Spotify API + and deriving top artists from play history. Skips artists/tracks not found via API. 
+ """ + + def __init__(self, db_path: str): + super().__init__(db_path) + self.spotify_client = SpotifyAPIClient( + client_id=settings.SPOTIFY_CLIENT_ID, + client_secret=settings.SPOTIFY_CLIENT_SECRET + ) + if not settings.SPOTIFY_CLIENT_ID or not settings.SPOTIFY_CLIENT_SECRET: + logger.warning("Spotify Client ID or Secret is not configured. Artist/Track enrichment will likely fail or be incomplete.") + + + def transform(self, data: Dict[str, Any]) -> List[Base]: + # Note: self.spotify_client.api_call_count accumulates across transform() calls; + # reset it here if this transformer instance is ever reused for multiple independent inputs. + + try: + unrefined = UnwrappedData.model_validate(data) + except Exception as e: + logger.error(f"Failed to validate input data with UnwrappedData model: {e}") + logger.debug(f"Problematic data snippet: {str(data)[:500]}") + return [] + + models_to_save: List[Base] = [] + artists_in_db_cache: Dict[str, Artist] = {} + map_input_artist_id_to_db_id: Dict[str, str] = {} + + # 1. Create User + refined_user = User( + id_hash=unrefined.user.id_hash, + country=unrefined.user.country, + product=unrefined.user.product + ) + models_to_save.append(refined_user) + + # 2. 
Create UserListeningStats + stats_data = unrefined.stats + first_listen_dt = parse_timestamp(stats_data.first_listen) if stats_data.first_listen else None + last_listen_dt = parse_timestamp(stats_data.last_listen) if stats_data.last_listen else None + + listening_stats = UserListeningStats( + user_id_hash=unrefined.user.id_hash, + total_minutes=stats_data.total_minutes, + track_count=stats_data.track_count, + unique_artists_count=stats_data.unique_artists_count, + activity_period_days=stats_data.activity_period_days, + first_listen_at=first_listen_dt, + last_listen_at=last_listen_dt + ) + models_to_save.append(listening_stats) + + # 3. Artist Processing + all_artist_ids_from_input_tracks = list(set( + t.artist_id for t in unrefined.tracks if t.artist_id and t.track_id != t.artist_id + )) + + if all_artist_ids_from_input_tracks: + logger.info(f"Found {len(all_artist_ids_from_input_tracks)} unique artist IDs in input tracks. Fetching from Spotify API...") + spotify_artists_api_responses = self.spotify_client.get_artists(all_artist_ids_from_input_tracks) + + for i, input_id_sent_to_api in enumerate(all_artist_ids_from_input_tracks): + api_data = spotify_artists_api_responses[i] + + if api_data and api_data.get('id') and api_data.get('name'): + id_from_api_response = api_data['id'] + artist_name = api_data['name'] + + map_input_artist_id_to_db_id[input_id_sent_to_api] = id_from_api_response + + if id_from_api_response not in artists_in_db_cache: + new_artist = Artist( + id=id_from_api_response, + name=artist_name, + popularity=api_data.get('popularity'), + genres=api_data.get('genres', []), + followers_total=api_data.get('followers', {}).get('total'), + primary_image_url=(api_data.get('images', [{}])[0].get('url') if api_data.get('images') else None) + ) + artists_in_db_cache[id_from_api_response] = new_artist + models_to_save.append(new_artist) + if input_id_sent_to_api != id_from_api_response: + logger.info(f"Spotify API mapped input artist ID {input_id_sent_to_api} 
to {id_from_api_response} ({artist_name}).") + else: + logger.warning(f"Artist ID {input_id_sent_to_api} from input data not found, failed to fetch, or lacked essential fields (ID, name) from Spotify API. This artist and associated tracks will be skipped.") + + # 4. Process Played Tracks & Derive Top Artists data + artist_play_stats = defaultdict(lambda: {"play_count": 0, "last_played_at": None}) + actual_played_tracks_added = 0 + + for track_entry in unrefined.tracks: + if track_entry.track_id == track_entry.artist_id: + logger.debug(f"Skipping track {track_entry.track_id} as its ID matches artist_id.") + continue + + if not track_entry.artist_id: + logger.warning(f"Track {track_entry.track_id} is missing artist_id in input. Skipping.") + continue + + input_artist_id_for_track = track_entry.artist_id + db_artist_id_for_fk = map_input_artist_id_to_db_id.get(input_artist_id_for_track) + + if not db_artist_id_for_fk: + logger.warning(f"Skipping track {track_entry.track_id} (input artist: {input_artist_id_for_track}) because its artist was not resolved or mapped to a DB artist ID.") + continue + + listened_at_dt = parse_timestamp(track_entry.listened_at) + played_track = PlayedTrack( + user_id_hash=refined_user.id_hash, + track_id=track_entry.track_id, + artist_id=db_artist_id_for_fk, + duration_ms=track_entry.duration_ms, + listened_at=listened_at_dt + ) + models_to_save.append(played_track) + actual_played_tracks_added += 1 + + artist_play_stats[db_artist_id_for_fk]["play_count"] += 1 + current_last_played = artist_play_stats[db_artist_id_for_fk]["last_played_at"] + if current_last_played is None or listened_at_dt > current_last_played: + artist_play_stats[db_artist_id_for_fk]["last_played_at"] = listened_at_dt + + # 5. 
Create UserTopArtistAssoc from derived data + actual_top_artists_added = 0 + if actual_played_tracks_added > 0: + logger.info(f"Deriving top artists from {len(artist_play_stats)} unique played artists.") + for art_id_in_db, stats in artist_play_stats.items(): + if art_id_in_db not in artists_in_db_cache: + logger.error(f"CRITICAL LOGIC ERROR: Artist ID {art_id_in_db} in play_stats but not in artists_in_db_cache. Skipping UserTopArtistAssoc.") + continue + + top_artist_assoc = UserTopArtistAssoc( + user_id_hash=refined_user.id_hash, + artist_id=art_id_in_db, + play_count=stats["play_count"], + last_played_at=stats["last_played_at"] + ) + models_to_save.append(top_artist_assoc) + actual_top_artists_added += 1 + + logger.info(f"Data transformation for this file yielded {len(models_to_save)} model instances. " + f"Artists in DB: {len(artists_in_db_cache)}. " + f"Played tracks added: {actual_played_tracks_added}. " + f"Top artist associations added: {actual_top_artists_added}.") + + # Log the total API calls made by the Spotify client for processing this input file + logger.info(f"Spotify API client made approximately {self.spotify_client.api_call_count} calls for this transformation.") + + return models_to_save \ No newline at end of file diff --git a/refiner/transformer/user_transformer.py b/refiner/transformer/user_transformer.py deleted file mode 100644 index 693efbe..0000000 --- a/refiner/transformer/user_transformer.py +++ /dev/null @@ -1,57 +0,0 @@ -from typing import Dict, Any, List -from refiner.models.refined import Base -from refiner.transformer.base_transformer import DataTransformer -from refiner.models.refined import UserRefined, StorageMetric, AuthSource -from refiner.models.unrefined import User -from refiner.utils.date import parse_timestamp -from refiner.utils.pii import mask_email - -class UserTransformer(DataTransformer): - """ - Transformer for user data as defined in the example. 
- """ - - def transform(self, data: Dict[str, Any]) -> List[Base]: - """ - Transform raw user data into SQLAlchemy model instances. - - Args: - data: Dictionary containing user data - - Returns: - List of SQLAlchemy model instances - """ - # Validate data with Pydantic - unrefined_user = User.model_validate(data) - created_at = parse_timestamp(unrefined_user.timestamp) - - # Create user instance - user = UserRefined( - user_id=unrefined_user.userId, - email=mask_email(unrefined_user.email), # Apply any PII masking (optional) - name=unrefined_user.profile.name, - locale=unrefined_user.profile.locale, - created_at=created_at - ) - - models = [user] - - if unrefined_user.storage: - storage_metric = StorageMetric( - user_id=unrefined_user.userId, - percent_used=unrefined_user.storage.percentUsed, - recorded_at=created_at - ) - models.append(storage_metric) - - if unrefined_user.metadata: - collection_date = parse_timestamp(unrefined_user.metadata.collectionDate) - auth_source = AuthSource( - user_id=unrefined_user.userId, - source=unrefined_user.metadata.source, - collection_date=collection_date, - data_type=unrefined_user.metadata.dataType - ) - models.append(auth_source) - - return models \ No newline at end of file diff --git a/refiner/utils/ipfs.py b/refiner/utils/ipfs.py index 79374c8..40918a1 100644 --- a/refiner/utils/ipfs.py +++ b/refiner/utils/ipfs.py @@ -32,6 +32,7 @@ def upload_json_to_ipfs(data): result = response.json() logging.info(f"Successfully uploaded JSON to IPFS with hash: {result['IpfsHash']}") + logging.info(f"Access at: {settings.PINATA_API_GATEWAY}/{result['IpfsHash']}") return result['IpfsHash'] except requests.exceptions.RequestException as e: @@ -73,6 +74,7 @@ def upload_file_to_ipfs(file_path=None): response.raise_for_status() result = response.json() logging.info(f"Successfully uploaded file to IPFS with hash: {result['IpfsHash']}") + logging.info(f"Access at: {settings.PINATA_API_GATEWAY}/{result['IpfsHash']}") return result['IpfsHash'] 
except requests.exceptions.RequestException as e: @@ -83,8 +85,7 @@ def upload_file_to_ipfs(file_path=None): if __name__ == "__main__": ipfs_hash = upload_file_to_ipfs() print(f"File uploaded to IPFS with hash: {ipfs_hash}") - print(f"Access at: https://ipfs.vana.org/ipfs/{ipfs_hash}") - + ipfs_hash = upload_json_to_ipfs() print(f"JSON uploaded to IPFS with hash: {ipfs_hash}") - print(f"Access at: https://ipfs.vana.org/ipfs/{ipfs_hash}") \ No newline at end of file + print(f"Access at: {settings.PINATA_API_GATEWAY}/{ipfs_hash}") \ No newline at end of file diff --git a/refiner/utils/spotify_client.py b/refiner/utils/spotify_client.py new file mode 100644 index 0000000..73b889a --- /dev/null +++ b/refiner/utils/spotify_client.py @@ -0,0 +1,152 @@ +import logging +import time +import requests +from typing import Dict, Any, List, Optional + +from refiner.config import settings + +logger = logging.getLogger(__name__) + +class SpotifyAPIClient: + BASE_URL = settings.SPOTIFY_API_URL + TOKEN_URL = settings.SPOTIFY_TOKEN_URL + + def __init__(self, client_id: str, client_secret: str): + if not client_id or not client_secret: + logger.error("Spotify Client ID and Secret must be provided in environment variables (SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET). 
API calls will fail.") + self.client_id = client_id + self.client_secret = client_secret + self.access_token: Optional[str] = None + self.token_expires_at: Optional[float] = None + self.session = requests.Session() + self.api_call_delay = getattr(settings, 'SPOTIFY_API_CALL_DELAY_SECONDS', + getattr(settings, 'API_CALL_DELAY_SECONDS', 0.05)) + self.api_call_count: int = 0 # Initialize API call counter + + def _get_access_token(self) -> bool: + if not self.client_id or not self.client_secret: + logger.error("Cannot get Spotify token: Client ID or Secret not configured.") + return False + + if self.access_token and self.token_expires_at and time.time() < self.token_expires_at: + return True + + try: + self.api_call_count += 1 # Count token request + logger.debug(f"Spotify API call #{self.api_call_count} (auth): POST {self.TOKEN_URL}") + auth_response = self.session.post( + self.TOKEN_URL, + auth=(self.client_id, self.client_secret), + data={"grant_type": "client_credentials"}, + timeout=10 + ) + auth_response.raise_for_status() + token_data = auth_response.json() + self.access_token = token_data["access_token"] + self.token_expires_at = time.time() + token_data["expires_in"] - 60 + logger.info("Successfully obtained new Spotify API access token.") + return True + except requests.RequestException as e: + logger.error(f"Error obtaining Spotify access token: {e}") + self.access_token = None + self.token_expires_at = None + return False + + def _make_request(self, method: str, endpoint: str, params: Optional[Dict] = None, json_data: Optional[Dict] = None, retries: int = 3) -> Optional[Any]: + if not self._get_access_token(): + return None + + headers = {"Authorization": f"Bearer {self.access_token}"} + url = f"{self.BASE_URL}/{endpoint.lstrip('/')}" + + for attempt in range(retries): + try: + time.sleep(self.api_call_delay) + + self.api_call_count += 1 # Count data request + logger.debug(f"Spotify API call #{self.api_call_count}: {method} {url} | Params: {params} | JSON: 
{json_data is not None}") + + response = self.session.request(method, url, headers=headers, params=params, json=json_data, timeout=15) + + if response.status_code == 429: + retry_after = int(response.headers.get("Retry-After", "1")) + logger.warning(f"Rate limit hit for {url}. Retrying after {retry_after} seconds. Attempt {attempt + 1}/{retries}") + if attempt + 1 >= retries: + logger.error(f"Max retries reached for rate limit on {url}.") + response.raise_for_status() + time.sleep(retry_after) + continue + + response.raise_for_status() + + if response.status_code == 204 or not response.content: + return None + return response.json() + + except requests.RequestException as e: + logger.error(f"RequestException on {method} {url} (attempt {attempt + 1}/{retries}): {e}") + if attempt + 1 >= retries: + logger.error(f"Failed request to {url} after {retries} attempts.") + return None + time.sleep(min(30, (2 ** attempt))) + return None + 
+ def get_artists(self, artist_ids: List[str]) -> List[Optional[Dict[str, Any]]]: + if not artist_ids: + return [] + + unique_artist_ids = list(set(artist_id for artist_id in artist_ids if artist_id)) + if not unique_artist_ids: + return [] + + all_artists_data_map = {} + + for i in range(0, len(unique_artist_ids), settings.SPOTIFY_MAX_IDS_PER_BATCH): + batch_ids = unique_artist_ids[i:i + settings.SPOTIFY_MAX_IDS_PER_BATCH] + # logger.debug(f"Fetching artist batch: {batch_ids}") # Covered by _make_request debug log + params = {"ids": ",".join(batch_ids)} + response_data = self._make_request("GET", "artists", params=params) + + if response_data and "artists" in response_data: + for artist_data in response_data["artists"]: + if artist_data and 'id' in artist_data: + all_artists_data_map[artist_data['id']] = artist_data + else: + logger.warning(f"Failed to fetch or parse artist data for batch starting with: {batch_ids[0] if batch_ids else 'N/A'}") + + ordered_results = [all_artists_data_map.get(aid) for aid in unique_artist_ids] + return ordered_results + + def get_artist(self, artist_id: str) -> Optional[Dict[str, Any]]: + if not artist_id: return None + return self._make_request("GET", f"artists/{artist_id}") + + def get_tracks(self, track_ids: List[str]) -> List[Optional[Dict[str, Any]]]: + if not track_ids: + return [] + unique_track_ids = list(set(track_id for track_id in track_ids if track_id)) + if not unique_track_ids: + return [] + + all_tracks_data_map = {} + + for i in range(0, len(unique_track_ids), settings.SPOTIFY_MAX_IDS_PER_BATCH): + batch_ids = unique_track_ids[i:i + settings.SPOTIFY_MAX_IDS_PER_BATCH] + # logger.debug(f"Fetching track batch: {batch_ids}") # Covered by _make_request debug log + params = {"ids": ",".join(batch_ids)} + response_data = self._make_request("GET", "tracks", params=params) + + if response_data and "tracks" in response_data: + for track_data in response_data["tracks"]: + if track_data and 'id' in track_data: + 
all_tracks_data_map[track_data['id']] = track_data + else: + logger.warning(f"Failed to fetch or parse track data for batch starting with: {batch_ids[0] if batch_ids else 'N/A'}") + + ordered_results = [all_tracks_data_map.get(tid) for tid in unique_track_ids] + return ordered_results + + def get_track(self, track_id: str) -> Optional[Dict[str, Any]]: + if not track_id: return None + return self._make_request("GET", f"tracks/{track_id}") \ No newline at end of file
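
The ID-batching pattern used by `get_artists` and `get_tracks` above — dedupe the incoming IDs, chunk them to the API's per-request limit, then map the responses back so the caller gets one result slot per unique ID (with `None` for anything unresolved) — can be sketched independently of the Spotify client. The sketch below is illustrative, not part of this patch: `fetch_batch` is a hypothetical stand-in for `_make_request`, and `max_per_batch` stands in for `settings.SPOTIFY_MAX_IDS_PER_BATCH`.

```python
from typing import Any, Callable, Dict, List, Optional

def batched_lookup(
    ids: List[str],
    fetch_batch: Callable[[List[str]], List[Dict[str, Any]]],
    max_per_batch: int = 50,
) -> List[Optional[Dict[str, Any]]]:
    """Fetch objects for `ids` in batches, one result slot per unique ID.

    Unresolvable IDs yield None, mirroring the get_artists()/get_tracks()
    contract in the client above.
    """
    # Dedupe while preserving first-seen order (the client uses set(), which
    # is unordered; deterministic order makes this sketch easier to test).
    unique_ids = list(dict.fromkeys(i for i in ids if i))
    found: Dict[str, Dict[str, Any]] = {}
    for start in range(0, len(unique_ids), max_per_batch):
        batch = unique_ids[start:start + max_per_batch]
        for obj in fetch_batch(batch):
            if obj and "id" in obj:
                found[obj["id"]] = obj
    # Re-align results to the deduped input order; missing IDs become None.
    return [found.get(i) for i in unique_ids]

# Example with a stubbed fetcher (no network): "missing" is never returned.
stub = lambda batch: [{"id": i, "name": i.upper()} for i in batch if i != "missing"]
print(batched_lookup(["a", "b", "a", "missing"], stub, max_per_batch=2))
# → [{'id': 'a', 'name': 'A'}, {'id': 'b', 'name': 'B'}, None]
```

The same shape works for any bulk-lookup endpoint; only `fetch_batch` and the batch-size limit change.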