Skip to content

Commit

Permalink
fix converting and creating new in logic river
Browse files Browse the repository at this point in the history
  • Loading branch information
Alonreznik committed Mar 21, 2021
1 parent d376b7e commit e6199d0
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 1,442 deletions.
5 changes: 3 additions & 2 deletions rivery_cli/cli/rivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def push(ctx, *args, **kwargs):
raise NotImplementedError(f'River type of {river_type} is not supported for now. '
f'Supported river types: {RIVER_TYPE_CONVERTERS.keys()}.')
# Make the river convertion to entity def that will be sent to the API.
entity = river_converter(content=content).convert()
river_converter = river_converter(content=content)
entity = river_converter.convert()
if converter.entity_name in all_rivers:
raise KeyError(f'Duplicate Entity Name: {converter.entity_name}.'
f'Already exists in {all_rivers.get(converter.entity_name, {}).get("path")}')
Expand All @@ -95,7 +96,7 @@ def push(ctx, *args, **kwargs):
"converter": river_converter,
"cross_id": entity.get('cross_id'),
"_id": entity.get('_id'),
"is_new": False if river_converter.cross_id else True,
"is_new": False if river_converter.cross_id is not None else True,
"yaml": converter.full_yaml,
"path": yaml_path
}
Expand Down
15 changes: 10 additions & 5 deletions rivery_cli/converters/entities/rivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class LogicConverter(RiverConverter):
task_type_id = "logic"
datasource_id = "logic"

step_bson_converter = ['river_id', 'action_id', 'gConnection']
step_bson_converter = ['river_id', 'action_id', 'gConnection',"connection_id", "fzConnection"]

def __init__(self, **kwargs):
super(LogicConverter, self).__init__(**kwargs)
Expand All @@ -107,7 +107,8 @@ def bson_converter(self, dct):
for k, v in dct.items():
if k in self.step_bson_converter and v:
newdct[k] = bson.ObjectId(v)
newdct[k] = v
else:
newdct[k] = v
return newdct

def valid_step(self, step_type):
Expand Down Expand Up @@ -156,7 +157,7 @@ def steps_converter(self, steps: list) -> list:
# This is "low level" step. Means, it is not container in any kind.
content = {}
primary_type = step.pop('block_primary_type', 'sql')
block_db_type = step.pop('block_db_type', None)
block_db_type = step.pop('block_db_type', primary_type)
content[global_keys.BLOCK_PRIMARY_TYPE] = primary_type
content[global_keys.BLOCK_TYPE] = block_db_type
content[global_keys.BLOCK_DB_TYPE] = block_db_type
Expand All @@ -180,6 +181,7 @@ def steps_converter(self, steps: list) -> list:
content.update(step)

current_step[global_keys.CONTNET] = content
current_step[global_keys.NODES] = []

all_steps.append(current_step)

Expand All @@ -203,7 +205,8 @@ def convert(self) -> dict:
global_keys.TASK_TYPE_ID: self.task_type_id,
global_keys.TASK_CONFIG: {},
global_keys.SCHEDULING: self.definition.get(
global_keys.SCHEDULING) or {"isEnabled": False}
global_keys.SCHEDULING) or {"isEnabled": False},
global_keys.RIVER_ID: self.cross_id
}
]

Expand All @@ -216,7 +219,7 @@ def convert(self) -> dict:
# Convert the steps to river definitions
steps = self.steps_converter(self.properties.get('steps', []))

steps = json.loads(json.dumps(steps), object_hook=self.bson_converter)
# steps = json.loads(json.dumps(steps), object_hook=self.bson_converter)

# Make the full definition of the logic under the tasks definitions [0]
self.river_full_definition[global_keys.TASKS_DEF][0][
Expand All @@ -227,6 +230,8 @@ def convert(self) -> dict:
"variables": self.vars}
)

self.river_full_definition = json.loads(json.dumps(self.river_full_definition), object_hook=self.bson_converter)

return self.river_full_definition

