From 23df7db896146fe7b04266da7d33507c45ef9fed Mon Sep 17 00:00:00 2001 From: shrinitgoyal Date: Thu, 28 Nov 2024 13:31:27 +0530 Subject: [PATCH 1/5] Added support for mongoDB KV store --- llama_stack/providers/utils/kvstore/config.py | 36 ++++++-- .../providers/utils/kvstore/kvstore.py | 6 +- .../utils/kvstore/mongodb/__init__.py | 7 ++ .../utils/kvstore/mongodb/mongodb.py | 87 +++++++++++++++++++ 4 files changed, 129 insertions(+), 7 deletions(-) create mode 100644 llama_stack/providers/utils/kvstore/mongodb/__init__.py create mode 100644 llama_stack/providers/utils/kvstore/mongodb/mongodb.py diff --git a/llama_stack/providers/utils/kvstore/config.py b/llama_stack/providers/utils/kvstore/config.py index ed400efaea..867ae3f980 100644 --- a/llama_stack/providers/utils/kvstore/config.py +++ b/llama_stack/providers/utils/kvstore/config.py @@ -17,7 +17,8 @@ class KVStoreType(Enum): redis = "redis" sqlite = "sqlite" - postgres = "postgres" + postgres = "postgres", + mongodb = "mongodb" class CommonConfig(BaseModel): @@ -55,15 +56,15 @@ class SqliteKVStoreConfig(CommonConfig): @classmethod def sample_run_config( - cls, __distro_dir__: str = "runtime", db_name: str = "kvstore.db" + cls, __distro_dir__: str = "runtime", db_name: str = "kvstore.db" ): return { "type": "sqlite", "namespace": None, "db_path": "${env.SQLITE_STORE_DIR:~/.llama/" - + __distro_dir__ - + "}/" - + db_name, + + __distro_dir__ + + "}/" + + db_name, } @@ -106,7 +107,30 @@ def validate_table_name(cls, v: str) -> str: return v +class MongoDBKVStoreConfig(CommonConfig): + type: Literal[KVStoreType.mongodb.value] = KVStoreType.mongodb.value + host: str = "localhost" + port: int = 5432 + db: str = "llamastack" + user: str = None + password: Optional[str] = None + collection_name: str = "llamastack_kvstore" + + @classmethod + def sample_run_config(cls, collection_name: str = "llamastack_kvstore"): + return { + "type": "mongodb", + "namespace": None, + "host": "${env.MONGODB_HOST:localhost}", + "port": "${env.MONGODB_PORT:5432}", + "db": "${env.MONGODB_DB}", + "user": "${env.MONGODB_USER}", + "password": "${env.MONGODB_PASSWORD}", + "table_name": "${env.MONGODB_COLLECTION_NAME:" + collection_name + "}", + } + + KVStoreConfig = Annotated[ - Union[RedisKVStoreConfig, SqliteKVStoreConfig, PostgresKVStoreConfig], + Union[RedisKVStoreConfig, SqliteKVStoreConfig, PostgresKVStoreConfig, MongoDBKVStoreConfig], Field(discriminator="type", default=KVStoreType.sqlite.value), ] diff --git a/llama_stack/providers/utils/kvstore/kvstore.py b/llama_stack/providers/utils/kvstore/kvstore.py index 469f400d03..deab906020 100644 --- a/llama_stack/providers/utils/kvstore/kvstore.py +++ b/llama_stack/providers/utils/kvstore/kvstore.py @@ -9,7 +9,7 @@ def kvstore_dependencies(): - return ["aiosqlite", "psycopg2-binary", "redis"] + return ["aiosqlite", "psycopg2-binary", "redis", "pymongo"] class InmemoryKVStoreImpl(KVStore): @@ -46,6 +46,10 @@ async def kvstore_impl(config: KVStoreConfig) -> KVStore: from .postgres import PostgresKVStoreImpl impl = PostgresKVStoreImpl(config) + elif config.type == KVStoreType.mongodb.value: + from .mongodb import MongoDBKVStoreImpl + + impl = MongoDBKVStoreImpl(config) else: raise ValueError(f"Unknown kvstore type {config.type}") diff --git a/llama_stack/providers/utils/kvstore/mongodb/__init__.py b/llama_stack/providers/utils/kvstore/mongodb/__init__.py new file mode 100644 index 0000000000..4f7fe46e76 --- /dev/null +++ b/llama_stack/providers/utils/kvstore/mongodb/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .mongodb import MongoDBKVStoreImpl diff --git a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py new file mode 100644 index 0000000000..f749f26d0e --- /dev/null +++ b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py @@ -0,0 +1,87 @@ +import datetime +import logging +from datetime import datetime +from typing import Optional, List + +from pymongo import MongoClient +from pymongo.errors import ConfigurationError + +from llama_stack.providers.utils.kvstore import KVStore, MongoDBKVStoreConfig + +log = logging.getLogger(__name__) + + +class MongoDBKVStoreImpl(KVStore): + def __init__(self, config: MongoDBKVStoreConfig): + self.config = config + self.conn = None + self.collection = None + + async def initialize(self) -> None: + try: + conn_creds = { + "host": self.config.host, + "port": self.config.port, + "username": self.config.user, + "password": self.config.password, + } + conn_creds = {k: v for k, v in conn_creds if v is not None} + + try: + self.conn = MongoClient(**conn_creds) + self.collection = self.conn[self.config.db] + except (ConnectionError, ConfigurationError) as e: + raise Exception(f"Failed to connect to MongoDB: {e}") + except Exception as e: + log.exception("Could not connect to MongoDB database server") + raise RuntimeError("Could not connect to MongoDB database server") from e + + def _namespaced_key(self, key: str) -> str: + if not self.config.namespace: + return key + return f"{self.config.namespace}:{key}" + + async def set( + self, key: str, value: str, expiration: Optional[datetime] = None + ) -> None: + key = self._namespaced_key(key) + update_query = { + "$set": { + "value": value, + "expiration": expiration + } + } + self.collection.update_one( + {"key": key}, + update_query, + upsert=True + ) + + async def get(self, key: str) -> Optional[str]: + key = self._namespaced_key(key) + query = { + "key": key, + "$or": [ + {"expiration": {"$exists": False}}, + {"expiration": {"$gt": datetime.now(datetime.UTC)}}, + ], + } + result = self.collection.find_one(query, {"value": 1, "_id": 0}) + return result["value"] if result else None + + async def delete(self, key: str) -> None: + key = self._namespaced_key(key) + self.collection.delete_one({"key": key}) + + async def range(self, start_key: str, end_key: str) -> List[str]: + start_key = self._namespaced_key(start_key) + end_key = self._namespaced_key(end_key) + query = { + "key": {"$gte": start_key, "$lt": end_key}, + "$or": [ + {"expiration": {"$exists": False}}, + {"expiration": {"$gt": datetime.now(datetime.UTC)}}, + ], + } + cursor = self.collection.find(query, {"value": 1, "_id": 0}).sort("key", 1) + return [doc["value"] for doc in cursor] From c2f3a7f87e186f5e5d7612f40a111d47b8bb34a7 Mon Sep 17 00:00:00 2001 From: shrinitgoyal Date: Thu, 28 Nov 2024 22:03:30 +0530 Subject: [PATCH 2/5] fixed port and collection_name issue --- llama_stack/providers/utils/kvstore/config.py | 4 ++-- .../providers/utils/kvstore/mongodb/mongodb.py | 15 +++------------ 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/llama_stack/providers/utils/kvstore/config.py b/llama_stack/providers/utils/kvstore/config.py index 867ae3f980..b54926d07e 100644 --- a/llama_stack/providers/utils/kvstore/config.py +++ b/llama_stack/providers/utils/kvstore/config.py @@ -17,7 +17,7 @@ class KVStoreType(Enum): redis = "redis" sqlite = "sqlite" - postgres = "postgres", + postgres = "postgres" mongodb = "mongodb" @@ -110,7 +110,7 @@ def validate_table_name(cls, v: str) -> str: class MongoDBKVStoreConfig(CommonConfig): type: Literal[KVStoreType.mongodb.value] = KVStoreType.mongodb.value host: str = "localhost" - port: int = 5432 + port: int = 27017 db: str = "llamastack" user: str = None password: Optional[str] = None diff --git a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py index f749f26d0e..2aaf3ec9fb 100644 --- a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py +++ b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py @@ -1,4 +1,3 @@ -import datetime import logging from datetime import datetime from typing import Optional, List @@ -25,11 +24,11 @@ async def initialize(self) -> None: "username": self.config.user, "password": self.config.password, } - conn_creds = {k: v for k, v in conn_creds if v is not None} + conn_creds = {k: v for k, v in conn_creds.items() if v is not None} try: self.conn = MongoClient(**conn_creds) - self.collection = self.conn[self.config.db] + self.collection = self.conn[self.config.db][self.config.collection_name] except (ConnectionError, ConfigurationError) as e: raise Exception(f"Failed to connect to MongoDB: {e}") except Exception as e: @@ -60,11 +59,7 @@ async def set( async def get(self, key: str) -> Optional[str]: key = self._namespaced_key(key) query = { - "key": key, - "$or": [ - {"expiration": {"$exists": False}}, - {"expiration": {"$gt": datetime.now(datetime.UTC)}}, - ], + "key": key } result = self.collection.find_one(query, {"value": 1, "_id": 0}) return result["value"] if result else None @@ -78,10 +73,6 @@ async def range(self, start_key: str, end_key: str) -> List[str]: end_key = self._namespaced_key(end_key) query = { "key": {"$gte": start_key, "$lt": end_key}, - "$or": [ - {"expiration": {"$exists": False}}, - {"expiration": {"$gt": datetime.now(datetime.UTC)}}, - ], } cursor = self.collection.find(query, {"value": 1, "_id": 0}).sort("key", 1) return [doc["value"] for doc in cursor] From fdda9f3098c61f60cdd25bcede18b8b74eda8c40 Mon Sep 17 00:00:00 2001 From: shrinitgoyal Date: Thu, 28 Nov 2024 22:06:49 +0530 Subject: [PATCH 3/5] removed extra spaces and fix collection_name --- llama_stack/providers/utils/kvstore/config.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/llama_stack/providers/utils/kvstore/config.py b/llama_stack/providers/utils/kvstore/config.py index b54926d07e..df9e86a12a 100644 --- a/llama_stack/providers/utils/kvstore/config.py +++ b/llama_stack/providers/utils/kvstore/config.py @@ -56,15 +56,15 @@ class SqliteKVStoreConfig(CommonConfig): @classmethod def sample_run_config( - cls, __distro_dir__: str = "runtime", db_name: str = "kvstore.db" + cls, __distro_dir__: str = "runtime", db_name: str = "kvstore.db" ): return { "type": "sqlite", "namespace": None, "db_path": "${env.SQLITE_STORE_DIR:~/.llama/" - + __distro_dir__ - + "}/" - + db_name, + + __distro_dir__ + + "}/" + + db_name, } @@ -126,7 +126,7 @@ def sample_run_config(cls, collection_name: str = "llamastack_kvstore"): "db": "${env.MONGODB_DB}", "user": "${env.MONGODB_USER}", "password": "${env.MONGODB_PASSWORD}", - "table_name": "${env.MONGODB_COLLECTION_NAME:" + collection_name + "}", + "collection_name": "${env.MONGODB_COLLECTION_NAME:" + collection_name + "}", } From 8e79f531fcd00095e063c58f260b5a8746bf94f2 Mon Sep 17 00:00:00 2001 From: shrinitgoyal Date: Fri, 29 Nov 2024 13:02:38 +0530 Subject: [PATCH 4/5] removed unused imports --- .../utils/kvstore/mongodb/mongodb.py | 37 +++++++------------ run.yaml | 0 2 files changed, 14 insertions(+), 23 deletions(-) create mode 100644 run.yaml diff --git a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py index 2aaf3ec9fb..625fce9299 100644 --- a/llama_stack/providers/utils/kvstore/mongodb/mongodb.py +++ b/llama_stack/providers/utils/kvstore/mongodb/mongodb.py @@ -1,9 +1,14 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + import logging from datetime import datetime -from typing import Optional, List +from typing import List, Optional from pymongo import MongoClient -from pymongo.errors import ConfigurationError from llama_stack.providers.utils.kvstore import KVStore, MongoDBKVStoreConfig @@ -25,12 +30,8 @@ async def initialize(self) -> None: "password": self.config.password, } conn_creds = {k: v for k, v in conn_creds.items() if v is not None} - - try: - self.conn = MongoClient(**conn_creds) - self.collection = self.conn[self.config.db][self.config.collection_name] - except (ConnectionError, ConfigurationError) as e: - raise Exception(f"Failed to connect to MongoDB: {e}") + self.conn = MongoClient(**conn_creds) + self.collection = self.conn[self.config.db][self.config.collection_name] except Exception as e: log.exception("Could not connect to MongoDB database server") raise RuntimeError("Could not connect to MongoDB database server") from e @@ -41,26 +42,16 @@ def _namespaced_key(self, key: str) -> str: return f"{self.config.namespace}:{key}" async def set( - self, key: str, value: str, expiration: Optional[datetime] = None + self, key: str, value: str, expiration: Optional[datetime] = None ) -> None: + key = self._namespaced_key(key) - update_query = { - "$set": { - "value": value, - "expiration": expiration - } - } - self.collection.update_one( - {"key": key}, - update_query, - upsert=True - ) + update_query = {"$set": {"value": value, "expiration": expiration}} + self.collection.update_one({"key": key}, update_query, upsert=True) async def get(self, key: str) -> Optional[str]: key = self._namespaced_key(key) - query = { - "key": key - } + query = {"key": key} result = self.collection.find_one(query, {"value": 1, "_id": 0}) return result["value"] if result else None diff --git a/run.yaml b/run.yaml new file mode 100644 index 0000000000..e69de29bb2 From a585e08f8cf37fcdf669c2bcac45c800703ef1c5 Mon Sep 17 00:00:00 2001 From: shrinitgoyal Date: Fri, 29 Nov 2024 13:04:44 +0530 Subject: [PATCH 5/5] removed run.yaml --- run.yaml | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 run.yaml diff --git a/run.yaml b/run.yaml deleted file mode 100644 index e69de29bb2..0000000000