Skip to content

Commit

Permalink
Merge branch 'evt-storage' (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbguimaraes committed Nov 8, 2024
2 parents 2a3fbe1 + 2d9ddd8 commit 610f8af
Show file tree
Hide file tree
Showing 22 changed files with 1,834 additions and 75 deletions.
2 changes: 1 addition & 1 deletion bw2data/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def copy(self, name):
warnings.simplefilter("ignore")
new_database = self.__class__(name)
metadata = copy.copy(self.metadata)
metadata['format'] = f"Copied from '{self.name}'"
metadata["format"] = f"Copied from '{self.name}'"
new_database.register(**metadata)

new_database.write(data, searchable=databases[name].get("searchable"))
Expand Down
101 changes: 62 additions & 39 deletions bw2data/backends/proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pandas as pd

from bw2data import databases, geomapping
from bw2data import databases, geomapping, projects
from bw2data.backends import sqlite3_lci_db
from bw2data.backends.schema import ActivityDataset, ExchangeDataset
from bw2data.backends.typos import (
Expand All @@ -20,6 +20,7 @@
from bw2data.logs import stdout_feedback_logger
from bw2data.proxies import ActivityProxyBase, ExchangeProxyBase
from bw2data.search import IndexManager
from bw2data.signals import on_activity_code_change, on_activity_database_change


class Exchanges(Iterable):
Expand Down Expand Up @@ -71,7 +72,9 @@ def __init__(self, key, kinds=None, reverse=False):
def filter(self, expr):
    """Append a peewee expression to the conditions used to select this
    activity's exchanges; the accumulated expressions are later applied
    together (e.g. in ``delete()`` via ``.where(*self._args)``)."""
    self._args.append(expr)

def delete(self):
def delete(self, allow_in_sourced_project: bool = False):
    """Delete every ``ExchangeDataset`` row matched by this query.

    Args:
        allow_in_sourced_project: must be explicitly set to ``True`` to
            permit mass deletion when the current project is sourced
            (``projects.dataset.is_sourced``); otherwise a
            ``NotImplementedError`` is raised.

    Also marks the database as dirty via ``databases.set_dirty`` so that
    downstream consumers know it changed.
    """
    if projects.dataset.is_sourced and not allow_in_sourced_project:
        raise NotImplementedError("Mass exchange deletion not supported in sourced projects")
    databases.set_dirty(self._key[0])
    ExchangeDataset.delete().where(*self._args).execute()

Expand Down Expand Up @@ -183,6 +186,8 @@ def to_dataframe(


class Activity(ActivityProxyBase):
ORMDataset = ActivityDataset

def __init__(self, document=None, **kwargs):
"""Create an `Activity` proxy object.
Expand All @@ -191,7 +196,7 @@ def __init__(self, document=None, **kwargs):
If the activity exists in the database, `document` should be an `ActivityDataset`.
"""
if document is None:
self._document = ActivityDataset()
self._document = self.ORMDataset()
self._data = kwargs
else:
self._document = document
Expand Down Expand Up @@ -254,7 +259,7 @@ def __setitem__(self, key, value):
def key(self):
    """Return the ``(database, code)`` tuple identifying this activity."""
    database = self.get("database")
    code = self.get("code")
    return (database, code)

def delete(self):
def delete(self, signal: bool = True):
from bw2data import Database, calculation_setups
from bw2data.parameters import ActivityParameter, ParameterizedExchange

Expand All @@ -274,7 +279,7 @@ def purge(obj: Activity, dct: dict) -> dict:
except ActivityParameter.DoesNotExist:
pass
IndexManager(Database(self["database"]).filename).delete_dataset(self._data)
self.exchanges().delete()
self.exchanges().delete(allow_in_sourced_project=True)

for name, setup in calculation_setups.items():
if any(
Expand All @@ -286,10 +291,10 @@ def purge(obj: Activity, dct: dict) -> dict:
setup["inv"] = [purge(self, dct) for dct in setup["inv"] if purge(self, dct)]
calculation_setups.flush()

self._document.delete_instance()
self._document.delete_instance(signal=signal)
self = None

def save(self):
def save(self, signal: bool = True, data_already_set: bool = False, force_insert: bool = False):
"""
Saves the current activity to the database after performing various checks.
This method validates the activity, updates the database status, and handles
Expand Down Expand Up @@ -319,7 +324,7 @@ def save(self):
"""
from bw2data import Database

if not self.valid():
if not data_already_set and not self.valid():
raise ValidityError(
"This activity can't be saved for the "
+ "following reasons\n\t* "
Expand All @@ -328,39 +333,40 @@ def save(self):

databases.set_dirty(self["database"])

check_activity_type(self._data.get("type"))
check_activity_keys(self)
if not data_already_set:
check_activity_type(self._data.get("type"))
check_activity_keys(self)

for key, value in dict_as_activitydataset(self._data).items():
if key != "id":
setattr(self._document, key, value)
self._document.save()
for key, value in dict_as_activitydataset(self._data).items():
if key != "id":
setattr(self._document, key, value)

self._document.save(signal=signal, force_insert=force_insert)

if self.get("location") and self["location"] not in geomapping:
geomapping.add([self["location"]])

if databases[self["database"]].get("searchable", True):
IndexManager(Database(self["database"]).filename).update_dataset(self._data)

def _change_code(self, new_code):
def _change_code(self, new_code: str, signal: bool = True):
if self["code"] == new_code:
return

previous = self["code"]

if (
ActivityDataset.select()
self.ORMDataset.select()
.where(
ActivityDataset.database == self["database"],
ActivityDataset.code == new_code,
self.ORMDataset.database == self["database"],
self.ORMDataset.code == new_code,
)
.count()
):
raise ValueError("Activity database with code `{}` already exists".format(new_code))

with sqlite3_lci_db.atomic() as txn:
ActivityDataset.update(code=new_code).where(
ActivityDataset.database == self["database"],
ActivityDataset.code == self["code"],
).execute()
self.ORMDataset.update(code=new_code).where(self.ORMDataset.id == self.id).execute()
ExchangeDataset.update(output_code=new_code).where(
ExchangeDataset.output_database == self["database"],
ExchangeDataset.output_code == self["code"],
Expand All @@ -379,17 +385,24 @@ def _change_code(self, new_code):
else:
self._data["code"] = new_code

def _change_database(self, new_database):
if signal:
on_activity_code_change.send(
old={"id": self.id, "code": previous},
new={"id": self.id, "code": new_code},
)

def _change_database(self, new_database: str, signal: bool = True):
if self["database"] == new_database:
return

previous = self["database"]

if new_database not in databases:
raise ValueError("Database {} does not exist".format(new_database))

with sqlite3_lci_db.atomic() as txn:
ActivityDataset.update(database=new_database).where(
ActivityDataset.database == self["database"],
ActivityDataset.code == self["code"],
self.ORMDataset.update(database=new_database).where(
self.ORMDataset.id == self.id
).execute()
ExchangeDataset.update(output_database=new_database).where(
ExchangeDataset.output_database == self["database"],
Expand All @@ -409,6 +422,12 @@ def _change_database(self, new_database):
else:
self._data["database"] = new_database

if signal:
on_activity_database_change.send(
old={"id": self.id, "database": previous},
new={"id": self.id, "database": new_database},
)

def exchanges(self, exchanges_class=Exchanges):
    """Return an ``Exchanges`` iterable (or a custom ``exchanges_class``)
    built from this activity's ``(database, code)`` key."""
    container = exchanges_class
    return container(self.key)

Expand Down Expand Up @@ -480,7 +499,7 @@ def new_edge(self, **kwargs):
exc[key] = kwargs[key]
return exc

def copy(self, code=None, **kwargs):
def copy(self, code: Optional[str] = None, signal: bool = True, **kwargs):
"""Copy the activity. Returns a new `Activity`.
`code` is the new activity code; if not given, a UUID is used.
Expand All @@ -495,19 +514,21 @@ def copy(self, code=None, **kwargs):
for k, v in kwargs.items():
activity._data[k] = v
activity._data["code"] = str(code or uuid.uuid4().hex)
activity.save()
activity.save(signal=signal)

for exc in self.exchanges():
data = copy.deepcopy(exc._data)
data["output"] = activity.key
# Change `input` for production exchanges
if exc["input"] == exc["output"]:
data["input"] = activity.key
ExchangeDataset.create(**dict_as_exchangedataset(data))
ExchangeDataset(**dict_as_exchangedataset(data)).save(signal=signal)
return activity


class Exchange(ExchangeProxyBase):
ORMDataset = ExchangeDataset

def __init__(self, document=None, **kwargs):
"""Create an `Exchange` proxy object.
Expand All @@ -516,7 +537,7 @@ def __init__(self, document=None, **kwargs):
If the exchange exists in the database, `document` should be an `ExchangeDataset`.
"""
if document is None:
self._document = ExchangeDataset()
self._document = self.ORMDataset()
self._data = kwargs
else:
self._document = document
Expand All @@ -530,28 +551,30 @@ def __init__(self, document=None, **kwargs):
self._document.output_code,
)

def save(self):
if not self.valid():
def save(self, signal: bool = True, data_already_set: bool = False, force_insert: bool = False):
if not data_already_set and not self.valid():
raise ValidityError(
"This exchange can't be saved for the "
"following reasons\n\t* " + "\n\t* ".join(self.valid(why=True)[1])
)

databases.set_dirty(self["output"][0])

check_exchange_type(self._data.get("type"))
check_exchange_keys(self)
if not data_already_set:
check_exchange_type(self._data.get("type"))
check_exchange_keys(self)

for key, value in dict_as_exchangedataset(self._data).items():
setattr(self._document, key, value)

for key, value in dict_as_exchangedataset(self._data).items():
setattr(self._document, key, value)
self._document.save()
self._document.save(signal=signal, force_insert=force_insert)

def delete(self):
def delete(self, signal: bool = True):
    """Delete this exchange from the database.

    First removes any ``ParameterizedExchange`` rows referencing this
    exchange, then deletes the backing ``ExchangeDataset`` row, and
    finally flags the exchange's output database as dirty via
    ``databases.set_dirty``.

    Args:
        signal: forwarded to ``delete_instance``; when ``False`` it
            presumably suppresses change-tracking signals — confirm
            against ``SignaledDataset``.
    """
    from bw2data.parameters import ParameterizedExchange

    ParameterizedExchange.delete().where(
        ParameterizedExchange.exchange == self._document.id
    ).execute()
    self._document.delete_instance(signal=signal)
    databases.set_dirty(self["output"][0])
    # NOTE: the original ended with `self = None`, which only rebinds the
    # local name and has no effect on the caller's reference; removed as
    # dead code.
7 changes: 4 additions & 3 deletions bw2data/backends/schema.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from peewee import DoesNotExist, Model, TextField
from peewee import DoesNotExist, TextField

from bw2data.errors import UnknownObject
from bw2data.signals import SignaledDataset
from bw2data.sqlite import PickleField


class ActivityDataset(Model):
class ActivityDataset(SignaledDataset):
data = PickleField() # Canonical, except for other C fields
code = TextField() # Canonical
database = TextField() # Canonical
Expand All @@ -18,7 +19,7 @@ def key(self):
return (self.database, self.code)


class ExchangeDataset(Model):
class ExchangeDataset(SignaledDataset):
data = PickleField() # Canonical, except for other C fields
input_code = TextField() # Canonical
input_database = TextField() # Canonical
Expand Down
24 changes: 23 additions & 1 deletion bw2data/backends/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import copy
import warnings
from typing import Optional

import numpy as np

from bw2data import config
from bw2data.backends.schema import get_id
from bw2data.backends.schema import SignaledDataset, get_id
from bw2data.configuration import labels
from bw2data.errors import InvalidExchange, UntypedExchange
from bw2data.meta import databases, methods
Expand All @@ -31,11 +32,15 @@ def convert_backend(database_name, backend):
if database_name not in databases:
raise ValueError(f"Can't find database {database_name}")

from bw2data import projects
from bw2data.database import Database

db = Database(database_name)
if db.backend == backend:
return False
if backend == "iotable" and projects.dataset.is_sourced:
raise ValueError("`iotable` backend not consistent with `sourced` project")

# Needed to convert from async json dict
data = db.load(as_dict=True)
if database_name in config.cache:
Expand Down Expand Up @@ -84,6 +89,23 @@ def dict_as_exchangedataset(ds):
}


def get_obj_as_dict(cls: SignaledDataset, obj_id: Optional[int]) -> dict:
    """
    Loads an object's data from the database as a dictionary.

    The format used is that of the serialization of revisions (see also the
    `dict_as_*` functions above); in particular, an empty dictionary is
    returned if the ID is `None` (but not if the object does not exist).
    """
    if obj_id is None:
        return {}
    # Dispatch to the matching module-level serializer, e.g.
    # `dict_as_activitydataset` for `ActivityDataset`.
    serializer = globals()[f"dict_as_{cls.__name__.lower()}"]
    result = serializer(cls.get_by_id(obj_id).data)
    result["id"] = obj_id
    return result


def replace_cfs(old_key, new_key):
"""Replace ``old_key`` with ``new_key`` in characterization factors.
Expand Down
5 changes: 4 additions & 1 deletion bw2data/database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from bw2data import databases
from bw2data import databases, projects
from bw2data.data_store import ProcessedDataStore


Expand All @@ -10,6 +10,9 @@ def DatabaseChooser(name: str, backend: str = "sqlite") -> ProcessedDataStore:
"""
from bw2data.subclass_mapping import DATABASE_BACKEND_MAPPING

if backend == "iotable" and projects.dataset.is_sourced:
raise ValueError("`iotable` backend not consistent with `sourced` project")

if name in databases:
backend = databases[name].get("backend") or backend

Expand Down
18 changes: 18 additions & 0 deletions bw2data/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ class InvalidDatapackage(BW2Exception):
"""The given datapackage can't be used for the requested task."""

pass


class IncompatibleClasses(BW2Exception):
    """Revision comparison across two different classes doesn't make sense
    and isn't allowed."""

    pass


class DifferentObjects(BW2Exception):
    """Revision comparison of two different objects doesn't make sense and
    isn't allowed."""

    pass


class InconsistentData(BW2Exception):
    """Attempted a change on data which was in an inconsistent state with
    the changeset."""

    pass
Loading

0 comments on commit 610f8af

Please sign in to comment.