You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

129 lines
3.7 KiB

import csv
from datetime import datetime
import wfdb
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import medfilt
import pywt
fliter = int(0.8*500)
Initial_intercept_point = 0
Final_intercept_point = 2000
Give_up_size = int(fliter/2)
# Get dates, high, and low temperatures from file.
filename = 'sunyan.csv'
with open(filename) as f:
reader = csv.reader(f)
header_row = next(reader)
leaddatas= []
for row in reader:
try:
leaddata = int(row[0])
except ValueError:
print(leaddata, 'missing data')
else:
leaddatas.append(leaddata)
#读取文件结束
#bandpass filter
from scipy import signal
fs = 500 # 采样率为500 Hz
# 设计低通滤波器
cutoff_freq = 40 # 截止频率
order = 1 # 滤波器阶数
nyquist = 0.5 * fs
cutoff = cutoff_freq / nyquist
b, a = signal.butter(order, cutoff, btype='low')
# 应用低通滤波器
lowpass_ecg_leaddatas = signal.filtfilt(b, a, leaddatas)
# 设计高通滤波器
cutoff_freq_high = 0.5 # 截止频率
order_high = 1 # 滤波器阶数
#nyquist = 0.5 * fs
cutoff_high = cutoff_freq_high / nyquist
b, a = signal.butter(order_high, cutoff_high, btype='high')
# 应用高通滤波器
filtered_ecg_signal_leaddatas = signal.filtfilt(b, a, lowpass_ecg_leaddatas)
'''
# 使用小波变换,效果不好
wavelet = 'sym5' # 选择小波类型,sym5是一种对称小波,常用于ECG信号
levels = 4 # 分解级数
coeffs = pywt.wavedec(filtered_ecg_signal_leaddatas, wavelet, level=levels)
# 对小波系数进行阈值处理,这有助于去噪
threshold = 0.2 * np.max(coeffs[-1])
coeffs = [pywt.threshold(c, threshold, mode='soft') for c in coeffs]
reconstructed_signal = pywt.waverec(coeffs, wavelet)
'''
'''
# 小波分解
wavelet = 'db4'
coeffs = pywt.wavedec(filtered_ecg_signal_leaddatas, wavelet, level=4)
# 对高频系数进行阈值处理(软阈值)
sigma = np.median(np.abs(coeffs[-1])) / 0.6745 # 估计噪声标准差
threshold = sigma * np.sqrt(2 * np.log(len(filtered_ecg_signal_leaddatas)))
# 对各层系数进行阈值处理
denoised_coeffs = [pywt.threshold(c, threshold, mode='soft') for c in coeffs]
# 重建信号
denoised_signal = pywt.waverec(denoised_coeffs, wavelet)
'''
# Define the window size for moving average (adjust as needed)
window_size = 2
# Perform moving average filtering
moving_avg_ecg1 = np.convolve(lowpass_ecg_leaddatas, np.ones(window_size)/window_size, mode='valid')
#record = wfdb.rdrecord('mit-bih-arrhythmia-database-1.0.0/100', sampfrom=0, sampto=25000, physical=True, channels=[0, ])
#Original_ECG = record.p_signal[Initial_intercept_point:Final_intercept_point].flatten()
Original_ECG = moving_avg_ecg1
ECG_baseline = medfilt(Original_ECG, fliter+1)
Totality_Bias = np.sum(ECG_baseline[Give_up_size:-Give_up_size])/(Final_intercept_point-Initial_intercept_point-fliter)
Filtered_ECG = Original_ECG - ECG_baseline
Final_Filtered_ECG = Filtered_ECG[Give_up_size:-Give_up_size]-Totality_Bias
plt.figure(figsize=(100, 8))
plt.subplot(2, 1, 1)
plt.ylabel("Original ECG signal")
plt.plot(leaddatas)#输出原图像leaddatas
plt.subplot(2, 1, 2)
plt.ylabel("Filtered ECG signal")
plt.plot(Original_ECG)#基线滤波结果
plt.show()
#心率计算
import biosppy.signals.ecg as ecg
rpeaks = ecg.hamilton_segmenter(signal=Final_Filtered_ECG, sampling_rate=500)
def calculate_heart_rate(r_peaks, sampling_rate):
# 计算相邻R波之间的时间间隔(单位:秒)
rr_intervals = np.diff(r_peaks) / sampling_rate
# 将时间间隔转换为每分钟的心跳数(bpm)
heart_rates = 60 / rr_intervals
return heart_rates
# 计算心率
heart_rates = calculate_heart_rate(rpeaks, sampling_rate=500)
print("Heart rates (bpm):", heart_rates)