From thousands to millions: Scaling our reporting engine

Our previous blog post introduced the Rippling analytics engine and how we tuned it to support real-time queries. As Rippling began onboarding larger customers, we noticed that data quickly grew over 10x in volume. We also saw our tail latencies increase as our beta customers began to fill their home dashboards with reports. We now run more than 400,000 reports a week, with the most performance-intensive ones loading intermediate data from more than 2.5 million rows.

In this blog post, we’ll discuss our novel architecture that provides real-time reporting over millions of rows of employee data and helps organizations quickly discover insights. Specifically, we’ll cover how we re-engineered crucial functionalities such as drill-downs, column filters, and rollups. And we’ll delve into our rollout strategy for this comprehensive re-architecture, which was executed to ensure our customers didn’t experience any glitches during the transition.

Growing data: 10x volume

Until Q4 of 2022, Rippling’s datasets for analytical queries peaked at around 200,000 rows. However, within a few months, the size of our intermediate datasets grew more than 10x. Apart from onboarding larger customers to Rippling, product launches such as Rippling Recruiting contributed to this explosive growth in data. With this growth, we started seeing bottlenecks in our architecture.

Number of report data rows generated over time

Existing architecture

Reporting v1 architecture

The reporting engine relied on our homegrown parallel processing and asynchronous task management framework called ETA to load raw data in chunks from MongoDB and Pinot. MongoDB is a document database optimized for write-heavy workloads and efficiently inserts and retrieves semi-structured documents. Pinot is a columnar datastore optimized for read-heavy workloads and analytical queries. 

Once the data was loaded into memory, we used Pandas for fast in-memory transformations and extensively cached intermediate datasets in Redis.
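As a rough illustration of why this became a problem (all names below are stand-ins, not our actual code), the v1 data path paid a serialization hop at every layer:

import pickle
import pandas as pd

def load_report_data_v1(report, redis_client):
    cached = redis_client.get(report.cache_key)
    if cached is not None:
        # Deserialize the cached intermediate dataset from Redis
        return pickle.loads(cached)
    # load_chunk is a stand-in for a chunked MongoDB/Pinot read; in
    # production this fan-out ran in parallel on ETA workers.
    chunks = [load_chunk(report, chunk_id) for chunk_id in report.chunk_ids]
    df = pd.concat(chunks)  # fast in-memory transformation with Pandas
    # Serialize the intermediate dataset into Redis for reuse
    redis_client.set(report.cache_key, pickle.dumps(df))
    return df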

With growing data volume, serialization and deserialization across various layers became a bottleneck. In the trace shown in Figure 3, LoadOperation (load raw data) and WriteMergeScreenCache (cache intermediate data to Redis) were the two most expensive operations. More than 80% of the time in both these operations was spent on serialization and deserialization of data, which slowed this report run by 25 seconds.

Example report showing stepwise time distribution of latencies in data generation

We relied on ETA workers to parallelize data loading. Since a single screen often loads multiple reports at once, we frequently hit delays in starting these tasks.

Data loader ETA worker start delay p95

V2 architecture

Our v1 system was spending most of its execution time on data serialization and deserialization across multiple layers, especially as the data volume grew.

To address this bottleneck, we adopted an architecture to push query operations to the database. Pinot could natively support query operations such as WHERE clauses and MIN/MAX/SUM aggregations. However, it lacked support for key functionalities such as nested queries and joins. 

To overcome these limitations of Pinot, we integrated Presto into our stack. Presto, a distributed query engine, supports SQL querying and has built-in optimizations that minimize data serialization costs by pushing query operations down to Pinot.

For example, we apply permission filters on individual datasets before joining them and performing aggregations. Since Presto can push such filters to Pinot, it ends up fetching much smaller datasets to memory for the JOINs.
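Schematically, such a query might look like the following (table and column names are illustrative, not our actual schema):

# The permission filter sits inside the subquery, so Presto can push it
# down to Pinot and fetch only the matching rows into memory for the JOIN.
permissioned_join_sql = """
SELECT a.id, a.applied_at, s.days_in_stage
FROM (
    SELECT id, applied_at, stage_id
    FROM applications
    WHERE company_id = 123  -- permission filter, pushed down to Pinot
) a
JOIN application_stages s
    ON s.id = a.stage_id
"""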

Presto also handles parallelization internally, which eliminated our reliance on ETAs, and it removed the need for intermediate data caching in Redis, further simplifying our pipeline.

Reporting v2 architecture

In this architecture, the logical operations are now converted into queries instead of in-memory transformations. 

Instead of converting operations directly to PrestoSQL queries, we first convert them to our homegrown DSL, Rippling Query Language (RQL). This strategy frees us from lock-in to a specific query engine implementation and lets us apply query optimizations in a single, reusable layer shared by various products within Rippling. Serializing the query kicks off its execution: the RQL layer translates it into PrestoSQL, and Presto then executes it against Pinot and returns the final filtered dataset.
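To make this concrete, here is a schematic sketch of the idea; RQL's actual API is internal, so the names and structure below are assumptions:

from dataclasses import dataclass, field

# Engine-agnostic query object: products build these, and only the
# serialization layer knows about PrestoSQL.
@dataclass
class RQLQuery:
    table: str
    columns: list
    filters: list = field(default_factory=list)
    group_by: list = field(default_factory=list)

    def to_presto_sql(self) -> str:
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.filters:
            sql += " WHERE " + " AND ".join(self.filters)
        if self.group_by:
            sql += " GROUP BY " + ", ".join(self.group_by)
        return sql

query = RQLQuery(table="applications",
                 columns=["department", "COUNT(*)"],
                 group_by=["department"])
# "SELECT department, COUNT(*) FROM applications GROUP BY department"
presto_sql = query.to_presto_sql()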

Performance

Below is a visual depiction of the p99 latencies for the two systems.

Reporting v1 vs Reporting v2 data generation latency p99 comparison over time

Our new architecture tackles the issue of timeouts or prolonged loading times when reporting on large datasets. For example, a report template named "Milestone to hired ratios by department" previously took over 30 seconds to load a dataset with an intermediate joined size of 1 million rows. With the new architecture, this report now loads in approximately three seconds.

Here are some benchmarks comparing the performance of the old and the new architectures.

Report template                         | Intermediate dataset size (# rows) | Time taken by the old architecture | Time taken by the new architecture
Milestone to hired ratios by department | 1M                                 | 33s                                | 3.1s
Milestone to hired ratios by department | 10M                                | Times out (>60s)                   | 7.5s

Transitioning from Python-based v1 to SQL-driven v2

Our v1 architecture coupled business logic with in-memory transformations. Transitioning to v2 required expressing that business logic as SQL queries, which necessitated a complete overhaul of every feature in our analytics suite. Let’s dive into how we rebuilt some of the key features, such as drill-downs, column filters, pivots, and rollups.

Translating report features into SQL

Column filter choices

In reports, users can filter by selecting unique choices from dropdowns based on generated data for string and enum type attributes.

Column filter choices in reports

In v2, we’ve improved the generation of filter choices by handling multiple columns without resorting to separate "column.distinct" queries. With Presto, we consolidate this process into a single query: nested sub-queries linked together, followed by Presto's array aggregation operation (ARRAY_AGG), which returns the unique choices per selected attribute. This eliminates the inefficiency of reporting v1, which loaded all raw data into memory and executed distinct operations for each attribute sequentially.
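A schematic version of such a consolidated query might look like this (attribute and table names are assumptions):

# One query returns the distinct choices for several attributes at once,
# instead of a separate SELECT DISTINCT round-trip per column.
filter_choices_sql = """
SELECT
    (SELECT ARRAY_AGG(DISTINCT department) FROM applications) AS department_choices,
    (SELECT ARRAY_AGG(DISTINCT stage)      FROM applications) AS stage_choices,
    (SELECT ARRAY_AGG(DISTINCT source)     FROM applications) AS source_choices
"""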

Drilldown

Drilldown is a feature that lets users dive deeper into an aggregated value by presenting a detailed grid view of all the rows that contribute to that value. This allows for a more comprehensive understanding of the data.

Drilldown in reports

In the v2 stack, we form the report query itself so that it fetches the correct drilldown response. In contrast, v1 relied on intermediate caches storing all raw data, which occasionally caused latency issues on cache misses.

In the following example, a user has selected attributes from the Application, Application Stage, and Application Rejection Reason tables.

Applied At | Days in stage (SUM) | Applicants (COUNT) | Application Rejection Reasons (COUNT)
Oct, 2019  | 15                  | 2                  | 1
Nov, 2019  | 12                  | 2                  | 1

If the user drills down on Applicants (COUNT) for Nov 2019, it will generate the subsequent view:

Applied At   | Days in stage (SUM) | Applicant ID | Application Rejection Reason
Nov 3, 2019  | 7                   | applicant_a  | “no reason”
Nov 10, 2019 | 5                   | applicant_bd | “lacks experience”

def generate_drilldown_query(report_query, field_clicked, group_clicked, one_to_many_relations):
    # Remove aggregations first
    report_query.remove_aggregations()
    # Create a new WHERE filter for the selected cell
    drilldown_filter = generate_drilldown_filter(report_query.groupable_columns, group_clicked)
    report_query.where_filters.add(drilldown_filter)
    # Handle multi-level drilldown
    if field_clicked in one_to_many_relations:
        report_query.aggregate_models(one_to_many_relations[field_clicked])
    return report_query

In the above code snippet, we add a new WHERE filter for the selected cell. If the clicked field is part of a one-to-many relationship, such as “Days in stage (SUM)”, the query aggregates those models to support further drilldown.
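A hypothetical invocation for the drilldown above (argument names and shapes here are assumptions, not our real interfaces) might look like:

drilldown_query = generate_drilldown_query(
    report_query=report_query,  # the query object behind the aggregated report
    field_clicked="applicants_count",
    group_clicked={"applied_at": "Nov, 2019"},  # the cell's group
    one_to_many_relations={"days_in_stage_sum": "ApplicationStage"},
)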

Rollups

Rollups are a powerful feature that lets us display aggregated values for each group, as well as for its parent groups, when a report is grouped by multiple fields. This can be seen in the example below:

Rollups for aggregated values in reports

In this instance, the rollup “All” row shows the aggregated values across all locations and departments, while the "Chicago Office" rollup row shows aggregated values across all departments within the Chicago location.

In our v2 architecture, we use Presto's ROLLUP function to manage rollup rows. However, Presto's response assigns NULL values to the rolled-up field (e.g., Department), making it challenging to distinguish such rows from non-rolled-up rows where the field is actually NULL in the database. For example:

Work Location  | Department | Annual Salary (Average)
Chicago Office | NULL       | $125,843
Chicago Office | NULL       | $96,529

In this table, the first row is a rollup row where Department has been rolled up, and the second row is where Department is actually NULL in the database.

Here's how we address this issue and distinguish between the rows in our response:

1. Add sorting in the query to ensure that parent rows always appear above child rows.

GROUP BY ROLLUP(worklocation, department) ORDER BY COUNT(*) DESC

2. Identify duplicate groups (e.g., (Chicago Office, NULL)) while parsing Presto's response.

grouped_df = df.groupby(group_cols, dropna=False)
for group_key, sub_df in grouped_df:
    # Rows within a sub_df represent duplicate groups, like (Chicago Office, NULL)
    ...

3. Due to the ordering guarantee of Step 1, we can now safely distinguish the child rows from the rollup row. Hence, we replace the NULL in child rows with a different placeholder such as ‘N/A.’

Work Location  | Department | Annual Salary (Average)
Chicago Office | NULL       | $125,843   (rollup row)
Chicago Office | N/A        | $96,529
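Putting the three steps together, here is a minimal sketch of the parsing logic in Pandas (illustrative, not our production code):

import pandas as pd

def disambiguate_rollup_rows(df: pd.DataFrame, group_cols: list) -> pd.DataFrame:
    df = df.copy()
    df["is_rollup_row"] = False
    # dropna=False keeps NULL-keyed groups; sort=False preserves query order
    for group_key, sub_df in df.groupby(group_cols, dropna=False, sort=False):
        key = group_key if isinstance(group_key, tuple) else (group_key,)
        if not any(pd.isna(k) for k in key):
            continue  # no NULLs in the group key, nothing to disambiguate
        # Step 1's ordering guarantees the rollup (parent) row comes first
        df.loc[sub_df.index[0], "is_rollup_row"] = True
        # Any remaining rows carry a genuine database NULL; relabel it (Step 3)
        child_idx = sub_df.index[1:]
        df.loc[child_idx, group_cols] = df.loc[child_idx, group_cols].fillna("N/A")
    return df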

In v1, we first computed the non-rollup rows using dataframe group-bys and aggregations, and then computed the rollup rows by grouping and aggregating each supergroup separately. Our v2 architecture handles the entire process in a single query, making it more efficient and user-friendly and marking a significant advancement in our analytics engine's capabilities.

V2 launch

After creating a scalable, backward-compatible architecture, our next step was to deploy it to customers without disruption.

To prioritize features, we tracked two metrics for each unsupported feature that forced reports to fall back to v1: the percentage of total report runs affected and the percentage of slow reports affected.

Fallback reasons: % of total report runs (left) and % of slow reports (right)

This strategy enabled us to quickly make a significant impact on a large number of slow reports.

To guarantee that every feature in v2 aligns with its behavior in v1, we developed a shadow comparator. This tool compares the results between the v1 and v2 reporting stacks in the background for every report run, logging any discrepancies.

Lifecycle of a v1 and v2 mismatch

Some examples of mismatches identified and fixed with the help of this shadow comparator include: 

  • Missing data in cached fields: In v1, some fields were computed dynamically. These were optimized in v2 by caching them directly on the model. The shadow runner helped detect any omissions in this cache.
  • Unapplied default values on Pinot: In v1, the Mongo client used to apply default values to NULL fields. However, the Presto client doesn't do this. Therefore, we explicitly incorporated these default values into the SQL query.
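Conceptually, the shadow comparison looks something like the sketch below; run_report_v1, run_report_v2, and normalize are stand-ins for internal APIs, and the real comparison runs in the background rather than inline:

import logging

logger = logging.getLogger("reporting.shadow")

def run_report_with_shadow(report):
    v1_result = run_report_v1(report)  # this result is served to the user
    try:
        v2_result = run_report_v2(report)  # shadow run of the new stack
        # Normalize both results (column order, types, NULL handling)
        # before diffing, then log any discrepancy for investigation.
        if normalize(v1_result) != normalize(v2_result):
            logger.warning("v1/v2 mismatch for report %s", report.id)
    except Exception:
        # A shadow failure must never break the user's report
        logger.exception("shadow run failed for report %s", report.id)
    return v1_result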

After a multi-quarter effort, 85% of all reports run on our new architecture, and we are striving to reach a 100% transition to this more efficient and robust platform.

Preparing for the next 10x scale: future challenges 

Considering Rippling’s growth, we’ll soon need to handle another 10x increase in our maximum data sizes. At 10 million+ rows, our architecture starts to hit a new set of bottlenecks. 

Incorporating permissions IDs into query filters 

Our permissions framework currently provides a list of object IDs to which the logged-in user has access. These IDs are then incorporated as a filter in the reports query. However, when working with large datasets, this method poses a scalability issue. It becomes inefficient to include, for instance, a million object IDs in a single query.

PrestoUserError(type=USER_ERROR, name=QUERY_TEXT_TOO_LARGE,
    message="Query text length (746949) exceeds the maximum length (600000)",
    query_id=20231002_211202_29424_h692w)

A solution to this problem could be employing a filter with a significantly lower cardinality than the current ID-based one. The downside to this approach is that it could lead to less granular permissions, like filtering job applications based on accessible job postings rather than individual job application IDs.
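To illustrate the trade-off (schema names here are made up), compare the shape of the two filters:

# Current ID-based filter: fully granular, but the query text grows with
# the number of accessible objects and eventually overflows the limit.
id_based_filter = """
... WHERE application_id IN (/* up to ~1M accessible application IDs */)
"""

# Lower-cardinality alternative: filter by the parent object instead,
# trading permission granularity for a far smaller query text.
lower_cardinality_filter = """
... WHERE job_posting_id IN (/* a few hundred accessible job postings */)
"""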

Dealing with large intermediate dataset sizes

Working with extremely large datasets often pushes us up against the memory limitations of our Presto workers at the node level.

[PrestoQueryError] Query exceeded per-node user memory limit of 400MB
[Allocated: 399.71MB, Delta: 9.64MB (MarkDistinctOperator),
 Top Consumers: {MarkDistinctOperator=399.71MB, FilterAndProjectOperator=200.32kB}]

A straightforward solution to this problem would be to upscale the server nodes. However, a more scalable and efficient solution lies in further optimizing the queries to allow more query operations to be pushed to Pinot, reducing the amount of data that needs to be fetched into the Presto nodes.

Special thanks to Nizar Hejazi, Rohit Sivakumar, and Marco Ndoping from the RQL team for their contributions to this project.

last edited: April 11, 2024

Authors

Ayush Rai

Software Engineer

Abhishek Gupta

Sr. Software Engineer

Naman Agarwal

Sr. Software Engineer

Nikunj Aggarwal

Staff Software Engineer