Skip to content

feat: Enhance DataFrame query capabilities to support Python expressions #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mcp_tools/dataframe_service/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def input_schema(self) -> Dict[str, Any]:
# For query operation
"expr": {
"type": "string",
"description": "Query expression using pandas query syntax (e.g., 'age > 30 and status == \"active\"')"
"description": "Query expression using pandas query syntax (e.g., 'age > 30 and status == \"active\"') or Python expressions (e.g., 'df[df[\"Ex\"].str.len()>0][[\"TimeStamp\",\"Msg\",\"Ex\"]].head(10)')"
},
# For describe operation
"include": {
Expand Down
47 changes: 28 additions & 19 deletions utils/dataframe_manager/query/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,28 +172,37 @@ async def query(
df: pd.DataFrame,
expr: str,
) -> DataFrameQueryResult:
"""Query DataFrame using pandas query syntax."""
"""Query DataFrame using pandas query syntax or Python expressions."""
start_time = time.time()
try:
# Try pandas query syntax first
result_df = df.query(expr)
execution_time = (time.time() - start_time) * 1000

return DataFrameQueryResult(
data=result_df,
operation="query",
parameters={"expr": expr},
metadata={
"original_shape": df.shape,
"result_shape": result_df.shape,
"rows_filtered": len(df) - len(result_df),
"filter_ratio": len(result_df) / len(df) if len(df) > 0 else 0,
"query_expression": expr,
},
execution_time_ms=execution_time,
)
except Exception as e:
self._logger.error(f"Error in query operation: {e}")
raise
query_method = "pandas_query"
except Exception:
# Fall back to Python expression evaluation
try:
result_df = eval(expr)
query_method = "python_eval"
except Exception as e:
self._logger.error(f"Error in query operation: {e}")
raise

execution_time = (time.time() - start_time) * 1000

return DataFrameQueryResult(
data=result_df,
operation="query",
parameters={"expr": expr},
metadata={
"original_shape": df.shape,
"result_shape": result_df.shape,
"rows_filtered": len(df) - len(result_df),
"filter_ratio": len(result_df) / len(df) if len(df) > 0 else 0,
"query_expression": expr,
"query_method": query_method,
},
execution_time_ms=execution_time,
)

async def filter(
self,
Expand Down
144 changes: 144 additions & 0 deletions utils/dataframe_manager/tests/test_query_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,150 @@ async def test_execution_time_measurement(self, processor, sample_dataframe):
assert result.execution_time_ms >= 0
assert isinstance(result.execution_time_ms, float)

@pytest.mark.asyncio
async def test_query_operation_python_expression_basic(self, processor, sample_dataframe):
    """Check that a bracket-indexing expression is served by the eval fallback.

    ``df[...]`` is ordinary Python rather than pandas query syntax, so the
    result is expected to be tagged with the ``python_eval`` query method.
    """
    expr = "df[df['age'] > 30]"
    result = await processor.query(sample_dataframe, expr)

    # Envelope of the result object.
    assert isinstance(result, DataFrameQueryResult)
    assert result.operation == "query"
    assert result.parameters == {"expr": expr}

    # Bookkeeping recorded alongside the data.
    metadata = result.metadata
    assert metadata["query_expression"] == expr
    assert metadata["query_method"] == "python_eval"
    assert metadata["original_shape"] == (100, 6)
    assert metadata["rows_filtered"] > 0
    assert metadata["filter_ratio"] < 1.0

    # Every surviving row must satisfy the filter.
    filtered = result.data
    if len(filtered) > 0:
        assert (filtered["age"] > 30).all()

@pytest.mark.asyncio
async def test_query_operation_python_expression_with_column_selection(self, processor, sample_dataframe):
    """Check a Python expression that filters rows and then projects columns."""
    expr = "df[df['age'] > 30][['name', 'age', 'category']]"
    result = await processor.query(sample_dataframe, expr)

    assert result.parameters == {"expr": expr}
    assert result.metadata["query_method"] == "python_eval"

    # Only the three projected columns may remain, in the requested order.
    projected = result.data
    assert projected.shape[1] == 3
    assert list(projected.columns) == ['name', 'age', 'category']

    # The row filter must still hold after the projection.
    if len(projected) > 0:
        assert (projected["age"] > 30).all()

@pytest.mark.asyncio
async def test_query_operation_python_expression_with_head(self, processor, sample_dataframe):
    """Check a Python expression that truncates the filtered rows via ``head()``."""
    expr = "df[df['age'] > 30].head(5)"
    result = await processor.query(sample_dataframe, expr)

    assert result.parameters == {"expr": expr}
    assert result.metadata["query_method"] == "python_eval"

    # head(5) caps the row count; fewer rows are fine if few rows match.
    row_count = len(result.data)
    assert row_count <= 5

    # Whatever rows were returned must satisfy the age filter.
    if row_count > 0:
        assert (result.data["age"] > 30).all()

@pytest.mark.asyncio
async def test_query_operation_python_expression_string_length(self, processor, sample_dataframe):
    """Check a Python expression that filters on the ``.str.len()`` accessor."""
    expr = "df[df['name'].str.len() > 6]"
    result = await processor.query(sample_dataframe, expr)

    assert result.parameters == {"expr": expr}
    assert result.metadata["query_method"] == "python_eval"

    # Each returned name must be longer than six characters.
    matched = result.data
    if len(matched) > 0:
        name_lengths = matched["name"].str.len()
        assert (name_lengths > 6).all()

@pytest.mark.asyncio
async def test_query_operation_python_expression_complex_chaining(self, processor, sample_dataframe):
    """Check a chained filter -> projection -> sort -> head Python expression."""
    expr = "df[df['category'] == 'A'][['name', 'age']].sort_values('age').head(3)"
    result = await processor.query(sample_dataframe, expr)

    assert result.parameters == {"expr": expr}
    assert result.metadata["query_method"] == "python_eval"

    # The projection keeps exactly two columns and head(3) caps the rows.
    chained = result.data
    assert chained.shape[1] == 2
    assert list(chained.columns) == ['name', 'age']
    assert len(chained) <= 3

    # NOTE: the category == 'A' condition is not re-checked here because the
    # projection drops the 'category' column from the result.

@pytest.mark.asyncio
async def test_query_operation_pandas_query_method_tracking(self, processor, sample_dataframe):
    """Check that a plain pandas query expression is tagged as ``pandas_query``."""
    # A bare column comparison is valid pandas query syntax, so no fallback
    # to Python evaluation should be needed.
    expr = "age > 30"
    result = await processor.query(sample_dataframe, expr)

    assert result.metadata["query_method"] == "pandas_query"
    assert result.parameters == {"expr": expr}

    # Every returned row must satisfy the filter.
    matched = result.data
    if len(matched) > 0:
        assert (matched["age"] > 30).all()

@pytest.mark.asyncio
async def test_query_operation_fallback_behavior(self, processor, sample_dataframe):
    """Check which query method is reported for pandas-style vs. Python-style input."""
    # Each entry pairs the expected ``query_method`` with expressions that
    # should be handled by it. Pandas-query cases run first, then the
    # expressions that are expected to fall back to Python evaluation.
    cases = (
        (
            "pandas_query",
            (
                "age > 30",
                "category == 'A'",
                "name.str.contains('user_1')",
                "age.between(25, 35)",
            ),
        ),
        (
            "python_eval",
            (
                "df[df['age'] > 30]",
                "df.head(10)",
                "df[df['category'] == 'A'][['name', 'age']]",
                "df.sort_values('age').head(5)",
            ),
        ),
    )

    for expected_method, expressions in cases:
        for expr in expressions:
            result = await processor.query(sample_dataframe, expr)
            assert result.metadata["query_method"] == expected_method

@pytest.mark.asyncio
async def test_query_operation_python_expression_error_handling(self, processor, sample_dataframe):
    """Check that expressions rejected by both query paths raise a specific error.

    When the pandas query attempt fails, the processor evaluates the
    expression as Python and re-raises whatever that evaluation raises.
    Asserting the concrete exception type (instead of bare ``Exception``)
    keeps this test from passing on unrelated failures such as a broken
    fixture or an AttributeError inside the processor.
    """
    # Unknown column: indexing the DataFrame with a missing label raises KeyError.
    invalid_expr = "df[df['nonexistent_column'] > 30]"

    with pytest.raises(KeyError):
        await processor.query(sample_dataframe, invalid_expr)

    # Unbalanced bracket: the expression cannot be compiled at all.
    syntax_error_expr = "df[df['age'] > 30"  # Missing closing bracket

    with pytest.raises(SyntaxError):
        await processor.query(sample_dataframe, syntax_error_expr)

@pytest.mark.asyncio
async def test_query_operation_python_expression_empty_result(self, processor, sample_dataframe):
    """Check the metadata reported when a Python expression matches no rows."""
    # No age in the fixture is expected to exceed 1000, so the filter
    # should leave nothing behind.
    expr = "df[df['age'] > 1000]"
    result = await processor.query(sample_dataframe, expr)

    assert result.parameters == {"expr": expr}

    metadata = result.metadata
    assert metadata["query_method"] == "python_eval"
    assert result.data.empty

    # Every input row counts as filtered out, giving a zero keep-ratio.
    assert metadata["rows_filtered"] == len(sample_dataframe)
    assert metadata["filter_ratio"] == 0.0


if __name__ == "__main__":
    # pytest.main() returns the session's exit code. Propagate it so that
    # running this file directly reports test failures to the shell / CI
    # instead of always exiting with status 0.
    raise SystemExit(pytest.main([__file__]))