-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_session.incn
More file actions
434 lines (341 loc) · 19.5 KB
/
Copy pathtest_session.incn
File metadata and controls
434 lines (341 loc) · 19.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
"""RFC 004 Phase 2: Session construction and backend seam."""
from std.testing import assert_is_err, assert_is_ok, fail, parametrize
from rust::std::path import Path
from backends import DataFusion, csv_source, datafusion_backend_selection, parquet_source
from dataset import LazyFrame, lazy_frame_named_table
from session import Session, SessionError, SessionErrorKind
from substrait.inspect import read_kind_name, root_rel
# Minimal row model used as the type parameter for every LazyFrame / DataFrame in this file.
# The tests expect the orders fixtures to expose an `id` column first (checked via declared/resolved column order).
# NOTE(review): Clone is derived presumably so Order-typed carriers can be cloned (e.g. `transformed.clone()`) — confirm.
@derive(Clone)
model Order:
    # Order identifier; corresponds to the leading `id` column of the orders fixtures.
    id: int
# --- Constants, test fixtures, helper functions ---
# Root of the test tree. Paths are relative, presumably resolved against the directory the test runner starts in — confirm.
const TESTS_DIR: str = "tests/"
# Input fixtures that the tests only read from.
const FIXTURE_DIR: str = TESTS_DIR + "fixtures/"
# Scratch location for artifacts the tests write (CSV/Parquet sink outputs); see `_target_uri`.
const TARGET_DIR: str = TESTS_DIR + "target/"
# The same orders data in three source formats, one per `read_*` / `*_source` entry point under test.
const ORDERS_CSV_FIXTURE: str = FIXTURE_DIR + "orders.csv"
const ORDERS_PARQUET_FIXTURE: str = FIXTURE_DIR + "orders.parquet"
const ORDERS_ARROW_FIXTURE: str = FIXTURE_DIR + "orders.arrow"
def _target_uri(name: str) -> str:
    """Build the path of a scratch output artifact called `name` inside TARGET_DIR."""
    return f"{TARGET_DIR}{name}"
def _touch[T](x: T) -> None:
    """
    Accept a value of any type and deliberately do nothing with it.

    Passing a binding here counts as a use of it, which avoids unused-variable warnings. It also makes the
    compiler check that the value can be passed through a generic parameter, so a call doubles as a
    "this type is constructible" check.

    Args:
        x: Any value; it is ignored.
    """
    pass
# --- Test cases for Session construction, backend selection, and API behavior ---
def test_session__default_uses_datafusion_backend() -> None:
    """Session.default() is expected to pick DataFusion and keep that backend's default settings (optimizer on)."""
    # -- Arrange & Act --
    default_session = Session.default()
    df_backend: DataFusion = default_session.datafusion_backend()
    # -- Assert --
    backend_kind = default_session.backend_name()
    assert backend_kind == "datafusion", "Session.default() should select the DataFusion backend"
    assert df_backend.enable_optimizer is true, "default DataFusion config should enable optimizer behavior"
def test_session__builder_selects_datafusion_backend() -> None:
    """
    Building through Session.builder().with_datafusion(...) should produce a DataFusion-backed Session whose backend
    still carries the exact options supplied to the builder (here: optimizer disabled).
    """
    # -- Arrange & Act --
    no_optimizer = DataFusion(enable_optimizer=false)
    built_session = Session.builder().with_datafusion(no_optimizer).build()
    selected: DataFusion = built_session.datafusion_backend()
    # -- Assert --
    assert built_session.backend_name() == "datafusion", "builder-selected backend should remain DataFusion in the first RFC 004 slice"
    assert selected.enable_optimizer is false, "builder should preserve backend-specific DataFusion options"
def test_session__builder_accepts_portable_backend_selection() -> None:
    """
    Session.builder().with_backend should take the adapter-neutral envelope built by datafusion_backend_selection and
    round-trip both the backend kind and the options encoded inside it.
    """
    # -- Arrange --
    envelope = datafusion_backend_selection(DataFusion(enable_optimizer=false))
    configured_builder = Session.builder().with_backend(envelope)
    # -- Act --
    portable_session = configured_builder.build()
    round_tripped: DataFusion = portable_session.datafusion_backend()
    # -- Assert --
    assert portable_session.backend_name() == "datafusion", "generic backend selection should preserve the selected kind"
    assert round_tripped.enable_optimizer is false, "generic backend selection should preserve encoded backend options"
