Skip to content

Commit

Permalink
operator persistence - basic operators (#7905)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergey <[email protected]>
GitOrigin-RevId: 5ee8ed454ac463957d414a785e713daf3979b64e
  • Loading branch information
2 people authored and Manul from Pathway committed Jan 3, 2025
1 parent 4b5c5c0 commit c21bf24
Show file tree
Hide file tree
Showing 9 changed files with 982 additions and 268 deletions.
308 changes: 305 additions & 3 deletions python/pathway/tests/test_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@
import json
import multiprocessing
import os
import pathlib
import time
from typing import Callable

import pandas as pd
import pytest

import pathway as pw
from pathway.internals import api
from pathway.internals.parse_graph import G
from pathway.tests.utils import (
CsvPathwayChecker,
consolidate,
needs_multiprocessing_fork,
run,
wait_result_with_checker,
write_csv,
write_lines,
)


Expand Down Expand Up @@ -212,7 +219,7 @@ def pw_identity_program():
pw.io.jsonlines.write(table, output_path)
pw.run(persistence_config=persistence_config)

file_contents = {}
file_contents: dict[str, str] = {}
next_file_contents = 0
for sequence in scenario:
expected_diffs = []
Expand Down Expand Up @@ -259,8 +266,303 @@ def pw_identity_program():
actual_diffs = []
with open(output_path, "r") as f:
for row in f:
row = json.loads(row)
actual_diffs.append([row["data"], row["diff"]])
row_parsed = json.loads(row)
actual_diffs.append([row_parsed["data"], row_parsed["diff"]])
actual_diffs.sort()
expected_diffs.sort()
assert actual_diffs == expected_diffs


def combine_columns(df: pd.DataFrame) -> pd.Series:
    """Join every column except ``"time"`` into one comma-separated string per row.

    Columns are taken in frame order and each value is converted with
    ``astype(str)`` before being joined with ``","``.

    Args:
        df: Frame whose rows should be flattened. It is not modified.

    Returns:
        A Series with one string per row of ``df``. When the frame has no
        column other than ``"time"``, a Series of empty strings indexed like
        ``df`` is returned (the previous implementation returned ``None`` in
        that case, contradicting the annotated return type).
    """
    value_columns = [column for column in df.columns if column != "time"]
    if not value_columns:
        # Joining zero columns yields an empty string for every row.
        return pd.Series("", index=df.index, dtype=object)

    combined = df[value_columns[0]].astype(str)
    for column in value_columns[1:]:
        combined = combined + "," + df[column].astype(str)
    return combined


def get_one_table_runner(
    tmp_path: pathlib.Path,
    mode: api.PersistenceMode,
    logic: Callable[[pw.Table], pw.Table],
    schema: type[pw.Schema],
) -> tuple[Callable[[list[str], set[str]], None], pathlib.Path]:
    """Create a step runner for persistence tests that use one input table.

    The directory ``tmp_path / "1"`` is created as the input directory.
    The output is written to ``tmp_path / "out.csv"`` and the persistence
    backend is a filesystem backend rooted at ``tmp_path / "p"``.

    Args:
        tmp_path: Scratch directory for inputs, output and persisted state.
        mode: Persistence mode passed to ``pw.persistence.Config``.
        logic: Builds the table under test from the input table.
        schema: Schema used to read the CSV input.

    Returns:
        ``(run_computation, input_path)``. Every call of
        ``run_computation(inputs, expected)`` writes ``inputs`` to a new file
        named after the call number (``"1"``, ``"2"``, ...) inside
        ``input_path``, rebuilds and runs the graph, and asserts that the
        consolidated output rows equal ``expected``.
    """
    input_path = tmp_path / "1"
    output_path = tmp_path / "out.csv"
    persistent_storage_path = tmp_path / "p"
    os.makedirs(input_path)
    step = 0

    def run_computation(inputs, expected):
        nonlocal step
        step += 1
        # Start from an empty graph so each step defines the pipeline anew.
        G.clear()
        write_lines(input_path / str(step), inputs)

        source = pw.io.csv.read(input_path, schema=schema, mode="static")
        pw.io.csv.write(logic(source), output_path)

        config = pw.persistence.Config(
            pw.persistence.Backend.filesystem(persistent_storage_path),
            persistence_mode=mode,
        )
        run(persistence_config=config)

        observed = combine_columns(consolidate(pd.read_csv(output_path)))
        assert set(observed) == expected

    return run_computation, input_path


def get_two_tables_runner(
    tmp_path: pathlib.Path,
    mode: api.PersistenceMode,
    logic: Callable[[pw.Table, pw.Table], pw.Table],
    schema: type[pw.Schema],
    terminate_on_error: bool = True,
) -> tuple[
    Callable[[list[str], list[str], set[str]], None], pathlib.Path, pathlib.Path
]:
    """Create a step runner for persistence tests that use two input tables.

    The directories ``tmp_path / "1"`` and ``tmp_path / "2"`` are created as
    the input directories. The output is written to ``tmp_path / "out.csv"``
    and the persistence backend is a filesystem backend rooted at
    ``tmp_path / "p"``.

    Args:
        tmp_path: Scratch directory for inputs, output and persisted state.
        mode: Persistence mode passed to ``pw.persistence.Config``.
        logic: Builds the table under test from the two input tables.
        schema: Schema used to read both CSV inputs.
        terminate_on_error: Forwarded unchanged to ``run``.

    Returns:
        ``(run_computation, input_path_1, input_path_2)``. Every call of
        ``run_computation(inputs_1, inputs_2, expected)`` writes each list of
        lines to a new file named after the call number inside the matching
        input directory, rebuilds and runs the graph, and asserts that the
        consolidated output rows equal ``expected``.
    """
    input_path_1 = tmp_path / "1"
    input_path_2 = tmp_path / "2"
    output_path = tmp_path / "out.csv"
    persistent_storage_path = tmp_path / "p"
    for directory in (input_path_1, input_path_2):
        os.makedirs(directory)
    step = 0

    def run_computation(inputs_1, inputs_2, expected):
        nonlocal step
        step += 1
        # Start from an empty graph so each step defines the pipeline anew.
        G.clear()
        file_name = str(step)
        write_lines(input_path_1 / file_name, inputs_1)
        write_lines(input_path_2 / file_name, inputs_2)

        left = pw.io.csv.read(input_path_1, schema=schema, mode="static")
        right = pw.io.csv.read(input_path_2, schema=schema, mode="static")
        pw.io.csv.write(logic(left, right), output_path)

        config = pw.persistence.Config(
            pw.persistence.Backend.filesystem(persistent_storage_path),
            persistence_mode=mode,
        )
        run(
            persistence_config=config,
            # hack to allow changes from different files at different point
            # in time
            terminate_on_error=terminate_on_error,
        )

        observed = combine_columns(consolidate(pd.read_csv(output_path)))
        assert set(observed) == expected

    return run_computation, input_path_1, input_path_2


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_restrict(tmp_path, mode):
    """Check ``Table.restrict`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)

    def logic(left: pw.Table, right: pw.Table) -> pw.Table:
        right.promise_universe_is_subset_of(left)
        return left.restrict(right)

    run_step, _, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema, terminate_on_error=False
    )

    run_step(["a", "1", "2", "3"], ["a", "1"], {"1,1"})
    run_step(["a"], ["a", "3"], {"3,1"})
    run_step(["a", "4", "5"], ["a", "5"], {"5,1"})
    run_step(["a", "6"], ["a", "4", "6"], {"4,1", "6,1"})
    # Drop the second table's file written by the third step (row 5).
    os.remove(input_path_2 / "3")
    run_step(["a"], ["a"], {"5,-1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_with_universe_of(tmp_path, mode):
    """Check ``Table.with_universe_of`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,b,c,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)
        b: int

    def logic(left: pw.Table, right: pw.Table) -> pw.Table:
        return left.with_universe_of(right).with_columns(c=right.b)

    run_step, input_path_1, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema, terminate_on_error=False
    )

    run_step(["a,b", "1,2", "2,3"], ["a,b", "1,3", "2,4"], {"1,2,3,1", "2,3,4,1"})
    run_step(["a,b", "3,3", "5,1"], ["a,b", "3,4", "5,0"], {"3,3,4,1", "5,1,0,1"})
    # Drop the files written by the second step from both input directories.
    for directory in (input_path_1, input_path_2):
        os.remove(directory / "2")
    expected_after_removal = {
        "3,3,4,-1",
        "5,1,0,-1",
        "3,4,5,1",
    }
    run_step(["a,b", "3,4"], ["a,b", "3,5"], expected_after_removal)


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_intersect(tmp_path, mode):
    """Check ``Table.intersect`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)

    def logic(left: pw.Table, right: pw.Table) -> pw.Table:
        return left.intersect(right)

    run_step, _, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema
    )

    run_step(["a", "1", "2", "3"], ["a", "1"], {"1,1"})
    run_step(["a"], ["a", "3"], {"3,1"})
    run_step(["a", "4", "5"], ["a", "5", "6"], {"5,1"})
    run_step(["a", "6"], ["a", "4"], {"4,1", "6,1"})
    # Drop the second table's file written by the third step (rows 5 and 6).
    os.remove(input_path_2 / "3")
    run_step(["a"], ["a"], {"5,-1", "6,-1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_difference(tmp_path, mode):
    """Check ``Table.difference`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)

    def logic(left: pw.Table, right: pw.Table) -> pw.Table:
        return left.difference(right)

    run_step, _, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema
    )

    run_step(["a", "1", "2", "3"], ["a", "1"], {"2,1", "3,1"})
    run_step(["a"], ["a", "3"], {"3,-1"})
    run_step(["a", "4", "5"], ["a", "5", "6"], {"4,1"})
    run_step(["a", "6"], ["a", "4"], {"4,-1"})
    # Drop the second table's file written by the third step (rows 5 and 6).
    os.remove(input_path_2 / "3")
    run_step(["a"], ["a"], {"5,1", "6,1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_sorting_ix(tmp_path, mode):
    """Check ``sort`` combined with ``ix`` across consecutive persisted runs.

    The pipeline pairs every row that has a predecessor in the order of ``a``
    with that predecessor. Each expected entry is rendered as ``"b,a,diff"``,
    where ``b`` is the predecessor's ``a``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)

    def logic(table: pw.Table) -> pw.Table:
        table += table.sort(pw.this.a)
        with_predecessor = table.filter(pw.this.prev.is_not_none())
        return with_predecessor.select(b=table.ix(pw.this.prev).a, a=pw.this.a)

    run_step, input_path = get_one_table_runner(tmp_path, mode, logic, InputSchema)

    run_step(["a", "1", "6"], {"1,6,1"})
    run_step(["a", "3"], {"1,6,-1", "1,3,1", "3,6,1"})
    run_step(["a", "4", "5"], {"3,6,-1", "3,4,1", "4,5,1", "5,6,1"})
    # Drop the file written by the second step (row 3).
    os.remove(input_path / "2")
    run_step(["a"], {"1,3,-1", "3,4,-1", "1,4,1"})
    run_step(["a", "2"], {"1,4,-1", "1,2,1", "2,4,1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_update_rows(tmp_path, mode):
    """Check ``Table.update_rows`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,b,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)
        b: int

    def logic(base: pw.Table, updates: pw.Table) -> pw.Table:
        return base.update_rows(updates)

    run_step, _, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema
    )

    run_step(
        ["a,b", "1,2", "2,4"], ["a,b", "1,3", "3,5"], {"1,3,1", "2,4,1", "3,5,1"}
    )
    run_step(["a,b", "3,3"], ["a,b", "2,6", "5,1"], {"2,4,-1", "2,6,1", "5,1,1"})
    # Drop the second table's file written by the first step (rows 1,3 and 3,5).
    os.remove(input_path_2 / "1")
    run_step(["a,b"], ["a,b"], {"3,5,-1", "3,3,1", "1,3,-1", "1,2,1"})
    run_step(["a,b", "7,10"], ["a,b", "3,8"], {"3,3,-1", "3,8,1", "7,10,1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_update_cells(tmp_path, mode):
    """Check ``Table.update_cells`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,b,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)
        b: int

    def logic(base: pw.Table, updates: pw.Table) -> pw.Table:
        updates.promise_universe_is_subset_of(base)
        return base.update_cells(updates)

    run_step, _, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema, terminate_on_error=False
    )

    run_step(["a,b", "1,2", "2,4"], ["a,b", "1,3"], {"1,3,1", "2,4,1"})
    run_step(["a,b", "3,3"], ["a,b", "2,6"], {"2,4,-1", "2,6,1", "3,3,1"})
    # Drop the second table's file written by the first step (row 1,3).
    os.remove(input_path_2 / "1")
    run_step(["a,b"], ["a,b"], {"1,3,-1", "1,2,1"})
    run_step(["a,b", "7,10"], ["a,b", "3,8"], {"3,3,-1", "3,8,1", "7,10,1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_join(tmp_path, mode):
    """Check an equi-join on ``a`` across consecutive persisted runs.

    Each expected entry is an output row rendered as ``"a,b,c,diff"``, where
    ``b`` comes from the left table and ``c`` from the right one.
    """

    class InputSchema(pw.Schema):
        a: int = pw.column_definition(primary_key=True)
        b: int

    def logic(left: pw.Table, right: pw.Table) -> pw.Table:
        joined = left.join(right, left.a == right.a)
        return joined.select(pw.this.a, b=pw.left.b, c=pw.right.b)

    run_step, _, input_path_2 = get_two_tables_runner(
        tmp_path, mode, logic, InputSchema
    )

    run_step(["a,b", "1,2", "2,4"], ["a,b", "1,3"], {"1,2,3,1"})
    run_step(["a,b", "3,3"], ["a,b", "2,6", "1,4"], {"2,4,6,1", "1,2,4,1"})
    # Drop the second table's file written by the first step (row 1,3).
    os.remove(input_path_2 / "1")
    run_step(["a,b"], ["a,b"], {"1,2,3,-1"})
    run_step(["a,b", "1,4"], ["a,b", "1,8"], {"1,2,8,1", "1,4,8,1", "1,4,4,1"})


@pytest.mark.parametrize(
    "mode", [api.PersistenceMode.PERSISTING, api.PersistenceMode.OPERATOR_PERSISTING]
)
def test_groupby(tmp_path, mode):
    """Check ``groupby``/``reduce`` across consecutive persisted runs.

    Rows are grouped by ``a`` and reduced with count, sum and max of ``b``.
    Each expected entry is an output row rendered as ``"a,c,s,m,diff"``.
    """

    class InputSchema(pw.Schema):
        a: int
        b: int

    def logic(table: pw.Table) -> pw.Table:
        grouped = table.groupby(pw.this.a)
        return grouped.reduce(
            pw.this.a,
            c=pw.reducers.count(),
            s=pw.reducers.sum(pw.this.b),
            m=pw.reducers.max(pw.this.b),
        )

    run_step, input_path = get_one_table_runner(tmp_path, mode, logic, InputSchema)

    run_step(["a,b", "1,3", "2,4"], {"1,1,3,3,1", "2,1,4,4,1"})
    run_step(["a,b", "1,1"], {"1,1,3,3,-1", "1,2,4,3,1"})
    run_step(["a,b", "2,5"], {"2,1,4,4,-1", "2,2,9,5,1"})
    # Drop the file written by the second step (row 1,1).
    os.remove(input_path / "2")
    run_step(["a,b"], {"1,1,3,3,1", "1,2,4,3,-1"})
    run_step(["a,b", "2,0"], {"2,2,9,5,-1", "2,3,9,5,1"})
30 changes: 30 additions & 0 deletions python/pathway/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,11 @@ def write_lines(path: str | pathlib.Path, data: str | list[str]):
f.writelines(data)


def read_lines(path: str | pathlib.Path) -> list[str]:
    """Return the lines of the text file at ``path``, line endings included."""
    with open(path) as handle:
        # Iterating a text file yields the same lines as ``readlines()``.
        return list(handle)


def get_aws_s3_settings():
return pw.io.s3.AwsS3Settings(
bucket_name="aws-integrationtest",
Expand Down Expand Up @@ -777,3 +782,28 @@ def deprecated_call_here(
*, match: str | re.Pattern[str] | None = None
) -> AbstractContextManager[pytest.WarningsRecorder]:
return warns_here((DeprecationWarning, PendingDeprecationWarning), match=match)


def consolidate(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse rows with equal values into one row carrying the summed ``diff``.

    Two rows are treated as equal when the string forms of all their columns
    other than ``"time"`` and ``"diff"`` match. For each such group, the first
    row in frame order is kept and its ``"diff"`` is replaced by the sum of the
    group's diffs. Groups whose diffs sum to zero are dropped.

    Compared with the previous implementation, this version does not modify
    the caller's frame, and it works for any index rather than only the
    default ``RangeIndex``.

    Args:
        df: Frame with a ``"diff"`` column and, optionally, a ``"time"``
            column. It is left untouched.

    Returns:
        A new frame with the same columns as ``df``, holding one row per group
        with a non-zero total diff, in the original row order.
    """
    if df.empty:
        return df.copy()

    value_columns = [
        column for column in df.columns if column not in ("time", "diff")
    ]
    if value_columns:
        keys = df[value_columns[0]].astype(str)
        for column in value_columns[1:]:
            keys = keys + "," + df[column].astype(str)
    else:
        # Without value columns every row belongs to the same group.
        keys = pd.Series("", index=df.index, dtype=object)

    # Group by position (plain arrays) so duplicate or non-default index
    # labels cannot cause misalignment.
    totals = df["diff"].groupby(keys.to_numpy()).sum()
    first_of_group = ~keys.duplicated().to_numpy()

    result = df.loc[first_of_group].copy()
    result["diff"] = keys.loc[first_of_group].map(totals).to_numpy()
    return result[result["diff"] != 0]
Loading

0 comments on commit c21bf24

Please sign in to comment.