Skip to content

Commit 24826a5

Browse files
authored
Fix reading rows when using interactive (streaming) queries (#15)
1 parent 9c6feba commit 24826a5

File tree

5 files changed

+892
-17
lines changed

5 files changed

+892
-17
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Fix reading rows when using interactive (streaming) queries
3+
time: 2025-10-30T00:10:10.751126+01:00
4+
custom:
5+
Author: Kayrnt
6+
Issue: ""

src/deltastream/api/rows.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import List, Optional, Any
2+
from typing import List, Optional, Any, Union
33
import re
44
import json
55
from decimal import Decimal
@@ -22,13 +22,13 @@ class Column:
2222

2323

2424
def castRowData(
25-
row_data_strings: List[ResultSetDataInnerInner], columns: List[Column]
25+
row_data_strings: List[Union[ResultSetDataInnerInner, Any]], columns: List[Column]
2626
) -> List[Any]:
2727
"""
2828
Cast row data strings to their appropriate Python types based on column definitions.
2929
3030
Args:
31-
row_data_strings: List of string values or None from the database
31+
row_data_strings: List of ResultSetDataInnerInner objects or plain values from WebSocket
3232
columns: List of Column objects defining the data types
3333
3434
Returns:
@@ -38,7 +38,13 @@ def castRowData(
3838

3939
for i, column in enumerate(columns):
4040
row_data_string = row_data_strings[i]
41-
value = row_data_string.actual_instance
41+
# Handle both ResultSetDataInnerInner objects and plain values (from WebSocket)
42+
value: Any
43+
if hasattr(row_data_string, "actual_instance"):
44+
value = row_data_string.actual_instance
45+
else:
46+
value = row_data_string
47+
4248
if value is None:
4349
row_data.append(None)
4450
continue

src/deltastream/api/streaming_rows.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ async def open(self) -> None:
6868

6969
async def handle_messages():
7070
try:
71-
ws = await ws_connect(self.req.uri)
71+
# Convert HTTPS URL to WSS for WebSocket connection
72+
ws_uri = self.req.uri.replace("https://", "wss://", 1)
73+
ws = await ws_connect(ws_uri)
7274
self.ws = ws
7375

7476
# Send authentication
@@ -86,7 +88,31 @@ async def handle_messages():
8688
data = json.loads(message)
8789

8890
if data["type"] == "metadata":
89-
self.metadata = PrintTopicMetadata(**data)
91+
# Convert column dictionaries to Column objects
92+
columns = []
93+
for col in data.get("columns", []):
94+
if isinstance(col, dict):
95+
# Provide default value for nullable if missing
96+
columns.append(
97+
Column(
98+
name=col.get("name", ""),
99+
type=col.get("type", ""),
100+
nullable=col.get(
101+
"nullable", True
102+
), # Default to True if not specified
103+
length=col.get("length"),
104+
precision=col.get("precision"),
105+
scale=col.get("scale"),
106+
)
107+
)
108+
else:
109+
columns.append(col)
110+
111+
self.metadata = PrintTopicMetadata(
112+
type=data["type"],
113+
headers=data.get("headers", {}),
114+
columns=columns,
115+
)
90116
deferred_ready.resolve()
91117

92118
elif data["type"] == "data":
@@ -124,17 +150,7 @@ def columns(self) -> List[Column]:
124150
if self.metadata is None:
125151
return []
126152

127-
return [
128-
Column(
129-
name=column.name,
130-
type=column.type,
131-
nullable=column.nullable,
132-
length=column.length,
133-
precision=column.precision,
134-
scale=column.scale,
135-
)
136-
for column in self.metadata.columns
137-
]
153+
return self.metadata.columns
138154

139155
async def close(self) -> None:
140156
"""Closes the WebSocket connection."""

0 commit comments

Comments
 (0)