def test_session__public_types_construct_locally() -> None:
    """
    The public Session surface -- a default Session and a Session builder -- should construct locally and be
    passable to a generic function without type errors.
    """
    # -- Arrange --
    local_session = Session.default()
    local_builder = Session.builder()
    # -- Act --
    # `_touch` is a no-op; these calls only need to typecheck.
    _touch(local_builder)
    _touch(local_session)
    # -- Assert --
    # Compile-shape test: reaching this line means construction and the generic calls typechecked.
    pass
def test_session__register_tracks_named_sources() -> None:
    """
    Every successful Session.register call should leave a binding the Session API can report on: how many bindings
    exist, whether a logical name is bound, and which source kind it was bound to.
    """
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    orders_src = csv_source(ORDERS_CSV_FIXTURE)
    assert_is_ok(session.register("orders", orders_src), "orders registration should succeed")
    archive_src = parquet_source(ORDERS_PARQUET_FIXTURE)
    assert_is_ok(session.register("orders_archive", archive_src), "orders_archive registration should succeed")
    # -- Assert --
    assert session.registration_count() == 2, "register should retain two source bindings"
    assert session.has_registration("orders") is true, "registered logical name should be discoverable"
    assert session.has_registration("orders_archive") is true, "both registered logical names should be discoverable"
    assert session.registered_source_kind("orders") == "csv", "csv source kind should be preserved"
    assert session.registered_source_kind("orders_archive") == "parquet", "parquet source kind should be preserved"
def test_session__table_returns_prism_backed_named_table_plan() -> None:
    """
    Once a CSV source is registered, Session.table(name) should hand back a Prism-backed lazy plan whose root is a
    NamedTable read for that registration, with the declared columns already bound.
    """
    # -- Arrange --
    mut session = Session.default()
    orders_src = csv_source(ORDERS_CSV_FIXTURE)
    assert_is_ok(session.register("orders", orders_src), "orders registration should succeed")
    # -- Act --
    lookup: Result[LazyFrame[Order], SessionError] = session.table("orders")
    orders_lf: LazyFrame[Order] = assert_is_ok(lookup, "registered table lookup should succeed")
    # -- Assert --
    root_kind = read_kind_name(root_rel(orders_lf.to_substrait_plan()))
    assert root_kind == "NamedTable", "table lookup should create a logical named-table read"
    assert len(orders_lf.schema().declared_columns) > 0, "registered CSV tables should bind declared columns for downstream execution"
def test_session__read_csv_registers_source_and_returns_named_table_plan() -> None:
    """
    read_csv acts as register-and-open: it should record a CSV binding under the given logical name and return a
    lazy plan rooted in a NamedTable read, carrying column metadata inferred from the CSV header.
    """
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    orders_lf: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    declared = orders_lf.declared_columns()
    planned = orders_lf.planned_columns()
    snapshot = orders_lf.schema()
    # -- Assert --
    # Registration side effects of read_csv.
    assert session.registration_count() == 1, "read_csv should register one CSV source"
    assert session.has_registration("orders") is true, "read_csv should register the provided logical name"
    assert session.registered_source_kind("orders") == "csv", "read_csv should register CSV source kind"
    # Shape of the lowered plan.
    root_kind = read_kind_name(root_rel(orders_lf.to_substrait_plan()))
    assert root_kind == "NamedTable", "read_csv should still lower through a logical named-table root"
    # Column metadata exposed directly on the lazy carrier.
    assert len(declared) > 0, "read_csv should infer declared columns from the CSV header"
    assert declared[0] == "id", "declared columns should preserve source column order"
    assert len(planned) == len(declared), "lazy planned_columns() should currently mirror declared columns"
    assert len(orders_lf.columns()) == len(planned), "lazy columns() should mirror planned columns in deferred mode"
    # Schema snapshot taken before any collect: nothing resolved yet.
    assert len(snapshot.declared_columns) == len(declared), "lazy schema() should surface declared columns"
    assert len(snapshot.planned_columns) == len(planned), "lazy schema() should surface planned columns"
    assert len(snapshot.resolved_columns) == 0, "lazy schema() should keep resolved columns empty pre-collect"
