Proposal for a DJ materialization service #407
Replies: 8 comments 5 replies
-
Re: the materialization service calling the query service, I'm wondering if the materialization service should have direct connections to the engines the way the query service does, mainly because I think the SLAs for both of these services are so different that it might make sense to keep their logical paths separate. The materialization service being down for 3 hours may almost be a non-event that users don't even notice, whereas the query service being down for 1 hour is probably a huge event (all dashboards, reports, BI tools, etc. stop working).

Overall this design makes sense to me, except for the part where multiple tables with different columns and different data are attached to a single node, although I think that's a separate feature that has implications for how we materialize.
-
I'm not sure I understand your point. I think we both agree that the query service needs a high SLA and the materialization service a lower one. If the materialization service uses the query service it should be fine then; it can go down for a few hours and no one will care. As long as we have a low SLA service depending on a high SLA one it should be fine, no?
That part is orthogonal to the materialization service; I just put it here to reinforce the fact that nodes should have multiple materialization attributes.
-
Is there a reason why the Druid call needs to be a separate plugin? I think it can live as a part of the OSS djqs as well, because in my view the "query service" is not just limited to running SQL, but acts more like a data access layer. So DJ can just make a call to the query service that tells it to ingest to Druid, with the fields necessary to generate the Druid ingestion spec, and the query service can generate the full spec and hit the Druid cluster, assuming one is configured.
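To make that concrete, here is a minimal sketch of what such a call could look like. The endpoint path, the payload fields, and the idea that djqs exposes an ingestion call at all are assumptions for illustration, not an existing API.

```python
# Hypothetical sketch: DJ asking the query service (djqs) to ingest a node's
# data into Druid. The endpoint and payload shape are assumptions only.
import requests

payload = {
    "node": "B",                                  # node to send to Druid
    "source_table": "analytics.b_materialized",  # table holding the node's data
    "timestamp_column": "ds",                     # for Druid's timestampSpec
    "dimensions": ["foo", "bar"],                 # columns exposed as dimensions
    "metrics": [{"type": "longSum", "name": "cnt", "fieldName": "cnt"}],
}

# djqs would expand this into a full Druid ingestion spec and submit it to the
# Druid cluster configured for the catalog, if one is configured.
response = requests.post("http://djqs:8001/druid/ingestions", json=payload, timeout=30)
response.raise_for_status()
```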
-
I was thinking more about how the materialization service and the main DJ service will have to compete for the query service's resources. In a scenario where the query service is under high load, we may end up going down the path of trying to do things like adding a priority queue to separate the requests within the query service, and I guess I'm just wondering if we should separate them altogether from the start.
-
I've been thinking about this since our discussion in the last sync. I agree that we should address this now rather than later for users that are dealing with fragmented data. I sort of see 3 user scenarios and potential solutions listed below in order of increasing complexity. @betodealmeida let me know if these scenarios are aligned with how you view this problem space.
Tackling scenario 2 next feels like the right thing to do because it's aligned with where the industry is heading. Most organizations have a data lake strategy where they ingest data from all of these disparate sources into a data lake, usually OSS table formats in cloud storage, and then choose from the myriad of OSS compute engines depending on the individual use case. The organizations that don't have this setup are in the process of migrating towards it. Scenario 3 has the most challenging solution and represents the complexity that probably moved most orgs towards the data lake strategy in the first place. I think it's important that we don't understate the additional complexity that trying to solve for it would add to the DJ codebase. I mentioned in the sync that I have a strong suspicion that if we successfully deliver scenario 2, people will actually find that a preferable alternative to more complex runtime analysis.
-
@betodealmeida @samredai @shangyian I read through all your comments about the materialization service functionality vs. the query service functionality in light of competition for resources. One thing that I feel hasn't been mentioned clearly yet (sorry if I missed it somewhere) is that the query service's main data access mode is Read, while the materialization service's main data access mode is Write. With that in mind I'd like to propose the following:
The above solution would by default separate the read and write access paths, even though some small parts of the implementation would still go through the same DA layer in the end. But even then, the DJ admin would be able to see the Read and Write calls coming from two separate services, which should be easier to manage.
-
+1 to splitting the functionality between the services along read/write access paths. I think we can have two materialization config options: (1) using a pre-configured ... With the latter materialization option, we would expect this flow:
Note that steps (a) and (b) could be combined into a single endpoint in DJ.
-
Some additional thoughts on when/how we should materialize. In the initial version of the materialization service, I think we should materialize all transform, dimension, and cube nodes, assuming that these nodes have materialization attributes set.

Option "Always Wait for Upstreams": The DJ core service will return nodes with their immediate parents, which lets us determine which nodes to materialize first. With this DAG of nodes, the materialization service would start by finding all nodes with only source-node upstreams and kick off the materializations for these first. Then, when each node materialization finishes, it would find that node's immediate child nodes, determine if they're ready to be materialized (only true if all of their immediate parents have finished materialization), and kick them off if they are.

Option "Materialize Straight Away": In this strategy, the DJ core service just returns a list of nodes. The materialization service will materialize each node without looking at its parents, meaning that each materialization query will include the transform SQL of each of its parent nodes rather than using the materialized parent node table. This setup is more wasteful, but the logic is very simple. It is also the case that if we have many transform nodes operating on "small" data, the extra overhead of waiting for upstreams so that we can reuse materialized datasets is unnecessary.

My vote is for "Always Wait for Upstreams" as that seems more robust.
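To illustrate the "Always Wait for Upstreams" option, here is a minimal sketch of the ordering logic, assuming the DJ core service hands the scheduler each node with its immediate (non-source) parents; the function names and data shapes are made up for illustration.

```python
# Sketch of "Always Wait for Upstreams" ordering (illustrative only).
# `nodes` maps node name -> set of immediate non-source parent names, and
# `materialize(name)` submits and waits for one node's materialization.
from collections import deque

def materialize_in_order(nodes: dict[str, set[str]], materialize) -> None:
    remaining = {name: set(parents) for name, parents in nodes.items()}
    children: dict[str, set[str]] = {}
    for name, parents in nodes.items():
        for parent in parents:
            children.setdefault(parent, set()).add(name)

    # Nodes whose upstreams are all source nodes can be materialized right away.
    ready = deque(name for name, parents in remaining.items() if not parents)
    finished = set()
    while ready:
        name = ready.popleft()
        materialize(name)  # e.g. submit the CTAS to the query service and wait
        finished.add(name)
        for child in children.get(name, ()):
            remaining[child].discard(name)
            if not remaining[child]:  # all immediate parents are materialized
                ready.append(child)

    if len(finished) != len(nodes):
        raise RuntimeError("some nodes were never ready (cycle or missing parent)")

# Example: D is a hypothetical transform built on top of B, which only has
# source-node upstreams; B is materialized first, then D.
materialize_in_order({"B": set(), "D": {"B"}}, materialize=print)
```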
-
Summary
This is a proposal for a materialization service, responsible for reading declarative metadata in non-source nodes and running all the necessary queries to make the metadata true. Note that the term "service" here is used not for an HTTP service, but a scheduler worker that periodically runs tasks; for this reason "service" and "scheduler" are used interchangeably to refer to the materialization service.
Motivation
There are 2 main use cases for this service:

1. Materializing slow nodes so that queries against them can read from pre-computed tables instead of recomputing their transforms.
2. Creating and managing tables in a data warehouse, similar to how `dbt` works.

Example
Imagine a DAG with simple source, transform, and metric nodes. The source node has 2 tables in different catalogs.
(Note: it would've been nice if we had standardized on colors and shapes for node types! I'll ask Kim about it!)
When computing metric `C`, a query is built that inlines the SQL of transform `B` as an inner select over `some_table`. (Note that `some_other_table` can't be used because we filter on `bar`.)

Now maybe `some_table` doesn't have an index on the `bar` column, so the inner select in the metric query is slow. A user might want to materialize `B`. They would do this by adding materialization attributes to the node, in pseudo-syntax (note that here nodes can have multiple representations).

The materialization service would poll the DJ API for all nodes with a `materialization` attribute. For each node it would fetch a "materialization SQL"; for node `B` this is a statement that creates the target table from `B`'s query, which the materialization service can then run in the query service.
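Since the original SQL snippets aren't reproduced above, here is a minimal sketch of the kind of statement the service might submit, shown as a Python string as the scheduler might hold it. The target table name, the exact columns, and `B`'s transform logic are all assumptions for illustration.

```python
# Illustrative only: DJ would generate the real "materialization SQL" for node B
# from its transform definition. Table/column names below follow the example
# (some_table, bar); the target table name and transform body are made up.
MATERIALIZE_B = """
CREATE TABLE IF NOT EXISTS analytics.b_materialized AS
SELECT foo, bar            -- columns exposed by transform B (assumed)
FROM some_table            -- B's physical source table from the example
WHERE bar IS NOT NULL      -- placeholder for B's actual transform logic
"""
```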
Now the DAG includes a physical table backing `B`, and queries for metric `C` are faster because they can read from the materialized table instead of re-running the inner select against `some_table`.

The second use case would be for creating and managing tables in a data warehouse, similar to how `dbt` works. This way all the information about how tables are generated would live in a single system, DJ, and migrating to different databases would be relatively easy.

Architecture
Goals
Discovery
The first step in materializing nodes is discovering which nodes need to be materialized. This could be done either via the GraphQL API or via the REST API. For the REST API we could add a new endpoint, `/nodes/searches`, where the service would post a search for nodes that have materialization attributes.

Ideally this would return a list of nodes with the materialization attributes in the payload, so that additional requests are no longer needed. For this reason GraphQL might be a better interface for the native scheduler, since it's probably much simpler.
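A minimal sketch of what the discovery call might look like; the payload shape for `/nodes/searches` and the GraphQL field names are assumptions, since neither is spelled out above.

```python
# Illustrative discovery calls (shapes are assumptions, not an existing API).
import requests

DJ_URL = "http://dj:8000"  # assumed address of the DJ core service

# REST variant: post a search to the proposed /nodes/searches endpoint asking
# for nodes that carry a materialization attribute.
search = {"filters": {"has_attribute": "materialization"}}
rest_nodes = requests.post(f"{DJ_URL}/nodes/searches", json=search, timeout=30).json()

# GraphQL variant: fetch the nodes and their materialization attributes in a
# single request, avoiding follow-up calls per node.
query = """
{
  nodes(attribute: "materialization") {
    name
    materialization { schema_ table timeColumn }
  }
}
"""
gql_nodes = requests.post(f"{DJ_URL}/graphql", json={"query": query}, timeout=30).json()
```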
Execution
Once the service knows the query for each node, it can use the query service to submit the materialization query. The response from the query service should be atomic and idempotent, i.e., it shouldn't be possible for the `CREATE TABLE ...` to run but for the `SELECT ...` to fail (the service should roll back), and the result should be the same if the query is submitted multiple times. This allows the materialization service to handle retries on its own.

Advanced
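Because the submission is assumed to be idempotent, the scheduler's retry logic can stay very simple. A sketch is below; the `submit_materialization` helper and its signature are made up for illustration.

```python
# Illustrative retry loop. Because submitting the same materialization query
# twice is assumed to yield the same result (idempotency), the scheduler can
# retry on failure without risking partial or duplicated tables.
import time

def run_with_retries(submit_materialization, node_name: str, sql: str,
                     attempts: int = 3, backoff_seconds: float = 60.0) -> None:
    for attempt in range(1, attempts + 1):
        try:
            submit_materialization(node_name, sql)  # call into the query service
            return
        except Exception:  # in practice, catch the query-service error type
            if attempt == attempts:
                raise
            time.sleep(backoff_seconds * attempt)  # simple linear backoff
```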
Incremental materialization
For many nodes we'll want to perform incremental materialization. One way of doing that is allowing users to specify a special temporal column in the `materialization` attribute. When a time column is present, the materialization query would take it into consideration, restricting each run to the time range being processed.
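A minimal sketch of how the scheduler might turn a time column into an incremental query; the attribute fields and the INSERT OVERWRITE dialect are assumptions.

```python
# Illustrative: building an incremental materialization statement from a node's
# materialization attribute. Attribute fields and SQL dialect are assumptions.
from datetime import date

def incremental_sql(materialization: dict, node_sql: str, ds: date) -> str:
    table = materialization["table"]              # e.g. "analytics.b_materialized"
    time_column = materialization["time_column"]  # e.g. "ds"
    # Replace only the partition for `ds`, so re-runs and backfills stay idempotent.
    return (
        f"INSERT OVERWRITE TABLE {table} PARTITION ({time_column} = '{ds}')\n"
        f"SELECT * FROM ({node_sql}) AS node\n"
        f"WHERE node.{time_column} = '{ds}'"
    )

# Example usage with made-up values:
print(incremental_sql(
    {"table": "analytics.b_materialized", "time_column": "ds"},
    "SELECT foo, bar, ds FROM some_table",
    date(2023, 1, 1),
))
```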
If the upstream table is partitioned on the time column the scheduler could leverage that, allowing the initial materialization (or backfills) to be run in parallel for different partitions.
Non-standard materialization
Some use cases might require tables to be materialized via methods other than SQL. For example, Netflix might have a tool to send data to Druid, since currently Druid only supports `SELECT` statements. When materialization cannot be done via SQL, the node attribute would specify a custom plugin. The service would then find the `NetflixDruidMaterialization` plugin via an entry point, as sketched below.

Note that a drawback of this approach is that it requires the schedulers to be written in Python, so they have access to the plugin.
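A minimal sketch of how the scheduler could load such a plugin through a Python entry point; the entry-point group name `dj.materialization_plugins` and the plugin interface are assumptions.

```python
# Illustrative plugin discovery via Python entry points. The group name and the
# plugin interface are assumptions; the proposal only says "via an entry point".
from importlib.metadata import entry_points

def load_materialization_plugin(name: str):
    """Return the plugin class registered under the given entry-point name."""
    for ep in entry_points(group="dj.materialization_plugins"):
        if ep.name == name:
            return ep.load()
    raise ValueError(f"No materialization plugin registered as {name!r}")

# The plugin package would declare something like this in its pyproject.toml:
#
#   [project.entry-points."dj.materialization_plugins"]
#   NetflixDruidMaterialization = "netflix_dj_plugins.druid:NetflixDruidMaterialization"
#
# Usage (assuming such a plugin package is installed):
#   plugin_cls = load_materialization_plugin("NetflixDruidMaterialization")
#   plugin_cls().materialize(node)   # assumed plugin interface
```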
Open questions
Schema changes
What should we do for schema changes? The easy solution would be to drop and recreate the table when the schema changes, but that might be expensive and might result in data loss if someone is editing a node and accidentally drops a table. The main concerns here are:

- Adding a column could be done via `ALTER TABLE`, but the column would need to be backfilled, so it might be easier to create a new table instead.
- For a removed column, instead of `DROP COLUMN` we could keep the column and fill it with `NULL` when the materialization runs after the column is deleted. Since the deleted column won't be referenced in any queries it shouldn't be a problem to keep it.

Who is responsible for detecting schema changes? Should (1) the materialization service compare the schema of existing tables and the nodes being materialized? Or should (2) the DJ service compare the schemas of different versions of a given node when building the SQL?

The benefit of the second approach is that the materialization SQL produced could contain the `ALTER TABLE` statements needed to adjust the schema. But it would require the DJ service to keep track of the version currently materialized and the version being materialized, which IMHO should be the responsibility of the materialization service.

The first approach might be easier, and the materialization service can use the reflection service to get the schema of an existing table, so it can be compared with the schema of the node being materialized.
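A minimal sketch of what approach (1) could look like inside the materialization service, assuming the reflection service returns column name/type pairs; the function name and return shapes are assumptions.

```python
# Illustrative schema comparison for approach (1). The reflection-service call
# and the shape of its response are assumptions made for this sketch.
def diff_schemas(existing: dict[str, str], desired: dict[str, str]) -> dict:
    """Compare {column: type} mappings for the existing table and the node."""
    return {
        "added": {c: t for c, t in desired.items() if c not in existing},
        "removed": {c: t for c, t in existing.items() if c not in desired},
        "retyped": {
            c: (existing[c], desired[c])
            for c in existing.keys() & desired.keys()
            if existing[c] != desired[c]
        },
    }

# Example: a column was added and another removed since the last materialization.
print(diff_schemas(
    {"foo": "INT", "bar": "VARCHAR"},
    {"foo": "INT", "baz": "TIMESTAMP"},
))
# -> {'added': {'baz': 'TIMESTAMP'}, 'removed': {'bar': 'VARCHAR'}, 'retyped': {}}
```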