Skip to content

Commit

Permalink
Merge pull request #4 from neuro-team-femto/sim
Browse files Browse the repository at this point in the history
Merge Simulation API
  • Loading branch information
jjau authored Apr 26, 2024
2 parents b23b2f3 + 0ad8fa1 commit 618a482
Show file tree
Hide file tree
Showing 32 changed files with 11,175 additions and 125 deletions.
2,101 changes: 2,101 additions & 0 deletions data/pitch_interrogation/results_subj_20111971.csv

Large diffs are not rendered by default.

5,101 changes: 5,101 additions & 0 deletions python/model.csv

Large diffs are not rendered by default.

132 changes: 66 additions & 66 deletions python/new_sim.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions python/palin/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
3 changes: 3 additions & 0 deletions python/palin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
#from palin.utils import *
#from palin.kernels import *
#from palin.internal_noise import *

from palin.kernels import classification_images
from palin.metrics import metrics
1 change: 1 addition & 0 deletions python/palin/internal_noise/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
160 changes: 160 additions & 0 deletions python/palin/internal_noise/double_pass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#!/usr/bin/env python
'''
PALIN toolbox v0.1
December 2022, Aynaz Adl Zarrabi, JJ Aucouturier (CNRS/UBFC)
Functions for kernel calculating method in Classification images
'''

import pandas as pd
import numpy as np
import os.path
import warnings
import ast
from .internal_noise_extractor import InternalNoiseExtractor
from ..simulation.linear_observer import LinearObserver
from ..simulation.simple_experiment import SimpleExperiment
from ..simulation.trial import Int2Trial, Int1Trial
from ..simulation.double_pass_experiment import DoublePassExperiment
from ..simulation.trial import Int2Trial, Int1Trial
from ..simulation.linear_observer import LinearObserver
from ..simulation import double_pass_statistics as dps
from ..simulation.simulation import Simulation as Sim


class DoublePass(InternalNoiseExtractor):
'''
This class provides methods to estimate internal noise and criteria from reverse correlation data, provided the data has double pass trials.
Internal noise and criteria are computed using the Ponsot/Neri double-pass simulation method, by first computing prob_agree and prob_first from double-pass data,
and then simulating an ideal observer with a range of internal noise and criteria values to find what duplet of values generates prob_agree and prob_first that are closest to those observed in the actual data.
'''

@classmethod
def extract_single_internal_noise(cls,data_df, trial_id, stim_id, feature_id, value_id, response_id, model_file, rebuild_model=False, internal_noise_range=np.arange(0,5,.1),criteria_range=np.arange(-5,5,1), n_repeated_trials=100, n_runs=10):
'''
Extracts internal noise and criteria for a single observer/session.
To extract for several users/sessions, use the superclass's method extract_internal_noise
'''
double_pass_id = 'double_pass_id' # column by which to identify double pass trials
# index double pass trials
data_df = cls.index_double_pass_trials(data_df, trial_id=trial_id, value_id = value_id, double_pass_id = double_pass_id)
# compute probability of agreement over double pass
prob_agree = cls.compute_prob_agreement(data_df, trial_id=trial_id, response_id=response_id, double_pass_id=double_pass_id)
# compute probability of choosing first response option
prob_first = cls.compute_prob_first(data_df, trial_id=trial_id, response_id=response_id, stim_id=stim_id, double_pass_id=double_pass_id)

internal_noise, criteria = cls.estimate_noise_criteria(prob_agree, prob_first, model_file, rebuild_model, internal_noise_range,criteria_range, n_repeated_trials, n_runs)

return internal_noise,criteria

def __str__(self):
return 'Double-Pass method'

@classmethod
def index_double_pass_trials(cls, data_df, trial_id='trial',double_pass_id='double_pass_id',value_id='value'):
'''
Runs over data by a single user, identifies any repeated trials based on the set of their values, and tags them with a column called double_pass_id.
At the end of the procedure, data-df[double_pass_id].max() is the total number of repeated trials found in the data.
'''
# represent the several values of a given trial (ex. 6 features for interval 1, 6 features for interval 2) as a tuple
set_df = data_df.groupby(trial_id).agg({value_id: lambda group: tuple(group)}).reset_index()

# count how many trials have each unique pair of stimuli
pass_count_df = set_df.groupby(value_id).agg({trial_id: ['nunique','first','last']})
pass_count_df.columns = ["_".join(x) for x in pass_count_df.columns]
pass_count_df = pass_count_df.reset_index()

