From 8971914c46d796c5bbeb4085f8f7f561fc0aef5c Mon Sep 17 00:00:00 2001
From: Vanya Belyaev
Date: Tue, 5 Dec 2023 13:05:06 +0100
Subject: [PATCH] 1. Improve `ds2numpy` code and test

1. small fixes for `ostap.utils.split_ranges`
1. add conversion to int for `RooAbsCategory`
1. add iterator/contains/len functions for `RooAbsDataStore`
---
 ReleaseNotes/release_notes.md                |   4 +
 ostap/fitting/dataset.py                     |  10 +
 ostap/fitting/ds2numpy.py                    | 335 +++++++------------
 ostap/fitting/tests/test_fitting_ds2numpy.py |  50 ++-
 ostap/fitting/variables.py                   |   1 +
 ostap/utils/utils.py                         |  54 ++-
 6 files changed, 207 insertions(+), 247 deletions(-)

diff --git a/ReleaseNotes/release_notes.md b/ReleaseNotes/release_notes.md
index eab359ec..2bb6f921 100644
--- a/ReleaseNotes/release_notes.md
+++ b/ReleaseNotes/release_notes.md
@@ -8,6 +8,10 @@
  1. add test for `ostap.stats.ustat` module
  1. Add `Ostap::Math::two_samples` function
  1. Add the first version of code for RooDataSety -> numpy conversion by Artem Egorychev
+ 1. Improve `ds2numpy` code and test
+ 1. small fixes for `ostap.utils.split_ranges`
+ 1. add conversion to int for `RooAbsCategory`
+ 1. add iterator/contains/len functions for `RooAbsDataStore`
 
 ## Backward incompatible:
 
diff --git a/ostap/fitting/dataset.py b/ostap/fitting/dataset.py
index 1d375979..e1725c48 100644
--- a/ostap/fitting/dataset.py
+++ b/ostap/fitting/dataset.py
@@ -819,6 +819,16 @@ def _rds_make_unique_ ( dataset ,
     ]
 
+ROOT.RooAbsDataStore . __len__      = lambda s : s.numEntries()
+ROOT.RooAbsDataStore . __iter__     = _rad_iter_
+ROOT.RooAbsDataStore . __contains__ = _rad_contains_
+
+
+_new_methods_ += [
+    ROOT.RooAbsDataStore . __len__      ,
+    ROOT.RooAbsDataStore . __iter__     ,
+    ROOT.RooAbsDataStore . __contains__
+    ]
 # =============================================================================
 ## Helper project method for RooDataSet/DataFrame/...
 #  and similar objects
 #
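
Note: a minimal usage sketch of the `RooAbsDataStore` decorations added above
(illustration only, not part of the patch; the tiny dataset is hypothetical):

    import ROOT
    import ostap.fitting.dataset          # activates the decorations

    x  = ROOT.RooRealVar ( 'x' , 'x' , 0 , 10 )
    ds = ROOT.RooDataSet ( 'ds' , 'demo' , ROOT.RooArgSet ( x ) )
    for i in range ( 5 ) :
        x.setVal ( 1.0 * i )
        ds.add ( ROOT.RooArgSet ( x ) )

    store = ds.store()                    # the underlying RooAbsDataStore
    print ( len ( store ) )              # -> 5 , store.numEntries() via __len__
    for entry in store : pass            # event loop via __iter__/_rad_iter_
    print ( 'x' in store )               # membership via __contains__ (variable lookup)
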
diff --git a/ostap/fitting/ds2numpy.py b/ostap/fitting/ds2numpy.py
index aeb13e10..0ee4690a 100644
--- a/ostap/fitting/ds2numpy.py
+++ b/ostap/fitting/ds2numpy.py
@@ -21,9 +21,11 @@
 )
 # =============================================================================
 from   ostap.core.meta_info     import root_info
-from   ostap.core.ostap_types  import string_types
+from   ostap.core.ostap_types   import string_types
+from   ostap.utils.utils        import split_range
+from   ostap.fitting.dataset    import useStorage
+from   ostap.utils.progress_bar import progress_bar
 import ostap.fitting.roocollections
-import ostap.fitting.dataset
 import ROOT
 # =============================================================================
 # logging
@@ -75,16 +77,27 @@ def add_weight ( ds , data ):
     return data
 # =============================================================================
-if np and ( 6 , 26 ) <= root_info : ## 6.26 <= ROOT
+if np and ( 6 , 26 ) <= root_info :  ## 6.26 <= ROOT
 # =============================================================================
     # =========================================================================
-    ## Convert dataset into numpy array using ROOT.RooDataSet.to_numpy
-    #  @see `ROOT.RooDataSet.to_numpy`
-    #
-    def ds2numpy ( dataset , var_lst ) :
-        """ Convert dataset into numpy array using `ROOT.RooDataSet.to_numpy` methdod from new ROOT
-        - see `ROOT.RooDataSet.to_numpy`
+    ## Convert dataset into numpy array using ROOT.RooAbsData interface
+    #  @see ROOT.RooAbsData.getBatches
+    #  @see ROOT.RooAbsData.getCategoryBatches
+    #  @see ROOT.RooAbsData.getWeightBatch
+    #  @see ROOT.RooAbsDataStore.getBatches
+    #  @see ROOT.RooAbsDataStore.getCategoryBatches
+    #  @see ROOT.RooAbsDataStore.getWeightBatch
+    #  @attention conversion to ROOT.RooVectorDataStore is used!
+    def ds2numpy ( dataset , var_lst , silent = True ) :
+        """ Convert dataset into numpy array using `ROOT.RooAbsData` interface
+        - see ROOT.RooAbsData.getBatches
+        - see ROOT.RooAbsData.getCategoryBatches
+        - see ROOT.RooAbsData.getWeightBatch
+        - see ROOT.RooAbsDataStore.getBatches
+        - see ROOT.RooAbsDataStore.getCategoryBatches
+        - see ROOT.RooAbsDataStore.getWeightBatch
+        - attention: Conversion to `ROOT.RooVectorDataStore` is used!
         """
 
         ## 1) check that all variables are present in dataset
@@ -98,132 +111,96 @@ def ds2numpy ( dataset , var_lst ) :
 
         ## 2) check that all variables are present in the dataset
         assert all ( ( v in dataset ) for v in var_lst ) , 'Not all variables are in dataset!'
-        ## remove duplicated
-        new_names = []
-        for v in vnames :
-            if not v in new_names : new_names.append ( v )
-        vnames = new_names
-
         ## 3) reduce dataset if only a small subset of variables is requested
-        nvars = len( dataset.get() )
-        if 2 * len ( vnames ) <= nvars :
-            dstmp  = dataset.subset ( vnames )
+        nvars = len ( dataset.get() )
+        if 2 * len ( vnames ) <= nvars :
+            with useStorage ( ROOT.RooAbsData.Vector ) :
+                dstmp = dataset.subset ( vnames )
             result = ds2numpy ( dstmp , vnames )
             dstmp.erase()
             del dstmp
-            return result
-
-        ## 4) convert to numpy
-        data = dataset.to_numpy()
-
-        ## 5) convert to named/structured array
-
-        dtypes = [ ( name , data [ name ].dtype ) for name in vnames if name in data ]
-        lst    = [ data [ name ] for name in vnames if name in data ]
-
-        ## 6) add the weight
-        if dataset.isWeighted() :
-            weight = dataset.weightVar().GetName()
-            if not weight in vnames :
-                dtypes.append ( ( weight , data [ weight ] .dtype ) )
-                lst   .append (   data [ weight ] )
-
-        ## is there a better way to avoid a creation of lists ???
-        data = np.array ( list ( zip ( *lst ) ) , dtype = dtypes )
-
-        return data
-
-
-    __all__ = __all__ + ( 'ds2numpy' , )
-    ROOT.RooDataSet.tonumpy = ds2numpy
-    ROOT.RooDataSet.tonp    = ds2numpy
-    ROOT.RooDataSet.to_np   = ds2numpy
-    _new_methods_ += [ ROOT.RooDataSet.tonumpy ,
-                       ROOT.RooDataSet.tonp    ,
-                       ROOT.RooDataSet.to_np   ]
+            return result
+
+        ## 4) convert to VectorDataStore
+        #  batches are not (yet) implemented for Tree & Composite stores
+        dataset.convertToVectorStore()
+        ## dataset.convertToTreeStore()
+
+        ## 5) convert to VectorStore again...
+        #  batches are not (yet) implemented for Tree & Composite stores
+        store   = dataset.store()
+        source  = dataset
+        twoargs = False
+        if not isinstance ( store , ROOT.RooVectorDataStore ) :
+            source  = ROOT.RooVectorDataStore ( store , dataset.get() , store.name + '_vct' )
+            twoargs = True
-# =============================================================================
-elif np and ( 6 , 24 ) <= root_info :  ## 6.24 <= ROOT < 6.26
-# =============================================================================
-    # =========================================================================
-    ## Convert dataset into numpy array using ROOT.RooVectorDataStore.getArrays
-    #  @see `ROOT.RooVectorDataStore.getArrays`
-    #
-    def ds2numpy ( dataset , var_lst ) :
-        """ Convert dataset into numpy array using `ROOT.RooVectorDataStore.getArrays`
-        - see `ROOT.RooVectorDataStore.getArrays`
-        """
+
+        vars       = source.get()
+        vars       = [ v for v in vars if v.name in vnames ]
+        doubles    = [ v.name for v in vars if isinstance ( v , ROOT.RooAbsReal     ) ]
+        categories = [ v.name for v in vars if isinstance ( v , ROOT.RooAbsCategory ) ]
-        ## 1) check that all variables are present in dataset
-        if all ( isinstance ( v , string_types ) for v in var_lst ) :
-            vnames = [ v for v in var_lst ]
-        elif all ( isinstance ( v , ROOT.RooAbsArg ) for v in var_lst ) :
-            vnames = [ v.GetName() for v in var_lst ]
-        else :
-            raise TypeError ( "Invalid type of `var_list`!" )
+
+        ## name of weight variable
+        weight = '' if not dataset.isWeighted() else dataset.weightVar().GetName ()
-        ## 2) check that all variables are present in the dataset
-        assert all ( ( v in dataset ) for v in var_lst ) , 'Not all variables are in dataset!'
-
-        ## remove duplicated
-        new_names = []
+        dtypes = []
         for v in vnames :
-            if not v in new_names : new_names.append ( v )
-        vnames = new_names
-
-        ## 3) reduce dataset if only a small subset of variables is requested
-        nvars = len( dataset.get() )
-        if 2 * len ( vnames ) <= nvars :
-            dstmp  = dataset.subset ( vnames )
-            result = ds2numpy ( dstmp , vnames )
-            dstmp.erase()
-            del dstmp
-            return result
-
-        ## 4) here we need RooVectorDataStore
-        store = dataset.store()
-        if not isinstance ( store , ROOT.RooVectorDataStore ) :
-            dataset.ConvertToVectorStore()
-            store = dataset.store()
+            if   v in doubles    : dtypes.append ( ( v , np.float64 ) )
+            elif v in categories : dtypes.append ( ( v , np.int64   ) )
+        if weight : dtypes.append ( ( weight , np.float64 ) )
 
-        new_store = False
-        if not isinstance ( store , ROOT.RooVectorDataStore ) :
-            variables = store.get()
-            store     = ROOT.RooVectorDataStore ( store, variables , store.GetName() )
-            new_store = True
-
-        ## 5) get arrays from the store
+        ## get data in batches
+        nevts = len ( dataset )
 
-        array_info = store.getArrays()
-        n          = array_info.size
+        data = None
 
-        ## 6) using numpy structured array
-        dtypes = [ ( name , 'f8') for name in vnames ]
+        ## maximal size of data chunk
+        nmax = max ( nevts // 6 , 30000 // nvars )
 
-        ## 7) weight?
-        if dataset.isWeighted() :
-            weight = dataset.weightVar().GetName()
-            if not weight in vnames :
-                dtypes.append ( ( weight , 'f8' ) )
-
-        ## 8) create the structured array
-        data = np.zeros ( len ( dtypes ) , dtype = dtypes )
-
-        for x in array_info.reals:
-            if x.name in vnames :
-                data [ x.name ] = np.frombuffer ( x.data , dtype = np.float64 , count = n )
+        ## get data in chunks/batches
+        for first, last in progress_bar ( split_range ( 0 , nevts , nmax ) , silent = silent ) :
+
+            num  = last - first
+            wget = False
+            part = np.zeros ( num , dtype = dtypes )
+
+            if doubles :
+                dpart = source.getBatches ( first , num )
+                for d in dpart :
+                    dname = d.first.name
+                    if   dname in doubles :
+                        part [ dname ] = d.second
+                    elif dname == weight :
+                        part [ dname ] = d.second
+                        wget           = True
+                del dpart
 
-        for x in array_info.cats:
-            if x.name in vnames :
-                data [ x.name ] = np.frombuffer ( x.data , dtype = np.int32 , count = n )
-
-        if new_store : ## delete newly created store
-            store.reset()
-            del store
-
-        ## check here!!!
-        return add_weight ( dataset , data )
+            if categories :
+                cpart = source.getCategoryBatches ( first , num )
+                for c in cpart :
+                    cname = c.first.name
+                    if cname in categories :
+                        part [ cname ] = c.second
+                del cpart
+
+            if weight and not wget :
+                if twoargs : weights = source.getWeightBatch ( first , num )
+                else       : weights = source.getWeightBatch ( first , num , False )
+                if weights : part [ weight ] = weights
+                else       : part [ weight ] = np.full ( num , source.weight() , dtype = np.float64 )
+
+            if data is None : data = part
+            else :
+                data = np.concatenate ( [ data , part ] )
+                del part
+
+        if source is not dataset :
+            source.reset()
+            del source
+
+        return data
 
     __all__ = __all__ + ( 'ds2numpy' , )
 
@@ -234,21 +211,16 @@ def ds2numpy ( dataset , var_lst ) :
         ROOT.RooDataSet.tonp    ,
         ROOT.RooDataSet.to_np   ]
 
-# =============================================================================
-elif np and ( 6, 20 ) <= root_info :   ## 6.20 <= ROOT < 6.24
+elif np :  ## ROOT < 6.26
 # =============================================================================
     # =========================================================================
-    ## Convert dataset into numpy array using ROOT.RooVectorDataStore.getBatches
-    #  @see `ROOT.RooVectorDataStore.getBatches`
-    #
-    def ds2numpy ( dataset , var_lst ) :
-        """ Convert dataset into numpy array using `ROOT.RooVectorDataStore.getBatches`
-        - see `ROOT.RooVectorDataStore.getBatches`
+    ## Convert dataset into numpy array using (slow) explicit loops
+    def ds2numpy ( dataset , var_lst , silent = False ) :
+        """ Convert dataset into numpy array using (slow) explicit loops
         """
-
         ## 1) check that all variables are present in dataset
         if all ( isinstance ( v , string_types ) for v in var_lst ) :
             vnames = [ v for v in var_lst ]
@@ -260,94 +232,45 @@ def ds2numpy ( dataset , var_lst ) :
         ## 2) check that all variables are present in the dataset
         assert all ( ( v in dataset ) for v in var_lst ) , 'Not all variables are in dataset!'
 
-        ## remove duplicated
-        new_names = []
-        for v in vnames :
-            if not v in new_names : new_names.append ( v )
-        vnames = new_names
-
         ## 3) reduce dataset if only a small subset of variables is requested
-        nvars = len( dataset.get() )
+        nvars = len ( dataset.get() )
         if 2 * len ( vnames ) <= nvars :
             dstmp  = dataset.subset ( vnames )
             result = ds2numpy ( dstmp , vnames )
             dstmp.erase()
             del dstmp
             return result
+
+        vars       = dataset.get()
+        vars       = [ v for v in vars if v.name in vnames ]
+        doubles    = [ v.name for v in vars if isinstance ( v , ROOT.RooAbsReal     ) ]
+        categories = [ v.name for v in vars if isinstance ( v , ROOT.RooAbsCategory ) ]
 
-        ## 4) here we need RooVectorDataStore
-        store = dataset.store()
-        if not isinstance ( store , ROOT.RooVectorDataStore ) :
-            dataset.ConvertToVectorStore()
-            store = dataset.store()
-
-        new_store = False
-        if not isinstance ( store , ROOT.RooVectorDataStore ) :
-            variables = store.get()
-            store     = ROOT.RooVectorDataStore ( store, variables, store.GetName() )
-            new_store = True
+        ## name of weight variable
+        weight = '' if not dataset.isWeighted() else dataset.weightVar().GetName ()
 
+        dtypes = []
+        for v in vnames :
+            if   v in doubles    : dtypes.append ( ( v , np.float64 ) )
+            elif v in categories : dtypes.append ( ( v , np.int64   ) )
+        if weight : dtypes.append ( ( weight , np.float64 ) )
 
-        #$ 5) using numpy structed array
-        dtypes = [ ( name , 'f8') for name in vnames ]
-
-        ## 6) weight?
-        weight = None
-        if dataset.isWeighted() :
-            weight = dataset.weightVar().GetName()
-            if not weight in vnames :
-                dtypes.append ( ( weight , 'f8' ) )
-
-        ## 7) book the array
-
-        # for large datasets
-        # check batch size * var size < 10^6
-        num_entries = len ( dataset )
-        data_limit  = num_entries * nvars
-        num_limit   = 110000000
-        nb , r      = divmod ( n , num_limit )
-
-        ##
-        ##
-        ## REWRITE: should be RunContext here!!!
-        ##
-        if data_limit < num_limit:
-            data    = np.zeros ( len ( dtypes ) , dtype = dtypes )
-            batches = store.getBatches ( 0 , n)
-            count   = 0
-            for name in vnames :
-                for x in batches :
-                    if name == x.first.__follow__().GetName() :
-                        data [ name ] = x.second
-                        break
-            if weight :
-                data [ weight ] = store.getWeightBatch ( 0 , n )
-
-        else:
-
-            rargs = [ ( i * num_limit , num_limit ) for i in range ( nb ) ] + [ ( nb * num_limit , r ) ]
-
-            data = None
-            for first , num in rargs :
-
-                part    = np.zeros ( num , dtype = dtypes )
-                batches = store.getBatches ( first, num)
-                for x in vnames :
-                    for x in batches :
-                        if name == x.first.__follow__().GetName() :
-                            part [ name ] = x.second
-                            break
-                if weight : part [ weight ] = store.getWeightBatch ( 0 , n )
-
-                if data : data = np.concatenate ( [ data , part ] )
-                else    : data = part
+        ## create data
+        data = np.zeros ( len ( dataset ) , dtype = dtypes )
+
+        ## make an explicit loop:
+        for i , evt in enumerate ( progress_bar ( dataset , silent = silent ) ) :
+
+            for v in evt :
+                vname = v.name
+                if   vname in doubles    : data [ vname ] [ i ] = float ( v )
+                elif vname in categories : data [ vname ] [ i ] = int   ( v )
-        if new_store : ## delete newly created store
-            store.reset()
-            del store
-
+            if weight : data [ weight ] [ i ] = dataset.weight()
+
+        return data
+
     __all__ = __all__ + ( 'ds2numpy' , )
 
     ROOT.RooDataSet.tonumpy = ds2numpy
@@ -357,6 +280,8 @@ def ds2numpy ( dataset , var_lst ) :
         ROOT.RooDataSet.tonp    ,
         ROOT.RooDataSet.to_np   ]
 
+
+
 # =============================================================================
 else :
 # =============================================================================
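
Note: a minimal usage sketch of the reworked `ds2numpy` (illustration only, not
part of the patch; the tiny dataset is hypothetical). The diff above also
exposes the same function as `RooDataSet.tonumpy`/`tonp`/`to_np`:

    import random
    import ROOT
    from   ostap.fitting.ds2numpy import ds2numpy

    x    = ROOT.RooRealVar ( 'x' , 'x' , -10 , 10 )
    data = ROOT.RooDataSet ( 'data' , 'demo' , ROOT.RooArgSet ( x ) )
    for _ in range ( 1000 ) :
        x.setVal ( random.gauss ( 0 , 1 ) )
        data.add ( ROOT.RooArgSet ( x ) )

    arr = ds2numpy ( data , [ 'x' ] )     # structured numpy array
    print ( arr.dtype )                   # -> [('x', '<f8')]
    print ( arr [ 'x' ].mean() )          # same result via data.tonumpy ( [ 'x' ] )
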
diff --git a/ostap/fitting/tests/test_fitting_ds2numpy.py b/ostap/fitting/tests/test_fitting_ds2numpy.py
index 47134461..c4957a0c 100644
--- a/ostap/fitting/tests/test_fitting_ds2numpy.py
+++ b/ostap/fitting/tests/test_fitting_ds2numpy.py
@@ -39,8 +39,8 @@ def test_small_ds():
         y.setVal(y_val)
         data.add(ROOT.RooArgSet(x, y))
 
-    with timing ('Test small ds' , logger ) :
-        ws = ds2numpy ( data, ['x', 'y'] )
+
+    ws = ds2numpy ( data, ['x', 'y'] )
 
 
 # =============================================================================
@@ -68,12 +68,10 @@ def test_small_ds_with_weights():
 
     ds = data.makeWeighted('x+y')
 
-    with timing ('Test small ds with weights' , logger ) :
-        ws = ds2numpy ( ds, ['x', 'y' ] )
+    ws = ds2numpy ( ds, ['x', 'y' ] )
 
 # =============================================================================
-## def test_ds_with_weights():
-if 1 < 2 :
+def test_ds_with_weights():
 
     logger = getLogger ( 'test_ds2numpy_ds_with_weights' )
 
@@ -91,7 +89,7 @@ def test_small_ds_with_weights():
 
     # Fill the dataset with random data
     random_generator = ROOT.TRandom3(42)  # set the seed
-    NN = 10
+    NN = 10000
     for _ in range ( NN ):
         x_val = random.gauss ( 0 , 1 )
         y_val = random.gauss ( 10 , 1 )
@@ -106,17 +104,15 @@ def test_small_ds_with_weights():
 
     ds = data.makeWeighted('x+y')
 
-    with timing ('Test ds with weights ' , logger ) :
-        ws = ds2numpy ( ds , ['x', 'y' , 'q' ] )
+    ws = ds2numpy ( ds , ['x', 'y' , 'q' ] )
 
 
 # =============================================================================
 def test_large_ds_with_weights():
-
     logger = getLogger ( 'test_ds2numpy_large_ds_with_weights' )
 
     N = 100
-    NN = 50000
+    NN = 10000
     # Create a RooDataSet
     variables = []
     for i in range ( N ):
@@ -138,17 +134,16 @@ def test_large_ds_with_weights():
 
     var_lst = list ( set( "x{}".format( random.randint ( 0 , N - 1 ) ) for i in range ( 50 ) ) )
 
-    with timing ('Test large ds with weights ' , logger ) :
-        ws = ds2numpy(ds, var_lst )
+    ws = ds2numpy(ds, var_lst )
 
 
 # ============================================================================
 def test_large_ds_without_weights():
-    logger = getLogger ( 'test_ds2numpy_large_ds_without_weights' )
+    logger = getLogger ( 'test_ds2numpy_large_ds_no_weights' )
 
     N = 100
-    NN = 50000
+    NN = 10000
    # Create a RooDataSet
     variables = []
     for i in range ( N ):
@@ -169,29 +164,28 @@ def test_large_ds_without_weights():
 
     var_lst = list ( set( "x{}".format( random.randint ( 0 , N - 1 ) ) for i in range ( 50 ) ) )
 
-    with timing ('Test large ds without weights ' , logger ) :
-        ws = ds2numpy(data, var_lst )
+    ws = ds2numpy(data, var_lst )
 
 
 # ============================================================================
 if '__main__' == __name__ :
 
-    pass
+    ## pass
 
-    ## with timing ('Test small ds' , logger ) :
-##        test_small_ds()
+    with timing ('Test small ds' , logger ) :
+        test_small_ds()
 
-##    with timing ('Test small dataset with weights', logger ) :
-##        test_small_ds_with_weights()
+    with timing ('Test small dataset with weights', logger ) :
+        test_small_ds_with_weights()
 
-##    with timing ('Test large dataset with weights', logger ) :
-##        test_ds_with_weights()
+    with timing ('Test large dataset with weights', logger ) :
+        test_ds_with_weights()
 
-##    with timing ('Test large dataset with weights', logger ) :
-##        test_large_ds_with_weights()
+    with timing ('Test large dataset with weights', logger ) :
+        test_large_ds_with_weights()
 
-##    with timing ('Test large dataset without weights', logger ) :
-##        test_large_ds_without_weights()
+    with timing ('Test large dataset without weights', logger ) :
        test_large_ds_without_weights()
 
 # =============================================================================
diff --git a/ostap/fitting/variables.py b/ostap/fitting/variables.py
index 614fad08..cbe7adca 100644
--- a/ostap/fitting/variables.py
+++ b/ostap/fitting/variables.py
@@ -960,6 +960,7 @@ def _rcat_str_ ( cat ) :
 ROOT.RooAbsCategory.keys      = _racat_labels_
 ROOT.RooCategory .__str__     = _rcat_str_
 ROOT.RooCategory .__repr__    = _rcat_str_
+ROOT.RooCategory .__int__     = lambda s : s.getCurrentIndex()
 
 _new_methods_ += [
     ROOT.RooAbsCategory.__iter__ ,
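
Note: a minimal sketch of the new `int()` conversion for categories added above
(illustration only, not part of the patch):

    import ROOT
    import ostap.fitting.variables        # activates RooCategory.__int__

    cat = ROOT.RooCategory ( 'q' , 'charge' )
    cat.defineType ( 'minus' , -1 )
    cat.defineType ( 'plus'  ,  1 )
    cat.setLabel   ( 'plus'  )
    print ( int ( cat ) )                 # -> 1 , via getCurrentIndex()
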
diff --git a/ostap/utils/utils.py b/ostap/utils/utils.py
index cefcf8cf..a05d7340 100644
--- a/ostap/utils/utils.py
+++ b/ostap/utils/utils.py
@@ -986,6 +986,43 @@ def crange ( vmin , vmax , n = 10 ) :
     """
     return CRange ( vmin , vmax , n )
 
+
+# =============================================================================
+## split range into smaller chunks:
+#  @code
+#  for i in SplitRange ( 0 , 10000 , 200 ) :
+#      for j in range (*i) :
+#          ...
+#  @endcode
+class SplitRange(object) :
+    """Split range into smaller chunks:
+    >>> for i in SplitRange ( 0 , 10000 , 200 ) :
+    >>>     for j in range (*i) :
+    >>>         ...
+    """
+    def __init__ ( self , low , high , num ) :
+
+        self.__low  = low
+        self.__high = high
+        self.__num  = num
+
+        self.__size = 0
+        if low < high and 1 <= num :
+            self.__size , r = divmod ( self.__high - self.__low , self.__num )
+            if r : self.__size += 1
+
+    def __iter__ ( self ) :
+
+        if 1 <= self.__num :
+            low = self.__low
+            while low < self.__high :
+                yield low , min ( low + self.__num , self.__high )
+                low += self.__num
+
+    def __len__ ( self ) :
+        return self.__size
+
+
 # =============================================================================
 ## split range into smaller chunks:
 #  @code
 #  for i in split_range ( 0 , 10000 , 200 ) :
 #      for j in range (*i) :
 #          ...
 #  @endcode
@@ -999,19 +1036,7 @@ def split_range ( low , high , num ) :
     >>>     for j in range (*i) :
     >>>         ...
     """
-    if high <= low or num < 1 :
-
-        yield low , low
-
-    else :
-
-        next  = low + num
-        while next < high :
-            yield low , next
-            low   = next
-            next += num
-
-        yield low , high
+    return SplitRange ( low , high , num )
 
 # =============================================================================
 ## split range into num smaller chunks of approximate size
 #  @code
 #  for i in split_n_range ( 0 , 10000 , 200 ) :
 #      for j in range (*i) :
 #          ...
 #  @endcode
@@ -1026,7 +1051,8 @@ def split_n_range ( low , high , num ) :
     >>>     for j in range (*i) :
     >>>         ...
     """
-    if   high <= low or num < 1 : yield low , low
+
+    if   high <= low or num < 1 : pass
     elif 1 == num               : yield low , high
     elif low < high and high <= num + low : yield low , high
     else :
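
Note: a minimal sketch of the reworked `split_range` (illustration only, not
part of the patch). Since it now returns a `SplitRange` object rather than a
bare generator, the result has a length and can be iterated more than once:

    from ostap.utils.utils import split_range

    chunks = split_range ( 0 , 10 , 4 )
    print ( len ( chunks ) )              # -> 3 : (0,4) , (4,8) , (8,10)
    for first , last in chunks :
        for j in range ( first , last ) :
            pass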