Commit 5cb3377
iceberg output connector docs (#7937)
GitOrigin-RevId: 4ff44c03ec67cd96ca551d2a8e3efa7d580a6d2c
zxqfd555-pw authored and Manul from Pathway committed Dec 31, 2024
1 parent 0fdae0c commit 5cb3377
Showing 3 changed files with 70 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### Added
- `pw.io.iceberg.write` method for writing Pathway tables into Apache Iceberg.

### Changed
- values of non-deterministic UDFs are not stored in tables that are `append_only`.
- `pw.Table.ix` has a better runtime error message that includes the id of the missing row.
@@ -66,6 +66,7 @@ Before going into more details about the different connectors and how they work,
<span class="block"><a href="/developers/user-guide/connect/connectors/fs-connector">File System</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/pubsub">Google PubSub</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/http">http</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/iceberg">Iceberg</a></span>
<span class="block"><a href="/developers/user-guide/connect/connectors/jsonlines-connector">JSON Lines</a></span>
<span class="block"><a href="/developers/user-guide/connect/connectors/kafka_connectors">Kafka</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/nats">NATS</a></span>
66 changes: 66 additions & 0 deletions python/pathway/io/iceberg/__init__.py
@@ -17,6 +17,72 @@ def write(
    warehouse: str | None = None,
    min_commit_frequency: int | None = 60_000,
):
"""
Writes the stream of changes from ``table`` into `Iceberg <https://iceberg.apache.org/>`_
data storage. The data storage must be defined with the REST catalog URI, the namespace,
and the table name.
If the namespace or the table doesn't exist, they will be created by the connector.
The schema of the new table is inferred from the ``table``'s schema. The output table
must include two additional integer columns: ``time``, representing the computation
minibatch, and ``diff``, indicating the type of change (``1`` for row addition and
``-1`` for row deletion).
Args:
table: Table to be written.
catalog_uri: URI of the Iceberg REST catalog.
namespace: The name of the namespace containing the target table. If the namespace
doesn't exist, it will be created by the connector.
table_name: The name of the table to be written. If a table with such a name
doesn't exist, it will be created by the connector.
warehouse: Optional, path to the Iceberg storage warehouse.
min_commit_frequency: Specifies the minimum time interval between two data
commits in storage, measured in milliseconds. If set to ``None``, finalized
minibatches will be committed as soon as possible. Keep in mind that each
commit in Iceberg creates a new Parquet file and writes an entry in the
transaction log. Therefore, it is advisable to limit the frequency of commits
to reduce the overhead of processing the resulting table.
Returns:
None
Example:
Consider a users data table stored locally in a file called ``users.txt`` in CSV format.
The Iceberg output connector provides the capability to place this table into
Iceberg storage, defined by the catalog with URI ``http://localhost:8181``. The target
table is ``users``, located in the ``app`` namespace.
First, the table must be read. To do this, you need to define the schema. For
simplicity, consider that it consists of two fields: the user ID and the name.
The schema definition may look as follows:
>>> import pathway as pw
>>> class InputSchema(pw.Schema):
... user_id: int
... name: str
Using this schema, you can read the table from the input file. You need to use the
``pw.io.csv.read`` connector. Here, you can use the static mode since the text file
with the users doesn't change dynamically.
>>> users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")
Once the table is read, you can use ``pw.io.iceberg.write`` to save this table into
Iceberg storage.
>>> pw.io.iceberg.write(
... users,
... catalog_uri="http://localhost:8181/",
... namespace=["app"],
... table_name="users",
... )
Don't forget to run your program with ``pw.run`` once you define all necessary
computations. After execution, you will be able to see the users' data in the
Iceberg storage.
"""
    _check_entitlements("iceberg")
    data_storage = api.DataStorage(
        storage_type="iceberg",
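
The doctest fragments in the docstring above combine into a single runnable program. Below is a minimal sketch reusing the example's placeholder values (a local ``users.txt`` file and a REST catalog at ``http://localhost:8181``); the explicit ``min_commit_frequency`` is simply the default from the signature above:

import pathway as pw

class InputSchema(pw.Schema):
    user_id: int
    name: str

# Read the static CSV input described in the example.
users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")

# Stream the changes into the `users` table of the `app` namespace;
# both are created by the connector if they don't exist yet.
pw.io.iceberg.write(
    users,
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
    min_commit_frequency=60_000,  # minimum gap between Iceberg commits, in ms (the default)
)

# Build and execute the dataflow.
pw.run()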

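To inspect what the connector wrote, the table can be read back with any Iceberg client. A hypothetical sketch using the external ``pyiceberg`` package (not part of this change; it assumes the same local REST catalog and the ``app.users`` table from the example):

from pyiceberg.catalog import load_catalog

# Connect to the REST catalog the connector wrote to.
catalog = load_catalog("default", uri="http://localhost:8181")
table = catalog.load_table("app.users")

# Each row has the inferred columns (user_id, name) plus the two integer
# columns described in the docstring: `time` (computation minibatch) and
# `diff` (1 for addition, -1 for deletion).
print(table.scan().to_pandas())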