# identify pairs of stimuli that have 2 trials (i.e. for which there has been a double pass)
double_pass_df = pass_count_df[pass_count_df['%s_nunique'%trial_id]==2].reset_index(drop=True)

# assign unique id
double_pass_df[double_pass_id] = double_pass_df.index

# join to base dataset
double_pass_df = double_pass_df.melt(id_vars=double_pass_id,
value_vars=['%s_first'%trial_id,'%s_last'%trial_id],
var_name='%s_type'%trial_id,
value_name=trial_id)
data_df= pd.merge(data_df, double_pass_df[[trial_id, double_pass_id]],
how="left", on=trial_id)
return data_df


@classmethod
def compute_prob_agreement(cls,data_df, trial_id='trial', response_id='response', double_pass_id='double_pass_id'):
'''
Computes the probability of giving the same response over all pairs of repeated trials (as identified by double_pass_id, see index_double_pass_trials)
'''
# compute agreements for each double_pass trial
def same_answer(group, trial_id, response_id):
d = group.groupby(trial_id).agg({response_id: lambda group: tuple(group)}).reset_index()
return d.response.nunique()==1
agrees = data_df.groupby(double_pass_id).apply(lambda group: same_answer(group, trial_id, response_id))

# return agreement probability
return agrees.sum()/len(agrees)

@classmethod
def compute_prob_first(cls, data_df, trial_id='trial', response_id='response', stim_id='stim_order', double_pass_id='double_pass_id'):
'''
Computes probability to choose the first response option (i.e. a measure of response bias) across the subset of double_pass trials
'''

# compute first response for each double_pass trial
def first_option(group, stim_id, response_id):
resp = group.sort_values(by=stim_id)[response_id].iloc[0]
return resp==1
firsts = data_df[data_df[double_pass_id].notna()].groupby(trial_id).apply(lambda group: first_option(group, stim_id, response_id))

return firsts.sum()/len(firsts)

@classmethod
def estimate_noise_criteria(cls,prob_agree, prob_first, model_file,rebuild_model=False, internal_noise_range=np.arange(0,5,.1),criteria_range=np.arange(-5,5,1), n_repeated_trials=100, n_runs=10):
'''
Estimates internal noise and criteria given a measure of prob_agree and prob_first.
Either uses a prebuilt model (a dataframe previously generated by @build_model and stored as a .csv file), or rebuild a new model.
Searches through a range of possible internal noise and criteria values
'''
# load model or rebuild
if os.path.isfile(model_file) & ~rebuild_model:
model_df = pd.read_csv(model_file, index_col=0)
else:
model_df = cls.build_model(internal_noise_range, criteria_range, n_repeated_trials, n_runs)
model_df.to_csv(model_file)

# find internal_noise & criteria settings that minimizes distance to prob_agree and prob_first
model_df['dist'] = model_df.apply(lambda row: (row.prob_agree-prob_agree)**2 + (row.prob_first-prob_first)**2, axis=1)

best_match = model_df[model_df.dist==model_df.dist.min()]

return best_match.internal_noise_std.iloc[0], best_match.criteria.iloc[0]

@classmethod
def build_model(cls,internal_noise_range=np.arange(0,5,.1),criteria_range=np.arange(-5,5,1), n_repeated_trials=100, n_runs=10):
'''
Build a model that associates a range of internal noise and criteria values with their corresponding (simulated) prob_agree and prob_first.
This uses a simulated LinearObserver, and returns the model as a dataframe
'''
print('Rebuilding double-pass model')

observer_params = {'kernel':[[1]],
'internal_noise_std':internal_noise_range,
'criteria':criteria_range}
experiment_params = {'n_trials':[n_repeated_trials],
'n_repeated':[n_repeated_trials],
'trial_type': [Int2Trial],
'n_features': [1],
'external_noise_std': [1]}
analyser_params = {}

sim = Sim(DoublePassExperiment, experiment_params,
LinearObserver, observer_params,
dps.DoublePassStatistics, analyser_params)

sim_df = sim.run_all(n_runs=n_runs, verbose=True)

# average measures over all runs
sim_df.groupby(['internal_noise_std','criteria'])[dps.DoublePassStatistics.get_metric_names()].mean()
return sim_df


27 changes: 27 additions & 0 deletions python/palin/internal_noise/internal_noise_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python
'''
PALIN toolbox v0.1
Decemberr 2022, Aynaz Adl Zarrabi, JJ Aucouturier (CNRS/UBFC)
Functions for kernel calculating method in Classification images
'''

import pandas as pd
import numpy as np
from abc import ABC, abstractmethod

