# %% Imports
# Standard library imports
from pathlib import Path
from typing import Union
import warnings
# Package imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
import plotly_express as px
import seaborn as sns
import yaml
from larch import Group
from larch.xafs import find_e0, pre_edge, fluo_corr
from larch.xray import xray_edge
from lmfit import Parameters, minimize
from sklearn.decomposition import NMF, PCA
# from pymcr.mcr import McrAR
from tqdm.auto import tqdm
# %% Setup
pd.options.mode.chained_assignment = None # default='warn'
sns.set_theme()
pio.renderers.default = "vscode"
# %% autoXAS class
[docs]
class autoXAS:
def __init__(self) -> None:
# Data parameters
self.data_directory = None
self.data_format = ".dat"
self.data = None
self.raw_data = None
self.standards_config = None
self.experiments = None
self.save_directory = "./"
self.energy_column = None
self.I0_columns = None
self.I1_columns = None
self.temperature_column = None
self.metals = None
self.edges = {}
self.xas_mode = "Fluorescence"
self.fluorescence_correction = False
self.formula = None
self.energy_unit = "eV"
self.energy_column_unitConversion = 1
self.temperature_unit = "K"
self.interactive = False
self.edge_correction_energies = {}
# Standards parameters
self.data_directory_standards = None
self.data_format_standards = ".dat"
self.standards = None
self.raw_standards = None
self.standard_experiments = None
self.energy_column_standards = None
self.I0_columns_standards = None
self.I1_columns_standards = None
self.temperature_column_standards = None
self.xas_mode_standards = "Fluorescence"
self.fluorescence_correction_standards = False
self.energy_unit_standards = "eV"
self.energy_column_unitConversion_standards = 1
self.temperature_unit_standards = "K"
# Analysis dataframes
self.LCA_result = None
self.PCA_result = None
self.NMF_result = None
self.NMF_component_results = None
[docs]
def save_config(
self, config_name: str, save_directory: str = "./", standards: bool = False
) -> None:
"""
Save configuration file.
Args:
config_name (str): Name of the configuration file.
save_directory (str, optional): Directory where the configuration file will be saved. Defaults to "./".
Returns:
None: Function does not return anything.
"""
if standards:
config = dict(
data_directory=self.data_directory_standards,
standards_config=self.standards_config,
data_format=self.data_format_standards,
energy_column=self.energy_column_standards,
I0_columns=self.I0_columns_standards,
I1_columns=self.I1_columns_standards,
temperature_column=self.temperature_column_standards,
edges=self.edges,
xas_mode=self.xas_mode_standards,
energy_unit=self.energy_unit_standards,
energy_column_unitConversion=self.energy_column_unitConversion_standards,
temperature_unit=self.temperature_unit_standards,
save_directory=self.save_directory_standards,
)
with open(save_directory + config_name + "_standards", "w") as file:
yaml.dump(config, file)
else:
config = dict(
data_directory=self.data_directory,
data_format=self.data_format,
energy_column=self.energy_column,
I0_columns=self.I0_columns,
I1_columns=self.I1_columns,
temperature_column=self.temperature_column,
edges=self.edges,
xas_mode=self.xas_mode,
energy_unit=self.energy_unit,
energy_column_unitConversion=self.energy_column_unitConversion,
temperature_unit=self.temperature_unit,
save_directory=self.save_directory,
)
with open(save_directory + config_name, "w") as file:
yaml.dump(config, file)
return None
[docs]
def load_config(self, configuration_file: str, standards: bool = False) -> None:
"""
Load configuration file.
Args:
configuration_file (str): Path to the configuration file.
Returns:
None: Function does not return anything.
"""
with open(configuration_file, "r") as file:
config = yaml.load(file, Loader=yaml.FullLoader)
if standards:
# Set configuration parameters if they exist
if "data_directory" in config:
self.data_directory_standards = config["data_directory"]
if "data_format" in config:
self.data_format_standards = config["data_format"]
if "energy_column" in config:
self.energy_column_standards = config["energy_column"]
if "I0_columns" in config:
self.I0_columns_standards = config["I0_columns"]
if "I1_columns" in config:
self.I1_columns_standards = config["I1_columns"]
if "temperature_column" in config:
self.temperature_column_standards = config["temperature_column"]
if "xas_mode" in config:
self.xas_mode_standards = config["xas_mode"]
if self.xas_mode_standards not in [
"Fluorescence",
"Transmission",
"Pre-calculated",
]:
raise ValueError("Invalid XAS mode")
if "fluorescence_correction" in config:
self.fluorescence_correction_standards = config[
"fluorescence_correction"
]
if "energy_unit" in config:
self.energy_unit_standards = config["energy_unit"]
if "energy_column_unitConversion" in config:
self.energy_column_unitConversion_standards = config[
"energy_column_unitConversion"
]
if "temperature_unit" in config:
self.temperature_unit_standards = config["temperature_unit"]
if "save_directory" in config:
self.save_directory_standards = config["save_directory"]
if not Path(self.save_directory_standards).exists():
Path(self.save_directory_standards).mkdir(
parents=True, exist_ok=True
)
else:
# Set configuration parameters if they exist
if "data_directory" in config:
self.data_directory = config["data_directory"]
if "data_format" in config:
self.data_format = config["data_format"]
if "energy_column" in config:
self.energy_column = config["energy_column"]
if "I0_columns" in config:
self.I0_columns = config["I0_columns"]
if "I1_columns" in config:
self.I1_columns = config["I1_columns"]
if "temperature_column" in config:
self.temperature_column = config["temperature_column"]
if "edges" in config:
self.edges = config["edges"]
self.metals = list(self.edges.keys())
if "edge_correction_energies" in config:
self.edge_correction_energies = config["edge_correction_energies"]
if "xas_mode" in config:
self.xas_mode = config["xas_mode"]
if self.xas_mode not in [
"Fluorescence",
"Transmission",
"Pre-calculated",
]:
raise ValueError("Invalid XAS mode")
if "fluorescence_correction" in config:
self.fluorescence_correction = config["fluorescence_correction"]
if "sample_formula" in config:
self.formula = config["sample_formula"]
if "energy_unit" in config:
self.energy_unit = config["energy_unit"]
if "energy_column_unitConversion" in config:
self.energy_column_unitConversion = config[
"energy_column_unitConversion"
]
if "temperature_unit" in config:
self.temperature_unit = config["temperature_unit"]
if "save_directory" in config:
self.save_directory = config["save_directory"]
if not Path(self.save_directory).exists():
Path(self.save_directory).mkdir(parents=True, exist_ok=True)
if "standards_config" in config:
self.standards_config = config["standards_config"]
self.load_config(self.standards_config, standards=True)
return None
def _read_data(self, standards: bool = False) -> None:
"""
Read data files and store in DataFrame.
Args:
standards (bool, optional): Whether to read standards. Defaults to False.
Raises:
ValueError: No data directory specified.
NotImplementedError: HDF5 file reading not implemented yet.
Returns:
None: Function does not return anything.
"""
# TODO: Supporting potential steps as an alternative to time steps. They will most likely come as seperate files instead of the single files for time steps.
if standards:
if self.data_directory_standards is None:
raise ValueError("No standards directory specified")
else:
if self.data_directory is None:
raise ValueError("No data directory specified")
if self.data_format == ".dat":
if standards:
data_files = list(Path(self.data_directory_standards).rglob("*.dat"))
else:
data_files = list(Path(self.data_directory).rglob("*.dat"))
for file in tqdm(data_files, desc="Reading data files", leave=False):
rows_to_skip = 0
last_line = ""
with open(file, "r") as f:
for line in f:
if line.startswith("\n") or line.startswith("#"):
last_line = line
rows_to_skip += 1
else:
columns = last_line.split()[1:]
break
raw_data = pd.read_csv(
file,
sep="\s+",
header=None,
names=columns,
skiprows=rows_to_skip,
skip_blank_lines=True,
on_bad_lines="skip",
keep_default_na=False,
)
raw_data = raw_data.apply(
pd.to_numeric, errors="coerce"
) # , downcast='float')
raw_data.dropna(inplace=True)
data = pd.DataFrame()
raw_data["File"] = file.name
data["File"] = raw_data["File"]
data["Experiment"] = file.stem
for fragment in file.stem.split("_"):
if len(fragment) < 3 and fragment.isalpha():
data["Metal"] = fragment
if data.get("Metal") is None:
if self.metals is not None:
for metal in self.metals:
if metal in fragment:
data["Metal"] = metal
break
# Calculate energy
if standards:
data["Energy"] = (
raw_data[self.energy_column_standards]
* self.energy_column_unitConversion_standards
)
else:
data["Energy"] = (
raw_data[self.energy_column] * self.energy_column_unitConversion
)
# Calculate temperature
if self.temperature_column is not None and not standards:
data["Temperature"] = raw_data[self.temperature_column]
elif self.temperature_column_standards is not None and standards:
data["Temperature"] = raw_data[self.temperature_column_standards]
else:
data["Temperature"] = 0 # Placeholder for temperature
data["Temperature (std)"] = 0 # Placeholder for temperature std
# Get I0 and I1 columns
if standards:
# Calculate I0
if isinstance(self.I0_columns_standards, list):
data["I0"] = 0
for column in self.I0_columns_standards:
data["I0"] += raw_data[column]
elif isinstance(self.I0_columns_standards, str):
data["I0"] = raw_data[self.I0_columns_standards]
# Calculate I1
if isinstance(self.I1_columns_standards, list):
data["I1"] = 0
for column in self.I1_columns_standards:
data["I1"] += raw_data[column]
elif isinstance(self.I1_columns_standards, str):
data["I1"] = raw_data[self.I1_columns_standards]
else:
# Calculate I0
if isinstance(self.I0_columns, list):
data["I0"] = 0
for column in self.I0_columns:
data["I0"] += raw_data[column]
elif isinstance(self.I0_columns, str):
data["I0"] = raw_data[self.I0_columns]
# Calculate I1
if isinstance(self.I1_columns, list):
data["I1"] = 0
for column in self.I1_columns:
data["I1"] += raw_data[column]
elif isinstance(self.I1_columns, str):
data["I1"] = raw_data[self.I1_columns]
# Calculate absorption coefficient
if standards:
if self.xas_mode_standards == "Fluorescence":
data["mu"] = data["I1"] / data["I0"]
elif self.xas_mode_standards == "Transmission":
data["mu"] = np.log(data["I0"] / data["I1"])
elif self.xas_mode_standards == "Pre-calculated":
data["mu"] = raw_data["mu"]
else:
raise ValueError("Invalid XAS mode")
else:
if self.xas_mode == "Fluorescence":
data["mu"] = data["I1"] / data["I0"]
elif self.xas_mode == "Transmission":
data["mu"] = np.log(data["I0"] / data["I1"])
elif self.xas_mode == "Pre-calculated":
data["mu"] = raw_data["mu"]
else:
raise ValueError("Invalid XAS mode")
# Remove data points with energy = 0
data = data[data["Energy"] != 0]
# Determine which measurement each data point belongs to
measurement_number = 1
measurement_number_values = []
for energy_step in data["Energy"].diff().round(1):
if energy_step < 0:
measurement_number += 1
measurement_number_values.append(int(measurement_number))
data["Measurement"] = measurement_number_values
# Ensure all measurements have the same number of data points
for metal in data["Metal"].unique():
metal_filter = data["Metal"] == metal
data_point_count = data["Measurement"][metal_filter].value_counts()
if len(data_point_count) > 1:
most_common_count = data_point_count.value_counts().idxmax()
outlier_indices = data_point_count[
data_point_count != most_common_count
].index
# Deal with outliers
for outlier_index in outlier_indices:
if data_point_count[outlier_index] > most_common_count:
# Remove data points from the end of the measurement
data.drop(
data[
metal_filter
& (data["Measurement"] == outlier_index)
].index[
-(
data_point_count[outlier_index]
- most_common_count
) :
],
inplace=True,
)
print(
f"Removed {data_point_count[outlier_index] - most_common_count} data points from the end of measurement {outlier_index} for {metal}."
)
elif data_point_count[outlier_index] < most_common_count:
# Remove the measurement
data.drop(
data[
metal_filter
& (data["Measurement"] == outlier_index)
].index,
inplace=True,
)
print(
f"Removed measurement {outlier_index} for {metal} due to an unexpected low number of data points."
)
# Calculate mean and standard deviation of temperature for each measurement
for metal in data["Metal"].unique():
metal_filter = data["Metal"] == metal
for measurement in data["Measurement"][metal_filter].unique():
measurement_filter = (data["Metal"] == metal) & (
data["Measurement"] == measurement
)
temperature_mean = data["Temperature"][
measurement_filter
].mean()
temperature_std = data["Temperature"][measurement_filter].std()
data.loc[measurement_filter, "Temperature"] = temperature_mean
data.loc[measurement_filter, "Temperature (std)"] = (
temperature_std
)
# Specify data types in specific columns
data = data.astype(
{"Experiment": str, "Metal": str, "Measurement": int}
)
if standards:
if self.raw_standards is None:
self.raw_standards = raw_data
else:
self.raw_standards = pd.concat([self.raw_standards, raw_data])
if self.standards is None:
self.standards = data
else:
self.standards = pd.concat([self.standards, data]).reset_index(
drop=True
)
else:
if self.raw_data is None:
self.raw_data = raw_data
else:
self.raw_data = pd.concat([self.raw_data, raw_data])
if self.data is None:
self.data = data
else:
self.data = pd.concat([self.data, data]).reset_index(drop=True)
elif self.data_format == ".h5":
raise NotImplementedError("HDF5 file reading not implemented yet")
return None
def _fluorescence_correction(self, standards: bool = False) -> None:
"""
Apply fluorescence correction to the data using the `larch.xafs.fluo_corr` function.
Args:
standards (bool, optional): Whether to apply fluorescence correction to standards. Defaults to False.
Returns:
None: Function does not return anything.
"""
dataframe = self.standards if standards else self.data
experiments = self.standard_experiments if standards else self.experiments
if self.edges is None or self.edges == {}:
raise ValueError("No edges specified for fluorescence correction")
if self.formula is None or self.formula == "":
raise ValueError("No formula specified for fluorescence correction")
# dataframe["mu_corr"] = 0.0 # Initialize corrected mu column
for experiment in tqdm(
experiments, desc="Fluorescence correction", leave=False
):
experiment_filter = dataframe["Experiment"] == experiment
n_measurements = dataframe["Measurement"][experiment_filter].max()
formula = experiment if standards else self.formula
if "acac" in formula:
formula = formula.replace("acac", "C5H7O2")
element = dataframe["Metal"][experiment_filter].values[0]
edge = self.edges.get(
dataframe["Metal"][
(experiment_filter) & (dataframe["Measurement"] == 1)
].values[0],
"K",
)
# Apply fluorescence correction
for measurement in range(1, n_measurements + 1):
measurement_filter = (experiment_filter) & (
dataframe["Measurement"] == measurement
)
dummy_group = Group(name="dummy")
fluo_corr(
dataframe["Energy"][measurement_filter],
dataframe["mu"][measurement_filter],
formula=formula,
elem=element,
edge=edge,
group=dummy_group,
)
dataframe["mu"][measurement_filter] = dummy_group.mu_corr
# Update the dataframe with the corrected data
if standards:
self.standards = dataframe
else:
self.data = dataframe
return None
def _calculate_edge_shift(self, edges: dict, standards: bool = False) -> None:
"""
Calculate the shift in edge energy for each metal and edge pair.
Args:
edges (dict): Dictionary containing the metal and edge pairs.
standards (bool, optional): Whether to calculate edge shift for standards. Defaults to False.
Returns:
None: Function does not return anything.
"""
dataframe = self.standards if standards else self.data
experiments = self.standard_experiments if standards else self.experiments
for metal, edge in tqdm(
edges.items(), desc="Calculating edge shifts", leave=False
):
if self.edge_correction_energies.get(metal) is not None:
continue
else:
edge_energy_table = xray_edge(metal, edge, energy_only=True)
if self.energy_unit == "keV":
edge_energy_table /= 1000 # Convert eV to keV
elif self.energy_unit != "eV":
raise ValueError("Invalid energy unit. Must be 'eV' or 'keV'.")
if metal in experiments:
experiment_filter = dataframe["Experiment"] == metal
elif dataframe["Experiment"].str.contains(metal).any():
continue
else:
continue
for measurement in range(
1, dataframe["Measurement"][experiment_filter].max() + 1
):
try:
measurement_filter = (experiment_filter) & (
dataframe["Measurement"] == measurement
)
edge_energy_measured = find_e0(
dataframe["Energy"][measurement_filter],
dataframe["mu"][measurement_filter],
)
self.edge_correction_energies[metal] = (
edge_energy_table - edge_energy_measured
)
break
except:
continue
if self.edge_correction_energies.get(metal) is None:
print(
f"Could not find edge energy for {metal} {edge}. Edge shift will not be applied."
)
# self.edge_correction_energies[metal] = 0.0
return None
def _energy_correction(self, standards: bool = False) -> None:
"""
Correct energy range and align measured energy points for each experiment.
Args:
standards (bool, optional): Whether to correct standards. Defaults to False.
Returns:
None: Function does not return anything.
"""
dataframe = self.standards if standards else self.data
experiments = self.standard_experiments if standards else self.experiments
for experiment in tqdm(experiments, desc="Energy correction", leave=False):
experiment_filter = dataframe["Experiment"] == experiment
n_measurements = dataframe["Measurement"][experiment_filter].max()
# Correct for small variations in measured energy points
energy = (
dataframe["Energy"][experiment_filter]
.to_numpy()
.reshape(n_measurements, -1)
)
energy_correction = energy.mean(axis=0)
# Correct for edge shift
energy_correction += self.edge_correction_energies.get(
dataframe["Metal"][experiment_filter].values[0], 0.0
)
# Estimate mu at corrected energy points using linear interpolation
for measurement in range(1, n_measurements + 1):
measurement_filter = (dataframe["Experiment"] == experiment) & (
dataframe["Measurement"] == measurement
)
i0_interpolated = np.interp(
energy_correction,
dataframe["Energy"][measurement_filter],
dataframe["I0"][measurement_filter],
)
i1_interpolated = np.interp(
energy_correction,
dataframe["Energy"][measurement_filter],
dataframe["I1"][measurement_filter],
)
mu_interpolated = np.interp(
energy_correction,
dataframe["Energy"][measurement_filter],
dataframe["mu"][measurement_filter],
)
# Apply correction
dataframe["Energy"][measurement_filter] = energy_correction
dataframe["I0"][measurement_filter] = i0_interpolated
dataframe["I1"][measurement_filter] = i1_interpolated
dataframe["mu"][measurement_filter] = mu_interpolated
if standards:
self.standards = dataframe
else:
self.data = dataframe
return None
def _average_data(
self,
measurements_to_average: Union[str, list[int], np.ndarray, range] = "all",
standards: bool = False,
) -> None:
"""
Average data points for each experiment.
Args:
measurements_to_average (Union[str, list[int], np.ndarray, range], optional): Measurements to average. Defaults to "all".
standards (bool, optional): Whether to average standards. Defaults to False.
Returns:
None: Function does not return anything.
"""
dataframe = self.standards if standards else self.data
experiments = self.standard_experiments if standards else self.experiments
avg_measurements = []
for experiment in tqdm(experiments, desc="Averaging data", leave=False):
first = True
experiment_filter = dataframe["Experiment"] == experiment
if measurements_to_average == "all":
measurements = dataframe["Measurement"][experiment_filter].unique()
else:
measurements = measurements_to_average
for measurement in measurements:
measurement_filter = (dataframe["Experiment"] == experiment) & (
dataframe["Measurement"] == measurement
)
if first:
df_avg = dataframe[measurement_filter].copy()
i0_avg = np.zeros_like(df_avg["I0"], dtype=np.float64)
i1_avg = np.zeros_like(df_avg["I1"], dtype=np.float64)
mu_avg = np.zeros_like(df_avg["mu"], dtype=np.float64)
first = False
i0_avg += dataframe["I0"][measurement_filter].to_numpy()
i1_avg += dataframe["I1"][measurement_filter].to_numpy()
mu_avg += dataframe["mu"][measurement_filter].to_numpy()
n_measurements = len(measurements)
df_avg["I0"] = i0_avg / n_measurements
df_avg["I1"] = i1_avg / n_measurements
df_avg["mu"] = mu_avg / n_measurements
df_avg["Temperature"] = dataframe["Temperature"][measurement_filter].mean()
df_avg["Temperature (std)"] = dataframe["Temperature"][
measurement_filter
].std()
avg_measurements.append(df_avg)
if standards:
self.standards = pd.concat(avg_measurements)
else:
self.data = pd.concat(avg_measurements)
return None
def _average_data_periodic(
self,
period: Union[None, int] = None,
n_periods: Union[None, int] = None,
standards: bool = False,
) -> None:
"""
Average data points for each experiment using periodic grouping of measurements.
Args:
period (Union[None, int], optional): Number of measurements to group for averaging. Will determine the number of periods automatically. Defaults to None.
n_periods (Union[None, int], optional): Number of periods to group for averaging. Will determine the number of measurements per period automatically. Defaults to None.
Raises:
Exception: Exactly 1 optional argument should be given.
Returns:
None: Function does not return anything.
"""
dataframe = self.standards if standards else self.data
experiments = self.standard_experiments if standards else self.experiments
avg_measurements = []
if (period and n_periods) or (not period and not n_periods):
n_arguments = bool(period) + bool(n_periods)
raise Exception(
f"Exactly 1 optional argument should be given. {n_arguments} was given."
)
for experiment in tqdm(experiments, desc="Averaging data", leave=False):
experiment_filter = dataframe["Experiment"] == experiment
n_total_measurements = np.amax(dataframe["Measurement"][experiment_filter])
if period:
n_measurements_to_average = period
new_n_measurements = int(np.ceil(n_total_measurements / period))
elif n_periods:
n_measurements_to_average = n_total_measurements // n_periods
new_n_measurements = n_periods
measurements_to_average = np.arange(n_measurements_to_average) + 1
measurements_to_average_temp = measurements_to_average.copy()
for measurement_number in range(new_n_measurements):
if measurements_to_average_temp.any() >= n_total_measurements:
measurements_to_average_temp = np.array(
[
i
for i in measurements_to_average_temp
if i < n_total_measurements
]
)
for measurement in measurements_to_average_temp:
measurement_filter = (dataframe["Experiment"] == experiment) & (
dataframe["Measurement"] == measurement
)
if measurement == measurements_to_average_temp[0]:
df_avg = dataframe[measurement_filter].copy()
energy_avg = np.zeros_like(df_avg["Energy"], dtype=np.float64)
i0_avg = np.zeros_like(df_avg["I0"], dtype=np.float64)
i1_avg = np.zeros_like(df_avg["I1"], dtype=np.float64)
mu_avg = np.zeros_like(df_avg["mu"], dtype=np.float64)
energy_avg += dataframe["Energy"][measurement_filter].to_numpy()
i0_avg += dataframe["I0"][measurement_filter].to_numpy()
i1_avg += dataframe["I1"][measurement_filter].to_numpy()
mu_avg += dataframe["mu"][measurement_filter].to_numpy()
n_measurements = len(measurements_to_average_temp)
df_avg["Energy"] = energy_avg / n_measurements
df_avg["I0"] = i0_avg / n_measurements
df_avg["I1"] = i1_avg / n_measurements
df_avg["mu"] = mu_avg / n_measurements
df_avg["Temperature"] = dataframe["Temperature"][
measurement_filter
].mean()
df_avg["Temperature (std)"] = dataframe["Temperature"][
measurement_filter
].std()
df_avg["Measurement"] = measurement_number + 1
measurements_to_average_temp += n_measurements_to_average
avg_measurements.append(df_avg)
if standards:
self.standards = pd.concat(avg_measurements)
else:
self.data = pd.concat(avg_measurements)
return None
def _normalize_data(self, standards: bool = False) -> None:
"""
Normalize data by subtracting the pre-edge fit and dividing by the post-edge fit.
Pre- and post-edge fits are done using the `larch.xafs.pre_edge` function.
Args:
standards (bool, optional): Whether to normalize standards. Defaults to False.
Raises:
ValueError: No data to normalize.
Returns:
None: Function does not return anything.
"""
if standards:
if self.standards is None:
raise ValueError("No standards to normalize")
dataframe = self.standards
else:
if self.data is None:
raise ValueError("No data to normalize")
dataframe = self.data
experiments = self.standard_experiments if standards else self.experiments
dataframe["mu_norm"] = 0
dataframe["pre_edge"] = 0
dataframe["post_edge"] = 0
for experiment in tqdm(experiments, desc="Normalizing data", leave=False):
experiment_filter = dataframe["Experiment"] == experiment
for measurement in dataframe["Measurement"][experiment_filter].unique():
measurement_filter = (dataframe["Experiment"] == experiment) & (
dataframe["Measurement"] == measurement
)
dummy_group = Group(name="dummy")
dataframe["mu_norm"][measurement_filter] = dataframe["mu"][
measurement_filter
] - np.amin(dataframe["mu"][measurement_filter])
try:
pre_edge(
dataframe["Energy"][measurement_filter],
dataframe["mu_norm"][measurement_filter],
group=dummy_group,
make_flat=False,
)
dataframe["pre_edge"][measurement_filter] = dummy_group.pre_edge
dataframe["post_edge"][measurement_filter] = dummy_group.post_edge
dataframe["mu_norm"][measurement_filter] -= dummy_group.pre_edge
pre_edge(
dataframe["Energy"][measurement_filter],
dataframe["mu_norm"][measurement_filter],
group=dummy_group,
make_flat=False,
)
dataframe["mu_norm"][measurement_filter] /= dummy_group.post_edge
except:
dataframe.drop(dataframe[measurement_filter].index, inplace=True)
print(
f"Error normalizing {experiment} measurement {measurement}. Measurement removed."
)
if standards:
self.standards = dataframe
else:
self.data = dataframe
return None
[docs]
def load_data(
self,
average: Union[bool, str] = False,
measurements_to_average: Union[str, list[int], np.ndarray, range] = "all",
n_periods: Union[None, int] = None,
period: Union[None, int] = None,
) -> None:
"""
Load data, apply corrections, and normalize data.
Args:
average (Union[bool, str], optional): Whether to average data in a standard or periodic manner. Defaults to False.
measurements_to_average (Union[str, list[int], np.ndarray, range], optional): Measurements to average. Defaults to "all".
n_periods (Union[None, int], optional): Number of periods to group measurements in for averaging. Will determine the number of measurements per period automatically. Defaults to None.
period (Union[None, int], optional): Number of measurements to group for averaging. Will determine the number of periods automatically. Defaults to None.
Raises:
ValueError: Invalid average. Must be "standard" or "periodic".
Returns:
None: Function does not return anything.
"""
self._read_data()
self.experiments = list(self.data["Experiment"].unique())
self.metals = list(self.data["Metal"].unique())
self.excluded_data = {experiment: [] for experiment in self.experiments}
if average:
if average.lower() == "standard":
self._average_data(measurements_to_average=measurements_to_average)
elif average.lower() == "periodic":
self._average_data_periodic(period=period, n_periods=n_periods)
else:
raise ValueError('Invalid average. Must be "standard" or "periodic".')
if self.fluorescence_correction and self.xas_mode == "Fluorescence":
self._fluorescence_correction()
if (self.edges is not None) or (self.edges != {}):
if self.standards_config is not None:
warnings.warn(
'Calculating the edge shift using the standards is recommended. Please load the standards using the "load_standards" method before calling "load_data".'
)
self._calculate_edge_shift(self.edges)
self._energy_correction()
self._normalize_data()
return None
[docs]
def load_standards(
self,
average: Union[bool, str] = False,
measurements_to_average: Union[str, list[int], np.ndarray, range] = "all",
n_periods: Union[None, int] = None,
period: Union[None, int] = None,
) -> None:
"""
Load standards, apply corrections, and normalize data.
Args:
average (Union[bool, str], optional): Whether to average data in a standard or periodic manner. Defaults to False.
measurements_to_average (Union[str, list[int], np.ndarray, range], optional): Measurements to average. Defaults to "all".
n_periods (Union[None, int], optional): Number of periods to group measurements in for averaging. Will determine the number of measurements per period automatically. Defaults to None.
period (Union[None, int], optional): Number of measurements to group for averaging. Will determine the number of periods automatically. Defaults to None.
Raises:
ValueError: Invalid average. Must be "standard" or "periodic".
Returns:
None: Function does not return anything.
"""
self._read_data(standards=True)
self.standard_experiments = list(self.standards["Experiment"].unique())
if average:
if average.lower() == "standard":
self._average_data(
measurements_to_average=measurements_to_average, standards=True
)
elif average.lower() == "periodic":
self._average_data_periodic(
period=period, n_periods=n_periods, standards=True
)
else:
raise ValueError('Invalid average. Must be "standard" or "periodic".')
if (
self.fluorescence_correction_standards
and self.xas_mode_standards == "Fluorescence"
):
self._fluorescence_correction(standards=True)
if (self.edges is not None) or (self.edges != {}):
self._calculate_edge_shift(self.edges, standards=True)
self._energy_correction(standards=True)
self._normalize_data(standards=True)
return None
[docs]
def exclude_data(
self,
measurements: Union[list[int], np.ndarray],
experiment: Union[str, int] = "all",
) -> None:
"""
Exclude data points from the DataFrame.
Args:
measurements (Union[list[int], np.ndarray]): Measurements to exclude.
experiment (Union[str, int], optional): Experiment to exclude data from. Defaults to "all".
Returns:
None: Function does not return anything.
"""
if experiment == "all":
experiments = self.experiments
elif isinstance(experiment, str):
if not experiment in self.experiments:
raise ValueError("Invalid experiment name")
experiments = [experiment]
elif isinstance(experiment, int):
experiments = [self.experiments[experiment]]
else:
raise ValueError("Invalid experiment name")
for experiment in experiments:
for measurement in measurements:
self.data = self.data[
~(
(self.data["Experiment"] == experiment)
& (self.data["Measurement"] == measurement)
)
]
# Log excluded data
self.excluded_data[experiment].append(measurement)
return None
[docs]
def convert_energy_units(
self,
energy_unit: str = "eV",
unit_conversion_factor: float = 1.0,
data_to_convert: str = "all",
) -> None:
"""
Convert energy units in the data.
Args:
energy_unit (str, optional): Desired energy unit. Defaults to "eV".
unit_conversion_factor (float, optional): Factor to convert energy units. Defaults to 1.0.
data_to_convert (str, optional): Specify which data to convert. Options are 'all', 'data', or 'standards'. Defaults to 'all'.
Returns:
None: Function does not return anything.
"""
if self.energy_unit == energy_unit:
return
if data_to_convert in ["all", "data"]:
self.data["Energy"] *= unit_conversion_factor
if self.standards is not None and data_to_convert in ["all", "standards"]:
self.standards["Energy"] *= unit_conversion_factor
elif self.standards is None and data_to_convert in ["all", "standards"]:
warnings.warn("No standards loaded. Standards data will not be converted.")
self.energy_unit = energy_unit
return None
[docs]
def homogenize_standards(self, renormalize: bool = True) -> None:
"""
Homogenize standards data to the data energy range.
Args:
renormalize (bool, optional): Whether to renormalize the standards data after homogenization. Defaults to True.
Returns:
None: Function does not return anything.
"""
if self.standards is None:
raise ValueError("No standards loaded")
self.standard_experiments = list(self.standards["Experiment"].unique())
# Create a new DataFrame to store homogenized standards
homogenized_standards = pd.DataFrame()
for experiment in tqdm(
self.standard_experiments, desc="Homogenizing standards", leave=False
):
experiment_filter = self.standards["Experiment"] == experiment
n_measurements = self.standards["Measurement"][experiment_filter].max()
metal = self.standards["Metal"][experiment_filter].values[0]
# Get the energy range of the data
data_energy_range = self.data[self.data["Metal"] == metal][
"Energy"
].unique()
# Interpolate standards data to the data energy range
for measurement in range(1, n_measurements + 1):
measurement_filter = (experiment_filter) & (
self.standards["Measurement"] == measurement
)
if renormalize:
# Interpolate unnormalized mu to the data energy range
mu_interpolated = np.interp(
data_energy_range,
self.standards["Energy"][measurement_filter],
self.standards["mu"][measurement_filter],
)
# Append the homogenized data to the new DataFrame
homogenized_standards_i = pd.DataFrame(
{
"File": pd.Series(
[self.standards["File"][measurement_filter].values[0]]
* len(data_energy_range)
),
"Experiment": pd.Series(
[experiment] * len(data_energy_range)
),
"Metal": pd.Series([metal] * len(data_energy_range)),
"Energy": pd.Series(data_energy_range),
"Temperature": pd.Series(
[
self.standards["Temperature"][
measurement_filter
].values[0]
]
* len(data_energy_range)
),
"Temperature (std)": pd.Series(
[
self.standards["Temperature (std)"][
measurement_filter
].values[0]
]
* len(data_energy_range)
),
"mu": pd.Series(mu_interpolated),
"Measurement": pd.Series(
[measurement] * len(data_energy_range)
),
}
)
else:
# Interpolate normalized mu to the data energy range
mu_interpolated = np.interp(
data_energy_range,
self.standards["Energy"][measurement_filter],
self.standards["mu"][measurement_filter],
)
mu_norm_interpolated = np.interp(
data_energy_range,
self.standards["Energy"][measurement_filter],
self.standards["mu_norm"][measurement_filter],
)
# Append the homogenized data to the new DataFrame
homogenized_standards_i = pd.DataFrame(
{
"File": pd.Series(
[self.standards["File"][measurement_filter].values[0]]
* len(data_energy_range)
),
"Experiment": pd.Series(
[experiment] * len(data_energy_range)
),
"Metal": pd.Series([metal] * len(data_energy_range)),
"Energy": pd.Series(data_energy_range),
"Temperature": pd.Series(
[
self.standards["Temperature"][
measurement_filter
].values[0]
]
* len(data_energy_range)
),
"Temperature (std)": pd.Series(
[
self.standards["Temperature (std)"][
measurement_filter
].values[0]
]
* len(data_energy_range)
),
"mu": pd.Series(mu_interpolated),
"Measurement": pd.Series(
[measurement] * len(data_energy_range)
),
"mu_norm": pd.Series(mu_norm_interpolated),
}
)
# Concatenate the homogenized standards data
homogenized_standards = pd.concat(
[homogenized_standards, homogenized_standards_i],
ignore_index=True,
)
# Update the standards DataFrame with the homogenized data
self.standards = homogenized_standards
# Normalize the homogenized standards data if required
if renormalize:
self._normalize_data(standards=True)
return None
def _linear_combination(
self, weights: Parameters, components: list[np.array]
) -> np.array:
"""
Calculate the linear combination of components using the given weights.
Args:
weights (Parameters): Weights for each component.
components (list[np.array]): List of components to combine.
Returns:
np.array: Linear combination of components.
"""
weights = np.array(list(weights.valuesdict().values()))
components = np.array(components)
return np.dot(weights, components)
def _residual(self, target: np.array, combination: np.array) -> np.array:
"""
Calculate the residual between the target and the linear combination.
Args:
target (np.array): Target data.
combination (np.array): Linear combination of components.
Returns:
np.array: Residual between target and linear combination.
"""
return target - combination
def _fit_function(
self, weights: Parameters, components: list[np.array], target: np.array
) -> np.array:
"""
Fit function for the linear combination of components.
Args:
weights (Parameters): Weights for each component.
components (list[np.array]): List of components to combine.
target (np.array): Target data.
Returns:
np.array: Residual between target and linear combination.
"""
combination = self._linear_combination(weights, components)
return self._residual(target, combination)
# Analysis functions
[docs]
def LCA(
self,
use_standards: bool = False,
components: Union[list[int], list[list[int]], None] = [0, -1],
fit_range: Union[None, tuple[float, float], list[tuple[float, float]]] = None,
standards_to_use: Union[
str, list[int], list[list[int]], list[list[str]]
] = "all",
) -> None:
"""
Perform Linear Combination Analysis (LCA) on the data.
Args:
use_standards (bool, optional): Whether to use standards for LCA. Defaults to False.
components (Union[list[int], list[str], None], optional): Components to use for LCA. Defaults to [0, -1].
fit_range (Union[None, tuple[float, float], list[tuple[float, float]]], optional): Energy range to use for fitting. Defaults to None.
standards_to_use (Union[str, list[int]], optional): Standards to use for LCA. Defaults to "all".
Raises:
ValueError: No standards loaded.
ValueError: Number of fit ranges must match number of metals.
ValueError: At least 2 components are required to perform LCA.
Returns:
None: Function does not return anything.
"""
if use_standards:
if self.standards is None:
raise ValueError("No standards loaded")
if isinstance(fit_range, list):
if len(fit_range) != len(self.metals):
raise ValueError("Number of fit ranges must match number of metals")
elif isinstance(fit_range, tuple):
fit_range = [fit_range] * len(self.metals)
else:
fit_range = [(0, np.inf)] * len(self.metals)
# Initialize list to store results
list_experiment = []
list_metal = []
list_measurement = []
list_temperature = []
list_fit_range = []
list_energy = []
list_component_name = []
list_component = []
list_parameter_name = []
list_parameter_value = []
list_parameter_error = []
for i, metal in enumerate(
tqdm(self.metals, desc="Performing LCA", leave=False)
):
fit_range_filter = (self.data["Energy"] >= fit_range[i][0]) & (
self.data["Energy"] <= fit_range[i][1]
)
if use_standards:
fit_range_filter_standards = (
self.standards["Energy"] >= fit_range[i][0]
) & (self.standards["Energy"] <= fit_range[i][1])
component_measurements = []
component_names = []
for standard_experiment in self.standards["Experiment"][
self.standards["Metal"] == metal
].unique():
measurements = self.standards["Measurement"][
self.standards["Experiment"] == standard_experiment
].unique()
for measurement in measurements:
component_measurements.append(
(standard_experiment, measurement)
)
if len(measurements) == 1:
component_names.append(f"{standard_experiment}")
else:
component_names.append(
f"{standard_experiment} ({measurement})"
)
if isinstance(standards_to_use, list):
if isinstance(standards_to_use[0], list):
if len(standards_to_use) != len(self.experiments):
raise ValueError(
"Length of list must match number of experiments."
)
if isinstance(standards_to_use[i][0], int):
for exp_standards_to_use in standards_to_use:
component_names = [
component_names[i] for i in exp_standards_to_use
]
component_measurements = [
component_measurements[i]
for i in exp_standards_to_use
]
elif isinstance(standards_to_use[i][0], str):
exp_standards_to_use = standards_to_use[i]
component_measurements = [
component_measurements[
component_names.index(standard_name)
]
for standard_name in exp_standards_to_use
]
component_names = exp_standards_to_use
elif isinstance(standards_to_use[0], int):
component_names = [component_names[i] for i in standards_to_use]
component_measurements = [
component_measurements[i] for i in standards_to_use
]
else:
raise ValueError(
"Standards to use must be 'all' or a list of integers or strings."
)
elif standards_to_use != "all":
raise ValueError(
"Standards to use must be 'all' or a list of integers or strings."
)
n_components = len(component_measurements)
if n_components < 2:
raise ValueError(
"At least 2 components are required to perform LCA."
)
components_mu = []
for standard_name, measurement in component_measurements:
measurement_filter = (
(self.standards["Experiment"] == standard_name)
& (self.standards["Measurement"] == measurement)
& fit_range_filter_standards
)
components_mu.append(
self.standards["mu_norm"][measurement_filter].to_numpy()
)
else:
if isinstance(components[0], int):
component_measurements = self.data["Measurement"][
self.data["Metal"] == metal
].unique()[components]
component_names = [
f"Measurement {meas_num}" for meas_num in component_measurements
]
elif isinstance(components[0], list):
if len(components) != len(self.experiments):
raise ValueError(
"Length of list must match number of experiments."
)
component_measurements = self.data["Measurement"][
self.data["Metal"] == metal
].unique()[components[i]]
component_names = [
f"Measurement {meas_num}" for meas_num in component_measurements
]
else:
raise ValueError(
"Components must be a list of integers or a list of lists of integers."
)
n_components = len(component_measurements)
if n_components < 2:
raise ValueError(
"At least 2 components are required to perform LCA."
)
components_mu = []
for measurement in component_measurements:
measurement_filter = (
(self.data["Metal"] == metal)
& (self.data["Measurement"] == measurement)
& fit_range_filter
)
components_mu.append(
self.data["mu_norm"][measurement_filter].to_numpy()
)
for measurement in self.data["Measurement"][
self.data["Metal"] == metal
].unique():
measurement_filter = (
(self.data["Metal"] == metal)
& (self.data["Measurement"] == measurement)
& fit_range_filter
)
target = self.data["mu_norm"][measurement_filter].to_numpy()
weights = Parameters()
for j in range(1, n_components):
weights.add(f"w{j}", value=1 / n_components, min=0, max=1)
weights.add(
f"w{n_components}",
min=0,
max=1,
expr="1 - " + " - ".join([f"w{j}" for j in range(1, n_components)]),
)
fit_output = minimize(
self._fit_function, weights, args=(components_mu, target)
)
# Store data used for LCA
list_experiment.append(
self.data["Experiment"][measurement_filter].values[0]
)
list_metal.append(metal)
list_measurement.append(measurement)
list_temperature.append(
self.data["Temperature"][measurement_filter].values[0]
)
list_fit_range.append(fit_range[i])
list_energy.append(self.data["Energy"][measurement_filter].to_numpy())
list_component_name.append(f"(Ref) Measurement {measurement}")
list_component.append(target)
list_parameter_name.append(None)
list_parameter_value.append(None)
list_parameter_error.append(None)
# Store LCA results
for j, (name, param) in enumerate(fit_output.params.items()):
list_experiment.append(
self.data["Experiment"][measurement_filter].values[0]
)
list_metal.append(metal)
list_measurement.append(measurement)
list_temperature.append(
self.data["Temperature"][measurement_filter].values[0]
)
list_fit_range.append(fit_range[i])
list_energy.append(
self.data["Energy"][measurement_filter].to_numpy()
)
list_component_name.append(component_names[j])
list_component.append(components_mu[j])
list_parameter_name.append(name)
list_parameter_value.append(param.value)
if param.stderr is not None:
list_parameter_error.append(param.stderr)
else:
list_parameter_error.append(1e-6)
self.LCA_result = pd.DataFrame(
{
"Experiment": list_experiment,
"Metal": list_metal,
"Measurement": list_measurement,
"Temperature": list_temperature,
"Fit Range": list_fit_range,
"Energy": list_energy,
"Component Name": list_component_name,
"Component": list_component,
"Parameter Name": list_parameter_name,
"Parameter Value": list_parameter_value,
"Parameter Error": list_parameter_error,
}
)
return None
[docs]
def PCA(
self,
n_components: Union[None, str, float, int, list] = None,
fit_range: Union[None, tuple[float, float], list[tuple[float, float]]] = None,
max_components: int = 10,
seed: Union[None, int] = None,
) -> None:
"""
Perform Principal Component Analysis (PCA) on the data.
Args:
n_components (Union[None, str, float, int, list], optional): Number of components to keep or fraction of variance to be explained by components. Defaults to None.
fit_range (Union[None, tuple[float, float], list[tuple[float, float]]], optional): Energy range to use for fitting. Defaults to None.
max_components (int, optional): Maximum number of components to use for automatic PCA component determination. Defaults to 10.
seed (Union[None, int], optional): Random seed for reproducibility. Defaults to None.
Raises:
ValueError: Length of list must match number of experiments.
Returns:
None: Function does not return anything.
"""
if isinstance(n_components, list):
if len(n_components) != len(self.experiments):
raise ValueError("Length of list must match number of experiments")
else:
n_components = [n_components] * len(self.experiments)
if isinstance(fit_range, list):
if len(fit_range) != len(self.metals):
raise ValueError("Number of fit ranges must match number of metals")
elif isinstance(fit_range, tuple):
fit_range = [fit_range] * len(self.metals)
else:
fit_range = [(0, np.inf)] * len(self.metals)
experiment_list = []
metal_list = []
measurement_list = []
temperature_list = []
fit_range_list = []
n_components_list = []
pca_mean_list = []
explained_variance_list = []
explained_variance_ratio_list = []
cumulative_explained_variance_list = []
energy_list = []
component_list = []
component_names_list = []
component_number_list = []
weights_list = []
for i, (experiment, n_components) in enumerate(
zip(self.experiments, n_components)
):
fit_range_filter = (self.data["Energy"] >= fit_range[i][0]) & (
self.data["Energy"] <= fit_range[i][1]
)
measurements = self.data["Measurement"][
self.data["Experiment"] == experiment
].unique()
temperatures = np.array(
[
self.data["Temperature"][
(self.data["Experiment"] == experiment)
& (self.data["Measurement"] == measurement)
].unique()[0]
for measurement in measurements
]
)
data = (
self.data["mu_norm"][
(self.data["Experiment"] == experiment) & fit_range_filter
]
.to_numpy()
.reshape(len(measurements), -1)
)
pca = PCA(n_components=n_components, random_state=seed)
pca.fit(data)
if pca.n_components_ > max_components:
pca = PCA(n_components=max_components, random_state=seed)
pca.fit(data)
pca_weights = pca.transform(data)
# Store PCA results
for j, component in enumerate(pca.components_):
for measurement, temperature in zip(measurements, temperatures):
experiment_list.append(experiment)
metal_list.append(
self.data["Metal"][
self.data["Experiment"] == experiment
].values[0]
)
measurement_list.append(measurement)
temperature_list.append(temperature)
fit_range_list.append(fit_range[i])
n_components_list.append(pca.n_components_)
pca_mean_list.append(pca.mean_)
explained_variance_list.append(pca.explained_variance_[j])
explained_variance_ratio_list.append(
pca.explained_variance_ratio_[j]
)
cumulative_explained_variance_list.append(
pca.explained_variance_ratio_[: j + 1].sum()
)
energy_list.append(
self.data["Energy"][
(self.data["Experiment"] == experiment) & fit_range_filter
].to_numpy()
)
component_list.append(component)
component_names_list.append(f"PC {j+1}")
component_number_list.append(j + 1)
weights_list.append(
pca_weights[np.where(measurements == measurement)[0][0], j]
)
self.PCA_result = pd.DataFrame(
{
"Experiment": experiment_list,
"Metal": metal_list,
"Measurement": measurement_list,
"Temperature": temperature_list,
"Fit Range": fit_range_list,
"n_components": n_components_list,
"PCA Mean": pca_mean_list,
"Explained Variance": explained_variance_list,
"Explained Variance Ratio": explained_variance_ratio_list,
"Cumulative Explained Variance": cumulative_explained_variance_list,
"Energy": energy_list,
"Component": component_list,
"Component Name": component_names_list,
"Component Number": component_number_list,
"Weight": weights_list,
}
)
return None
[docs]
def NMF(
self,
n_components: Union[None, str, float, int, list] = None,
change_cutoff: float = 0.25,
fit_range: Union[None, tuple[float, float], list[tuple[float, float]]] = None,
max_components: int = 10,
seed: Union[None, int] = None,
) -> None:
"""
Perform Non-negative Matrix Factorization (NMF) on the data.
Args:
n_components (Union[None, str, float, int, list], optional): Number of components to use. Defaults to None.
change_cutoff (float, optional): Minimum change in reconstruction error to determine number of components. Defaults to 0.25.
fit_range (Union[None, tuple[float, float], list[tuple[float, float]]], optional): Energy range to use for fitting. Defaults to None.
max_components (int, optional): Maximum number of components to use for automatic NMF component determination. Defaults to 10.
seed (Union[None, int], optional): Random seed for reproducibility. Defaults to None.
Raises:
ValueError: Length of list must match number of experiments.
Returns:
None: Function does not return anything.
"""
if isinstance(n_components, list):
if len(n_components) != len(self.experiments):
raise ValueError("Length of list must match number of experiments")
elif n_components is None:
n_components = self._determine_NMF_components(
change_cutoff=change_cutoff,
fit_range=fit_range,
max_components=max_components,
)
else:
n_components = [n_components] * len(self.experiments)
if isinstance(fit_range, list):
if len(fit_range) != len(self.metals):
raise ValueError("Number of fit ranges must match number of metals")
elif isinstance(fit_range, tuple):
fit_range = [fit_range] * len(self.metals)
else:
fit_range = [(0, np.inf)] * len(self.metals)
experiment_list = []
metal_list = []
measurement_list = []
temperature_list = []
fit_range_list = []
n_components_list = []
energy_list = []
component_list = []
component_names_list = []
component_number_list = []
weights_list = []
for i, (experiment, n_components) in enumerate(
zip(self.experiments, n_components)
):
fit_range_filter = (self.data["Energy"] >= fit_range[i][0]) & (
self.data["Energy"] <= fit_range[i][1]
)
measurements = self.data["Measurement"][
self.data["Experiment"] == experiment
].unique()
temperatures = np.array(
[
self.data["Temperature"][
(self.data["Experiment"] == experiment)
& (self.data["Measurement"] == measurement)
].unique()[0]
for measurement in measurements
]
)
data = (
self.data["mu_norm"][
(self.data["Experiment"] == experiment) & fit_range_filter
]
.to_numpy()
.reshape(len(measurements), -1)
)
# Remove negative values
data = np.clip(data, a_min=0, a_max=None)
nmf = NMF(n_components=n_components, random_state=seed, init="nndsvda")
nmf.fit(data)
nmf_weights = nmf.transform(data)
# Store NMF results
for j, component in enumerate(nmf.components_):
for measurement, temperature in zip(measurements, temperatures):
experiment_list.append(experiment)
metal_list.append(
self.data["Metal"][
self.data["Experiment"] == experiment
].values[0]
)
measurement_list.append(measurement)
temperature_list.append(temperature)
fit_range_list.append(fit_range[i])
n_components_list.append(nmf.n_components_)
energy_list.append(
self.data["Energy"][
(self.data["Experiment"] == experiment) & fit_range_filter
].to_numpy()
)
component_list.append(component)
component_names_list.append(f"Component {j+1}")
component_number_list.append(j + 1)
weights_list.append(
nmf_weights[np.where(measurements == measurement)[0][0], j]
)
self.NMF_result = pd.DataFrame(
{
"Experiment": experiment_list,
"Metal": metal_list,
"Measurement": measurement_list,
"Temperature": temperature_list,
"Fit Range": fit_range_list,
"n_components": n_components_list,
"Energy": energy_list,
"Component": component_list,
"Component Name": component_names_list,
"Component Number": component_number_list,
"Weight": weights_list,
}
)
return None
def _determine_NMF_components(
self,
change_cutoff: int,
fit_range: Union[None, tuple[float, float], list[tuple[float, float]]] = None,
max_components: int = 10,
) -> list[int]:
"""
Determine the number of components to use for Non-negative Matrix Factorization (NMF).
Args:
change_cutoff (int): Minimum change in reconstruction error to determine number of components.
fit_range (Union[None, tuple[float, float], list[tuple[float, float]]], optional): Energy range to use for fitting. Defaults to None.
max_components (int, optional): Maximum number of components to use for automatic NMF component determination. Defaults to 10.
Returns:
list[int]: Number of components to use for NMF.
"""
if isinstance(fit_range, list):
if len(fit_range) != len(self.metals):
raise ValueError("Number of fit ranges must match number of metals")
elif isinstance(fit_range, tuple):
fit_range = [fit_range] * len(self.metals)
else:
fit_range = [(0, np.inf)] * len(self.metals)
experiment_list = []
metal_list = []
n_components_list = []
reconstruction_error_list = []
for i, experiment in enumerate(self.experiments):
fit_range_filter = (self.data["Energy"] >= fit_range[i][0]) & (
self.data["Energy"] <= fit_range[i][1]
)
measurements = self.data["Measurement"][
self.data["Experiment"] == experiment
].unique()
data = np.clip(
self.data["mu_norm"][
(self.data["Experiment"] == experiment) & fit_range_filter
]
.to_numpy()
.reshape(len(measurements), -1),
a_min=0,
a_max=None,
)
for n_components in range(min(max_components, len(measurements))):
nmf = NMF(n_components=n_components + 1)
nmf.fit(data)
# Log reconstruction error
experiment_list.append(experiment)
metal_list.append(
self.data["Metal"][self.data["Experiment"] == experiment].values[0]
)
n_components_list.append(n_components + 1)
reconstruction_error_list.append(nmf.reconstruction_err_)
self.NMF_component_results = pd.DataFrame(
{
"Experiment": experiment_list,
"Metal": metal_list,
"n_components": n_components_list,
"Reconstruction Error": reconstruction_error_list,
}
)
nmf_k = []
error_change_list = []
for experiment in self.experiments:
_nmf = self.NMF_component_results[
self.NMF_component_results["Experiment"] == experiment
]
error_change = np.absolute(
np.gradient(_nmf["Reconstruction Error"], _nmf["n_components"])
)
error_change_list.extend(error_change)
k = _nmf["n_components"][error_change > np.abs(change_cutoff)].max()
nmf_k.append(k)
self.NMF_component_results["Error Change"] = error_change_list
return nmf_k
[docs]
def MCR_ALS(self):
"""
Perform Multivariate Curve Resolution - Alternating Least Squares (MCR-ALS) on the data.
This function is a placeholder and does not implement MCR-ALS yet.
Returns:
None: Function does not return anything.
"""
raise NotImplementedError("MCR-ALS is not implemented yet")
# Data export functions
[docs]
def to_csv(
self,
data: str = "data",
filename: Union[None, str] = None,
directory: Union[None, str] = None,
columns: Union[str, list[str]] = None,
explode_columns: bool = True,
):
"""
Export data to a CSV file.
Args:
filename (str): Name of the file to save.
directory (Union[None, str], optional): Directory to save the file. Defaults to None.
columns (Union[None, list[str]], optional): Columns to export. Defaults to None.
Returns:
None: Function does not return anything.
"""
if data not in ["data", "standards", "LCA", "PCA", "NMF"]:
raise ValueError("Invalid data type")
if directory is None:
directory = self.save_directory
if data == "data":
dataframe = self.data
elif data == "standards":
dataframe = self.standards
elif data == "LCA":
dataframe = self.LCA_result
if explode_columns:
# Explode data in list columns
dataframe = dataframe.explode(["Energy", "Component"])
elif data == "PCA":
dataframe = self.PCA_result
if explode_columns:
# Explode data in list columns
dataframe = dataframe.explode(["PCA Mean", "Energy", "Component"])
elif data == "NMF":
dataframe = self.NMF_result
if explode_columns:
# Explode data in list columns
dataframe = dataframe.explode(["Energy", "Component"])
dataframe.to_csv(directory + filename, index=False, columns=columns)
return None
[docs]
def to_athena(
self,
filename: str,
directory: str = "./",
columns: Union[None, list[str]] = None,
):
# with open(directory + filename + '.nor', 'w') as file:
# file.write('# Exported from autoXAS')
raise NotImplementedError("Athena export not implemented yet")
return None
# Plotting functions
[docs]
def plot_data(
self,
experiment: Union[str, int] = 0,
standards: Union[None, str, int] = None,
save: bool = False,
filename: str = "data",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the normalized data for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
standards (Union[None, str, int], optional): Standards to plot. Defaults to None.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "data".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Standards not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
if isinstance(standards, int):
standards = [self.standard_experiments[standards]]
elif isinstance(standards, str):
if len(standards) == 1 or len(standards) == 2:
standards = self.standards["Experiment"][
self.standards["Metal"] == standards
].unique()
elif len(standards) > 2:
if standards not in self.standard_experiments:
raise ValueError("Invalid standard name")
else:
standards = [standards]
n_measurements = int(
self.data["Measurement"][self.data["Experiment"] == experiment].max()
)
experiment_filter = self.data["Experiment"] == experiment
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot the measurements of the selected experiment
fig = px.line(
data_frame=self.data[experiment_filter],
x="Energy",
y="mu_norm",
color="Measurement",
color_discrete_sequence=px.colors.sample_colorscale(
"viridis", samplepoints=n_measurements
),
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
# visible="legendonly", # Used only for paper figure
)
if standards is not None:
i_standard = 0
for standard in standards:
measurements = self.standards["Measurement"][
self.standards["Experiment"] == standard
].unique()
for measurement in measurements:
standard_filter = (self.standards["Experiment"] == standard) & (
self.standards["Measurement"] == measurement
)
if len(measurements) == 1:
line_name = f"{standard}"
else:
line_name = f"{standard} ({measurement})"
fig.add_scatter(
x=self.standards["Energy"][standard_filter],
y=self.standards["mu_norm"][standard_filter],
mode="lines",
name=line_name,
line=dict(
width=3,
dash="dash",
color=px.colors.qualitative.Safe[i_standard],
),
)
i_standard += 1
# Place standard traces in front of data traces
fig.data = fig.data[-i_standard:] + fig.data[:-i_standard]
# Specify title text
if show_title:
title_text = f"<b>Normalized data<br><sup><i>{experiment}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>Energy [{self.energy_unit}]</b>",
yaxis_title="<b>Normalized μ(E) [a.u.]</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# fig.for_each_trace(lambda trace: trace.update(visible='legendonly') if trace.name in ["Pt (2)", "Pt (3)", "Pt (4)", "K2PtCl4", "Pt foil"] else (trace.update(visible=True)),) # # Used only for paper figure
# # Set limits of x-axis to match the edge measurements
fig.update_xaxes(
range=(
np.amin(self.data[experiment_filter]["Energy"]),
np.amax(self.data[experiment_filter]["Energy"]),
),
autorange=False,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
# Create figure object and set the figure size
plt.figure(figsize=(10, 8))
# Plot all measurements of specified experiment
sns.lineplot(
data=self.data[experiment_filter],
x="Energy",
y="mu_norm",
hue="Measurement",
palette="viridis",
)
# Set limits of x-axis to match the edge measurements
plt.xlim(
(
np.amin(self.data["Energy"][experiment_filter]),
np.amax(self.data["Energy"][experiment_filter]),
)
)
# Specify text and formatting of axis labels
plt.xlabel(f"Energy [{self.energy_unit}]", fontsize=14, fontweight="bold")
plt.ylabel("Normalized", fontsize=14, fontweight="bold")
# Specify placement, formatting and title of the legend
plt.legend(
loc="center left",
bbox_to_anchor=(1, 0.5),
title="Measurement",
fontsize=12,
title_fontsize=13,
ncol=1,
)
# Enforce matplotlibs tight layout
plt.tight_layout()
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
plt.savefig(filepath)
if show:
plt.show()
return None
[docs]
def plot_temperature_curves(
self,
save: bool = False,
filename: str = "temperature_curve",
format: str = ".png",
show: bool = True,
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the temperature curves for all experiments.
Args:
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "temperature_curve".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = "%{y:.1f} +/- %{customdata[0]:.1f} " + self.temperature_unit
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = line(
data_frame=self.data,
x="Measurement",
y="Temperature",
error_y="Temperature (std)",
error_y_mode="band",
custom_data=["Temperature (std)"],
color="Experiment",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Temperature curves</b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title="<b>Measurement</b>",
yaxis_title=f"<b>Temperature [{self.temperature_unit}]</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_waterfall(
self,
experiment: Union[str, int] = 0,
y_axis: str = "Measurement",
vmin: Union[None, float] = None,
vmax: Union[None, float] = None,
save: bool = False,
filename: str = "waterfall",
format: str = ".png",
show: bool = True,
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot a waterfall plot for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
y_axis (str, optional): Column to use as Y-axis in the plot. Defaults to "Measurement".
vmin (Union[None, float], optional): Minimum value for the color scale. Defaults to None.
vmax (Union[None, float], optional): Maximum value for the color scale. Defaults to None.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "waterfall".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
experiment_filter = self.data["Experiment"] == experiment
heatmap_data = self.data[experiment_filter].pivot(
index=y_axis, columns="Energy", values="mu_norm"
)
if y_axis == "Temperature":
y_unit = self.temperature_unit
y_axis_unit = f"[{self.temperature_unit}]"
else:
y_unit = ""
y_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".2f"
# Plot the measurements of the selected experiment
fig = px.imshow(
img=heatmap_data,
zmin=vmin,
zmax=vmax,
origin="lower",
color_continuous_scale="viridis",
aspect="auto",
labels=dict(color="<b>Normalized μ(E) [a.u.]</b>"),
)
# Specify title text
if show_title:
title_text = (
f"<b><i>In situ</i> overview<br><sup><i>{experiment}</i></sup></b>"
)
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>Energy [{self.energy_unit}]</b>",
yaxis_title=f"<b>{y_axis} {y_axis_unit}</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode="closest",
coloraxis=dict(
colorbar=dict(
title=dict(
side="right",
)
),
),
)
hovertemplate = f"{y_axis}: %{{y}} {y_unit}<br>Energy: %{{x:{x_formatting}}} {self.energy_unit}<br>Normalized: %{{z:.2f}}<extra></extra>"
fig.update_traces(hovertemplate=hovertemplate)
# Add and format spikes
fig.update_xaxes(showspikes=True, spikecolor="red", spikethickness=-2)
fig.update_yaxes(showspikes=True, spikecolor="red", spikethickness=-2)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_change(
self,
experiment: Union[str, int] = 0,
reference_measurement: int = 1,
y_axis: str = "Measurement",
vmin: Union[None, float] = None,
vmax: Union[None, float] = None,
save: bool = False,
filename: str = "change",
format: str = ".png",
show: bool = True,
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the change in normalized data for a given experiment
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
reference_measurement (int, optional): Measurement to use as reference. Defaults to 1.
y_axis (str, optional): Column to use as Y-axis in the plot. Defaults to "Measurement".
vmin (Union[None, float], optional): Minimum value for the color scale. Defaults to None.
vmax (Union[None, float], optional): Maximum value for the color scale. Defaults to None.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "change".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
experiment_filter = self.data["Experiment"] == experiment
experiment_data = self.data[experiment_filter]
reference = experiment_data["mu_norm"][
(self.data["Measurement"] == reference_measurement)
].to_numpy()
difference_from_reference = (
experiment_data["mu_norm"].to_numpy().reshape((-1, reference.shape[0]))
- reference
)
experiment_data["Difference from reference"] = (
difference_from_reference.reshape((-1))
)
heatmap_data = experiment_data.pivot(
index=y_axis, columns="Energy", values="Difference from reference"
)
if y_axis == "Temperature":
y_unit = self.temperature_unit
y_axis_unit = f"[{self.temperature_unit}]"
else:
y_unit = ""
y_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".2f"
# Plot the measurements of the selected experiment
fig = px.imshow(
img=heatmap_data,
zmin=vmin,
zmax=vmax,
origin="lower",
color_continuous_scale="RdBu_r",
color_continuous_midpoint=0.0,
aspect="auto",
labels=dict(color="<b>\u0394 Normalized μ(E) [a.u.]</b>"),
)
fig.add_hline(
y=experiment_data[y_axis].unique()[reference_measurement - 1],
line_color="black",
line_width=2,
annotation_text=f"<b>Reference (Measurement {reference_measurement})</b>",
annotation_position="top left",
annotation_font=dict(
color="black",
size=font_size * 0.9,
),
)
# Specify title text
if show_title:
title_text = (
f"<b><i>In situ</i> changes<br><sup><i>{experiment}</i></sup></b>"
)
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>Energy [{self.energy_unit}]</b>",
yaxis_title=f"<b>{y_axis} {y_axis_unit}</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode="closest",
coloraxis=dict(
colorbar=dict(
title=dict(
side="right",
)
),
),
)
hovertemplate = f"{y_axis}: %{{y}} {y_unit}<br>Energy: %{{x:{x_formatting}}} {self.energy_unit}<br>\u0394 Normalized: %{{z:.2f}}<extra></extra>"
fig.update_traces(hovertemplate=hovertemplate)
# Add and format spikes
fig.update_xaxes(showspikes=True, spikecolor="red", spikethickness=-2)
fig.update_yaxes(showspikes=True, spikecolor="red", spikethickness=-2)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_LCA(
self,
experiment: Union[str, int] = 0,
x_axis: str = "Measurement",
save: bool = False,
filename: str = "LCA",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the results of Linear Combination Analysis (LCA) for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
x_axis (str, optional): Column to use as X-axis in the plot. Defaults to "Measurement".
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "LCA".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
experiment_filter = self.LCA_result["Experiment"] == experiment
if x_axis == "Temperature":
x_unit = self.temperature_unit
x_axis_unit = f"[{self.temperature_unit}]"
else:
x_unit = ""
x_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = (
f"%{{y:{hover_format}}} +/- %{{customdata[0]:{hover_format}}}"
)
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
df_data = self.LCA_result[experiment_filter]
fig = line(
data_frame=df_data,
x=x_axis,
y="Parameter Value",
error_y="Parameter Error",
error_y_mode="band",
custom_data=["Parameter Error"],
color="Parameter Name",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
labels={"Parameter Name": "Component"},
)
for trace in fig["data"]:
if trace["name"] != None:
trace["name"] = f"Component {trace['name'][1:]}"
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Linear Combination Analysis<br><sup><i>{experiment}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
yaxis_range=[-0.5, 1.5],
xaxis_title=f"<b>{x_axis} {x_axis_unit}</b>",
yaxis_title=f"<b>Weight</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_LCA_frame(
self,
experiment: Union[str, int] = 0,
measurement: int = 1,
save: bool = False,
filename: str = "LCA_frame",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot a single frame of the Linear Combination Analysis (LCA) for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
measurement (int): Measurement to plot. Defaults to 1.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "LCA_frame".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
data_filter = (self.LCA_result["Experiment"] == experiment) & (
self.LCA_result["Measurement"] == measurement
)
reference_data_filter = (self.data["Experiment"] == experiment) & (
self.data["Measurement"] == measurement
)
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot reference measurement
reference_data = self.data["mu_norm"][reference_data_filter].to_numpy()
reference = self.LCA_result["Component"][
(
self.LCA_result["Component Name"]
== f"(Ref) Measurement {measurement}"
)
& data_filter
].values[0]
fig = go.Figure(
go.Scatter(
x=self.data["Energy"][reference_data_filter].to_numpy(),
y=reference_data,
name="Data",
mode="lines",
line=dict(
color="black",
),
)
)
linear_combination = np.zeros_like(reference)
# Plot components
for i, component in enumerate(
self.LCA_result["Component Name"][data_filter].unique()
):
if "(Ref)" in component:
continue
fig.add_trace(
go.Scatter(
x=self.LCA_result["Energy"][
(self.LCA_result["Component Name"] == component)
& data_filter
].values[0],
y=self.LCA_result["Component"][
(self.LCA_result["Component Name"] == component)
& data_filter
].values[0]
* self.LCA_result["Parameter Value"][
(self.LCA_result["Component Name"] == component)
& data_filter
].values[0],
name=f"Component {self.LCA_result['Parameter Name'][(self.LCA_result['Component Name'] == component) & data_filter].values[0][1:]}",
mode="lines",
line=dict(
color=sns.color_palette("colorblind").as_hex()[i],
),
)
)
linear_combination += (
self.LCA_result["Component"][
(self.LCA_result["Component Name"] == component) & data_filter
].values[0]
* self.LCA_result["Parameter Value"][
(self.LCA_result["Component Name"] == component) & data_filter
].values[0]
)
# Plot linear combination
fig.add_trace(
go.Scatter(
x=self.LCA_result["Energy"][
(
self.LCA_result["Component Name"]
== f"(Ref) Measurement {measurement}"
)
& data_filter
].values[0],
y=linear_combination,
name="Linear combination",
mode="lines",
line=dict(
color="magenta",
),
)
)
# Plot residuals
fig.add_trace(
go.Scatter(
x=self.LCA_result["Energy"][
(
self.LCA_result["Component Name"]
== f"(Ref) Measurement {measurement}"
)
& data_filter
].values[0],
y=reference - linear_combination,
name="Residual",
mode="lines",
line=dict(
color="red",
),
)
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Linear Combination Analysis<br><sup><i>{experiment} - Measurement {measurement}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>Energy [{self.energy_unit}]</b>",
yaxis_title="<b>Normalized μ(E) [a.u.]</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_LCA_comparison(
self,
component: int = 1,
x_axis: str = "Measurement",
save: bool = False,
filename: str = "LCA_comparison",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot comparison of Linear Combination Analysis (LCA) components across experiments.
Args:
component (int): Components to compare. Defaults to 1.
x_axis (str, optional): Column to use as X-axis in the plot. Defaults to "Measurement".
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "LCA_comparison".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if x_axis == "Temperature":
x_unit = self.temperature_unit
x_axis_unit = f"[{self.temperature_unit}]"
else:
x_unit = ""
x_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = (
f"%{{y:{hover_format}}} +/- %{{customdata[0]:{hover_format}}}"
)
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = line(
data_frame=self.LCA_result[
self.LCA_result["Parameter Name"] == f"w{component}"
],
x=x_axis,
y="Parameter Value",
error_y="Parameter Error",
error_y_mode="band",
custom_data=["Parameter Error"],
color="Metal",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>LCA Transition Comparison<br><sup><i>Component {component}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
yaxis_range=[-0.5, 1.5],
xaxis_title=f"<b>{x_axis} {x_axis_unit}</b>",
yaxis_title=f"<b>Weight</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_PCA(
self,
experiment: Union[str, int] = 0,
x_axis: str = "Measurement",
save: bool = False,
filename: str = "PCA",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the results of Principal Component Analysis (PCA) for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
x_axis (str, optional): Column to use as X-axis in the plot. Defaults to "Measurement".
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "PCA".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
experiment_filter = self.PCA_result["Experiment"] == experiment
if x_axis == "Temperature":
x_unit = self.temperature_unit
x_axis_unit = f"[{self.temperature_unit}]"
else:
x_unit = ""
x_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = px.line(
data_frame=self.PCA_result[experiment_filter],
x=x_axis,
y="Weight",
color="Component Name",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
labels={"Parameter Name": "Component"},
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Principal Component Analysis<br><sup><i>{experiment}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>{x_axis} {x_axis_unit}</b>",
yaxis_title=f"<b>Weight</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_PCA_frame(
self,
experiment: Union[str, int] = 0,
measurement: int = 1,
save: bool = False,
filename: str = "PCA_frame",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot a single frame of the Principal Component Analysis (PCA) for a given experiment
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
measurement (int): Measurement to plot. Defaults to 1.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "PCA_frame".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
data_filter = (self.PCA_result["Experiment"] == experiment) & (
self.PCA_result["Measurement"] == measurement
)
reference_data_filter = (self.data["Experiment"] == experiment) & (
self.data["Measurement"] == measurement
)
fit_range = self.PCA_result["Fit Range"][data_filter].values[0]
fit_range_filter = (self.data["Energy"] >= fit_range[0]) & (
self.data["Energy"] <= fit_range[1]
)
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot reference measurement
reference_data = self.data["mu_norm"][reference_data_filter].to_numpy()
reference = self.data["mu_norm"][
reference_data_filter & fit_range_filter
].to_numpy()
fig = go.Figure(
go.Scatter(
x=self.data["Energy"][reference_data_filter].to_numpy(),
y=reference_data,
name="Data",
mode="lines",
line=dict(
color="black",
),
)
)
# Plot PCA mean
pca_mean = self.PCA_result["PCA Mean"][data_filter].values[0]
fig.add_trace(
go.Scatter(
x=self.PCA_result["Energy"][
(self.PCA_result["Component Name"] == f"PC 1") & data_filter
].values[0],
y=pca_mean,
name="PCA mean",
mode="lines",
line=dict(
color="grey",
),
)
)
pca_reconstruction = np.zeros_like(reference)
# Plot components
for i, component in enumerate(
self.PCA_result["Component Name"][data_filter].unique()
):
fig.add_trace(
go.Scatter(
x=self.PCA_result["Energy"][
(self.PCA_result["Component Name"] == component)
& data_filter
].values[0],
y=self.PCA_result["Component"][
(self.PCA_result["Component Name"] == component)
& data_filter
].values[0]
* self.PCA_result["Weight"][
(self.PCA_result["Component Name"] == component)
& data_filter
].values[0],
name=component,
mode="lines",
line=dict(
color=sns.color_palette("colorblind").as_hex()[i],
),
)
)
pca_reconstruction += (
self.PCA_result["Component"][
(self.PCA_result["Component Name"] == component) & data_filter
].values[0]
* self.PCA_result["Weight"][
(self.PCA_result["Component Name"] == component) & data_filter
].values[0]
)
pca_reconstruction += pca_mean
# Plot PCA reconstruction
fig.add_trace(
go.Scatter(
x=self.PCA_result["Energy"][
(self.PCA_result["Component Name"] == f"PC 1") & data_filter
].values[0],
y=pca_reconstruction,
name="PCA reconstruction",
mode="lines",
line=dict(
color="magenta",
),
)
)
# Plot residual
fig.add_trace(
go.Scatter(
x=self.PCA_result["Energy"][
(self.PCA_result["Component Name"] == f"PC 1") & data_filter
].values[0],
y=reference - pca_reconstruction,
name="Residual",
mode="lines",
line=dict(
color="red",
),
)
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Principal Component Analysis<br><sup><i>{experiment} - Measurement {measurement}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>Energy [{self.energy_unit}]</b>",
yaxis_title="<b>Normalized μ(E) [a.u.]</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
[docs]
def plot_PCA_comparison(
self,
component: Union[int, list[int]] = 1,
x_axis: str = "Measurement",
save: bool = False,
filename: str = "PCA_comparison",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot comparison of Principal Component Analysis (PCA) components across experiments.
Args:
component (Union[int, list[int]]): Components to compare. Defaults to 1.
x_axis (str, optional): Column to use as X-axis in the plot. Defaults to "Measurement".
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "PCA_comparison".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Length of list must match number of experiments.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(component, list):
if len(component) != len(self.experiments):
raise ValueError("Length of list must match number of experiments")
else:
component = [component] * len(self.experiments)
if x_axis == "Temperature":
x_unit = self.temperature_unit
x_axis_unit = f"[{self.temperature_unit}]"
else:
x_unit = ""
x_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = go.Figure()
for i, experiment in enumerate(self.experiments):
data_filter = (self.PCA_result["Experiment"] == experiment) & (
self.PCA_result["Component Number"] == component[i]
)
fig.add_trace(
go.Scatter(
x=self.PCA_result[x_axis][data_filter],
y=self.PCA_result["Weight"][data_filter],
name=self.PCA_result["Metal"][data_filter].values[0]
+ f" (PC {component[i]})",
mode="lines",
line=dict(
color=sns.color_palette("colorblind").as_hex()[i],
),
)
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>PCA Transition Comparison</b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>{x_axis} {x_axis_unit}</b>",
yaxis_title=f"<b>Weight</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_PCA_explained_variance(
self,
plot_type: str = "cumulative",
variance_threshold: Union[None, float] = None,
fit_range: Union[None, tuple[float, float], list[tuple[float, float]]] = None,
seed: Union[None, int] = None,
save: bool = False,
filename: str = "PCA_explained_variance",
format: str = ".png",
show: bool = True,
hover_format: str = ".2%",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
return_df: bool = False,
):
"""
Plot the explained variance of Principal Component Analysis (PCA) for all experiments.
Args:
plot_type (str, optional): Type of plot to generate. Defaults to "cumulative".
variance_threshold (Union[None, float], optional): Threshold for the explained variance to plot. Defaults to None.
fit_range (Union[None, tuple[float, float], list[tuple[float, float]]], optional): Range of energies to fit the PCA. Defaults to None.
seed (Union[None, int], optional): Random seed for PCA. Defaults to None.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "PCA_explained_variance".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2%".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
return_df (bool, optional): Whether to return the DataFrame with PCA results. Defaults to False.
Raises:
ValueError: Invalid plot type.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if plot_type not in ["ratio", "cumulative"]:
raise ValueError(
'Invalid plot type. Choose between "ratio" and "cumulative"'
)
if isinstance(fit_range, list):
if len(fit_range) != len(self.metals):
raise ValueError("Number of fit ranges must match number of metals")
elif isinstance(fit_range, tuple):
fit_range = [fit_range] * len(self.metals)
else:
fit_range = [(0, np.inf)] * len(self.metals)
experiment_list = []
metal_list = []
component_number_list = []
component_name_list = []
explained_variance_list = []
explained_variance_ratio_list = []
cumulative_explained_variance_list = []
for i, experiment in enumerate(self.experiments):
fit_range_filter = (self.data["Energy"] >= fit_range[i][0]) & (
self.data["Energy"] <= fit_range[i][1]
)
measurements = self.data["Measurement"][
self.data["Experiment"] == experiment
].unique()
data = (
self.data["mu_norm"][
(self.data["Experiment"] == experiment) & fit_range_filter
]
.to_numpy()
.reshape(len(measurements), -1)
)
pca = PCA(random_state=seed)
pca.fit(data)
# Store PCA results
for i in range(pca.n_components_ + 1):
experiment_list.append(experiment)
metal_list.append(
self.data["Metal"][self.data["Experiment"] == experiment].values[0]
)
component_number_list.append(i)
component_name_list.append(f"PC {i}")
if i == 0:
explained_variance_list.append(0)
explained_variance_ratio_list.append(0)
cumulative_explained_variance_list.append(0)
else:
explained_variance_list.append(pca.explained_variance_[i - 1])
explained_variance_ratio_list.append(
pca.explained_variance_ratio_[i - 1]
)
cumulative_explained_variance_list.append(
pca.explained_variance_ratio_[:i].sum()
)
pca_results = pd.DataFrame(
{
"Experiment": experiment_list,
"Metal": metal_list,
"Component Number": component_number_list,
"Component Name": component_name_list,
"Explained Variance": explained_variance_list,
"Explained Variance Ratio": explained_variance_ratio_list,
"Cumulative Explained Variance": cumulative_explained_variance_list,
}
)
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
if plot_type == "ratio":
fig = px.bar(
data_frame=pca_results[pca_results["Component Number"] != 0],
x="Component Name",
y="Explained Variance Ratio",
color="Metal",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
barmode="group",
)
# Specify axis titles
xaxis_title = "<b>Principal Components</b>"
yaxis_title = "<b>Explained Variance</b>"
# Specify title text
if show_title:
title_text = f"<b>Explained Variance Ratio</b>"
else:
title_text = ""
# Change bar formatting
fig.update_traces(
marker=dict(
line=dict(
width=1,
),
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
elif plot_type == "cumulative":
fig = px.line(
data_frame=pca_results,
x="Component Number",
y="Cumulative Explained Variance",
color="Metal",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
)
# Specify axis titles
xaxis_title = "<b>Number of Principal Components</b>"
yaxis_title = "<b>Explained Variance</b>"
# Specify title text
if show_title:
title_text = f"<b>Cumulative Explained Variance</b>"
else:
title_text = ""
if variance_threshold:
fig.add_hline(
y=variance_threshold,
line_color="black",
line_width=2,
line_dash="dash",
annotation_text=f"<b>{variance_threshold:.0%}</b>",
annotation_position="bottom right",
annotation_font=dict(
color="black",
size=font_size * 0.8,
),
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=xaxis_title,
yaxis_title=yaxis_title,
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
yaxis=dict(
tickformat=".0%",
),
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
if return_df:
return pca_results
else:
return None
[docs]
def plot_NMF(
self,
experiment: Union[str, int] = 0,
x_axis: str = "Measurement",
save: bool = False,
filename: str = "NMF",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the results of Non-negative Matrix Factorization (NMF) for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
x_axis (str, optional): Column to use as X-axis in the plot. Defaults to "Measurement".
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "NMF".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
experiment_filter = self.NMF_result["Experiment"] == experiment
if x_axis == "Temperature":
x_unit = self.temperature_unit
x_axis_unit = f"[{self.temperature_unit}]"
else:
x_unit = ""
x_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = px.line(
data_frame=self.NMF_result[experiment_filter],
x=x_axis,
y="Weight",
color="Component Name",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
labels={"Parameter Name": "Component"},
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Non-negative Matrix Factorization<br><sup><i>{experiment}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>{x_axis} {x_axis_unit}</b>",
yaxis_title=f"<b>Weight</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_NMF_frame(
self,
experiment: Union[str, int] = 0,
measurement: int = 1,
save: bool = False,
filename: str = "NMF_frame",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot a single frame of the Non-negative Matrix Factorization (NMF) for a given experiment.
Args:
experiment (Union[str, int]): Experiment to plot. Defaults to 0.
measurement (int): Measurement to plot. Defaults to 1.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "NMF_frame".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Invalid experiment name.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(experiment, int):
experiment = self.experiments[experiment]
elif isinstance(experiment, str) and experiment not in self.experiments:
raise ValueError("Invalid experiment name")
data_filter = (self.NMF_result["Experiment"] == experiment) & (
self.NMF_result["Measurement"] == measurement
)
reference_data_filter = (self.data["Experiment"] == experiment) & (
self.data["Measurement"] == measurement
)
fit_range = self.NMF_result["Fit Range"][data_filter].values[0]
fit_range_filter = (self.data["Energy"] >= fit_range[0]) & (
self.data["Energy"] <= fit_range[1]
)
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot reference measurement
reference_data = self.data["mu_norm"][reference_data_filter].to_numpy()
reference = self.data["mu_norm"][
reference_data_filter & fit_range_filter
].to_numpy()
fig = go.Figure(
go.Scatter(
x=self.data["Energy"][reference_data_filter].to_numpy(),
y=reference_data,
name="Data",
mode="lines",
line=dict(
color="black",
),
)
)
nmf_reconstruction = np.zeros_like(reference)
# Plot components
for i, component in enumerate(
self.NMF_result["Component Name"][data_filter].unique()
):
fig.add_trace(
go.Scatter(
x=self.NMF_result["Energy"][
(self.NMF_result["Component Name"] == component)
& data_filter
].values[0],
y=self.NMF_result["Component"][
(self.NMF_result["Component Name"] == component)
& data_filter
].values[0]
* self.NMF_result["Weight"][
(self.NMF_result["Component Name"] == component)
& data_filter
].values[0],
name=component,
mode="lines",
line=dict(
color=sns.color_palette("colorblind").as_hex()[i],
),
)
)
nmf_reconstruction += (
self.NMF_result["Component"][
(self.NMF_result["Component Name"] == component) & data_filter
].values[0]
* self.NMF_result["Weight"][
(self.NMF_result["Component Name"] == component) & data_filter
].values[0]
)
# Plot NMF reconstruction
fig.add_trace(
go.Scatter(
x=self.NMF_result["Energy"][
(self.NMF_result["Component Name"] == f"Component 1")
& data_filter
].values[0],
y=nmf_reconstruction,
name="NMF reconstruction",
mode="lines",
line=dict(
color="magenta",
),
)
)
# Plot residual
fig.add_trace(
go.Scatter(
x=self.NMF_result["Energy"][
(self.NMF_result["Component Name"] == f"Component 1")
& data_filter
].values[0],
y=reference - nmf_reconstruction,
name="Residual",
mode="lines",
line=dict(
color="red",
),
)
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>Non-negative Matrix Factorization<br><sup><i>{experiment} - Measurement {measurement}</i></sup></b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>Energy [{self.energy_unit}]</b>",
yaxis_title="<b>Normalized μ(E) [a.u.]</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_NMF_comparison(
self,
component: Union[int, list[int]] = 1,
x_axis: str = "Measurement",
save: bool = False,
filename: str = "NMF_comparison",
format: str = ".png",
show: bool = True,
hover_format: str = ".2f",
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot comparison of Non-negative Matrix Factorization (NMF) components across experiments.
Args:
component (Union[int, list[int]]): Components to compare. Defaults to 1.
x_axis (str, optional): Column to use as X-axis in the plot. Defaults to "Measurement".
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "NMF_comparison".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
hover_format (str, optional): Format of numbers in the hover text. Defaults to ".2f".
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
ValueError: Length of list must match number of experiments.
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if isinstance(component, list):
if len(component) != len(self.experiments):
raise ValueError("Length of list must match number of experiments")
else:
component = [component] * len(self.experiments)
if x_axis == "Temperature":
x_unit = self.temperature_unit
x_axis_unit = f"[{self.temperature_unit}]"
else:
x_unit = ""
x_axis_unit = ""
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = f"%{{y:{hover_format}}}"
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = go.Figure()
for i, experiment in enumerate(self.experiments):
data_filter = (self.NMF_result["Experiment"] == experiment) & (
self.NMF_result["Component Number"] == component[i]
)
fig.add_trace(
go.Scatter(
x=self.NMF_result[x_axis][data_filter],
y=self.NMF_result["Weight"][data_filter],
name=self.NMF_result["Metal"][data_filter].values[0]
+ f" (Component {component[i]})",
mode="lines",
line=dict(
color=sns.color_palette("colorblind").as_hex()[i],
),
)
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>NMF Transition Comparison</b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title=f"<b>{x_axis} {x_axis_unit}</b>",
yaxis_title=f"<b>Weight</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
[docs]
def plot_NMF_error_gradient(
self,
change_cutoff: float = 0.25,
save: bool = False,
filename: str = "NMF_error_change",
format: str = ".png",
show: bool = True,
show_title: bool = True,
figure_size: Union[tuple[int, int], str] = "auto",
font_size: int = 14,
):
"""
Plot the gradient of reconstruction error of Non-negative Matrix Factorization (NMF) as a function of number of components.
Args:
change_cutoff (float, optional): Error change cutoff to plot. Defaults to 0.25.
save (bool, optional): Whether to save the plot. Defaults to False.
filename (str, optional): Name of the file to save. Defaults to "NMF_error_change".
format (str, optional): Format of the file to save. Defaults to ".png".
show (bool, optional): Whether to show the plot. Defaults to True.
show_title (bool, optional): Whether to show the title. Defaults to True.
figure_size (Union[tuple[int, int], str], optional): Size of the figure. Defaults to "auto".
font_size (int, optional): Size of the font. Defaults to 14.
Raises:
NotImplementedError: Matplotlib plot not implemented yet.
Returns:
None: Function does not return anything.
"""
if self.NMF_component_results is None:
_ = self._determine_NMF_components(change_cutoff=change_cutoff)
if self.interactive:
# Formatting for hover text
x_formatting = ".0f"
hovertemplate = "%{y:.2f}"
hovermode = "x unified"
# Plot the measurements of the selected experiment/edge
fig = px.line(
data_frame=self.NMF_component_results,
x="n_components",
y="Error Change",
color="Metal",
color_discrete_sequence=sns.color_palette("colorblind").as_hex(),
)
fig.add_hline(
y=change_cutoff,
line_color="black",
line_width=2,
line_dash="dash",
annotation_text=f"<b>{change_cutoff:.2f}</b>",
annotation_position="top right",
annotation_font=dict(
color="black",
size=font_size * 0.8,
),
)
# Change line formatting
fig.update_traces(
line=dict(
width=2,
),
xhoverformat=x_formatting,
hovertemplate=hovertemplate,
)
# Specify title text
if show_title:
title_text = f"<b>NMF Error Gradient</b>"
else:
title_text = ""
# Specify text and formatting of axis labels
fig.update_layout(
title=title_text,
title_x=0.5,
xaxis_title="<b>Number of NMF components</b>",
yaxis_title="<b>Error gradient</b>",
font=dict(
family="Arial",
size=font_size,
),
hovermode=hovermode,
)
# Set figure size
if figure_size == "auto":
fig.update_layout(
autosize=True,
)
else:
fig.update_layout(
autosize=False,
width=figure_size[0],
height=figure_size[1],
)
if save:
filepath = self.save_directory + filename + format
print(f"Saving plot to {filepath}")
fig.write_image(filepath)
if show:
fig.show()
else:
raise NotImplementedError("Matplotlib plot not implemented yet")
return None
# %% Other functions
[docs]
def line(error_y_mode=None, **kwargs):
"""Extension of `plotly.express.line` to use error bands."""
ERROR_MODES = {"bar", "band", "bars", "bands", None}
if error_y_mode not in ERROR_MODES:
raise ValueError(
f"'error_y_mode' must be one of {ERROR_MODES}, received {repr(error_y_mode)}."
)
if error_y_mode in {"bar", "bars", None}:
fig = px.line(**kwargs)
elif error_y_mode in {"band", "bands"}:
if "error_y" not in kwargs:
raise ValueError(
f"If you provide argument 'error_y_mode' you must also provide 'error_y'."
)
figure_with_error_bars = px.line(**kwargs)
fig = px.line(**{arg: val for arg, val in kwargs.items() if arg != "error_y"})
for data in figure_with_error_bars.data:
x = list(data["x"])
y_upper = list(data["y"] + data["error_y"]["array"])
y_lower = list(
data["y"] - data["error_y"]["array"]
if data["error_y"]["arrayminus"] is None
else data["y"] - data["error_y"]["arrayminus"]
)
color = (
f"rgba({tuple(int(data['line']['color'].lstrip('#')[i:i+2], 16) for i in (0, 2, 4))},.3)".replace(
"((", "("
)
.replace("),", ",")
.replace(" ", "")
)
fig.add_trace(
go.Scatter(
x=x + x[::-1],
y=y_upper + y_lower[::-1],
fill="toself",
fillcolor=color,
line=dict(color="rgba(255,255,255,0)"),
hoverinfo="skip",
showlegend=False,
legendgroup=data["legendgroup"],
xaxis=data["xaxis"],
yaxis=data["yaxis"],
)
)
# Reorder data as said here: https://stackoverflow.com/a/66854398/8849755
reordered_data = []
for i in range(int(len(fig.data) / 2)):
reordered_data.append(fig.data[i + int(len(fig.data) / 2)])
reordered_data.append(fig.data[i])
fig.data = tuple(reordered_data)
return fig