@staticmethod
Expand Down
1 change: 1 addition & 0 deletions rivery_cli/globals/global_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
RIVER_DESCRIPTION = 'river_desc'
RIVER_TYPE = 'river_type'
CROSS_ID = 'cross_id'
RIVER_ID = 'river_id'


AVAILABLE_RIVER_TYPES = ['logic', 'src_to_trgt', 'action']
Expand Down
24 changes: 22 additions & 2 deletions rivery_cli/rivery_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import zlib

import requests
from .utils import bson_utils as json_util
from .utils import bson_utils as json_util, utils
import logging
import simplejson as json

MAX_PULL_TIME = 1800
SLEEP_PULL_TIME = 5
Expand Down Expand Up @@ -189,7 +190,7 @@ def save_river(self, **kwargs):
payload = {"river_definitions": data.get("river_definitions"),
"tasks_definitions": data.get("tasks_definitions")}
url = "/rivers/modify"
if kwargs.get("create_new", False):
if kwargs.get("create_new", False) or not data.get('cross_id'):
logging.debug('Creating New River: {}({})'.format(
data.get('river_definitions', {}).get('river_name'), data.get('cross_id')))
method = "put"
Expand All @@ -210,9 +211,18 @@ def save_river(self, **kwargs):
data['cross_id'] = exists.get('cross_id')
if not data.get('cross_id'):
raise RuntimeError('Please provide cross_id and cross_id to update river')
existing_tasks = exists.get('tasks_definitions', [])
for idx_, t in enumerate(payload.get('tasks_definitions', [])):
task_ = existing_tasks[idx_:idx_+1]
if task_:
payload['tasks_definitions'][idx_] = utils.recursive_update(task_[0], t)
else:
payload['tasks_definitions'][idx_] = t

# Check if the river already exist by the id or not
payload.update({"cross_id": data.get("cross_id"),
"_id": data.get("_id")})

# headers = {"Content-Encoding": "gzip"}
logging.debug('Saving River {}({}). Creating New? {}'.format(data.get('river_definitions', {}).get('river_name'),
data.get('cross_id'),
Expand Down Expand Up @@ -374,3 +384,13 @@ def _load(f_obj, **kwargs):
else:
obj = {}
return obj

def object_hook(self, dct):
""" Update ObjectId Object Hook for requesting and responding """
newdct = {}
for k,v in dct.items():
if (isinstance(v, str) or isinstance(v, bytes)) and len(v) == 12:
newdct[k] = json_util.convert_oid(v)
else:
newdct[k] = v
return newdct
43 changes: 43 additions & 0 deletions rivery_cli/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,46 @@ def recursive_update(source, overrides, overwrite_nones=False):
else:
source[key] = overrides[key]
return source


def merge_lists(list1, list2):
"""
not using set for the distinct operation
in order to support unhashable type(s)
"""
union_lists = [item for item in list1 if not isinstance(item, dict)] + \
[item for item in list2
if not isinstance(item, dict) and item not in list1]

dicts_ = [item for item in list1 if isinstance(item, dict)] + \
[item for item in list2
if isinstance(item, dict) and item not in list1]

merged_dicts = {}
for item in dicts_:
merged_dicts = recursively_merge_dictionaries(merged_dicts, item)

union_lists.append(merged_dicts)
return union_lists


def recursively_merge_dictionaries(updated_item, overwriting_item,
union_lists=False):
"""self explanatory.
notes:
(1) a simple dict.update function does not give the requested result,
hence the recursion
(2) when updated_item and overwriting_item params share the same key,
overwriting_item is stronger and will overwrite the value in
updated_item"""
res = updated_item
for key, val in overwriting_item.items():
if isinstance(val, list) and isinstance(updated_item.get(key), list) \
and union_lists:
res.update({key: merge_lists(val, updated_item.get(key))})
elif not isinstance(val, dict) or key not in updated_item.keys():
res.update({key: val})
else:
res[key] = recursively_merge_dictionaries(
updated_item.get(key), val)
return res
Loading

0 comments on commit e6199d0

Please sign in to comment.