Powering Glovo’s Machine Learning with Real-Time Data. Part 3: Event-Driven Architecture

Meghna
The Glovo Tech Blog
9 min read · Apr 6, 2021


This article is part of a series co-written with Francisco Rodriguez, Pablo Barbero, and Enrique Fernandez about Machine Learning feature engineering in production environments. You may find the previous posts here and here.

If you have read our first post, you will already have an idea of how we use Machine Learning (ML) models to predict order stages and why real-time aggregation matters to these models.

Real-time aggregation here means performing a set of data transformations on multiple data points to compute results, which we refer to as real-time features. These features act as inputs for ML models. For instance, the number of unpicked orders at a restaurant is a very important input when predicting the preparation time of any new order.
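As a toy illustration of what such a feature looks like (the event shape below is our own assumption, not the production schema):

def unpicked_orders(order_events: list[dict]) -> int:
    # A real-time feature: orders that have been created but not yet picked up.
    return sum(1 for event in order_events if event.get("picked_at") is None)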

Initial Solution & its drawbacks

In the early days, we used SQL queries to perform the aggregations. We would query the MySQL database from our Java application and cache the result with a time to live of 10 minutes, to balance the accuracy of the calculations against the load these queries added to the system. This meant that each feature reflected data that was anywhere from current to 10 minutes old. Over time the models grew more complex and required features that needed multiple data points, such as variance, rolling averages, spectral density, and more.

Initial implementation of feature-engine

Querying this type of information directly from the database had multiple drawbacks:

Inaccurate results:
Since the SQL results were refreshed only every 10 minutes, real-timeliness was missing. If there was a sudden surge in the number of orders for a restaurant, the ML models would predict an incorrect preparation time for at least 10 minutes. This impacted the overall efficiency of operations: we saw a 20% drop in error after we started using real-time aggregated features.

Difficult to Scale:
The SQL queries we used were very complex and consumed significant system resources. We started seeing heavy traffic and increased latencies on the database queries fetching aggregated results, which impacted the entire subsystem. The situation was even worse during peak operational hours.

Limited Java Libraries for data transformations:
Java has a limited number of libraries and frameworks for performing these types of data transformations compared to the stack that data scientists have access to in Python.

We had to code the transformations in Java and ensure similar results on both platforms. This led to duplication of code between the training and serving pipelines, which opened the door to inconsistencies.

Higher Time to Deliver a Feature:
Since data scientists depended on engineers to tweak or update a transformation, the time to get a new feature to production was quite high. Even for a small feature, a considerable amount of engineering effort was required to understand and implement the desired transformation in our Java code. It took around 3 sprints to implement a new feature, and maintaining that code was quite inefficient.

Event-Driven Architecture

To solve these problems, we developed an event-driven module that acts as a platform for consuming various business events, and for computing and serving the real-time aggregated features.

Every time a relevant business event happens (an order is placed, picked up by a courier, or delivered to a customer) we publish a data-rich event on an AWS Kinesis topic. Each event topic is mapped to one or more AWS Lambda functions through event source mapping. The Lambda function performs the following operations (a minimal sketch follows the list):

  • Store the event in a Redis cluster (the data store).
  • Remove any stale events from the data store.
  • Compute the real-time features.
  • Publish the results to another AWS Kinesis topic.
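To make this concrete, here is a minimal sketch of what such a handler could look like, assuming hypothetical field names (store_id), REDIS_HOST and OUTPUT_STREAM environment variables, and a sorted set for the stale-event step; the list- and hash-based variants described later in this post are sketched in their own sections.

import base64
import json
import os
import time

import boto3
import redis

redis_client = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)
kinesis = boto3.client("kinesis")
WINDOW_SECONDS = 600  # keep only the last 10 minutes of events


def lambda_handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers the payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        store_id = payload["store_id"]
        key = f"order_created:store:{store_id}"
        now = time.time()

        # 1. Store the event in the data store, scored by its timestamp.
        redis_client.zadd(key, {json.dumps(payload): now})
        # 2. Remove stale events that have fallen out of the window.
        redis_client.zremrangebyscore(key, 0, now - WINDOW_SECONDS)
        # 3. Compute the real-time feature for this dimension.
        features = {"store_id": store_id, "orders_last_10_min": redis_client.zcard(key)}
        # 4. Publish the result to another Kinesis stream.
        kinesis.put_record(
            StreamName=os.environ["OUTPUT_STREAM"],
            Data=json.dumps(features),
            PartitionKey=str(store_id),
        )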

An application subscriber consumes the real-time feature result event and stores it in a central Redis cluster that we call the Feature Store. Whenever a machine learning model is invoked, these features are fetched from Redis and used as input arguments.
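On the consuming side, the lookup might look roughly like the sketch below; the key layout, feature names, and the scikit-learn-style model object are assumptions for illustration, not the actual Feature Store API.

import json

import redis

# Central Redis cluster acting as the Feature Store.
feature_store = redis.Redis(host="feature-store.internal", port=6379, decode_responses=True)


def load_features(store_id: str) -> dict:
    raw = feature_store.get(f"features:store:{store_id}")
    return json.loads(raw) if raw else {}


def predict_preparation_time(model, store_id: str, order: dict) -> float:
    features = load_features(store_id)
    # Real-time aggregated features become model inputs alongside the order data.
    x = [[features.get("unpicked_orders", 0), order["number_of_products"]]]
    return model.predict(x)[0]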

The time difference between receiving a new event and publishing the results should be nearly zero; this ensures that the information is aggregated in near real time, enabling clients to use the latest results.

Event-Driven Architecture for Real-time features

There are multiple Lambda functions, and each one computes and publishes a different set of real-time features. Every Lambda function defines an Aggregator Dimension and the Type of Aggregation to be performed.

Aggregator Dimension

Aggregation requires grouping messages across certain dimensions and then applying aggregate operations to those messages. Each Lambda function has its own aggregator dimension. Let's take an example to understand this better. From an Order Delivered Event we compute two different sets of features:

  • The Average Waiting Time at the delivery location, for the customer dimension
  • The Average Time to accept an order request, for the courier dimension.

Two separate Lambda functions are mapped to this event: one aggregates on the courier Id and the other on the customer Id. This dimension acts as the key for storage and retrieval from Redis. We have different aggregator dimensions, e.g. the store Id, the courier Id, the customer Id, etc.
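As a small sketch of how a dimension could be turned into a storage key (the enum values and key layout below are illustrative assumptions, not our production naming):

from enum import Enum


class AggregatorDimension(Enum):
    STORE = "store"
    COURIER = "courier"
    CUSTOMER = "customer"


def storage_key(event_type: str, dimension: AggregatorDimension, dimension_id: str) -> str:
    # e.g. "order_delivered:courier:42" for the courier-level aggregation
    return f"{event_type}:{dimension.value}:{dimension_id}"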

Types of Aggregation

Count-based feature Aggregations

The feature aggregation is performed over the last N events; the data store always maintains the last N events in the cache. Eviction is FIFO: once the maximum size is reached, the oldest event is removed every time a new event arrives in the system. The time to accept an order over the last 50 orders is one such feature. We use Redis' list data structure to maintain a queue for this aggregation: any new event is added to the head of the list, and if the list size exceeds the limit, the last element of the list is removed.

Data Store update for last 5 events
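A focused sketch of this last-N queue using Redis LPUSH/LTRIM; the limit of 50 and the time_to_accept field are illustrative assumptions.

import json

import redis

redis_client = redis.Redis(decode_responses=True)
MAX_EVENTS = 50  # keep only the last N events per dimension


def push_and_aggregate(key: str, event: dict) -> float:
    # New events are added to the head of the list...
    redis_client.lpush(key, json.dumps(event))
    # ...and the oldest ones are evicted (FIFO) once the limit is exceeded.
    redis_client.ltrim(key, 0, MAX_EVENTS - 1)

    events = [json.loads(e) for e in redis_client.lrange(key, 0, -1)]
    # Example feature: average time to accept over the last 50 orders.
    times = [e["time_to_accept"] for e in events]
    return sum(times) / len(times)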

Time window-based Aggregations

These features are aggregated over a certain time window and are calculated periodically. The number of new orders in the last 5, 10, 30, or N minutes is a perfect example of this type of aggregation. These features need to be recalculated after each interval. For example: if one order is placed at a restaurant at 10:01 AM, the number of orders in the last 10 minutes is 1 between 10:01 and 10:10. If no new order arrives in the system before 10:10, then at 10:11 the number of orders in the last 10 minutes should be recalculated to 0.

Use of step functions for lambda invocation

For this, we use AWS Step Functions with a Wait state of n minutes to schedule the recalculations as time passes.
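A hedged sketch of what such a Wait-based state machine could look like, expressed in Amazon States Language and created via boto3; the state names, the 10-minute wait, and the ARNs are placeholders, not our actual configuration.

import json

import boto3

sfn = boto3.client("stepfunctions")

definition = {
    "StartAt": "WaitTenMinutes",
    "States": {
        "WaitTenMinutes": {
            "Type": "Wait",
            "Seconds": 600,
            "Next": "RecalculateWindowFeatures",
        },
        "RecalculateWindowFeatures": {
            "Type": "Task",
            # Lambda that recomputes e.g. "orders in the last 10 minutes"
            "Resource": "arn:aws:lambda:eu-west-1:123456789012:function:recalculate-window-features",
            "End": True,
        },
    },
}

sfn.create_state_machine(
    name="window-feature-recalculation",
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/step-functions-execution-role",
)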

For these features, we accumulate the different events of each dimension to keep a snapshot of the lifecycle of an order; for example, we store the Order Created Event, Order Accepted Event, and Order Picked Event for the store dimension. This snapshot is used to calculate features like the number of unpicked orders or the number of orders waiting to be accepted by the store. We use Redis' hash data structure to enable a nested key structure, and we update the cached info on the relevant events, e.g. the same event info block is updated by both the Order Created and Order Picked events of one aggregation dimension.
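A sketch of this hash-based snapshot: one Redis hash per store holds the latest known state of each open order, and the window features are derived from it. The status values and field names are illustrative assumptions.

import json

import redis

redis_client = redis.Redis(decode_responses=True)


def on_order_event(store_id: str, order_id: str, status: str, payload: dict) -> None:
    # The same hash entry is updated as the order moves through its lifecycle.
    key = f"order_lifecycle:store:{store_id}"
    redis_client.hset(key, order_id, json.dumps({"status": status, **payload}))
    if status == "DELIVERED":
        # The order has left the lifecycle we track for this dimension.
        redis_client.hdel(key, order_id)


def unpicked_orders(store_id: str) -> int:
    key = f"order_lifecycle:store:{store_id}"
    orders = [json.loads(v) for v in redis_client.hgetall(key).values()]
    return sum(1 for o in orders if o["status"] in ("CREATED", "ACCEPTED"))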

Deployment using Serverless Framework

We use the Serverless Framework and GitHub Actions for deploying the Lambda functions to production; a Jenkins script is triggered on every push to the master branch. Serverless has plugins that resolve dependencies from requirements.txt and include external packages for common libraries in the Lambda deployment, which can be specified in serverless.yml:

plugins:
  - serverless-python-requirements
  - serverless-package-external

Configuring the Lambda and its event source mapping is done in the functions section of serverless.yml. We need to specify the handler method, memory limit, and timeout. For the event source mapping, all you need to do is specify the Kinesis stream ARN, the batch size, and the number of retries:

functions:
  order-created-lambda:
    handler: handler.lambda_handler
    name: serviceName
    environment:
      LOG_LEVEL: info
    memorySize: 256
    timeout: 30
    events:
      - stream:
          arn: kinesisStream:ARN
          batchSize: 1
          maximumRetryAttempts: 5
          startingPosition: LATEST

Monitoring with DataDog

We use Datadog for monitoring the Lambda functions as well as the event publishers and consumers. Lambda provides various invocation, concurrency, and performance metrics that help identify bottlenecks and errors as well as manage costs. Datadog provides out-of-the-box integration with all AWS Lambda metrics, giving us visibility into the whole system.

Impact of new architecture

Developer velocity:
Time to market for a new feature improved by 3x. The aggregator codebase is written in Python and has a separate git repository and CI pipeline, so engineers and data scientists can now work independently. Decoupling the real-time features from the application code has enabled rolling out new features at a faster pace and minimized data scientists' dependency on engineers.

Timeliness and Accuracy:
The real-timeliness of event aggregation is crucial for machine learning models to make accurate predictions. For example, the number of unpicked orders is an input to the model that predicts how much time a restaurant will take to prepare an order. Having this information updated on every Order Created and Order Picked event increases accuracy. For some of our features, we have observed a 20% reduction in error metrics after switching to the real-time aggregated features.

Scalability:
Using AWS Lambda and Kinesis streams lets us scale computation resources with demand, without any extra effort. During peak hours we process 16K transformations per minute using 100+ Lambda instances, with no additional scaling overhead.

Plots depicting the scale-out during the day

Consistency of the code:
To address the code duplication issue we mentioned earlier, we created a new Python package to cover the logic used both for training and inference. You can read more about it in our Real-Time Data Aggregators post.
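Conceptually, the shared package looks something like the sketch below; the module and function names are our own placeholders, not the actual package. Both the training pipeline and the serving Lambdas import the same functions, so each feature transformation is defined in exactly one place.

# shared_transformations/rolling.py (hypothetical module)
from statistics import mean, pstdev


def rolling_mean(values: list[float], window: int) -> float:
    # The same definition is used when building training datasets and at serving time.
    tail = values[-window:]
    return mean(tail) if tail else 0.0


def rolling_std(values: list[float], window: int) -> float:
    tail = values[-window:]
    return pstdev(tail) if len(tail) > 1 else 0.0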

Alternatives considered

Why we used AWS Lambda to provide scalability:

We compared using Lambda versus dedicated EC2 instances for the aggregator module.

Using EC2 instances would have required managing and provisioning the environment, autoscaling instances at peak hours, and building the consumer functionality to read events from Kinesis. We picked AWS Lambda because of these benefits:

  • It provides a fully scalable and managed service along with monitoring. We scale from 3 to 100+ Lambda instances during peak hours.
  • Event source mapping is available to trigger a Lambda on the arrival of new events, along with retry functionality.
  • For our load, Lambda was more cost-effective, as you only pay for the execution duration.

Why we used a Redis ElastiCache instance instead of a database:

Redis (ElastiCache) has been part of our infrastructure for quite some time, so we are well aware of the complexities and bottlenecks of using Redis.

We explored some time-series databases to store the data points for the transformations: InfluxDB (NoSQL), TimescaleDB (SQL), and the Redis Time Series module. We identified several benefits with Redis:

  • Amazon ElastiCache for Redis is a fully managed service, whereas we would have to set up, configure, and manage the other databases ourselves.
  • The various data structures provided by Redis make it very effective for our use case and are not supported by these databases.
  • Its speed of operation is unmatched by any persistent system.

Future Enhancements

The current implementation does not cover historical features, such as the total number of orders delivered by a courier or the total number of orders placed by a customer. As part of future enhancements, we want to use a persistent database as a feature store, which would enable us to store historical information.

If you have experience building similar applications, we'd love to hear your feedback. We value knowledge sharing; it's what drives industry innovation. Please check our website to learn more about us. Lastly, if you are interested in joining us, we are hiring!
