Skip to content
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4753ee4
feat: integrate Mapbox for enhanced mapping functionality
GURUDAS-DEV Feb 14, 2026
84f3c35
fix: remove error logging from token verification middleware response
GURUDAS-DEV Feb 14, 2026
558e13e
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
e16f2df
feat: Implement interactive map for selecting route source/destinatio…
GURUDAS-DEV Feb 14, 2026
0598536
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
a5c32d8
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
3385dee
feat: Implement interactive Mapbox map component for source and desti…
GURUDAS-DEV Feb 14, 2026
e8e953e
feat: Minute Bug Update
GURUDAS-DEV Feb 14, 2026
0e1e982
feat: implement interactive Mapbox map for location selection on the …
GURUDAS-DEV Feb 14, 2026
e430043
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
b870060
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
0faee97
feat: implement route discovery, comparison, and map visualization wi…
GURUDAS-DEV Feb 14, 2026
b35e17f
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
4650c18
Merge branch 'main' of https://github.com/GURUDAS-DEV/BreathClean
GURUDAS-DEV Feb 14, 2026
b01b129
feat1: implement route discovery, comparison, and map visualization w…
GURUDAS-DEV Feb 14, 2026
4ab28ec
feat: Implement route discovery and comparison features with pollutio…
GURUDAS-DEV Feb 14, 2026
ecbae17
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
a8da103
feat: Implement route discovery, comparison, and saving features with…
GURUDAS-DEV Feb 14, 2026
02db6ae
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
24f4a94
feat: Add user profile page with detailed card, display saved routes,…
GURUDAS-DEV Feb 14, 2026
43a6501
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 14, 2026
da61bb5
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 15, 2026
8d69776
feat: Add public About and Features pages, introduce Navbar and Saved…
GURUDAS-DEV Feb 15, 2026
88bb043
feat: Add a new About Us page and a SavedRouteItemClient component.
GURUDAS-DEV Feb 15, 2026
fd1079a
feat: Add a new About Us page and a SavedRouteItemClient component.
GURUDAS-DEV Feb 15, 2026
7034411
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 15, 2026
a31c58e
feat: implement initial landing page with navigation, mission, how it…
GURUDAS-DEV Feb 15, 2026
bb73943
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 15, 2026
d8803c7
feat: Implement route scoring API including breakpoint calculation an…
GURUDAS-DEV Feb 15, 2026
3d3ad64
feat: Implement API for route score calculation considering weather a…
GURUDAS-DEV Feb 15, 2026
79f1878
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 15, 2026
41881ef
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 15, 2026
4091e1c
feat: Integrate Air Quality Index (AQI) data into route comparison fe…
GURUDAS-DEV Feb 15, 2026
c111433
refactor: Remove outdated AQI documentation and guides
GURUDAS-DEV Feb 15, 2026
38c2d82
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 15, 2026
c7c87f9
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 16, 2026
a2c09e2
feat: Setup Django Project and setup api app and a testing router
GURUDAS-DEV Feb 16, 2026
0f32fd3
Merge branch 'kaihere14:main' into main
GURUDAS-DEV Feb 16, 2026
40a8199
feat: Implement scoring transformers for BreathClean Pathway pipeline
GURUDAS-DEV Feb 17, 2026
1056d1c
feat: Implement a data processing server and scheduler for dynamic ro…
GURUDAS-DEV Feb 20, 2026
f4f2d38
feat: Add data processing pipeline for route score computation using …
GURUDAS-DEV Feb 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions client/app/(private)/home/routes/(from)/(to)/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const RouteContent = () => {
const [selectedRouteIndex, setSelectedRouteIndex] = useState(0);
const [routeName, setRouteName] = useState("");
const [showSaveModal, setShowSaveModal] = useState(false);
const [searchId, setSearchId] = useState<string | null>(null);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
const saveInputRef = useRef<HTMLInputElement>(null);

// Parse query parameters
Expand Down Expand Up @@ -170,6 +171,12 @@ const RouteContent = () => {
}

const data = await response.json();

// Capture the searchId for later use in saving
if (data.searchId) {
setSearchId(data.searchId);
}

const scoredRoutes = data.data?.routes;
if (scoredRoutes && Array.isArray(scoredRoutes)) {
const scores = scoredRoutes.map(
Expand All @@ -194,7 +201,7 @@ const RouteContent = () => {
let bestDuration = Infinity;
overallScores.forEach((score: number, i: number) => {
const dur =
routes[i]?.trafficDuration ?? routes[i]?.duration ?? Infinity;
routeData[i]?.trafficDuration ?? routeData[i]?.duration ?? Infinity;
if (
score > overallScores[bestIndex] ||
(score === overallScores[bestIndex] && dur < bestDuration)
Expand Down Expand Up @@ -393,6 +400,7 @@ const RouteContent = () => {
try {
const payload = {
name: nameToSave,
searchId, // Include the searchId from the computation
from: {
address: sourceAddress,
location: {
Expand All @@ -411,8 +419,7 @@ const RouteContent = () => {
distance: route.distance / 1000,
duration: route.duration / 60,
routeGeometry: route.geometry,
lastComputedScore:
route.overallScore || Math.floor(Math.random() * 100),
lastComputedScore: route.overallScore ?? null,
lastComputedAt: new Date(),
travelMode: selectedMode,
})),
Expand Down
Binary file not shown.
Binary file not shown.
File renamed without changes.
Empty file.
12 changes: 12 additions & 0 deletions data-processing/dataProcessingServer/api/pathway/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""
Pathway pipeline module for BreathClean.
"""
from .pipeline import run_batch_pipeline, run_simple_batch
from .transformers import compute_route_score, compute_batch_scores

__all__ = [
"run_batch_pipeline",
"run_simple_batch",
"compute_route_score",
"compute_batch_scores"
]
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
322 changes: 322 additions & 0 deletions data-processing/dataProcessingServer/api/pathway/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
"""
Pathway pipeline for BreathClean batch score computation.

This pipeline can run in two modes:
1. Streaming mode: Continuously process incoming data
2. Batch mode: Process a batch of routes and return results

For the hackathon, we use batch mode triggered by HTTP endpoint.

NOTE: Pathway requires Linux. On Windows, use run_simple_batch() which
uses direct Python computation. For production, run in Docker/WSL.
Comment on lines +10 to +11

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Docstring incorrectly limits Pathway to Linux — macOS is also supported.

Pathway is available on macOS and Linux; users of other systems should run Pathway on a Virtual Machine.

-NOTE: Pathway requires Linux. On Windows, use run_simple_batch() which
-uses direct Python computation. For production, run in Docker/WSL.
+NOTE: Pathway is available on Linux and macOS. On Windows, use run_simple_batch() which
+uses direct Python computation. For production on Windows, run in Docker/WSL.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
NOTE: Pathway requires Linux. On Windows, use run_simple_batch() which
uses direct Python computation. For production, run in Docker/WSL.
NOTE: Pathway is available on Linux and macOS. On Windows, use run_simple_batch() which
uses direct Python computation. For production on Windows, run in Docker/WSL.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@data-processing/dataProcessingServer/api/pathway/pipeline.py` around lines 10
- 11, Update the misleading NOTE comment/docstring that currently states
"Pathway requires Linux" to reflect that Pathway is supported on both Linux and
macOS; keep the guidance that Windows users should use run_simple_batch() or run
Pathway inside a VM/Docker/WSL for production. Locate the comment referencing
Pathway and run_simple_batch() in pipeline.py and change the text to mention
macOS explicitly while preserving the existing Windows and production
recommendations.

"""
from typing import List, Dict, Any, Optional
import json

# Try to import Pathway - gracefully handle if not available (Windows)
PATHWAY_AVAILABLE = False
pw = None

try:
import pathway as _pw
# Verify it's the real Pathway package
if hasattr(_pw, 'Schema'):
pw = _pw
PATHWAY_AVAILABLE = True
except (ImportError, AttributeError):
pass

from .transformers import compute_route_score, compute_batch_scores


# Define schema only if Pathway is available
RouteInputSchema = None
if PATHWAY_AVAILABLE and pw is not None:
class RouteInputSchema(pw.Schema):
"""Schema for incoming route data."""
route_id: str
route_index: int
distance: float
duration: float
travel_mode: str
weather_points: str # JSON string of weather data
aqi_points: str # JSON string of AQI data
traffic_value: float
last_computed_score: Optional[float]


def process_route_row(
route_id: str,
route_index: int,
distance: float,
duration: float,
travel_mode: str,
weather_points: str,
aqi_points: str,
traffic_value: float,
last_computed_score: Optional[float]
) -> str:
"""
Process a single route row and compute its score.
Returns JSON string of computed score.
"""
try:
weather_data = json.loads(weather_points) if weather_points else []
aqi_data = json.loads(aqi_points) if aqi_points else []
except json.JSONDecodeError:
weather_data = []
aqi_data = []
Comment thread
coderabbitai[bot] marked this conversation as resolved.

route_data = {
"routeId": route_id,
"routeIndex": route_index,
"distance": distance,
"duration": duration,
"travelMode": travel_mode,
"weatherPoints": weather_data,
"aqiPoints": aqi_data,
"trafficValue": traffic_value,
"lastComputedScore": last_computed_score
}

result = compute_route_score(route_data)
return json.dumps(result)


def run_batch_pipeline(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Run Pathway pipeline in batch mode for a list of routes.

This is the main entry point called by the Django view.
Uses Pathway's batch processing capabilities for efficient computation.

Falls back to simple batch processing if Pathway is not available (Windows).

Args:
routes_data: List of route dictionaries with weather/aqi/traffic data

Returns:
Dictionary with computed scores and summary
"""
# If Pathway is not available, fall back to simple batch processing
if not PATHWAY_AVAILABLE or pw is None:
result = run_simple_batch(routes_data)
result["engine"] = "python-fallback"
result["message"] = "Scores computed (Pathway not available, using Python fallback)"
return result

# For batch processing, we use Pathway's static mode
# This processes all data at once and returns results

if not routes_data:
return {
"success": False,
"message": "No routes provided",
"routes": [],
"summary": None
}

# Convert input to Pathway table format
table_data = []
for route in routes_data:
table_data.append({
"route_id": str(route.get("routeId", "")),
"route_index": route.get("routeIndex", 0),
"distance": float(route.get("distance", 0)),
"duration": float(route.get("duration", 0)),
"travel_mode": route.get("travelMode", "driving"),
"weather_points": json.dumps(route.get("weatherPoints", [])),
"aqi_points": json.dumps(route.get("aqiPoints", [])),
"traffic_value": float(route.get("trafficValue", 0)),
"last_computed_score": route.get("lastComputedScore")
})

# Create Pathway table from data
input_table = pw.debug.table_from_markdown(
_markdown_from_data(table_data),
schema=RouteInputSchema
)
Comment on lines +143 to +146

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -name "pipeline.py" -path "*/pathway/*" | head -5

Repository: kaihere14/BreathClean

Length of output: 127


🏁 Script executed:

cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '140,170p'

Repository: kaihere14/BreathClean

Length of output: 1160


🏁 Script executed:

cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '240,270p'

Repository: kaihere14/BreathClean

Length of output: 1085


🏁 Script executed:

grep -n "_markdown_from_data" data-processing/dataProcessingServer/api/pathway/pipeline.py

Repository: kaihere14/BreathClean

Length of output: 163


🏁 Script executed:

grep -n "RouteInputSchema" data-processing/dataProcessingServer/api/pathway/pipeline.py | head -10

Repository: kaihere14/BreathClean

Length of output: 206


🏁 Script executed:

cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '1,50p'

Repository: kaihere14/BreathClean

Length of output: 1833


🏁 Script executed:

grep -n "table_data\|weather_points\|aqi_points" data-processing/dataProcessingServer/api/pathway/pipeline.py | head -20

Repository: kaihere14/BreathClean

Length of output: 1068


🏁 Script executed:

grep -n "table_from_markdown\|table_from_pandas\|split_on_whitespace" data-processing/dataProcessingServer/api/pathway/pipeline.py

Repository: kaihere14/BreathClean

Length of output: 116


🏁 Script executed:

python3 << 'EOF'
import sys
try:
    import pathway as pw
    # Check if table_from_markdown has split_on_whitespace parameter
    import inspect
    sig = inspect.signature(pw.debug.table_from_markdown)
    print("table_from_markdown signature:")
    print(sig)
    print("\nParameters:")
    for param_name, param in sig.parameters.items():
        print(f"  {param_name}: {param.annotation if param.annotation != inspect.Parameter.empty else 'no annotation'}")
except ImportError:
    print("Pathway not available in this environment")
except Exception as e:
    print(f"Error: {e}")
EOF

Repository: kaihere14/BreathClean

Length of output: 106


🏁 Script executed:

find . -name "*.py" -type f | xargs grep -l "table_from_markdown\|table_from_pandas" | head -10

Repository: kaihere14/BreathClean

Length of output: 127


🏁 Script executed:

grep -r "split_on_whitespace" . --include="*.py" 2>/dev/null

Repository: kaihere14/BreathClean

Length of output: 47


🌐 Web query:

Pathway table_from_markdown split_on_whitespace parameter documentation

💡 Result:

In Pathway pw.debug.table_from_markdown(...), the parameter:

  • split_on_whitespace: bool = True — when True (default), the markdown rows are parsed by splitting on whitespace. Set split_on_whitespace=False if you need string cell values that contain spaces (so whitespace inside values isn’t used as a delimiter). [1]

Source: [1]


🌐 Web query:

Pathway Python library table_from_markdown API reference

💡 Result:

pw.debug.table_from_markdown (Pathway Python) API reference:

Docs page: Pathway API Docs → pw.debugtable_from_markdown.[1]

Signature

pw.debug.table_from_markdown(
    table_def,
    id_from=None,
    unsafe_trusted_ids=False,
    schema=None,
    *,
    split_on_whitespace=True,
) -> pw.Table
```[1][2]

**Parameters**
- `table_def` (`str`): Markdown-like table text (header row + data rows).[1][2]
- `id_from` (`list[str] | None`): Column(s) to build row ids from (primary key). Mutually exclusive with `schema`.[2]
- `unsafe_trusted_ids` (`bool`): Passed through to the underlying datasource options.[1][2]
- `schema` (`type[pw.Schema] | None`): Explicit schema; if set, must match the tables ordinary (non-pseudo) columns; mutually exclusive with `id_from`.[2]
- `split_on_whitespace` (`bool`, keyword-only): Default `True`. If `True`, also splits columns on whitespace; set `False` if you need string cells containing spaces.[1][2]

**Special pseudo-columns**
- `__time__`: If present, rows are split into batches by this timestamp column.[1][2]
- `__diff__`: If present, event type: `1` = insert, `-1` = delete.[1][2]

**Returns**
- A `pw.Table`.[2]

Sources: Pathway `pw.debug` docs[1] and the implementation docstring/signature in `pathway/debug/__init__.py`[2].

---

🏁 Script executed:

```shell
cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '1,35p'

Repository: kaihere14/BreathClean

Length of output: 1268


🏁 Script executed:

cat -n data-processing/dataProcessingServer/api/pathway/pipeline.py | sed -n '100,200p'

Repository: kaihere14/BreathClean

Length of output: 4071


🏁 Script executed:

python3 << 'EOF'
try:
    import pathway as pw
    # Check if table_from_pandas exists
    if hasattr(pw.debug, 'table_from_pandas'):
        import inspect
        sig = inspect.signature(pw.debug.table_from_pandas)
        print("table_from_pandas exists!")
        print("Signature:", sig)
    else:
        print("table_from_pandas not found in pw.debug")
        print("Available methods in pw.debug:")
        print([m for m in dir(pw.debug) if not m.startswith('_')])
except ImportError:
    print("Pathway not available in this environment")
EOF

Repository: kaihere14/BreathClean

Length of output: 106


🏁 Script executed:

python3 << 'EOF'
# Simulate the markdown parsing behavior
test_markdown = """route_id | weather_points
route1 | [{"main": {"temp": 22}}]
"""

# With split_on_whitespace=True (default), splitting happens like this:
lines = test_markdown.strip().split('\n')
header_line = lines[0]
data_line = lines[1]

print("Original markdown:")
print(test_markdown)

print("\nHeader line:", header_line)
print("Data line:", data_line)

print("\n--- With split_on_whitespace=True (current behavior) ---")
# Simulating default behavior - split on whitespace creates extra columns
parts = data_line.split('|')
print(f"Parts split by '|': {parts}")
for part in parts:
    tokens = part.strip().split()
    print(f"  Tokens in '{part.strip()}': {tokens}")

print("\n--- With split_on_whitespace=False (proposed fix) ---")
print("Values would be preserved as-is, no extra splitting on whitespace")
EOF

Repository: kaihere14/BreathClean

Length of output: 577


Add split_on_whitespace=False to table_from_markdown call — JSON columns with spaces are being whitespace-split into misaligned columns.

pw.debug.table_from_markdown splits column values on whitespace by default. The weather_points and aqi_points columns contain JSON strings (e.g., [{"main": {"temp": 22, "humidity": 55}}]) that include spaces. Without split_on_whitespace=False, each space-separated token becomes a separate column value, corrupting the entire table structure. The Pathway batch path will silently produce corrupted data for any route with non-empty environmental data.

Fix: Add the parameter to the table_from_markdown call at line 143:

    input_table = pw.debug.table_from_markdown(
        _markdown_from_data(table_data),
        schema=RouteInputSchema,
+       split_on_whitespace=False
    )

Also applies to line 291 (similar call in run_batch_from_json).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
input_table = pw.debug.table_from_markdown(
_markdown_from_data(table_data),
schema=RouteInputSchema
)
input_table = pw.debug.table_from_markdown(
_markdown_from_data(table_data),
schema=RouteInputSchema,
split_on_whitespace=False
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@data-processing/dataProcessingServer/api/pathway/pipeline.py` around lines
143 - 146, The pw.debug.table_from_markdown calls are incorrectly using the
default whitespace-splitting behavior which breaks JSON-containing columns;
update both calls to pw.debug.table_from_markdown (the one that builds
input_table using _markdown_from_data and RouteInputSchema and the similar call
inside run_batch_from_json) to pass split_on_whitespace=False so JSON strings
(e.g., weather_points/aqi_points) are not split into separate columns.


# Apply transformation using UDF
@pw.udf
def compute_score_udf(
route_id: str,
route_index: int,
distance: float,
duration: float,
travel_mode: str,
weather_points: str,
aqi_points: str,
traffic_value: float,
last_computed_score: Optional[float]
) -> str:
return process_route_row(
route_id, route_index, distance, duration, travel_mode,
weather_points, aqi_points, traffic_value, last_computed_score
)

# Transform table
result_table = input_table.select(
score_json=compute_score_udf(
input_table.route_id,
input_table.route_index,
input_table.distance,
input_table.duration,
input_table.travel_mode,
input_table.weather_points,
input_table.aqi_points,
input_table.traffic_value,
input_table.last_computed_score
)
)

# Run pipeline and collect results
results = pw.debug.table_to_pandas(result_table)

# Parse results back to dictionaries
route_scores = []
for _, row in results.iterrows():
try:
score_data = json.loads(row["score_json"])
route_scores.append(score_data)
except (json.JSONDecodeError, KeyError):
continue

if not route_scores:
return {
"success": False,
"message": "Failed to compute scores",
"routes": [],
"summary": None
}

# Sort by route index
route_scores.sort(key=lambda x: x.get("routeIndex", 0))

# Find best route
best_route = max(route_scores, key=lambda r: r.get("overallScore", 0))

# Calculate summary statistics
overall_scores = [r.get("overallScore", 0) for r in route_scores]
avg_score = round(sum(overall_scores) / len(overall_scores), 1)

from datetime import datetime, timezone

return {
"success": True,
"message": "Batch scores computed via Pathway pipeline",
"routes": route_scores,
"bestRoute": {
"index": best_route.get("routeIndex"),
"routeId": best_route.get("routeId"),
"score": best_route.get("overallScore")
},
"summary": {
"totalRoutes": len(route_scores),
"averageScore": avg_score,
"scoreRange": {
"min": min(overall_scores),
"max": max(overall_scores)
}
},
"computedAt": datetime.now(timezone.utc).isoformat(),
"engine": "pathway"
}


def run_simple_batch(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Simplified batch processing without full Pathway table operations.
Falls back to direct transformer calls for reliability.

Use this if Pathway table operations cause issues.
"""
return compute_batch_scores(routes_data)


def _markdown_from_data(data: List[Dict]) -> str:
"""
Convert list of dicts to markdown table format for Pathway.
"""
if not data:
return ""

headers = list(data[0].keys())
lines = [" | ".join(headers)]
lines.append(" | ".join(["---"] * len(headers)))

for row in data:
values = []
for h in headers:
val = row.get(h)
if val is None:
values.append("")
else:
# Escape pipe characters in values
values.append(str(val).replace("|", "\\|"))
lines.append(" | ".join(values))

return "\n".join(lines)


# Streaming mode (for future use)
def create_streaming_pipeline(input_connector, output_connector):
"""
Create a streaming Pathway pipeline.

This is for future use when we want real-time streaming updates
instead of batch processing.

Raises:
RuntimeError: If Pathway is not available (e.g., on Windows).
"""
if not PATHWAY_AVAILABLE or pw is None:
raise RuntimeError(
"Pathway is not available. Streaming pipeline requires Pathway "
"(Linux only). Use run_simple_batch() as a fallback or run in "
"Docker/WSL."
)

# Read from input connector
input_table = pw.io.jsonlines.read(
input_connector,
schema=RouteInputSchema,
mode="streaming"
)

@pw.udf
def compute_score_streaming(
route_id: str,
route_index: int,
distance: float,
duration: float,
travel_mode: str,
weather_points: str,
aqi_points: str,
traffic_value: float,
last_computed_score: Optional[float]
) -> str:
return process_route_row(
route_id, route_index, distance, duration, travel_mode,
weather_points, aqi_points, traffic_value, last_computed_score
)

# Transform
result_table = input_table.select(
score_json=compute_score_streaming(
input_table.route_id,
input_table.route_index,
input_table.distance,
input_table.duration,
input_table.travel_mode,
input_table.weather_points,
input_table.aqi_points,
input_table.traffic_value,
input_table.last_computed_score
)
)

# Write to output
pw.io.jsonlines.write(result_table, output_connector)

return result_table
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Loading