Skip to content

Commit f438545

Browse files
committed
Fix reading rows when using interactive (streaming) queries
1 parent 9c6feba commit f438545

File tree

5 files changed

+855
-14
lines changed

5 files changed

+855
-14
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Fix reading rows when using interactive (streaming) queries
3+
time: 2025-10-30T00:10:10.751126+01:00
4+
custom:
5+
Author: Kayrnt
6+
Issue: ""

src/deltastream/api/rows.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ def castRowData(
3838

3939
for i, column in enumerate(columns):
4040
row_data_string = row_data_strings[i]
41-
value = row_data_string.actual_instance
41+
# Handle both ResultSetDataInnerInner objects and plain values (from WebSocket)
42+
if hasattr(row_data_string, 'actual_instance'):
43+
value = row_data_string.actual_instance
44+
else:
45+
value = row_data_string
46+
4247
if value is None:
4348
row_data.append(None)
4449
continue

src/deltastream/api/streaming_rows.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ async def open(self) -> None:
6868

6969
async def handle_messages():
7070
try:
71-
ws = await ws_connect(self.req.uri)
71+
# Convert HTTPS URL to WSS for WebSocket connection
72+
ws_uri = self.req.uri.replace("https://", "wss://", 1)
73+
ws = await ws_connect(ws_uri)
7274
self.ws = ws
7375

7476
# Send authentication
@@ -86,7 +88,27 @@ async def handle_messages():
8688
data = json.loads(message)
8789

8890
if data["type"] == "metadata":
89-
self.metadata = PrintTopicMetadata(**data)
91+
# Convert column dictionaries to Column objects
92+
columns = []
93+
for col in data.get("columns", []):
94+
if isinstance(col, dict):
95+
# Provide default value for nullable if missing
96+
columns.append(Column(
97+
name=col.get("name", ""),
98+
type=col.get("type", ""),
99+
nullable=col.get("nullable", True), # Default to True if not specified
100+
length=col.get("length"),
101+
precision=col.get("precision"),
102+
scale=col.get("scale"),
103+
))
104+
else:
105+
columns.append(col)
106+
107+
self.metadata = PrintTopicMetadata(
108+
type=data["type"],
109+
headers=data.get("headers", {}),
110+
columns=columns,
111+
)
90112
deferred_ready.resolve()
91113

92114
elif data["type"] == "data":
@@ -124,17 +146,7 @@ def columns(self) -> List[Column]:
124146
if self.metadata is None:
125147
return []
126148

127-
return [
128-
Column(
129-
name=column.name,
130-
type=column.type,
131-
nullable=column.nullable,
132-
length=column.length,
133-
precision=column.precision,
134-
scale=column.scale,
135-
)
136-
for column in self.metadata.columns
137-
]
149+
return self.metadata.columns
138150

139151
async def close(self) -> None:
140152
"""Closes the WebSocket connection."""

0 commit comments

Comments
 (0)