Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Couchbase Collection Datanode and Add Unit Tests #2010

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions taipy/core/data/couchbase_collection_datanode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# couchbase_collection_datanode.py

from datetime import datetime, timedelta
from inspect import isclass
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from couchbase.cluster import Cluster, ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException

from taipy.common.config.common.scope import Scope

from .._version._version_manager_factory import _VersionManagerFactory
from ..data.operator import JoinOperator, Operator
from ..exceptions.exceptions import InvalidCustomDocument, MissingRequiredProperty
from .data_node import DataNode

from .data_node_id import DataNodeId, Edit


class CouchbaseDocument:
"""Class to define the structure of documents stored in Couchbase."""

def __init__(self, field1: str, field2: int, **kwargs):
self.field1 = field1 # Example field of type string
self.field2 = field2 # Example field of type integer
# Additional fields can be added dynamically
for key, value in kwargs.items():
setattr(self, key, value)
Comment on lines +21 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this class being used?
What is it used for?


class CouchBaseCollectionDataNode(DataNode):
"""Data Node stored in a Couchbase collection.

The *properties* attribute must contain the following mandatory entries:

- *db_name* (`str`): The bucket name in Couchbase.
- *collection_name* (`str`): The collection name in the Couchbase bucket to read from and to write the data to.

The *properties* attribute can also contain the following optional entries:

- *db_username* (`str`): The username for the Couchbase database.
- *db_password* (`str`): The password for the Couchbase database.
- *db_host* (`str`): The database host. The default value is *"localhost"*.
- *db_port* (`int`): The database port. The default value is *8091*.
"""

__STORAGE_TYPE = "couchbase_collection"

__DB_NAME_KEY = "db_name"
__COLLECTION_KEY = "collection_name"
__DB_USERNAME_KEY = "db_username"
__DB_PASSWORD_KEY = "db_password"
__DB_HOST_KEY = "db_host"
__DB_PORT_KEY = "db_port"

__DB_HOST_DEFAULT = "localhost"
__DB_PORT_DEFAULT = 8091

_REQUIRED_PROPERTIES: List[str] = [
__DB_NAME_KEY,
__COLLECTION_KEY,
]

def __init__(
self,
config_id: str,
scope: Scope,
id: Optional[DataNodeId] = None,
owner_id: Optional[str] = None,
parent_ids: Optional[Set[str]] = None,
last_edit_date: Optional[datetime] = None,
edits: List[Edit] = None,
version: str = None,
validity_period: Optional[timedelta] = None,
edit_in_progress: bool = False,
editor_id: Optional[str] = None,
editor_expiration_date: Optional[datetime] = None,
properties: Dict = None,
) -> None:
if properties is None:
properties = {}
required = self._REQUIRED_PROPERTIES
if missing := set(required) - set(properties.keys()):
raise MissingRequiredProperty(
f"The following properties {', '.join(missing)} were not informed and are required."
)

super().__init__(
config_id,
scope,
id,
owner_id,
parent_ids,
last_edit_date,
edits,
version or _VersionManagerFactory._build_manager()._get_latest_version(),
validity_period,
edit_in_progress,
editor_id,
editor_expiration_date,
**properties,
)


# Create a Couchbase connection using the provided properties.
# For more information on connecting to Couchbase, see:
# https://docs.couchbase.com/python-sdk/current/hello-world/start-using-sdk.html
try:
self.cluster = Cluster(
f"couchbase://{properties.get(self.__DB_HOST_KEY, self.__DB_HOST_DEFAULT)}",
ClusterOptions(
PsswordAuthenticator(
properties.get(self.__DB_USERNAME_KEY, ""),
properties.get(self.__DB_PASSWORD_KEY, "")
Comment on lines +113 to +114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm quite sure that Couchbase requires username and password to connect.

If the properties doesn't contain the username and password, we should raise an error before this, meaning that we should add the self.__DB_USERNAME_KEY and self.__DB_PASSWORD_KEY to the self._REQUIRED_PROPERTIES

)
)
)
bucket = self.cluster.bucket(properties.get(self.__DB_NAME_KEY))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bucket = self.cluster.bucket(properties.get(self.__DB_NAME_KEY))
bucket = self.cluster.bucket(properties.get(self.__BUCKET_KEY))

The constant should be renamed to match what it actually presents

self.collection = bucket.collection(properties.get(self.__COLLECTION_KEY))

except CouchbaseException as e:
raise ConnectionError(f"Could not connect to Couchbase: {e}")

