> ## Documentation Index
> Fetch the complete documentation index at: https://agentstack.beeai.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# Build RAG Pipelines

> Use vector stores, embedding, and text-extraction services to build RAG pipelines

Retrieval Augmented Generation (RAG) is one of the keystones for efficient data processing and search in the age of AI agents. The goal is to surface
information from a knowledge base relevant to a specific user query and provide curated context to the LLM. This is
a complex topic with many variants. We will focus on the fundamental building blocks that any RAG pipeline needs.

The document processing pipeline:

1. **Text extraction:** Process complex document formats (PDF, CSV, etc.).
2. **Text splitting:** Create meaningful chunks out of long pages of text.
3. **Embedding:** Vectorize chunks (extract semantic meaning).
4. **Store:** Insert chunks to a specialized database.

Retrieval:

1. **Embedding:** Vectorize the user query.
2. **Search:** Retrieve the document chunks most similar to the user query.

## Steps

<Steps>
  <Step title="Start Agent Stack with Docling enabled!!">
    Use `agentstack platform start --set docling.enabled=true` to start Agent Stack with Docling.
  </Step>

  <Step title="Enable file uploads in your agent">
    Add the `default_input_modes` parameter to your agent decorator to allow users to upload files to your agent. This also specifies which file types users can upload.
  </Step>

  <Step title="Import the Platform API and Embedding Service extensions">
    Import `PlatformApiExtensionServer`, `PlatformApiExtensionSpec`,
    `EmbeddingServiceExtensionServer` and `EmbeddingServiceExtensionSpec` from `agentstack_sdk.a2a.extensions`.
  </Step>

  <Step title="Inject the extensions">
    Add parameters to your agent function using the `Annotated` type hints.
  </Step>

  <Step title="Implement document processing functions">
    Implement functions to handle text extraction, text splitting, embedding generation, and vector storing.
  </Step>

  <Step title="Implement query functions">
    Implement functions to generate embeddings for the user query and search the vector store for similar document chunks.
  </Step>

  <Step title="Put it all together">
    Put it all together in an agent that 1) processes uploaded documents and 2) answers questions about them.
  </Step>

  <Step title="Use an LLM to help form a response">
    The examples here skip this part and return details from the document.
    In practice, you'll want to use an LLM to help form a response about the selected document chunks
    instead of returning the actual chunk text (i.e., implement an assistant and not just a search tool).
  </Step>
</Steps>

Let's break down how each step can be implemented with the Agent Stack API.

## Building blocks

### Enable File Uploads

