Retrieval-augmented generation (RAG) is one of the keystones of efficient data processing and search in the age of AI agents. The goal is to surface the information in a knowledge base that is relevant to a specific user query and to provide it to the LLM as curated context. RAG is a broad topic with many variants, so we will focus on the fundamental building blocks that any RAG pipeline needs.
Document processing pipeline:
  1. text extraction - process complex document formats (PDF, CSV, etc.)
  2. text splitting - create meaningful chunks out of long pages of text
  3. embedding - vectorize chunks (extract semantic meaning)
  4. store - insert chunks into a specialized database (a vector store)
Retrieval:
  1. embedding - vectorize user query
  2. search - retrieve the document chunks most similar to the user query
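
To make the two lists above concrete, here is a minimal, self-contained sketch of the same flow in plain Python. It is not Agent Stack code: word counts stand in for real embeddings, a plain list stands in for the vector store, and all function names (split_text, embed, cosine, build_store, retrieve) are hypothetical.

```python
import math
import re
from collections import Counter


def split_text(text: str) -> list[str]:
    """Split on blank lines; a stand-in for a real text splitter."""
    return [part.strip() for part in re.split(r"\n\s*\n", text) if part.strip()]


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts instead of a dense model vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(count * b[word] for word, count in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def build_store(document: str) -> list[tuple[str, Counter]]:
    """Document processing: split, embed, and store each chunk with its vector."""
    return [(chunk, embed(chunk)) for chunk in split_text(document)]


def retrieve(store: list[tuple[str, Counter]], query: str, limit: int = 2) -> list[str]:
    """Retrieval: embed the query and return the most similar chunks."""
    query_vector = embed(query)
    ranked = sorted(store, key=lambda item: cosine(query_vector, item[1]), reverse=True)
    return [chunk for chunk, _ in ranked[:limit]]
```

The sections below replace each toy piece with a real service: docling for extraction, a Markdown splitter, an embedding model, and the platform vector store.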

Building blocks

Let’s break down how each step can be implemented with the Agent Stack API, but first, make sure you have the Platform API extension enabled in your agent:
from typing import Annotated

from a2a.types import Message

from agentstack_sdk import server
from agentstack_sdk.a2a.extensions import (
    PlatformApiExtensionServer,
    PlatformApiExtensionSpec,
    EmbeddingServiceExtensionServer,
    EmbeddingServiceExtensionSpec,
)
from agentstack_sdk.server.context import RunContext

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]


@server.agent(
    default_input_modes=default_input_modes, default_output_modes=["text/plain"]
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
): ...  # Agent code

Agent Stack uses docling to extract text from documents in a range of supported formats. To select which formats the agent accepts, use the default_input_modes parameter of the agent decorator. First, let’s build a set of functions for processing documents, which we will then use in the agent.

Text Extraction

To extract text from a File uploaded to the Platform API, call file.create_extraction() and poll until the extraction finishes. Once the extraction is completed, the extraction object contains extracted_file_id, the ID of a new file that holds the extracted text in Markdown.
from agentstack_sdk.platform import File, Extraction
import asyncio

async def extract_file(file: File):
    extraction = await file.create_extraction()
    while extraction.status in {"pending", "in_progress"}:
        await asyncio.sleep(1)
        extraction = await file.get_extraction()
    if extraction.status != "completed" or not extraction.extracted_file_id:
        raise ValueError(f"Extraction failed with status: {extraction.status}")
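
The loop above polls forever if an extraction never leaves the pending state. As a hedged sketch, the same polling pattern can be wrapped in a generic helper with a deadline. The helper name wait_for_status, its parameters, and the default timeout are assumptions for illustration, not part of the SDK.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def wait_for_status(
    fetch_status: Callable[[], Awaitable[str]],
    pending: frozenset = frozenset({"pending", "in_progress"}),
    interval: float = 1.0,
    timeout: float = 300.0,
) -> str:
    """Poll fetch_status until it leaves the pending states or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    status = await fetch_status()
    while status in pending:
        if loop.time() >= deadline:
            raise TimeoutError(f"Still '{status}' after {timeout} seconds")
        await asyncio.sleep(interval)
        status = await fetch_status()
    return status
```

Here fetch_status could be a small coroutine that awaits file.get_extraction() and returns its status; the caller then checks that the returned status is "completed".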

Text Splitting

In this example we will use MarkdownTextSplitter from the langchain-text-splitters package. It splits a long document into reasonably sized chunks, preferring to break at Markdown structure such as headings.
from langchain_text_splitters import MarkdownTextSplitter

def chunk_markdown(markdown_text: str) -> list[str]:
    return MarkdownTextSplitter().split_text(markdown_text)
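
To illustrate the idea behind structure-aware splitting, here is a simplified sketch in plain Python. It is not how MarkdownTextSplitter is implemented; the function name split_by_headings and the max_chars default are assumptions. It cuts before every heading, then packs the paragraphs of an oversized section greedily. A single paragraph longer than max_chars is kept whole.

```python
import re


def split_by_headings(markdown_text: str, max_chars: int = 1000) -> list[str]:
    """Split Markdown before each heading, then cut oversized sections by paragraph."""
    # The lookahead keeps each heading attached to the section it introduces.
    sections = re.split(r"(?m)^(?=#{1,6} )", markdown_text)
    chunks: list[str] = []
    for section in sections:
        section = section.strip()
        if not section:
            continue
        if len(section) <= max_chars:
            chunks.append(section)
            continue
        # Oversized section: pack paragraphs greedily up to max_chars.
        current = ""
        for paragraph in section.split("\n\n"):
            candidate = f"{current}\n\n{paragraph}" if current else paragraph
            if current and len(candidate) > max_chars:
                chunks.append(current)
                current = paragraph
            else:
                current = candidate
        if current:
            chunks.append(current)
    return chunks
```

Keeping a heading together with its body tends to give each chunk enough context to be meaningful on its own when it is retrieved later.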

Embedding

Now we need to embed each chunk using the embedding service. As with LLMs, Agent Stack exposes an OpenAI-compatible embedding API, so you can use any client you prefer. In this example we will use the embedding extension to create an AsyncOpenAI client:
from openai import AsyncOpenAI
from agentstack_sdk.a2a.extensions import EmbeddingServiceExtensionServer

def get_embedding_client(
    embedding: EmbeddingServiceExtensionServer,
) -> tuple[AsyncOpenAI, str]:
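    if not embedding or not embedding.data:
        raise ValueError("Embedding extension not provided")

    embedding_config = embedding.data.embedding_fulfillments.get("default")
    if embedding_config is None:
        raise ValueError("No default embedding fulfillment provided")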
    embedding_client = AsyncOpenAI(
        api_key=embedding_config.api_key, base_url=embedding_config.api_base
    )
    embedding_model = embedding_config.api_model
    return embedding_client, embedding_model


Now we can use this client to embed our chunks and create vector store items:
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStoreItem, File


async def embed_chunks(
    file: File, chunks: list[str], embedding_client: AsyncOpenAI, embedding_model: str
) -> list[VectorStoreItem]:
    vector_store_items = []
    embedding_result = await embedding_client.embeddings.create(
        input=chunks,
        model=embedding_model,
        encoding_format="float",
    )
    for i, embedding_data in enumerate(embedding_result.data):
        item = VectorStoreItem(
            document_id=file.id,
            document_type="platform_file",
            model_id=embedding_model,
            text=chunks[i],
            embedding=embedding_data.embedding,
            metadata={"chunk_index": str(i)},  # add arbitrary string metadata
        )
        vector_store_items.append(item)
    return vector_store_items
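
The function above sends all chunks in a single request. For large documents it may be safer to embed in batches, on the assumption that the embedding provider limits the number of inputs per request (the actual limit, if any, depends on the provider). A minimal sketch of a batching helper follows; iter_batches is a hypothetical name.

```python
from collections.abc import Iterator


def iter_batches(chunks: list[str], batch_size: int) -> Iterator[tuple[int, list[str]]]:
    """Yield (offset, batch) pairs so chunk indices stay global across batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for offset in range(0, len(chunks), batch_size):
        yield offset, chunks[offset : offset + batch_size]
```

Inside embed_chunks, each batch would get its own embeddings.create call, and offset + i would serve as the chunk_index so that the metadata still refers to the position in the whole document.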

Store

Finally, to insert the prepared items, we need a function to create a vector store. For this we will need to know the dimension of the embeddings and model_id. Because the model is chosen by the embedding extension and we don’t know it in advance, we will create a test embedding request to calculate the dimension:
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStore


async def create_vector_store(embedding_client: AsyncOpenAI, embedding_model: str):
    embedding_response = await embedding_client.embeddings.create(
        input="test", model=embedding_model
    )
    dimension = len(embedding_response.data[0].embedding)
    return await VectorStore.create(
        name="rag-example",
        dimension=dimension,
        model_id=embedding_model,
    )
We can then add the prepared items using vector_store.add_documents, as the final example shows.

Query vector store

Assuming our knowledge base of documents is prepared, we can now search the store with the user query. The following function retrieves the five document chunks most similar to the query embedding:
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStore, VectorStoreSearchResult

async def search_vector_store(
    vector_store: VectorStore,
    query: str,
    embedding_client: AsyncOpenAI,
    embedding_model: str,
) -> list[VectorStoreSearchResult]:
    embedding_response = await embedding_client.embeddings.create(
        input=query, model=embedding_model
    )
    query_vector = embedding_response.data[0].embedding
    return await vector_store.search(query_vector=query_vector, limit=5)

Putting it all together

Having all the pieces in place, we can now build the agent.

Simple agent

This is a simplified agent that expects a message with one or more files attached as FilePart and a user query as TextPart. A new vector store is created for each message.
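import json
from collections.abc import AsyncGenerator

from a2a.types import FilePart, FileWithUri, TextPart

# File, PlatformFileUrl and RunYield are provided by agentstack_sdk; import them
# from the SDK together with the names used in the snippets above.
@server.agent(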
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    if not files or not query:
        raise ValueError("No files or query provided")

    # Create vector store
    vector_store = await create_vector_store(embedding_client, embedding_model)

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    results = await search_vector_store(
        vector_store, query, embedding_client, embedding_model
    )

    # TODO: You can add LLM result processing here

    snippet = [res.model_dump() for res in results]
    yield f"# Results:\n```\n{json.dumps(snippet, indent=2)}\n```"
Instead of simply returning the output of the vector store, you would typically plug this search in as a tool in your favorite agentic framework.
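
As one possible shape for the LLM result processing step, the retrieved chunk texts can be assembled into a grounded prompt. This is a minimal sketch under stated assumptions: build_prompt, the prompt wording, and the character budget are hypothetical, and reading the chunk text out of each VectorStoreSearchResult is left out because it depends on that type.

```python
def build_prompt(query: str, chunks: list[str], max_context_chars: int = 4000) -> str:
    """Assemble a grounded prompt from retrieved chunk texts, most relevant first."""
    context_parts: list[str] = []
    used = 0
    for index, chunk in enumerate(chunks, start=1):
        entry = f"[{index}] {chunk.strip()}"
        if used + len(entry) > max_context_chars:
            break  # stop before exceeding the context budget
        context_parts.append(entry)
        used += len(entry)
    context = "\n\n".join(context_parts)
    return (
        "Answer the question using only the context below. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}"
    )
```

Numbering the chunks lets the model cite which passage supports its answer, and the budget keeps the prompt within the context window of the model.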

Conversational agent

Creating a new vector store for each message is not good practice. Typically, you want to search through all documents uploaded during the conversation. Below is a version of the agent that reuses the vector store across messages, so you can ask multiple queries and add more documents later on.
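from a2a.types import DataPart, Part

# AgentMessage and VectorStore are provided by agentstack_sdk; import them from
# the SDK together with the names used in the snippets above.
@server.agent(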
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    # Check if vector store exists
    vector_store = None
    async for message in context.load_history():
        match message:
            # Only messages that carry a stored vector store ID are relevant
            case Message(parts=[Part(root=DataPart(data=data))]) if "vector_store_id" in data:
                vector_store = await VectorStore.get(data["vector_store_id"])

    # Create vector store if it does not exist
    if not vector_store:
        vector_store = await create_vector_store(embedding_client, embedding_model)
        # store vector store id in context for future messages
        data_part = DataPart(data={"vector_store_id": vector_store.id})
        await context.store(AgentMessage(parts=[data_part]))

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    if query:
        results = await search_vector_store(
            vector_store, query, embedding_client, embedding_model
        )
        snippet = [res.model_dump() for res in results]

        # TODO: You can add LLM result processing here

        yield f"# Results:\n```\n{json.dumps(snippet, indent=2)}\n```"
    elif files:
        yield f"{len(files)} file(s) processed"
    else:
        yield "Nothing to do"
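
The key idea in this agent is to persist the vector store ID in the conversation history and look it up on every new message. The lookup can be sketched in isolation with plain dictionaries standing in for history messages. The dictionary shape and the name find_vector_store_id are assumptions for illustration only; the agent above matches on Message and DataPart objects instead.

```python
from typing import Any, Optional


def find_vector_store_id(history: list[dict[str, Any]]) -> Optional[str]:
    """Return the most recently stored vector store ID, or None if there is none."""
    for message in reversed(history):
        data = message.get("data")
        if isinstance(data, dict) and "vector_store_id" in data:
            return data["vector_store_id"]
    return None
```

Scanning from the newest message gives the same result as the loop in the agent, which keeps the last match it sees, but stops at the first hit.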

Next steps

To further improve the agent, learn how to use other parts of the platform such as LLMs, file uploads and conversations: