Creating an ECG Data Stream with a Polar Device | by Pareeknikhil | Dec 2020

[ad_1]


import asyncio
import math
import os
import signal
import sys
import time

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from bleak import BleakClient
from bleak.uuids import uuid16_dict
# Predefined UUID (Universally Unique Identifier) mappings are based on the
# Heart Rate GATT service protocol that most fitness / heart-rate device
# manufacturers follow (Polar H10 in this case) to obtain a specific response
# from the device acting as an API.

# UUID mapping: invert the imported table so entries can be looked up by
# their description string. NOTE(review): assumes the imported table maps
# 16-bit integer UUID -> description; the "{0:x}" formatting below requires
# the looked-up values to be integers.
uuid16_dict = {v: k for k, v in uuid16_dict.items()}

# This is the device MAC ID; please update it with your own device's ID.
ADDRESS = "D4:52:48:88:EA:04"

# UUID for model number
MODEL_NBR_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Model Number String")
)

# UUID for manufacturer name
MANUFACTURER_NAME_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Manufacturer Name String")
)

# UUID for battery level
BATTERY_LEVEL_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Battery Level")
)

# UUID for connection establishment with the device
PMD_SERVICE = "FB005C80-02E7-F387-1CAD-8ACD2D8DF0C8"

# UUID for requesting stream settings
PMD_CONTROL = "FB005C81-02E7-F387-1CAD-8ACD2D8DF0C8"

# UUID for requesting the start of the stream
PMD_DATA = "FB005C82-02E7-F387-1CAD-8ACD2D8DF0C8"

# Request payload for the ECG stream (raw command bytes, not a UUID)
ECG_WRITE = bytearray([0x02, 0x00, 0x00, 0x01, 0x82, 0x00, 0x01, 0x01, 0x0E, 0x00])

# Sampling frequency for the Polar H10
ECG_SAMPLING_FREQ = 130

# Resource allocation for data collection: parallel lists, one entry per
# decoded sample (value and the timestamp of the frame it arrived in).
ecg_session_data = []
ecg_session_time = []
def data_conv(sender, data):
    """Decode one data frame and append its samples to the session buffers.

    Frames whose first byte is not ``0x00`` are ignored, as are empty
    payloads. For an accepted frame, bytes 1-8 are read as an unsigned
    little-endian timestamp, and everything from byte 10 onward is decoded
    as consecutive 3-byte signed little-endian samples. Each sample is
    appended to ``ecg_session_data`` and the frame timestamp is appended to
    ``ecg_session_time``, so the two lists stay the same length.

    NOTE(review): the ``(sender, data)`` signature suggests this is meant to
    be registered as a BLE notification callback; the registration is not
    visible in this part of the file -- confirm against the caller.

    Args:
        sender: Source of the frame; not used by this function.
        data: Raw frame payload (bytes-like, indexable and sliceable).

    Returns:
        None. Results are accumulated in the module-level session lists.
    """
    # An empty payload has no frame-type byte to inspect; skip it rather
    # than raising IndexError on data[0].
    if not data or data[0] != 0x00:
        return

    timestamp = convert_to_unsigned_long(data, 1, 8)
    step = 3  # bytes per sample
    samples = data[10:]

    for offset in range(0, len(samples), step):
        ecg = convert_array_to_signed_int(samples, offset, step)
        ecg_session_data.append(ecg)
        # Every sample in a frame shares that frame's single timestamp.
        ecg_session_time.append(timestamp)
def convert_array_to_signed_int(data, offset, length):
    """Return ``data[offset:offset + length]`` as a signed little-endian int.

    Args:
        data: Bytes-like object, or a sliceable sequence of ints in 0-255.
        offset: Index of the first byte to decode.
        length: Number of bytes to decode.

    Returns:
        The decoded two's-complement integer. Slicing clamps at the end of
        ``data``, so a short slice is decoded from the bytes that exist and
        an empty slice yields 0.
    """
    return int.from_bytes(
        bytearray(data[offset : offset + length]),
        byteorder="little",
        signed=True,
    )
def convert_to_unsigned_long(data, offset, length):
    """Return ``data[offset:offset + length]`` as an unsigned little-endian int.

    Args:
        data: Bytes-like object, or a sliceable sequence of ints in 0-255.
        offset: Index of the first byte to decode.
        length: Number of bytes to decode.

    Returns:
        The decoded non-negative integer. Slicing clamps at the end of
        ``data``, so a short slice is decoded from the bytes that exist and
        an empty slice yields 0.
    """
    return int.from_bytes(
        bytearray(data[offset : offset + length]),
        byteorder="little",
        signed=False,
    )
Fig — 2: Working ECG stream with Polar-H10

Read More …

[ad_2]


Write a comment