def test_session__read_csv_rejects_duplicate_logical_names() -> None:
    """A second read_csv under an already-bound logical name must fail with a typed duplicate_registration error."""
    # -- Arrange --
    mut session = Session.default()
    first_read: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "initial orders CSV read should succeed",
    )
    # The first plan is not needed further; touch it to keep the binding "used".
    _touch(first_read)
    # -- Act --
    second_read: Result[LazyFrame[Order], SessionError] = session.read_csv("orders", ORDERS_CSV_FIXTURE)
    dup_err = assert_is_err(second_read, "duplicate read_csv logical name should fail")
    # -- Assert --
    assert dup_err.kind == SessionErrorKind.DuplicateRegistration, "read_csv should surface duplicate-registration errors"
def test_session__read_parquet_registers_source_and_executes_named_table_plan() -> None:
    """
    read_parquet should bind a Parquet source under the given logical name and return a NamedTable-rooted plan that
    executes. The Parquet input is generated during the test by writing the CSV fixture out via the same Session.
    """
    # -- Arrange --
    mut session = Session.default()
    generated_parquet = _target_uri("session_read_parquet_input")
    # -- Act --
    # Stage a Parquet copy of the CSV fixture, then read it back through read_parquet.
    csv_orders: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders_csv", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    assert_is_ok(session.write_parquet(csv_orders, generated_parquet), "CSV fixture should be written as Parquet")
    parquet_orders: LazyFrame[Order] = assert_is_ok(
        session.read_parquet("orders_parquet", generated_parquet),
        "generated Parquet fixture should load",
    )
    # -- Assert --
    assert session.has_registration("orders_parquet") is true, "read_parquet should register the provided logical name"
    assert session.registered_source_kind("orders_parquet") == "parquet", "read_parquet should register Parquet source kind"
    root_kind = read_kind_name(root_rel(parquet_orders.to_substrait_plan()))
    assert root_kind == "NamedTable", "read_parquet should lower through a logical named-table root"
    assert_is_ok(session.execute(parquet_orders), "generated Parquet fixture should execute")
def test_session__read_arrow_registers_source_and_returns_named_table_plan() -> None:
    """Reading the Arrow fixture should add exactly one Arrow binding and yield a plan rooted in a NamedTable read."""
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    arrow_orders: LazyFrame[Order] = assert_is_ok(
        session.read_arrow("orders_arrow", ORDERS_ARROW_FIXTURE),
        "Arrow fixture should load",
    )
    # -- Assert --
    assert session.registration_count() == 1, "read_arrow should register one Arrow source"
    assert session.has_registration("orders_arrow") is true, "read_arrow should register the provided logical name"
    assert session.registered_source_kind("orders_arrow") == "arrow", "read_arrow should register Arrow source kind"
    root_kind = read_kind_name(root_rel(arrow_orders.to_substrait_plan()))
    assert root_kind == "NamedTable", "read_arrow should lower through a logical named-table root"
def test_session__execute_runs_plan_and_preserves_carrier_shape() -> None:
    """A successful execute should go through the backend and hand back a carrier still rooted in a NamedTable read."""
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    orders_lf: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    ran = assert_is_ok(session.execute(orders_lf), "named-table execution should succeed")
    # -- Assert --
    executed_root = read_kind_name(root_rel(ran.to_substrait_plan()))
    assert executed_root == "NamedTable", "execute should preserve deferred named-table carrier shape on success"
def test_session__collect_returns_materialized_dataframe() -> None:
    """
    Collecting a lazy plan should produce a DataFrame that carries materialization metadata (row count, resolved
    columns) together with a preview rendering, and whose schema view agrees with its column accessors.
    """
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    orders_lf: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    frame = assert_is_ok(session.collect(orders_lf), "orders collect should succeed")
    declared = frame.declared_columns()
    planned = frame.planned_columns()
    resolved = frame.resolved_columns()
    frame_schema = frame.schema()
    # -- Assert --
    # Carrier shape and materialization metadata.
    root_kind = read_kind_name(root_rel(frame.to_substrait_plan()))
    assert root_kind == "NamedTable", "collect should return a typed DataFrame carrier"
    assert len(frame.preview_text()) > 0, "collect should populate a non-empty preview render for display/debug paths"
    assert frame.row_count() > 0, "collect should record structural row-count metadata"
    # Column accessors.
    assert len(declared) > 0, "collect should preserve declared schema columns"
    assert len(planned) == len(declared), "DataFrame planned_columns() should currently mirror declared columns"
    assert len(resolved) > 0, "collect should expose resolved columns from structured materialization metadata"
    assert len(frame.columns()) == len(resolved), "DataFrame columns() should prefer resolved columns when available"
    assert resolved[0] == "id", "resolved columns should preserve output column order"
    # Schema view mirrors the accessors.
    assert len(frame_schema.declared_columns) == len(declared), "schema() should surface declared columns"
    assert len(frame_schema.planned_columns) == len(planned), "schema() should surface planned columns"
    assert len(frame_schema.resolved_columns) == len(resolved), "schema() should surface resolved columns"
