Creating an ECG Data Stream with Polar device | by Pareeknikhil | Dec, 2020
A Python-based application for visualizing ECG in real time with a Polar H10 tracker is available here. Communication with the Polar device uses a Bluetooth protocol known as GATT (Generic Attribute Profile), which defines how two Bluetooth Low Energy devices transfer data back and forth using concepts called Services and Characteristics.
Alongside GATT, this tutorial uses the Heart Rate GATT Service Protocol, which the device manufacturer follows so that users can obtain specific responses such as heart beat, raw ECG data, and battery level through an API. For each of these features we will use a predefined UUID (Universally Unique Identifier) mapping and write to the device to obtain real-time data.
Let’s get started ❤
We will use the Bleak library for the Bluetooth Low Energy (BLE) connection, and asyncio for the asynchronous parts of the application.
import asyncio
import math
import os
import signal
import sys
import time

import pandas as pd
from bleak import BleakClient
from bleak.uuids import uuid16_dict
import matplotlib.pyplot as plt
import matplotlib
Next, we define the UUIDs for all the features we want to extract from the Polar device:
""" Predefined UUID (Universally Unique Identifier) mappings are based on the Heart Rate GATT
service protocol that most fitness/heart rate device manufacturers follow (Polar H10 in
this case) to obtain a specific response from the device, acting as an API """

## UUID mapping (inverted so that characteristic names map to their 16-bit IDs)
uuid16_dict = {v: k for k, v in uuid16_dict.items()}

## This is the device MAC ID, please update with your device ID
ADDRESS = "D4:52:48:88:EA:04"

## UUID for model number ##
MODEL_NBR_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Model Number String")
)

## UUID for manufacturer name ##
MANUFACTURER_NAME_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Manufacturer Name String")
)

## UUID for battery level ##
BATTERY_LEVEL_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Battery Level")
)

## UUID for connection establishment with device ##
PMD_SERVICE = "FB005C80-02E7-F387-1CAD-8ACD2D8DF0C8"

## UUID for Request of stream settings ##
PMD_CONTROL = "FB005C81-02E7-F387-1CAD-8ACD2D8DF0C8"

## UUID for Request of start stream ##
PMD_DATA = "FB005C82-02E7-F387-1CAD-8ACD2D8DF0C8"

## UUID for Request of ECG Stream ##
ECG_WRITE = bytearray([0x02, 0x00, 0x00, 0x01, 0x82, 0x00, 0x01, 0x01, 0x0E, 0x00])

## For Polar H10 sampling frequency ##
ECG_SAMPLING_FREQ = 130

## Resource allocation for data collection
ecg_session_data = []
ecg_session_time = []
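The string template used above for the model number, manufacturer name and battery level places a 16-bit characteristic ID into the standard Bluetooth base UUID. The minimal sketch below shows that expansion on its own. The helper name expand_uuid16 is hypothetical, and the numeric IDs come from the Bluetooth assigned numbers rather than from this article.

```python
# Standard Bluetooth base UUID with a slot for the 16-bit ID.
BASE_UUID_TEMPLATE = "0000{0:04x}-0000-1000-8000-00805f9b34fb"


def expand_uuid16(short_uuid):
    """Expand a 16-bit Bluetooth UUID into its full 128-bit string form.

    Hypothetical helper, written only to illustrate the template.
    """
    return BASE_UUID_TEMPLATE.format(short_uuid)


# 0x2A19 is the assigned number for the "Battery Level" characteristic.
print(expand_uuid16(0x2A19))  # 00002a19-0000-1000-8000-00805f9b34fb
# 0x2A24 is the assigned number for "Model Number String".
print(expand_uuid16(0x2A24))  # 00002a24-0000-1000-8000-00805f9b34fb
```

The PMD_* UUIDs are full 128-bit vendor UUIDs, so they are written out as-is and do not go through this expansion.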
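The Polar device delivers its data as a stream of raw byte frames, so each frame has to be decoded into integer values before it can be stored or plotted. The following functions do that: data_conv handles frames whose first byte is 0x00, reads a timestamp from bytes 1 to 8, and decodes the payload from byte 10 onward as 3-byte little-endian signed ECG samples.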
def data_conv(sender, data):
    # Only frames whose first byte is 0x00 are decoded as ECG
    if data[0] == 0x00:
        timestamp = convert_to_unsigned_long(data, 1, 8)
        step = 3  # each ECG sample is 3 bytes long
        samples = data[10:]
        offset = 0
        while offset < len(samples):
            ecg = convert_array_to_signed_int(samples, offset, step)
            offset += step
            ecg_session_data.append(ecg)
            # every sample in a frame shares the frame's timestamp
            ecg_session_time.append(timestamp)


def convert_array_to_signed_int(data, offset, length):
    return int.from_bytes(
        bytearray(data[offset : offset + length]),
        byteorder="little",
        signed=True,
    )


def convert_to_unsigned_long(data, offset, length):
    return int.from_bytes(
        bytearray(data[offset : offset + length]),
        byteorder="little",
        signed=False,
    )
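The decoding done by data_conv can be checked without a device by feeding it a hand-built frame. The sketch below is a self-contained variant that returns the decoded values instead of appending to the global lists. The function name decode_ecg_frame and the frame contents are made up for illustration and were not captured from a Polar H10.

```python
def decode_ecg_frame(data):
    """Return (timestamp, samples) for an ECG frame, or None for other frames.

    Hypothetical helper that mirrors the byte layout used by data_conv.
    """
    if data[0] != 0x00:
        return None
    # Bytes 1..8: unsigned little-endian timestamp.
    timestamp = int.from_bytes(data[1:9], byteorder="little", signed=False)
    # Bytes 10 onward: consecutive 3-byte signed little-endian samples.
    payload = data[10:]
    usable = len(payload) - len(payload) % 3  # ignore an incomplete trailing sample
    samples = [
        int.from_bytes(payload[i : i + 3], byteorder="little", signed=True)
        for i in range(0, usable, 3)
    ]
    return timestamp, samples


# Synthetic frame, built by hand for illustration only.
frame = bytearray([0x00])                           # byte 0: frame type (ECG)
frame += (1000).to_bytes(8, "little")               # bytes 1..8: timestamp
frame += bytearray([0x00])                          # byte 9: skipped by the decoder
frame += (100).to_bytes(3, "little", signed=True)   # first sample
frame += (-2).to_bytes(3, "little", signed=True)    # second sample (negative)

print(decode_ecg_frame(frame))  # (1000, [100, -2])
```

Returning the values rather than mutating module-level lists makes the decoder easy to test. The callback passed to the notification handler can then extend ecg_session_data and ecg_session_time with the result.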
Now comes the last part: writing an async method that writes the necessary values to the device's UUIDs and opens the ECG stream from the Polar. This is a fairly simple task but a lengthy one, so I will direct readers to my GitHub repo for a complete working application. Running main.py should reproduce the plot shown below.
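For orientation, here is a minimal sketch of what such an async method might look like. It is not the code from the repo. It assumes that reading PMD_CONTROL, writing ECG_WRITE to it, and then subscribing to PMD_DATA is enough to start the stream, and the function names stream_ecg and main are hypothetical. The Bleak calls used are read_gatt_char, write_gatt_char, start_notify and stop_notify.

```python
import asyncio

PMD_CONTROL = "FB005C81-02E7-F387-1CAD-8ACD2D8DF0C8"
PMD_DATA = "FB005C82-02E7-F387-1CAD-8ACD2D8DF0C8"
ECG_WRITE = bytearray([0x02, 0x00, 0x00, 0x01, 0x82, 0x00, 0x01, 0x01, 0x0E, 0x00])


async def stream_ecg(client, on_frame, seconds=10.0):
    """Open the ECG stream on a connected client, listen for `seconds`, then stop.

    Hypothetical helper; `client` is expected to behave like a connected BleakClient.
    """
    # Read the control point first (assumed step: request of stream settings).
    await client.read_gatt_char(PMD_CONTROL)
    # Ask the device to start the ECG stream.
    await client.write_gatt_char(PMD_CONTROL, ECG_WRITE)
    # Every notification on PMD_DATA is passed to on_frame(sender, data).
    await client.start_notify(PMD_DATA, on_frame)
    try:
        await asyncio.sleep(seconds)
    finally:
        await client.stop_notify(PMD_DATA)


async def main(address, on_frame, seconds=10.0):
    # Imported here so the sketch can be read and tested without Bleak installed.
    from bleak import BleakClient

    async with BleakClient(address) as client:
        await stream_ecg(client, on_frame, seconds)
```

With a device in range, this could be started with asyncio.run(main(ADDRESS, data_conv)), after which ecg_session_data holds the decoded samples.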