Add the `default_input_modes` parameter to your agent decorator to allow users to upload files to your agent.
This also specifies which file types users can upload.
Agent Stack uses [docling](https://docling-project.github.io/docling/) for extracting text out of documents in various
[supported formats](https://docling-project.github.io/docling/usage/supported_formats/).

```python  theme={null}
from agentstack_sdk.server import Server

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]

server = Server()

@server.agent(
    default_input_modes=default_input_modes, default_output_modes=["text/plain"]
)
async def rag_agent(
    # Parameter details below...
):
    """Agent code to follow... """

```

### Platform and Embedding Extensions

Make sure you have the Platform API and Embedding Service extensions imported and injected in your agent parameters:

```python  theme={null}
import json
from typing import Annotated

from a2a.types import Message, Part, DataPart, FilePart, TextPart, FileWithUri

from agentstack_sdk.server import Server
from agentstack_sdk.a2a.extensions import (
    PlatformApiExtensionServer,
    PlatformApiExtensionSpec,
    EmbeddingServiceExtensionServer,
    EmbeddingServiceExtensionSpec,
)
from agentstack_sdk.a2a.types import AgentMessage
from agentstack_sdk.server.context import RunContext
from agentstack_sdk.util.file import PlatformFileUrl

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]

server = Server()

@server.agent(
    default_input_modes=default_input_modes, default_output_modes=["text/plain"]
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
):
    """Agent code to follow... """
```

Next, let's build a set of functions to process the documents which we will then use in the agent.

### Text Extraction

To extract text from a `File` uploaded to the Platform API, simply use `file.create_extraction()` and wait for
the result. After extraction is completed, the `extraction` object will contain
`extracted_files`, which is a list of extracted files in different formats.

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import asyncio

from agentstack_sdk.platform import File


async def extract_file(file: File):
    extraction = await file.create_extraction()
    while extraction.status in {"pending", "in_progress"}:
        await asyncio.sleep(1)
        extraction = await file.get_extraction()
    if extraction.status != "completed":
        raise ValueError(f"Extraction failed with status: {extraction.status}")

```

#### Extraction Formats

Text extraction produces two extraction formats and you can request either subset by passing `formats` to `create_extraction` (e.g., `["markdown"]` if you only need plain text):

* **markdown**: The extracted text formatted as Markdown (`file.load_text_content()`)
* **vendor\_specific\_json**: The Docling-specific JSON format containing document structure (`file.load_json_content()`)

> **WARNING**:
> The `vendor_specific_json` format is not generated for plain text or markdown files, as Docling does not support these formats as input.

### Text Splitting

In this example we will use `MarkdownTextSplitter` from the
[langchain-text-splitters](https://reference.langchain.com/python/langchain_text_splitters/) package.
This will split a long document into reasonably sized chunks based on the Markdown header structure.

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from langchain_text_splitters import MarkdownTextSplitter


def chunk_markdown(markdown_text: str) -> list[str]:
    return MarkdownTextSplitter().split_text(markdown_text)

```

### Embedding

Now we need to generate embeddings for each chunk using the embedding service. Similarly to LLM, Agent Stack implements
OpenAI-compatible embedding API. You can use any preferred client, in this example we will use the embedding extension
to create an `AsyncOpenAI` client:

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from agentstack_sdk.a2a.extensions import EmbeddingServiceExtensionServer
from openai import AsyncOpenAI


def get_embedding_client(
    embedding: EmbeddingServiceExtensionServer,
) -> tuple[AsyncOpenAI, str]:
    if not embedding or not embedding.data:
        raise ValueError("Embedding extension not provided")

    embedding_config = embedding.data.embedding_fulfillments.get("default")
    if not embedding_config:
        raise ValueError("Default embedding configuration not found")

    embedding_client = AsyncOpenAI(api_key=embedding_config.api_key, base_url=embedding_config.api_base)
    embedding_model = embedding_config.api_model
    return embedding_client, embedding_model

```

Now we can use this client to generate embeddings for our chunks and create vector store items:

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from agentstack_sdk.platform import File, VectorStoreItem
from openai import AsyncOpenAI


async def embed_chunks(
    file: File, chunks: list[str], embedding_client: AsyncOpenAI, embedding_model: str
) -> list[VectorStoreItem]:
    vector_store_items = []
    embedding_result = await embedding_client.embeddings.create(
        input=chunks,
        model=embedding_model,
        encoding_format="float",
    )
    for i, embedding_data in enumerate(embedding_result.data):
        item = VectorStoreItem(
            document_id=file.id,
            document_type="platform_file",
            model_id=embedding_model,
            text=chunks[i],
            embedding=embedding_data.embedding,
            metadata={"chunk_index": str(i)},  # add arbitrary string metadata
        )
        vector_store_items.append(item)
    return vector_store_items

```

### Store

Finally, to insert the prepared items, we need a function to create a vector store. For this we will need to know
the dimension of the embeddings and model\_id. Because the model is chosen by the embedding extension and we don't know
it in advance, we will create a test embedding request to calculate the dimension:

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from agentstack_sdk.platform import VectorStore
from openai import AsyncOpenAI


async def create_vector_store(embedding_client: AsyncOpenAI, embedding_model: str):
    embedding_response = await embedding_client.embeddings.create(input="test", model=embedding_model)
    dimension = len(embedding_response.data[0].embedding)
    return await VectorStore.create(
        name="rag-example",
        dimension=dimension,
        model_id=embedding_model,
    )

```

We can then add the prepared items using `vector_store.add_documents`, this will become clear in the final example.

### Query vector store

Assuming we have our knowledge base of documents prepared, we can now easily search the store according to the user
query. The following function will retrieve five document chunks most similar to the query embedding:

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from agentstack_sdk.platform import VectorStore, VectorStoreSearchResult
from openai import AsyncOpenAI


async def search_vector_store(
    vector_store: VectorStore,
    query: str,
    embedding_client: AsyncOpenAI,
    embedding_model: str,
) -> list[VectorStoreSearchResult]:
    embedding_response = await embedding_client.embeddings.create(input=query, model=embedding_model)
    query_vector = embedding_response.data[0].embedding
    return await vector_store.search(query_vector=query_vector, limit=5)

```

## Putting all together

Having all the pieces in place, we can now build the agent.

### Simple agent

This is a simplified agent that expects a message with one or more files attached as `FilePart` and a
user query as `TextPart`. A new vector store is created for each message.

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import json
import os
from collections.abc import AsyncGenerator
from typing import Annotated

from a2a.types import FilePart, FileWithUri, Message, TextPart
from agentstack_sdk.a2a.extensions import (
    EmbeddingServiceExtensionServer,
    EmbeddingServiceExtensionSpec,
    PlatformApiExtensionServer,
    PlatformApiExtensionSpec,
)
from agentstack_sdk.a2a.types import RunYield
from agentstack_sdk.platform import File, PlatformFileUrl
from agentstack_sdk.server import Server

from .embedding.client import get_embedding_client
from .embedding.embed import embed_chunks
from .extraction import extract_file
from .text_splitting import chunk_markdown
from .vector_store.create import create_vector_store
from .vector_store.search import search_vector_store

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]

server = Server()


@server.agent(default_input_modes=default_input_modes, default_output_modes=["text/plain"])
async def simple_rag_agent_example(
    input: Message,
    embedding: Annotated[EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files: list[File] = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    if not files or not query:
        raise ValueError("No files or query provided")

    # Create vector store
    vector_store = await create_vector_store(embedding_client, embedding_model)

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    results = await search_vector_store(vector_store, query, embedding_client, embedding_model)

    # TODO: You can add LLM result processing here

    snippet = [res.model_dump() for res in results]
    yield f"# Results:\n{json.dumps(snippet, indent=2)}"


def run():
    server.run(host=os.getenv("HOST", "127.0.0.1"), port=int(os.getenv("PORT", 8000)))


if __name__ == "__main__":
    run()

```

Instead of simply returning the output of the vector store, you would typically plug this as a tool into your favorite
agentic framework.

### Conversational agent

Having a new vector store for each message is not really a good practice. Typically, you would want to search through
all documents uploaded in the conversation. Below is a version of the agent which will reuse the vector store across
messages so you can ask multiple queries and or additional documents later on.

```python  theme={null}
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0


import json
import os
from typing import Annotated

from a2a.types import DataPart, FilePart, FileWithUri, Message, Part, TextPart
from agentstack_sdk.a2a.extensions import (
    EmbeddingServiceExtensionServer,
    EmbeddingServiceExtensionSpec,
    PlatformApiExtensionServer,
    PlatformApiExtensionSpec,
)
from agentstack_sdk.a2a.types import AgentMessage
from agentstack_sdk.platform import File, PlatformFileUrl, VectorStore
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext

from .embedding.client import get_embedding_client
from .embedding.embed import embed_chunks
from .extraction import extract_file
from .text_splitting import chunk_markdown
from .vector_store.create import create_vector_store
from .vector_store.search import search_vector_store

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]


server = Server()


@server.agent(
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def conversation_rag_agent_example(
    input: Message,
    context: RunContext,
    embedding: Annotated[EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
):
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files: list[File] = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    # Check if vector store exists
    vector_store = None
    async for message in context.load_history():
        match message:
            case Message(parts=[Part(root=DataPart(data=data))]):
                vector_store = await VectorStore.get(data["vector_store_id"])

    # Create vector store if it does not exist
    if not vector_store:
        vector_store = await create_vector_store(embedding_client, embedding_model)
        # store vector store id in context for future messages
        data_part = DataPart(data={"vector_store_id": vector_store.id})
        await context.store(AgentMessage(parts=[data_part]))

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    if query:
        results = await search_vector_store(vector_store, query, embedding_client, embedding_model)
        snippet = [res.model_dump() for res in results]

        # TODO: You can add LLM result processing here

        yield f"# Results:\n{json.dumps(snippet, indent=2)}"
    elif files:
        yield f"{len(files)} file(s) processed"
    else:
        yield "Nothing to do"


def run():
    server.run(host=os.getenv("HOST", "127.0.0.1"), port=int(os.getenv("PORT", 8000)))


if __name__ == "__main__":
    run()

```

### Next steps

To further improve the agent, learn how to use other parts of the platform such as LLMs,
file uploads and conversations:

* [LLM extension](./llm-proxy-service)
* [Multi-turn conversations](./multi-turn)
* [File handling](./files)