def test_session__lazy_collect_fails_without_active_session() -> None:
    """With no Session activated, lazy.collect has nothing to resolve against and must report no_active_session."""
    # -- Arrange --
    Session.clear_active_session()
    unbound: LazyFrame[Order] = lazy_frame_named_table("orders_never_registered_in_active_session_test")
    # -- Act --
    outcome = unbound.collect()
    missing_session = assert_is_err(outcome, "lazy.collect should fail without an active Session")
    # -- Assert --
    assert missing_session.kind == SessionErrorKind.NoActiveSession, "lazy.collect should fail with no_active_session when no session is active"
def test_session__activate_enables_lazy_collect_with_live_session() -> None:
    """
    activate should bind the live session so lazy.collect delegates through it successfully.

    The active-session binding is global (static) state managed via Session.clear_active_session, so the test
    clears it at the end. Otherwise this session would stay bound for whichever test runs next.
    """
    # -- Arrange --
    Session.clear_active_session()
    mut session = Session.default()
    # -- Act --
    lazy: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    session.activate()
    # lazy.collect() takes no session argument; it resolves through the binding installed by activate().
    df = assert_is_ok(lazy.collect(), "lazy.collect should delegate through the active session")
    # -- Assert --
    assert read_kind_name(root_rel(df.to_substrait_plan())) == "NamedTable", "lazy.collect should delegate through the active session"
    # -- Cleanup --
    # Unbind the session so later tests do not inherit it.
    Session.clear_active_session()