class InternalNoiseExtractor(ABC):

@classmethod
@abstractmethod
def extract_single_internal_noise(cls,data_df, trial_id = 'trial_id', feature_id = 'feature', value_id = 'value', response_id = 'response'):
raise NotImplementedError()

@classmethod
def extract_internal_noise(cls,data_df, group_ids, trial_id, feature_id, value_id, response_id, model_file):

# for each level in group, extract internal_noise
return data_df.groupby(group_ids).apply(lambda group: cls.extract_single_internal_noise(group,
trial_id, feature_id, value_id, response_id, model_file)).reset_index()


1 change: 1 addition & 0 deletions python/palin/kernels/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
112 changes: 53 additions & 59 deletions python/palin/kernels/classification_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,60 @@

import pandas as pd
import numpy as np
from .kernel_extractor import KernelExtractor

def compute_kernel(data_df,trial_ids=['experimentor','type','subject','session'], dimension_ids=['segment'],response_id='response', value_id='pitch', normalize=True):
''' computes first-order temporal kernels for each participant using the classification image, ie.
mean(stimulus features classified as positive) - mean(stimulus features classified as negative)'''

# for each participant, average stimulus features (e.g. mean pitch for each segment) separately for positive and negative responses, and subtract positives - negatives
dimension_mean_value = data_df.groupby(trial_ids+[dimension_ids]+[response_id])[value_id].mean().reset_index()
positives = dimension_mean_value.loc[dimension_mean_value[response_id] == True].reset_index()
negatives = dimension_mean_value.loc[dimension_mean_value[response_id] == False].reset_index()
kernels = pd.merge(positives, negatives, on=trial_ids+[dimension_ids])
kernels['kernel_value'] = kernels['%s_x'%value_id] - kernels['%s_y'%value_id]

if(normalize):
# Kernel are then normalized for each participant/session by dividing them
# by the square root of the sum of their squared values.
kernels['square_value'] = kernels['kernel_value']**2
if trial_ids:
for_norm = kernels.groupby(trial_ids)['square_value'].mean().reset_index()
kernels = pd.merge(kernels, for_norm, on=trial_ids, suffixes=('', '_mean'))
else:
kernels['square_value_mean'] = kernels.square_value.mean()

kernels['kernel_value'] = kernels['kernel_value']/np.sqrt(kernels['square_value_mean'])
kernels.drop(columns=['square_value', 'square_value_mean'], inplace=True)

kernels.drop(columns=['index_x','%s_x'%response_id,'%s_x'%value_id,'index_y','%s_y'%response_id,'%s_y'%value_id], inplace=True)

return kernels

def compute_accuracy(data_df, control_kernel, session_identifiers = ['experimentor','type','subject','session'], trial_identifier = 'trial', stimulus_dimension='segment', stimulus_value = 'pitch', stimulus_response='response'):
''' Computes participant's accuracy on the task as a measure of how well the participant performs
compared to an ideal participant model having the control group as an internal representation and zero internal noise.
Accuracy therefore combines both internal representation and noise in a single measure.'''

# for each participant, in each trial, compute the dot product of each stimulus with the control group kernel

# create a df of positive and negative trial data for each participant
positives = data_df.loc[data_df[stimulus_response] == 1].reset_index()[session_identifiers + [trial_identifier,stimulus_dimension,stimulus_value]]
negatives = data_df.loc[data_df[stimulus_response] == 0].reset_index()[session_identifiers + [trial_identifier,stimulus_dimension,stimulus_value]]

trial_data = positives.merge(negatives,
on=session_identifiers + [trial_identifier,stimulus_dimension],
suffixes=('_pos','_neg'))

# dot product of each positive and negative trial with the control group's kernel
def dot_control(x):
#print(list(x))
#print(control_kernel)
return np.dot(list(x), control_kernel)

trial_data = trial_data.groupby(session_identifiers+[trial_identifier]).agg({'%s_pos'%stimulus_value:dot_control,
'%s_neg'%stimulus_value:dot_control}).reset_index()

# count hits as trials for which the positive stimuli is the one with higher dot product to control kernel
trial_data['hit'] = trial_data['%s_pos'%stimulus_value] > trial_data['%s_neg'%stimulus_value]

# compute hit rate (average hit across trials) per participant
hit_rate = trial_data.groupby(session_identifiers, as_index=False).hit.mean()

return hit_rate
class ClassificationImage(KernelExtractor):

@classmethod
def extract_single_kernel(cls, data_df, feature_id = 'feature', value_id = 'value', response_id = 'response'):

## note this doesn't work for 1-int data

