import asyncio
import os
from bleak import BleakClient, BleakScanner, BleakError
from typing import Final
[docs]
class FrameBle:
"""
Class for managing a connection and transferring data to and
from the Brilliant Labs Frame device over Bluetooth LE using the Bleak library.
"""
_SERVICE_UUID = "7a230001-5475-a6a4-654c-8431f6ad49c4"
_TX_CHARACTERISTIC_UUID = "7a230002-5475-a6a4-654c-8431f6ad49c4"
_RX_CHARACTERISTIC_UUID = "7a230003-5475-a6a4-654c-8431f6ad49c4"
def __init__(self):
self._awaiting_print_response = False
self._awaiting_data_response = False
self._client = None
self._print_response = asyncio.Queue()
self._data_response = asyncio.Queue()
self._tx_characteristic = None
self._rx_characteristic = None
self._user_data_response_handler = lambda: None
self._user_disconnect_handler = lambda: None
self._user_print_response_handler = lambda: None
def _disconnect_handler(self, _):
self._user_disconnect_handler()
self.__init__()
async def _notification_handler(self, _, data):
if data[0] == 1:
# Use memoryview to avoid copying
data_view = memoryview(data)[1:]
if self._awaiting_data_response:
self._awaiting_data_response = False
await self._data_response.put(data_view)
# call the user response handler on all incoming notifications
if self._user_data_response_handler is not None:
if asyncio.iscoroutinefunction(self._user_data_response_handler):
await self._user_data_response_handler(data_view)
else:
self._user_data_response_handler(data_view)
else:
# decode the string data only once
decoded = data.decode()
if self._awaiting_print_response:
self._awaiting_print_response = False
await self._print_response.put(decoded)
# call the user response handler on all incoming notifications
if self._user_print_response_handler is not None:
if asyncio.iscoroutinefunction(self._user_print_response_handler):
await self._user_print_response_handler(decoded)
else:
self._user_print_response_handler(decoded)
[docs]
async def connect(
self,
name=None,
timeout=10,
print_response_handler=lambda _: None,
data_response_handler=lambda _: None,
disconnect_handler=lambda: None,
):
"""
Connects to the first Frame device discovered,
optionally matching a specified name e.g. "Frame AB",
or throws an Exception if a matching Frame is not found within timeout seconds.
`name` can optionally be provided as the local name containing the
2 digit ID shown on Frame, in order to only connect to that specific device.
The value should be a string, for example `"Frame 4F"`
`print_response_handler` and `data_response_handler` can be provided and
will be called whenever data arrives from the device asynchronously.
`disconnect_handler` can be provided to be called to run
upon a disconnect.
"""
self._user_disconnect_handler = disconnect_handler
self._user_print_response_handler = print_response_handler
self._user_data_response_handler = data_response_handler
# Create a scanner with a filter for our service UUID and optional name
device = await BleakScanner.find_device_by_filter(
lambda d, _: d.name is not None and (name is None or d.name == name),
timeout=timeout,
service_uuids=[self._SERVICE_UUID]
)
if not device:
raise Exception("No matching Frame device found")
self._client = BleakClient(
device,
disconnected_callback=self._disconnect_handler,
winrt=dict(use_cached_services=False)
)
try:
await self._client.connect()
# Workaround to acquire MTU size because Bleak doesn't do it automatically when using BlueZ backend
if self._client._backend.__class__.__name__ == "BleakClientBlueZDBus":
await self._client._backend._acquire_mtu()
except BleakError as ble_error:
raise Exception(f"Error connecting: {ble_error}")
service = self._client.services.get_service(
self._SERVICE_UUID,
)
self._tx_characteristic = service.get_characteristic(
self._TX_CHARACTERISTIC_UUID,
)
self._rx_characteristic = service.get_characteristic(
self._RX_CHARACTERISTIC_UUID,
)
try:
await self._client.start_notify(
self._RX_CHARACTERISTIC_UUID,
self._notification_handler,
)
except Exception as ble_error:
raise Exception(f"Error subscribing for notifications: {ble_error}")
return device.address
[docs]
async def disconnect(self):
"""
Disconnects from the device.
"""
if (self._client is not None):
await self._client.disconnect()
self._disconnect_handler(None)
[docs]
def is_connected(self):
"""
Returns `True` if the device is connected. `False` otherwise.
"""
try:
return (self._client is not None) and self._client.is_connected
except AttributeError:
return False
[docs]
def max_lua_payload(self):
"""
Returns the maximum length of a Lua string which may be transmitted.
"""
try:
return self._client.mtu_size - 3
except AttributeError:
return 0
[docs]
def max_data_payload(self):
"""
Returns the maximum length of a raw bytearray which may be transmitted.
"""
try:
return self._client.mtu_size - 4
except AttributeError:
return 0
async def _transmit(self, data, show_me=False):
if show_me:
print(data) # TODO make this print nicer
if len(data) > self._client.mtu_size - 3:
raise Exception("payload length is too large")
await self._client.write_gatt_char(self._tx_characteristic, data, response=True)
[docs]
async def send_lua(self, string: str, show_me=False, await_print=False):
"""
Sends a Lua string to the device. The string length must be less than or
equal to `max_lua_payload()`.
If `await_print=True`, the function will block until a Lua print()
occurs, or a timeout.
If `show_me=True`, the exact bytes send to the device will be printed.
"""
# set the awaiting status before we transmit
self._awaiting_print_response = await_print
await self._transmit(string.encode(), show_me=show_me)
if await_print:
try:
return await asyncio.wait_for(self._print_response.get(), timeout=5)
except asyncio.TimeoutError:
raise Exception("device didn't respond")
[docs]
async def send_data(self, data: bytearray, show_me=False, await_data=False):
"""
Sends raw data to the device. The payload length must be less than or
equal to `max_data_payload()`.
If `await_data=True`, the function will block until a data response
occurs, or a timeout.
If `show_me=True`, the exact bytes send to the device will be printed.
"""
# set the awaiting status before we transmit
self._awaiting_data_response = await_data
await self._transmit(bytearray(b"\x01") + data, show_me=show_me)
if await_data:
try:
return await asyncio.wait_for(self._data_response.get(), timeout=5)
except asyncio.TimeoutError:
raise Exception("device didn't respond")
[docs]
async def send_reset_signal(self, show_me=False):
"""
Sends a reset signal to the device which will reset the Lua virtual
machine.
If `show_me=True`, the exact bytes send to the device will be printed.
"""
await self._transmit(bytearray(b"\x04"), show_me=show_me)
# need to give it a moment after the Lua VM reset before it can handle any requests
await asyncio.sleep(0.2)
[docs]
async def send_break_signal(self, show_me=False):
"""
Sends a break signal to the device which will break any currently
executing Lua script.
If `show_me=True`, the exact bytes send to the device will be printed.
"""
await self._transmit(bytearray(b"\x03"), show_me=show_me)
# need to give it a moment after the break before it can handle any requests
await asyncio.sleep(0.2)
[docs]
async def upload_file_from_string(self, content: str, frame_file_path="main.lua"):
"""
Uploads a string as frame_file_path. If the file exists, it will be overwritten.
Args:
content (str): The string content to upload
frame_file_path (str): Target file path on the frame
"""
# Escape special characters
content = (content.replace("\r", "")
.replace("\\", "\\\\")
.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("'", "\\'")
.replace('"', '\\"'))
# Open the file on the frame
await self.send_lua(
f"f=frame.file.open('{frame_file_path}','w');print(1)",
await_print=True
)
# Calculate chunk size accounting for the Lua command overhead
chunk_size: int = self.max_lua_payload() - 22
# Upload in chunks
i = 0
while i < len(content):
# Calculate initial chunk size
current_chunk_size = min(chunk_size, len(content) - i)
# Check for escape sequences at the chunk boundary
if current_chunk_size < len(content) - i:
# Look for the last non-escaped backslash in the chunk
pos = i + current_chunk_size - 1
while pos > i:
# Check if we're in the middle of an escape sequence
if content[pos] == '\\' and (pos == i or content[pos-1] != '\\'):
# If we find an unescaped backslash, adjust the chunk size
current_chunk_size = pos - i
break
pos -= 1
chunk: str = content[i:i + current_chunk_size]
await self.send_lua(f'f:write("{chunk}");print(1)', await_print=True)
i += current_chunk_size
# Close the file
await self.send_lua("f:close();print(nil)", await_print=True)
[docs]
async def upload_file(self, local_file_path: str, frame_file_path="main.lua"):
"""
Uploads a local file to the frame. If the target file exists, it will be overwritten.
Args:
local_file_path (str): Path to the local file to upload. Must exist.
frame_file_path (str): Target file path on the frame
Raises:
FileNotFoundError: If local_file_path doesn't exist
"""
if not os.path.exists(local_file_path):
raise FileNotFoundError(f"Local file not found: {local_file_path}")
with open(local_file_path, "r") as f:
content = f.read()
await self.upload_file_from_string(content, frame_file_path)
[docs]
async def send_message(self, msg_code: int, payload: bytes, show_me: bool=False) -> None:
"""
Send a large payload in chunks determined by BLE MTU size.
Args:
msg_code (int): Message type identifier (0-255)
payload (bytes): Data to be sent
show_me (bool): If True, the exact bytes send to the device will be printed
Raises:
ValueError: If msg_code is not in range 0-255 or payload size exceeds 65535
Note:
First packet format: [msg_code(1), size_high(1), size_low(1), data(...)]
Other packets format: [msg_code(1), data(...)]
"""
# Constants
HEADER_SIZE: Final = 3 # msg_code + 2 bytes size
SUBSEQUENT_HEADER_SIZE: Final = 1 # just msg_code
MAX_TOTAL_SIZE: Final = 65535 # 2^16 - 1, maximum size that fits in 2 bytes
# Validation
if not 0 <= msg_code <= 255:
raise ValueError(f"Message code must be 0-255, got {msg_code}")
total_size = len(payload)
if total_size > MAX_TOTAL_SIZE:
raise ValueError(f"Payload size {total_size} exceeds maximum {MAX_TOTAL_SIZE} bytes")
# Calculate maximum chunk sizes
max_first_chunk = self.max_data_payload() - HEADER_SIZE
max_chunk_size = self.max_data_payload() - SUBSEQUENT_HEADER_SIZE
# Pre-allocate buffer for maximum sized packets
buffer = bytearray(self.max_data_payload())
# Send first chunk
first_chunk_size = min(max_first_chunk, total_size)
buffer[0] = msg_code
buffer[1] = total_size >> 8
buffer[2] = total_size & 0xFF
buffer[HEADER_SIZE:HEADER_SIZE + first_chunk_size] = payload[:first_chunk_size]
await self.send_data(memoryview(buffer)[:HEADER_SIZE + first_chunk_size], show_me=show_me, await_data=True)
sent_bytes = first_chunk_size
# Send remaining chunks
if sent_bytes < total_size:
# Set message code in the reusable buffer
buffer[0] = msg_code
while sent_bytes < total_size:
remaining = total_size - sent_bytes
chunk_size = min(max_chunk_size, remaining)
# Copy next chunk into the pre-allocated buffer
buffer[SUBSEQUENT_HEADER_SIZE:SUBSEQUENT_HEADER_SIZE + chunk_size] = \
payload[sent_bytes:sent_bytes + chunk_size]
# Send only the used portion of the buffer
await self.send_data(memoryview(buffer)[:SUBSEQUENT_HEADER_SIZE + chunk_size], show_me=show_me, await_data=True)
sent_bytes += chunk_size