# Using Postgres as memory

This notebook shows how to use Postgres as a memory store in Semantic Kernel.

The code below pulls the most recent papers from [ArviX](https://arxiv.org/), creates embeddings from the paper abstracts, and stores them in a Postgres database.

In the future, we can use the Postgres vector store to search the database for similar papers based on the embeddings - stay tuned!

In [1]:
import textwrap
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from typing import Annotated, Any

import numpy as np
import requests

from semantic_kernel import Kernel
from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import (
 AzureChatCompletion,
 AzureChatPromptExecutionSettings,
 AzureTextEmbedding,
 OpenAIEmbeddingPromptExecutionSettings,
 OpenAITextEmbedding,
)
from semantic_kernel.connectors.memory.postgres import PostgresCollection
from semantic_kernel.contents import ChatHistory
from semantic_kernel.data import (
 DistanceFunction,
 IndexKind,
 VectorSearchOptions,
 VectorStoreRecordDataField,
 VectorStoreRecordKeyField,
 VectorStoreRecordUtils,
 VectorStoreRecordVectorField,
 VectorStoreTextSearch,
 vectorstoremodel,
)
from semantic_kernel.functions import KernelParameterMetadata
from semantic_kernel.functions.kernel_arguments import KernelArguments

## Set up your environment

You'll need to set up your environment to provide connection information to Postgres, as well as OpenAI or Azure OpenAI.

To do this, copy the `.env.example` file to `.env` and fill in the necessary information.

__Note__: If you're using VSCode to execute the notebook, the settings in `.env` in the root of the repository will be picked up automatically.

### Postgres configuration

You'll need to provide a connection string to a Postgres database. You can use a local Postgres instance, or a cloud-hosted one.
You can provide a connection string, or provide environment variables with the connection information. See the .env.example file for `POSTGRES_` settings.

#### Using Docker

You can also use docker to bring up a Postgres instance by following the steps below:

Create an `init.sql` that has the following:

```sql
CREATE EXTENSION IF NOT EXISTS vector;
```

Now you can start a postgres instance with the following:

```
docker pull pgvector/pgvector:pg16
docker run --rm -it --name pgvector -p 5432:5432 -v ./init.sql:/docker-entrypoint-initdb.d/init.sql -e POSTGRES_PASSWORD=example pgvector/pgvector:pg16
```

_Note_: Use `.\init.sql` on Windows and `./init.sql` on WSL or Linux/Mac.

Then you could use the connection string:

```
POSTGRES_CONNECTION_STRING="host=localhost port=5432 dbname=postgres user=postgres password=example"
```

### OpenAI configuration

You can either use OpenAI or Azure OpenAI APIs. You provide the API key and other configuration in the `.env` file. Set either the `OPENAI_` or `AZURE_OPENAI_` settings.


In [2]:
# Path to the environment file
env_file_path = ".env"

Here we set some additional configuration.

In [3]:
# -- ArXiv settings --

# The search term to use when searching for papers on arXiv. All metadata fields for the papers are searched.
SEARCH_TERM = "RAG"

# The category of papers to search for on arXiv. See https://arxiv.org/category_taxonomy for a list of categories.
ARVIX_CATEGORY = "cs.AI"

# The maximum number of papers to search for on arXiv.
MAX_RESULTS = 300

# -- OpenAI settings --

# Set this flag to False to use the OpenAI API instead of Azure OpenAI
USE_AZURE_OPENAI = True

Here we define a vector store model. This model defines the table and column names for storing the embeddings. We use the `@vectorstoremodel` decorator to tell Semantic Kernel to create a vector store definition from the model. The VectorStoreRecordField annotations define the fields that will be stored in the database, including key and vector fields.

In [4]:
@vectorstoremodel
@dataclass
class ArxivPaper:
 id: Annotated[str, VectorStoreRecordKeyField()]
 title: Annotated[str, VectorStoreRecordDataField()]
 abstract: Annotated[str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="abstract_vector")]
 published: Annotated[datetime, VectorStoreRecordDataField()]
 authors: Annotated[list[str], VectorStoreRecordDataField()]
 link: Annotated[str | None, VectorStoreRecordDataField()]

 abstract_vector: Annotated[
 np.ndarray | None,
 VectorStoreRecordVectorField(
 embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
 index_kind=IndexKind.HNSW,
 dimensions=1536,
 distance_function=DistanceFunction.COSINE_DISTANCE,
 property_type="float",
 serialize_function=np.ndarray.tolist,
 deserialize_function=np.array,
 ),
 ] = None

 @classmethod
 def from_arxiv_info(cls, arxiv_info: dict[str, Any]) -> "ArxivPaper":
 return cls(
 id=arxiv_info["id"],
 title=arxiv_info["title"].replace("\n ", " "),
 abstract=arxiv_info["abstract"].replace("\n ", " "),
 published=arxiv_info["published"],
 authors=arxiv_info["authors"],
 link=arxiv_info["link"],
 )

Below is a function that queries the ArviX API for the most recent papers based on our search query and category.

