Why are we talking about this?
Real-time data ingestion is a tough problem - we know! In essence, data ingestion boils down to accepting data, whether it is an MQTT message, a REST API request, or raw packets over UDP. Dealing with small quantities is usually fine, but as input rates grow, ingestion is one of the first bottlenecks you will hit. Think of ingestion as your data highway: you can either reduce the traffic or add more lanes.

In the case of IoT, a couple of devices each sending one message per second is super lightweight in terms of data ingestion. However, if you increase your message rate to, say, 1,000 data points per second per device and then multiply that by 1,000 devices, it becomes really hard to keep up. Another simple use case is instrumenting your backend application, like we do (eat your own dog food :)). A single service on a single node can produce hundreds of metrics: memory usage, CPU usage, incoming requests, error logs, latency trackers, database query timings and much more. At low request volume this is not a real issue, but growing traffic - and the fan-out to multiple instances that comes with it - significantly increases the throughput.
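To make that scale concrete, here is a quick back-of-envelope calculation using the numbers from the example above (the payload size is an illustrative assumption, not a measured figure):

```go
package main

import "fmt"

func main() {
	const pointsPerDevice = 1000 // data points per second, per device (example above)
	const devices = 1000         // fleet size (example above)
	const avgMsgBytes = 100      // assumed average payload size, for illustration only

	msgsPerSec := pointsPerDevice * devices
	bytesPerSec := msgsPerSec * avgMsgBytes
	fmt.Printf("%d msgs/s, ~%d MB/s\n", msgsPerSec, bytesPerSec/1_000_000) // 1000000 msgs/s, ~100 MB/s
}
```

A million messages per second is well past what a single naive endpoint can absorb, which is why the rest of this post is about spreading that load out.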
So far we have never shared what our tech stack is, but it is time to open up.
The majority of our services run on the Google Cloud Platform, but we also rely on other providers like AWS for redundancy and backup. Our ingestion pipeline consists of three major parts, all sitting before data reaches our processing and storage engines.
The first major component is the edge nodes, which accept all the incoming data. The service itself is written in Go and deployed on Kubernetes for easy scaling and deployments. The load is spread across the nodes courtesy of Google's global load balancers. All application state, including connection state, is kept separate from the edge nodes themselves, which means that if a node goes down, another picks up the work without interrupting the connection. This lets us scale out as much as we need to handle all the incoming data while also staying resilient to failures.
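The key idea - edge nodes that hold no state of their own - can be sketched in a few lines. This is an illustrative toy, not our actual code: the `memStore` map stands in for the external replicated store, and all type and field names are hypothetical.

```go
package main

import "fmt"

// ConnState is the per-connection state an edge node must NOT keep locally.
type ConnState struct {
	ClientID string
	LastSeq  uint64
}

// StateStore abstracts the external store. In production this would be a
// replicated service; an in-memory map stands in here for illustration.
type StateStore interface {
	Save(id string, s ConnState)
	Load(id string) (ConnState, bool)
}

type memStore struct{ m map[string]ConnState }

func (s *memStore) Save(id string, c ConnState) { s.m[id] = c }
func (s *memStore) Load(id string) (ConnState, bool) {
	c, ok := s.m[id]
	return c, ok
}

// EdgeNode holds no connection state of its own; everything goes through store.
type EdgeNode struct {
	name  string
	store StateStore
}

func (n *EdgeNode) Handle(clientID string, seq uint64) {
	n.store.Save(clientID, ConnState{ClientID: clientID, LastSeq: seq})
}

func main() {
	store := &memStore{m: map[string]ConnState{}}
	nodeA := &EdgeNode{name: "edge-a", store: store}
	nodeA.Handle("device-42", 7)

	// nodeA "crashes"; nodeB resumes from the shared store without
	// interrupting the logical session.
	nodeB := &EdgeNode{name: "edge-b", store: store}
	if st, ok := nodeB.store.Load("device-42"); ok {
		fmt.Println("resumed", st.ClientID, "at seq", st.LastSeq)
	}
}
```

Because any node can load any connection's state, the load balancer is free to reroute traffic the instant a node disappears.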
The second major component is also written in Go and is responsible for managing all the state for the ingestion nodes. To make it resilient we use a commit log and a replicated Redis store to keep the active state. This allows us to scale the service out as needed while also providing resilience.
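The commit-log pattern is worth a small sketch: every mutation is appended to a durable log before it touches the fast store, so a fresh replica can rebuild the state by replaying the log. This is a simplified illustration under assumed names - a Go slice stands in for the durable log and a map for the Redis store.

```go
package main

import "fmt"

// Entry is one state mutation recorded in the commit log.
type Entry struct {
	Key, Value string
}

// StateManager sketches the pattern: every change is appended to a durable
// log first, then applied to the hot store (Redis in the real pipeline,
// a plain map here).
type StateManager struct {
	log   []Entry           // stands in for a durable commit log
	state map[string]string // stands in for the replicated Redis store
}

func NewStateManager() *StateManager {
	return &StateManager{state: map[string]string{}}
}

func (m *StateManager) Set(k, v string) {
	m.log = append(m.log, Entry{k, v}) // 1. persist the intent
	m.state[k] = v                     // 2. apply to the hot store
}

// Recover rebuilds state purely from the log, as a fresh replica would.
func Recover(log []Entry) *StateManager {
	m := NewStateManager()
	m.log = log
	for _, e := range log {
		m.state[e.Key] = e.Value
	}
	return m
}

func main() {
	m := NewStateManager()
	m.Set("conn:device-42", "edge-a")
	m.Set("conn:device-42", "edge-b") // connection moved to another node

	replica := Recover(m.log)
	fmt.Println(replica.state["conn:device-42"]) // prints "edge-b"
}
```

The order matters: log first, store second. If the process dies between the two steps, replay still converges on the intended state.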
The third major component is our Kafka cluster which is our final persistent store before the processing pipeline.
Data Loss Prevention
One of the most important tasks for our data ingestion pipeline is preventing any kind of data loss, especially since we're dealing with distributed systems and ephemeral nodes to do the work.
The MQTT protocol defines three quality of service (QoS) levels:
- 0 - fire and forget: the device simply shoots off a message and hopes it gets received; if not, nothing happens
- 1 - at least once: the sender waits for the message to be acknowledged; on a timeout or a negative response, the device resends the message until it is sure it was received. We operate at this service level!
- 2 - exactly once: all messages are kept on the server side, and every incoming message id is checked against those already received; duplicates are discarded. On top of that, everything in QoS 1 still applies. We don't use this level because it significantly impacts performance and requires our ingestion system to keep a very large amount of state at all times. Instead, we handle deduplication in our processing pipeline to ensure exactly-once processing.
To make our entire pipeline reliable we follow the logic of QoS 1: every handoff of your data relies on a strong acknowledgment that it has been durably persisted. Once we receive your message, it is validated and passed on to our Kafka cluster to be durably persisted. Only after we get a positive acknowledgment from Kafka do our ingestion nodes acknowledge to you that your message was received. This prevents many situations where your data could be lost in flight - or worse, where an ingestion node fails and takes its live memory, and every message held in it at that split moment, down with it. If that happens, you simply never receive an acknowledgment, so you know to resend your data.

The edge case is that a message can be received multiple times, resulting in duplicates. These only live until the data processing pipeline, where we deduplicate based on your project and client identifiers along with the message timestamp and other data. We continue this practice in every subsequent step, all the way to the database where the data is finally persisted - which we will cover in upcoming blog posts.
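The deduplication side of this can be sketched as keeping a set of already-seen message keys. This is a toy version - the field names are illustrative, and in a real pipeline the "seen" set would be bounded (e.g. windowed by time) rather than an ever-growing map:

```go
package main

import "fmt"

// Message carries the identifiers the pipeline dedupes on: project and
// client identifiers plus the message timestamp, as described above.
type Message struct {
	Project, Client string
	Timestamp       int64
	Payload         string
}

// key builds the deduplication key from the identifying fields.
func key(m Message) string {
	return fmt.Sprintf("%s|%s|%d", m.Project, m.Client, m.Timestamp)
}

// Dedup drops messages whose key has been seen before, turning the
// at-least-once delivery of QoS 1 into exactly-once processing downstream.
type Dedup struct{ seen map[string]bool }

func NewDedup() *Dedup { return &Dedup{seen: map[string]bool{}} }

// Accept reports whether the message is new; duplicates return false.
func (d *Dedup) Accept(m Message) bool {
	k := key(m)
	if d.seen[k] {
		return false // duplicate from a retried publish - drop it
	}
	d.seen[k] = true
	return true
}

func main() {
	d := NewDedup()
	m := Message{"proj-1", "device-42", 1700000000, "temp=21.5"}
	fmt.Println(d.Accept(m)) // true  (first delivery processed)
	fmt.Println(d.Accept(m)) // false (retried duplicate dropped)
}
```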
Handling data ingestion at any volume is a hard task that requires a lot of time and effort to set up and maintain. Counting reliability and availability as must-have features significantly ups the stakes and forces you to deal with replication and distributed state. That is why we are working super hard to deliver the best possible experience with the least setup time, so you can work on your system instead of reinventing the wheel.