|
|
import csv from datetime import datetime
import wfdb import matplotlib.pyplot as plt import numpy as np from scipy.signal import medfilt import pywt
fliter = int(0.8*500) Initial_intercept_point = 0 Final_intercept_point = 2000 Give_up_size = int(fliter/2)
# Get dates, high, and low temperatures from file. filename = 'sunyan.csv' with open(filename) as f: reader = csv.reader(f) header_row = next(reader)
leaddatas= [] for row in reader: try: leaddata = int(row[0]) except ValueError: print(leaddata, 'missing data') else: leaddatas.append(leaddata) #读取文件结束
#bandpass filter from scipy import signal fs = 500 # 采样率为500 Hz
# 设计低通滤波器 cutoff_freq = 40 # 截止频率 order = 1 # 滤波器阶数 nyquist = 0.5 * fs cutoff = cutoff_freq / nyquist b, a = signal.butter(order, cutoff, btype='low')
# 应用低通滤波器 lowpass_ecg_leaddatas = signal.filtfilt(b, a, leaddatas)
# 设计高通滤波器 cutoff_freq_high = 0.5 # 截止频率 order_high = 1 # 滤波器阶数 #nyquist = 0.5 * fs cutoff_high = cutoff_freq_high / nyquist b, a = signal.butter(order_high, cutoff_high, btype='high')
# 应用高通滤波器 filtered_ecg_signal_leaddatas = signal.filtfilt(b, a, lowpass_ecg_leaddatas)
'''
# 使用小波变换,效果不好 wavelet = 'sym5' # 选择小波类型,sym5是一种对称小波,常用于ECG信号 levels = 4 # 分解级数 coeffs = pywt.wavedec(filtered_ecg_signal_leaddatas, wavelet, level=levels)
# 对小波系数进行阈值处理,这有助于去噪 threshold = 0.2 * np.max(coeffs[-1]) coeffs = [pywt.threshold(c, threshold, mode='soft') for c in coeffs] reconstructed_signal = pywt.waverec(coeffs, wavelet) '''
'''
# 小波分解 wavelet = 'db4' coeffs = pywt.wavedec(filtered_ecg_signal_leaddatas, wavelet, level=4)
# 对高频系数进行阈值处理(软阈值) sigma = np.median(np.abs(coeffs[-1])) / 0.6745 # 估计噪声标准差 threshold = sigma * np.sqrt(2 * np.log(len(filtered_ecg_signal_leaddatas)))
# 对各层系数进行阈值处理 denoised_coeffs = [pywt.threshold(c, threshold, mode='soft') for c in coeffs]
# 重建信号 denoised_signal = pywt.waverec(denoised_coeffs, wavelet)
'''
# Define the window size for moving average (adjust as needed) window_size = 2
# Perform moving average filtering moving_avg_ecg1 = np.convolve(lowpass_ecg_leaddatas, np.ones(window_size)/window_size, mode='valid')
#record = wfdb.rdrecord('mit-bih-arrhythmia-database-1.0.0/100', sampfrom=0, sampto=25000, physical=True, channels=[0, ]) #Original_ECG = record.p_signal[Initial_intercept_point:Final_intercept_point].flatten() Original_ECG = moving_avg_ecg1 ECG_baseline = medfilt(Original_ECG, fliter+1) Totality_Bias = np.sum(ECG_baseline[Give_up_size:-Give_up_size])/(Final_intercept_point-Initial_intercept_point-fliter) Filtered_ECG = Original_ECG - ECG_baseline Final_Filtered_ECG = Filtered_ECG[Give_up_size:-Give_up_size]-Totality_Bias
plt.figure(figsize=(100, 8)) plt.subplot(2, 1, 1) plt.ylabel("Original ECG signal") plt.plot(leaddatas)#输出原图像leaddatas
plt.subplot(2, 1, 2) plt.ylabel("Filtered ECG signal") plt.plot(Original_ECG)#基线滤波结果
plt.show()
#心率计算 import biosppy.signals.ecg as ecg
rpeaks = ecg.hamilton_segmenter(signal=Final_Filtered_ECG, sampling_rate=500)
def calculate_heart_rate(r_peaks, sampling_rate): # 计算相邻R波之间的时间间隔(单位:秒) rr_intervals = np.diff(r_peaks) / sampling_rate # 将时间间隔转换为每分钟的心跳数(bpm) heart_rates = 60 / rr_intervals return heart_rates
# 计算心率 heart_rates = calculate_heart_rate(rpeaks, sampling_rate=500) print("Heart rates (bpm):", heart_rates)
|