-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dfview #297
base: master
Are you sure you want to change the base?
Dfview #297
Changes from 229 commits
a2d7008
62925bb
0e313dc
e211371
39e4535
329a7cc
d9d8b02
f7ba342
21f0fa9
c9363ef
14fc1f3
2d13342
666073e
73aa50e
689cc3f
8ba818f
abb3337
3180cbd
9b9c420
55989d6
cd69d04
f2136d5
30953e3
0dccc6e
000463d
37972b5
74c1dad
bf210c4
95c1645
5db42d2
664e255
02265fe
f536652
bafe9cf
223dbe9
cc48016
948ce1a
37b8ac2
0369c92
2096828
f213240
b050d74
76b5ff1
fe36b94
daa6012
5c43f38
459b91c
c0ac960
dd0867d
e52d825
463ea70
76d1952
7cfeceb
b1cb082
eaac2b6
a9ce1fb
113a83f
375982c
e9d1053
e1ed80d
f4fe394
a057677
0845a63
4d2886a
db3ec9f
f2efedc
4926330
56bb190
04d810b
f0b7e37
18d49a6
737eeed
732762d
ab6508c
39027f7
358d82b
98a4d7f
e6b1a57
a0e0167
204bd39
c788b96
60f2ba9
bba4829
c341eb2
b23f1d8
63bd5a0
650014e
cb9f2a2
013f401
18ce7ce
8657081
51e2fec
955aede
039d8ee
547bb88
700635f
dec92ca
b631932
4804417
f16cb09
37dac08
8c62e0a
cfcb69b
22504ef
23c373d
98624e6
76d8717
44a9c3d
210f847
f8829ae
a7d6673
3d322c2
294ec3a
e8edd9d
c2ba9ff
1a19815
1fb0362
937368e
cddcf66
534cbd4
e5dc536
9b1a4a9
1967685
6c3270a
6bdb08e
3680436
cf5f5a6
23ad71a
75eefc0
3ddc916
3a6dc51
c02fe32
58159d0
a7b477d
903f3b4
29f736d
7fd9bdc
04df757
e47e15c
a4b14fb
5492b94
25320bd
e289c6b
a59c13a
87df0bc
0875149
c335831
bdf783a
ea20c60
778d56c
611601a
66867b7
fbe396f
c2c7185
78cc222
001134c
eb0bb76
d646ac2
b55775b
0d23098
7774c6f
ae1d621
3d5738e
4685c6b
dc38d28
0df34bc
87353e3
87abe47
a3719ef
ed42f70
2157da2
1abeaa7
e77562e
03208aa
35430f2
de3e7e5
a8af750
d41a24b
c98b87c
a57c413
764650b
4676901
e5d74c6
030d587
55e62eb
7cf7bae
521142e
9373fd2
6f67ac4
703a19a
613532a
b981bb9
e35c1c4
a7ee946
0f319d1
a5ab148
f50ab1f
94a074a
ef563e9
e0caad0
ba06c9f
25a3cc3
2e03557
5f83d7e
d932731
fff9a19
ffbc4f0
7f82d58
474425a
1c59265
5002c65
efcde7c
825aaf1
47e2e7e
071be03
c908397
72cca74
bd9a85f
62d0d69
40e8770
81bbbcd
3507595
099c8f6
3d6966e
735b7a5
85169e6
9667dd7
c333f6d
bb764bd
7803d76
dfb36ab
5c93b43
fe9cee8
c1ad9ba
80c0339
6cb1d3e
e8cf7f2
3153f2b
135260e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,10 +58,16 @@ def __init__(self, | |
self.name = name | ||
self._columns = OrderedDict() | ||
self._dataset = dataset | ||
self._h5group = h5group | ||
self._h5group = h5group # the HDF5 group to store all fields | ||
|
||
for subg in h5group.keys(): | ||
self._columns[subg] = dataset.session.get(h5group[subg]) | ||
if subg[0] != '_': # stores metadata, for example filters | ||
self._columns[subg] = dataset.session.get(h5group[subg]) | ||
|
||
if '_filters' not in h5group.keys(): | ||
self._filters_grp = self._h5group.create_group('_filters') | ||
else: | ||
self._filters_grp = h5group['_filters'] | ||
|
||
@property | ||
def columns(self): | ||
|
@@ -101,15 +107,39 @@ def add(self, | |
nfield.data.write(field.data[:]) | ||
self._columns[dname] = nfield | ||
|
||
def add_view(self, field: fld.Field): | ||
""" | ||
|
||
""" | ||
if isinstance(field, fld.NumericField): | ||
view = fld.NumericField(field._session, field._field, self, write_enabled=True) | ||
view.data = field.data | ||
|
||
self._columns[view.name] = view | ||
return self._columns[view.name] | ||
|
||
# def add_reference(self, field: fld.Field): | ||
# """ | ||
# Add a field without coping the data over the HDF5 group. | ||
# :param field: field to be constructed in this dataframe. | ||
# """ | ||
# if isinstance(field, fld.NumericField): | ||
# fld.numeric_field_constructor(self._dataset.session, self, field.name, field._nformat) | ||
# nfield = fld.NumericField(self._dataset.session, field._field, self, write_enabled=True) | ||
# self._columns[field.name] = nfield | ||
# return self._columns[field.name] | ||
|
||
def drop(self, | ||
name: str): | ||
""" | ||
Drop a field from this dataframe as well as the HDF5 Group | ||
|
||
:param name: name of field to be dropped | ||
""" | ||
del self._columns[name] | ||
del self._h5group[name] | ||
if name in self._h5group.keys(): | ||
del self._h5group[name] | ||
if name in self._columns.keys(): | ||
del self._columns[name] | ||
|
||
def create_group(self, | ||
name: str): | ||
|
@@ -264,6 +294,92 @@ def contains_field(self, field): | |
return True | ||
return False | ||
|
||
def _write_filter(self, filter): | ||
""" | ||
|
||
""" | ||
nformat = 'int32' if filter[-1] < 2 ** 31 - 1 else 'int64' | ||
filter_name = '_filter' | ||
if filter_name not in self._filters_grp.keys(): | ||
fld.numeric_field_constructor(self._dataset.session, self._filters_grp, filter_name, nformat) | ||
filter_field = fld.NumericField(self._dataset.session, self._filters_grp[filter_name], self, write_enabled=True) | ||
filter_field.data.write(filter) | ||
else: | ||
filter_field = fld.NumericField(self._dataset.session, self._filters_grp[filter_name], self, write_enabled=True) | ||
if nformat not in filter_field._fieldtype: | ||
filter_field = filter_field.astype(nformat) | ||
filter_field.data.clear() | ||
filter_field.data.write(filter) | ||
|
||
def _get_filter_grp(self, field: Union[str, fld.Field]=None): | ||
""" | ||
Get a filter array specified by the field or field name. | ||
""" | ||
filter_name = '_filter' | ||
return self._filters_grp[filter_name] | ||
|
||
# def set_filter(self, field: Union[str, fld.Field], filter): | ||
# """ | ||
# Add or modify a filter of the field. | ||
# | ||
# :param field: The target field. | ||
# :param filter: The filter, as list or np.ndarray of indices. | ||
# """ | ||
# if not isinstance(field, str) and not isinstance(field, fld.Field): | ||
# raise TypeError("The target field should be type field or string (name of the field in this dataframe).") | ||
# | ||
# name = field if isinstance(field, str) else field.name | ||
# if name not in self._columns: | ||
# raise ValueError("The target field is not in this dataframe.") | ||
# | ||
# nformat = 'int32' if filter[-1] < 2 ** 31 - 1 else 'int64' | ||
# if name in self._filters_grp.keys(): | ||
# filter_field = fld.NumericField(self._dataset.session, self._filters_grp[name], self, | ||
# write_enabled=True) | ||
# if nformat not in filter_field._fieldtype: | ||
# filter_field = filter_field.astype(nformat) | ||
# filter_field.data.clear() | ||
# filter_field.data.write(filter) | ||
# else: | ||
# fld.numeric_field_constructor(self._dataset.session, self._filters_grp, name, nformat) | ||
# filter_field = fld.NumericField(self._dataset.session, self._filters_grp[name], self, | ||
# write_enabled=True) | ||
# filter_field.data.write(filter) | ||
# | ||
# self._columns[name].filter = self._filters_grp[name] | ||
# return filter_field | ||
|
||
def remove_filter(self, field: Union[str, fld.Field]): | ||
""" | ||
Remove filter from this dataframe specified by the field or field name. | ||
""" | ||
if not isinstance(field, str) and not isinstance(field, fld.Field): | ||
raise TypeError("The target field should be type field or string (name of the field in this dataframe).") | ||
|
||
name = field if isinstance(field, str) else field.name | ||
if name not in self._columns: | ||
raise ValueError("The target field is not in this dataframe.") | ||
else: | ||
del self._filters_grp[name] | ||
|
||
# def get_data(self, field: Union[str, fld.Field]): | ||
# """ | ||
# Get the data from a field. The data returned is masked by the filter. | ||
# | ||
# """ | ||
# if not isinstance(field, str) and not isinstance(field, fld.Field): | ||
# raise TypeError("The target field should be type field or string (name of the field in this dataframe).") | ||
# | ||
# name = field if isinstance(field, str) else field.name | ||
# if name not in self.columns.keys(): | ||
# raise ValueError("Can not found the field name from this dataframe.") | ||
# else: | ||
# if name in self.filters.keys(): | ||
# d_filter = self.filters[name].data[:] | ||
# return self.columns[name].data[d_filter] | ||
# else: | ||
# return self.columns[name].data[:] | ||
|
||
def __getitem__(self, name): | ||
""" | ||
Get a field stored by the field name. | ||
|
@@ -317,8 +433,10 @@ def __delitem__(self, name): | |
if not self.__contains__(name=name): | ||
raise ValueError("There is no field named '{}' in this dataframe".format(name)) | ||
else: | ||
del self._h5group[name] | ||
del self._columns[name] | ||
if name in self._h5group.keys(): | ||
del self._h5group[name] | ||
if name in self._columns.keys(): | ||
del self._columns[name] | ||
|
||
def delete_field(self, field): | ||
""" | ||
|
@@ -478,19 +596,25 @@ def apply_filter(self, filter_to_apply, ddf=None): | |
:returns: a dataframe contains all the fields filterd, self if ddf is not set | ||
""" | ||
filter_to_apply_ = val.validate_filter(filter_to_apply) | ||
|
||
if ddf is not None: | ||
if not isinstance(ddf, DataFrame): | ||
raise TypeError("The destination object must be an instance of DataFrame.") | ||
ddf._write_filter(np.where(filter_to_apply_ == True)[0]) | ||
for name, field in self._columns.items(): | ||
newfld = field.create_like(ddf, name) | ||
field.apply_filter(filter_to_apply_, target=newfld) | ||
# hard copy | ||
# newfld = field.create_like(ddf, name) | ||
# field.apply_filter(filter_to_apply_, target=newfld) | ||
# soft copy - view | ||
newfld = ddf.add_view(field) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _add_view There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _add_view with filter (passing the filter in), remove the filter setter |
||
newfld.filter = ddf._get_filter_grp() | ||
|
||
return ddf | ||
else: | ||
for field in self._columns.values(): | ||
field.apply_filter(filter_to_apply_, in_place=True) | ||
return self | ||
|
||
|
||
def apply_index(self, index_to_apply, ddf=None): | ||
""" | ||
Apply an index to all fields in this dataframe, returns \ | ||
|
@@ -517,16 +641,31 @@ def apply_index(self, index_to_apply, ddf=None): | |
if ddf is not None: | ||
if not isinstance(ddf, DataFrame): | ||
raise TypeError("The destination object must be an instance of DataFrame.") | ||
for name, field in self._columns.items(): | ||
newfld = field.create_like(ddf, name) | ||
field.apply_index(index_to_apply, target=newfld) | ||
return ddf | ||
else: | ||
val.validate_all_field_length_in_df(self) | ||
if ddf == self: | ||
val.validate_all_field_length_in_df(self) | ||
for field in self._columns.values(): | ||
|
||
if ddf == self: | ||
field.apply_index(index_to_apply, in_place=True) | ||
else: | ||
newfld = field.create_like(ddf, field.name) | ||
field.apply_index(index_to_apply, target=newfld) | ||
else: # | ||
nformat = 'int32' if index_to_apply[-1] < 2 ** 31 - 1 else 'int64' | ||
for field in self._columns.values(): | ||
field.apply_index(index_to_apply, in_place=True) | ||
return self | ||
if field.name in self._filters_grp.keys(): | ||
flt_fld = fld.NumericField(self._dataset.session, self._filters_grp[field.name], self, | ||
write_enabled=True) | ||
if nformat not in flt_fld._fieldtype: | ||
flt_fld = flt_fld.astype(nformat) | ||
flt_fld.data.clear() | ||
flt_fld.data.write(index_to_apply) | ||
else: | ||
fld.numeric_field_constructor(self._dataset.session, self._filters_grp, field.name, nformat) | ||
flt_fld = fld.NumericField(self._dataset.session, self._filters_grp[field.name], self, | ||
write_enabled=True) | ||
flt_fld.data.write(index_to_apply) | ||
field.filter = flt_fld._field | ||
|
||
|
||
def sort_values(self, by: Union[str, List[str]], ddf: DataFrame = None, axis=0, ascending=True, kind='stable'): | ||
|
@@ -981,6 +1120,11 @@ def describe(self, include=None, exclude=None, output='terminal'): | |
print('\n') | ||
return result | ||
|
||
def view(self): | ||
dfv = self.dataset.create_dataframe(self.name + '_view') | ||
for f in self.columns.values(): | ||
dfv.add_view(f) | ||
return dfv | ||
|
||
|
||
class HDF5DataFrameGroupBy(DataFrameGroupBy): | ||
|
@@ -1656,4 +1800,4 @@ def _ordered_merge(left: DataFrame, | |
if right[k].indexed: | ||
ops.ordered_map_valid_indexed_stream(right[k], right_map, dest_f, invalid) | ||
else: | ||
ops.ordered_map_valid_stream(right[k], right_map, dest_f, invalid) | ||
ops.ordered_map_valid_stream(right[k], right_map, dest_f, invalid) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,8 @@ def __init__(self, session, group, dataframe, write_enabled=False): | |
self._value_wrapper = None | ||
self._valid_reference = True | ||
|
||
self._filter_wrapper = None | ||
|
||
@property | ||
def valid(self): | ||
""" | ||
|
@@ -144,6 +146,7 @@ def _ensure_valid(self): | |
raise ValueError("This field no longer refers to a valid underlying field object") | ||
|
||
|
||
|
||
class MemoryField(Field): | ||
|
||
def __init__(self, session): | ||
|
@@ -283,6 +286,7 @@ def __init__(self, field, dataset_name): | |
self._field = field | ||
self._name = dataset_name | ||
self._dataset = field[dataset_name] | ||
self._references = list() | ||
|
||
def __len__(self): | ||
""" | ||
|
@@ -299,6 +303,25 @@ def dtype(self): | |
""" | ||
return self._dataset.dtype | ||
|
||
def register_reference(self, field: Field): | ||
""" | ||
|
||
""" | ||
self._references.append(field) | ||
|
||
def detach_reference(self, field: Field): | ||
""" | ||
|
||
""" | ||
self._references.remove(field) | ||
|
||
def concreate_all_fields(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo: concrete_all_fields |
||
""" | ||
|
||
""" | ||
for field in self._references: | ||
field.concrete_reference() | ||
|
||
def __getitem__(self, item): | ||
return self._dataset[item] | ||
|
||
|
@@ -310,6 +333,8 @@ def clear(self): | |
Replaces current dataset with empty dataset. | ||
:return: None | ||
""" | ||
if len(self._references) > 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can do this check inside the notification method |
||
self.concreate_all_fields() | ||
nformat = self._dataset.dtype | ||
DataWriter._clear_dataset(self._field, self._name) | ||
DataWriter.write(self._field, self._name, [], 0, nformat) | ||
|
@@ -340,6 +365,8 @@ def write(self, part): | |
:param part: numpy array to write to field | ||
:return: None | ||
""" | ||
if len(self._references) > 0: | ||
self.concreate_all_fields() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move these into field, so that the callback flow is: fieldarray -> field -> field_references |
||
if isinstance(part, Field): | ||
part = part.data[:] | ||
DataWriter.write(self._field, self._name, part, len(part), dtype=self._dataset.dtype) | ||
|
@@ -2564,6 +2591,54 @@ def data(self): | |
self._value_wrapper = ReadOnlyFieldArray(self._field, 'values') | ||
return self._value_wrapper | ||
|
||
@data.setter | ||
def data(self, FieldArray): | ||
""" | ||
Setting the Field Array (data interface) directly. This can also associate field with an existing field array to enable a view. | ||
""" | ||
self._value_wrapper = FieldArray | ||
if self.is_view(): | ||
self.data.register_reference(self) | ||
|
||
@property | ||
def filter(self): | ||
if self._filter_wrapper is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. return filter field rather than dereferencing |
||
return None | ||
else: | ||
return self._filter_wrapper[:] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just return the wrapper, not data |
||
return self._filter | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dead code |
||
|
||
@filter.setter | ||
def filter(self, filter_h5group): | ||
self._filter_wrapper = WriteableFieldArray(filter_h5group, 'values') | ||
|
||
def is_view(self): | ||
""" | ||
Return if the dataframe's name matches the field h5group path; if not, means this field is a view. | ||
""" | ||
if self._field.name[1:1+len(self.dataframe.name)] != self.dataframe.name: | ||
return True | ||
else: | ||
return False | ||
|
||
def __getitem__(self, item): | ||
if self._filter_wrapper != None: | ||
data_filter = self._filter_wrapper[:] | ||
return self.data[item][data_filter] | ||
else: | ||
return self.data[item] | ||
|
||
def concrete_reference(self): | ||
if not self.is_view(): | ||
raise ValueError("This field is already a concreted field.") | ||
|
||
self.data.detach_reference(self) # notice field array | ||
print(self.name) | ||
del self.dataframe[self.name] # notice dataframe | ||
concrete_field = self.create_like(self.dataframe, self.name) # create | ||
concrete_field.data.write(self[:]) # write data | ||
return concrete_field | ||
|
||
def is_sorted(self): | ||
""" | ||
Returns if data in field is sorted | ||
|
@@ -4096,7 +4171,6 @@ def timestamp_field_create_like(source, group, name, timestamp): | |
else: | ||
return group.create_timestamp(name, ts) | ||
|
||
|
||
@staticmethod | ||
def apply_isin(source: Field, test_elements: Union[list, set, np.ndarray]): | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest that you return the filter reference here so you can directly assign it during add_view (line 608)