Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_eda_clean():
sampling_rate = 1000
eda = nk.eda_simulate(
duration=30, sampling_rate=sampling_rate, scr_number=6, noise=0.01, drift=0.01, random_state=42
)
clean = nk.eda_clean(eda, sampling_rate=sampling_rate)
assert len(clean) == len(eda)
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/master/biosppy/signals/eda.py)
eda_biosppy = nk.eda_clean(eda, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
signal=eda, ftype="butter", band="lowpass", order=4, frequency=5, sampling_rate=sampling_rate
)
original, _ = biosppy.tools.smoother(signal=original, kernel="boxzen", size=int(0.75 * sampling_rate), mirror=True)
# pd.DataFrame({"our":eda_biosppy, "biosppy":original}).plot()
assert np.allclose((eda_biosppy - original).mean(), 0, atol=1e-5)
# Check if filter was applied.
fft_raw = np.abs(np.fft.rfft(rsp))
fft_khodadad2018 = np.abs(np.fft.rfft(khodadad2018))
fft_biosppy = np.abs(np.fft.rfft(rsp_biosppy))
freqs = np.fft.rfftfreq(len(rsp), 1 / sampling_rate)
assert np.sum(fft_raw[freqs > 3]) > np.sum(fft_khodadad2018[freqs > 3])
assert np.sum(fft_raw[freqs < 0.05]) > np.sum(fft_khodadad2018[freqs < 0.05])
assert np.sum(fft_raw[freqs > 0.35]) > np.sum(fft_biosppy[freqs > 0.35])
assert np.sum(fft_raw[freqs < 0.1]) > np.sum(fft_biosppy[freqs < 0.1])
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/master/biosppy/signals/resp.py#L62)
rsp_biosppy = nk.rsp_clean(rsp, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
signal=rsp, ftype="butter", band="bandpass", order=2, frequency=[0.1, 0.35], sampling_rate=sampling_rate
)
original = nk.signal_detrend(original, order=0)
assert np.allclose((rsp_biosppy - original).mean(), 0, atol=1e-6)
ecg = nk.ecg_simulate(sampling_rate=sampling_rate, noise=noise)
ecg_cleaned_nk = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
assert ecg.size == ecg_cleaned_nk.size
# Assert that highpass filter with .5 Hz lowcut was applied.
fft_raw = np.abs(np.fft.rfft(ecg))
fft_nk = np.abs(np.fft.rfft(ecg_cleaned_nk))
freqs = np.fft.rfftfreq(ecg.size, 1 / sampling_rate)
assert np.sum(fft_raw[freqs < 0.5]) > np.sum(fft_nk[freqs < 0.5])
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69)
ecg_biosppy = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
signal=ecg,
ftype="FIR",
band="bandpass",
order=int(0.3 * sampling_rate),
frequency=[3, 45],
sampling_rate=sampling_rate,
)
assert np.allclose((ecg_biosppy - original).mean(), 0, atol=1e-6)
def test_emg_clean():
sampling_rate = 1000
emg = nk.emg_simulate(duration=20, sampling_rate=sampling_rate)
emg_cleaned = nk.emg_clean(emg, sampling_rate=sampling_rate)
assert emg.size == emg_cleaned.size
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/emg.py)
original, _, _ = biosppy.tools.filter_signal(
signal=emg, ftype="butter", band="highpass", order=4, frequency=100, sampling_rate=sampling_rate
)
emg_cleaned_biosppy = nk.signal_detrend(original, order=0)
assert np.allclose((emg_cleaned - emg_cleaned_biosppy).mean(), 0, atol=1e-6)
v1s = int(1. * sampling_rate)
v100ms = int(0.1 * sampling_rate)
TH_elapsed = np.ceil(0.36 * sampling_rate)
sm_size = int(0.08 * sampling_rate)
init_ecg = 8 # seconds for initialization
if dur < init_ecg:
init_ecg = int(dur)
# filtering
filtered, _, _ = biosppy.tools.filter_signal(signal=signal,
ftype='butter',
band='lowpass',
order=4,
frequency=25.,
sampling_rate=sampling_rate)
filtered, _, _ = biosppy.tools.filter_signal(signal=filtered,
ftype='butter',
band='highpass',
order=4,
frequency=3.,
sampling_rate=sampling_rate)
# diff
dx = np.abs(np.diff(filtered, 1) * sampling_rate)
# smoothing
dx, _ = biosppy.tools.smoother(signal=dx, kernel='hamming', size=sm_size, mirror=True)
# buffers
qrspeakbuffer = np.zeros(init_ecg)
noisepeakbuffer = np.zeros(init_ecg)
peak_idx_test = np.zeros(init_ecg)
- Hamilton, P. (2002, September). Open source ECG analysis. In Computers in Cardiology, 2002 (pp. 101-104). IEEE.
- Kathirvel, P., Manikandan, M. S., Prasanna, S. R. M., & Soman, K. P. (2011). An efficient R-peak detection based on new nonlinear transformation and first-order Gaussian differentiator. Cardiovascular Engineering and Technology, 2(4), 408-425.
- Canento, F., Lourenço, A., Silva, H., & Fred, A. (2013). Review and Comparison of Real Time Electrocardiogram Segmentation Algorithms for Biometric Applications. In Proceedings of the 6th Int’l Conference on Health Informatics (HEALTHINF).
- Christov, I. I. (2004). Real time electrocardiogram QRS detection using combined adaptive threshold. Biomedical engineering online, 3(1), 28.
- Engelse, W. A. H., & Zeelenberg, C. (1979). A single scan algorithm for QRS-detection and feature extraction. Computers in cardiology, 6(1979), 37-42.
- Lourenço, A., Silva, H., Leite, P., Lourenço, R., & Fred, A. L. (2012, February). Real Time Electrocardiogram Segmentation for Finger based ECG Biometrics. In Biosignals (pp. 49-54).
"""
# Signal Processing
# =======================
# Transform to array
ecg = np.array(ecg)
# Filter signal
if filter_type in ["FIR", "butter", "cheby1", "cheby2", "ellip", "bessel"]:
order = int(filter_order * sampling_rate)
filtered, _, _ = biosppy.tools.filter_signal(signal=ecg,
ftype=filter_type,
band=filter_band,
order=order,
frequency=filter_frequency,
sampling_rate=sampling_rate)
else:
filtered = ecg # filtered is not-filtered
# Segment
if segmenter == "hamilton":
rpeaks, = biosppy.ecg.hamilton_segmenter(signal=filtered, sampling_rate=sampling_rate)
elif segmenter == "gamboa":
rpeaks, = biosppy.ecg.gamboa_segmenter(signal=filtered, sampling_rate=sampling_rate, tol=0.002)
elif segmenter == "engzee":
rpeaks, = biosppy.ecg.engzee_segmenter(signal=filtered, sampling_rate=sampling_rate, threshold=0.48)
elif segmenter == "christov":
References
-----------
- Greco, A., Valenza, G., & Scilingo, E. P. (2016). Evaluation of CDA and CvxEDA Models. In Advances in Electrodermal Activity Processing with Applications for Mental Health (pp. 35-43). Springer International Publishing.
- Greco, A., Valenza, G., Lanata, A., Scilingo, E. P., & Citi, L. (2016). cvxEDA: A convex optimization approach to electrodermal activity processing. IEEE Transactions on Biomedical Engineering, 63(4), 797-804.
- Kim, K. H., Bang, S. W., & Kim, S. R. (2004). Emotion recognition system using short-term monitoring of physiological signals. Medical and biological engineering and computing, 42(3), 419-427.
- Gamboa, H. (2008). Multi-Modal Behavioral Biometrics Based on HCI and Electrophysiology (Doctoral dissertation, PhD thesis, Universidade Técnica de Lisboa, Instituto Superior Técnico).
"""
# Initialization
eda = np.array(eda)
eda_df = pd.DataFrame({"EDA_Raw": np.array(eda)})
# Preprocessing
# ===================
# Filtering
filtered, _, _ = biosppy.tools.filter_signal(signal=eda,
ftype='butter',
band='lowpass',
order=4,
frequency=5,
sampling_rate=sampling_rate)
# Smoothing
filtered, _ = biosppy.tools.smoother(signal=filtered,
kernel='boxzen',
size=int(0.75 * sampling_rate),
mirror=True)
eda_df["EDA_Filtered"] = filtered
# Derive Phasic and Tonic
try:
tonic, phasic = cvxEDA(eda, sampling_rate=sampling_rate, alpha=alpha, gamma=gamma)