materialize-iceberg: column type migrations #2540
Open
williamhbaker wants to merge 4 commits into main from wb/iceberg-migrations
Conversation
Mainly this adds support for column migrations as a concept in the materializer `RunApply` function. The set of fields to be migrated is computed and included in the struct of common updates to be applied for a binding. This commit also contains a few other changes that are related to some extent, and/or opportunistic changes I wanted to make and couldn't easily split into separate commits:

* For namespace creation, there is often a "default" namespace set in the endpoint configuration. Although this namespace is usually used by one of the bindings and will be created for that reason if it doesn't exist, sometimes no bindings use it at all because they all have an alternative namespace set. It can be important for this namespace to exist anyway, for example if metadata tables must be created in it, so it needs some representation outside the InfoSchema and resource configurations. This changes the endpoint config itself to an interface that must be able to say what the default namespace is, if there is one (see the sketch after this list).
* The prior handling for connectors that want to do some extra work during `Apply` was a little ugly, and also insufficient for cases where they need to do that work _before_ the typical apply actions are run. I think these extra actions are needed commonly enough (creating a checkpoints table for some SQL materializations, or uploading PySpark scripts for Iceberg) that a specific method for doing them is warranted. So here it is, and it's called `Setup`.
* Field compatibility & descriptions are moved to a `MappedTyper` interface to separate concerns a little more.
* Fixed the generated "comment" for a field that is a projection of the root document, since it looked weird having the pointer be empty.
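A minimal sketch of what these two hooks could look like. Other than `Setup` and `MappedTyper` being names from this change, the identifiers and signatures below are assumptions for illustration, not the actual materializer package definitions:

```go
// Sketch only: identifiers and signatures here are assumptions, not the
// actual materializer package definitions.
package materializer

import "context"

// EndpointConfiger is a hypothetical stand-in for the endpoint config
// interface described above: it must be able to report the default
// namespace, if there is one.
type EndpointConfiger interface {
	// DefaultNamespace returns the configured default namespace and true,
	// or "" and false if no default namespace is configured.
	DefaultNamespace() (string, bool)
}

// SetupHook sketches the new Setup method: connector-specific actions
// (creating a checkpoints table, uploading PySpark scripts, ...) that must
// run before the typical apply actions.
type SetupHook interface {
	Setup(ctx context.Context) error
}
```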
Adds a random suffix to integration test table names so that more than one CI job can run them at a time. Otherwise concurrent job runs will delete the testing tables out from under each other and cause the tests to fail. This happens with many of our materialization integration tests, but it's particularly bad for these Iceberg tests since they take so long to run. Sometimes it can be legitimately difficult to get a build to pass CI if many different branches are trying to run the tests.
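For illustration, a hypothetical helper of the kind used here; the actual suffix format and table naming in the tests may differ:

```go
// Sketch only: a hypothetical helper for suffixing integration test table
// names so concurrent CI runs don't clobber each other's tables.
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// randomSuffix returns a short random hex string, e.g. "9f3a07c1".
func randomSuffix() string {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

func main() {
	table := fmt.Sprintf("test_table_%s", randomSuffix())
	fmt.Println(table) // e.g. test_table_9f3a07c1
}
```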
I recently added this `getHighestFieldID` function since `(*Schema).HighestFieldID` did not work with schemas containing logical types like `decimal`. It's true that the custom version works where the other one doesn't, but neither is the correct approach for figuring out which field ID to start with when adding a field to an Iceberg table schema. Since fields may have previously existed and since been deleted from the table, the next never-before-used field ID can't be determined from the current schema just by looking at the numbers. There is an Iceberg table property that tracks the highest field ID on the table itself, though, and using that should always work. This commit removes the `getHighestFieldID` function - the next commit will update the related schema calculation functions, among other things, as part of supporting column migrations.
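To illustrate the idea: Iceberg table metadata carries a `last-column-id` value recording the highest field ID the table has ever assigned, so the next field ID can come from there instead of from scanning the current schema. Reading raw metadata JSON below is just for illustration; the connector presumably gets this through its Iceberg client:

```go
// Sketch only: illustrates taking the next field ID from the table
// metadata's "last-column-id" value rather than from the current schema.
package main

import (
	"encoding/json"
	"fmt"
)

// Partial view of Iceberg table metadata; "last-column-id" is defined by
// the Iceberg table spec.
type tableMetadata struct {
	LastColumnID int `json:"last-column-id"`
}

func nextFieldID(rawMetadata []byte) (int, error) {
	var md tableMetadata
	if err := json.Unmarshal(rawMetadata, &md); err != nil {
		return 0, err
	}
	// IDs of deleted columns are never reused, so start after the highest
	// ID the table has ever assigned, not the highest in the current schema.
	return md.LastColumnID + 1, nil
}

func main() {
	raw := []byte(`{"last-column-id": 7}`)
	id, _ := nextFieldID(raw)
	fmt.Println(id) // 8
}
```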
Adds support for materialize-iceberg to do column type migrations for all known scenarios of schema "widening" without requiring a backfill to re-build the table. The Spark SQL queries for doing this are straightforward. A new PySpark script called `exec.py` is added that just runs a single query; it's used to run the migration queries. This kind of script might be useful for other things in the future as well, so it isn't called `migrate.py` or something like that. Most of the work for adding this capability comes from needing to initialize all of the EMR job-running apparatus in `Apply` now, to run migration Spark jobs, in addition to in `Open` for running transactions. So there's a lot of code that got moved around to support that, as well as the core changes for supporting column migrations.
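As an illustration of the kind of statement involved (not necessarily the exact query the connector generates), widening an Iceberg column with Spark SQL looks roughly like this; the table and column names are made up, and the helper is hypothetical:

```go
// Sketch only: an example of the kind of Spark SQL a column "widening"
// migration runs. Table and column names are made up; the connector's
// actual query construction may differ.
package main

import "fmt"

func widenColumnSQL(table, column, newType string) string {
	// Iceberg's Spark DDL supports in-place type widening, e.g.
	// int -> bigint or float -> double, without rewriting data files.
	return fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE %s", table, column, newType)
}

func main() {
	// The sort of single statement exec.py would be handed to run.
	fmt.Println(widenColumnSQL("catalog.some_namespace.some_table", "count", "bigint"))
}
```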
Description:
Adds support for column type migrations to materialize-iceberg. See commit messages for more details.
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
I manually tested all of these migrations, and added as much unit test coverage as I could. Our SQL materializations have a test suite that covers migration scenarios, but this is the first materialization that doesn't use the `materialize-sql` framework, so it can't use those same tests. We should make a more generalized set of tests for that, and I'll probably get around to it at some point, but I didn't do it now since I'm not totally sure what it would look like (I want to get another materialization or two using the `Materializer` interface), and it's just a big pain to do the equivalent of "INSERT INTO" on an Iceberg table for putting in test values, so that will be a project unto itself - perhaps for if we pursue a compute-less delta updates mode.