Volga - Streaming Engine and Networking - Performance and Scalability Benchmark - Part 3
Designing a distributed networking stack for real-time high-throughput and low-latency data processing from scratch. Part 3.
TL;DR
We have successfully scaled Volga's streaming engine to 1 million messages per second (and beyond) for payloads of various sizes while maintaining stable sub-30 ms over-the-network p99 latency for a Python-based Word Count pipeline. We have also demonstrated stable horizontal scalability of the streaming engine on a 100+ CPU Kubernetes cluster.
This is the third part of a 3-post series about Volga’s streaming engine and networking stack, demonstrating performance numbers and horizontal scalability under various conditions. For an in-depth overview of the streaming engine internals and networking stack, see post 1 and post 2. For Volga's motivation and high-level architecture, see this and this.
Contents:
Assumptions, Limitations and Expectations
The Experiment Set up and Run Environments
What we measure and how
Parameters
Infrastructure set up
Results and Analysis
Conclusion
Assumptions, Limitations and Expectations
Before going into the experiment details, it is necessary to set realistic expectations about the assumptions made when designing the experiment and how the results map to real-world conditions.
The main point is that all of the experiment scenarios in this article are designed primarily to test the behaviour of Volga's networking stack, so other parts that a reader would expect in a real-life production streaming pipeline are disabled (either intentionally or because we do not have them yet). One such important part is Operator State Persistence: the process of continuously synchronizing all of the pipeline's state with local/remote storage for durability purposes. State persistence is a heavy I/O-bound operation which essentially blocks the upstream/downstream CPU-intensive steps; it significantly reduces overall throughput and increases latency, as the whole pipeline becomes I/O-bound. To read more about this, and if you are curious what the performance of a fully functional streaming pipeline looks like, you can check this post on Flink's performance tuning.
For our case we test scenarios where state persistence is disabled and the only bottleneck is expected to be either inside the network stack or in the actual per-message calculations. Volga’s streaming engine persistent state is currently a work in progress and is not a focus of this article.
The Experiment Set up and Run Environments
All of the test job runs we measure execute the well-known Word Count pipeline.
Why Word Count? As mentioned previously, our goal is to demonstrate how the entire network stack works, hence we wanted a job topology that utilizes all of the n*n connections. Word Count does just that: for each instance of a word from our input dictionary produced by a source worker, Volga calculates its hash and uses it to map all instances of the word to a specific worker, so the same word always ends up on one worker and can be properly aggregated (this is what the KeyPartition strategy does). As a result, every single source worker ends up communicating with all of the sink workers rather than just a single peer (which would be the case for a simple embarrassingly parallel job, e.g. Source→Map→Sink, which would use ForwardPartition).
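To make the n*n claim concrete, here is a minimal sketch of hash-based key partitioning. It is not Volga's KeyPartition implementation: the function names, the SHA-256-based hash and the synthetic dictionary are all assumptions chosen for illustration. It only shows that, with a reasonably sized dictionary, every source ends up sending to every sink.

```python
import hashlib


def key_partition(key: str, num_sinks: int) -> int:
    """Map a key to a sink index; equal keys always map to the same sink."""
    # A digest is used only because it is stable across processes;
    # Python's built-in hash() for str is salted per interpreter run.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_sinks


def used_connections(num_sources: int, num_sinks: int, dictionary: list) -> set:
    """Collect every (source, sink) pair that exchanges at least one word."""
    pairs = set()
    for source in range(num_sources):
        # Each source independently emits words from the same dictionary.
        for word in dictionary:
            pairs.add((source, key_partition(word, num_sinks)))
    return pairs


parallelism = 4  # hypothetical value for the example
dictionary = [f"word{i}" for i in range(1000)]  # hypothetical dictionary
pairs = used_connections(parallelism, parallelism, dictionary)
print(f"{len(pairs)} of {parallelism * parallelism} source-to-sink connections in use")
```

With a forward-style partitioning (source i sends only to sink i), the same count would be just n pairs instead of n*n.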

As mentioned in the previous post, the way we map workers to nodes (Node Assignment Strategy) also plays a crucial role in the resulting performance. For our jobs we use the OperatorFirst node assignment strategy: every node in a cluster contains workers for only one operator (no sharing) and all of the worker-to-worker communication is performed via RemoteChannels (and TransferActors, described in the first post), with no in-node communication. As a result, since we enable Operator Chaining (described in the first post), this produces a fully connected n-to-n graph of workers, with one half being producers (DataWriters belonging to ChainedSourceOperator) and the other half being consumers (DataReaders belonging to ChainedSinkOperator), communicating with each other over the network only. This matches our goal of testing the networking stack.
An alternative option would be to use the ParallelismFirst strategy, which relies more on in-node communication and uses LocalChannels. In theory this should demonstrate even better results due to smaller network overhead, but since our goal is to test the networking stack, we do not use it.

As mentioned above, each source worker independently generates words from a pre-defined dictionary (see code for the WordCountSource). Each sink worker keeps a local count of all messages it has received and periodically dumps the data to a shared Ray Actor that collects counts for the whole run (see code for the SinkToCacheDictFunction).
For the run environment we distinguish 2 cases (more details in Infrastructure set up part):
LOCAL - a job is run on a laptop (single-node cluster) and uses the local-only networking stack (no TransferActors or RemoteChannels).
CLUSTER - a job is run on a cluster of machines and hence uses the remote networking stack with TransferActors and RemoteChannels.
What we measure and how
All of the experiment runs track 2 key metrics: throughput and latency.
Throughput - the number of messages processed by a pipeline per second, averaged over the duration of a run. Each source worker records the number of messages it has sent and reports it to a centralized stats actor, which later produces the final result.
Latency - we define latency as the time it takes for a message to go from a source worker to the corresponding sink worker. To every n-th (100th by default) message we attach a latency marker with the timestamp of when the message was generated by the source; when the sink receives it, we subtract that timestamp from the sink's reception timestamp to get a value in milliseconds. All of the latency readings are later binned into a histogram to get the final percentile values for a job.
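The latency procedure above can be sketched as follows. This is an illustrative sketch only, not the benchmark's actual code: `maybe_mark`, `LatencyHistogram`, the 1 ms bin width and the 1000 ms histogram range are assumptions. Percentiles are reported as the upper edge of the bin they fall into, which is the usual precision trade-off of a binned histogram.

```python
from typing import Optional

MARKER_EVERY = 100  # the post's default: every 100th message carries a marker


def maybe_mark(seq_no: int, now_ms: float) -> Optional[float]:
    """Return a source timestamp for every MARKER_EVERY-th message, else None."""
    return now_ms if seq_no % MARKER_EVERY == 0 else None


class LatencyHistogram:
    """Fixed-width latency histogram (hypothetical helper for this sketch)."""

    def __init__(self, bin_ms: int = 1, max_ms: int = 1000):
        self.bin_ms = bin_ms
        self.bins = [0] * (max_ms // bin_ms + 1)  # last bin catches overflow
        self.count = 0
        self.total_ms = 0.0

    def record(self, latency_ms: float) -> None:
        idx = min(int(latency_ms // self.bin_ms), len(self.bins) - 1)
        self.bins[idx] += 1
        self.count += 1
        self.total_ms += latency_ms

    def percentile(self, p: int) -> float:
        """Upper edge (ms) of the bin that contains the p-th percentile."""
        if self.count == 0:
            return 0.0
        target = p * self.count / 100
        seen = 0
        for idx, n in enumerate(self.bins):
            seen += n
            if seen >= target:
                return float((idx + 1) * self.bin_ms)
        return float(len(self.bins) * self.bin_ms)

    def average(self) -> float:
        return self.total_ms / self.count if self.count else 0.0


# Sink side: latency = reception timestamp - source timestamp from the marker.
hist = LatencyHistogram()
for i in range(1, 101):
    source_ts_ms = 0.0
    sink_ts_ms = i - 0.5  # synthetic readings: 0.5 ms, 1.5 ms, ..., 99.5 ms
    hist.record(sink_ts_ms - source_ts_ms)
print(hist.percentile(99), hist.average())
```

Sampling only every n-th message keeps the measurement overhead low, at the cost of fewer readings per run.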
Parameters
Each job is run with a fixed set of parameters which we have manually identified to have the most sensitivity w.r.t. throughput and latency:
payload_size - size of a per-message user-defined payload in bytes, a single word length in our case. Note that each message also carries extra metadata (routing key); in our case the total size of a message (msg_size) is approximately 2x that number (in bytes).
batch_size - number of messages to be batched on the Python side before serializing them together and pushing them down to the Rust networking stack as a single batch for further downstream processing.
parallelism - number of parallel instances of streaming workers for an operator. Each job will have 2 * parallelism workers in total (source + sink).
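The quantities derived from these three parameters can be summarised in a small sketch. The `JobParams` class and its property names are hypothetical (they are not part of Volga's API), the 2x metadata factor is the approximation stated above, and the parallelism value is an arbitrary example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobParams:
    payload_size: int  # bytes of user payload per message (one word)
    batch_size: int    # messages batched in Python before serialization
    parallelism: int   # worker instances per operator

    @property
    def msg_size(self) -> int:
        # Approximation from the post: metadata roughly doubles the payload.
        return 2 * self.payload_size

    @property
    def batch_bytes(self) -> int:
        # Approximate size of one batch handed to the networking stack.
        return self.msg_size * self.batch_size

    @property
    def total_workers(self) -> int:
        # One set of source workers plus one set of sink workers.
        return 2 * self.parallelism

    @property
    def connections(self) -> int:
        # Every source worker talks to every sink worker (n*n).
        return self.parallelism * self.parallelism


params = JobParams(payload_size=1024, batch_size=1000, parallelism=8)
print(params.msg_size, params.batch_bytes, params.total_workers, params.connections)
```

Note how `batch_bytes` grows multiplicatively: the largest payload combined with the largest batch size yields batches of roughly 2 MB, which is the kind of configuration that stresses fixed-size buffers.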
As described in the previous post, there are also a number of other configurable options that may impact performance for certain runs (e.g. input/output buffer sizes), but we have decided to keep them fairly generic and fixed across all runs (large enough that backpressure happens only when there is an actual stall) so that we do not get a combinatorial explosion of data points. The goal of this post is to give an aggregated overview of performance numbers; for each user-specific case, tuning these parameters will most likely lead to even better performance.
Infrastructure set up
As mentioned previously, all tests run in 2 different physical environments:
Laptop (LOCAL) - we run a local one-node Ray cluster in a conda environment. Laptop: Apple M2 Max, 12 CPU and 96 GB RAM.
Cluster (CLUSTER) - we run Ray in a distributed manner via KubeRay (1 KubeRay pod per Kubernetes node) over a Kubernetes cluster on AWS Elastic Kubernetes Service. Each cluster has a single Head Node and a group of Worker Nodes (sized according to the job's parallelism); all the nodes belong to the compute-optimized c5 family. We also make sure that all the nodes are co-located in the same region (ap-northeast-1) and the same availability zone to minimize network latency.
Head Node Spec: c5.xlarge - 4 vCPUs, 8 GiB RAM, up to 10 Gbps network bandwidth.
Worker Node Spec: c5.2xlarge - 8 vCPUs, 16 GiB RAM, up to 10 Gbps network bandwidth.
You can find full Ray Cluster spec here and EKS Cluster spec here. For a detailed tutorial on how to run Volga on EKS refer to Volga Ops repo.
An important point to cover is how the parallelism of a job maps to the CPU spec of a cluster/instance. We have experimentally identified that setting a limit of 2 vCPUs per worker gives the most stable results across a variety of other parameters. The limit is purely logical, which means that there is no actual virtualization-based isolation of resources per actor on a single node and we cannot guarantee fair sharing of CPUs among workers (beyond what the OS CPU scheduler provides). Ideally the number of CPUs per worker should be a variable parameter, and the fact that we fix it may dilute the experiment in certain cases; however, for the sake of simplicity we leave it as it is.
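As a rough sizing aid, the mapping from parallelism to hardware can be sketched as below. This is back-of-the-envelope arithmetic under stated assumptions, not the logic Volga or Ray uses for scheduling: it assumes workers pack fully onto c5.2xlarge worker nodes at 2 vCPUs each, that source and sink workers never share a node (as with OperatorFirst), and it ignores the head node. The function names are hypothetical.

```python
import math

VCPUS_PER_WORKER = 2        # logical per-worker limit stated in the post
VCPUS_PER_WORKER_NODE = 8   # c5.2xlarge


def worker_nodes_needed(parallelism: int) -> int:
    """Worker nodes for a job when each node hosts workers of one operator only."""
    workers_per_node = VCPUS_PER_WORKER_NODE // VCPUS_PER_WORKER
    nodes_per_operator = math.ceil(parallelism / workers_per_node)
    return 2 * nodes_per_operator  # one group for sources, one for sinks


def max_unstarved_parallelism(total_cpus: int) -> int:
    """Largest parallelism on a single machine before workers compete for CPUs."""
    cpus_per_parallelism_level = 2 * VCPUS_PER_WORKER  # one source + one sink worker
    return total_cpus // cpus_per_parallelism_level


print(worker_nodes_needed(32))        # worker nodes for a hypothetical parallelism of 32
print(max_unstarved_parallelism(12))  # the 12-CPU laptop used for LOCAL runs
```

Because the limit is logical rather than enforced, these numbers describe where contention is expected to begin, not a hard capacity boundary.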
Results and Analysis
For the benchmark code see network_perf_benchmark.py
Cluster Runs (CLUSTER) - data in benchmark_wordcount_cluster_final.json (Note: we set a cut-off limit of 300 ms for latency graphs for readability).
Cluster Runs: Throughput (millions of messages/sec), p99 and average latency (milliseconds) for different payload sizes (message sizes) and batch sizes.
All of the configurations demonstrate linear throughput scalability with increasing parallelism. The only exception is the bottom left corner (red line): this is the case with the largest total message size (msg_size * batch_size) of all runs, which most likely reaches the limit of the internal buffer inside TransferActors (as mentioned above, we fix the internal buffer size for all runs). Theoretically, increasing this config value would allow the pipeline to scale further.
Apart from the case of batch_size=1000 (red lines), all of the configurations demonstrate stable latency while the throughput grows with increasing parallelism.
For each specific message size there is an optimal batch_size value (between 10 and 100 for most cases) which leads to the best throughput/latency ratio. Increasing the batch size beyond that only leads to increased latency and in some cases lower throughput. A batch size of 1000 is excessive for all of the cases and in some cases even stalls the pipeline’s throughput (red line in the bottom right corner).
As a result, we have scaled the cluster to 16 c5.2xlarge machines with a total of 128 vCPUs. Theoretically we would be able to continue growing, but we stopped here for practical reasons ($).
Best throughput/latency ratio per payload size:
(payload_size=32) 1030933 msg/s, 30 ms p99, 13 ms avg
(payload_size=256) 996462 msg/s, 20 ms p99, 10 ms avg
(payload_size=1024) 904540 msg/s, 30 ms p99, 16 ms avg
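To put these rates in network terms, the sketch below converts them into an approximate aggregate bandwidth. It is an estimate, not a measurement: it assumes msg_size is exactly 2x payload_size (the post only says "approximately") and ignores serialization and protocol overhead. The figure is the total across all source workers, not per node.

```python
# Best runs from the list above: payload_size (bytes) -> messages per second.
BEST_RUNS = {32: 1030933, 256: 996462, 1024: 904540}


def aggregate_gbps(payload_size: int, msgs_per_sec: int) -> float:
    """Approximate cluster-wide traffic in gigabits per second."""
    msg_size = 2 * payload_size  # assumption: metadata doubles the payload
    return msgs_per_sec * msg_size * 8 / 1e9


for payload_size, rate in BEST_RUNS.items():
    print(f"payload_size={payload_size}: ~{aggregate_gbps(payload_size, rate):.2f} Gbps aggregate")
```

Under these assumptions the three runs correspond to roughly 0.5, 4.1 and 14.8 Gbps of aggregate traffic, so message rate alone understates how much more data the larger payloads move.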
Local Runs (LOCAL) - data in benchmark_wordcount_local_final.json
Local Runs: Throughput (millions of messages/sec), p99 and average latency (ms) for different payload sizes (message sizes) and batch sizes.
For the local environment it is harder to draw any definite conclusions about latency since we cannot guarantee any resource isolation as we can in the distributed cluster. Throughput, on the other hand, is still a good indication of the engine's performance, even in the local setting.
(Note: we have manually zeroed the case of payload_size=1024 and batch_size=10 (bottom orange line) due to unstable numbers.)
Most configurations’ throughput peaks at a parallelism of 3 - 4, which matches our estimate of logical CPU isolation of approx. 2 CPUs per worker, or 4 CPUs per parallelism level (2 workers: one reader, one writer). We have 12 CPUs, which is exactly 3 (parallelism) * 4 (CPUs/parallelism).
Taking parallelism=3-4 as the cutoff (meaning we only consider runs with smaller parallelism as not starving the system), we can observe steady latency levels for most cases, with a sharp increase after the cutoff line. Intuitively, due to the absence of any networking and all communication being local, we would expect latency to be close to 0 (i.e. sub-millisecond), which is the case for most of the configurations with low enough parallelism, when resource starvation does not happen. Most of the non-zero latency readings can be explained by messages sitting in internal buffers under backpressure while workers are starved of CPU by other processes.
As in the previous case, there is a clear optimal batch_size for every message size. A batch_size of 1000 is excessive for all of the cases and yields only performance degradation.
Per-parallelism (per-CPU) throughput is significantly higher for the local case, which is expected since there is no network overhead.
Conclusion
In this series of posts we have covered the internals of Volga’s streaming engine, specifically its networking stack. Volga demonstrated horizontal scalability with stable latency and performance numbers on par with existing streaming engines. Considering that Volga provides a Python-native runtime environment for user code, these numbers are pretty solid: we have been able to scale it to a million messages per second while maintaining millisecond-scale latency on a 100+ CPU cluster.
The network stack’s development will continue with more performance and reliability improvements and tests (Credit-Based Flow Control, backpressure monitoring, more metrics, etc.).
As for the more general milestones that would unblock Volga’s release, we aim to deliver such critical parts as Checkpointing, a Persistent State Backend and Basic Data Connectors (Kafka and ScyllaDB). Check out the roadmap.
Thanks for reading! Please star the project on GitHub, join the community on Slack, share the blog and leave your feedback.