In [5]:
def query_arxiv(search_query: str, category: str = "cs.AI", max_results: int = 10) -> list[dict[str, Any]]:
 """
 Query the ArXiv API and return a list of dictionaries with relevant metadata for each paper.

 Args:
 search_query: The search term or topic to query for.
 category: The category to restrict the search to (default is "cs.AI").
 See https://arxiv.org/category_taxonomy for a list of categories.
 max_results: Maximum number of results to retrieve (default is 10).
 """
 response = requests.get(
 "http://export.arxiv.org/api/query?"
 f"search_query=all:%22{search_query.replace(' ', '+')}%22"
 f"+AND+cat:{category}&start=0&max_results={max_results}&sortBy=lastUpdatedDate&sortOrder=descending"
 )

 root = ET.fromstring(response.content)
 ns = {"atom": "http://www.w3.org/2005/Atom"}

 return [
 {
 "id": entry.find("atom:id", ns).text.split("/")[-1],
 "title": entry.find("atom:title", ns).text,
 "abstract": entry.find("atom:summary", ns).text,
 "published": entry.find("atom:published", ns).text,
 "link": entry.find("atom:id", ns).text,
 "authors": [author.find("atom:name", ns).text for author in entry.findall("atom:author", ns)],
 "categories": [category.get("term") for category in entry.findall("atom:category", ns)],
 "pdf_link": next(
 (link_tag.get("href") for link_tag in entry.findall("atom:link", ns) if link_tag.get("title") == "pdf"),
 None,
 ),
 }
 for entry in root.findall("atom:entry", ns)
 ]

We use this function to query papers and store them in memory as our model types.

In [6]:
arxiv_papers: list[ArxivPaper] = [
 ArxivPaper.from_arxiv_info(paper)
 for paper in query_arxiv(SEARCH_TERM, category=ARVIX_CATEGORY, max_results=MAX_RESULTS)
]

print(f"Found {len(arxiv_papers)} papers on '{SEARCH_TERM}'")

Found 300 papers on 'RAG'


Create a `PostgresCollection`, which represents the table in Postgres where we will store the paper information and embeddings.

In [7]:
collection = PostgresCollection[str, ArxivPaper](
 collection_name="arxiv_records", data_model_type=ArxivPaper, env_file_path=env_file_path
)

Create a Kernel and add the TextEmbedding service, which will be used to generate embeddings of the abstract for each paper.

In [8]:
kernel = Kernel()
if USE_AZURE_OPENAI:
 text_embedding = AzureTextEmbedding(service_id="embedding", env_file_path=env_file_path)
else:
 text_embedding = OpenAITextEmbedding(service_id="embedding", env_file_path=env_file_path)

kernel.add_service(text_embedding)

Here we use VectorStoreRecordUtils to add embeddings to our models.

In [9]:
records = await VectorStoreRecordUtils(kernel).add_vector_to_records(arxiv_papers, data_model_type=ArxivPaper)

Now that the models have embeddings, we can write them into the Postgres database.

In [10]:
async with collection:
 await collection.create_collection_if_not_exists()
 keys = await collection.upsert_batch(records)

Here we retrieve the first few models from the database and print out their information.

In [None]:
async with collection:
 results = await collection.get_batch(keys[:3])
 if results:
 for result in results:
 print(f"# {result.title}")
 print()
 wrapped_abstract = textwrap.fill(result.abstract, width=80)
 print(f"Abstract: {wrapped_abstract}")
 print(f"Published: {result.published}")
 print(f"Link: {result.link}")
 print(f"PDF Link: {result.link}")
 print(f"Authors: {', '.join(result.authors)}")
 print(f"Embedding: {result.abstract_vector}")
 print()
 print()

# Engineering LLM Powered Multi-agent Framework for Autonomous CloudOps

Abstract: Cloud Operations (CloudOps) is a rapidly growing field focused on the
automated management and optimization of cloud infrastructure which is essential
for organizations navigating increasingly complex cloud environments. MontyCloud
Inc. is one of the major companies in the CloudOps domain that leverages
autonomous bots to manage cloud compliance, security, and continuous operations.
To make the platform more accessible and effective to the customers, we
leveraged the use of GenAI. Developing a GenAI-based solution for autonomous
CloudOps for the existing MontyCloud system presented us with various challenges
such as i) diverse data sources; ii) orchestration of multiple processes; and
iii) handling complex workflows to automate routine tasks. To this end, we
developed MOYA, a multi-agent framework that leverages GenAI and balances
autonomy with the necessary human control. This framework integrates vario

Now we can search for documents with `VectorStoreTextSearch`, which uses the embedding service to vectorize a query and search for semantically similar documents:

In [12]:
text_search = VectorStoreTextSearch[ArxivPaper].from_vectorized_search(collection, embedding_service=text_embedding)

The `VectorStoreTextSearch` object gives us the ability to retrieve semantically similar documents directly from a prompt.
Here we search for the top 5 ArXiV abstracts in our database similar to the query about chunking strategies in RAG applications:

In [None]:
query = "What are good chunking strategies to use for unstructured text in Retrieval-Augmented Generation applications?"

