Inside Rippling’s real-time reporting engine

Published

Aug 7, 2023

Rippling’s platform becomes more powerful as we integrate data from more applications and services. Reporting is a critical element of that functionality, allowing users to merge data across applications to answer business questions. This functionality leverages Rippling’s core value proposition: an understanding of how business application data relates to the core employee record.

Why build our own?

Rippling’s mission is to free intelligent people to work on hard problems. To achieve that goal, Rippling reports should be able to quickly answer data questions. Such a system would also need to provide consistent access to all data within the Rippling platform while ensuring the information available to a user reflects their data access permissions. 

We wanted to build a BI tool that could compete with standalone BI systems. Every product we build would be deeply integrated with our report capabilities.

We published a product-focused blog on Rippling's new Custom Reports, which aims to unify and enhance workforce analytics by pulling data from multiple systems and third-party apps through Employee Graph. The new Custom Reports offers SQL-like joins and Report Formulas for customization and calculations within the platform. In this post, we’ll cover technical architecture, challenges, and the future of Rippling’s Reporting.

Reporting Engine Architecture

The Reports product has a frontend component where users select the attributes they want to report on. User inputs are parsed into a query plan. The query plan is executed by our engine, which interacts with different datastores such as MongoDB. Multiple caching layers speed up requests. Finally, the serializer converts the data into multiple output formats.

Query Planner

User inputs are converted to SQL-like logical operations with domain-aware defaults. For example, in the figure above, users selected fields from three datasets (Employee, Time & Attendance, Zendesk). The system was able to automatically figure out that Zendesk needs to be pre-aggregated and then joined with Employee which is eventually joined with Time & Attendance. These domain defaults help make the overall experience simple for the user. Users can later edit any operations from the UI which updates the query plan. 

Logical operations form a Directed Acyclic Graph (DAG) since multiple downstream operations can depend on the same set of upstream operations. Our execution engine topologically sorts the DAG and executes the list of operations sequentially.
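As a rough illustration of the ordering step, here is a minimal sketch that derives an execution order from a list of operations using Python's standard-library graphlib. The field names (inputId, leftInputId, rightInputId, outputId) mirror the example plan in this section; the function name and the trimmed-down plan are assumptions made for illustration and are not Rippling's actual engine code.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def execution_order(plan):
    """Return outputIds ordered so every input is produced before it is consumed."""
    produced = {op["outputId"] for op in plan}
    sorter = TopologicalSorter()
    for op in plan:
        # Joins carry two inputs; group/aggregation operations carry one.
        inputs = [op.get("inputId"), op.get("leftInputId"), op.get("rightInputId")]
        # Only outputs of other operations are graph edges; raw datasets
        # (e.g. "dataset_ZendeskTicket") are leaves handled by load operations.
        dependencies = [i for i in inputs if i in produced]
        sorter.add(op["outputId"], *dependencies)
    return list(sorter.static_order())


# Trimmed-down plan (hypothetical: only the fields needed for ordering),
# deliberately listed out of order.
plan = [
    {"operation": "JoinOperation", "leftInputId": "join_output_1",
     "rightInputId": "aggregation_output_0", "outputId": "join_output_2"},
    {"operation": "AggregationOperation", "inputId": "group_output_0",
     "outputId": "aggregation_output_0"},
    {"operation": "JoinOperation", "leftInputId": "dataset_RoleWithCompany",
     "rightInputId": "dataset_TimeAttendanceReportsMaterializationCache",
     "outputId": "join_output_1"},
    {"operation": "GroupOperation", "inputId": "dataset_ZendeskTicket",
     "outputId": "group_output_0"},
]

order = execution_order(plan)
print(order)
```

Independent branches (here, the Zendesk aggregation and the Employee/Time & Attendance join) may appear in either relative order; only the dependency constraints are guaranteed.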

[
  {
    "operation": "GroupOperation",
    "inputId": "dataset_ZendeskTicket",
    "groupByConfigs": [
      {
        "attributeId": "ZendeskTicket:assignee",
        "bucketByConfigs": [],
        "groupingDepth": 0
      },
      {
        "attributeId": "ZendeskTicket:thirdPartyDataCreatedAt",
        "bucketByConfigs": [
          {
            "bucketId": "date_str"
          }
        ],
        "groupingDepth": 0
      }
    ],
    "outputId": "group_output_0"
  },
  {
    "operation": "AggregationOperation",
    "inputId": "group_output_0",
    "groupByConfigs": [
      {
        "attributeId": "ZendeskTicket:assignee",
        "bucketByConfigs": [],
        "groupingDepth": 0
      },
      {
        "attributeId": "ZendeskTicket:thirdPartyDataCreatedAt",
        "bucketByConfigs": [
          {
            "bucketId": "date_str"
          }
        ],
        "groupingDepth": 0
      }
    ],
    "aggregationConfigs": [
      {
        "id": "ZendeskTicket:dueAt",
        "numeric": false,
        "aggregationFunction": "MAX",
        "setIntegralValue": false,
        "name": "Due date of ticket",
        "dataType": "date",
        "alias": null,
        "inferred": null,
        "semanticType": null,
        "auxiliaryAttrIds": []
      },
      {
        "id": "ZendeskTicket:count",
        "numeric": true,
        "aggregationFunction": "COUNT",
        "setIntegralValue": false,
        "name": "Zendesk > Ticket Count",
        "dataType": "integer",
        "alias": null,
        "inferred": null,
        "semanticType": "CARDINAL",
        "auxiliaryAttrIds": []
      }
    ],
    "outputId": "aggregation_output_0"
  },
  {
    "operation": "JoinOperation",
    "leftInputId": "dataset_RoleWithCompany",
    "rightInputId": "dataset_TimeAttendanceReportsMaterializationCache",
    "leftJoinKeys": [
      "RoleWithCompany:id"
    ],
    "rightJoinKeys": [
      "TimeAttendanceReportsMaterializationCache:role"
    ],
    "outputId": "join_output_1",
    "joinConditions": [],
    "joinType": "left"
  },
  {
    "operation": "JoinOperation",
    "leftInputId": "join_output_1",
    "rightInputId": "aggregation_output_0",
    "leftJoinKeys": [
      "RoleWithCompany:id"
    ],
    "rightJoinKeys": [
      "ZendeskTicket:assignee"
    ],
    "outputId": "join_output_2",
    "joinConditions": [],
    "joinType": "left"
  }
]

Execution Engine

This layer executes the sorted list of operations. Output from one operation could be used as input to subsequent operations. 

There are broadly three categories of operations:

  1. Load: Operations that load raw data from the datastores.
  2. Transform: Operations that transform data.
  3. Cache: Operations that cache data to improve performance.

Load Operations

Internally, our data is stored in MongoDB. Users can request attributes from multiple collections. The reports engine groups the attributes by collections and maintains a Dataset per collection. Each dataset is executed concurrently. We use our homegrown parallel processing and asynchronous task management framework called ETA (Estimated Time of Arrival) to schedule these parallel tasks and merge the output of these parallel tasks once the execution is done.

To minimize fetch time, we use a two-level fanout mechanism to parallelize data fetching. In the first level, we trigger a parallel fetch of data across collections. Each parallel process identifies the primary keys of the documents to be fetched while enforcing permissions, and then triggers the second level of parallel processing, which fetches the actual MongoDB documents in batches.
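The two-level fanout can be sketched with Python's standard-library thread pools. This is only an illustration of the shape of the idea: ETA is Rippling's in-house framework and is not shown here, and FAKE_DB, permitted_ids, fetch_batch, and the batch size are all hypothetical stand-ins for the real datastore, permission checks, and tuning.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for MongoDB: {collection: {primary_key: document}}.
FAKE_DB = {
    "Employee": {i: {"_id": i, "name": f"employee-{i}"} for i in range(10)},
    "Device": {i: {"_id": i, "owner": i % 10} for i in range(25)},
}


def permitted_ids(collection, user):
    # Level 1 work: resolve the primary keys this user may read.
    # The permission check is a placeholder that allows everything.
    return sorted(FAKE_DB[collection])


def fetch_batch(collection, ids):
    # Level 2 work: fetch the actual documents for one batch of keys.
    return [FAKE_DB[collection][i] for i in ids]


def load_collection(collection, user, batch_pool, batch_size):
    ids = permitted_ids(collection, user)
    batches = [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]
    futures = [batch_pool.submit(fetch_batch, collection, b) for b in batches]
    # Merge batch results in submission order once they complete.
    return [doc for future in futures for doc in future.result()]


def load_all(collections, user, batch_size=4):
    # Separate pools per level: level-1 tasks block on level-2 futures, so
    # sharing one pool could deadlock when it is saturated.
    with ThreadPoolExecutor(max_workers=4) as collection_pool, \
            ThreadPoolExecutor(max_workers=8) as batch_pool:
        futures = {
            c: collection_pool.submit(load_collection, c, user, batch_pool, batch_size)
            for c in collections
        }
        return {c: f.result() for c, f in futures.items()}


datasets = load_all(["Employee", "Device"], user="alice")
print({name: len(docs) for name, docs in datasets.items()})
```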

Transform Operations

We heavily leverage Pandas for transform operations. This helps us avoid writing the same logic again and optimizes our in-memory computation. Through benchmarking, we found some of the Pandas operations are 25-200% faster than if we were to write in-memory versions of those ourselves. Some of the data transformation operations currently supported are joins, grouping, aggregation, filter, and pivots.
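For readers less familiar with Pandas, the sketch below shows what a few of these transforms (grouping, aggregation, join, filter, pivot) look like on toy data. The column names and values are invented for illustration and do not reflect Rippling's schema.

```python
import pandas as pd

# Hypothetical toy data.
employees = pd.DataFrame(
    {"employee_id": [1, 2, 3], "department": ["Eng", "Eng", "Sales"]}
)
tickets = pd.DataFrame(
    {"assignee": [1, 1, 2, 3, 3, 3], "ticket_id": [10, 11, 12, 13, 14, 15]}
)

# Group + aggregate: pre-aggregate the "many" side to one row per assignee.
ticket_counts = tickets.groupby("assignee", as_index=False).agg(
    ticket_count=("ticket_id", "count")
)

# Join: left join keeps every employee, even those without tickets.
joined = employees.merge(
    ticket_counts, how="left", left_on="employee_id", right_on="assignee"
)

# Filter: boolean mask on a column.
eng = joined[joined["department"] == "Eng"]

# Pivot: total tickets per department.
by_department = joined.pivot_table(
    index="department", values="ticket_count", aggfunc="sum"
)
print(by_department)
```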

Cache Operations

We cache intermediate datasets to decrease resource load, reduce run time, and lower costs. At the same time, we re-architected all other features to use the same sequence of base operations so that an intermediate output, once cached, can be reused across them.

Serializer

Once we get the dataset from the executor, it can be serialized into any of the multiple output formats we support.

Scaling reporting to a million users

We wanted our customers to generate reports on real-time data in under 60 seconds, and we wanted the system to serve more than one million users. These goals posed significant technical and operational challenges.

Infrastructure

To handle incoming traffic for more than one million users and adhere to a strict report data generation time of 60 seconds, we ensured each layer in our system can scale horizontally.

Sampling improvements

Reports support two views: Sampled and Full. When a user generates a report, we first give an instantaneous overview built from sampled data, and then generate the report on the full data. To make the sampled report meaningful, we sample intelligently: we identify a sample set of users who have data for all the selected attributes and then fetch only a sample of feature-rich data from each model. This sampling happens at two levels. At the first level, we identify entries with feature-rich data before fetching, and fetch only those entries. At the second level, we join the datasets and sample rows based on weights that are generated dynamically from the diversity of the data present in the dataset.

dataframe = dataframe.sample(n=operation.limit, weights='_valid_fields').reset_index(drop=True)
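The snippet above relies on a _valid_fields weight column. One plausible way to build such a column, shown here purely as an assumption, is to count how many of the selected attributes are populated in each row, so that feature-rich rows are more likely to be drawn. The function name, the toy data, and the seed parameter are hypothetical.

```python
import pandas as pd


def sample_feature_rich(dataframe, selected_columns, limit, seed=None):
    """Weighted sample that favours rows with more populated attributes."""
    df = dataframe.copy()
    # Assumed weighting: number of non-null selected attributes per row.
    df["_valid_fields"] = df[selected_columns].notna().sum(axis=1)
    # Rows with weight 0 can never be drawn, so cap n at the drawable rows.
    n = min(limit, int((df["_valid_fields"] > 0).sum()))
    if n == 0:
        return df.head(0)
    return df.sample(n=n, weights="_valid_fields", random_state=seed).reset_index(drop=True)


# Hypothetical data: row "b" has none of the selected attributes populated.
people = pd.DataFrame(
    {
        "name": ["a", "b", "c", "d", "e"],
        "device": ["laptop", None, "phone", None, "laptop"],
        "tickets": [3.0, None, None, 1.0, 2.0],
    }
)
sampled = sample_feature_rich(people, ["device", "tickets"], limit=10, seed=0)
print(sampled)
```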

Unique aggregations

We started noticing times in which our aggregations were off. We realized this problem was a common case even in relational databases when there are one-to-many relationships between tables. When we join datasets with one-to-many relationships and then aggregate them, this results in double-counting.

For example, take a scenario in which the Employee dataset is joined with a Device dataset in a one-to-many relationship.

Entries in Employee

Entries in Device

Joined dataset:

SELECT * FROM Employee JOIN Device ON Employee.ID = Device.EmployeeID

Query 1:

SELECT SUM(age) FROM Employee => 181

Query 2:

SELECT SUM(age) FROM Employee JOIN Device ON Employee.ID = Device.EmployeeID => 290

“Query 1” and “Query 2” both try to compute the sum of employee ages: the former runs on the single Employee model and the latter on the joined dataset. In “Query 2”, the sum is greater than in “Query 1” because the one-to-many relationship repeats each employee once per device in the joined dataset, so their ages are double-counted.

We implemented aggregation in-house to avoid double-counting while keeping things performant. We now assign a unique identifier to each column and use that identifier to deduplicate before aggregation, so that nothing is counted more than once. In the example below, if we want to get the average age for all employees with a device associated with them, we will use the [Age_raw] column to remove duplicate entries.
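The double-counting problem and the deduplicate-then-aggregate fix can be reproduced in a few lines of Pandas. The data below is invented (it does not reproduce the 181 and 290 figures above), and deduplicating on the employee's primary key is an assumption standing in for the unique identifier described in this section.

```python
import pandas as pd

# Hypothetical data: employee 1 has two devices, employee 3 has three.
employee = pd.DataFrame({"ID": [1, 2, 3], "age": [30, 40, 50]})
device = pd.DataFrame(
    {
        "EmployeeID": [1, 1, 2, 3, 3, 3],
        "device": ["laptop", "phone", "laptop", "laptop", "phone", "tablet"],
    }
)

joined = employee.merge(device, left_on="ID", right_on="EmployeeID")

# Naive aggregation on the joined dataset counts each age once per device.
naive_sum = joined["age"].sum()

# Deduplicate on the identifier of the source row before aggregating,
# so each employee contributes exactly once.
deduped = joined.drop_duplicates(subset="ID")
deduped_sum = deduped["age"].sum()
deduped_mean = deduped["age"].mean()

print(naive_sum, deduped_sum, deduped_mean)
```

Here the naive sum is inflated by the repeated rows, while the deduplicated sum matches the sum computed on the Employee dataset alone.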

Redis Caching Strategies

We use various Redis caching strategies to cache intermediate datasets and other metadata during report generation. These caching strategies helped us bring p95 response times under 6 seconds.

Read-Through Cache

For the intermediate dataset cache, we use the read-through cache strategy to treat the cache as the primary data store and read from it. If the cache doesn’t exist, the intermediate dataset is regenerated and cached, which is subsequently used by other data generation flows.
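A minimal read-through sketch follows. To keep it runnable without a Redis server, it uses a small in-memory class exposing only get and set; that class, the cache-key scheme (a hash of the operation list), and the JSON serialization are assumptions for illustration and are not Rippling's implementation.

```python
import hashlib
import json


class InMemoryStore:
    """Stand-in for a Redis client; only get/set are modelled."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


def cache_key(operations):
    # Assumed scheme: identical operation sequences map to the same key,
    # so any feature built on the same base operations shares the entry.
    payload = json.dumps(operations, sort_keys=True).encode("utf-8")
    return "dataset:" + hashlib.sha256(payload).hexdigest()


def read_through(store, operations, compute):
    """Read from the cache first; on a miss, regenerate and cache the dataset."""
    key = cache_key(operations)
    cached = store.get(key)
    if cached is not None:
        return json.loads(cached)
    result = compute(operations)
    store.set(key, json.dumps(result))
    return result


store = InMemoryStore()
compute_calls = []


def compute(operations):
    compute_calls.append(operations)  # track how often we regenerate
    return [{"assignee": 1, "ticket_count": 2}]


ops = [{"operation": "GroupOperation", "inputId": "dataset_ZendeskTicket"}]
first = read_through(store, ops, compute)   # miss: computes and caches
second = read_through(store, ops, compute)  # hit: served from the cache
```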

Refresh-Ahead Cache

While report data generation is the primary component of reports, a significant amount of metadata is accessed very frequently throughout the report generation process. We cache this metadata in Redis and refresh the cache ahead of expiry so that entries are already present whenever they are accessed, thereby reducing the overhead of fetching metadata.
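The refresh-ahead idea can be sketched as a cache whose entries are reloaded shortly before they would expire, so readers rarely hit a miss. Everything here (the class, the TTL and refresh threshold, the injectable clock, and the periodic refresh_due call that a background worker would make) is a hypothetical in-memory illustration rather than a description of the Redis setup.

```python
import time


class RefreshAheadCache:
    def __init__(self, loader, ttl=60.0, refresh_after=30.0, clock=time.monotonic):
        self._loader = loader              # function: key -> fresh value
        self._ttl = ttl                    # entry is invalid after this age
        self._refresh_after = refresh_after  # entry is proactively reloaded after this age
        self._clock = clock
        self._entries = {}                 # key -> (value, loaded_at)

    def _load(self, key):
        value = self._loader(key)
        self._entries[key] = (value, self._clock())
        return value

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None or self._clock() - entry[1] >= self._ttl:
            return self._load(key)  # miss or expired: load on the request path
        return entry[0]

    def refresh_due(self):
        """Reload entries nearing expiry; intended to run periodically off the request path."""
        now = self._clock()
        due = [k for k, (_, loaded_at) in self._entries.items()
               if now - loaded_at >= self._refresh_after]
        for key in due:
            self._load(key)
        return due


# Deterministic demo with a fake clock.
fake_now = [0.0]
loads = []


def load_metadata(key):
    loads.append(key)
    return {"key": key, "version": len(loads)}


cache = RefreshAheadCache(load_metadata, clock=lambda: fake_now[0])
cache.get("employee_schema")     # t=0: initial load
fake_now[0] = 40.0
refreshed = cache.refresh_due()  # t=40: entry is ageing, reload it early
fake_now[0] = 80.0
value = cache.get("employee_schema")  # t=80: still fresh thanks to the refresh
```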

Future

We want Rippling’s Reporting Engine to be the most powerful and fastest way to access real-time data, discover insights, inform decisions, and take action directly.

Dashboards & Embedded Reports

Dashboards provide insightful analytics that will drive users toward important business metrics and goals. As soon as users log into Rippling or click into an app, they will see a landing page that introduces anomalies, flags risks, highlights certain trends and patterns, outlines progress toward goals, and allows users to jump into action if needed. Embedded Reports bring insight into an important metric and provide a clear picture of what’s going on within a specific tab, app, or dashboard.

SLA for data freshness

Because many reportable models sit behind multiple layers of caching and materialized views, there can be a delay between when a change is committed to the database and when it becomes available in reports. We currently monitor the SLAs for this change data, but we want to move to a better framework that supports instantaneous reporting for critical data and configurable SLA levels for these changes.

OLAP Engine

Reporting use cases at Rippling involve ingesting data generated by customers in real time and serving low-latency queries over those datasets. Applications within Rippling use MongoDB, which is great for OLTP (Online Transaction Processing) workloads.

On the other hand, OLAP (Online Analytical Processing) systems specialize in running long and complex analytical queries at low latency. Internally, many of these systems store data in a columnar format, which makes analytical queries efficient.

Take the above example of an employee table. All the data for a given column (e.g., Department) is grouped together and stored contiguously, which is an excellent layout for executing analytical queries. For a query like SELECT COUNT(*) FROM Employee WHERE Department = 'Engineering', we only need to process this one column instead of every row in the table, which leads to a massive speedup.
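To make the row-versus-column contrast concrete, here is a toy sketch in plain Python. The table contents are invented, and real OLAP engines add compression, vectorized execution, and on-disk layouts that this example does not attempt to model.

```python
# Hypothetical employee table in a row-oriented layout: one record per row.
rows = [
    {"name": "Ada", "department": "Engineering", "location": "SF"},
    {"name": "Grace", "department": "Engineering", "location": "NYC"},
    {"name": "Linus", "department": "Sales", "location": "SF"},
]

# The same table in a column-oriented layout: one contiguous list per column.
columns = {field: [row[field] for row in rows] for field in rows[0]}


def count_row_store(rows, department):
    # Every full record is visited even though only one field matters.
    return sum(1 for row in rows if row["department"] == department)


def count_column_store(columns, department):
    # Only the "department" column is scanned; other columns are never touched.
    return columns["department"].count(department)


row_count = count_row_store(rows, "Engineering")
column_count = count_column_store(columns, "Engineering")
print(row_count, column_count)
```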

last edited: April 11, 2024

Authors

Venu Madhav

Engineering Manager

Nikunj Aggarwal

Staff Software Engineer