Eppo's new pipeline architecture reduces both warehouse costs and pipeline run-times. Here's how we did it.
Everyone enjoys fresh metrics in the morning – perhaps over a warm cup of coffee and a bagel – but data pipelines always seem to reach an inevitable breaking point. Compute may be cheap, but recomputing up-to-date metrics for the Complete History of the Company isn't free. At some point someone will get the idea to add today's data to yesterday's cumulative data, and thus begins the company's journey into the murky abyss of incrementally updated data pipelines.
What is an incremental pipeline, you ask? Imagine it’s election night, and that there is a “pipeline” in place that counts votes at the precinct level, then aggregates all of the precincts up to the county level, and then adds up the counties to compute state-level vote counts. This system works well to produce a final number and declare an election winner.
But now suppose you’re running a news organization, and don’t just want a final number, but would like to know the vote totals as they come in from the various precincts and counties over the course of the evening. When a precinct updates its vote tallies, it’s not very practical to ask every precinct to recount their votes as well. Instead, we’d want to update each county number to reflect the changed precincts, and only update the state-level numbers to reflect the changed counties. This is the essence of an incremental pipeline: it adds new data to existing aggregations, rather than re-computing everything from scratch with every update.
There are some new streaming-data technologies, including Timely Dataflow, that promise to solve the up-to-date metrics problem using streaming views. However, these technologies haven't reached run-of-the-mill warehouses, so the rest of us are left to roll up our sleeves and implement our own incremental systems.
Before doing so, it's worth asking whether incremental processing is really and truly necessary. Incremental processing introduces complexity, as the pipeline will have to maintain state that it didn't have to worry about before. In addition, there will need to be a strategy in place for handling late-arriving data. If Tuesday's data arrives on Thursday, Friday's incremental update may miss it.
Certain types of data benefit the most from incremental processing:
By reducing the amount of reading that occurs on these types of data sources, an incremental pipeline can work faster and more cheaply than their non-incremental counterparts.
Eppo began working on an incremental system last summer so that our customers could see up-to-date metrics every morning without incurring unnecessary costs. To track the state of processing each pipeline, we developed a data asset system (loosely inspired by Dagster’s) to track what exactly had been computed in the pipeline, and when. These assets refer to specific rows, columns, and tables in the customer data warehouse; "assets" include both intermediate computation artifacts as well as the final "data product", such as a data frame. Each asset is timestamped with the time of its computation, and also includes timestamps to reflect the range of data that is represented.
The job of the incremental pipeline is to take a description of a desired final product, evaluate which data assets exist already, and then form a computation plan to create or update all of the data assets necessary for the pipeline to run.
In our original incremental implementation, we rewrote our entire pipeline to make use of data assets and incrementality. That is, suppose that data asset C depends on B, and B depends on A. A planning phase may look like this:
The incremental pipeline runs by first updating A with the most recent two days of customer data, then using the result to update B, and finally to update C.
While our original incremental system was comprehensive, and performed the absolute minimum amount of compute on the customer warehouse, we found a few issues with it in practice:
Concurrent to our incremental pipeline initiative, we undertook a significant optimization effort on our core pipeline algorithms. Some of these optimizations included:
After performing these end-to-end optimizations, we found that in many cases, the performance gains from using a better algorithm more than outweighed the benefit of using an incremental computation.
To take a specific example: The Explore feature of Eppo lets users see how the results of an experiment evolved over time:
To create this graphic, we need to compute a partial sum of each metric value for each user over each day that the experiment is running. In our original implementation, we performed the naive $O(N^2)$ computation (where $N$ is the number of days in the experiment). A basic incremental version of this computation would save the results of previous days using the data asset system, and therefore only perform a daily $O(N)$ computation.
But there's another path to a linear time algorithm, one that does not require maintaining state in a separate system: writing out the partial sums, instead of computing each daily sum independently. In SQL, partial sums can be computed using a window specifier such as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. A SQL engine that processes window function efficiently will be able to compute these results in $O(N \log N)$ time at worst.
In practice, we found that our algorithmic optimization work eliminated several computational bottlenecks, and made incremental processing unnecessary for a majority of our pipeline. And so we decided to favor an incremental algorithm only when it helped to alleviate actual bottlenecks in our pipeline, and otherwise to prefer the simplicity of well-designed non-incremental algorithms.
Because we control all of the intermediate steps of our pipeline – where we have full freedom to structure, partition, and query data in an optimal manner – we finally settled on an architecture that uses incremental processing for customer (source) data, and non-incremental processing for Eppo-controlled (intermediate) data. After our algorithmic optimizations, we found that this bifurcation allowed us to imbibe the customer "firehose" while maintaining a minimum of data assets. Eliminating state from the majority of the pipeline gave us maximum flexibility for further data restructuring and optimization without having to explain computation spikes to customers due to occasional data flushes. On the customer side, the new architecture let us deliver dramatic reductions in both warehouse costs and pipeline run-times.
Overall, our incremental initiative ended up somewhat less ambitious than its original scope, but we were happy to enjoy the 80/20 benefits implied by the Pareto principle. Although SQL engines look like black boxes from the outside, and despite the temptation to defer to the implementation magic of a declarative language, reasoning about (and measuring) warehouses' internal operation allowed us to devise better table schemas and queries that obviated the need for complicated state-tracking in most of our pipeline.
We're now able to support planet-scale customers in an experiment pipeline that is able to process billions of events across hundreds of millions of end-users entirely within our customers' warehouses, delivering a daily basket of warm experiment metrics to complement our customers’ morning beverage of choice.
Building the Modern Experimentation Stack
The Warehouse-Native Experimentation Workflow
How to Set Up an Experiment in Eppo