feature_average = data_df.groupby([feature_id,response_id])[value_id].mean().reset_index()
positives = feature_average.loc[feature_average[response_id] == True].reset_index()
negatives = feature_average.loc[feature_average[response_id] == False].reset_index()
kernels = pd.merge(positives, negatives, on=feature_id, suffixes=('_true','_false'))
kernels['kernel_value'] = kernels['%s_true'%value_id] - kernels['%s_false'%value_id]
kernels = kernels[[feature_id,'kernel_value']].set_index(feature_id)
kernels.index.names = ['feature']
return kernels

def __str__(self):
return 'Classification Image'



# def compute_accuracy(data_df, control_kernel, session_identifiers = ['experimentor','type','subject','session'], trial_identifier = 'trial', stimulus_dimension='segment', stimulus_value = 'pitch', stimulus_response='response'):
# ''' Computes participant's accuracy on the task as a measure of how well the participant performs
# compared to an ideal participant model having the control group as an internal representation and zero internal noise.
# Accuracy therefore combines both internal representation and noise in a single measure.'''

# # for each participant, in each trial, compute the dot product of each stimulus with the control group kernel

# # create a df of positive and negative trial data for each participant
# positives = data_df.loc[data_df[stimulus_response] == 1].reset_index()[session_identifiers + [trial_identifier,stimulus_dimension,stimulus_value]]
# negatives = data_df.loc[data_df[stimulus_response] == 0].reset_index()[session_identifiers + [trial_identifier,stimulus_dimension,stimulus_value]]

# trial_data = positives.merge(negatives,
# on=session_identifiers + [trial_identifier,stimulus_dimension],
# suffixes=('_pos','_neg'))

# # dot product of each positive and negative trial with the control group's kernel
# def dot_control(x):
# #print(list(x))
# #print(control_kernel)
# return np.dot(list(x), control_kernel)

# trial_data = trial_data.groupby(session_identifiers+[trial_identifier]).agg({'%s_pos'%stimulus_value:dot_control,
# '%s_neg'%stimulus_value:dot_control}).reset_index()

# # count hits as trials for which the positive stimuli is the one with higher dot product to control kernel
# trial_data['hit'] = trial_data['%s_pos'%stimulus_value] > trial_data['%s_neg'%stimulus_value]

# # compute hit rate (average hit across trials) per participant
# hit_rate = trial_data.groupby(session_identifiers, as_index=False).hit.mean()

# return hit_rate



43 changes: 43 additions & 0 deletions python/palin/kernels/kernel_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python
'''
PALIN toolbox v0.1
Decemberr 2022, Aynaz Adl Zarrabi, JJ Aucouturier (CNRS/UBFC)
Functions for kernel calculating method in Classification images
'''

import pandas as pd
import numpy as np
from abc import ABC, abstractmethod

class KernelExtractor(ABC):

@classmethod
@abstractmethod
def extract_single_kernel(cls,data_df, feature_id = 'feature', value_id = 'value', response_id = 'response'):
raise NotImplementedError()

@classmethod
def extract_kernels(cls,data_df, group_ids, feature_id, value_id, response_id, normalize = True):

# for each level in group, compute kernels

def extract_normalize(group):
kernel = cls.extract_single_kernel(group, feature_id, value_id, response_id)
if normalize:
kernel = cls.normalize_kernel(kernel)
return kernel

return data_df.groupby(group_ids).apply(lambda group: extract_normalize(group)).reset_index()

@classmethod
def normalize_kernel(cls,kernel):
if isinstance(kernel,pd.DataFrame):
rms = np.sqrt((kernel.kernel_value**2).mean())
kernel.kernel_value /= rms
elif isinstance(kernel,(np.ndarray, list)):
rms = np.sqrt(np.mean(np.power(kernel,2)))
kernel = kernel/rms
else:
raise TypeError('argument kernel is neither a pd.DataFrame or a np.ndarray')
return kernel
21 changes: 21 additions & 0 deletions python/palin/kernels/linear_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python
'''
PALIN toolbox v0.1
Decemberr 2022, Aynaz Adl Zarrabi, JJ Aucouturier (CNRS/UBFC)
Functions for kernel calculating method in Classification images
'''

import pandas as pd
import numpy as np
from .kernels import KernelAnalyser

class LinearModel(KernelExtractor):

@classmethod
def extract_single_kernel(cls, data_df, feature_id = 'feature', value_id = 'value', response_id = 'response'):

raise NotImplementedError()



Loading

0 comments on commit 618a482

Please sign in to comment.