Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _hrv_rsa_pb(ecg_period, sampling_rate, continuous=False):
"""Porges-Bohrer method."""
if continuous is True:
return None
# Re-sample at 2 Hz
resampled = signal_resample(ecg_period, sampling_rate=sampling_rate, desired_sampling_rate=2)
# Fit 21-point cubic polynomial filter (zero mean, 3rd order)
# with a low-pass cutoff frequency of 0.095Hz
trend = signal_filter(
resampled, sampling_rate=2, lowcut=0.095, highcut=None, method="savgol", order=3, window_size=21
)
zero_mean = resampled - trend
# Remove variance outside bandwidth of spontaneous respiration
zero_mean_filtered = signal_filter(zero_mean, sampling_rate=2, lowcut=0.12, highcut=0.40)
# Divide into 30-second epochs
time = np.arange(0, len(zero_mean_filtered)) / 2
time = pd.DataFrame({"Epoch Index": time // 30, "Signal": zero_mean_filtered})
time = time.set_index("Epoch Index")
def load_signal_from_disk(filename=None, sampling_rate=2000):
if filename is None:
ecg = nk.ecg_simulate(duration=10, sampling_rate=sampling_rate, method="ecgsyn")
else:
filename = (pathlib.Path(__file__) / "../ecg_data" / filename).resolve().as_posix()
ecg = np.array(pd.read_csv(filename))[:, 1]
return ecg, sampling_rate
def test_hrv():
ecg = nk.ecg_simulate(duration=60, sampling_rate=1000, heart_rate=110, random_state=42)
_, peaks = nk.ecg_process(ecg, sampling_rate=1000)
ecg_hrv = nk.hrv(peaks, sampling_rate=1000)
columns = ['HRV_RMSSD', 'HRV_MeanNN', 'HRV_SDNN', 'HRV_SDSD', 'HRV_CVNN',
'HRV_CVSD', 'HRV_MedianNN', 'HRV_MadNN', 'HRV_MCVNN', 'HRV_IQRNN',
'HRV_pNN50', 'HRV_pNN20', 'HRV_TINN', 'HRV_HTI', 'HRV_ULF',
'HRV_VLF', 'HRV_LF', 'HRV_HF', 'HRV_VHF', 'HRV_LFHF', 'HRV_LFn',
'HRV_HFn', 'HRV_LnHF', 'HRV_SD1', 'HRV_SD2', 'HRV_SD1SD2', 'HRV_S',
'HRV_CSI', 'HRV_CVI', 'HRV_CSI_Modified', 'HRV_PIP', 'HRV_IALS',
'HRV_PSS', 'HRV_PAS', 'HRV_GI', 'HRV_SI', 'HRV_AI', 'HRV_PI',
'HRV_C1d', 'HRV_C1a', 'HRV_SD1d',
'HRV_SD1a', 'HRV_C2d',
'HRV_C2a', 'HRV_SD2d', 'HRV_SD2a',
'HRV_Cd', 'HRV_Ca', 'HRV_SDNNd',
assert info_gamboa["ECG_R_Peaks"].size == 69
# Test elgendi2010 method
info_elgendi = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="elgendi2010"), method="elgendi2010")
assert info_elgendi["ECG_R_Peaks"].size == 70
# Test engzeemod2012 method
info_engzeemod = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="engzeemod2012"), method="engzeemod2012")
assert info_engzeemod["ECG_R_Peaks"].size == 70
# Test kalidas2017 method
info_kalidas = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="kalidas2017"), method="kalidas2017")
assert np.allclose(info_kalidas["ECG_R_Peaks"].size, 68, atol=1)
# Test martinez2003 method
ecg = nk.ecg_simulate(duration=60, sampling_rate=sampling_rate, noise=0, random_state=42)
ecg_cleaned = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
info_martinez = nk.ecg_findpeaks(ecg_cleaned, method="martinez2003")
assert np.allclose(info_martinez["ECG_R_Peaks"].size, 69, atol=1)
def test_ecg_simulate():
ecg1 = nk.ecg_simulate(duration=20, length=5000, method="simple", noise=0)
assert len(ecg1) == 5000
ecg2 = nk.ecg_simulate(duration=20, length=5000, heart_rate=500)
# pd.DataFrame({"ECG1":ecg1, "ECG2": ecg2}).plot()
# pd.DataFrame({"ECG1":ecg1, "ECG2": ecg2}).hist()
assert len(nk.signal_findpeaks(ecg1, height_min=0.6)["Peaks"]) < len(
nk.signal_findpeaks(ecg2, height_min=0.6)["Peaks"]
)
ecg3 = nk.ecg_simulate(duration=10, length=5000)
# pd.DataFrame({"ECG1":ecg1, "ECG3": ecg3}).plot()
assert len(nk.signal_findpeaks(ecg2, height_min=0.6)["Peaks"]) > len(
nk.signal_findpeaks(ecg3, height_min=0.6)["Peaks"]
)
def test_hrv_frequency():
# Test frequency domain
ecg1 = nk.ecg_simulate(duration=60, sampling_rate=2000, heart_rate=70, random_state=42)
_, peaks1 = nk.ecg_process(ecg1, sampling_rate=2000)
hrv1 = nk.hrv_frequency(peaks1, sampling_rate=2000)
ecg2 = nk.signal_resample(ecg1, sampling_rate=2000, desired_sampling_rate=500)
_, peaks2 = nk.ecg_process(ecg2, sampling_rate=500)
hrv2 = nk.hrv_frequency(peaks2, sampling_rate=500)
assert np.allclose(hrv1["HRV_HF"] - hrv2["HRV_HF"], 0, atol=1.5)
assert np.isnan(hrv1["HRV_LF"][0])
assert np.isnan(hrv2["HRV_LF"][0])
assert np.isnan(hrv1["HRV_VLF"][0])
assert np.isnan(hrv2["HRV_LF"][0])
def test_ecg_clean():
sampling_rate = 1000
noise = 0.05
ecg = nk.ecg_simulate(sampling_rate=sampling_rate, noise=noise)
ecg_cleaned_nk = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
assert ecg.size == ecg_cleaned_nk.size
# Assert that highpass filter with .5 Hz lowcut was applied.
fft_raw = np.abs(np.fft.rfft(ecg))
fft_nk = np.abs(np.fft.rfft(ecg_cleaned_nk))
freqs = np.fft.rfftfreq(ecg.size, 1 / sampling_rate)
assert np.sum(fft_raw[freqs < 0.5]) > np.sum(fft_nk[freqs < 0.5])
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69)
ecg_biosppy = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
signal=ecg,
def test_ecg_findpeaks():
sampling_rate = 1000
ecg = nk.ecg_simulate(duration=60, sampling_rate=sampling_rate, noise=0, method="simple", random_state=42)
ecg_cleaned = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
# Test neurokit methodwith show=True
info_nk = nk.ecg_findpeaks(ecg_cleaned, show=True)
assert info_nk["ECG_R_Peaks"].size == 69
# This will identify the latest figure.
fig = plt.gcf()
assert len(fig.axes) == 2
# Test pantompkins1985 method
info_pantom = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="pantompkins1985"), method="pantompkins1985")
assert info_pantom["ECG_R_Peaks"].size == 70
# Test hamilton2002 method
def test_eog_intervalrelated():
eog = nk.data('eog_200hz')['vEOG']
eog_signals, info = nk.eog_process(eog, sampling_rate=200)
columns = ['EOG_Peaks_N', 'EOG_Rate_Mean']
# Test with signal dataframe
features = nk.eog_intervalrelated(eog_signals)
assert all(elem in np.array(features.columns.values, dtype=str) for elem
in columns)
assert features.shape[0] == 1 # Number of rows
# Test with dict
epochs = nk.epochs_create(eog_signals, events=[5000, 10000, 15000], epochs_start=-0.1, epochs_end=1.9)
epochs_dict = nk.eog_intervalrelated(epochs)
assert all(elem in columns for elem
def test_eog_findpeaks():
eog_signal = nk.data('eog_100hz')
eog_cleaned = nk.eog_clean(eog_signal, sampling_rate=100)
# Test with Neurokit
nk_peaks = nk.eog_findpeaks(eog_cleaned, sampling_rate=100, method="neurokit", threshold=0.33, show=False)
assert nk_peaks.size == 19
# Test with MNE
mne_peaks = nk.eog_findpeaks(eog_cleaned, method="mne")
assert mne_peaks.size == 44
# Test with brainstorm
brainstorm_peaks = nk.eog_findpeaks(eog_cleaned, method="brainstorm")
assert brainstorm_peaks.size == 28
blinker_peaks = nk.eog_findpeaks(eog_cleaned, method="blinker", sampling_rate=100)
assert blinker_peaks.size == 14