
Epsio performance on TPC-DS dataset versus Flink

January 27, 2025
Maor Kern

Epsio is a streaming SQL engine that allows you to create incremental views within your database. To do this, Epsio consumes a replication stream from the customer’s database, and sinks back the results into that same DB.

Compared with other streaming engines such as Flink (and most, perhaps all, of its peers), Epsio's first aim is absolutely seamless integration with your current database: no external components to set up (such as Debezium or Kafka) aside from Epsio itself, and no new API outside of your database.


Our other main focus, outside of integration, was of course world-class speed. This article showcases that speed against Flink, a widely used open-source stream-processing engine.

TL;DR

Epsio ranges from 7x to 21x the speed of Flink, while maintaining absolute consistency and correctness (in contrast to Flink and many other streaming engines).

To benchmark ourselves, we run internally designed aggregations against the TPC-DS dataset. The TPC-DS benchmark simulates a retail business environment, offering a schema with multiple interrelated tables (e.g., sales, inventory, customers) and complex queries. We use internally designed queries in order to better simulate building “views” on top of the dataset; e.g., instead of a query selecting the total amount of sales for a specific date, we would have a query giving the total amount of sales per day. This means the queries we benchmark with lean much more heavily on aggregation than on searching for specific data.
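As an illustration (this is not one of the benchmark queries below), the difference looks like this on TPC-DS's store_sales and date_dim tables:

-- Point query: total sales on one specific date
SELECT SUM(ss_ext_sales_price)
FROM store_sales
JOIN date_dim ON ss_sold_date_sk = d_date_sk
WHERE d_date = DATE '2000-01-01';

-- View-style query: the same aggregate, maintained for every day
SELECT d_date, SUM(ss_ext_sales_price)
FROM store_sales
JOIN date_dim ON ss_sold_date_sk = d_date_sk
GROUP BY d_date;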

In this blog’s benchmark, we run ourselves against Flink. In a “real-world” scenario, both Epsio and Flink would be consuming changes directly from Postgres; to ensure we’re benchmarking Epsio against Flink, rather than comparing Epsio’s CDC forwarder with Debezium (despite our significant advantage there :)), both benchmarks begin only after Debezium or the CDC forwarder has finished fetching all changes from the database. No additional “work” beyond fetching the rows is performed before the benchmark starts.


We’ll be running aggregative queries on TPC-DS Scale Factor 100 (roughly 1 billion rows overall in the database; not all are used by every query), and writing the results back to the original Postgres. Each engine will run on a GCP N2 instance with 16 vCPUs, 64 GB RAM, running Ubuntu 22.04 (`6.8.0-1018-gcp` kernel), and a 2TB disk.

Flink Setup

To set up Flink to consume from a Postgres DB, we used the recommended Debezium connector, which streams each table into its own Kafka topic.

Debezium, although an industry standard, has its fair share of issues, which is why Epsio doesn’t use it internally. For starters, almost any streaming SQL engine based on Debezium is **inconsistent across tables**, since the engine consumes each table’s topic separately; a transaction that touches two tables can therefore be only half-applied when the view updates. For most companies this is a non-starter, as you’re continuously showing incorrect results. Debezium is also not particularly performant; even with the Avro format, which sped things up, we weren’t able to get Debezium to stream above 70–100k rows/s (depending on the rows). But since we’re not measuring Debezium’s speed, that’s fine: we made sure Debezium finished loading all rows into Kafka before running our queries on Flink.

As for Flink itself, we copied most of Flink’s configuration from a Nexmark benchmark. Flink by default isn’t configured to fully leverage the machine it runs on, so we settled on one jobmanager and one taskmanager, with the taskmanager given 16 CPUs and parallelism set to 16 (matching the machine’s 16 CPUs). This means every operator in Flink is parallelized across 16 CPUs, including the sink into the result DB. We also disabled incremental checkpoints to speed things up; note that this setup wouldn’t be crash safe.
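For a sense of what this looks like on the Flink side, here is a minimal, hedged sketch of a Flink SQL session (topic and host names are placeholders rather than our actual values; the exact configuration lives in the repo linked below):

-- Session-level parallelism; the 16-slot taskmanager itself is configured in flink-conf.yaml
SET 'parallelism.default' = '16';

-- Each TPC-DS table arrives as a Debezium changelog on its own Kafka topic
CREATE TABLE store_sales (
    ss_sold_date_sk BIGINT,
    ss_sold_time_sk BIGINT,
    ss_item_sk BIGINT,
    ss_ext_sales_price DECIMAL(7, 2)
    -- ...remaining TPC-DS columns omitted
) WITH (
    'connector' = 'kafka',
    'topic' = 'tpcds.public.store_sales',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.url' = 'http://localhost:8081',
    'scan.startup.mode' = 'earliest-offset'
);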

The whole setup can be found here:

https://github.com/Epsio-Labs/public-benchmarks/tree/main/flink

Epsio Setup

Because Epsio is self-contained, we didn't need to run any other components such as Debezium or Kafka. Epsio is installed with a one-liner:

curl https://epsio.storage.googleapis.com/20.14.0/install.sh | DEPLOYMENT_ID=XXX VERSION=20.14.0 bash

after which a short setup flow configures the Postgres connection details.

All Epsio views (named queries whose results are stored in the database) are created via the user's own database; there is no need to interface with another server or program!

To create views, we ran the following in the TPC-DS database (which notifies Epsio to create the view):

CALL epsio.create_view('view_name', 'SELECT ... FROM ...'); 

which populated the initial result and then began streaming the incoming CDC changes (preloaded in this scenario).
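Reading the results is then plain SQL against the same database. A minimal example, assuming the result table simply carries the view's name:

SELECT * FROM view_name;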

Aggregative Queries

We ran four different aggregative queries on Epsio & Flink.

Count Query

The first query is essentially a “simple aggregate” query: how long does it take to maintain a simple count(*) while streaming in the largest table of the TPC-DS dataset, store_sales, with ~288 million rows? This query mainly tests the overhead of moving data through the system, plus a very simple aggregation.

SELECT count(*) FROM store_sales;
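On Epsio, for example, this query is registered as a view like so (the view name here is just illustrative):

CALL epsio.create_view('store_sales_count', 'SELECT count(*) FROM store_sales');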

Performance Factor: 7.77x

Sales Aggregations Group By

The next query gives us different aggregations of sales, grouped by year and month along with a host of other factors (manufacturer, brand). The end result has ~4.8m rows. This query mixes heavy joins and aggregates, with the GROUP BY producing a relatively high number of groups.

SELECT
    item.i_manufact_id,
    dt.d_year,
    dt.d_moy,
    item.i_brand_id AS brand_id,
    item.i_brand AS brand,
    SUM(CAST(FLOOR(ss_ext_sales_price) AS BIGINT)) AS sum_agg,
    MAX(CAST(FLOOR(ss_ext_sales_price) AS BIGINT)) AS max_ss_ext_sales_price
FROM date_dim AS dt
JOIN store_sales AS ss ON dt.d_date_sk = ss.ss_sold_date_sk
JOIN item ON ss.ss_item_sk = item.i_item_sk
GROUP BY item.i_manufact_id, dt.d_year, dt.d_moy, item.i_brand, item.i_brand_id;

Performance Factor: 9.05x

Sales Aggregations Group By #2

The third query also centers on aggregating sales per group, but is much more “aggregative”: its output is condensed to just 2,600 rows, even though nearly all input rows (~95%) contribute to the final result.


SELECT
    t_hour,
    CASE WHEN t_minute >= 0 AND t_minute <= 29 THEN 'FIRST' ELSE 'SECOND' END AS half_hour,
    s_store_name,
    hd_dep_count,
    COUNT(*)
FROM store_sales
JOIN household_demographics ON ss_hdemo_sk = hd_demo_sk
JOIN time_dim ON ss_sold_time_sk = t_time_sk
JOIN store ON ss_store_sk = s_store_sk
GROUP BY
    t_hour,
    half_hour,
    hd_dep_count,
    s_store_name;

Performance Factor: 21.44x

Sales Trends

The last query analyzes customer purchase behavior across multiple sales channels (store, catalog, and web) over two consecutive years, identifying customers whose catalog purchases show stronger growth than their other channels. This query is fairly complex and heavy; it tests how well the streaming engine handles many operators, as well as moving large amounts of data between them. We tried tweaking Flink to reduce the per-operator parallelism, but found the defaults we had set still performed best here.


WITH year_total AS (
    SELECT
        c_customer_id AS customer_id, 
        c_first_name AS customer_first_name,
        c_last_name AS customer_last_name,
        c_preferred_cust_flag AS customer_preferred_cust_flag,
        c_birth_country AS customer_birth_country,
        c_login AS customer_login,
        c_email_address AS customer_email_address,
        d_year AS dyear,
        SUM(((ss_ext_list_price - ss_ext_wholesale_cost - ss_ext_discount_amt) + ss_ext_sales_price) / 2) AS year_total,
        's' AS sale_type
    FROM customer
    JOIN store_sales ON c_customer_sk = ss_customer_sk
    JOIN date_dim ON ss_sold_date_sk = d_date_sk
    GROUP BY c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, 
             c_birth_country, c_login, c_email_address, d_year

    UNION ALL

    SELECT
        c_customer_id AS customer_id,
        c_first_name AS customer_first_name,
        c_last_name AS customer_last_name,
        c_preferred_cust_flag AS customer_preferred_cust_flag,
        c_birth_country AS customer_birth_country,
        c_login AS customer_login,
        c_email_address AS customer_email_address,
        d_year AS dyear,
        SUM(((cs_ext_list_price - cs_ext_wholesale_cost - cs_ext_discount_amt) + cs_ext_sales_price) / 2) AS year_total,
        'c' AS sale_type
    FROM customer
    JOIN catalog_sales ON c_customer_sk = cs_bill_customer_sk
    JOIN date_dim ON cs_sold_date_sk = d_date_sk
    GROUP BY c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, 
             c_birth_country, c_login, c_email_address, d_year

    UNION ALL

    SELECT
        c_customer_id AS customer_id,
        c_first_name AS customer_first_name,
        c_last_name AS customer_last_name,
        c_preferred_cust_flag AS customer_preferred_cust_flag,
        c_birth_country AS customer_birth_country,
        c_login AS customer_login,
        c_email_address AS customer_email_address,
        d_year AS dyear,
        SUM(((ws_ext_list_price - ws_ext_wholesale_cost - ws_ext_discount_amt) + ws_ext_sales_price) / 2) AS year_total,
        'w' AS sale_type
    FROM customer
    JOIN web_sales ON c_customer_sk = ws_bill_customer_sk
    JOIN date_dim ON ws_sold_date_sk = d_date_sk
    GROUP BY c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, 
             c_birth_country, c_login, c_email_address, d_year
)
SELECT
    t_s_firstyear.dyear AS first_year,
    t_s_firstyear.dyear + 1 AS second_year,
    t_s_secyear.customer_id,
    t_s_secyear.customer_first_name,
    t_s_secyear.customer_last_name,
    t_s_secyear.customer_email_address
FROM year_total t_s_firstyear
JOIN year_total t_s_secyear ON 
    t_s_secyear.customer_id = t_s_firstyear.customer_id AND 
    t_s_secyear.dyear = t_s_firstyear.dyear + 1 AND 
    t_s_secyear.sale_type = 's'
JOIN year_total t_c_firstyear ON 
    t_c_firstyear.customer_id = t_s_firstyear.customer_id AND 
    t_c_firstyear.dyear = t_s_firstyear.dyear AND 
    t_c_firstyear.sale_type = 'c'
JOIN year_total t_c_secyear ON 
    t_c_secyear.customer_id = t_s_firstyear.customer_id AND 
    t_c_secyear.dyear = t_s_firstyear.dyear + 1 AND 
    t_c_secyear.sale_type = 'c'
JOIN year_total t_w_firstyear ON 
    t_w_firstyear.customer_id = t_s_firstyear.customer_id AND 
    t_w_firstyear.dyear = t_s_firstyear.dyear AND 
    t_w_firstyear.sale_type = 'w'
JOIN year_total t_w_secyear ON 
    t_w_secyear.customer_id = t_s_firstyear.customer_id AND 
    t_w_secyear.dyear = t_s_firstyear.dyear + 1 AND 
    t_w_secyear.sale_type = 'w'
WHERE 
    t_s_firstyear.sale_type = 's' AND 
    t_s_firstyear.year_total > 0 AND 
    t_c_firstyear.year_total > 0 AND 
    t_w_firstyear.year_total > 0 AND 
    CASE 
        WHEN t_c_firstyear.year_total > 0 THEN t_c_secyear.year_total / t_c_firstyear.year_total 
        ELSE NULL 
    END > CASE 
        WHEN t_s_firstyear.year_total > 0 THEN t_s_secyear.year_total / t_s_firstyear.year_total 
        ELSE NULL 
    END AND 
    CASE 
        WHEN t_c_firstyear.year_total > 0 THEN t_c_secyear.year_total / t_c_firstyear.year_total 
        ELSE NULL 
    END > CASE 
        WHEN t_w_firstyear.year_total > 0 THEN t_w_secyear.year_total / t_w_firstyear.year_total 
        ELSE NULL 
    END;

Performance Factor: 10.49x

Conclusion

Epsio is significantly faster than Flink: up to 21x in this benchmark. In contrast to Flink and many other alternatives, which demand setting up additional components (Debezium, an Avro schema registry), tuning configuration (memory, parallelism, etc.), and dealing with a new interface, Epsio works right out of the box with the database you already have.
Furthermore, Epsio is always consistent up to a specific point in time, while Flink is not, and it supports arbitrarily complex SQL.
