As we’ve shared in other blogs in the past, getting a Retrieval Augmented Generation (RAG) application started is pretty straightforward. The problem comes when you try to scale it and make it production-ready. In this blog we will go into the technical and architectural details of how we do this at Neum AI, specifically how we did it for a pipeline syncing 1 billion vectors.
First off, can you explain what RAG is to a 5 year old? - Thanks ChatGPT
RAG helps find data quickly by searching in a “natural way” and uses that information/knowledge to power a more accurate AI application that needs it!
This is what a typical RAG system looks like
- Data is extracted, processed, embedded and stored in a vector database for fast semantic search lookup
- The user submits input, embeddings are generated for it, the vector database is searched for the most relevant information, and that is passed down as context to an AI application for accurate responses, as sketched right below this list.
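To make the query half of that flow concrete, here is a minimal sketch assuming the pre-1.0 openai Python package, the v3 weaviate-client, and a hypothetical Document class already populated with embedded chunks:

```python
import openai   # pre-1.0 SDK; reads OPENAI_API_KEY from the environment
import weaviate

client = weaviate.Client("http://localhost:8080")  # assumed Weaviate endpoint

def answer(question: str) -> str:
    # 1. Embed the question with the same model used at ingestion time.
    q_vec = openai.Embedding.create(
        model="text-embedding-ada-002", input=[question]
    )["data"][0]["embedding"]

    # 2. Semantic search: fetch the most relevant chunks from the vector database.
    hits = (
        client.query.get("Document", ["text"])  # "Document" is a hypothetical class
        .with_near_vector({"vector": q_vec})
        .with_limit(5)
        .do()
    )
    context = "\n".join(h["text"] for h in hits["data"]["Get"]["Document"])

    # 3. Ground the LLM's answer in the retrieved context.
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": f"Answer using only this context:\n{context}"},
            {"role": "user", "content": question},
        ],
    )
    return completion["choices"][0]["message"]["content"]
```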
Now, let’s talk about the problem at hand - how to effectively ingest and synchronize billions of text embeddings to be used in a RAG workflow?
Problem
RAG is straightforward, but when dealing with lots of source data a couple of complex problems arise.
- Ingestion at large scale - It is vital to understand the volume of data, the ingestion time requirements, search latency, cost, and more. Figuring out how to efficiently parallelize, maximize compute, and process the data is daunting.
- Embedding (transforming text into vector format for low-latency semantic search) - generating embeddings is not a problem until you have a lot of data and have to deal with rate limits, retry logic, self-hosted models, and more. Syncing data also becomes crucial here: embeddings can be costly if not done efficiently. There will always be a one-time cost of embedding all the data, but for an application that relies on new or changed data, re-embedding all of the source data is very expensive, so there has to be a mechanism to detect whether data actually needs to be re-embedded (a sketch of one such mechanism follows below).
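The sketch uses a content hash per chunk so only new or changed chunks ever reach the embedding model; the hash store and IDs are illustrative:

```python
import hashlib

# In production this map lives in a durable store (Redis, Postgres, ...); a dict stands in here.
seen_hashes: dict[str, str] = {}

def needs_embedding(chunk_id: str, text: str) -> bool:
    """Return True only if the chunk is new or its content changed since the last sync."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    if seen_hashes.get(chunk_id) == digest:
        return False                      # unchanged: skip the embedding call entirely
    seen_hashes[chunk_id] = digest        # record the new content hash
    return True

chunks = {"doc-1#0": "hello world", "doc-1#1": "an edited paragraph"}
to_embed = {cid: text for cid, text in chunks.items() if needs_embedding(cid, text)}
```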
In this specific case, our data pipeline is responsible for four operations.
a) Reading data
b) Processing data
c) Embedding data
d) Storing data in a vector database - in this case Weaviate!
Each of the points above has its own challenges.
- Reading data needs to be done efficiently, maximizing parallelization to meet ingestion time requirements.
- Once data is read, it needs to be processed; we can’t just dump everything into an embedding model. It needs to be carefully chunked depending on the source type, the relevant metadata fields extracted, and any anomalies cleaned up.
- Embedding data, as mentioned above, needs to be done only if required and parallelized in terms of requests/compute within the constraints of the system and any external API limits.
- Storing in the vector database has its own limitations:
    - What are the compute resources in the vector database?
    - Is it self-hosted or managed? Is there monitoring?
    - Is the data sharded? What is the latency? What about compression?
    - Did you know that the HNSW algorithm is pretty inefficient when trying to store identical vectors? (more on this later)
    - Could the ingestion into the database be the bottleneck of our system?
In addition to that, the system itself must have great monitoring, cancellation options, logging and alerting in place, all of the things you would expect from a robust distributed system.
For the rest of the blog we will explore solutions and share a bit of our architecture and how we tested, benchmarked, and ran the pipeline moving 1 billion vectors.
High-level architecture
Let’s talk about the high level architecture and break down each of the components.
As mentioned before, this distributed system is responsible for four main tasks, and they all dance together in harmony.
In plain English, this is what’s happening when a user request comes in through our FastAPI service:
- Create a pipeline and store its metadata in our system, then immediately return to the user to acknowledge their request.
- Send an event to our first processing queue - requests - where workers will be dequeuing events from. This queue is responsible for taking in the request and figuring out the source type; in this specific case we were processing lots of files from an S3 bucket.
- For each file in the S3 bucket, send an event with the file name to the process_document queue, where other consumers will read messages from. These workers will read the file and start processing it.
- For each of the processed files, we split it into chunks (if the file is large) so that we can fit within memory and other resource constraints. Each of these chunks is sent to embed_store - our final queue, promise ;) - where other consumers will be dequeuing.
- Of course, we have logs and monitoring in place so we can detect issues and surface any important status messages/codes to the user upon request. Additionally, we care about analytics and metrics such as average time taken, number of tasks, etc., and we display those as well upon request. (A simplified sketch of this flow follows right after this list.)
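The sketch below uses Celery task names and queues that mirror the ones above; the helper functions are hypothetical stand-ins, and the real pipeline wraps every step in checkpointing, retries, and cancellation:

```python
from celery import Celery
from fastapi import FastAPI

celery_app = Celery("pipelines", broker="redis://localhost:6379/0")  # assumed broker
celery_app.conf.task_routes = {
    "requests.handle": {"queue": "requests"},
    "documents.process": {"queue": "process_document"},
    "chunks.embed_store": {"queue": "embed_store"},
}

api = FastAPI()

# Hypothetical helpers standing in for the real readers, chunkers and clients.
def list_s3_keys(bucket: str) -> list[str]: ...
def read_and_chunk(bucket: str, key: str) -> list[list[dict]]: ...
def embed(chunk_batch: list[dict]) -> list[list[float]]: ...
def write_to_weaviate(chunk_batch: list[dict], vectors: list[list[float]]) -> None: ...

@api.post("/pipelines")
def create_pipeline(s3_bucket: str):
    # Store pipeline metadata (omitted) and acknowledge immediately; the heavy work is async.
    handle_request.delay(s3_bucket)
    return {"status": "accepted"}

@celery_app.task(name="requests.handle")
def handle_request(s3_bucket: str):
    # Determine the source type and fan out one task per file in the bucket.
    for key in list_s3_keys(s3_bucket):
        process_document.delay(s3_bucket, key)

@celery_app.task(name="documents.process")
def process_document(s3_bucket: str, key: str):
    # Read the file and sub-divide it into memory-friendly chunk batches.
    for chunk_batch in read_and_chunk(s3_bucket, key):
        embed_store.delay(chunk_batch)

@celery_app.task(name="chunks.embed_store")
def embed_store(chunk_batch: list):
    vectors = embed(chunk_batch)             # embedding call (Replicate in our case)
    write_to_weaviate(chunk_batch, vectors)  # batched insert into the vector database
```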
A note on distributed queueing in Python
While FastAPI has support for BackgroundTasks, we chose Celery to handle the abstractions between our message broker and our workers, because this is a heavier, more compute-intensive operation that requires distributed logging, monitoring, and further parallelization. Because the work is distributed across multiple machines, having a message broker and an event-driven system is vital for processing and monitoring tasks.
Celery is a vegetable ;), and it’s also an asynchronous task queue written in Python. It provides great abstractions over message brokers, producers, and consumers in a distributed system. There are a lot of Celery internals we could spend time talking about, but we will leave those for another post. For example, it took us a couple of debugging sessions to understand why our consumers were picking up jobs even though our message broker looked empty… and it’s because of Celery’s prefetch_count.
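For reference, the settings involved in that particular surprise are just a couple of lines of Celery configuration; the values below are illustrative, not universal recommendations:

```python
from celery import Celery

app = Celery("pipelines", broker="redis://localhost:6379/0")  # assumed broker

# By default each worker prefetches multiplier * concurrency messages, so a queue can look
# empty on the broker while tasks sit reserved on a busy worker.
app.conf.worker_prefetch_multiplier = 1  # reserve only one task per worker process at a time
app.conf.task_acks_late = True           # ack after completion so a crashed task is redelivered
```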
Let’s go a bit more in depth
Reading
As mentioned above, the first part of our system is in charge of determining the source type and distributing the files, in this case for parallel reading. These tasks are sent to the process_document queue. Then, because the files might be large, we process each of them individually and subdivide them into chunks. These per-file chunks are then sent to embed_store.
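A sketch of that sub-division step: a generator that turns a stream of records from a large file into fixed-size batches so a worker never has to hold a whole file in memory (the batch size here is a made-up knob):

```python
from typing import Iterable, Iterator

def batched(records: Iterable[dict], batch_size: int = 500) -> Iterator[list[dict]]:
    """Stream records from a large file into fixed-size batches so a worker never
    has to materialize the whole file in memory."""
    batch: list[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

# Each yielded batch becomes one embed_store task, e.g.:
# for chunk_batch in batched(stream_records_from_s3(bucket, key)):  # hypothetical streamer
#     embed_store.delay(chunk_batch)
```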
There are a couple of important aspects here:
- Because we process lots of data and files, our distributed queueing system built with Celery allows us to properly distribute tasks, monitor and retry them if needed. Along the way we have checkpointing mechanisms to understand what has been processed, which worker has picked up a task, what’s left, what succeeded and what failed.
- Second, we need to be smart about how we chunk, how we assign metadata, and more. We give the user the ability to select how they want to chunk and assign metadata to their source data, but we have also incorporated smart chunking mechanisms to properly split the data. The data for this pipeline is JSON-based, so chunking is based on property fields, as is the metadata attached to each vector in the vector database (see the chunking sketch below).
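Roughly, property-field chunking for this kind of JSON data looks like the following; the field names are invented for illustration:

```python
import json

def chunk_json_record(record: dict,
                      text_fields=("title", "body"),        # invented property fields
                      meta_fields=("id", "category")) -> list[dict]:
    """Turn one JSON record into one chunk per selected property field, carrying the
    chosen metadata fields alongside each chunk for use in the vector database."""
    metadata = {k: record[k] for k in meta_fields if k in record}
    return [
        {"text": str(record[field]), "field": field, "metadata": metadata}
        for field in text_fields
        if record.get(field)
    ]

record = json.loads('{"id": "42", "category": "faq", "title": "Shipping", "body": "We ship worldwide."}')
chunks = chunk_json_record(record)  # -> two chunks, each tagged with {"id": "42", "category": "faq"}
```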
Once we have finished distributing all the files and their respective subtasks, we are ready for our final set of “heavy” consumers to dequeue messages from the last queue depicted above and perform the embedding and the vector DB storing.
Embeddings and Vector DB storing
Our final stage (which runs for every subtask mentioned above) is the one that will embed our chunks of data and store them into the vector database.
For the case study we are talking about here, we chose two main technologies to assist with this.
While we ended up using Replicate, it’s important to note that we started with OpenAI embeddings and their text-embedding-ada-002 model. This worked seamlessly: it took about 3-5 seconds to embed about 1,500 documents of roughly 30 tokens each, and the cost was acceptable as well.
One thing to note is that storing vectors in a vector database has implications for querying latency, storage, the memory needed to manage it, and more. Because we are dealing with a huge number of vectors, it was imperative to move from 1536 dimensions to a lower-dimensional model to avoid unnecessary memory and storage usage in the Weaviate cluster; cutting the dimensions in half leads to huge $$ savings. While there are dimensionality reduction techniques, Neum also offers an integration with Replicate where customers can choose the embedding model of their choice to be hosted and we simply connect to it, which is what we did for this run. Replicate has great customer support and handled this load seamlessly.
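Calling a Replicate-hosted embedding model from a worker looks roughly like this; the model identifier and output shape are placeholders for whichever lower-dimensional model you host, and the backoff/dead-letter hand-off is the part that matters at this scale:

```python
import time
import replicate  # reads REPLICATE_API_TOKEN from the environment

EMBED_MODEL = "your-org/your-embedding-model:version-hash"  # placeholder model identifier

def embed_texts(texts: list[str], max_retries: int = 5) -> list[list[float]]:
    """Embed a batch of texts with simple exponential backoff around the API call."""
    for attempt in range(max_retries):
        try:
            # The input/output shape depends on the model you host; a list of vectors is assumed.
            return replicate.run(EMBED_MODEL, input={"texts": texts})
        except Exception:
            if attempt == max_retries - 1:
                raise                    # hand the batch to the dead-letter queue
            time.sleep(2 ** attempt)     # back off: 1s, 2s, 4s, ...
```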
We need a powerful and efficient vector database capable of storing billions of vectors. Weaviate is a popular one with great support and deep technical capabilities; for those who are interested, Weaviate’s sharding and replication design draws on Cassandra’s architecture. Having had experience with it in the past, and it being open source, it was a good choice, as we needed a lot of deep customization and integration: being able to deploy on a Kubernetes cluster, choose the number of nodes and shards, adjust the number of workers and the ingestion batch size, and more. There’s tons of great documentation on Weaviate here.
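With the v3 Weaviate Python client, the knobs we kept coming back to live in the class’s sharding config and the batch configuration; the values below are illustrative, not our production settings:

```python
import weaviate

client = weaviate.Client("http://weaviate.internal:8080")  # assumed cluster endpoint

client.schema.create_class({
    "class": "Document",
    "vectorizer": "none",                   # we bring our own vectors
    "shardingConfig": {"desiredCount": 6},  # spread the index across the cluster's nodes
    "vectorIndexConfig": {"efConstruction": 128, "maxConnections": 32},
})

# Batch ingestion: batch size and num_workers are the levers that interact with cluster CPU.
client.batch.configure(batch_size=200, num_workers=2, dynamic=True)
```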
The core requirement here is a vector database that is fast at semantic search and allows parallelization of ingestion requests via a multi-node cluster, while offering logging and retry capabilities.
Storing in-depth
So, going back to the beginning of this section, we had chunks that needed to be embedded; we used Replicate to do so, with dead-letter queue and retry mechanisms in place. After we got our embeddings, we used Weaviate to store the data with all the configurations mentioned above and more, again logging and handling errors accordingly.
To share some numbers, we ran benchmarks with different levels of parallelization on both Neum’s end and Weaviate’s end, and played with the number of CPUs in the Weaviate cluster. This is not a super extensive benchmark and was done early on with OpenAI, where the dimensions would be greater, so as to plan for a “worst case” scenario. There are also other improvements we are in the process of trying out, like Weaviate’s gRPC client, which claims significantly faster ingestion times.
One key insight we had to pay attention to was how to parallelize the writes to Weaviate.
So far we have shared how we parallelized the requests, the files, the chunking, and the embeddings. But when we get to storing, we have a choice of how much further to parallelize ingestion, specifically for Weaviate, since they let users specify num_workers when ingesting data, which essentially parallelizes the request further on their end.
Because we had a number of consumers dequeuing from embed_store (remember our queues and consumers ;) I know, lots of moving pieces, it isn’t trivial!) we were already parallelizing the ingestion requests to Weaviate, and so we had to run benchmarks to find the “magic number” of ingestion threads on our end and worker parallelization on Weaviate’s end.
The most important thing was to understand how many CPU cores the Weaviate cluster has and how many parallel threads/workers you are actively using. We initially got hit by a number of “connection errors” and long ingestion times because we were over-parallelizing the requests. As long as you maximize but don’t exceed your Weaviate cluster’s resource limits, you should be good (see the sanity-check sketch below).
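That sanity check fits in a few lines, with illustrative numbers:

```python
WEAVIATE_CPU_CORES = 32      # total cores across the Weaviate cluster (example value)
CELERY_CONSUMERS = 8         # workers dequeuing from embed_store on our side
WEAVIATE_NUM_WORKERS = 2     # num_workers passed to the Weaviate batch client

total_parallel_writes = CELERY_CONSUMERS * WEAVIATE_NUM_WORKERS
assert total_parallel_writes <= WEAVIATE_CPU_CORES, (
    "Over-parallelizing past the cluster's cores shows up as connection errors "
    "and longer, not shorter, ingestion times."
)
```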
There are a lot of other learnings on the Weaviate side of things, like Product Quantization, increased latency due to identical vectors and how HNSW stores data, parallelization of Weaviate workers via sharding, and more. Let us know if you are interested in such an analysis and we’ll share some of those learnings in another post!
As a side note, Neum’s platform works great with Weaviate through its deep integration, but we support other vector DBs as well if the user prefers.
Conclusion
Building distributed data pipelines involves lots of moving pieces, and now with the rise of Generative AI and RAG-based applications, things can get complicated very fast. We keep learning and immersing ourselves in all the new ideas and technologies popping up to stay up to date with the latest trends. However, having a robust system with retries, logging, monitoring, and ease of use remains a top priority for us when supporting large-scale data pipelines for embeddings.
There’s a bunch of moving pieces as you probably figured out. The beautiful thing about this is that all of this happens within a single API request to our Neum AI platform :). If you are interested and have large-scale data requirements, sign up here or contact us!
As mentioned, let us know if you are interested in going even more in depth to some of our Weaviate and embeddings learnings!