async with collection:
 search_results = await text_search.get_search_results(
 query, options=VectorSearchOptions(top=5, include_total_count=True)
 )
 print(f"Found {search_results.total_count} results for query.")
 async for search_result in search_results.results:
 title = search_result.record.title
 score = search_result.score
 print(f"{title}: {score}")

Found 5 results for query.
Advanced ingestion process powered by LLM parsing for RAG system: 0.38676463602221456
StructRAG: Boosting Knowledge Intensive Reasoning of LLMs via Inference-time Hybrid Information Structurization: 0.39733734194342085
UDA: A Benchmark Suite for Retrieval Augmented Generation in Real-world Document Analysis: 0.3981809737466562
R^2AG: Incorporating Retrieval Information into Retrieval Augmented Generation: 0.4134050114864055
Enhancing Retrieval-Augmented Generation: A Study of Best Practices: 0.4144733752075731


We can enable chat completion to utilize the text search by creating a kernel function for searching the database...

In [14]:
plugin = kernel.add_functions(
 plugin_name="arxiv_plugin",
 functions=[
 text_search.create_search(
 # The default parameters match the parameters of the VectorSearchOptions class.
 description="Searches for ArXiv papers that are related to the query.",
 parameters=[
 KernelParameterMetadata(
 name="query", description="What to search for.", type="str", is_required=True, type_object=str
 ),
 KernelParameterMetadata(
 name="top",
 description="Number of results to return.",
 type="int",
 default_value=2,
 type_object=int,
 ),
 ],
 ),
 ],
)

...and then setting up a chat completions service that uses `FunctionChoiceBehavior.Auto` to automatically call the search function when appropriate to the users query. We also create the chat function that will be invoked by the kernel.

In [15]:
# Create the chat completion service. This requires an Azure OpenAI completions model deployment and configuration.
chat_completion = AzureChatCompletion(service_id="completions")
kernel.add_service(chat_completion)

# Now we create the chat function that will use the chat service.
chat_function = kernel.add_function(
 prompt="{{$chat_history}}{{$user_input}}",
 plugin_name="ChatBot",
 function_name="Chat",
)

# we set the function choice to Auto, so that the LLM can choose the correct function to call.
# and we exclude the ChatBot plugin, so that it does not call itself.
execution_settings = AzureChatPromptExecutionSettings(
 function_choice_behavior=FunctionChoiceBehavior.Auto(filters={"excluded_plugins": ["ChatBot"]}),
 service_id="chat",
 max_tokens=7000,
 temperature=0.7,
 top_p=0.8,
)

Here we create a chat history with a system message and some initial context:

In [16]:
history = ChatHistory()
system_message = """
You are a chat bot. Your name is Archie and
you have one goal: help people find answers
to technical questions by relying on the latest
research papers published on ArXiv.
You communicate effectively in the style of a helpful librarian. 
You always make sure to include the
ArXiV paper references in your responses.
If you cannot find the answer in the papers,
you will let the user know, but also provide the papers
you did find to be most relevant. If the abstract of the 
paper does not specifically reference the user's inquiry,
but you believe it might be relevant, you can still include it
BUT you must make sure to mention that the paper might not directly
address the user's inquiry. Make certain that the papers you link are
from a specific search result.
"""
history.add_system_message(system_message)
history.add_user_message("Hi there, who are you?")
history.add_assistant_message(
 "I am Archie, the ArXiV chat bot. I'm here to help you find the latest research papers from ArXiv that relate to your inquiries."
)

We can now invoke the chat function via the Kernel to get chat completions:

In [17]:
arguments = KernelArguments(
 user_input=query,
 chat_history=history,
 settings=execution_settings,
)

result = await kernel.invoke(chat_function, arguments=arguments)

Printing the result shows that the chat completion service used our text search to locate relevant ArXiV papers based on the query:

In [18]:
def wrap_text(text, width=90):
 paragraphs = text.split("\n\n") # Split the text into paragraphs
 wrapped_paragraphs = [
 "\n".join(textwrap.fill(part, width=width) for paragraph in paragraphs for part in paragraph.split("\n"))
 ] # Wrap each paragraph, split by newlines
 return "\n\n".join(wrapped_paragraphs) # Join the wrapped paragraphs back together


print(f"Archie:>\n{wrap_text(str(result))}")

Archie:>
What an excellent and timely question! Chunking strategies for unstructured text are
critical for optimizing Retrieval-Augmented Generation (RAG) systems since they
significantly affect how effectively a RAG model can retrieve and generate contextually
relevant information. Let me consult the latest papers on this topic from ArXiv and
provide you with relevant insights.
---
Here are some recent papers that dive into chunking strategies or similar concepts for
retrieval-augmented frameworks:
1. **"Post-training optimization of retrieval-augmented generation models"**
 *Authors*: Vibhor Agarwal et al.
 *Abstract*: While the paper discusses optimization strategies for retrieval-augmented
generation models, there is a discussion on handling unstructured text that could apply to
chunking methodologies. Chunking isn't always explicitly mentioned as "chunking" but may
be referred to in contexts like splitting data for retrieval.
 *ArXiv link*: [arXiv:2308.10701](https://arxiv.org/abs