def test_session__lazy_collect_uses_currently_active_session() -> None:
    """
    lazy.collect should resolve against the currently active session, not the session that built the lazy plan.

    The plan is built through `source_session` (which registers "orders"). `other_session` is activated last and
    has no "orders" registration, so collect must fail with UnknownTable. The active-session binding is global
    (static) state, so it is cleared at the end to avoid leaking `other_session` into later tests.
    """
    # -- Arrange --
    Session.clear_active_session()
    mut source_session = Session.default()
    other_session = Session.default()
    # -- Act --
    lazy: LazyFrame[Order] = assert_is_ok(
        source_session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    # Activate both in turn; the second activation should replace the first.
    source_session.activate()
    other_session.activate()
    err = assert_is_err(lazy.collect(), "lazy.collect should resolve against the currently active session")
    # -- Assert --
    assert err.kind == SessionErrorKind.UnknownTable, "active-session switching should route collect through the latest active session"
    # -- Cleanup --
    # Unbind the session so later tests do not inherit it.
    Session.clear_active_session()
def test_session__clear_active_session_forces_no_active_session() -> None:
    """
    After clear_active_session, a previously activated session no longer backs lazy.collect, which must then fail
    with no_active_session.
    """
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    orders_lf: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    # Bind, then immediately unbind.
    session.activate()
    Session.clear_active_session()
    outcome = orders_lf.collect()
    unbound_err = assert_is_err(outcome, "clear_active_session should remove active-session binding")
    # -- Assert --
    assert unbound_err.kind == SessionErrorKind.NoActiveSession, "clear_active_session should force no_active_session for lazy.collect"
def test_session__read_transform_collect_write_csv_end_to_end() -> None:
    """
    End-to-end flow should read a CSV source, apply a transform (limit), collect the result, and then write the
    same transformed plan to CSV successfully.

    The collected row count is bounded by the limit, so the test proves the transform actually took effect rather
    than only that collect returned some rows.
    """
    # -- Arrange --
    mut session = Session.default()
    output_uri = _target_uri("session_collect_then_write_output.csv")
    # -- Act --
    lazy: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    transformed = lazy.limit(2)
    # The transformed plan is passed to both collect and write_csv, so collect receives a clone.
    df = assert_is_ok(session.collect(transformed.clone()), "transformed collect should succeed")
    assert_is_ok(session.write_csv(transformed, output_uri), "write_csv should succeed after collect")
    # -- Assert --
    assert len(df.preview_text()) > 0, "collect should materialize preview text before downstream sink operations"
    assert df.row_count() > 0, "collect should record row-count metadata before downstream sink operations"
    assert df.row_count() <= 2, "limit(2) should cap the collected row count at two rows"
    assert Path.new(output_uri).exists() is true, "write_csv should still succeed on the Session path after collect"
@parametrize("sink_kind, output_name", [("csv", "session_write_csv_output.csv"), ("parquet", "session_write_parquet_output")])
def test_session__write_succeeds_for_deferred_lazyframe(sink_kind: str, output_name: str) -> None:
    """
    For each parametrized sink kind, writing a still-deferred LazyFrame should run the plan and leave the output
    artifact on disk at the requested location.
    """
    # -- Arrange --
    mut session = Session.default()
    sink_uri = _target_uri(output_name)
    # -- Act --
    deferred: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    # Route to the sink matching this parametrization; anything else is a test-setup error.
    match sink_kind:
        "csv" => assert_is_ok(session.write_csv(deferred, sink_uri), "CSV write should succeed")
        "parquet" => assert_is_ok(session.write_parquet(deferred, sink_uri), "Parquet write should succeed")
        _ => fail(f"unexpected sink kind: {sink_kind}")
    # -- Assert --
    artifact = Path.new(sink_uri)
    assert artifact.exists() is true, "write should create the requested output artifact"
def test_session__write_rejects_empty_sink_uri() -> None:
    """An empty sink URI passed to write_csv must be refused with a typed invalid_sink error."""
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    orders_lf: LazyFrame[Order] = assert_is_ok(
        session.read_csv("orders", ORDERS_CSV_FIXTURE),
        "orders CSV fixture should load",
    )
    sink_err = assert_is_err(session.write_csv(orders_lf, ""), "empty CSV sink URI should fail")
    # -- Assert --
    assert sink_err.kind == SessionErrorKind.InvalidSink, "write_csv should reject empty sink URIs"
def test_session__register_rejects_duplicates_with_typed_error() -> None:
    """Registering a second source under an already-bound logical name must fail with duplicate_registration."""
    # -- Arrange --
    mut session = Session.default()
    first_src = csv_source(ORDERS_CSV_FIXTURE)
    assert_is_ok(session.register("orders", first_src), "initial orders registration should succeed")
    # -- Act --
    repeat_src = csv_source(ORDERS_CSV_FIXTURE)
    dup_err = assert_is_err(session.register("orders", repeat_src), "duplicate registration should fail")
    # -- Assert --
    assert dup_err.kind == SessionErrorKind.DuplicateRegistration, "duplicate names should surface a typed registration error"
def test_session__table_reports_unknown_name_with_typed_error() -> None:
    """Looking up a logical name that was never registered must fail with a typed unknown_table error."""
    # -- Arrange --
    empty_session = Session.default()
    # -- Act --
    lookup: Result[LazyFrame[Order], SessionError] = empty_session.table("missing")
    lookup_err = assert_is_err(lookup, "unknown table lookup should fail")
    # -- Assert --
    assert lookup_err.kind == SessionErrorKind.UnknownTable, "unknown table lookup should surface a typed binding error"
def test_session__empty_registration_input_is_rejected() -> None:
    """
    register must refuse blank input on either side of the binding -- an empty logical name or a source with an
    empty URI -- and report invalid_registration for both.
    """
    # -- Arrange --
    mut session = Session.default()
    # -- Act --
    # Case 1: valid source, empty logical name.
    blank_name_src = csv_source(ORDERS_CSV_FIXTURE)
    name_err = assert_is_err(session.register("", blank_name_src), "empty logical name should fail")
    # Case 2: valid logical name, source with an empty URI.
    blank_uri_src = csv_source("")
    uri_err = assert_is_err(session.register("orders", blank_uri_src), "empty source URI should fail")
    # -- Assert --
    assert name_err.kind == SessionErrorKind.InvalidRegistration, "empty logical names should be rejected"
    assert uri_err.kind == SessionErrorKind.InvalidRegistration, "empty source uris should be rejected"