1 change: 1 addition & 0 deletions changelog/0.4.9.md
@@ -3,3 +3,4 @@
## Added

- `get_job` tool now supports fetching job details by job ID.
- `run_sql` tool now supports optional username and password parameters for database authentication.
4 changes: 2 additions & 2 deletions src/api/tools/database/__init__.py
@@ -1,5 +1,5 @@
"""Database tools for SingleStore MCP server."""

-from .database import run_sql
+from .database import run_sql, create_pipeline

__all__ = ["run_sql"]
__all__ = ["run_sql", "create_pipeline"]
239 changes: 231 additions & 8 deletions src/api/tools/database/database.py
@@ -31,7 +31,11 @@ class DatabaseCredentials(BaseModel):


async def _get_database_credentials(
-    ctx: Context, target: WorkspaceTarget, database_name: str | None = None
+    ctx: Context,
+    target: WorkspaceTarget,
+    database_name: str | None = None,
+    provided_username: str | None = None,
+    provided_password: str | None = None,
) -> tuple[str, str]:
"""
Get database credentials based on the authentication method.
@@ -40,6 +44,8 @@ async def _get_database_credentials(
ctx: The MCP context
target: The workspace target
database_name: The database name to use for key generation
provided_username: Optional username provided by the caller
provided_password: Optional password provided by the caller

Returns:
Tuple of (username, password)
@@ -57,6 +63,11 @@
)

if is_using_api_key:
# If credentials are provided directly, use them
if provided_username and provided_password:
logger.debug(f"Using provided credentials for workspace: {target.name}")
return (provided_username, provided_password)

# For API key authentication, we need database credentials
# Generate database key using credentials manager
credentials_manager = get_session_credentials_manager()
@@ -212,7 +223,7 @@ async def __execute_sql_unified(
raise


-def __get_workspace_by_id(workspace_id: str) -> WorkspaceTarget:
+def get_workspace_by_id(workspace_id: str) -> WorkspaceTarget:
"""
Get a workspace or starter workspace by ID.

@@ -272,8 +283,216 @@ def __init__(self, data):
return WorkspaceTarget(target, is_shared)


async def create_pipeline(
ctx: Context,
pipeline_name: str,
data_source: str,
target_table_or_procedure: str,
workspace_id: str,
database: Optional[str] = None,
credentials: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a SQL pipeline for streaming CSV data from an S3 data source to a database table or stored procedure.

This tool is restricted to S3 data sources only and automatically configures the pipeline with:
- CSV format only
- Comma delimiter (,)
- MAX_PARTITIONS_PER_BATCH = 2
- Auto-start enabled
- No config options

Args:
pipeline_name: Name for the pipeline (will be used in CREATE PIPELINE statement)
data_source: S3 URL (must start with 's3://')
target_table_or_procedure: Target table name or procedure name (use "PROCEDURE procedure_name" for procedures)
workspace_id: Workspace ID where the pipeline should be created
database: Optional database name to use
credentials: Optional AWS credentials in JSON format for private S3 buckets:
'{"aws_access_key_id": "key", "aws_secret_access_key": "secret", "aws_session_token": "token"}'
Use '{}' for public S3 buckets
username: Optional database username for API key authentication
password: Optional database password for API key authentication

Returns:
Dictionary with pipeline creation status and details

Example:
# Create pipeline from public S3 bucket
pipeline_name = "uk_price_paid"
data_source = "s3://singlestore-docs-example-datasets/pp-monthly/pp-monthly-update-new-version.csv"
target_table_or_procedure = "process_uk_price_paid"
credentials = "{}"

# Create pipeline from private S3 bucket
credentials = '{"aws_access_key_id": "AKIAIOSFODNN7EXAMPLE", "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"}'
"""
# Validate workspace ID format
validated_id = validate_workspace_id(workspace_id)

# Validate that data source is S3
if not data_source.lower().startswith("s3://"):
return {
"status": "error",
"message": "Only S3 data sources are supported. Data source must start with 's3://'",
"errorCode": "INVALID_DATA_SOURCE",
}

await ctx.info(
f"Creating S3 pipeline '{pipeline_name}' from '{data_source}' to '{target_table_or_procedure}' in workspace '{validated_id}'"
)

start_time = time.time()

# Fixed parameters for restrictions
file_format = "CSV"
max_partitions_per_batch = 2
field_terminator = ","
field_enclosure = '"'
field_escape = "\\\\"
line_terminator = "\\n"
line_starting = ""

# Build the CREATE PIPELINE SQL statement
sql_parts = [
f"CREATE OR REPLACE PIPELINE {pipeline_name} AS",
f" LOAD DATA S3 '{data_source}'",
]

# Add credentials if provided
if credentials:
sql_parts.append(f" CREDENTIALS '{credentials}'")

# Add fixed max partitions per batch
sql_parts.append(f" MAX_PARTITIONS_PER_BATCH {max_partitions_per_batch}")

    # Determine if target is a table or procedure and format accordingly.
    # By convention, names prefixed with "PROCEDURE " or containing "process_" are treated as procedures.
is_procedure = (
target_table_or_procedure.upper().startswith("PROCEDURE ")
or "process_" in target_table_or_procedure.lower()
)

if is_procedure:
# Remove "PROCEDURE " prefix if present
procedure_name = target_table_or_procedure
if procedure_name.upper().startswith("PROCEDURE "):
procedure_name = procedure_name[10:] # Remove "PROCEDURE " prefix
sql_parts.append(f" INTO PROCEDURE {procedure_name}")
else:
sql_parts.append(f" INTO TABLE {target_table_or_procedure}")

# Add fixed format and CSV-specific options
sql_parts.append(f" FORMAT {file_format.upper()}")
sql_parts.append(
f" FIELDS TERMINATED BY '{field_terminator}' ENCLOSED BY '{field_enclosure}' ESCAPED BY '{field_escape}'"
)
sql_parts.append(
f" LINES TERMINATED BY '{line_terminator}' STARTING BY '{line_starting}'"
)

# Combine all parts into final SQL
create_pipeline_sql = "\n".join(sql_parts) + ";"
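    # Illustrative only: with the docstring's public-bucket example arguments
    # (pipeline_name="uk_price_paid", credentials="{}"), the assembled statement
    # comes out roughly as:
    #
    #   CREATE OR REPLACE PIPELINE uk_price_paid AS
    #     LOAD DATA S3 's3://singlestore-docs-example-datasets/pp-monthly/pp-monthly-update-new-version.csv'
    #     CREDENTIALS '{}'
    #     MAX_PARTITIONS_PER_BATCH 2
    #     INTO PROCEDURE process_uk_price_paid
    #     FORMAT CSV
    #     FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\'
    #     LINES TERMINATED BY '\n' STARTING BY '';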

logger.info(create_pipeline_sql)

try:
# Execute the CREATE PIPELINE statement
create_result = await run_sql(
ctx=ctx,
sql_query=create_pipeline_sql,
id=validated_id,
database=database,
username=username,
password=password,
)

if create_result.get("status") != "success":
return {
"status": "error",
"message": f"Failed to create pipeline: {create_result.get('message', 'Unknown error')}",
"errorCode": "PIPELINE_CREATION_FAILED",
"errorDetails": create_result,
}

        # Auto-start is always enabled in restricted mode, so start the pipeline immediately
        start_pipeline_sql = f"START PIPELINE IF NOT RUNNING {pipeline_name};"
start_result = await run_sql(
ctx=ctx,
sql_query=start_pipeline_sql,
id=validated_id,
database=database,
username=username,
password=password,
)

execution_time = (time.time() - start_time) * 1000

# Track analytics
settings = config.get_settings()
user_id = config.get_user_id()
settings.analytics_manager.track_event(
user_id,
"tool_calling",
{
"name": "create_pipeline",
"pipeline_name": pipeline_name,
"workspace_id": validated_id,
"auto_start": True,
},
)

# Build success message
success_message = f"Pipeline '{pipeline_name}' created successfully"
if start_result and start_result.get("status") == "success":
success_message += " and started"

return {
"status": "success",
"message": success_message,
"data": {
"pipelineName": pipeline_name,
"dataSource": data_source,
"targetTableOrProcedure": target_table_or_procedure,
"workspaceId": validated_id,
"database": database,
"autoStarted": start_result and start_result.get("status") == "success",
"createSql": create_pipeline_sql,
"startSql": f"START PIPELINE IF NOT RUNNING {pipeline_name};",
},
"metadata": {
"executionTimeMs": round(execution_time, 2),
"timestamp": datetime.now().isoformat(),
"fileFormat": "CSV",
"maxPartitionsPerBatch": 2,
"creationResult": create_result,
"startResult": start_result,
},
}

except Exception as e:
logger.error(f"Error creating pipeline: {str(e)}")
return {
"status": "error",
"message": f"Failed to create pipeline: {str(e)}",
"errorCode": "PIPELINE_CREATION_ERROR",
"errorDetails": {
"exception_type": type(e).__name__,
"pipelineName": pipeline_name,
"workspaceId": validated_id,
},
}


async def run_sql(
-    ctx: Context, sql_query: str, id: str, database: Optional[str] = None
+    ctx: Context,
+    sql_query: str,
+    id: str,
+    database: Optional[str] = None,
+    username: Optional[str] = None,
+    password: Optional[str] = None,
) -> Dict[str, Any]:
"""
Use this tool to execute a single SQL statement against a SingleStore database.
@@ -289,6 +508,8 @@ async def run_sql(
id: Workspace or starter workspace ID
sql_query: The SQL query to execute
database: (optional) Database name to use
        username: (optional) Database username for authentication. If not provided, it is requested via elicitation when using API key auth
        password: (optional) Database password for authentication. If not provided, it is requested via elicitation when using API key auth

Returns:
Standardized response with query results and metadata
@@ -303,7 +524,7 @@
settings = config.get_settings()

# Target can either be a workspace or a starter workspace
-    target = __get_workspace_by_id(validated_id)
+    target = get_workspace_by_id(validated_id)
database_name = database

# For starter workspaces, use their database name if not specified
@@ -312,7 +533,9 @@

# Get database credentials based on authentication method
try:
-        username, password = await _get_database_credentials(ctx, target, database_name)
+        db_username, db_password = await _get_database_credentials(
+            ctx, target, database_name, username, password
+        )
except Exception as e:
if "Database credentials required" in str(e):
# Handle the specific case where elicitation is not supported
@@ -343,8 +566,8 @@
ctx=ctx,
target=target,
sql_query=sql_query,
-            username=username,
-            password=password,
+            username=db_username,
+            password=db_password,
database=database_name,
)
except Exception as e:
@@ -377,7 +600,7 @@

# Track analytics
settings.analytics_manager.track_event(
-        username,
+        db_username,
"tool_calling",
{
"name": "run_sql",
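A sketch of how a client might exercise the new parameters, assuming an MCP client session that exposes these tools. The tool names and argument keys mirror this diff; the `session.call_tool` shape below is illustrative and not part of this PR:

```python
# Hypothetical client-side usage; only the tool names and argument keys
# come from this diff -- the session API itself is an assumption.
result = await session.call_tool(
    "run_sql",
    {
        "sql_query": "SELECT 1",
        "id": "<workspace-id>",
        "database": "my_db",
        "username": "db_user",      # optional; skips credential elicitation
        "password": "db_password",  # optional; skips credential elicitation
    },
)

pipeline = await session.call_tool(
    "create_pipeline",
    {
        "pipeline_name": "uk_price_paid",
        "data_source": "s3://singlestore-docs-example-datasets/pp-monthly/pp-monthly-update-new-version.csv",
        "target_table_or_procedure": "process_uk_price_paid",
        "workspace_id": "<workspace-id>",
        "credentials": "{}",  # "{}" works for public buckets
    },
)
```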
5 changes: 5 additions & 0 deletions src/api/tools/stage/__init__.py
@@ -0,0 +1,5 @@
"""Stage tools for SingleStore MCP server."""

from .stage import upload_file_to_stage

__all__ = ["upload_file_to_stage"]