self._TAIPY_PROPERTIES.update(
{
self.__COLLECTION_KEY,
self.__DB_NAME_KEY,
self.__DB_USERNAME_KEY,
self.__DB_PASSWORD_KEY,
self.__DB_HOST_KEY,
self.__DB_PORT_KEY,
}
)

@classmethod
def storage_type(cls) -> str:
"""Return the storage type of the data node: "couchbase_collection"."""
return cls.__STORAGE_TYPE

def _read(self):
"""Read all documents from the Couchbase collection."""
try:
query = f"SELECT * FROM `{self.collection.name}`"
result = self.cluster.query(query)

return [doc.content_as[dict] for doc in documents]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is documents here?

except Exception as e:
print(f"An error occurred: {e}")
return []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: We have to handle this case.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review and the detailed feedback! I appreciate the direction, and I'm eager to work on the improvements.

-> For document structure, I will define a Python class or object to structure the documents, ensuring consistency in what
is accepted.
-> I will proceed with implementing the _write, _append, and filter methods. I understand these are essential for CRUD
operations.
-> For testing, I will create a Mock class to simulate Couchbase interactions, allowing us to test without needing an
actual Couchbase server.


def _write(self,data : Union[Dict, List[Dict]]):
"""Write Documents to the Couchbase collection."""
try:
if isinstance(data, dict):
self.collection.upsert(data['id'],data)
elif isinstance(data, list):
for item in data:
self.collection.upsert(item['id'], item)
Comment on lines +155 to +158
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation for using upsert here?
What should happens if the document doesn't have an id?

except CouchbaseException as e:
print(f"An error occurred while writing:{e}")
Comment on lines +153 to +160
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try:
if isinstance(data, dict):
self.collection.upsert(data['id'],data)
elif isinstance(data, list):
for item in data:
self.collection.upsert(item['id'], item)
except CouchbaseException as e:
print(f"An error occurred while writing:{e}")
try:
if isinstance(data, dict):
self.collection.upsert(data['id'],data)
elif isinstance(data, list):
for item in data:
self.collection.upsert(item['id'], item)
except CouchbaseException as e:
print(f"An error occurred while writing:{e}")

The except is misalign


def _append(self,data: Union[Dict, List[Dict]]):
"""Append data to the Couchbase collection without overwriting."""
try:
if isinstance(data, dict):
if not self.collection.exists(data['id']):
self.collection.insert(data['id'], data)
elif isinstance(data, list):
for item in data:
if not self.collection.exists(item['id']):
self.collection.insert(item['id'], item)
except CouchbaseException as e:
print(f"An error occurred while appending: {e}")
def filter(self, criteria: Dict[str, Any]):
"""Filter documents in the Couchbase collection based on criteria"""
try:
where_clause = " AND ".join([f"{key} = '{value}'" for key, value in criteria.items()])
query = f"SELECT * FROM `{self.collection.name}` WHERE {where_clause}"
result = self.cluster.query(query)
return [doc for doc in result]
except CouchbaseException as e:
print(f"An error occurred while filtering documents:{e}")
return []
59 changes: 59 additions & 0 deletions tests/core/data/mock_couchbase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
class MockCouchbaseCollection:
def __init__(self):
self.documents = {}

def insert(self, doc_id, document):
if doc_id in self.documents:
raise Exception(f"Document with ID {doc_id} already exists.")
self.documents[doc_id] = document

def upsert(self, doc_id, document):
self.documents[doc_id] = document

def get(self, doc_id):
if doc_id not in self.documents:
raise Exception(f"Document with ID {doc_id} not found.")
return self.documents[doc_id]

def remove(self, doc_id):
if doc_id not in self.documents:
raise Exception(f"Document with ID {doc_id} not found.")
del self.documents[doc_id]
Comment on lines +1 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As my understanding, the MockCouchbaseCollection is essentially a dictionary right?


class MockCouchbaseBucket:
def __init__(self):
self.collection = MockCouchbaseCollection()

def collection(self, name):
return self.collection


class MockCouchbaseCluster:
def __init__(self):
self.bucket = MockCouchbaseBucket()

def bucket(self, name):
return self.bucket

# Usage example
if __name__ == "__main__":
cluster = MockCouchbaseCluster()
bucket = cluster.bucket("test_bucket")
collection = bucket.collection("test_collection")

# Insert a document
collection.insert("doc1", {"name": "Test Document"})

# Get the document
doc = collection.get("doc1")
print(doc) # Output: {'name': 'Test Document'}

# Update the document
collection.upsert("doc1", {"name": "Updated Document"})

