From f9387e7719ecf2230caecf9f247db295b6c04770 Mon Sep 17 00:00:00 2001 From: Rohith Srinivaas M Date: Fri, 25 Oct 2024 10:56:35 -0700 Subject: [PATCH 1/6] Diffusion Coefficients with Onsager Formalism --- transport_analysis/__init__.py | 3 +- transport_analysis/diffusion.py | 240 ++++++++++++++++++++++++++++++++ transport_analysis/utils.py | 77 ++++++++++ 3 files changed, 319 insertions(+), 1 deletion(-) create mode 100644 transport_analysis/diffusion.py create mode 100644 transport_analysis/utils.py diff --git a/transport_analysis/__init__.py b/transport_analysis/__init__.py index 4aa7f33..b5c832d 100644 --- a/transport_analysis/__init__.py +++ b/transport_analysis/__init__.py @@ -15,4 +15,5 @@ from . import _version -__version__ = _version.get_versions()["version"] + +__version__ = _version.get_versions()["version"] \ No newline at end of file diff --git a/transport_analysis/diffusion.py b/transport_analysis/diffusion.py new file mode 100644 index 0000000..311ea19 --- /dev/null +++ b/transport_analysis/diffusion.py @@ -0,0 +1,240 @@ +from typing import TYPE_CHECKING + +from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.core.groups import UpdatingAtomGroup +from MDAnalysis.exceptions import NoDataError +from MDAnalysis.units import constants +import numpy as np +import matplotlib.pyplot as plt +from scipy import stats +from transport_analysis.utils import msd_fft_1d, msd_variance_1d, average_directions + +if TYPE_CHECKING: + from MDAnalysis.core.universe import AtomGroup + +## Need to pass the entire the universe to get the com of the universe not just the atomg +class DiffusionCoefficientEinstein(AnalysisBase): + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- + atomgroup : AtomGroup + An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. + Note that :class:`UpdatingAtomGroup` instances are not accepted. + temp_avg : float (optional, default 300) + Average temperature over the course of the simulation, in Kelvin. + dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'} + Desired dimensions to be included in the viscosity calculation. + Defaults to 'xyz'. + linear_fit_window : tuple of int (optional) + A tuple of two integers specifying the start and end lag-time for + the linear fit of the viscosity function. If not provided, the + linear fit is not performed and viscosity is not calculated. + + Attributes + ---------- + atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` + The atoms to which this analysis is applied + dim_fac : int + Dimensionality :math:`d` of the viscosity computation. + results.timeseries : :class:`numpy.ndarray` + The averaged viscosity function over all the particles with respect + to lag-time. Obtained after calling :meth:`ViscosityHelfand.run` + results.visc_by_particle : :class:`numpy.ndarray` + The viscosity function of each individual particle with respect to + lag-time. + results.viscosity : float + The viscosity coefficient of the solvent. The argument + `linear_fit_window` must be provided to calculate this to + avoid misinterpretation of the viscosity function. + start : Optional[int] + The first frame of the trajectory used to compute the analysis. + stop : Optional[int] + The frame to stop at for the analysis. + step : Optional[int] + Number of frames to skip between each analyzed frame. + n_frames : int + Number of frames analysed in the trajectory. + n_particles : int + Number of particles viscosity was calculated over. + """ + def __init__(self, + allatomgroup: "AtomGroup", + atomgroup_query: str, + temp_avg: float = 298.0, + linear_fit_window: tuple[int, int] = None, + num_atoms_per_species = 1, + mass_per_species = int, + **kwargs, + ) -> None: + # the below line must be kept to initialize the AnalysisBase class! + super().__init__(allatomgroup.universe.trajectory, **kwargs) + # after this you will be able to access `self.results` + # `self.results` is a dictionary-like object + # that can should used to store and retrieve results + # See more at the MDAnalysis documentation: + # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( + "UpdatingAtomGroups are not valid for diffusion computation" + ) + self.temp_avg = temp_avg + self.linear_fit_window = linear_fit_window + + self.allatomgroup = allatomgroup + self.atomgroup = self.allatomgroup.select_atoms(atomgroup_query) + self.n_particles = len(self.atomgroup) + self.num_atoms_per_species = num_atoms_per_species + self.mass_per_species = mass_per_species + + self._dim = 3 + + if self.n_particles%self.num_atoms_per_species != 0: + raise TypeError( + "Some species are fragmented. Invalid AtomGroup" + ) + + def _prepare(self): + """ + Set up viscosity, mass, velocity, and position arrays + before the analysis loop begins + """ + # 2D viscosity array of frames x particles + + self._volumes = np.zeros((self.n_frames)) + self._msds = np.zeros((self.n_frames,self._dim)) + self._msds_var = np.zeros((self.n_frames,self._dim)) + self._lii_self = np.zeros((self.n_frames)) + self.weights = np.ones((self.n_frames)) + self._coms = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._masses = self.atomgroup.masses + self._masses_rs = self._masses.reshape((1, len(self._masses), 1)) + + # 3D arrays of frames x particles x dimensions + # positions + self._positions = np.zeros( + (self.n_frames, int(self.n_particles), self._dim) + ) + self._mass_weighted_positions = np.zeros( + (self.n_frames, int(self.n_particles/self.num_atoms_per_species), self._dim) + ) + self.J_to_KJ = 1000 + self.boltzmann = self.J_to_KJ*constants["Boltzmann_constant"]/constants["N_Avogadro"] + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ + # This runs once for each frame of the trajectory + + # The trajectory positions update automatically + # You can access the frame number using self._frame_index + + # trajectory must have velocity and position information + if not ( + self._ts.has_positions + and self._ts.volume != 0 + ): + raise NoDataError( + "Einstein diffusion computation requires positions, and box volume in the trajectory" + ) + + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._positions[self._frame_index] = self.atomgroup.positions + self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + self._mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._masses,self._dim,axis = 0).reshape(-1,self._dim),self._positions[self._frame_index])/self.mass_per_species).reshape(-1, int(self.num_atoms_per_species), self._dim).sum(axis=1) + self._mass_weighted_positions[self._frame_index] = self._mass_weighted_positions[self._frame_index] - self._coms[self._frame_index] + + + + def _conclude(self): + """ + Calculates the diffusion coefficient via the fft. + """ + for specie_id in (range(int(self.n_particles/self.num_atoms_per_species))): + for i in range(self._dim): + msd_temp = msd_fft_1d(np.array(self._mass_weighted_positions[:,specie_id, :][:,i])) + self._msds[:,i] += msd_temp + self._msds_var[:,i] += msd_variance_1d(np.array(self._mass_weighted_positions[:,specie_id, :][:,i]), self._msds[:,i]) + + + self._lii_self = average_directions(self._msds,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + self.weights = np.abs(1/average_directions(self._msds_var,self._dim)) + self.weights /= np.sum(self.weights) + + lii_self = None + lii_intercept = None + beta = None + if self.linear_fit_window: + lii_self , lii_intercept , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lii_self[self.linear_fit_window[0]:self.linear_fit_window[1]]) + fit_slope = np.gradient(np.log(self._lii_self[self.linear_fit_window[0]:self.linear_fit_window[1]])[1:], np.log(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] - self._times[0])[1:]) + beta = np.nanmean(np.array(fit_slope)) + + else: + lii_intercept , lii_self = np.polynomial.polynomial.polyfit(self._times[1:], self._lii_self[1:], 1, w=self.weights[1:]) + fit_slope = np.gradient(np.log(self._lii_self)[1:], np.log(self._times - self._times[0])[1:]) + beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + + + conc = float(self.n_particles/self.num_atoms_per_species)/(self._volumes.mean()*(self.A_to_m**3)) + + self.results.linearity = beta + self.results.fit_slope = lii_self + self.results.fit_intercept = lii_intercept + self.results.lii_self = lii_self*(1/(self.A_to_m*self.fs_to_s)) + self.results.diffusion_coefficient = (lii_self*(1/(self.A_to_m*self.fs_to_s)))*(self.boltzmann*self.temp_avg/conc) + + breakpoint() + + + def plot_linear_fit(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times, np.abs(self._lii_self)) + plt.plot(self._times, self._times*self.results.fit_slope + self.results.fit_intercept, "k--", alpha=0.5) + plt.grid() + plt.ylabel("MSD") + plt.xlabel("Time") + if self.linear_fit_window: + plt.axvline(x=self._times[self.linear_fit_window[0]], color='r', linestyle='--', linewidth=2) + plt.axvline(x=self._times[self.linear_fit_window[1]], color='r', linestyle='--', linewidth=2) + #plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) + #i = int(len(self._msds) / 5) + #slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) + #plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) + plt.tight_layout() + plt.show() + + + + + + + + + + + + + diff --git a/transport_analysis/utils.py b/transport_analysis/utils.py new file mode 100644 index 0000000..589a022 --- /dev/null +++ b/transport_analysis/utils.py @@ -0,0 +1,77 @@ +import numpy as np + +def autocorrFFT(x): + """ + Calculates the autocorrelation function using the fast Fourier transform. + + :param x: array[float], function on which to compute autocorrelation function + :return: acf: array[float], autocorrelation function + """ + N= len(x) + F = np.fft.fft(x, n=2*N) + PSD = F * F.conjugate() + res = np.fft.ifft(PSD) + res= (res[:N]).real + n=N*np.ones(N)-np.arange(0,N) + acf = res/n + return acf + +def msd_fft_1d(r): + """Calculates mean square displacement of the array r using the fast Fourier transform.""" + # Algorithm based on https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + N = len(r) + D = np.square(r) + D = np.append(D, 0) + S2 = autocorrFFT(r) + Q = 2 * D.sum() + S1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + S1[m] = Q / (N - m) + return S1 - 2 * S2 + + +def cross_corr(x, y): + N = len(x) + F1 = np.fft.fft( + x, n=2 ** (N * 2 - 1).bit_length() + ) # 2*N because of zero-padding, use next highest power of 2 + F2 = np.fft.fft(y, n=2 ** (N * 2 - 1).bit_length()) + PSD = F1 * F2.conjugate() + res = np.fft.ifft(PSD) + res = (res[:N]).real + n = N * np.ones(N) - np.arange(0, N) # divide res(m) by (N-m) + return res / n + +def msd_variance_1d(r, msd): + + # compute A1, recursive relation with D = r^4 + N = len(r) + D = r**4 + D = np.append(D, 0) + Q = 2 * D.sum() + A1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + A1[m] = Q / (N - m) + + # compute A2, autocorrelation of r^2 + A2 = cross_corr(r**2, r**2) + + # compute A3 and A4, cross correlations of r and r^3 + A3 = cross_corr(r, r**3) + A4 = cross_corr(r**3, r) + + var_x = A1 + 6*A2 - 4*A3 - 4*A4 - msd**2 + n_minus_m = N * np.ones(N) - np.arange(0, N) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + + return var_x/n_minus_m + +def average_directions(r, dim): + if dim == 3: + return (r[:,0] + r[:,1] + r[:,2])/3 + if dim == 2: + return (r[:,0] + r[:,1])/2 + if dim == 1: + return r[:,0] + From 0da02ea046af6de0b7f5376fd9aace32702aa7e9 Mon Sep 17 00:00:00 2001 From: Rohith Srinivaas M Date: Mon, 4 Nov 2024 19:48:35 -0800 Subject: [PATCH 2/6] Conductivity under the Onsager Formalism --- transport_analysis/conductivity.py | 295 +++++++++++++++++++++++++++++ transport_analysis/diffusion.py | 12 +- transport_analysis/utils.py | 55 ++++++ 3 files changed, 354 insertions(+), 8 deletions(-) create mode 100644 transport_analysis/conductivity.py diff --git a/transport_analysis/conductivity.py b/transport_analysis/conductivity.py new file mode 100644 index 0000000..afddd5f --- /dev/null +++ b/transport_analysis/conductivity.py @@ -0,0 +1,295 @@ +from typing import TYPE_CHECKING + +from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.core.groups import UpdatingAtomGroup +from MDAnalysis.exceptions import NoDataError +from MDAnalysis.units import constants +import numpy as np +import matplotlib.pyplot as plt +from scipy import stats +from transport_analysis.utils import msd_fft_cross_1d, msd_variance_cross_1d, average_directions + +if TYPE_CHECKING: + from MDAnalysis.core.universe import AtomGroup + +## Need to pass the entire the universe to get the com of the universe not just the atomg +class ConductivityEinstein(AnalysisBase): + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- + atomgroup : AtomGroup + An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. + Note that :class:`UpdatingAtomGroup` instances are not accepted. + temp_avg : float (optional, default 300) + Average temperature over the course of the simulation, in Kelvin. + dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'} + Desired dimensions to be included in the viscosity calculation. + Defaults to 'xyz'. + linear_fit_window : tuple of int (optional) + A tuple of two integers specifying the start and end lag-time for + the linear fit of the viscosity function. If not provided, the + linear fit is not performed and viscosity is not calculated. + + Attributes + ---------- + atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` + The atoms to which this analysis is applied + dim_fac : int + Dimensionality :math:`d` of the viscosity computation. + results.timeseries : :class:`numpy.ndarray` + The averaged viscosity function over all the particles with respect + to lag-time. Obtained after calling :meth:`ViscosityHelfand.run` + results.visc_by_particle : :class:`numpy.ndarray` + The viscosity function of each individual particle with respect to + lag-time. + results.viscosity : float + The viscosity coefficient of the solvent. The argument + `linear_fit_window` must be provided to calculate this to + avoid misinterpretation of the viscosity function. + start : Optional[int] + The first frame of the trajectory used to compute the analysis. + stop : Optional[int] + The frame to stop at for the analysis. + step : Optional[int] + Number of frames to skip between each analyzed frame. + n_frames : int + Number of frames analysed in the trajectory. + n_particles : int + Number of particles viscosity was calculated over. + """ + def __init__(self, + allatomgroup: "AtomGroup", + cationgroup_query: str, + aniongroup_query: str, + temp_avg: float = 298.0, + linear_fit_window: tuple[int, int] = None, + cation_num_atoms_per_species = int, + anion_num_atoms_per_species = int, + cation_mass_per_species = float, + anion_mass_per_species = float, + cation_charge = int, + anion_charge = int, + **kwargs, + ) -> None: + # the below line must be kept to initialize the AnalysisBase class! + super().__init__(allatomgroup.universe.trajectory, **kwargs) + # after this you will be able to access `self.results` + # `self.results` is a dictionary-like object + # that can should used to store and retrieve results + # See more at the MDAnalysis documentation: + # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( + "UpdatingAtomGroups are not valid for diffusion computation" + ) + self.temp_avg = temp_avg + self.linear_fit_window = linear_fit_window + + self.allatomgroup = allatomgroup + self.cations = self.allatomgroup.select_atoms(cationgroup_query) + self.anions = self.allatomgroup.select_atoms(aniongroup_query) + self.cation_num_atoms_per_species = cation_num_atoms_per_species + self.anion_num_atoms_per_species = anion_num_atoms_per_species + self.cation_mass_per_species = cation_mass_per_species + self.anion_mass_per_species = anion_mass_per_species + self.cation_charge = cation_charge + self.anion_charge = anion_charge + + self._dim = 3 + self.cation_particles = len(self.cations) + self.anion_particles = len(self.anions) + + if (self.cation_particles%self.cation_num_atoms_per_species != 0) or (self.anion_particles%self.anion_num_atoms_per_species != 0): + raise TypeError( + "Some species are fragmented. Invalid AtomGroup" + ) + + def _prepare(self): + """ + Set up viscosity, mass, velocity, and position arrays + before the analysis loop begins + """ + # 2D viscosity array of frames x particles + + self._volumes = np.zeros((self.n_frames)) + + self._msds_cation_cation = np.zeros((self.n_frames,self._dim)) + self._msds_cation_anion = np.zeros((self.n_frames,self._dim)) + self._msds_anion_anion = np.zeros((self.n_frames,self._dim)) + + self._msds_cation_cation_var = np.zeros((self.n_frames,self._dim)) + self._msds_cation_anion_var = np.zeros((self.n_frames,self._dim)) + self._msds_anion_anion_var = np.zeros((self.n_frames,self._dim)) + self._cond = np.zeros((self.n_frames)) + self._cond_var = np.zeros((self.n_frames)) + + + self._lij_cation_cation = np.zeros((self.n_frames)) + self._lij_cation_anion = np.zeros((self.n_frames)) + self._lij_anion_anion = np.zeros((self.n_frames)) + + self.weights = np.ones((self.n_frames)) + + + self._coms = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._cation_masses = self.cations.masses + self._anion_masses = self.anions.masses + + # 3D arrays of frames x particles x dimensions + # positions + self._cation_positions = np.zeros( + (self.n_frames, int(self.cation_particles), self._dim) + ) + self._anion_positions = np.zeros( + (self.n_frames, int(self.anion_particles), self._dim) + ) + + self._cation_mass_weighted_positions = np.zeros( + (self.n_frames, int(self.cation_particles/self.cation_num_atoms_per_species), self._dim) + ) + + self._anion_mass_weighted_positions = np.zeros( + (self.n_frames, int(self.anion_particles/self.anion_num_atoms_per_species), self._dim) + ) + self.J_to_KJ = 1000 + self.boltzmann = self.J_to_KJ*constants["Boltzmann_constant"]/constants["N_Avogadro"] + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + self.e_to_C = constants['elementary_charge'] + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ + # This runs once for each frame of the trajectory + + # The trajectory positions update automatically + # You can access the frame number using self._frame_index + + # trajectory must have velocity and position information + if not ( + self._ts.has_positions + and self._ts.volume != 0 + ): + raise NoDataError( + "Einstein diffusion computation requires positions, and box volume in the trajectory" + ) + + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._cation_positions[self._frame_index] = self.cations.positions + self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + self._cation_mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._cation_masses,self._dim,axis = 0).reshape(-1,self._dim),self._cation_positions[self._frame_index])/self.cation_mass_per_species).reshape(-1, int(self.cation_num_atoms_per_species), self._dim).sum(axis=1) + self._anion_mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._anion_masses,self._dim,axis = 0).reshape(-1,self._dim),self._anion_positions[self._frame_index])/self.anion_mass_per_species).reshape(-1, int(self.anion_num_atoms_per_species), self._dim).sum(axis=1) + + self._cation_mass_weighted_positions[self._frame_index] = self._cation_mass_weighted_positions[self._frame_index] - self._coms[self._frame_index] + self._anion_mass_weighted_positions[self._frame_index] = self._anion_mass_weighted_positions[self._frame_index] - self._coms[self._frame_index] + + + + + def _conclude(self): + """ + Calculates the diffusion coefficient via the fft. + """ + cation_summed_positions = np.sum(self._cation_mass_weighted_positions, axis=1) + anion_summed_positions = np.sum(self._anion_mass_weighted_positions, axis=1) + + self._msds_cation_cation = np.transpose( + [msd_fft_cross_1d(cation_summed_positions[:, i], cation_summed_positions[:, i]) for i in range(self._dim)] + ) + self._msds_cation_cation_var = np.transpose( + [msd_variance_cross_1d(cation_summed_positions[:, i], cation_summed_positions[:, i], self._msds_cation_cation[:, i]) for i in range(self._dim)] + ) + + self._msds_anion_anion = np.transpose( + [msd_fft_cross_1d(anion_summed_positions[:, i], anion_summed_positions[:, i]) for i in range(self._dim)] + ) + self._msds_anion_anion_var = np.transpose( + [msd_variance_cross_1d(anion_summed_positions[:, i], anion_summed_positions[:, i], self._msds_anion_anion[:, i]) for i in range(self._dim)] + ) + + self._msds_cation_anion = np.transpose( + [msd_fft_cross_1d(cation_summed_positions[:, i], anion_summed_positions[:, i]) for i in range(self._dim)] + ) + self._msds_cation_anion_var = np.transpose( + [msd_variance_cross_1d(cation_summed_positions[:, i], anion_summed_positions[:, i], self._msds_cation_anion[:, i]) for i in range(self._dim)] + ) + self._lij_cation_cation = average_directions(self._msds_cation_cation,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + self._lij_cation_anion = average_directions(self._msds_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + self._lij_anion_anion = average_directions(self._msds_anion_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + + self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion + self._cond_var = ((self.cation_charge**2)**2)*self._msds_cation_cation_var + ((self.anion_charge**2)**2)*self._msds_anion_anion_var + ((2*self.cation_charge*self.anion_charge)**2)*self._msds_cation_anion_var + self.weights = np.sqrt(np.abs(1/average_directions(self._cond_var,self._dim))) + self.weights /= np.sum(self.weights) + + cond = None + cond_intercept = None + beta = None + if self.linear_fit_window: + cond , cond_intercept , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._cond[self.linear_fit_window[0]:self.linear_fit_window[1]]) + fit_slope = np.gradient(np.log(self._cond[self.linear_fit_window[0]:self.linear_fit_window[1]])[1:], np.log(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] - self._times[0])[1:]) + beta = np.nanmean(np.array(fit_slope)) + + else: + cond_intercept , cond = np.polynomial.polynomial.polyfit(self._times[1:], self._cond[1:], 1, w=self.weights[1:]) + fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) + beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + + + self.results.linearity = beta + self.results.fit_slope = cond + self.results.fit_intercept = cond_intercept + self.results.conductivity = (cond*(1/(self.A_to_m*self.fs_to_s)))*(self.e_to_C**2)*1000 + + + def plot_linear_fit(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times, np.abs(self._lii_self)) + plt.plot(self._times, self._times*self.results.fit_slope + self.results.fit_intercept, "k--", alpha=0.5) + plt.grid() + plt.ylabel("MSD") + plt.xlabel("Time") + if self.linear_fit_window: + plt.axvline(x=self._times[self.linear_fit_window[0]], color='r', linestyle='--', linewidth=2) + plt.axvline(x=self._times[self.linear_fit_window[1]], color='r', linestyle='--', linewidth=2) + #plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) + #i = int(len(self._msds) / 5) + #slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) + #plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) + plt.tight_layout() + plt.show() + + + + + + + + + + + + + diff --git a/transport_analysis/diffusion.py b/transport_analysis/diffusion.py index 311ea19..461feb9 100644 --- a/transport_analysis/diffusion.py +++ b/transport_analysis/diffusion.py @@ -74,8 +74,8 @@ def __init__(self, atomgroup_query: str, temp_avg: float = 298.0, linear_fit_window: tuple[int, int] = None, - num_atoms_per_species = 1, - mass_per_species = int, + num_atoms_per_species = int, + mass_per_species = float, **kwargs, ) -> None: # the below line must be kept to initialize the AnalysisBase class! @@ -121,7 +121,6 @@ def _prepare(self): self._times = np.zeros((self.n_frames)) self._masses = self.atomgroup.masses - self._masses_rs = self._masses.reshape((1, len(self._masses), 1)) # 3D arrays of frames x particles x dimensions # positions @@ -179,7 +178,7 @@ def _conclude(self): self._lii_self = average_directions(self._msds,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self.weights = np.abs(1/average_directions(self._msds_var,self._dim)) + self.weights = np.sqrt(np.abs(1/average_directions(self._msds_var,self._dim))) self.weights /= np.sum(self.weights) lii_self = None @@ -202,10 +201,7 @@ def _conclude(self): self.results.fit_slope = lii_self self.results.fit_intercept = lii_intercept self.results.lii_self = lii_self*(1/(self.A_to_m*self.fs_to_s)) - self.results.diffusion_coefficient = (lii_self*(1/(self.A_to_m*self.fs_to_s)))*(self.boltzmann*self.temp_avg/conc) - - breakpoint() - + self.results.diffusion_coefficient = (lii_self*(1/(self.A_to_m*self.fs_to_s)))*(self.boltzmann*self.temp_avg/conc) def plot_linear_fit(self): """ diff --git a/transport_analysis/utils.py b/transport_analysis/utils.py index 589a022..755069c 100644 --- a/transport_analysis/utils.py +++ b/transport_analysis/utils.py @@ -67,6 +67,61 @@ def msd_variance_1d(r, msd): return var_x/n_minus_m +def msd_fft_cross_1d(r, k): + """Calculates mean square displacement of the array r using the fast Fourier transform.""" + + N = len(r) + D = np.multiply(r, k) + D = np.append(D, 0) + S2 = cross_corr(r, k) + S3 = cross_corr(k, r) + Q = 2 * D.sum() + S1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + S1[m] = Q / (N - m) + return S1 - S2 - S3 + +def msd_variance_cross_1d(r1, r2, msd): + + # compute A1, recursive relation with D = r1^2 r2^2 + N = len(r1) + D = np.square(r1)*np.square(r2) + D = np.append(D, 0) + Q = 2 * D.sum() + A1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + A1[m] = Q / (N - m) + + # compute A2, cross correlation of r1^2r2 and r2 + A2 = cross_corr(np.square(r1)*r2, r2) + + # compute A3, cross correlation of r1^2 and r2^2 + A3 = cross_corr(np.square(r1), np.square(r2)) + + # compute A4, cross correlation of (r1r2^2) and r1 + A4 = cross_corr(r1*np.square(r2), r1) + + # compute A5, cross correlation of r1r2 and r1r2 + A5 = cross_corr(r1*r2, r1*r2) + + # compute A6, cross correlation of r1 and r1r2^2 + A6 = cross_corr(r1, np.square(r2)*r1) + + # compute A7, cross correlation of r2^2 and r1^2 + A7 = cross_corr(np.square(r2), np.square(r1)) + + # compute A8, cross correlation of r2 and r1^2r2 + A8 = cross_corr(r2, np.square(r1)*r2) + + var_x = A1 - 2*A2 + A3 - 2*A4 +4*A5 - 2*A6 + A7 - 2*A8 - msd**2 + n_minus_m = N * np.ones(N) - np.arange(0, N) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + + return var_x/n_minus_m + + + def average_directions(r, dim): if dim == 3: return (r[:,0] + r[:,1] + r[:,2])/3 From 9f792ce30c7d0f9bc2de75c25551dfaed6373460 Mon Sep 17 00:00:00 2001 From: Rohith Srinivaas M Date: Tue, 19 Nov 2024 09:45:49 -0800 Subject: [PATCH 3/6] Tested the conductivity computation --- transport_analysis/conductivity.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/transport_analysis/conductivity.py b/transport_analysis/conductivity.py index afddd5f..661dcb0 100644 --- a/transport_analysis/conductivity.py +++ b/transport_analysis/conductivity.py @@ -195,8 +195,10 @@ def _single_frame(self): self._times[self._frame_index] = self._ts.time # set shape of position array - self._cation_positions[self._frame_index] = self.cations.positions + self._cation_positions[self._frame_index] = self.cations.positions + self._anion_positions[self._frame_index] = self.anions.positions self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + self._cation_mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._cation_masses,self._dim,axis = 0).reshape(-1,self._dim),self._cation_positions[self._frame_index])/self.cation_mass_per_species).reshape(-1, int(self.cation_num_atoms_per_species), self._dim).sum(axis=1) self._anion_mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._anion_masses,self._dim,axis = 0).reshape(-1,self._dim),self._anion_positions[self._frame_index])/self.anion_mass_per_species).reshape(-1, int(self.anion_num_atoms_per_species), self._dim).sum(axis=1) @@ -208,7 +210,7 @@ def _single_frame(self): def _conclude(self): """ - Calculates the diffusion coefficient via the fft. + Calculates the conductivity coefficient via the fft. """ cation_summed_positions = np.sum(self._cation_mass_weighted_positions, axis=1) anion_summed_positions = np.sum(self._anion_mass_weighted_positions, axis=1) @@ -219,20 +221,18 @@ def _conclude(self): self._msds_cation_cation_var = np.transpose( [msd_variance_cross_1d(cation_summed_positions[:, i], cation_summed_positions[:, i], self._msds_cation_cation[:, i]) for i in range(self._dim)] ) - self._msds_anion_anion = np.transpose( [msd_fft_cross_1d(anion_summed_positions[:, i], anion_summed_positions[:, i]) for i in range(self._dim)] ) self._msds_anion_anion_var = np.transpose( [msd_variance_cross_1d(anion_summed_positions[:, i], anion_summed_positions[:, i], self._msds_anion_anion[:, i]) for i in range(self._dim)] ) - self._msds_cation_anion = np.transpose( [msd_fft_cross_1d(cation_summed_positions[:, i], anion_summed_positions[:, i]) for i in range(self._dim)] ) self._msds_cation_anion_var = np.transpose( [msd_variance_cross_1d(cation_summed_positions[:, i], anion_summed_positions[:, i], self._msds_cation_anion[:, i]) for i in range(self._dim)] - ) + ) self._lij_cation_cation = average_directions(self._msds_cation_cation,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) self._lij_cation_anion = average_directions(self._msds_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) self._lij_anion_anion = average_directions(self._msds_anion_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) @@ -254,12 +254,11 @@ def _conclude(self): cond_intercept , cond = np.polynomial.polynomial.polyfit(self._times[1:], self._cond[1:], 1, w=self.weights[1:]) fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) beta = np.average(np.array(fit_slope), weights=self.weights[1:]) - self.results.linearity = beta self.results.fit_slope = cond self.results.fit_intercept = cond_intercept - self.results.conductivity = (cond*(1/(self.A_to_m*self.fs_to_s)))*(self.e_to_C**2)*1000 + self.results.conductivity = cond*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 def plot_linear_fit(self): From bdfd861b823d9f60c3dc07a9208e29070dcbbe65 Mon Sep 17 00:00:00 2001 From: Rohith Srinivaas M Date: Mon, 25 Nov 2024 09:17:40 -0800 Subject: [PATCH 4/6] Added Conductivity using Green Kubo Integration method --- transport_analysis/conductivity_green_kubo.py | 258 ++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 transport_analysis/conductivity_green_kubo.py diff --git a/transport_analysis/conductivity_green_kubo.py b/transport_analysis/conductivity_green_kubo.py new file mode 100644 index 0000000..dcb9268 --- /dev/null +++ b/transport_analysis/conductivity_green_kubo.py @@ -0,0 +1,258 @@ +from typing import TYPE_CHECKING + +from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.core.groups import UpdatingAtomGroup +from MDAnalysis.exceptions import NoDataError +from MDAnalysis.units import constants +import numpy as np +import matplotlib.pyplot as plt +from scipy import integrate +from transport_analysis.utils import cross_corr, average_directions + +if TYPE_CHECKING: + from MDAnalysis.core.universe import AtomGroup + +## Need to pass the entire the universe to get the com of the universe not just the atomg +class ConductivityKubo(AnalysisBase): + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- + atomgroup : AtomGroup + An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. + Note that :class:`UpdatingAtomGroup` instances are not accepted. + temp_avg : float (optional, default 300) + Average temperature over the course of the simulation, in Kelvin. + dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'} + Desired dimensions to be included in the viscosity calculation. + Defaults to 'xyz'. + linear_fit_window : tuple of int (optional) + A tuple of two integers specifying the start and end lag-time for + the linear fit of the viscosity function. If not provided, the + linear fit is not performed and viscosity is not calculated. + + Attributes + ---------- + atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` + The atoms to which this analysis is applied + dim_fac : int + Dimensionality :math:`d` of the viscosity computation. + results.timeseries : :class:`numpy.ndarray` + The averaged viscosity function over all the particles with respect + to lag-time. Obtained after calling :meth:`ViscosityHelfand.run` + results.visc_by_particle : :class:`numpy.ndarray` + The viscosity function of each individual particle with respect to + lag-time. + results.viscosity : float + The viscosity coefficient of the solvent. The argument + `linear_fit_window` must be provided to calculate this to + avoid misinterpretation of the viscosity function. + start : Optional[int] + The first frame of the trajectory used to compute the analysis. + stop : Optional[int] + The frame to stop at for the analysis. + step : Optional[int] + Number of frames to skip between each analyzed frame. + n_frames : int + Number of frames analysed in the trajectory. + n_particles : int + Number of particles viscosity was calculated over. + """ + def __init__(self, + allatomgroup: "AtomGroup", + cationgroup_query: str, + aniongroup_query: str, + temp_avg: float = 298.0, + cutoff_step: int = 10000, + cation_num_atoms_per_species = int, + anion_num_atoms_per_species = int, + cation_mass_per_species = float, + anion_mass_per_species = float, + cation_charge = int, + anion_charge = int, + **kwargs, + ) -> None: + # the below line must be kept to initialize the AnalysisBase class! + super().__init__(allatomgroup.universe.trajectory, **kwargs) + # after this you will be able to access `self.results` + # `self.results` is a dictionary-like object + # that can should used to store and retrieve results + # See more at the MDAnalysis documentation: + # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( + "UpdatingAtomGroups are not valid for diffusion computation" + ) + self.temp_avg = temp_avg + self.cutoff_step = cutoff_step + + self.allatomgroup = allatomgroup + self.cations = self.allatomgroup.select_atoms(cationgroup_query) + self.anions = self.allatomgroup.select_atoms(aniongroup_query) + self.cation_num_atoms_per_species = cation_num_atoms_per_species + self.anion_num_atoms_per_species = anion_num_atoms_per_species + self.cation_mass_per_species = cation_mass_per_species + self.anion_mass_per_species = anion_mass_per_species + self.cation_charge = cation_charge + self.anion_charge = anion_charge + + self._dim = 3 + self.cation_particles = len(self.cations) + self.anion_particles = len(self.anions) + + if (self.cation_particles%self.cation_num_atoms_per_species != 0) or (self.anion_particles%self.anion_num_atoms_per_species != 0): + raise TypeError( + "Some species are fragmented. Invalid AtomGroup" + ) + + def _prepare(self): + """ + Set up viscosity, mass, velocity, and position arrays + before the analysis loop begins + """ + # 2D viscosity array of frames x particles + + self._volumes = np.zeros((self.n_frames)) + + self._acf_cation_cation = np.zeros((self.n_frames,self._dim)) + self._acf_cation_anion = np.zeros((self.n_frames,self._dim)) + self._acf_anion_anion = np.zeros((self.n_frames,self._dim)) + + self._l_cation_cation = np.zeros((self.n_frames-1, self._dim)) + self._l_cation_anion = np.zeros((self.n_frames-1, self._dim)) + self._l_anion_anion = np.zeros((self.n_frames-1, self._dim)) + self._lij_cation_cation = np.zeros((self.n_frames-1)) + self._lij_cation_anion = np.zeros((self.n_frames-1)) + self._lij_anion_anion = np.zeros((self.n_frames-1)) + + + self._cond = np.zeros((self.n_frames-1)) + + self._coms_velocity = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._cation_masses = self.cations.masses + self._anion_masses = self.anions.masses + self._all_masses = self.allatomgroup.masses + + # 3D arrays of frames x particles x dimensions + # positions + self._cation_velocties = np.zeros( + (self.n_frames, int(self.cation_particles), self._dim) + ) + self._anion_velocities = np.zeros( + (self.n_frames, int(self.anion_particles), self._dim) + ) + + self._cation_mass_weighted_velocities = np.zeros( + (self.n_frames, int(self.cation_particles/self.cation_num_atoms_per_species), self._dim) + ) + + self._anion_mass_weighted_velocities = np.zeros( + (self.n_frames, int(self.anion_particles/self.anion_num_atoms_per_species), self._dim) + ) + self.J_to_KJ = 1000 + self.boltzmann = self.J_to_KJ*constants["Boltzmann_constant"]/constants["N_Avogadro"] + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + self.e_to_C = constants['elementary_charge'] + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ + # This runs once for each frame of the trajectory + + # The trajectory positions update automatically + # You can access the frame number using self._frame_index + + # trajectory must have velocity and position information + if not ( + self._ts.has_velocities + and self._ts.volume != 0 + ): + raise NoDataError( + "Einstein diffusion computation requires positions, and box volume in the trajectory" + ) + + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._cation_velocties[self._frame_index] = self.cations.velocities + self._anion_velocities[self._frame_index] = self.anions.velocities + self._coms_velocity[self._frame_index] = np.multiply(np.repeat(self._all_masses,3,axis = 0).reshape(-1,3), self.allatomgroup.velocities).sum(axis = 0)/self._all_masses.sum() + + self._cation_mass_weighted_velocities[self._frame_index] = (np.multiply(np.repeat(self._cation_masses,self._dim,axis = 0).reshape(-1,self._dim),self._cation_velocties[self._frame_index])/self.cation_mass_per_species).reshape(-1, int(self.cation_num_atoms_per_species), self._dim).sum(axis=1) + self._anion_mass_weighted_velocities[self._frame_index] = (np.multiply(np.repeat(self._anion_masses,self._dim,axis = 0).reshape(-1,self._dim),self._anion_velocities[self._frame_index])/self.anion_mass_per_species).reshape(-1, int(self.anion_num_atoms_per_species), self._dim).sum(axis=1) + + self._cation_mass_weighted_velocities[self._frame_index] = self._cation_mass_weighted_velocities[self._frame_index] - self._coms_velocity[self._frame_index] + self._anion_mass_weighted_velocities[self._frame_index] = self._anion_mass_weighted_velocities[self._frame_index] - self._coms_velocity[self._frame_index] + + + + + def _conclude(self): + """ + Calculates the conductivity coefficient via the fft. + """ + cation_summed_velocities = np.sum(self._cation_mass_weighted_velocities, axis=1) + anion_summed_velocities = np.sum(self._anion_mass_weighted_velocities, axis=1) + for i in range(self._dim): + self._acf_cation_cation[:,i] = cross_corr(cation_summed_velocities[:,i],cation_summed_velocities[:,i]) + self._acf_anion_anion[:,i] = cross_corr(anion_summed_velocities[:,i],anion_summed_velocities[:,i]) + self._acf_cation_anion[:,i] = cross_corr(cation_summed_velocities[:,i],anion_summed_velocities[:,i]) + for i in range(self._dim): + self._l_cation_cation[:,i] = integrate.cumulative_trapezoid(self._acf_cation_cation[:,i],self._times[:,i]) + self._l_anion_anion[:,i] = integrate.cumulative_trapezoid(self._acf_anion_anion[:,i],self._times[:,i]) + self._l_cation_anion[:,i] = integrate.cumulative_trapezoid(self._acf_cation_anion[:,i],self.times[:,i]) + + + self._lij_cation_cation = average_directions(self._l_cation_cation,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + self._lij_cation_anion = average_directions(self._l_anion_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + self._lij_anion_anion = average_directions(self._l_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) + + self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion + + self.results.conductivity = self._cond[self.cutoff_step]*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 + + + def plot_acf(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times[:self.cutoff_step],self._acf_cation_cation[:self.cutoff_step], label = '+ +') + plt.plot(self._times[:self.cutoff_step],self._acf_cation_anion[:self.cutoff_step], label = '+ -') + plt.plot(self._times[:self.cutoff_step],self._acf_anion_anion[:self.cutoff_step], label = '- -') + + plt.grid() + plt.ylabel("Velocity Autocorrelation Function") + plt.xlabel("Time") + plt.tight_layout() + plt.show() + + + + + + + + + + + + + From a013294bb8e4debe14606b0c5ddaa0d3b731c969 Mon Sep 17 00:00:00 2001 From: Rohith Srinivaas M Date: Mon, 25 Nov 2024 10:04:00 -0800 Subject: [PATCH 5/6] Changed file names and added transference num --- ...en_kubo.py => conductivity_integration.py} | 10 +++- .../{conductivity.py => conductivity_msd.py} | 59 ++++++++++++++----- .../{diffusion.py => diffusion_msd.py} | 11 ++-- 3 files changed, 58 insertions(+), 22 deletions(-) rename transport_analysis/{conductivity_green_kubo.py => conductivity_integration.py} (92%) rename transport_analysis/{conductivity.py => conductivity_msd.py} (76%) rename transport_analysis/{diffusion.py => diffusion_msd.py} (96%) diff --git a/transport_analysis/conductivity_green_kubo.py b/transport_analysis/conductivity_integration.py similarity index 92% rename from transport_analysis/conductivity_green_kubo.py rename to transport_analysis/conductivity_integration.py index dcb9268..6853441 100644 --- a/transport_analysis/conductivity_green_kubo.py +++ b/transport_analysis/conductivity_integration.py @@ -226,8 +226,16 @@ def _conclude(self): self._lij_anion_anion = average_directions(self._l_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion - + cation_mass_fraction = self._cation_masses.sum()/self._all_masses.sum() + anion_mass_fraction = self._anion_masses.sum()/self._all_masses.sum() + solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction self.results.conductivity = self._cond[self.cutoff_step]*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 + self.results.cation_transference_com = ((self.cation_charge**2)*self._lij_cation_cation[self.cutoff_step] + (self.cation_charge*self.anion_charge)*self._lij_cation_anion[self.cutoff_step])/self._cond[self.cutoff_step] + self.results.anion_transference_com = ((self.anion_charge**2)*self._lij_anion_anion[self.cutoff_step] + (self.cation_charge*self.anion_charge)*self._lij_cation_anion[self.cutoff_step])/self._cond[self.cutoff_step] + self.results.cation_transference_solvent = (self.results.cation_transference_com - anion_mass_fraction)/solvent_fraction + self.results.anion_transference_solvent = (self.results.anion_transference_com -cation_mass_fraction)/solvent_fraction + + def plot_acf(self): diff --git a/transport_analysis/conductivity.py b/transport_analysis/conductivity_msd.py similarity index 76% rename from transport_analysis/conductivity.py rename to transport_analysis/conductivity_msd.py index 661dcb0..af592f7 100644 --- a/transport_analysis/conductivity.py +++ b/transport_analysis/conductivity_msd.py @@ -140,7 +140,9 @@ def _prepare(self): self._lij_cation_anion = np.zeros((self.n_frames)) self._lij_anion_anion = np.zeros((self.n_frames)) - self.weights = np.ones((self.n_frames)) + self._lij_cation_cation_weights = np.ones((self.n_frames)) + self._lij_cation_anion_weights = np.ones((self.n_frames)) + self._lij_anion_anion_weights = np.ones((self.n_frames)) self._coms = np.zeros((self.n_frames, self._dim)) @@ -237,30 +239,55 @@ def _conclude(self): self._lij_cation_anion = average_directions(self._msds_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) self._lij_anion_anion = average_directions(self._msds_anion_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion - self._cond_var = ((self.cation_charge**2)**2)*self._msds_cation_cation_var + ((self.anion_charge**2)**2)*self._msds_anion_anion_var + ((2*self.cation_charge*self.anion_charge)**2)*self._msds_cation_anion_var - self.weights = np.sqrt(np.abs(1/average_directions(self._cond_var,self._dim))) - self.weights /= np.sum(self.weights) + #self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion + #self._cond_var = ((self.cation_charge**2)**2)*self._msds_cation_cation_var + ((self.anion_charge**2)**2)*self._msds_anion_anion_var + ((2*self.cation_charge*self.anion_charge)**2)*self._msds_cation_anion_var + + self._lij_cation_cation_weights = np.sqrt(np.abs(1/average_directions(self._msds_cation_cation_var,self._dim))) + self._lij_cation_cation_weights /= np.sum(self._lij_cation_cation_weights) + + self._lij_cation_anion_weights = np.sqrt(np.abs(1/average_directions(self._msds_cation_anion_var,self._dim))) + self._lij_cation_anion_weights /= np.sum(self._lij_cation_anion_weights) + + self._lij_anion_anion_weights = np.sqrt(np.abs(1/average_directions(self._msds_anion_anion_var,self._dim))) + self._lij_anion_anion_weights /= np.sum(self._lij_anion_anion_weights) cond = None - cond_intercept = None + lij_cation_cation = None + lij_cation_anion = None + lij_anion_anion = None beta = None if self.linear_fit_window: - cond , cond_intercept , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._cond[self.linear_fit_window[0]:self.linear_fit_window[1]]) + lij_cation_cation , _ , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lij_cation_cation[self.linear_fit_window[0]:self.linear_fit_window[1]]) + lij_cation_anion , _ , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lij_cation_anion[self.linear_fit_window[0]:self.linear_fit_window[1]]) + lij_anion_anion , _ , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lij_anion_anion[self.linear_fit_window[0]:self.linear_fit_window[1]]) + cond = (self.cation_charge**2)*lij_cation_cation + (self.anion_charge**2)*lij_anion_anion + (2*self.cation_charge*self.anion_charge)*lij_cation_anion + + self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion fit_slope = np.gradient(np.log(self._cond[self.linear_fit_window[0]:self.linear_fit_window[1]])[1:], np.log(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] - self._times[0])[1:]) beta = np.nanmean(np.array(fit_slope)) else: - cond_intercept , cond = np.polynomial.polynomial.polyfit(self._times[1:], self._cond[1:], 1, w=self.weights[1:]) - fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) - beta = np.average(np.array(fit_slope), weights=self.weights[1:]) - - self.results.linearity = beta - self.results.fit_slope = cond - self.results.fit_intercept = cond_intercept - self.results.conductivity = cond*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 - + _ , lij_cation_cation = np.polynomial.polynomial.polyfit(self._times[1:], self._msds_cation_cation[1:], 1, w=self._lij_cation_cation_weights[1:]) + _ , lij_cation_anion = np.polynomial.polynomial.polyfit(self._times[1:], self._msds_cation_anion[1:], 1, w=self._lij_cation_anion_weights[1:]) + _ , lij_anion_anion = np.polynomial.polynomial.polyfit(self._times[1:], self._msds_anion_anion[1:], 1, w=self._lij_anion_anion_weights[1:]) + cond = (self.cation_charge**2)*lij_cation_cation + (self.anion_charge**2)*lij_anion_anion + (2*self.cation_charge*self.anion_charge)*lij_cation_anion + #fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) + #beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + if self.linear_fit_window: + self.results.linearity = beta + #self.results.fit_slope = cond + #self.results.fit_intercept = cond_intercept + cation_mass_fraction = self._cation_masses.sum()/self.allatomgroup.masses.sum() + anion_mass_fraction = self._anion_masses.sum()/self.allatomgroup.masses.sum() + solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction + + self.results.conductivity = cond*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 + self.results.cation_transference_com = ((self.cation_charge**2)*lij_cation_cation + (self.cation_charge*self.anion_charge)*lij_cation_anion)/cond + self.results.anion_transference_com = ((self.anion_charge**2)*lij_anion_anion + (self.cation_charge*self.anion_charge)*lij_cation_anion)/cond + self.results.cation_transference_solvent = (self.results.cation_transference_com - anion_mass_fraction)/solvent_fraction + self.results.anion_transference_solvent = (self.results.anion_transference_com -cation_mass_fraction)/solvent_fraction + def plot_linear_fit(self): """ Plot the mead square displacement vs time and the fit region in the log-log scale diff --git a/transport_analysis/diffusion.py b/transport_analysis/diffusion_msd.py similarity index 96% rename from transport_analysis/diffusion.py rename to transport_analysis/diffusion_msd.py index 461feb9..f446e3a 100644 --- a/transport_analysis/diffusion.py +++ b/transport_analysis/diffusion_msd.py @@ -191,15 +191,16 @@ def _conclude(self): else: lii_intercept , lii_self = np.polynomial.polynomial.polyfit(self._times[1:], self._lii_self[1:], 1, w=self.weights[1:]) - fit_slope = np.gradient(np.log(self._lii_self)[1:], np.log(self._times - self._times[0])[1:]) - beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + #fit_slope = np.gradient(np.log(self._lii_self)[1:], np.log(self._times - self._times[0])[1:]) + #beta = np.average(np.array(fit_slope), weights=self.weights[1:]) conc = float(self.n_particles/self.num_atoms_per_species)/(self._volumes.mean()*(self.A_to_m**3)) - self.results.linearity = beta - self.results.fit_slope = lii_self - self.results.fit_intercept = lii_intercept + if self.linear_fit_window: + self.results.linearity = beta + #self.results.fit_slope = lii_self + #self.results.fit_intercept = lii_intercept self.results.lii_self = lii_self*(1/(self.A_to_m*self.fs_to_s)) self.results.diffusion_coefficient = (lii_self*(1/(self.A_to_m*self.fs_to_s)))*(self.boltzmann*self.temp_avg/conc) From 49ca3d86bb489f0671a9b5e67c58d214a454a357 Mon Sep 17 00:00:00 2001 From: Rohith Srinivaas M Date: Sun, 1 Dec 2024 21:29:59 -0800 Subject: [PATCH 6/6] Linted Code with Black --- transport_analysis/__init__.py | 2 +- transport_analysis/_version.py | 24 +- .../conductivity_integration.py | 412 +++++++----- transport_analysis/conductivity_msd.py | 591 +++++++++++------- transport_analysis/diffusion_msd.py | 323 +++++----- transport_analysis/due.py | 4 +- .../tests/test_velocityautocorr.py | 48 +- transport_analysis/tests/test_viscosity.py | 4 +- transport_analysis/tests/utils.py | 4 +- transport_analysis/utils.py | 51 +- transport_analysis/velocityautocorr.py | 26 +- transport_analysis/viscosity.py | 28 +- 12 files changed, 863 insertions(+), 654 deletions(-) diff --git a/transport_analysis/__init__.py b/transport_analysis/__init__.py index b5c832d..05dbe1e 100644 --- a/transport_analysis/__init__.py +++ b/transport_analysis/__init__.py @@ -16,4 +16,4 @@ from . import _version -__version__ = _version.get_versions()["version"] \ No newline at end of file +__version__ = _version.get_versions()["version"] diff --git a/transport_analysis/_version.py b/transport_analysis/_version.py index e9a7a4c..1d48932 100644 --- a/transport_analysis/_version.py +++ b/transport_analysis/_version.py @@ -288,9 +288,7 @@ def git_pieces_from_vcs( env.pop("GIT_DIR", None) runner = functools.partial(runner, env=env) - _, rc = runner( - GITS, ["rev-parse", "--git-dir"], cwd=root, hide_stderr=not verbose - ) + _, rc = runner(GITS, ["rev-parse", "--git-dir"], cwd=root, hide_stderr=not verbose) if rc != 0: if verbose: print("Directory %s not under git control" % root) @@ -325,9 +323,7 @@ def git_pieces_from_vcs( pieces["short"] = full_out[:7] # maybe improved later pieces["error"] = None - branch_name, rc = runner( - GITS, ["rev-parse", "--abbrev-ref", "HEAD"], cwd=root - ) + branch_name, rc = runner(GITS, ["rev-parse", "--abbrev-ref", "HEAD"], cwd=root) # --abbrev-ref was added in git-1.6.3 if rc != 0 or branch_name is None: raise NotThisMethod("'git rev-parse --abbrev-ref' returned error") @@ -376,9 +372,7 @@ def git_pieces_from_vcs( mo = re.search(r"^(.+)-(\d+)-g([0-9a-f]+)$", git_describe) if not mo: # unparsable. Maybe git-describe is misbehaving? - pieces["error"] = ( - "unable to parse git-describe output: '%s'" % describe_out - ) + pieces["error"] = "unable to parse git-describe output: '%s'" % describe_out return pieces # tag @@ -407,9 +401,7 @@ def git_pieces_from_vcs( pieces["distance"] = len(out.split()) # total number of commits # commit date: see ISO-8601 comment in git_versions_from_keywords() - date = runner(GITS, ["show", "-s", "--format=%ci", "HEAD"], cwd=root)[ - 0 - ].strip() + date = runner(GITS, ["show", "-s", "--format=%ci", "HEAD"], cwd=root)[0].strip() # Use only the last line. Previous lines may contain GPG signature # information. date = date.splitlines()[-1] @@ -497,9 +489,7 @@ def render_pep440_pre(pieces: Dict[str, Any]) -> str: if pieces["closest-tag"]: if pieces["distance"]: # update the post release segment - tag_version, post_version = pep440_split_post( - pieces["closest-tag"] - ) + tag_version, post_version = pep440_split_post(pieces["closest-tag"]) rendered = tag_version if post_version is not None: rendered += ".post%d.dev%d" % ( @@ -688,9 +678,7 @@ def get_versions() -> Dict[str, Any]: verbose = cfg.verbose try: - return git_versions_from_keywords( - get_keywords(), cfg.tag_prefix, verbose - ) + return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, verbose) except NotThisMethod: pass diff --git a/transport_analysis/conductivity_integration.py b/transport_analysis/conductivity_integration.py index 6853441..325a519 100644 --- a/transport_analysis/conductivity_integration.py +++ b/transport_analysis/conductivity_integration.py @@ -12,23 +12,24 @@ if TYPE_CHECKING: from MDAnalysis.core.universe import AtomGroup + ## Need to pass the entire the universe to get the com of the universe not just the atomg class ConductivityKubo(AnalysisBase): - """ - Class to calculate the diffusion_coefficient of a species. - Note that the slope of the mean square displacement provides - the diffusion coeffiecient. This implementation uses FFT to compute MSD faster - as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft - Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory - plays a critical role in the accuracy of the result. - - Particle velocities are required to calculate the viscosity function. Thus - you are limited to MDA trajectories that contain velocity information, e.g. - GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: - https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. - - Parameters - ---------- + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- atomgroup : AtomGroup An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. Note that :class:`UpdatingAtomGroup` instances are not accepted. @@ -41,7 +42,7 @@ class ConductivityKubo(AnalysisBase): A tuple of two integers specifying the start and end lag-time for the linear fit of the viscosity function. If not provided, the linear fit is not performed and viscosity is not calculated. - + Attributes ---------- atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` @@ -69,198 +70,273 @@ class ConductivityKubo(AnalysisBase): n_particles : int Number of particles viscosity was calculated over. """ - def __init__(self, + + def __init__( + self, allatomgroup: "AtomGroup", - cationgroup_query: str, - aniongroup_query: str, + cationgroup_query: str, + aniongroup_query: str, temp_avg: float = 298.0, - cutoff_step: int = 10000, - cation_num_atoms_per_species = int, - anion_num_atoms_per_species = int, - cation_mass_per_species = float, - anion_mass_per_species = float, - cation_charge = int, - anion_charge = int, + cutoff_step: int = 10000, + cation_num_atoms_per_species=int, + anion_num_atoms_per_species=int, + cation_mass_per_species=float, + anion_mass_per_species=float, + cation_charge=int, + anion_charge=int, **kwargs, ) -> None: # the below line must be kept to initialize the AnalysisBase class! - super().__init__(allatomgroup.universe.trajectory, **kwargs) + super().__init__(allatomgroup.universe.trajectory, **kwargs) # after this you will be able to access `self.results` # `self.results` is a dictionary-like object # that can should used to store and retrieve results # See more at the MDAnalysis documentation: # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results - if isinstance(allatomgroup, UpdatingAtomGroup): - raise TypeError( + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( "UpdatingAtomGroups are not valid for diffusion computation" ) - self.temp_avg = temp_avg - self.cutoff_step = cutoff_step - - self.allatomgroup = allatomgroup - self.cations = self.allatomgroup.select_atoms(cationgroup_query) - self.anions = self.allatomgroup.select_atoms(aniongroup_query) - self.cation_num_atoms_per_species = cation_num_atoms_per_species - self.anion_num_atoms_per_species = anion_num_atoms_per_species - self.cation_mass_per_species = cation_mass_per_species - self.anion_mass_per_species = anion_mass_per_species - self.cation_charge = cation_charge - self.anion_charge = anion_charge - - self._dim = 3 - self.cation_particles = len(self.cations) - self.anion_particles = len(self.anions) - - if (self.cation_particles%self.cation_num_atoms_per_species != 0) or (self.anion_particles%self.anion_num_atoms_per_species != 0): - raise TypeError( - "Some species are fragmented. Invalid AtomGroup" - ) - - def _prepare(self): - """ + self.temp_avg = temp_avg + self.cutoff_step = cutoff_step + + self.allatomgroup = allatomgroup + self.cations = self.allatomgroup.select_atoms(cationgroup_query) + self.anions = self.allatomgroup.select_atoms(aniongroup_query) + self.cation_num_atoms_per_species = cation_num_atoms_per_species + self.anion_num_atoms_per_species = anion_num_atoms_per_species + self.cation_mass_per_species = cation_mass_per_species + self.anion_mass_per_species = anion_mass_per_species + self.cation_charge = cation_charge + self.anion_charge = anion_charge + + self._dim = 3 + self.cation_particles = len(self.cations) + self.anion_particles = len(self.anions) + + if (self.cation_particles % self.cation_num_atoms_per_species != 0) or ( + self.anion_particles % self.anion_num_atoms_per_species != 0 + ): + raise TypeError("Some species are fragmented. Invalid AtomGroup") + + def _prepare(self): + """ Set up viscosity, mass, velocity, and position arrays before the analysis loop begins """ # 2D viscosity array of frames x particles - self._volumes = np.zeros((self.n_frames)) - - self._acf_cation_cation = np.zeros((self.n_frames,self._dim)) - self._acf_cation_anion = np.zeros((self.n_frames,self._dim)) - self._acf_anion_anion = np.zeros((self.n_frames,self._dim)) - - self._l_cation_cation = np.zeros((self.n_frames-1, self._dim)) - self._l_cation_anion = np.zeros((self.n_frames-1, self._dim)) - self._l_anion_anion = np.zeros((self.n_frames-1, self._dim)) - self._lij_cation_cation = np.zeros((self.n_frames-1)) - self._lij_cation_anion = np.zeros((self.n_frames-1)) - self._lij_anion_anion = np.zeros((self.n_frames-1)) - - - self._cond = np.zeros((self.n_frames-1)) - - self._coms_velocity = np.zeros((self.n_frames, self._dim)) - self._times = np.zeros((self.n_frames)) - - self._cation_masses = self.cations.masses - self._anion_masses = self.anions.masses - self._all_masses = self.allatomgroup.masses + self._volumes = np.zeros((self.n_frames)) + + self._acf_cation_cation = np.zeros((self.n_frames, self._dim)) + self._acf_cation_anion = np.zeros((self.n_frames, self._dim)) + self._acf_anion_anion = np.zeros((self.n_frames, self._dim)) + + self._l_cation_cation = np.zeros((self.n_frames - 1, self._dim)) + self._l_cation_anion = np.zeros((self.n_frames - 1, self._dim)) + self._l_anion_anion = np.zeros((self.n_frames - 1, self._dim)) + self._lij_cation_cation = np.zeros((self.n_frames - 1)) + self._lij_cation_anion = np.zeros((self.n_frames - 1)) + self._lij_anion_anion = np.zeros((self.n_frames - 1)) + + self._cond = np.zeros((self.n_frames - 1)) + + self._coms_velocity = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._cation_masses = self.cations.masses + self._anion_masses = self.anions.masses + self._all_masses = self.allatomgroup.masses # 3D arrays of frames x particles x dimensions # positions - self._cation_velocties = np.zeros( + self._cation_velocties = np.zeros( (self.n_frames, int(self.cation_particles), self._dim) ) - self._anion_velocities = np.zeros( + self._anion_velocities = np.zeros( (self.n_frames, int(self.anion_particles), self._dim) ) - self._cation_mass_weighted_velocities = np.zeros( - (self.n_frames, int(self.cation_particles/self.cation_num_atoms_per_species), self._dim) + self._cation_mass_weighted_velocities = np.zeros( + ( + self.n_frames, + int(self.cation_particles / self.cation_num_atoms_per_species), + self._dim, + ) ) - self._anion_mass_weighted_velocities = np.zeros( - (self.n_frames, int(self.anion_particles/self.anion_num_atoms_per_species), self._dim) + self._anion_mass_weighted_velocities = np.zeros( + ( + self.n_frames, + int(self.anion_particles / self.anion_num_atoms_per_species), + self._dim, + ) ) - self.J_to_KJ = 1000 - self.boltzmann = self.J_to_KJ*constants["Boltzmann_constant"]/constants["N_Avogadro"] - self.A_to_m = 1e-10 - self.fs_to_s = 1e-15 - self.e_to_C = constants['elementary_charge'] - - def _single_frame(self): - """ - Constructs arrays of velocities and positions - for viscosity computation. - """ + self.J_to_KJ = 1000 + self.boltzmann = ( + self.J_to_KJ * constants["Boltzmann_constant"] / constants["N_Avogadro"] + ) + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + self.e_to_C = constants["elementary_charge"] + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ # This runs once for each frame of the trajectory # The trajectory positions update automatically # You can access the frame number using self._frame_index # trajectory must have velocity and position information - if not ( - self._ts.has_velocities - and self._ts.volume != 0 - ): - raise NoDataError( + if not (self._ts.has_velocities and self._ts.volume != 0): + raise NoDataError( "Einstein diffusion computation requires positions, and box volume in the trajectory" ) - # fill volume array - self._volumes[self._frame_index] = self._ts.volume - self._times[self._frame_index] = self._ts.time - # set shape of position array - - self._cation_velocties[self._frame_index] = self.cations.velocities - self._anion_velocities[self._frame_index] = self.anions.velocities - self._coms_velocity[self._frame_index] = np.multiply(np.repeat(self._all_masses,3,axis = 0).reshape(-1,3), self.allatomgroup.velocities).sum(axis = 0)/self._all_masses.sum() - - self._cation_mass_weighted_velocities[self._frame_index] = (np.multiply(np.repeat(self._cation_masses,self._dim,axis = 0).reshape(-1,self._dim),self._cation_velocties[self._frame_index])/self.cation_mass_per_species).reshape(-1, int(self.cation_num_atoms_per_species), self._dim).sum(axis=1) - self._anion_mass_weighted_velocities[self._frame_index] = (np.multiply(np.repeat(self._anion_masses,self._dim,axis = 0).reshape(-1,self._dim),self._anion_velocities[self._frame_index])/self.anion_mass_per_species).reshape(-1, int(self.anion_num_atoms_per_species), self._dim).sum(axis=1) - - self._cation_mass_weighted_velocities[self._frame_index] = self._cation_mass_weighted_velocities[self._frame_index] - self._coms_velocity[self._frame_index] - self._anion_mass_weighted_velocities[self._frame_index] = self._anion_mass_weighted_velocities[self._frame_index] - self._coms_velocity[self._frame_index] - - - - - def _conclude(self): - """ - Calculates the conductivity coefficient via the fft. - """ - cation_summed_velocities = np.sum(self._cation_mass_weighted_velocities, axis=1) - anion_summed_velocities = np.sum(self._anion_mass_weighted_velocities, axis=1) - for i in range(self._dim): - self._acf_cation_cation[:,i] = cross_corr(cation_summed_velocities[:,i],cation_summed_velocities[:,i]) - self._acf_anion_anion[:,i] = cross_corr(anion_summed_velocities[:,i],anion_summed_velocities[:,i]) - self._acf_cation_anion[:,i] = cross_corr(cation_summed_velocities[:,i],anion_summed_velocities[:,i]) - for i in range(self._dim): - self._l_cation_cation[:,i] = integrate.cumulative_trapezoid(self._acf_cation_cation[:,i],self._times[:,i]) - self._l_anion_anion[:,i] = integrate.cumulative_trapezoid(self._acf_anion_anion[:,i],self._times[:,i]) - self._l_cation_anion[:,i] = integrate.cumulative_trapezoid(self._acf_cation_anion[:,i],self.times[:,i]) - - - self._lij_cation_cation = average_directions(self._l_cation_cation,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self._lij_cation_anion = average_directions(self._l_anion_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self._lij_anion_anion = average_directions(self._l_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - - self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion - cation_mass_fraction = self._cation_masses.sum()/self._all_masses.sum() - anion_mass_fraction = self._anion_masses.sum()/self._all_masses.sum() - solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction - self.results.conductivity = self._cond[self.cutoff_step]*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 - self.results.cation_transference_com = ((self.cation_charge**2)*self._lij_cation_cation[self.cutoff_step] + (self.cation_charge*self.anion_charge)*self._lij_cation_anion[self.cutoff_step])/self._cond[self.cutoff_step] - self.results.anion_transference_com = ((self.anion_charge**2)*self._lij_anion_anion[self.cutoff_step] + (self.cation_charge*self.anion_charge)*self._lij_cation_anion[self.cutoff_step])/self._cond[self.cutoff_step] - self.results.cation_transference_solvent = (self.results.cation_transference_com - anion_mass_fraction)/solvent_fraction - self.results.anion_transference_solvent = (self.results.anion_transference_com -cation_mass_fraction)/solvent_fraction - - - - - def plot_acf(self): - """ - Plot the mead square displacement vs time and the fit region in the log-log scale - """ - plt.plot(self._times[:self.cutoff_step],self._acf_cation_cation[:self.cutoff_step], label = '+ +') - plt.plot(self._times[:self.cutoff_step],self._acf_cation_anion[:self.cutoff_step], label = '+ -') - plt.plot(self._times[:self.cutoff_step],self._acf_anion_anion[:self.cutoff_step], label = '- -') - - plt.grid() - plt.ylabel("Velocity Autocorrelation Function") - plt.xlabel("Time") - plt.tight_layout() - plt.show() - - - - - - + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._cation_velocties[self._frame_index] = self.cations.velocities + self._anion_velocities[self._frame_index] = self.anions.velocities + self._coms_velocity[self._frame_index] = ( + np.multiply( + np.repeat(self._all_masses, 3, axis=0).reshape(-1, 3), + self.allatomgroup.velocities, + ).sum(axis=0) + / self._all_masses.sum() + ) + self._cation_mass_weighted_velocities[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._cation_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._cation_velocties[self._frame_index], + ) + / self.cation_mass_per_species + ) + .reshape(-1, int(self.cation_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._anion_mass_weighted_velocities[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._anion_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._anion_velocities[self._frame_index], + ) + / self.anion_mass_per_species + ) + .reshape(-1, int(self.anion_num_atoms_per_species), self._dim) + .sum(axis=1) + ) - + self._cation_mass_weighted_velocities[self._frame_index] = ( + self._cation_mass_weighted_velocities[self._frame_index] + - self._coms_velocity[self._frame_index] + ) + self._anion_mass_weighted_velocities[self._frame_index] = ( + self._anion_mass_weighted_velocities[self._frame_index] + - self._coms_velocity[self._frame_index] + ) + def _conclude(self): + """ + Calculates the conductivity coefficient via the fft. + """ + cation_summed_velocities = np.sum(self._cation_mass_weighted_velocities, axis=1) + anion_summed_velocities = np.sum(self._anion_mass_weighted_velocities, axis=1) + for i in range(self._dim): + self._acf_cation_cation[:, i] = cross_corr( + cation_summed_velocities[:, i], cation_summed_velocities[:, i] + ) + self._acf_anion_anion[:, i] = cross_corr( + anion_summed_velocities[:, i], anion_summed_velocities[:, i] + ) + self._acf_cation_anion[:, i] = cross_corr( + cation_summed_velocities[:, i], anion_summed_velocities[:, i] + ) + for i in range(self._dim): + self._l_cation_cation[:, i] = integrate.cumulative_trapezoid( + self._acf_cation_cation[:, i], self._times[:, i] + ) + self._l_anion_anion[:, i] = integrate.cumulative_trapezoid( + self._acf_anion_anion[:, i], self._times[:, i] + ) + self._l_cation_anion[:, i] = integrate.cumulative_trapezoid( + self._acf_cation_anion[:, i], self.times[:, i] + ) + self._lij_cation_cation = average_directions( + self._l_cation_cation, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + self._lij_cation_anion = average_directions(self._l_anion_anion, self._dim) / ( + 2 * self.boltzmann * self.temp_avg * self._volumes + ) + self._lij_anion_anion = average_directions(self._l_cation_anion, self._dim) / ( + 2 * self.boltzmann * self.temp_avg * self._volumes + ) + self._cond = ( + (self.cation_charge**2) * self._lij_cation_cation + + (self.anion_charge**2) * self._lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * self._lij_cation_anion + ) + cation_mass_fraction = self._cation_masses.sum() / self._all_masses.sum() + anion_mass_fraction = self._anion_masses.sum() / self._all_masses.sum() + solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction + self.results.conductivity = ( + self._cond[self.cutoff_step] + * (self.e_to_C * self.e_to_C) + * (1 / (self.fs_to_s * self.A_to_m)) + * 10 + ) + self.results.cation_transference_com = ( + (self.cation_charge**2) * self._lij_cation_cation[self.cutoff_step] + + (self.cation_charge * self.anion_charge) + * self._lij_cation_anion[self.cutoff_step] + ) / self._cond[self.cutoff_step] + self.results.anion_transference_com = ( + (self.anion_charge**2) * self._lij_anion_anion[self.cutoff_step] + + (self.cation_charge * self.anion_charge) + * self._lij_cation_anion[self.cutoff_step] + ) / self._cond[self.cutoff_step] + self.results.cation_transference_solvent = ( + self.results.cation_transference_com - anion_mass_fraction + ) / solvent_fraction + self.results.anion_transference_solvent = ( + self.results.anion_transference_com - cation_mass_fraction + ) / solvent_fraction + + def plot_acf(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot( + self._times[: self.cutoff_step], + self._acf_cation_cation[: self.cutoff_step], + label="+ +", + ) + plt.plot( + self._times[: self.cutoff_step], + self._acf_cation_anion[: self.cutoff_step], + label="+ -", + ) + plt.plot( + self._times[: self.cutoff_step], + self._acf_anion_anion[: self.cutoff_step], + label="- -", + ) + plt.grid() + plt.ylabel("Velocity Autocorrelation Function") + plt.xlabel("Time") + plt.tight_layout() + plt.show() diff --git a/transport_analysis/conductivity_msd.py b/transport_analysis/conductivity_msd.py index af592f7..d118b83 100644 --- a/transport_analysis/conductivity_msd.py +++ b/transport_analysis/conductivity_msd.py @@ -7,28 +7,33 @@ import numpy as np import matplotlib.pyplot as plt from scipy import stats -from transport_analysis.utils import msd_fft_cross_1d, msd_variance_cross_1d, average_directions +from transport_analysis.utils import ( + msd_fft_cross_1d, + msd_variance_cross_1d, + average_directions, +) if TYPE_CHECKING: from MDAnalysis.core.universe import AtomGroup + ## Need to pass the entire the universe to get the com of the universe not just the atomg class ConductivityEinstein(AnalysisBase): - """ - Class to calculate the diffusion_coefficient of a species. - Note that the slope of the mean square displacement provides - the diffusion coeffiecient. This implementation uses FFT to compute MSD faster - as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft - Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory - plays a critical role in the accuracy of the result. - - Particle velocities are required to calculate the viscosity function. Thus - you are limited to MDA trajectories that contain velocity information, e.g. - GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: - https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. - - Parameters - ---------- + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- atomgroup : AtomGroup An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. Note that :class:`UpdatingAtomGroup` instances are not accepted. @@ -41,7 +46,7 @@ class ConductivityEinstein(AnalysisBase): A tuple of two integers specifying the start and end lag-time for the linear fit of the viscosity function. If not provided, the linear fit is not performed and viscosity is not calculated. - + Attributes ---------- atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` @@ -69,253 +74,401 @@ class ConductivityEinstein(AnalysisBase): n_particles : int Number of particles viscosity was calculated over. """ - def __init__(self, + + def __init__( + self, allatomgroup: "AtomGroup", - cationgroup_query: str, - aniongroup_query: str, + cationgroup_query: str, + aniongroup_query: str, temp_avg: float = 298.0, linear_fit_window: tuple[int, int] = None, - cation_num_atoms_per_species = int, - anion_num_atoms_per_species = int, - cation_mass_per_species = float, - anion_mass_per_species = float, - cation_charge = int, - anion_charge = int, + cation_num_atoms_per_species=int, + anion_num_atoms_per_species=int, + cation_mass_per_species=float, + anion_mass_per_species=float, + cation_charge=int, + anion_charge=int, **kwargs, ) -> None: # the below line must be kept to initialize the AnalysisBase class! - super().__init__(allatomgroup.universe.trajectory, **kwargs) + super().__init__(allatomgroup.universe.trajectory, **kwargs) # after this you will be able to access `self.results` # `self.results` is a dictionary-like object # that can should used to store and retrieve results # See more at the MDAnalysis documentation: # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results - if isinstance(allatomgroup, UpdatingAtomGroup): - raise TypeError( + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( "UpdatingAtomGroups are not valid for diffusion computation" ) - self.temp_avg = temp_avg - self.linear_fit_window = linear_fit_window - - self.allatomgroup = allatomgroup - self.cations = self.allatomgroup.select_atoms(cationgroup_query) - self.anions = self.allatomgroup.select_atoms(aniongroup_query) - self.cation_num_atoms_per_species = cation_num_atoms_per_species - self.anion_num_atoms_per_species = anion_num_atoms_per_species - self.cation_mass_per_species = cation_mass_per_species - self.anion_mass_per_species = anion_mass_per_species - self.cation_charge = cation_charge - self.anion_charge = anion_charge - - self._dim = 3 - self.cation_particles = len(self.cations) - self.anion_particles = len(self.anions) - - if (self.cation_particles%self.cation_num_atoms_per_species != 0) or (self.anion_particles%self.anion_num_atoms_per_species != 0): - raise TypeError( - "Some species are fragmented. Invalid AtomGroup" - ) - - def _prepare(self): - """ + self.temp_avg = temp_avg + self.linear_fit_window = linear_fit_window + + self.allatomgroup = allatomgroup + self.cations = self.allatomgroup.select_atoms(cationgroup_query) + self.anions = self.allatomgroup.select_atoms(aniongroup_query) + self.cation_num_atoms_per_species = cation_num_atoms_per_species + self.anion_num_atoms_per_species = anion_num_atoms_per_species + self.cation_mass_per_species = cation_mass_per_species + self.anion_mass_per_species = anion_mass_per_species + self.cation_charge = cation_charge + self.anion_charge = anion_charge + + self._dim = 3 + self.cation_particles = len(self.cations) + self.anion_particles = len(self.anions) + + if (self.cation_particles % self.cation_num_atoms_per_species != 0) or ( + self.anion_particles % self.anion_num_atoms_per_species != 0 + ): + raise TypeError("Some species are fragmented. Invalid AtomGroup") + + def _prepare(self): + """ Set up viscosity, mass, velocity, and position arrays before the analysis loop begins """ # 2D viscosity array of frames x particles - self._volumes = np.zeros((self.n_frames)) + self._volumes = np.zeros((self.n_frames)) - self._msds_cation_cation = np.zeros((self.n_frames,self._dim)) - self._msds_cation_anion = np.zeros((self.n_frames,self._dim)) - self._msds_anion_anion = np.zeros((self.n_frames,self._dim)) + self._msds_cation_cation = np.zeros((self.n_frames, self._dim)) + self._msds_cation_anion = np.zeros((self.n_frames, self._dim)) + self._msds_anion_anion = np.zeros((self.n_frames, self._dim)) - self._msds_cation_cation_var = np.zeros((self.n_frames,self._dim)) - self._msds_cation_anion_var = np.zeros((self.n_frames,self._dim)) - self._msds_anion_anion_var = np.zeros((self.n_frames,self._dim)) - self._cond = np.zeros((self.n_frames)) - self._cond_var = np.zeros((self.n_frames)) - + self._msds_cation_cation_var = np.zeros((self.n_frames, self._dim)) + self._msds_cation_anion_var = np.zeros((self.n_frames, self._dim)) + self._msds_anion_anion_var = np.zeros((self.n_frames, self._dim)) + self._cond = np.zeros((self.n_frames)) + self._cond_var = np.zeros((self.n_frames)) - self._lij_cation_cation = np.zeros((self.n_frames)) - self._lij_cation_anion = np.zeros((self.n_frames)) - self._lij_anion_anion = np.zeros((self.n_frames)) + self._lij_cation_cation = np.zeros((self.n_frames)) + self._lij_cation_anion = np.zeros((self.n_frames)) + self._lij_anion_anion = np.zeros((self.n_frames)) - self._lij_cation_cation_weights = np.ones((self.n_frames)) - self._lij_cation_anion_weights = np.ones((self.n_frames)) - self._lij_anion_anion_weights = np.ones((self.n_frames)) + self._lij_cation_cation_weights = np.ones((self.n_frames)) + self._lij_cation_anion_weights = np.ones((self.n_frames)) + self._lij_anion_anion_weights = np.ones((self.n_frames)) + self._coms = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) - self._coms = np.zeros((self.n_frames, self._dim)) - self._times = np.zeros((self.n_frames)) - - self._cation_masses = self.cations.masses - self._anion_masses = self.anions.masses + self._cation_masses = self.cations.masses + self._anion_masses = self.anions.masses # 3D arrays of frames x particles x dimensions # positions - self._cation_positions = np.zeros( + self._cation_positions = np.zeros( (self.n_frames, int(self.cation_particles), self._dim) ) - self._anion_positions = np.zeros( + self._anion_positions = np.zeros( (self.n_frames, int(self.anion_particles), self._dim) ) - self._cation_mass_weighted_positions = np.zeros( - (self.n_frames, int(self.cation_particles/self.cation_num_atoms_per_species), self._dim) + self._cation_mass_weighted_positions = np.zeros( + ( + self.n_frames, + int(self.cation_particles / self.cation_num_atoms_per_species), + self._dim, + ) ) - self._anion_mass_weighted_positions = np.zeros( - (self.n_frames, int(self.anion_particles/self.anion_num_atoms_per_species), self._dim) + self._anion_mass_weighted_positions = np.zeros( + ( + self.n_frames, + int(self.anion_particles / self.anion_num_atoms_per_species), + self._dim, + ) + ) + self.J_to_KJ = 1000 + self.boltzmann = ( + self.J_to_KJ * constants["Boltzmann_constant"] / constants["N_Avogadro"] ) - self.J_to_KJ = 1000 - self.boltzmann = self.J_to_KJ*constants["Boltzmann_constant"]/constants["N_Avogadro"] - self.A_to_m = 1e-10 - self.fs_to_s = 1e-15 - self.e_to_C = constants['elementary_charge'] - - def _single_frame(self): - """ - Constructs arrays of velocities and positions - for viscosity computation. - """ + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + self.e_to_C = constants["elementary_charge"] + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ # This runs once for each frame of the trajectory # The trajectory positions update automatically # You can access the frame number using self._frame_index # trajectory must have velocity and position information - if not ( - self._ts.has_positions - and self._ts.volume != 0 - ): - raise NoDataError( + if not (self._ts.has_positions and self._ts.volume != 0): + raise NoDataError( "Einstein diffusion computation requires positions, and box volume in the trajectory" ) - # fill volume array - self._volumes[self._frame_index] = self._ts.volume - self._times[self._frame_index] = self._ts.time - # set shape of position array - - self._cation_positions[self._frame_index] = self.cations.positions - self._anion_positions[self._frame_index] = self.anions.positions - self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) - - self._cation_mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._cation_masses,self._dim,axis = 0).reshape(-1,self._dim),self._cation_positions[self._frame_index])/self.cation_mass_per_species).reshape(-1, int(self.cation_num_atoms_per_species), self._dim).sum(axis=1) - self._anion_mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._anion_masses,self._dim,axis = 0).reshape(-1,self._dim),self._anion_positions[self._frame_index])/self.anion_mass_per_species).reshape(-1, int(self.anion_num_atoms_per_species), self._dim).sum(axis=1) - - self._cation_mass_weighted_positions[self._frame_index] = self._cation_mass_weighted_positions[self._frame_index] - self._coms[self._frame_index] - self._anion_mass_weighted_positions[self._frame_index] = self._anion_mass_weighted_positions[self._frame_index] - self._coms[self._frame_index] - - - - - def _conclude(self): - """ - Calculates the conductivity coefficient via the fft. - """ - cation_summed_positions = np.sum(self._cation_mass_weighted_positions, axis=1) - anion_summed_positions = np.sum(self._anion_mass_weighted_positions, axis=1) - - self._msds_cation_cation = np.transpose( - [msd_fft_cross_1d(cation_summed_positions[:, i], cation_summed_positions[:, i]) for i in range(self._dim)] - ) - self._msds_cation_cation_var = np.transpose( - [msd_variance_cross_1d(cation_summed_positions[:, i], cation_summed_positions[:, i], self._msds_cation_cation[:, i]) for i in range(self._dim)] - ) - self._msds_anion_anion = np.transpose( - [msd_fft_cross_1d(anion_summed_positions[:, i], anion_summed_positions[:, i]) for i in range(self._dim)] - ) - self._msds_anion_anion_var = np.transpose( - [msd_variance_cross_1d(anion_summed_positions[:, i], anion_summed_positions[:, i], self._msds_anion_anion[:, i]) for i in range(self._dim)] - ) - self._msds_cation_anion = np.transpose( - [msd_fft_cross_1d(cation_summed_positions[:, i], anion_summed_positions[:, i]) for i in range(self._dim)] - ) - self._msds_cation_anion_var = np.transpose( - [msd_variance_cross_1d(cation_summed_positions[:, i], anion_summed_positions[:, i], self._msds_cation_anion[:, i]) for i in range(self._dim)] - ) - self._lij_cation_cation = average_directions(self._msds_cation_cation,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self._lij_cation_anion = average_directions(self._msds_cation_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self._lij_anion_anion = average_directions(self._msds_anion_anion,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - - #self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion - #self._cond_var = ((self.cation_charge**2)**2)*self._msds_cation_cation_var + ((self.anion_charge**2)**2)*self._msds_anion_anion_var + ((2*self.cation_charge*self.anion_charge)**2)*self._msds_cation_anion_var - - self._lij_cation_cation_weights = np.sqrt(np.abs(1/average_directions(self._msds_cation_cation_var,self._dim))) - self._lij_cation_cation_weights /= np.sum(self._lij_cation_cation_weights) - - self._lij_cation_anion_weights = np.sqrt(np.abs(1/average_directions(self._msds_cation_anion_var,self._dim))) - self._lij_cation_anion_weights /= np.sum(self._lij_cation_anion_weights) - - self._lij_anion_anion_weights = np.sqrt(np.abs(1/average_directions(self._msds_anion_anion_var,self._dim))) - self._lij_anion_anion_weights /= np.sum(self._lij_anion_anion_weights) - - cond = None - lij_cation_cation = None - lij_cation_anion = None - lij_anion_anion = None - beta = None - if self.linear_fit_window: - lij_cation_cation , _ , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lij_cation_cation[self.linear_fit_window[0]:self.linear_fit_window[1]]) - lij_cation_anion , _ , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lij_cation_anion[self.linear_fit_window[0]:self.linear_fit_window[1]]) - lij_anion_anion , _ , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lij_anion_anion[self.linear_fit_window[0]:self.linear_fit_window[1]]) - cond = (self.cation_charge**2)*lij_cation_cation + (self.anion_charge**2)*lij_anion_anion + (2*self.cation_charge*self.anion_charge)*lij_cation_anion - - self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion - fit_slope = np.gradient(np.log(self._cond[self.linear_fit_window[0]:self.linear_fit_window[1]])[1:], np.log(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] - self._times[0])[1:]) - beta = np.nanmean(np.array(fit_slope)) - - else: - _ , lij_cation_cation = np.polynomial.polynomial.polyfit(self._times[1:], self._msds_cation_cation[1:], 1, w=self._lij_cation_cation_weights[1:]) - _ , lij_cation_anion = np.polynomial.polynomial.polyfit(self._times[1:], self._msds_cation_anion[1:], 1, w=self._lij_cation_anion_weights[1:]) - _ , lij_anion_anion = np.polynomial.polynomial.polyfit(self._times[1:], self._msds_anion_anion[1:], 1, w=self._lij_anion_anion_weights[1:]) - cond = (self.cation_charge**2)*lij_cation_cation + (self.anion_charge**2)*lij_anion_anion + (2*self.cation_charge*self.anion_charge)*lij_cation_anion - - #fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) - #beta = np.average(np.array(fit_slope), weights=self.weights[1:]) - if self.linear_fit_window: - self.results.linearity = beta - #self.results.fit_slope = cond - #self.results.fit_intercept = cond_intercept - cation_mass_fraction = self._cation_masses.sum()/self.allatomgroup.masses.sum() - anion_mass_fraction = self._anion_masses.sum()/self.allatomgroup.masses.sum() - solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction - - self.results.conductivity = cond*(self.e_to_C*self.e_to_C)*(1/(self.fs_to_s*self.A_to_m))*10 - self.results.cation_transference_com = ((self.cation_charge**2)*lij_cation_cation + (self.cation_charge*self.anion_charge)*lij_cation_anion)/cond - self.results.anion_transference_com = ((self.anion_charge**2)*lij_anion_anion + (self.cation_charge*self.anion_charge)*lij_cation_anion)/cond - self.results.cation_transference_solvent = (self.results.cation_transference_com - anion_mass_fraction)/solvent_fraction - self.results.anion_transference_solvent = (self.results.anion_transference_com -cation_mass_fraction)/solvent_fraction - - def plot_linear_fit(self): - """ - Plot the mead square displacement vs time and the fit region in the log-log scale - """ - plt.plot(self._times, np.abs(self._lii_self)) - plt.plot(self._times, self._times*self.results.fit_slope + self.results.fit_intercept, "k--", alpha=0.5) - plt.grid() - plt.ylabel("MSD") - plt.xlabel("Time") - if self.linear_fit_window: - plt.axvline(x=self._times[self.linear_fit_window[0]], color='r', linestyle='--', linewidth=2) - plt.axvline(x=self._times[self.linear_fit_window[1]], color='r', linestyle='--', linewidth=2) - #plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) - #i = int(len(self._msds) / 5) - #slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) - #plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) - plt.tight_layout() - plt.show() - - - - - - - - - + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._cation_positions[self._frame_index] = self.cations.positions + self._anion_positions[self._frame_index] = self.anions.positions + self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + + self._cation_mass_weighted_positions[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._cation_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._cation_positions[self._frame_index], + ) + / self.cation_mass_per_species + ) + .reshape(-1, int(self.cation_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._anion_mass_weighted_positions[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._anion_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._anion_positions[self._frame_index], + ) + / self.anion_mass_per_species + ) + .reshape(-1, int(self.anion_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._cation_mass_weighted_positions[self._frame_index] = ( + self._cation_mass_weighted_positions[self._frame_index] + - self._coms[self._frame_index] + ) + self._anion_mass_weighted_positions[self._frame_index] = ( + self._anion_mass_weighted_positions[self._frame_index] + - self._coms[self._frame_index] + ) + def _conclude(self): + """ + Calculates the conductivity coefficient via the fft. + """ + cation_summed_positions = np.sum(self._cation_mass_weighted_positions, axis=1) + anion_summed_positions = np.sum(self._anion_mass_weighted_positions, axis=1) + + self._msds_cation_cation = np.transpose( + [ + msd_fft_cross_1d( + cation_summed_positions[:, i], cation_summed_positions[:, i] + ) + for i in range(self._dim) + ] + ) + self._msds_cation_cation_var = np.transpose( + [ + msd_variance_cross_1d( + cation_summed_positions[:, i], + cation_summed_positions[:, i], + self._msds_cation_cation[:, i], + ) + for i in range(self._dim) + ] + ) + self._msds_anion_anion = np.transpose( + [ + msd_fft_cross_1d( + anion_summed_positions[:, i], anion_summed_positions[:, i] + ) + for i in range(self._dim) + ] + ) + self._msds_anion_anion_var = np.transpose( + [ + msd_variance_cross_1d( + anion_summed_positions[:, i], + anion_summed_positions[:, i], + self._msds_anion_anion[:, i], + ) + for i in range(self._dim) + ] + ) + self._msds_cation_anion = np.transpose( + [ + msd_fft_cross_1d( + cation_summed_positions[:, i], anion_summed_positions[:, i] + ) + for i in range(self._dim) + ] + ) + self._msds_cation_anion_var = np.transpose( + [ + msd_variance_cross_1d( + cation_summed_positions[:, i], + anion_summed_positions[:, i], + self._msds_cation_anion[:, i], + ) + for i in range(self._dim) + ] + ) + self._lij_cation_cation = average_directions( + self._msds_cation_cation, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + self._lij_cation_anion = average_directions( + self._msds_cation_anion, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + self._lij_anion_anion = average_directions( + self._msds_anion_anion, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + + # self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion + # self._cond_var = ((self.cation_charge**2)**2)*self._msds_cation_cation_var + ((self.anion_charge**2)**2)*self._msds_anion_anion_var + ((2*self.cation_charge*self.anion_charge)**2)*self._msds_cation_anion_var + + self._lij_cation_cation_weights = np.sqrt( + np.abs(1 / average_directions(self._msds_cation_cation_var, self._dim)) + ) + self._lij_cation_cation_weights /= np.sum(self._lij_cation_cation_weights) + self._lij_cation_anion_weights = np.sqrt( + np.abs(1 / average_directions(self._msds_cation_anion_var, self._dim)) + ) + self._lij_cation_anion_weights /= np.sum(self._lij_cation_anion_weights) + self._lij_anion_anion_weights = np.sqrt( + np.abs(1 / average_directions(self._msds_anion_anion_var, self._dim)) + ) + self._lij_anion_anion_weights /= np.sum(self._lij_anion_anion_weights) + + cond = None + lij_cation_cation = None + lij_cation_anion = None + lij_anion_anion = None + beta = None + if self.linear_fit_window: + lij_cation_cation, _, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lij_cation_cation[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ], + ) + lij_cation_anion, _, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lij_cation_anion[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ], + ) + lij_anion_anion, _, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lij_anion_anion[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ], + ) + cond = ( + (self.cation_charge**2) * lij_cation_cation + + (self.anion_charge**2) * lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * lij_cation_anion + ) + + self._cond = ( + (self.cation_charge**2) * self._lij_cation_cation + + (self.anion_charge**2) * self._lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * self._lij_cation_anion + ) + fit_slope = np.gradient( + np.log( + self._cond[self.linear_fit_window[0] : self.linear_fit_window[1]] + )[1:], + np.log( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]] + - self._times[0] + )[1:], + ) + beta = np.nanmean(np.array(fit_slope)) + + else: + _, lij_cation_cation = np.polynomial.polynomial.polyfit( + self._times[1:], + self._msds_cation_cation[1:], + 1, + w=self._lij_cation_cation_weights[1:], + ) + _, lij_cation_anion = np.polynomial.polynomial.polyfit( + self._times[1:], + self._msds_cation_anion[1:], + 1, + w=self._lij_cation_anion_weights[1:], + ) + _, lij_anion_anion = np.polynomial.polynomial.polyfit( + self._times[1:], + self._msds_anion_anion[1:], + 1, + w=self._lij_anion_anion_weights[1:], + ) + cond = ( + (self.cation_charge**2) * lij_cation_cation + + (self.anion_charge**2) * lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * lij_cation_anion + ) + + # fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) + # beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + if self.linear_fit_window: + self.results.linearity = beta + # self.results.fit_slope = cond + # self.results.fit_intercept = cond_intercept + cation_mass_fraction = ( + self._cation_masses.sum() / self.allatomgroup.masses.sum() + ) + anion_mass_fraction = self._anion_masses.sum() / self.allatomgroup.masses.sum() + solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction + + self.results.conductivity = ( + cond * (self.e_to_C * self.e_to_C) * (1 / (self.fs_to_s * self.A_to_m)) * 10 + ) + self.results.cation_transference_com = ( + (self.cation_charge**2) * lij_cation_cation + + (self.cation_charge * self.anion_charge) * lij_cation_anion + ) / cond + self.results.anion_transference_com = ( + (self.anion_charge**2) * lij_anion_anion + + (self.cation_charge * self.anion_charge) * lij_cation_anion + ) / cond + self.results.cation_transference_solvent = ( + self.results.cation_transference_com - anion_mass_fraction + ) / solvent_fraction + self.results.anion_transference_solvent = ( + self.results.anion_transference_com - cation_mass_fraction + ) / solvent_fraction + + def plot_linear_fit(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times, np.abs(self._lii_self)) + plt.plot( + self._times, + self._times * self.results.fit_slope + self.results.fit_intercept, + "k--", + alpha=0.5, + ) + plt.grid() + plt.ylabel("MSD") + plt.xlabel("Time") + if self.linear_fit_window: + plt.axvline( + x=self._times[self.linear_fit_window[0]], + color="r", + linestyle="--", + linewidth=2, + ) + plt.axvline( + x=self._times[self.linear_fit_window[1]], + color="r", + linestyle="--", + linewidth=2, + ) + # plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) + # i = int(len(self._msds) / 5) + # slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) + # plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) + plt.tight_layout() + plt.show() diff --git a/transport_analysis/diffusion_msd.py b/transport_analysis/diffusion_msd.py index f446e3a..a3a12cf 100644 --- a/transport_analysis/diffusion_msd.py +++ b/transport_analysis/diffusion_msd.py @@ -12,23 +12,24 @@ if TYPE_CHECKING: from MDAnalysis.core.universe import AtomGroup + ## Need to pass the entire the universe to get the com of the universe not just the atomg class DiffusionCoefficientEinstein(AnalysisBase): - """ - Class to calculate the diffusion_coefficient of a species. - Note that the slope of the mean square displacement provides - the diffusion coeffiecient. This implementation uses FFT to compute MSD faster - as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft - Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory - plays a critical role in the accuracy of the result. - - Particle velocities are required to calculate the viscosity function. Thus - you are limited to MDA trajectories that contain velocity information, e.g. - GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: - https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. - - Parameters - ---------- + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- atomgroup : AtomGroup An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. Note that :class:`UpdatingAtomGroup` instances are not accepted. @@ -41,7 +42,7 @@ class DiffusionCoefficientEinstein(AnalysisBase): A tuple of two integers specifying the start and end lag-time for the linear fit of the viscosity function. If not provided, the linear fit is not performed and viscosity is not calculated. - + Attributes ---------- atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` @@ -69,169 +70,209 @@ class DiffusionCoefficientEinstein(AnalysisBase): n_particles : int Number of particles viscosity was calculated over. """ - def __init__(self, + + def __init__( + self, allatomgroup: "AtomGroup", - atomgroup_query: str, + atomgroup_query: str, temp_avg: float = 298.0, linear_fit_window: tuple[int, int] = None, - num_atoms_per_species = int, - mass_per_species = float, + num_atoms_per_species=int, + mass_per_species=float, **kwargs, ) -> None: # the below line must be kept to initialize the AnalysisBase class! - super().__init__(allatomgroup.universe.trajectory, **kwargs) + super().__init__(allatomgroup.universe.trajectory, **kwargs) # after this you will be able to access `self.results` # `self.results` is a dictionary-like object # that can should used to store and retrieve results # See more at the MDAnalysis documentation: # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results - if isinstance(allatomgroup, UpdatingAtomGroup): - raise TypeError( + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( "UpdatingAtomGroups are not valid for diffusion computation" ) - self.temp_avg = temp_avg - self.linear_fit_window = linear_fit_window + self.temp_avg = temp_avg + self.linear_fit_window = linear_fit_window - self.allatomgroup = allatomgroup - self.atomgroup = self.allatomgroup.select_atoms(atomgroup_query) - self.n_particles = len(self.atomgroup) - self.num_atoms_per_species = num_atoms_per_species - self.mass_per_species = mass_per_species + self.allatomgroup = allatomgroup + self.atomgroup = self.allatomgroup.select_atoms(atomgroup_query) + self.n_particles = len(self.atomgroup) + self.num_atoms_per_species = num_atoms_per_species + self.mass_per_species = mass_per_species - self._dim = 3 + self._dim = 3 - if self.n_particles%self.num_atoms_per_species != 0: - raise TypeError( - "Some species are fragmented. Invalid AtomGroup" - ) + if self.n_particles % self.num_atoms_per_species != 0: + raise TypeError("Some species are fragmented. Invalid AtomGroup") - def _prepare(self): - """ + def _prepare(self): + """ Set up viscosity, mass, velocity, and position arrays before the analysis loop begins """ # 2D viscosity array of frames x particles - self._volumes = np.zeros((self.n_frames)) - self._msds = np.zeros((self.n_frames,self._dim)) - self._msds_var = np.zeros((self.n_frames,self._dim)) - self._lii_self = np.zeros((self.n_frames)) - self.weights = np.ones((self.n_frames)) - self._coms = np.zeros((self.n_frames, self._dim)) - self._times = np.zeros((self.n_frames)) + self._volumes = np.zeros((self.n_frames)) + self._msds = np.zeros((self.n_frames, self._dim)) + self._msds_var = np.zeros((self.n_frames, self._dim)) + self._lii_self = np.zeros((self.n_frames)) + self.weights = np.ones((self.n_frames)) + self._coms = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) - self._masses = self.atomgroup.masses + self._masses = self.atomgroup.masses # 3D arrays of frames x particles x dimensions # positions - self._positions = np.zeros( - (self.n_frames, int(self.n_particles), self._dim) + self._positions = np.zeros((self.n_frames, int(self.n_particles), self._dim)) + self._mass_weighted_positions = np.zeros( + ( + self.n_frames, + int(self.n_particles / self.num_atoms_per_species), + self._dim, + ) ) - self._mass_weighted_positions = np.zeros( - (self.n_frames, int(self.n_particles/self.num_atoms_per_species), self._dim) + self.J_to_KJ = 1000 + self.boltzmann = ( + self.J_to_KJ * constants["Boltzmann_constant"] / constants["N_Avogadro"] ) - self.J_to_KJ = 1000 - self.boltzmann = self.J_to_KJ*constants["Boltzmann_constant"]/constants["N_Avogadro"] - self.A_to_m = 1e-10 - self.fs_to_s = 1e-15 - - def _single_frame(self): - """ - Constructs arrays of velocities and positions - for viscosity computation. - """ + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ # This runs once for each frame of the trajectory # The trajectory positions update automatically # You can access the frame number using self._frame_index # trajectory must have velocity and position information - if not ( - self._ts.has_positions - and self._ts.volume != 0 - ): - raise NoDataError( + if not (self._ts.has_positions and self._ts.volume != 0): + raise NoDataError( "Einstein diffusion computation requires positions, and box volume in the trajectory" ) - # fill volume array - self._volumes[self._frame_index] = self._ts.volume - self._times[self._frame_index] = self._ts.time - # set shape of position array - - self._positions[self._frame_index] = self.atomgroup.positions - self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) - self._mass_weighted_positions[self._frame_index] = (np.multiply(np.repeat(self._masses,self._dim,axis = 0).reshape(-1,self._dim),self._positions[self._frame_index])/self.mass_per_species).reshape(-1, int(self.num_atoms_per_species), self._dim).sum(axis=1) - self._mass_weighted_positions[self._frame_index] = self._mass_weighted_positions[self._frame_index] - self._coms[self._frame_index] - - - - def _conclude(self): - """ - Calculates the diffusion coefficient via the fft. - """ - for specie_id in (range(int(self.n_particles/self.num_atoms_per_species))): - for i in range(self._dim): - msd_temp = msd_fft_1d(np.array(self._mass_weighted_positions[:,specie_id, :][:,i])) - self._msds[:,i] += msd_temp - self._msds_var[:,i] += msd_variance_1d(np.array(self._mass_weighted_positions[:,specie_id, :][:,i]), self._msds[:,i]) - - - self._lii_self = average_directions(self._msds,self._dim)/(2*self.boltzmann*self.temp_avg*self._volumes) - self.weights = np.sqrt(np.abs(1/average_directions(self._msds_var,self._dim))) - self.weights /= np.sum(self.weights) - - lii_self = None - lii_intercept = None - beta = None - if self.linear_fit_window: - lii_self , lii_intercept , _ , _, _ = stats.linregress(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._lii_self[self.linear_fit_window[0]:self.linear_fit_window[1]]) - fit_slope = np.gradient(np.log(self._lii_self[self.linear_fit_window[0]:self.linear_fit_window[1]])[1:], np.log(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] - self._times[0])[1:]) - beta = np.nanmean(np.array(fit_slope)) - - else: - lii_intercept , lii_self = np.polynomial.polynomial.polyfit(self._times[1:], self._lii_self[1:], 1, w=self.weights[1:]) - #fit_slope = np.gradient(np.log(self._lii_self)[1:], np.log(self._times - self._times[0])[1:]) - #beta = np.average(np.array(fit_slope), weights=self.weights[1:]) - - - conc = float(self.n_particles/self.num_atoms_per_species)/(self._volumes.mean()*(self.A_to_m**3)) - - if self.linear_fit_window: - self.results.linearity = beta - #self.results.fit_slope = lii_self - #self.results.fit_intercept = lii_intercept - self.results.lii_self = lii_self*(1/(self.A_to_m*self.fs_to_s)) - self.results.diffusion_coefficient = (lii_self*(1/(self.A_to_m*self.fs_to_s)))*(self.boltzmann*self.temp_avg/conc) - - def plot_linear_fit(self): - """ - Plot the mead square displacement vs time and the fit region in the log-log scale - """ - plt.plot(self._times, np.abs(self._lii_self)) - plt.plot(self._times, self._times*self.results.fit_slope + self.results.fit_intercept, "k--", alpha=0.5) - plt.grid() - plt.ylabel("MSD") - plt.xlabel("Time") - if self.linear_fit_window: - plt.axvline(x=self._times[self.linear_fit_window[0]], color='r', linestyle='--', linewidth=2) - plt.axvline(x=self._times[self.linear_fit_window[1]], color='r', linestyle='--', linewidth=2) - #plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) - #i = int(len(self._msds) / 5) - #slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) - #plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) - plt.tight_layout() - plt.show() - - - - - - - - - + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._positions[self._frame_index] = self.atomgroup.positions + self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + self._mass_weighted_positions[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._masses, self._dim, axis=0).reshape(-1, self._dim), + self._positions[self._frame_index], + ) + / self.mass_per_species + ) + .reshape(-1, int(self.num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._mass_weighted_positions[self._frame_index] = ( + self._mass_weighted_positions[self._frame_index] + - self._coms[self._frame_index] + ) + + def _conclude(self): + """ + Calculates the diffusion coefficient via the fft. + """ + for specie_id in range(int(self.n_particles / self.num_atoms_per_species)): + for i in range(self._dim): + msd_temp = msd_fft_1d( + np.array(self._mass_weighted_positions[:, specie_id, :][:, i]) + ) + self._msds[:, i] += msd_temp + self._msds_var[:, i] += msd_variance_1d( + np.array(self._mass_weighted_positions[:, specie_id, :][:, i]), + self._msds[:, i], + ) + + self._lii_self = average_directions(self._msds, self._dim) / ( + 2 * self.boltzmann * self.temp_avg * self._volumes + ) + self.weights = np.sqrt( + np.abs(1 / average_directions(self._msds_var, self._dim)) + ) + self.weights /= np.sum(self.weights) + + lii_self = None + lii_intercept = None + beta = None + if self.linear_fit_window: + lii_self, lii_intercept, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lii_self[self.linear_fit_window[0] : self.linear_fit_window[1]], + ) + fit_slope = np.gradient( + np.log( + self._lii_self[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ] + )[1:], + np.log( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]] + - self._times[0] + )[1:], + ) + beta = np.nanmean(np.array(fit_slope)) + else: + lii_intercept, lii_self = np.polynomial.polynomial.polyfit( + self._times[1:], self._lii_self[1:], 1, w=self.weights[1:] + ) + # fit_slope = np.gradient(np.log(self._lii_self)[1:], np.log(self._times - self._times[0])[1:]) + # beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + conc = float(self.n_particles / self.num_atoms_per_species) / ( + self._volumes.mean() * (self.A_to_m**3) + ) + if self.linear_fit_window: + self.results.linearity = beta + # self.results.fit_slope = lii_self + # self.results.fit_intercept = lii_intercept + self.results.lii_self = lii_self * (1 / (self.A_to_m * self.fs_to_s)) + self.results.diffusion_coefficient = ( + lii_self * (1 / (self.A_to_m * self.fs_to_s)) + ) * (self.boltzmann * self.temp_avg / conc) + def plot_linear_fit(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times, np.abs(self._lii_self)) + plt.plot( + self._times, + self._times * self.results.fit_slope + self.results.fit_intercept, + "k--", + alpha=0.5, + ) + plt.grid() + plt.ylabel("MSD") + plt.xlabel("Time") + if self.linear_fit_window: + plt.axvline( + x=self._times[self.linear_fit_window[0]], + color="r", + linestyle="--", + linewidth=2, + ) + plt.axvline( + x=self._times[self.linear_fit_window[1]], + color="r", + linestyle="--", + linewidth=2, + ) + # plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) + # i = int(len(self._msds) / 5) + # slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) + # plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) + plt.tight_layout() + plt.show() diff --git a/transport_analysis/due.py b/transport_analysis/due.py index 7f51acd..2a10d19 100644 --- a/transport_analysis/due.py +++ b/transport_analysis/due.py @@ -65,9 +65,7 @@ def _donothing_func(*args, **kwargs): ) # lgtm [py/unused-import] if "due" in locals() and not hasattr(due, "cite"): - raise RuntimeError( - "Imported due lacks .cite. DueCredit is now disabled" - ) + raise RuntimeError("Imported due lacks .cite. DueCredit is now disabled") except Exception as e: if not isinstance(e, ImportError): import logging diff --git a/transport_analysis/tests/test_velocityautocorr.py b/transport_analysis/tests/test_velocityautocorr.py index 975edfe..5f69bba 100644 --- a/transport_analysis/tests/test_velocityautocorr.py +++ b/transport_analysis/tests/test_velocityautocorr.py @@ -93,9 +93,7 @@ def characteristic_poly(last, n_dim, first=0, step=1): return result -@pytest.mark.parametrize( - "tdim, tdim_keys", [(1, [0]), (2, [0, 1]), (3, [0, 1, 2])] -) +@pytest.mark.parametrize("tdim, tdim_keys", [(1, [0]), (2, [0, 1]), (3, [0, 1, 2])]) def test_characteristic_poly(step_vtraj, NSTEP, tdim, tdim_keys): # test `characteristic_poly()` against `tidynamics.acf()`` @@ -262,9 +260,7 @@ def test_plot_running_integral_custom_labels(self, vacf): assert x_act == x_exp assert y_act == y_exp - def test_plot_running_integral_start_stop_step( - self, vacf, start=1, stop=9, step=2 - ): + def test_plot_running_integral_start_stop_step(self, vacf, start=1, stop=9, step=2): t_range = range(start, stop, step) # Expected data to be plotted x_exp = vacf.times[start:stop:step] @@ -328,9 +324,7 @@ def test_fft_vs_simple_default_per_particle(self, vacf, vacf_fft): ], ) class TestAllDims: - def test_simple_step_vtraj_all_dims( - self, step_vtraj, NSTEP, tdim, tdim_factor - ): + def test_simple_step_vtraj_all_dims(self, step_vtraj, NSTEP, tdim, tdim_factor): # testing the "simple" windowed algorithm on unit velocity trajectory # VACF results should fit the polynomial defined in # characteristic_poly() @@ -353,9 +347,7 @@ def test_simple_start_stop_step_all_dims( # test start stop step is working correctly v_simple = VACF(step_vtraj.atoms, dim_type=tdim, fft=False) v_simple.run(start=tstart, stop=tstop, step=tstep) - poly = characteristic_poly( - tstop, tdim_factor, first=tstart, step=tstep - ) + poly = characteristic_poly(tstop, tdim_factor, first=tstart, step=tstep) # polynomial must take offset start into account assert_almost_equal(v_simple.results.timeseries, poly, decimal=4) @@ -370,9 +362,7 @@ def test_self_diffusivity_step_vtraj_all_dims( v_simple.run() sd_actual = v_simple.self_diffusivity_gk() sd_expected = ( - integrate.simpson( - y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP) - ) + integrate.simpson(y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP)) / tdim_factor ) # 24307638750.0 (act) agrees with 24307638888.888885 (exp) to 8 sig figs @@ -393,9 +383,7 @@ def test_self_diffusivity_start_stop_step_all_dims( # check that start, stop, step is working correctly v_simple = VACF(step_vtraj.atoms, dim_type=tdim, fft=False) v_simple.run() - sd_actual = v_simple.self_diffusivity_gk( - start=tstart, stop=tstop, step=tstep - ) + sd_actual = v_simple.self_diffusivity_gk(start=tstart, stop=tstop, step=tstep) sd_expected = ( integrate.simpson( y=characteristic_poly(NSTEP, tdim_factor)[tstart:tstop:tstep], @@ -415,9 +403,7 @@ def test_self_diffusivity_odd_step_vtraj_all_dims( v_simple.run() sd_actual = v_simple.self_diffusivity_gk_odd() sd_expected = ( - integrate.trapezoid( - characteristic_poly(NSTEP, tdim_factor), range(NSTEP) - ) + integrate.trapezoid(characteristic_poly(NSTEP, tdim_factor), range(NSTEP)) / tdim_factor ) # 24307638750.0 (exp) agrees with 24307638888.888885 (act) to 8 sig figs @@ -451,9 +437,7 @@ def test_self_diffusivity_odd_start_stop_step_all_dims( # 7705160166.66 (exp) agrees with 7705162888.88 (act) to 6 sig figs assert_approx_equal(sd_actual, sd_expected, significant=6) - def test_fft_step_vtraj_all_dims( - self, step_vtraj, NSTEP, tdim, tdim_factor - ): + def test_fft_step_vtraj_all_dims(self, step_vtraj, NSTEP, tdim, tdim_factor): # testing the fft algorithm on unit velocity trajectory # defined in step_vtraj() # VACF results should fit the characteristic polynomial defined in @@ -476,9 +460,7 @@ def test_fft_start_stop_step_all_dims( # test start stop step is working correctly v_fft = VACF(step_vtraj.atoms, dim_type=tdim, fft=True) v_fft.run(start=tstart, stop=tstop, step=tstep) - poly = characteristic_poly( - tstop, tdim_factor, first=tstart, step=tstep - ) + poly = characteristic_poly(tstop, tdim_factor, first=tstart, step=tstep) # polynomial must take offset start into account assert_almost_equal(v_fft.results.timeseries, poly, decimal=3) @@ -493,9 +475,7 @@ def test_fft_self_diffusivity_step_vtraj_all_dims( v_fft.run() sd_actual = v_fft.self_diffusivity_gk() sd_expected = ( - integrate.simpson( - y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP) - ) + integrate.simpson(y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP)) / tdim_factor ) # 24307638750.0 (act) agrees with 24307638888.888885 (exp) to 8 sig figs @@ -516,9 +496,7 @@ def test_fft_self_diffusivity_start_stop_step_all_dims( # check that start, stop, step is working correctly v_fft = VACF(step_vtraj.atoms, dim_type=tdim, fft=True) v_fft.run() - sd_actual = v_fft.self_diffusivity_gk( - start=tstart, stop=tstop, step=tstep - ) + sd_actual = v_fft.self_diffusivity_gk(start=tstart, stop=tstop, step=tstep) sd_expected = ( integrate.simpson( y=characteristic_poly(NSTEP, tdim_factor)[tstart:tstop:tstep], @@ -561,9 +539,7 @@ def test_fft_self_diffusivity_odd_start_stop_step_all_dims( # check that start, stop, step is working correctly v_fft = VACF(step_vtraj.atoms, dim_type=tdim, fft=True) v_fft.run() - sd_actual = v_fft.self_diffusivity_gk_odd( - start=tstart, stop=tstop, step=tstep - ) + sd_actual = v_fft.self_diffusivity_gk_odd(start=tstart, stop=tstop, step=tstep) sd_expected = ( integrate.trapezoid( y=characteristic_poly(NSTEP, tdim_factor)[tstart:tstop:tstep], diff --git a/transport_analysis/tests/test_viscosity.py b/transport_analysis/tests/test_viscosity.py index 6e8ae8f..c85342f 100644 --- a/transport_analysis/tests/test_viscosity.py +++ b/transport_analysis/tests/test_viscosity.py @@ -177,9 +177,7 @@ def test_ec_universe(self, u_ec): ], ) class TestAllDims: - def test_step_vtraj_all_dims( - self, step_vtraj_full, NSTEP, tdim, tdim_factor - ): + def test_step_vtraj_all_dims(self, step_vtraj_full, NSTEP, tdim, tdim_factor): # Helfand viscosity results should agree with the unit velocity traj # defined in characteristic_poly_helfand() vis_h = VH(step_vtraj_full.atoms, dim_type=tdim) diff --git a/transport_analysis/tests/utils.py b/transport_analysis/tests/utils.py index a3b1d89..03b1495 100644 --- a/transport_analysis/tests/utils.py +++ b/transport_analysis/tests/utils.py @@ -50,9 +50,7 @@ def make_Universe( n_residues=n_residues, n_segments=n_segments, atom_resindex=np.repeat(np.arange(n_residues), n_atoms // n_residues), - residue_segindex=np.repeat( - np.arange(n_segments), n_residues // n_segments - ), + residue_segindex=np.repeat(np.arange(n_segments), n_residues // n_segments), # trajectory things trajectory=trajectory, velocities=velocities, diff --git a/transport_analysis/utils.py b/transport_analysis/utils.py index 755069c..bfbbf43 100644 --- a/transport_analysis/utils.py +++ b/transport_analysis/utils.py @@ -1,5 +1,6 @@ import numpy as np + def autocorrFFT(x): """ Calculates the autocorrelation function using the fast Fourier transform. @@ -7,15 +8,16 @@ def autocorrFFT(x): :param x: array[float], function on which to compute autocorrelation function :return: acf: array[float], autocorrelation function """ - N= len(x) - F = np.fft.fft(x, n=2*N) + N = len(x) + F = np.fft.fft(x, n=2 * N) PSD = F * F.conjugate() res = np.fft.ifft(PSD) - res= (res[:N]).real - n=N*np.ones(N)-np.arange(0,N) - acf = res/n + res = (res[:N]).real + n = N * np.ones(N) - np.arange(0, N) + acf = res / n return acf + def msd_fft_1d(r): """Calculates mean square displacement of the array r using the fast Fourier transform.""" # Algorithm based on https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft @@ -43,6 +45,7 @@ def cross_corr(x, y): n = N * np.ones(N) - np.arange(0, N) # divide res(m) by (N-m) return res / n + def msd_variance_1d(r, msd): # compute A1, recursive relation with D = r^4 @@ -62,10 +65,13 @@ def msd_variance_1d(r, msd): A3 = cross_corr(r, r**3) A4 = cross_corr(r**3, r) - var_x = A1 + 6*A2 - 4*A3 - 4*A4 - msd**2 - n_minus_m = N * np.ones(N) - np.arange(0, N) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + var_x = A1 + 6 * A2 - 4 * A3 - 4 * A4 - msd**2 + n_minus_m = N * np.ones(N) - np.arange( + 0, N + ) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + + return var_x / n_minus_m - return var_x/n_minus_m def msd_fft_cross_1d(r, k): """Calculates mean square displacement of the array r using the fast Fourier transform.""" @@ -82,11 +88,12 @@ def msd_fft_cross_1d(r, k): S1[m] = Q / (N - m) return S1 - S2 - S3 + def msd_variance_cross_1d(r1, r2, msd): # compute A1, recursive relation with D = r1^2 r2^2 N = len(r1) - D = np.square(r1)*np.square(r2) + D = np.square(r1) * np.square(r2) D = np.append(D, 0) Q = 2 * D.sum() A1 = np.zeros(N) @@ -95,38 +102,38 @@ def msd_variance_cross_1d(r1, r2, msd): A1[m] = Q / (N - m) # compute A2, cross correlation of r1^2r2 and r2 - A2 = cross_corr(np.square(r1)*r2, r2) + A2 = cross_corr(np.square(r1) * r2, r2) # compute A3, cross correlation of r1^2 and r2^2 A3 = cross_corr(np.square(r1), np.square(r2)) # compute A4, cross correlation of (r1r2^2) and r1 - A4 = cross_corr(r1*np.square(r2), r1) + A4 = cross_corr(r1 * np.square(r2), r1) # compute A5, cross correlation of r1r2 and r1r2 - A5 = cross_corr(r1*r2, r1*r2) + A5 = cross_corr(r1 * r2, r1 * r2) # compute A6, cross correlation of r1 and r1r2^2 - A6 = cross_corr(r1, np.square(r2)*r1) + A6 = cross_corr(r1, np.square(r2) * r1) # compute A7, cross correlation of r2^2 and r1^2 A7 = cross_corr(np.square(r2), np.square(r1)) # compute A8, cross correlation of r2 and r1^2r2 - A8 = cross_corr(r2, np.square(r1)*r2) + A8 = cross_corr(r2, np.square(r1) * r2) - var_x = A1 - 2*A2 + A3 - 2*A4 +4*A5 - 2*A6 + A7 - 2*A8 - msd**2 - n_minus_m = N * np.ones(N) - np.arange(0, N) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) - - return var_x/n_minus_m + var_x = A1 - 2 * A2 + A3 - 2 * A4 + 4 * A5 - 2 * A6 + A7 - 2 * A8 - msd**2 + n_minus_m = N * np.ones(N) - np.arange( + 0, N + ) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + return var_x / n_minus_m def average_directions(r, dim): if dim == 3: - return (r[:,0] + r[:,1] + r[:,2])/3 + return (r[:, 0] + r[:, 1] + r[:, 2]) / 3 if dim == 2: - return (r[:,0] + r[:,1])/2 + return (r[:, 0] + r[:, 1]) / 2 if dim == 1: - return r[:,0] - + return r[:, 0] diff --git a/transport_analysis/velocityautocorr.py b/transport_analysis/velocityautocorr.py index 3718272..d7d35da 100644 --- a/transport_analysis/velocityautocorr.py +++ b/transport_analysis/velocityautocorr.py @@ -47,6 +47,7 @@ enough to obtain a VACF suitable for your needs. """ + from typing import TYPE_CHECKING from MDAnalysis.analysis.base import AnalysisBase @@ -125,9 +126,7 @@ def __init__( # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results if isinstance(atomgroup, UpdatingAtomGroup): - raise TypeError( - "UpdatingAtomGroups are not valid for VACF computation" - ) + raise TypeError("UpdatingAtomGroups are not valid for VACF computation") # args self.dim_type = dim_type.lower() @@ -142,14 +141,10 @@ def __init__( def _prepare(self): """Set up velocity and VACF arrays before the analysis loop begins""" # 2D array of frames x particles - self.results.vacf_by_particle = np.zeros( - (self.n_frames, self.n_particles) - ) + self.results.vacf_by_particle = np.zeros((self.n_frames, self.n_particles)) # 3D array of frames x particles x dimensionality - self._velocities = np.zeros( - (self.n_frames, self.n_particles, self.dim_fac) - ) + self._velocities = np.zeros((self.n_frames, self.n_particles, self.dim_fac)) # self.results.timeseries not set here @staticmethod @@ -184,14 +179,10 @@ def _single_frame(self): # trajectory must have velocity information if not self._ts.has_velocities: - raise NoDataError( - "VACF computation requires velocities in the trajectory" - ) + raise NoDataError("VACF computation requires velocities in the trajectory") # set shape of velocity array - self._velocities[self._frame_index] = self.atomgroup.velocities[ - :, self._dim - ] + self._velocities[self._frame_index] = self.atomgroup.velocities[:, self._dim] # Results will be in units of (angstroms / ps)^2 def _conclude(self): @@ -222,10 +213,7 @@ def _conclude_simple(self): # iterate through all possible lagtimes up to N for lag in range(N): # get product of velocities shifted by "lag" frames - veloc = ( - self._velocities[: N - lag, :, :] - * self._velocities[lag:, :, :] - ) + veloc = self._velocities[: N - lag, :, :] * self._velocities[lag:, :, :] # dot product of x(, y, z) velocities per particle sum_veloc = np.sum(veloc, axis=-1) diff --git a/transport_analysis/viscosity.py b/transport_analysis/viscosity.py index ea5d958..d468086 100644 --- a/transport_analysis/viscosity.py +++ b/transport_analysis/viscosity.py @@ -114,9 +114,7 @@ def _prepare(self): before the analysis loop begins """ # 2D viscosity array of frames x particles - self.results.visc_by_particle = np.zeros( - (self.n_frames, self.n_particles) - ) + self.results.visc_by_particle = np.zeros((self.n_frames, self.n_particles)) self._volumes = np.zeros((self.n_frames)) @@ -125,13 +123,9 @@ def _prepare(self): # 3D arrays of frames x particles x dimensionality # for velocities and positions - self._velocities = np.zeros( - (self.n_frames, self.n_particles, self.dim_fac) - ) + self._velocities = np.zeros((self.n_frames, self.n_particles, self.dim_fac)) - self._positions = np.zeros( - (self.n_frames, self.n_particles, self.dim_fac) - ) + self._positions = np.zeros((self.n_frames, self.n_particles, self.dim_fac)) # self.results.timeseries not set here # update when mda 2.6.0 releases with typo fix @@ -176,9 +170,7 @@ def _single_frame(self): # trajectory must have velocity and position information if not ( - self._ts.has_velocities - and self._ts.has_positions - and self._ts.volume != 0 + self._ts.has_velocities and self._ts.has_positions and self._ts.volume != 0 ): raise NoDataError( "Helfand viscosity computation requires " @@ -189,14 +181,10 @@ def _single_frame(self): self._volumes[self._frame_index] = self._ts.volume # set shape of velocity array - self._velocities[self._frame_index] = self.atomgroup.velocities[ - :, self._dim - ] + self._velocities[self._frame_index] = self.atomgroup.velocities[:, self._dim] # set shape of position array - self._positions[self._frame_index] = self.atomgroup.positions[ - :, self._dim - ] + self._positions[self._frame_index] = self.atomgroup.positions[:, self._dim] def _conclude(self): """ @@ -260,9 +248,7 @@ def plot_viscosity_function(self): self.linear_fit_window[0], self.linear_fit_window[1], ) - plt.axvline( - fit_start, color="red", linestyle="--", label="Fit Start" - ) + plt.axvline(fit_start, color="red", linestyle="--", label="Fit Start") plt.axvline(fit_end, color="blue", linestyle="--", label="Fit End") plt.xlabel("Lag-time")