-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Setup Django Project and setup api app and a testing router #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 39 commits
4753ee4
84f3c35
558e13e
e16f2df
0598536
a5c32d8
3385dee
e8e953e
0e1e982
e430043
b870060
0faee97
b35e17f
4650c18
b01b129
4ab28ec
ecbae17
a8da103
02db6ae
24f4a94
43a6501
da61bb5
8d69776
88bb043
fd1079a
7034411
a31c58e
bb73943
d8803c7
3d3ad64
79f1878
41881ef
4091e1c
c111433
38c2d82
c7c87f9
a2c09e2
0f32fd3
40a8199
1056d1c
f4f2d38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| """ | ||
| Pathway pipeline module for BreathClean. | ||
| """ | ||
| from .pipeline import run_batch_pipeline, run_simple_batch | ||
| from .transformers import compute_route_score, compute_batch_scores | ||
|
|
||
| __all__ = [ | ||
| "run_batch_pipeline", | ||
| "run_simple_batch", | ||
| "compute_route_score", | ||
| "compute_batch_scores" | ||
| ] |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,312 @@ | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Pathway pipeline for BreathClean batch score computation. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| This pipeline can run in two modes: | ||||||||||||||||||||
| 1. Streaming mode: Continuously process incoming data | ||||||||||||||||||||
| 2. Batch mode: Process a batch of routes and return results | ||||||||||||||||||||
|
|
||||||||||||||||||||
| For the hackathon, we use batch mode triggered by HTTP endpoint. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| NOTE: Pathway requires Linux. On Windows, use run_simple_batch() which | ||||||||||||||||||||
| uses direct Python computation. For production, run in Docker/WSL. | ||||||||||||||||||||
|
Comment on lines
+10
to
+11
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring incorrectly limits Pathway to Linux — macOS is also supported. Pathway is available on macOS and Linux; users of other systems should run Pathway on a Virtual Machine. -NOTE: Pathway requires Linux. On Windows, use run_simple_batch() which
-uses direct Python computation. For production, run in Docker/WSL.
+NOTE: Pathway is available on Linux and macOS. On Windows, use run_simple_batch() which
+uses direct Python computation. For production on Windows, run in Docker/WSL.📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
| """ | ||||||||||||||||||||
| from typing import List, Dict, Any, Optional | ||||||||||||||||||||
| import json | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Try to import Pathway - gracefully handle if not available (Windows) | ||||||||||||||||||||
| PATHWAY_AVAILABLE = False | ||||||||||||||||||||
| pw = None | ||||||||||||||||||||
|
|
||||||||||||||||||||
| try: | ||||||||||||||||||||
| import pathway as _pw | ||||||||||||||||||||
| # Verify it's the real Pathway package | ||||||||||||||||||||
| if hasattr(_pw, 'Schema'): | ||||||||||||||||||||
| pw = _pw | ||||||||||||||||||||
| PATHWAY_AVAILABLE = True | ||||||||||||||||||||
| except (ImportError, AttributeError): | ||||||||||||||||||||
| pass | ||||||||||||||||||||
|
|
||||||||||||||||||||
| from .transformers import compute_route_score, compute_batch_scores | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| # Define schema only if Pathway is available | ||||||||||||||||||||
| RouteInputSchema = None | ||||||||||||||||||||
| if PATHWAY_AVAILABLE and pw is not None: | ||||||||||||||||||||
| class RouteInputSchema(pw.Schema): | ||||||||||||||||||||
| """Schema for incoming route data.""" | ||||||||||||||||||||
| route_id: str | ||||||||||||||||||||
| route_index: int | ||||||||||||||||||||
| distance: float | ||||||||||||||||||||
| duration: float | ||||||||||||||||||||
| travel_mode: str | ||||||||||||||||||||
| weather_points: str # JSON string of weather data | ||||||||||||||||||||
| aqi_points: str # JSON string of AQI data | ||||||||||||||||||||
| traffic_value: float | ||||||||||||||||||||
| last_computed_score: Optional[float] | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def process_route_row( | ||||||||||||||||||||
| route_id: str, | ||||||||||||||||||||
| route_index: int, | ||||||||||||||||||||
| distance: float, | ||||||||||||||||||||
| duration: float, | ||||||||||||||||||||
| travel_mode: str, | ||||||||||||||||||||
| weather_points: str, | ||||||||||||||||||||
| aqi_points: str, | ||||||||||||||||||||
| traffic_value: float, | ||||||||||||||||||||
| last_computed_score: Optional[float] | ||||||||||||||||||||
| ) -> str: | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Process a single route row and compute its score. | ||||||||||||||||||||
| Returns JSON string of computed score. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| weather_data = json.loads(weather_points) if weather_points else [] | ||||||||||||||||||||
| aqi_data = json.loads(aqi_points) if aqi_points else [] | ||||||||||||||||||||
| except json.JSONDecodeError: | ||||||||||||||||||||
| weather_data = [] | ||||||||||||||||||||
| aqi_data = [] | ||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| route_data = { | ||||||||||||||||||||
| "routeId": route_id, | ||||||||||||||||||||
| "routeIndex": route_index, | ||||||||||||||||||||
| "distance": distance, | ||||||||||||||||||||
| "duration": duration, | ||||||||||||||||||||
| "travelMode": travel_mode, | ||||||||||||||||||||
| "weatherPoints": weather_data, | ||||||||||||||||||||
| "aqiPoints": aqi_data, | ||||||||||||||||||||
| "trafficValue": traffic_value, | ||||||||||||||||||||
| "lastComputedScore": last_computed_score | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| result = compute_route_score(route_data) | ||||||||||||||||||||
| return json.dumps(result) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def run_batch_pipeline(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]: | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Run Pathway pipeline in batch mode for a list of routes. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| This is the main entry point called by the Django view. | ||||||||||||||||||||
| Uses Pathway's batch processing capabilities for efficient computation. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Falls back to simple batch processing if Pathway is not available (Windows). | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Args: | ||||||||||||||||||||
| routes_data: List of route dictionaries with weather/aqi/traffic data | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Returns: | ||||||||||||||||||||
| Dictionary with computed scores and summary | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| # If Pathway is not available, fall back to simple batch processing | ||||||||||||||||||||
| if not PATHWAY_AVAILABLE or pw is None: | ||||||||||||||||||||
| result = run_simple_batch(routes_data) | ||||||||||||||||||||
| result["engine"] = "python-fallback" | ||||||||||||||||||||
| result["message"] = "Scores computed (Pathway not available, using Python fallback)" | ||||||||||||||||||||
| return result | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # For batch processing, we use Pathway's static mode | ||||||||||||||||||||
| # This processes all data at once and returns results | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if not routes_data: | ||||||||||||||||||||
| return { | ||||||||||||||||||||
| "success": False, | ||||||||||||||||||||
| "message": "No routes provided", | ||||||||||||||||||||
| "routes": [], | ||||||||||||||||||||
| "summary": None | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Convert input to Pathway table format | ||||||||||||||||||||
| table_data = [] | ||||||||||||||||||||
| for route in routes_data: | ||||||||||||||||||||
| table_data.append({ | ||||||||||||||||||||
| "route_id": str(route.get("routeId", "")), | ||||||||||||||||||||
| "route_index": route.get("routeIndex", 0), | ||||||||||||||||||||
| "distance": float(route.get("distance", 0)), | ||||||||||||||||||||
| "duration": float(route.get("duration", 0)), | ||||||||||||||||||||
| "travel_mode": route.get("travelMode", "driving"), | ||||||||||||||||||||
| "weather_points": json.dumps(route.get("weatherPoints", [])), | ||||||||||||||||||||
| "aqi_points": json.dumps(route.get("aqiPoints", [])), | ||||||||||||||||||||
| "traffic_value": float(route.get("trafficValue", 0)), | ||||||||||||||||||||
| "last_computed_score": route.get("lastComputedScore") | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Create Pathway table from data | ||||||||||||||||||||
| input_table = pw.debug.table_from_markdown( | ||||||||||||||||||||
| _markdown_from_data(table_data), | ||||||||||||||||||||
| schema=RouteInputSchema | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
Comment on lines
+143
to
+146
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: find . -name "pipeline.py" -path "*/pathway/*" | head -5Repository: kaihere14/BreathClean Length of output: 127 🏁 Script executed: cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '140,170p'Repository: kaihere14/BreathClean Length of output: 1160 🏁 Script executed: cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '240,270p'Repository: kaihere14/BreathClean Length of output: 1085 🏁 Script executed: grep -n "_markdown_from_data" data-processing/dataProcessingServer/api/pathway/pipeline.pyRepository: kaihere14/BreathClean Length of output: 163 🏁 Script executed: grep -n "RouteInputSchema" data-processing/dataProcessingServer/api/pathway/pipeline.py | head -10Repository: kaihere14/BreathClean Length of output: 206 🏁 Script executed: cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '1,50p'Repository: kaihere14/BreathClean Length of output: 1833 🏁 Script executed: grep -n "table_data\|weather_points\|aqi_points" data-processing/dataProcessingServer/api/pathway/pipeline.py | head -20Repository: kaihere14/BreathClean Length of output: 1068 🏁 Script executed: grep -n "table_from_markdown\|table_from_pandas\|split_on_whitespace" data-processing/dataProcessingServer/api/pathway/pipeline.pyRepository: kaihere14/BreathClean Length of output: 116 🏁 Script executed: python3 << 'EOF'
import sys
try:
import pathway as pw
# Check if table_from_markdown has split_on_whitespace parameter
import inspect
sig = inspect.signature(pw.debug.table_from_markdown)
print("table_from_markdown signature:")
print(sig)
print("\nParameters:")
for param_name, param in sig.parameters.items():
print(f" {param_name}: {param.annotation if param.annotation != inspect.Parameter.empty else 'no annotation'}")
except ImportError:
print("Pathway not available in this environment")
except Exception as e:
print(f"Error: {e}")
EOFRepository: kaihere14/BreathClean Length of output: 106 🏁 Script executed: find . -name "*.py" -type f | xargs grep -l "table_from_markdown\|table_from_pandas" | head -10Repository: kaihere14/BreathClean Length of output: 127 🏁 Script executed: grep -r "split_on_whitespace" . --include="*.py" 2>/dev/nullRepository: kaihere14/BreathClean Length of output: 47 🌐 Web query:
💡 Result: In Pathway
Source: [1] 🌐 Web query:
💡 Result:
Docs page: Pathway API Docs → Signature pw.debug.table_from_markdown(
table_def,
id_from=None,
unsafe_trusted_ids=False,
schema=None,
*,
split_on_whitespace=True,
) -> pw.Table
```[1][2]
**Parameters**
- `table_def` (`str`): Markdown-like table text (header row + data rows).[1][2]
- `id_from` (`list[str] | None`): Column(s) to build row ids from (primary key). Mutually exclusive with `schema`.[2]
- `unsafe_trusted_ids` (`bool`): Passed through to the underlying datasource options.[1][2]
- `schema` (`type[pw.Schema] | None`): Explicit schema; if set, must match the table’s ordinary (non-pseudo) columns; mutually exclusive with `id_from`.[2]
- `split_on_whitespace` (`bool`, keyword-only): Default `True`. If `True`, also splits columns on whitespace; set `False` if you need string cells containing spaces.[1][2]
**Special pseudo-columns**
- `__time__`: If present, rows are split into batches by this timestamp column.[1][2]
- `__diff__`: If present, event type: `1` = insert, `-1` = delete.[1][2]
**Returns**
- A `pw.Table`.[2]
Sources: Pathway `pw.debug` docs[1] and the implementation docstring/signature in `pathway/debug/__init__.py`[2].
---
🏁 Script executed:
```shell
cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '1,35p'Repository: kaihere14/BreathClean Length of output: 1268 🏁 Script executed: cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '100,200p'Repository: kaihere14/BreathClean Length of output: 4071 🏁 Script executed: python3 << 'EOF'
try:
import pathway as pw
# Check if table_from_pandas exists
if hasattr(pw.debug, 'table_from_pandas'):
import inspect
sig = inspect.signature(pw.debug.table_from_pandas)
print("table_from_pandas exists!")
print("Signature:", sig)
else:
print("table_from_pandas not found in pw.debug")
print("Available methods in pw.debug:")
print([m for m in dir(pw.debug) if not m.startswith('_')])
except ImportError:
print("Pathway not available in this environment")
EOFRepository: kaihere14/BreathClean Length of output: 106 🏁 Script executed: python3 << 'EOF'
# Simulate the markdown parsing behavior
test_markdown = """route_id | weather_points
route1 | [{"main": {"temp": 22}}]
"""
# With split_on_whitespace=True (default), splitting happens like this:
lines = test_markdown.strip().split('\n')
header_line = lines[0]
data_line = lines[1]
print("Original markdown:")
print(test_markdown)
print("\nHeader line:", header_line)
print("Data line:", data_line)
print("\n--- With split_on_whitespace=True (current behavior) ---")
# Simulating default behavior - split on whitespace creates extra columns
parts = data_line.split('|')
print(f"Parts split by '|': {parts}")
for part in parts:
tokens = part.strip().split()
print(f" Tokens in '{part.strip()}': {tokens}")
print("\n--- With split_on_whitespace=False (proposed fix) ---")
print("Values would be preserved as-is, no extra splitting on whitespace")
EOFRepository: kaihere14/BreathClean Length of output: 577 Add
Fix: Add the parameter to the input_table = pw.debug.table_from_markdown(
_markdown_from_data(table_data),
schema=RouteInputSchema,
+ split_on_whitespace=False
)Also applies to line 291 (similar call in 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
|
|
||||||||||||||||||||
| # Apply transformation using UDF | ||||||||||||||||||||
| @pw.udf | ||||||||||||||||||||
| def compute_score_udf( | ||||||||||||||||||||
| route_id: str, | ||||||||||||||||||||
| route_index: int, | ||||||||||||||||||||
| distance: float, | ||||||||||||||||||||
| duration: float, | ||||||||||||||||||||
| travel_mode: str, | ||||||||||||||||||||
| weather_points: str, | ||||||||||||||||||||
| aqi_points: str, | ||||||||||||||||||||
| traffic_value: float, | ||||||||||||||||||||
| last_computed_score: Optional[float] | ||||||||||||||||||||
| ) -> str: | ||||||||||||||||||||
| return process_route_row( | ||||||||||||||||||||
| route_id, route_index, distance, duration, travel_mode, | ||||||||||||||||||||
| weather_points, aqi_points, traffic_value, last_computed_score | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Transform table | ||||||||||||||||||||
| result_table = input_table.select( | ||||||||||||||||||||
| score_json=compute_score_udf( | ||||||||||||||||||||
| input_table.route_id, | ||||||||||||||||||||
| input_table.route_index, | ||||||||||||||||||||
| input_table.distance, | ||||||||||||||||||||
| input_table.duration, | ||||||||||||||||||||
| input_table.travel_mode, | ||||||||||||||||||||
| input_table.weather_points, | ||||||||||||||||||||
| input_table.aqi_points, | ||||||||||||||||||||
| input_table.traffic_value, | ||||||||||||||||||||
| input_table.last_computed_score | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Run pipeline and collect results | ||||||||||||||||||||
| results = pw.debug.table_to_pandas(result_table) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Parse results back to dictionaries | ||||||||||||||||||||
| route_scores = [] | ||||||||||||||||||||
| for _, row in results.iterrows(): | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| score_data = json.loads(row["score_json"]) | ||||||||||||||||||||
| route_scores.append(score_data) | ||||||||||||||||||||
| except (json.JSONDecodeError, KeyError): | ||||||||||||||||||||
| continue | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if not route_scores: | ||||||||||||||||||||
| return { | ||||||||||||||||||||
| "success": False, | ||||||||||||||||||||
| "message": "Failed to compute scores", | ||||||||||||||||||||
| "routes": [], | ||||||||||||||||||||
| "summary": None | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Sort by route index | ||||||||||||||||||||
| route_scores.sort(key=lambda x: x.get("routeIndex", 0)) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Find best route | ||||||||||||||||||||
| best_route = max(route_scores, key=lambda r: r.get("overallScore", 0)) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Calculate summary statistics | ||||||||||||||||||||
| overall_scores = [r.get("overallScore", 0) for r in route_scores] | ||||||||||||||||||||
| avg_score = round(sum(overall_scores) / len(overall_scores), 1) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| from datetime import datetime, timezone | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return { | ||||||||||||||||||||
| "success": True, | ||||||||||||||||||||
| "message": "Batch scores computed via Pathway pipeline", | ||||||||||||||||||||
| "routes": route_scores, | ||||||||||||||||||||
| "bestRoute": { | ||||||||||||||||||||
| "index": best_route.get("routeIndex"), | ||||||||||||||||||||
| "routeId": best_route.get("routeId"), | ||||||||||||||||||||
| "score": best_route.get("overallScore") | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| "summary": { | ||||||||||||||||||||
| "totalRoutes": len(route_scores), | ||||||||||||||||||||
| "averageScore": avg_score, | ||||||||||||||||||||
| "scoreRange": { | ||||||||||||||||||||
| "min": min(overall_scores), | ||||||||||||||||||||
| "max": max(overall_scores) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| "computedAt": datetime.now(timezone.utc).isoformat(), | ||||||||||||||||||||
| "engine": "pathway" | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def run_simple_batch(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]: | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Simplified batch processing without full Pathway table operations. | ||||||||||||||||||||
| Falls back to direct transformer calls for reliability. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Use this if Pathway table operations cause issues. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| return compute_batch_scores(routes_data) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def _markdown_from_data(data: List[Dict]) -> str: | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Convert list of dicts to markdown table format for Pathway. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| if not data: | ||||||||||||||||||||
| return "" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| headers = list(data[0].keys()) | ||||||||||||||||||||
| lines = [" | ".join(headers)] | ||||||||||||||||||||
| lines.append(" | ".join(["---"] * len(headers))) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| for row in data: | ||||||||||||||||||||
| values = [] | ||||||||||||||||||||
| for h in headers: | ||||||||||||||||||||
| val = row.get(h) | ||||||||||||||||||||
| if val is None: | ||||||||||||||||||||
| values.append("") | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| # Escape pipe characters in values | ||||||||||||||||||||
| values.append(str(val).replace("|", "\\|")) | ||||||||||||||||||||
| lines.append(" | ".join(values)) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return "\n".join(lines) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| # Streaming mode (for future use) | ||||||||||||||||||||
| def create_streaming_pipeline(input_connector, output_connector): | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Create a streaming Pathway pipeline. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| This is for future use when we want real-time streaming updates | ||||||||||||||||||||
| instead of batch processing. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| # Read from input connector | ||||||||||||||||||||
| input_table = pw.io.jsonlines.read( | ||||||||||||||||||||
| input_connector, | ||||||||||||||||||||
| schema=RouteInputSchema, | ||||||||||||||||||||
| mode="streaming" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| @pw.udf | ||||||||||||||||||||
| def compute_score_streaming( | ||||||||||||||||||||
| route_id: str, | ||||||||||||||||||||
| route_index: int, | ||||||||||||||||||||
| distance: float, | ||||||||||||||||||||
| duration: float, | ||||||||||||||||||||
| travel_mode: str, | ||||||||||||||||||||
| weather_points: str, | ||||||||||||||||||||
| aqi_points: str, | ||||||||||||||||||||
| traffic_value: float, | ||||||||||||||||||||
| last_computed_score: Optional[float] | ||||||||||||||||||||
| ) -> str: | ||||||||||||||||||||
| return process_route_row( | ||||||||||||||||||||
| route_id, route_index, distance, duration, travel_mode, | ||||||||||||||||||||
| weather_points, aqi_points, traffic_value, last_computed_score | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Transform | ||||||||||||||||||||
| result_table = input_table.select( | ||||||||||||||||||||
| score_json=compute_score_streaming( | ||||||||||||||||||||
| input_table.route_id, | ||||||||||||||||||||
| input_table.route_index, | ||||||||||||||||||||
| input_table.distance, | ||||||||||||||||||||
| input_table.duration, | ||||||||||||||||||||
| input_table.travel_mode, | ||||||||||||||||||||
| input_table.weather_points, | ||||||||||||||||||||
| input_table.aqi_points, | ||||||||||||||||||||
| input_table.traffic_value, | ||||||||||||||||||||
| input_table.last_computed_score | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Write to output | ||||||||||||||||||||
| pw.io.jsonlines.write(result_table, output_connector) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return result_table | ||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
|
||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.