# Get the updated document
updated_doc = collection.get("doc1")
print(updated_doc) # Output: {'name': 'Updated Document'}

# Remove the document
collection.remove("doc1")
81 changes: 81 additions & 0 deletions tests/core/data/test_couchbase_collection_data_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

import pytest
from mocks.mock_couchbase import MockCouchbaseCollectionDataNode
from unittest.mock import patch, MagicMock
from taipy.core.data.couchbase_collection_datanode import CouchBaseCollectionDataNode
from taipy.exceptions.exceptions import MissingRequiredProperty


# Sample properties for initializing CouchBaseCollectionDataNode
PROPERTIES = {
"db_name": "test_db",
"collection_name": "test_collection",
"db_username": "username",
"db_password": "password",
"db_host": "localhost",
"db_port": 8091,
}

# Test class for CouchBaseCollectionDataNode
class TestCouchBaseCollectionDataNode:

@patch('taipy.core.data.couchbase_collection_datanode.Cluster')
@patch('taipy.core.data.couchbase_collection_datanode.PasswordAuthenticator')
def test_init_valid_properties(self, mock_authenticator, mock_cluster):
"""Test initialization with valid properties."""
mock_cluster.return_value.bucket.return_value.collection.return_value = MagicMock()

node = CouchBaseCollectionDataNode(
config_id='test_config',
scope=None,
properties=PROPERTIES
)

assert node.storage_type() == "couchbase_collection"
assert node.collection is not None

def test_init_missing_properties(self):
"""Test initialization raises error for missing required properties."""
with pytest.raises(MissingRequiredProperty):
CouchBaseCollectionDataNode(
config_id='test_config',
scope=None,
properties={"collection_name": "test_collection"} # Missing db_name
)

@patch('taipy.core.data.couchbase_collection_datanode.Cluster')
@patch('taipy.core.data.couchbase_collection_datanode.PasswordAuthenticator')
def test_read(self, mock_authenticator, mock_cluster):
"""Test the read method returns documents."""
mock_document = MagicMock()
mock_document.content_as.return_value = {"key": "value"}
mock_cluster.return_value.bucket.return_value.collection.return_value.get.return_value = mock_document

node = CouchBaseCollectionDataNode(
config_id='test_config',
scope=None,
properties=PROPERTIES
)

# Simulate reading from the collection
documents = node._read()

assert len(documents) == 1
assert documents[0] == {"key": "value"}

@patch('taipy.core.data.couchbase_collection_datanode.Cluster')
@patch('taipy.core.data.couchbase_collection_datanode.PasswordAuthenticator')
def test_read_empty(self, mock_authenticator, mock_cluster):
"""Test the read method returns an empty list when no documents exist."""
mock_cluster.return_value.bucket.return_value.collection.return_value.get.side_effect = Exception("No documents found")

node = CouchBaseCollectionDataNode(
config_id='test_config',
scope=None,
properties=PROPERTIES
)

# Simulate reading from the collection
documents = node._read()

assert documents == []
32 changes: 32 additions & 0 deletions tests/mocks/mock_couchbase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# test_couchbase_collection_datanode.py
import unittest
from mock_couchbase import MockBucket
from couchbase_collection_datanode import CouchBaseCollectionDataNode # Make sure this import is correct


class TestCouchBaseCollectionDataNode(unittest.TestCase):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test_read to this class?

def setUp(self):
self.mock_bucket = MockBucket()
self.data_node = CouchBaseCollectionDataNode(properties={}, mock_bucket=self.mock_bucket.collection)

def test_write_document(self):
doc_id = "doc1"
document = {"name": "John", "age": 30}
self.data_node._write(doc_id, document) # Assuming _write is a method that saves a document

retrieved_doc = self.mock_bucket.collection.get(doc_id)
self.assertEqual(retrieved_doc, document)

def test_append_document(self):
doc_id = "doc2"
document = {"name": "Jane", "hobbies": []}
self.data_node._write(doc_id, document) # Assuming _write is implemented correctly
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work?

I'm asking because the signature of the CouchBaseCollectionDataNode._write() method only requires 1 argument.
Here you are providing 2.


additional_data = ["reading", "hiking"]
self.data_node._append(doc_id, additional_data) # Assuming _append is implemented correctly
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, does _append() with 2 arguments work here?


retrieved_doc = self.mock_bucket.collection.get(doc_id)
self.assertEqual(retrieved_doc["hobbies"], additional_data)

if __name__ == "__main__":
unittest.main()