import os
import sys
import time
import asyncio
import argparse
import threading
import socketserver # we use the socketsever module that comes with python3
from datetime import datetime
from bleak import BleakScanner, BleakClient
from bleakheart import PolarMeasurementData
"""
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
Copyright (C) 2023 Fabrizio Smeraldi <fabrizio@smeraldi.net>
"""
""" ECG acquisition using an asynchronous queue - producer/consumer model
Copied from bleakheart example
"""
[docs]
class UDPHandler(socketserver.DatagramRequestHandler):
[docs]
def handle(self):
# for line terminated massages
# msgRecvd = self.rfile.readline().strip()
print("received message - stopping stream")
def server_shutdown(server):
server.shutdown()
threading.Thread(target=server_shutdown, kwargs={"server": self.server}).start()
# with receive buffer - reads max 1024 bytes
# rec_bytes = self.request.recv(1024)
# your processing here
[docs]
class EcgProducer:
"""Add ECG data from one polar h10 device to a queue.
Based on producer/consumer design pattern.
"""
def __init__(self, polar_sn, polar_id):
self.quitclient = asyncio.Event()
self.polar_sn = polar_sn # polar h10 serial number
self.polar_id = polar_id
self.stream_flag = False
self.stop_stream = False
self.stream_ended = False
[docs]
def udp_handler(self, loop=None, port=5354):
"""UDP socket listening on port for signal to stop ECG stream."""
# using logger results in an error..
print(f"listening on port {port}")
listen_addr = ("0.0.0.0", port)
# socketserver.UDPServer("0.0.0.0", port)
# with allowing to reuse the address we dont get into problems
# running it consecutively sometimes
socketserver.UDPServer.allow_reuse_address = True
serverUDP = socketserver.UDPServer(listen_addr, UDPHandler)
serverUDP.serve_forever()
print("stopping stream")
self.stop_stream = True
if loop is None:
self.quitclient.set() # we are in the event loop thread
else:
# we are in a separate thread - call set in the event loop thread
loop.call_soon_threadsafe(self.quitclient.set)
[docs]
def disconnected_callback(self, client):
"""Called by BleakClient if the sensor disconnects"""
print(f"Sensor {self.polar_id} disconnected")
self.stream_ended = True
self.quitclient.set() # causes the ble client task to exit
[docs]
async def connect_to_sn(self, devices):
"""Scan bluetooth devices and find polar devices by their Serial Numbers"""
# devices = await BleakScanner.discover()
polar_devices = [
dev for dev in devices if dev.name and "polar" in dev.name.lower()
]
print(
f"Bluetooth scanning found {len(devices)} devices, {len(polar_devices)} polar device"
)
# print(devices)
# print(polar_devices)
# for SN in polar_list:
device = [dev for dev in polar_devices if self.polar_sn in dev.name]
# print(device)
if len(device) == 0:
# logger.error(f"failure to find device")
print("failure to find device")
return device
# async def run_multiple_devices(self, device, queue):
# """connect to multiple polar devices and stream ECG data
# Design pattern : Multiple producers - 1 consumer?
# """
# async def run_ble_client(self, device, queue, logger, port, dongle_name):
[docs]
async def run_ble_client(
self, device, polar_id, ecg_queue, acc_queue, port, dongle_name
):
"""This task connects to the BLE server (the heart rate sensor)
identified by device, starts ECG notification and pushes the ECG
data to the queue. The tasks terminates when the sensor disconnects
or the user hits enter."""
## Flag stream launched
# self.stream_flag = False
# listen_addr = ("0.0.0.0", 5354)
## with allowing to reuse the address we dont get into problems running it consecutively sometimes
# socketserver.UDPServer.allow_reuse_address = True
## register our class
# serverUDP = socketserver.UDPServer(listen_addr, UDPHandler)
self.quitclient = asyncio.Event()
# while True and not self.stop_stream:
print("scanning for device")
found_device = await BleakScanner.find_device_by_address(device.address)
if found_device is None:
# logger.warning("no device found, wait 5s then scan again")
print("no device found, wait 5s then scan again")
await asyncio.sleep(5)
try:
# logger.info(f"Connecting to {device}...")
print(f"Connecting to {device}...")
# the context manager will handle connection/disconnection for us
async with BleakClient(
device,
disconnected_callback=self.disconnected_callback,
adapter=dongle_name,
) as client:
# logger.info(f"Connected: {client.is_connected}")
print(f"Connected: {client.is_connected}")
# create the Polar Measurement Data object; set queue for
# ecg data
# pmd = PolarMeasurementData(client, ecg_queue=queue)
pmd = PolarMeasurementData(
client, ecg_queue=ecg_queue, acc_queue=acc_queue
)
# ask about ECG settings
if not self.stream_flag:
ecg_settings = await pmd.available_settings("ECG")
# logger.info
print("Request for available ECG settings returned the following:")
for k, v in ecg_settings.items():
# logger.info(f"{k}:\t{v}")
print(f"{k}:\t{v}")
acc_settings = await pmd.available_settings("ACC")
# logger.info
print("Request for available ECG settings returned the following:")
for k, v in acc_settings.items():
# logger.info
print(f"{k}:\t{v}")
# press enter to stop streaming
loop = asyncio.get_running_loop()
# Set the loop to call keyboard_handler when one line of input is
# ready on stdin
# loop.add_reader(sys.stdin, self.keyboard_handler)
# loop.add_reader(sys.stdin, self.udp_handler)
# else:
# # run keyboard_handler in a daemon thread
threading.Thread(
target=self.udp_handler,
kwargs={"loop": loop, "port": port},
daemon=True,
).start()
# start notifications; bleakheart will start pushing
# data to the queue we passed to PolarMeasurementData
# start streaming if not already done
# if not self.stream_flag :
(err_code, err_msg, _) = await pmd.start_streaming("ECG")
(err_code, err_msg, _) = await pmd.start_streaming("ACC")
with open("experiment.log", "a") as xp_fout:
xp_fout.write(
f"ecg & acc stream start at {datetime.fromtimestamp(time.time()).strftime('%A, %B %d, %Y %I:%M:%S')} for {polar_id}\n"
)
self.stream_flag = True
self.stream_ended = False
if err_code != 0:
# logger.error
print(f"PMD returned an error: {err_msg}")
sys.exit(err_code)
# else: # disconnected - try to stop and restart stream
# this task does not need to do anything else; wait until
# user hits enter or the sensor disconnects
await self.quitclient.wait()
# no need to stop notifications if we are exiting the context
# manager anyway, as they will disconnect the client; however,
# it's easy to stop them if we want to
if client.is_connected and self.stop_stream:
await pmd.stop_streaming("ECG")
self.stream_flag = False
# if ADD_READER_SUPPORT:
# loop.remove_reader(sys.stdin)
# signal the consumer task to quit
ecg_queue.put_nowait(("QUIT", None, None, None))
self.stream_ended = True
except:
# logger.warning
self.stream_ended = True
print(f"lost connection for {polar_id}")
[docs]
async def run_consumer_task(
# queue,
polar_id,
ecg_queue,
acc_queue,
stream_ended,
output,
stream_not_started,
trigger_port=5355,
):
"""This task retrieves ECG data from the queue and does
all the processing. You should ensure it returns control before
the next frame is received from the sensor.
In this example, we simply prints decoded ECG data as it
is received"""
print("After connecting, will print ECG data in the form")
print("('ECG', tstamp, [s1,S2,...,sn])")
print("where samples s1,...sn are in microVolt, tstamp is in ns")
print("and it refers to the last sample sn.")
trigger = 0
trigger_ts = 0
trigger_flag = False
class TriggerHandler(socketserver.DatagramRequestHandler):
def handle(self):
# for line terminated massages
# msgRecvd = self.rfile.readline().strip()
print("received message - stopping stream")
# def server_shutdown(server):
# server.shutdown()
# threading.Thread(
# target=server_shutdown, kwargs={"server": self.server}
# ).start()
# with receive buffer - reads max 1024 bytes
# rec_bytes = self.request.recv(1024)
# your processing here
nonlocal trigger
nonlocal trigger_ts
trigger = 1
trigger_ts = time.time()
def trigger_handler(loop=None, serverUDP=None, port=5355):
"""UDP socket listening on port for signal to stop ECG stream."""
# using logger results in an error..
print(f"listening on port {port}")
# with allowing to reuse the address we dont get into problems
# running it consecutively sometimes
socketserver.UDPServer.allow_reuse_address = True
# serverUDP = socketserver.UDPServer(listen_addr, TriggerHandler)
serverUDP.serve_forever()
print("stopping stream")
loop = asyncio.get_running_loop()
listen_addr = ("0.0.0.0", trigger_port)
trigger_server = socketserver.UDPServer(listen_addr, TriggerHandler)
threading.Thread(
target=trigger_handler,
kwargs={"loop": loop, "serverUDP": trigger_server, "port": trigger_port},
daemon=True,
).start()
while True and not stream_ended:
# frame = await queue.get()
ecg_frame = await ecg_queue.get()
acc_frame = await acc_queue.get()
if ecg_frame[0] == "QUIT": # intercept exit signal
# if ecg_frame[0] == "QUIT":
break
# print(len(frame[2]))
# print(frame)
if stream_not_started: # record time of first packet
stream_not_started = False
ecg_output = str(int(time.time())) + "_ecg_" + output
acc_output = str(int(time.time())) + "_acc_" + output
# Check if file already exists, and replace line if polar id is already in it
if os.path.isfile("streaming.csv"):
with open("streaming.csv", "r") as stream_out:
file_in = []
found_polar = False
for line in stream_out.readlines():
if line.startswith(f"{polar_id}"):
found_polar = True
file_in.append(f"{polar_id},{ecg_output}\n")
else:
file_in.append(line)
with open("streaming.csv", "w") as stream_out:
for line in file_in:
stream_out.write(line)
if not found_polar:
stream_out.write(f"{polar_id},{ecg_output}\n")
else:
with open("streaming.csv", "w") as stream_out:
stream_out.write(f"{polar_id},{ecg_output}\n")
with open(ecg_output, "a") as ecg_fout, open(acc_output, "a") as acc_fout:
if trigger and not trigger_flag:
trigger_flag = True
computer_timestamp = time.time()
ecg_fout.write(
",".join(
[str(ecg_frame[1])]
+ [str(computer_timestamp)]
+ [str(trigger)]
# + [str(trigger_ts)]
# + [str(val) for val in frame[2]]
+ [str(val) for val in ecg_frame[2]]
)
+ "\n"
)
acc_fout.write(
",".join(
[str(ecg_frame[1])]
+ [str(computer_timestamp)]
+ [str(trigger)]
# + [str(trigger_ts)]
# + [str(val) for val in frame[2]]
+ [str(val) for val in acc_frame[2]]
)
+ "\n"
)
if trigger and trigger_flag:
trigger = 0
trigger_ts = 0
trigger_flag = False
# def server_shutdown(server):
# server.shutdown()
# threading.Thread(target=server_shutdown, kwargs={"server": self.server}).start()
# Add exiting timestamp
with open("experiment.log", "a") as xp_fout:
xp_fout.write(
f"ecg & acc stream stop at {datetime.fromtimestamp(time.time()).strftime('%A, %B %d, %Y %I:%M:%S')} for {polar_id}\n"
)
trigger_server.shutdown()
[docs]
def read_config(polar_id, config):
"""Read configuration file and output config data.
Expected format for the configuration file is:
POLAR_ID,POLAR_SERIALNUMBER,QUIT_PORT,TRIGGER_PORT,OUTPUT_FILE,DONGLE_MAC
Returns
-------
sn: string
the polar device serial number to which we connect
port: int
the port which is used to stop the stream
trigger_port: int
the port which is used to write a trigger
output: string
the name used for the output file
dongle_mac: string
the mac address of the dongle to use for connection
"""
with open(config, "r") as fin:
for line in fin.readlines():
if line.startswith(polar_id):
_, sn, port, trigger_port, output, dongle_mac = line.strip().split(",")
return sn, int(port), int(trigger_port), output, dongle_mac
[docs]
async def main():
# parse arguments
parser = argparse.ArgumentParser(description="heartSync")
# input output arguments
parser.add_argument(
"-p", "--polar_id", type=str, help="ID of the polar device used"
)
parser.add_argument(
"-c",
"--config",
type=str,
help="csv file containing the config used for this polar id",
)
parser.add_argument(
"-dm",
"--dongle_mapping",
type=str,
help="path to the file"
"containing the mapping from dongle name to its mac address",
)
args = parser.parse_args()
if len(sys.argv) == 1:
parser.print_help()
sys.exit()
# Get configuration
polar_sn, port, trigger_port, output, dongle_mac = read_config(
args.polar_id, args.config
)
if args.dongle_mapping is not None:
with open(args.dongle_mapping, "r") as fin:
# mac2dongle = {}
for line in fin.readlines():
if dongle_mac in line:
dongle_name, _ = line.strip().split(",")
# mac2dongle[daddress] = dname
else:
# mac2dongle = None
dongle_name = None
# define logger
print("Scanning for BLE devices")
# polar_sn = args.serial_number
devices = await BleakScanner.discover(adapter=dongle_name)
ecgprod = EcgProducer(polar_sn, args.polar_id)
polar_devices = await ecgprod.connect_to_sn(devices)
# if len(devices) == 0:
# logger.error("Polar device not found.")
# sys.exit(-4)
# the queue needs to be long enough to cache all the frames, since
# PolarMeasurementData uses put_nowait
# ecgqueues = [asyncio.Queue() for dev in polar_devices] # create 1 queue per device
ecgqueue = asyncio.Queue()
accqueue = asyncio.Queue() #
stream_not_started = True
# while not ecgprod.stop_stream: # try reconnection after connection is lost
producer = ecgprod.run_ble_client(
polar_devices[0],
args.polar_id,
ecgqueue,
accqueue,
port,
dongle_name,
)
consumer = run_consumer_task(
args.polar_id,
ecgqueue,
accqueue,
ecgprod.stream_ended,
output,
stream_not_started,
trigger_port,
)
await asyncio.gather(producer, consumer)
print("Bye.")
if __name__ == "__main__":
# execute the main coroutine
asyncio.run(main())