micropython: add micropython component

This commit is contained in:
KY-zhang-X
2022-09-29 12:10:37 +08:00
parent 1514f1cb9b
commit dd76146324
2679 changed files with 354110 additions and 0 deletions

View File

@@ -0,0 +1,192 @@
# Test characteristic read/write/notify from both GATTS and GATTC.
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 5000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_IRQ_GATTS_INDICATE_DONE = const(20)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (
CHAR_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE,
)
SERVICE = (
SERVICE_UUID,
(CHAR,),
)
SERVICES = (SERVICE,)
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_WRITE:
print("_IRQ_GATTS_WRITE", ble.gatts_read(data[-1]))
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
elif event == _IRQ_GATTC_READ_DONE:
print("_IRQ_GATTC_READ_DONE", data[-1])
elif event == _IRQ_GATTC_WRITE_DONE:
print("_IRQ_GATTC_WRITE_DONE", data[-1])
elif event == _IRQ_GATTC_NOTIFY:
print("_IRQ_GATTC_NOTIFY", bytes(data[-1]))
elif event == _IRQ_GATTC_INDICATE:
print("_IRQ_GATTC_INDICATE", bytes(data[-1]))
elif event == _IRQ_GATTS_INDICATE_DONE:
print("_IRQ_GATTS_INDICATE_DONE", data[-1])
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Write initial characteristic value.
ble.gatts_write(char_handle, "periph0")
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# A
# Wait for a write to the characteristic from the central,
# then reply with a notification.
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("gatts_write")
ble.gatts_write(char_handle, "periph1")
print("gatts_notify")
ble.gatts_notify(conn_handle, char_handle)
# B
# Wait for a write to the characteristic from the central,
# then reply with value-included notification.
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("gatts_notify")
ble.gatts_notify(conn_handle, char_handle, "periph2")
# C
# Wait for a write to the characteristic from the central,
# then reply with an indication.
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("gatts_write")
ble.gatts_write(char_handle, "periph3")
print("gatts_indicate")
ble.gatts_indicate(conn_handle, char_handle)
wait_for_event(_IRQ_GATTS_INDICATE_DONE, TIMEOUT_MS)
# Wait for the central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Issue read of characteristic, should get initial value.
print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Write to the characteristic, which will trigger a notification.
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central0", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# A
wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)
print("gattc_read") # Read the new value set immediately before notification.
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Write to the characteristic, which will trigger a value-included notification.
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central1", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# B
wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)
print("gattc_read") # Read value should be unchanged.
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Write to the characteristic, which will trigger an indication.
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central2", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# C
wait_for_event(_IRQ_GATTC_INDICATE, TIMEOUT_MS)
print("gattc_read") # Read the new value set immediately before indication.
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,41 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_GATTS_WRITE b'central0'
gatts_write
gatts_notify
_IRQ_GATTS_WRITE b'central1'
gatts_notify
_IRQ_GATTS_WRITE b'central2'
gatts_write
gatts_indicate
_IRQ_GATTS_INDICATE_DONE 0
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_read
_IRQ_GATTC_READ_RESULT b'periph0'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'periph1'
gattc_read
_IRQ_GATTC_READ_RESULT b'periph1'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'periph2'
gattc_read
_IRQ_GATTC_READ_RESULT b'central1'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_INDICATE b'periph3'
gattc_read
_IRQ_GATTC_READ_RESULT b'periph3'
_IRQ_GATTC_READ_DONE 0
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,63 @@
# Test BLE GAP advertising and scanning
from micropython import const
import time, machine, bluetooth
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
ADV_TIME_S = 3
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
multitest.next()
print("gap_advertise(100_000, connectable=False)")
ble.gap_advertise(100_000, b"\x02\x01\x06\x04\xffMPY", connectable=False)
time.sleep(ADV_TIME_S)
print("gap_advertise(20_000, connectable=True)")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY", connectable=True)
time.sleep(ADV_TIME_S)
print("gap_advertise(None)")
ble.gap_advertise(None)
ble.active(0)
def instance1():
multitest.next()
finished = False
adv_types = {}
adv_data = None
def irq(ev, data):
nonlocal finished, adv_types, adv_data
if ev == _IRQ_SCAN_RESULT:
if data[0] == BDADDR[0] and data[1] == BDADDR[1]:
adv_types[data[2]] = True
if adv_data is None:
adv_data = bytes(data[4])
else:
if adv_data != data[4]:
adv_data = b"MISMATCH"
elif ev == _IRQ_SCAN_DONE:
finished = True
try:
ble.config(rxbuf=2000)
except:
pass
ble.irq(irq)
ble.gap_scan(2 * ADV_TIME_S * 1000, 10000, 10000)
while not finished:
machine.idle()
ble.active(0)
print("adv_types:", sorted(adv_types))
print("adv_data:", adv_data)
ble = bluetooth.BLE()
ble.active(1)

View File

@@ -0,0 +1,7 @@
--- instance0 ---
gap_advertise(100_000, connectable=False)
gap_advertise(20_000, connectable=True)
gap_advertise(None)
--- instance1 ---
adv_types: [0, 2]
adv_data: b'\x02\x01\x06\x04\xffMPY'

View File

@@ -0,0 +1,88 @@
# Test BLE GAP connect/disconnect
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 4000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect, then wait for it to disconnect.
wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
# Start advertising again.
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
# Wait for central to connect, then disconnect it.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
# Connect to peripheral and then let the peripheral disconnect us.
# Extra scan timeout allows for the peripheral to receive the disconnect
# event and start advertising again.
print("gap_connect")
ble.gap_connect(BDADDR[0], BDADDR[1], 5000)
wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,16 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
gap_disconnect: True
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,121 @@
# Test BLE GAP device name get/set
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 5000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
GAP_DEVICE_NAME_UUID = bluetooth.UUID(0x2A00)
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
if data[-1] == GAP_DEVICE_NAME_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
# Test setting and getting the GAP device name before registering services.
ble.config(gap_name="GAP_NAME")
print(ble.config("gap_name"))
# Create an empty service and start advertising.
ble.gatts_register_services([])
print("gap_advertise")
multitest.next()
try:
# Do multiple iterations to test changing the name.
for iteration in range(2):
# Set the GAP device name and start advertising.
ble.config(gap_name="GAP_NAME{}".format(iteration))
print(ble.config("gap_name"))
ble.gap_advertise(20_000)
# Wait for central to connect, then wait for it to disconnect.
wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
wait_for_event(_IRQ_CENTRAL_DISCONNECT, 4 * TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
value_handle = None
for iteration in range(2):
# Wait for peripheral to start advertising.
time.sleep_ms(500)
# Connect to peripheral.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
if iteration == 0:
# Only do characteristic discovery on the first iteration,
# assume value_handle is unchanged on the second.
print("gattc_discover_characteristics")
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Read the peripheral's GAP device name.
print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,25 @@
--- instance0 ---
b'GAP_NAME'
gap_advertise
b'GAP_NAME0'
_IRQ_CENTRAL_CONNECT
_IRQ_CENTRAL_DISCONNECT
b'GAP_NAME1'
_IRQ_CENTRAL_CONNECT
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID(0x2a00)
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_read
_IRQ_GATTC_READ_RESULT b'GAP_NAME0'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_read
_IRQ_GATTC_READ_RESULT b'GAP_NAME1'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,133 @@
# Test BLE GAP connect/disconnect with pairing, and read an encrypted characteristic
# TODO: add gap_passkey testing
from micropython import const
import time, machine, bluetooth
if not hasattr(bluetooth.BLE, "gap_pair"):
print("SKIP")
raise SystemExit
TIMEOUT_MS = 4000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_ENCRYPTION_UPDATE = const(28)
_FLAG_READ = const(0x0002)
_FLAG_READ_ENCRYPTED = const(0x0200)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, _FLAG_READ | _FLAG_READ_ENCRYPTED)
SERVICE = (SERVICE_UUID, (CHAR,))
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_READ_REQUEST:
print("_IRQ_GATTS_READ_REQUEST")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
elif event == _IRQ_ENCRYPTION_UPDATE:
print("_IRQ_ENCRYPTION_UPDATE", data[1], data[2], data[3])
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services((SERVICE,))
ble.gatts_write(char_handle, "encrypted")
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect.
wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Wait for pairing event.
wait_for_event(_IRQ_ENCRYPTION_UPDATE, TIMEOUT_MS)
# Wait for GATTS read request.
wait_for_event(_IRQ_GATTS_READ_REQUEST, TIMEOUT_MS)
# Wait for central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics (before pairing, doesn't need to be encrypted).
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Pair with the peripheral.
print("gap_pair")
ble.gap_pair(conn_handle)
# Wait for the pairing event.
wait_for_event(_IRQ_ENCRYPTION_UPDATE, TIMEOUT_MS)
# Read the peripheral's characteristic, should be encrypted.
print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Disconnect from the peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.config(mitm=True, le_secure=True, bond=False)
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,17 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_ENCRYPTION_UPDATE 1 0 0
_IRQ_GATTS_READ_REQUEST
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gap_pair
_IRQ_ENCRYPTION_UPDATE 1 0 0
gattc_read
_IRQ_GATTC_READ_RESULT b'encrypted'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,138 @@
# Test BLE GAP connect/disconnect with pairing and bonding, and read an encrypted
# characteristic
# TODO: reconnect after bonding to test that the secrets persist
from micropython import const
import time, machine, bluetooth
if not hasattr(bluetooth.BLE, "gap_pair"):
print("SKIP")
raise SystemExit
TIMEOUT_MS = 4000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_SET_SECRET = const(30)
_FLAG_READ = const(0x0002)
_FLAG_READ_ENCRYPTED = const(0x0200)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, _FLAG_READ | _FLAG_READ_ENCRYPTED)
SERVICE = (SERVICE_UUID, (CHAR,))
waiting_events = {}
secrets = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_READ_REQUEST:
print("_IRQ_GATTS_READ_REQUEST")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
elif event == _IRQ_ENCRYPTION_UPDATE:
print("_IRQ_ENCRYPTION_UPDATE", data[1], data[2], data[3])
elif event == _IRQ_SET_SECRET:
return True
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services((SERVICE,))
ble.gatts_write(char_handle, "encrypted")
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect.
wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Wait for pairing event.
wait_for_event(_IRQ_ENCRYPTION_UPDATE, TIMEOUT_MS)
# Wait for GATTS read request.
wait_for_event(_IRQ_GATTS_READ_REQUEST, TIMEOUT_MS)
# Wait for central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics (before pairing, doesn't need to be encrypted).
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Pair with the peripheral.
print("gap_pair")
ble.gap_pair(conn_handle)
# Wait for the pairing event.
wait_for_event(_IRQ_ENCRYPTION_UPDATE, TIMEOUT_MS)
# Read the peripheral's characteristic, should be encrypted.
print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# Disconnect from the peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.config(mitm=True, le_secure=True, bond=True)
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,17 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_ENCRYPTION_UPDATE 1 0 1
_IRQ_GATTS_READ_REQUEST
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gap_pair
_IRQ_ENCRYPTION_UPDATE 1 0 1
gattc_read
_IRQ_GATTC_READ_RESULT b'encrypted'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,160 @@
# Test GATTC/S data transfer between peripheral and central, and use of gatts_set_buffer()
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 5000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
SERVICE_UUID = bluetooth.UUID("00000001-1111-2222-3333-444444444444")
CHAR_CTRL_UUID = bluetooth.UUID("00000002-1111-2222-3333-444444444444")
CHAR_RX_UUID = bluetooth.UUID("00000003-1111-2222-3333-444444444444")
CHAR_TX_UUID = bluetooth.UUID("00000004-1111-2222-3333-444444444444")
CHAR_CTRL = (CHAR_CTRL_UUID, bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY)
CHAR_RX = (CHAR_RX_UUID, bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE)
CHAR_TX = (CHAR_TX_UUID, bluetooth.FLAG_NOTIFY)
SERVICE = (SERVICE_UUID, (CHAR_CTRL, CHAR_RX, CHAR_TX))
SERVICES = (SERVICE,)
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_WRITE:
print("_IRQ_GATTS_WRITE")
waiting_events[(event, data[1])] = None
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
if data[-1] == CHAR_CTRL_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[(event, CHAR_CTRL_UUID)] = data[2]
elif data[-1] == CHAR_RX_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[(event, CHAR_RX_UUID)] = data[2]
elif data[-1] == CHAR_TX_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[(event, CHAR_TX_UUID)] = data[2]
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", data[-1])
elif event == _IRQ_GATTC_READ_DONE:
print("_IRQ_GATTC_READ_DONE", data[-1])
elif event == _IRQ_GATTC_WRITE_DONE:
print("_IRQ_GATTC_WRITE_DONE", data[-1])
elif event == _IRQ_GATTC_NOTIFY:
print("_IRQ_GATTC_NOTIFY", bytes(data[-1]))
waiting_events[(event, data[1])] = None
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_ctrl_handle, char_rx_handle, char_tx_handle),) = ble.gatts_register_services(SERVICES)
# Increase the size of the rx buffer and enable append mode.
ble.gatts_set_buffer(char_rx_handle, 100, True)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Wait for the central to signal that it's done with its part of the test.
wait_for_event((_IRQ_GATTS_WRITE, char_ctrl_handle), 2 * TIMEOUT_MS)
# Read all accumulated data from the central.
print("gatts_read:", ble.gatts_read(char_rx_handle))
# Notify the central a few times.
for i in range(4):
time.sleep_ms(300)
ble.gatts_notify(conn_handle, char_tx_handle, "message{}".format(i))
# Notify the central that we are done with our part of the test.
time.sleep_ms(300)
ble.gatts_notify(conn_handle, char_ctrl_handle, "OK")
# Wait for the central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
ctrl_value_handle = waiting_events[(_IRQ_GATTC_CHARACTERISTIC_RESULT, CHAR_CTRL_UUID)]
rx_value_handle = waiting_events[(_IRQ_GATTC_CHARACTERISTIC_RESULT, CHAR_RX_UUID)]
tx_value_handle = waiting_events[(_IRQ_GATTC_CHARACTERISTIC_RESULT, CHAR_TX_UUID)]
# Write to the characteristic a few times, with and without response.
for i in range(4):
print("gattc_write")
ble.gattc_write(conn_handle, rx_value_handle, "central{}".format(i), i & 1)
if i & 1:
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
time.sleep_ms(400)
# Write to say that we are done with our part of the test.
ble.gattc_write(conn_handle, ctrl_value_handle, "OK", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Wait for notification that peripheral is done with its part of the test.
wait_for_event((_IRQ_GATTC_NOTIFY, ctrl_value_handle), TIMEOUT_MS)
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,31 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_GATTS_WRITE
_IRQ_GATTS_WRITE
_IRQ_GATTS_WRITE
_IRQ_GATTS_WRITE
_IRQ_GATTS_WRITE
gatts_read: b'central0central1central2central3'
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000002-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000003-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000004-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_write
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'message0'
_IRQ_GATTC_NOTIFY b'message1'
_IRQ_GATTC_NOTIFY b'message2'
_IRQ_GATTC_NOTIFY b'message3'
_IRQ_GATTC_NOTIFY b'OK'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,101 @@
# Test BLE GAP connect/disconnect
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 5000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
UUID_A = bluetooth.UUID(0x180D)
UUID_B = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
SERVICE_A = (
UUID_A,
(),
)
SERVICE_B = (
UUID_B,
(),
)
SERVICES = (SERVICE_A, SERVICE_B)
waiting_events = {}
num_service_result = 0
def irq(event, data):
global num_service_result
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_SERVICE_RESULT:
if data[3] == UUID_A or data[3] == UUID_B:
print("_IRQ_GATTC_SERVICE_RESULT", data[3])
num_service_result += 1
elif event == _IRQ_GATTC_SERVICE_DONE:
print("_IRQ_GATTC_SERVICE_DONE")
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
ble.gatts_register_services(SERVICES)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover services.
ble.gattc_discover_services(conn_handle)
wait_for_event(_IRQ_GATTC_SERVICE_DONE, TIMEOUT_MS)
print("discovered:", num_service_result)
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,13 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_SERVICE_RESULT UUID(0x180d)
_IRQ_GATTC_SERVICE_RESULT UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
_IRQ_GATTC_SERVICE_DONE
discovered: 2
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,179 @@
# Test L2CAP COC send/recv.
# Sends a sequence of varying-sized payloads from central->peripheral, and
# verifies that the other device sees the same data, then does the same thing
# peripheral->central.
from micropython import const
import time, machine, bluetooth, random
if not hasattr(bluetooth.BLE, "l2cap_connect"):
print("SKIP")
raise SystemExit
TIMEOUT_MS = 1000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
_L2CAP_MTU = const(450)
_L2CAP_PSM = const(22)
_PAYLOAD_LEN = const(_L2CAP_MTU - 50)
_PAYLOAD_LEN_STEP = -23
_NUM_PAYLOADS = const(16)
_RANDOM_SEED = 22
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, addr_type, addr = data
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = conn_handle
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_PERIPHERAL_CONNECT:
conn_handle, addr_type, addr = data
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = conn_handle
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_L2CAP_ACCEPT:
conn_handle, cid, psm, our_mtu, peer_mtu = data
print("_IRQ_L2CAP_ACCEPT", psm, our_mtu, peer_mtu)
waiting_events[event] = (conn_handle, cid, psm)
elif event == _IRQ_L2CAP_CONNECT:
conn_handle, cid, psm, our_mtu, peer_mtu = data
print("_IRQ_L2CAP_CONNECT", psm, our_mtu, peer_mtu)
waiting_events[event] = (conn_handle, cid, psm, our_mtu, peer_mtu)
elif event == _IRQ_L2CAP_DISCONNECT:
conn_handle, cid, psm, status = data
print("_IRQ_L2CAP_DISCONNECT", psm, status)
elif event == _IRQ_L2CAP_RECV:
conn_handle, cid = data
elif event == _IRQ_L2CAP_SEND_READY:
conn_handle, cid, status = data
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
def send_data(ble, conn_handle, cid):
buf = bytearray(_PAYLOAD_LEN)
mv = memoryview(buf)
print("l2cap_send", _NUM_PAYLOADS, _PAYLOAD_LEN)
for i in range(_NUM_PAYLOADS):
n = _PAYLOAD_LEN + i * _PAYLOAD_LEN_STEP
for j in range(n):
buf[j] = random.randint(0, 255)
if not ble.l2cap_send(conn_handle, cid, mv[:n]):
wait_for_event(_IRQ_L2CAP_SEND_READY, TIMEOUT_MS)
def recv_data(ble, conn_handle, cid):
buf = bytearray(_PAYLOAD_LEN)
recv_bytes = 0
recv_correct = 0
expected_bytes = (
_PAYLOAD_LEN * _NUM_PAYLOADS + _PAYLOAD_LEN_STEP * _NUM_PAYLOADS * (_NUM_PAYLOADS - 1) // 2
)
print("l2cap_recvinto", expected_bytes)
while recv_bytes < expected_bytes:
wait_for_event(_IRQ_L2CAP_RECV, TIMEOUT_MS)
while True:
n = ble.l2cap_recvinto(conn_handle, cid, buf)
if n == 0:
break
recv_bytes += n
for i in range(n):
if buf[i] == random.randint(0, 255):
recv_correct += 1
return recv_bytes, recv_correct
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
print("l2cap_listen")
ble.l2cap_listen(_L2CAP_PSM, _L2CAP_MTU)
conn_handle, cid, psm = wait_for_event(_IRQ_L2CAP_ACCEPT, TIMEOUT_MS)
conn_handle, cid, psm, our_mtu, peer_mtu = wait_for_event(_IRQ_L2CAP_CONNECT, TIMEOUT_MS)
random.seed(_RANDOM_SEED)
recv_bytes, recv_correct = recv_data(ble, conn_handle, cid)
send_data(ble, conn_handle, cid)
wait_for_event(_IRQ_L2CAP_DISCONNECT, TIMEOUT_MS)
print("received", recv_bytes, recv_correct)
# Wait for the central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
print("l2cap_connect")
ble.l2cap_connect(conn_handle, _L2CAP_PSM, _L2CAP_MTU)
conn_handle, cid, psm, our_mtu, peer_mtu = wait_for_event(_IRQ_L2CAP_CONNECT, TIMEOUT_MS)
random.seed(_RANDOM_SEED)
send_data(ble, conn_handle, cid)
recv_bytes, recv_correct = recv_data(ble, conn_handle, cid)
# Disconnect channel.
print("l2cap_disconnect")
ble.l2cap_disconnect(conn_handle, cid)
wait_for_event(_IRQ_L2CAP_DISCONNECT, TIMEOUT_MS)
print("received", recv_bytes, recv_correct)
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,23 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
l2cap_listen
_IRQ_L2CAP_ACCEPT 22 450 450
_IRQ_L2CAP_CONNECT 22 450 450
l2cap_recvinto 3640
l2cap_send 16 400
_IRQ_L2CAP_DISCONNECT 22 0
received 3640 3640
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
l2cap_connect
_IRQ_L2CAP_CONNECT 22 450 450
l2cap_send 16 400
l2cap_recvinto 3640
l2cap_disconnect
_IRQ_L2CAP_DISCONNECT 22 0
received 3640 3640
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,194 @@
# Test MTU exchange (initiated by both central and peripheral) and the effect on
# notify and write size.
# Seven connections are made (four central->peripheral, three peripheral->central).
#
# Test | Requested | Preferred | Result | Notes
# 0 | 300 (C) | 256 (P) | 256 |
# 1 | 300 (C) | 200 (P) | 200 |
# 2 | 300 (C) | 400 (P) | 300 |
# 3 | 300 (C) | 50 (P) | 50 | Shorter than 64 so the notification is truncated.
# 4 | 290 (P) | 256 (C) | 256 |
# 5 | 290 (P) | 190 (C) | 190 |
# 6 | 290 (P) | 350 (C) | 290 |
#
# For each connection a notification is sent by the server (peripheral) and a characteristic
# is written by the client (central) to ensure that the expected size is transmitted.
#
# Note: This currently fails on btstack for two reasons:
# - btstack doesn't truncate writes to the MTU (it fails instead)
# - btstack (in central mode) doesn't handle the peripheral initiating the MTU exchange
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 5000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_MTU_EXCHANGED = const(21)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (
CHAR_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY,
)
SERVICE = (
SERVICE_UUID,
(CHAR,),
)
SERVICES = (SERVICE,)
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_WRITE:
print("_IRQ_GATTS_WRITE")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_WRITE_DONE:
print("_IRQ_GATTC_WRITE_DONE")
elif event == _IRQ_GATTC_NOTIFY:
print("_IRQ_GATTC_NOTIFY", len(data[-1]), chr(data[-1][0]))
elif event == _IRQ_MTU_EXCHANGED:
print("_IRQ_MTU_EXCHANGED", data[-1])
waiting_events[event] = data[-1]
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
ble.gatts_set_buffer(char_handle, 500, False)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
for i in range(7):
if i == 1:
ble.config(mtu=200)
elif i == 2:
ble.config(mtu=400)
elif i == 3:
ble.config(mtu=50)
elif i >= 4:
ble.config(mtu=290)
else:
# This is the NimBLE default.
ble.config(mtu=256)
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
if i >= 4:
print("gattc_exchange_mtu")
ble.gattc_exchange_mtu(conn_handle)
mtu = wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS)
print("gatts_notify")
ble.gatts_notify(conn_handle, char_handle, str(i) * 64)
# Extra timeout while client does service discovery.
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS * 2)
print("gatts_read")
data = ble.gatts_read(char_handle)
print("characteristic len:", len(data), chr(data[0]))
# Wait for the central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
for i in range(7):
if i < 4:
ble.config(mtu=300)
elif i == 5:
ble.config(mtu=190)
elif i == 6:
ble.config(mtu=350)
else:
ble.config(mtu=256)
# Connect to peripheral and then disconnect.
# Extra scan timeout allows for the peripheral to receive the previous disconnect
# event and start advertising again.
print("gap_connect")
ble.gap_connect(BDADDR[0], BDADDR[1], 5000)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
if i < 4:
print("gattc_exchange_mtu")
ble.gattc_exchange_mtu(conn_handle)
mtu = wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)
print("gattc_discover_characteristics")
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Write 20 more than the MTU to test truncation.
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, chr(ord("a") + i) * (mtu + 20), 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,143 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_MTU_EXCHANGED 256
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 253 a
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_MTU_EXCHANGED 200
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 197 b
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_MTU_EXCHANGED 300
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 297 c
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_MTU_EXCHANGED 50
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 47 d
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 256
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 253 e
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 190
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 187 f
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 290
gatts_notify
_IRQ_GATTS_WRITE
gatts_read
characteristic len: 287 g
_IRQ_CENTRAL_DISCONNECT
gap_advertise
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 256
_IRQ_GATTC_NOTIFY 64 0
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 200
_IRQ_GATTC_NOTIFY 64 1
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 300
_IRQ_GATTC_NOTIFY 64 2
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_exchange_mtu
_IRQ_MTU_EXCHANGED 50
_IRQ_GATTC_NOTIFY 47 3
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_MTU_EXCHANGED 256
_IRQ_GATTC_NOTIFY 64 4
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_MTU_EXCHANGED 190
_IRQ_GATTC_NOTIFY 64 5
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_MTU_EXCHANGED 290
_IRQ_GATTC_NOTIFY 64 6
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_write
_IRQ_GATTC_WRITE_DONE
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,246 @@
# Test for sending notifications to subscribed clients.
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 5000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_CCCD_UUID = bluetooth.UUID(const(0x2902))
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (
CHAR_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE,
)
SERVICE = (
SERVICE_UUID,
(CHAR,),
)
SERVICES = (SERVICE,)
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_WRITE:
print("_IRQ_GATTS_WRITE", ble.gatts_read(data[-1]))
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
# conn_handle, dsc_handle, uuid = data
if data[-1] == _CCCD_UUID:
print("_IRQ_GATTC_DESCRIPTOR_RESULT", data[-1])
waiting_events[event] = data[1]
else:
return
elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
print("_IRQ_GATTC_DESCRIPTOR_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
elif event == _IRQ_GATTC_READ_DONE:
print("_IRQ_GATTC_READ_DONE", data[-1])
elif event == _IRQ_GATTC_WRITE_DONE:
print("_IRQ_GATTC_WRITE_DONE", data[-1])
elif event == _IRQ_GATTC_NOTIFY:
print("_IRQ_GATTC_NOTIFY", bytes(data[-1]))
elif event == _IRQ_GATTC_INDICATE:
print("_IRQ_GATTC_NOTIFY", bytes(data[-1]))
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY") # \x04\x09MPY
multitest.next()
try:
# Write initial characteristic value (will be read by client).
ble.gatts_write(char_handle, "periph0") ###
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS * 10)
# A
# Wait for a write to the characteristic from the central (to synchronise).
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("sync A")
# This should be local-only.
ble.gatts_write(char_handle, "periph1")
time.sleep_ms(100)
# Update local-only, then force notify.
ble.gatts_write(char_handle, "periph2")
ble.gatts_notify(conn_handle, char_handle) ###
time.sleep_ms(100)
# Update local and notify subscribers. No notification should be sent.
ble.gatts_write(char_handle, "periph3", True)
time.sleep_ms(100)
multitest.broadcast("A")
# B
# Synchronise with the client (which should now be subscribed for notify).
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("sync B")
# This should be local-only (send_update=False).
ble.gatts_write(char_handle, "periph4", False)
time.sleep_ms(100)
# This should notify the subscribed client.
ble.gatts_write(char_handle, "periph5", True) ###
time.sleep_ms(100)
multitest.broadcast("B")
# C
# Synchronise with the client (which should now be subscribed for indicate).
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("sync C")
# This should be local-only (send_update=False).
ble.gatts_write(char_handle, "periph6", False)
time.sleep_ms(100)
# This should indicate the subscribed client.
ble.gatts_write(char_handle, "periph7", True) ###
time.sleep_ms(100)
multitest.broadcast("C")
# D
# Synchronise with the client (which should now be unsubscribed).
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
print("sync D")
# This should be local-only (send_update=False).
ble.gatts_write(char_handle, "periph8", False)
time.sleep_ms(100)
# This should be local-only (no more subscribers).
ble.gatts_write(char_handle, "periph9", True)
time.sleep_ms(100)
# Update local-only, then another force notify.
ble.gatts_write(char_handle, "periph10")
ble.gatts_notify(conn_handle, char_handle) ###
time.sleep_ms(100)
multitest.broadcast("D")
# Wait for the central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Discover CCCD.
ble.gattc_discover_descriptors(conn_handle, value_handle, value_handle + 5)
cccd_handle = wait_for_event(_IRQ_GATTC_DESCRIPTOR_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_DESCRIPTOR_DONE, TIMEOUT_MS)
# Issue read of characteristic, should get initial value.
print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
# While the four states are active, all incoming notifications
# and indications will be printed by the event handler. We
# should only expect to see certain ones.
# Start unsubscribed.
# Write to the characteristic (triggers A).
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central0", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Wait for A to complete.
multitest.wait("A")
# Subscribe for notify.
ble.gattc_write(conn_handle, cccd_handle, b"\x01\x00", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Write to the characteristic (triggers B).
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central1", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Wait for B to complete.
multitest.wait("B")
# Subscribe for indicate.
ble.gattc_write(conn_handle, cccd_handle, b"\x02\x00", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Write to the characteristic (triggers C).
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central2", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Wait for C to complete.
multitest.wait("C")
# Unsubscribe.
ble.gattc_write(conn_handle, cccd_handle, b"\x00\x00", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Write to the characteristic (triggers D).
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "central3", 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Wait for D to complete.
multitest.wait("D")
# Disconnect from peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,39 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_GATTS_WRITE b'central0'
sync A
_IRQ_GATTS_WRITE b'central1'
sync B
_IRQ_GATTS_WRITE b'central2'
sync C
_IRQ_GATTS_WRITE b'central3'
sync D
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
_IRQ_GATTC_DESCRIPTOR_RESULT UUID(0x2902)
_IRQ_GATTC_DESCRIPTOR_DONE
gattc_read
_IRQ_GATTC_READ_RESULT b'periph0'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'periph2'
_IRQ_GATTC_WRITE_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'periph5'
_IRQ_GATTC_WRITE_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'periph7'
_IRQ_GATTC_WRITE_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
_IRQ_GATTC_NOTIFY b'periph10'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT

View File

@@ -0,0 +1,151 @@
# Write characteristic from central to peripheral and time data rate.
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 2000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_MTU_EXCHANGED = const(21)
# How long to run the test for.
_NUM_NOTIFICATIONS = const(40)
_MTU_SIZE = const(131)
_CHAR_SIZE = const(_MTU_SIZE - 3)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE)
SERVICE = (SERVICE_UUID, (CHAR,))
SERVICES = (SERVICE,)
packet_sequence = 0
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_GATTS_WRITE:
global packet_sequence
conn_handle, attr_handle = data
data = ble.gatts_read(attr_handle)
if not (data[0] == packet_sequence and data[-1] == (256 - packet_sequence) & 0xFF):
print("_IRQ_GATTS_WRITE data invalid:", packet_sequence, data)
elif packet_sequence % 10 == 0:
print("_IRQ_GATTS_WRITE", packet_sequence)
packet_sequence += 1
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_MTU_EXCHANGED:
# ATT MTU exchange complete (either initiated by us or the remote device).
conn_handle, mtu = data
print("_IRQ_MTU_EXCHANGED:", mtu)
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
ble.gatts_set_buffer(char_handle, _CHAR_SIZE)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Wait for central to disconnect us.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, 30000)
print("final packet_sequence:", packet_sequence)
finally:
ble.active(0)
# Acting in central role.
def instance1():
global packet_sequence
((char_handle,),) = ble.gatts_register_services(SERVICES)
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.config(mtu=_MTU_SIZE)
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
ble.gattc_exchange_mtu(conn_handle)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Send data!
data = bytearray(ord("A") + (i % 64) for i in range(_CHAR_SIZE))
for mode in (0, 1):
ticks_start = time.ticks_ms()
for i in range(_NUM_NOTIFICATIONS):
data[0] = packet_sequence
data[-1] = 256 - packet_sequence
if packet_sequence % 10 == 0:
print("gattc_write", packet_sequence)
if mode == 0:
while True:
try:
ble.gattc_write(conn_handle, value_handle, data, mode)
break
except OSError:
pass
else:
ble.gattc_write(conn_handle, value_handle, data, mode)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
packet_sequence += 1
ticks_end = time.ticks_ms()
ticks_total = time.ticks_diff(ticks_end, ticks_start)
print(
"Did {} writes in {} ms. {} ms/write, {} bytes/sec".format(
_NUM_NOTIFICATIONS,
ticks_total,
ticks_total / _NUM_NOTIFICATIONS,
_NUM_NOTIFICATIONS * len(data) * 1000 // ticks_total,
)
)
time.sleep_ms(100)
# DIsconnect the peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, 20000)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,24 @@
--- instance0 ---
gap_advertise
_IRQ_MTU_EXCHANGED: 131
_IRQ_GATTS_WRITE 0
_IRQ_GATTS_WRITE 10
_IRQ_GATTS_WRITE 20
_IRQ_GATTS_WRITE 30
_IRQ_GATTS_WRITE 40
_IRQ_GATTS_WRITE 50
_IRQ_GATTS_WRITE 60
_IRQ_GATTS_WRITE 70
final packet_sequence: 80
--- instance1 ---
gap_connect
_IRQ_MTU_EXCHANGED: 131
gattc_write 0
gattc_write 10
gattc_write 20
gattc_write 30
gattc_write 40
gattc_write 50
gattc_write 60
gattc_write 70
gap_disconnect: True

View File

@@ -0,0 +1,133 @@
# Ping-pong GATT notifications between two devices.
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 2000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_NOTIFY = const(18)
# How long to run the test for.
_NUM_NOTIFICATIONS = const(50)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (
CHAR_UUID,
bluetooth.FLAG_NOTIFY,
)
SERVICE = (
SERVICE_UUID,
(CHAR,),
)
SERVICES = (SERVICE,)
is_central = False
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_NOTIFY:
if is_central:
conn_handle, value_handle, notify_data = data
ble.gatts_notify(conn_handle, value_handle, b"central" + notify_data)
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Give the central enough time to discover chars.
time.sleep_ms(500)
ticks_start = time.ticks_ms()
for i in range(_NUM_NOTIFICATIONS):
# Send a notification and wait for a response.
ble.gatts_notify(conn_handle, value_handle, "peripheral" + str(i))
wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)
ticks_end = time.ticks_ms()
ticks_total = time.ticks_diff(ticks_end, ticks_start)
print(
"Acknowledged {} notifications in {} ms. {} ms/notification.".format(
_NUM_NOTIFICATIONS, ticks_total, ticks_total // _NUM_NOTIFICATIONS
)
)
# Disconnect the central.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
global is_central
is_central = True
((char_handle,),) = ble.gatts_register_services(SERVICES)
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# The IRQ handler will respond to each notification.
# Wait for the peripheral to disconnect us.
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, 20000)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,152 @@
# Send L2CAP data as fast as possible and time it.
from micropython import const
import time, machine, bluetooth, random
if not hasattr(bluetooth.BLE, "l2cap_connect"):
print("SKIP")
raise SystemExit
TIMEOUT_MS = 1000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
_L2CAP_PSM = const(22)
_L2CAP_MTU = const(512)
_PAYLOAD_LEN = const(_L2CAP_MTU)
_NUM_PAYLOADS = const(20)
_RANDOM_SEED = 22
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, addr_type, addr = data
waiting_events[event] = conn_handle
elif event == _IRQ_PERIPHERAL_CONNECT:
conn_handle, addr_type, addr = data
waiting_events[event] = conn_handle
elif event == _IRQ_L2CAP_ACCEPT:
conn_handle, cid, psm, our_mtu, peer_mtu = data
waiting_events[event] = (conn_handle, cid, psm)
elif event == _IRQ_L2CAP_CONNECT:
conn_handle, cid, psm, our_mtu, peer_mtu = data
waiting_events[event] = (conn_handle, cid, psm, our_mtu, peer_mtu)
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
def send_data(ble, conn_handle, cid):
buf = bytearray(_PAYLOAD_LEN)
for i in range(_NUM_PAYLOADS):
for j in range(_PAYLOAD_LEN):
buf[j] = random.randint(0, 255)
if not ble.l2cap_send(conn_handle, cid, buf):
wait_for_event(_IRQ_L2CAP_SEND_READY, TIMEOUT_MS)
def recv_data(ble, conn_handle, cid):
buf = bytearray(_PAYLOAD_LEN)
recv_bytes = 0
recv_correct = 0
expected_bytes = _PAYLOAD_LEN * _NUM_PAYLOADS
ticks_first_byte = 0
while recv_bytes < expected_bytes:
wait_for_event(_IRQ_L2CAP_RECV, TIMEOUT_MS)
if not ticks_first_byte:
ticks_first_byte = time.ticks_ms()
while True:
n = ble.l2cap_recvinto(conn_handle, cid, buf)
if n == 0:
break
recv_bytes += n
for i in range(n):
if buf[i] == random.randint(0, 255):
recv_correct += 1
ticks_end = time.ticks_ms()
return recv_bytes, recv_correct, time.ticks_diff(ticks_end, ticks_first_byte)
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
ble.l2cap_listen(_L2CAP_PSM, _L2CAP_MTU)
conn_handle, cid, psm = wait_for_event(_IRQ_L2CAP_ACCEPT, TIMEOUT_MS)
conn_handle, cid, psm, our_mtu, peer_mtu = wait_for_event(_IRQ_L2CAP_CONNECT, TIMEOUT_MS)
random.seed(_RANDOM_SEED)
send_data(ble, conn_handle, cid)
wait_for_event(_IRQ_L2CAP_DISCONNECT, TIMEOUT_MS)
# Wait for the central to disconnect.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
multitest.next()
try:
# Connect to peripheral and then disconnect.
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
ble.l2cap_connect(conn_handle, _L2CAP_PSM, _L2CAP_MTU)
conn_handle, cid, psm, our_mtu, peer_mtu = wait_for_event(_IRQ_L2CAP_CONNECT, TIMEOUT_MS)
random.seed(_RANDOM_SEED)
recv_bytes, recv_correct, total_ticks = recv_data(ble, conn_handle, cid)
# Disconnect channel.
ble.l2cap_disconnect(conn_handle, cid)
wait_for_event(_IRQ_L2CAP_DISCONNECT, TIMEOUT_MS)
print(
"Received {}/{} bytes in {} ms. {} B/s".format(
recv_bytes, recv_correct, total_ticks, recv_bytes * 1000 // total_ticks
)
)
# Disconnect from peripheral.
ble.gap_disconnect(conn_handle)
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@@ -0,0 +1,188 @@
# Test concurrency between filesystem access and BLE host. This is
# particularly relevant on STM32WB where the second core is stalled while
# flash operations are in progress.
from micropython import const
import time, machine, bluetooth, os
TIMEOUT_MS = 10000
LOG_PATH_INSTANCE0 = "stress_log_filesystem_0.log"
LOG_PATH_INSTANCE1 = "stress_log_filesystem_1.log"
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (
CHAR_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE,
)
SERVICE = (
SERVICE_UUID,
(CHAR,),
)
SERVICES = (SERVICE,)
waiting_events = {}
log_file = None
def write_log(*args):
if log_file:
print(*args, file=log_file)
log_file.flush()
last_file_write = 0
def periodic_log_write():
global last_file_write
t = time.ticks_ms()
if time.ticks_diff(t, last_file_write) > 50:
write_log("tick")
last_file_write = t
def irq(event, data):
write_log("event", event)
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_SERVICE_RESULT:
# conn_handle, start_handle, end_handle, uuid = data
if data[-1] == SERVICE_UUID:
print("_IRQ_GATTC_SERVICE_RESULT", data[3])
waiting_events[event] = (data[1], data[2])
else:
return
elif event == _IRQ_GATTC_SERVICE_DONE:
print("_IRQ_GATTC_SERVICE_DONE")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
elif event == _IRQ_GATTC_READ_DONE:
print("_IRQ_GATTC_READ_DONE", data[-1])
elif event == _IRQ_GATTC_WRITE_DONE:
print("_IRQ_GATTC_WRITE_DONE", data[-1])
elif event == _IRQ_GATTS_WRITE:
print("_IRQ_GATTS_WRITE")
elif event == _IRQ_GATTS_READ_REQUEST:
print("_IRQ_GATTS_READ_REQUEST")
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
periodic_log_write()
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
global log_file
log_file = open(LOG_PATH_INSTANCE0, "w")
write_log("start")
ble.active(1)
ble.irq(irq)
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
multitest.next()
try:
for repeat in range(2):
print("gap_advertise")
ble.gap_advertise(50_000, b"\x02\x01\x06\x04\xffMPY")
# Wait for central to connect, do a sequence of read/write, then disconnect.
wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
for op in range(4):
wait_for_event(_IRQ_GATTS_READ_REQUEST, TIMEOUT_MS)
wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
wait_for_event(_IRQ_CENTRAL_DISCONNECT, 2 * TIMEOUT_MS)
finally:
ble.active(0)
log_file.close()
os.unlink(LOG_PATH_INSTANCE0)
# Acting in central role.
def instance1():
global log_file
log_file = open(LOG_PATH_INSTANCE1, "w")
write_log("start")
ble.active(1)
ble.irq(irq)
multitest.next()
try:
for repeat in range(2):
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.gap_connect(BDADDR[0], BDADDR[1], 5000)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover services.
print("gattc_discover_services")
ble.gattc_discover_services(conn_handle)
start_handle, end_handle = wait_for_event(_IRQ_GATTC_SERVICE_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_SERVICE_DONE, TIMEOUT_MS)
# Discover characteristics.
print("gattc_discover_characteristics")
ble.gattc_discover_characteristics(conn_handle, start_handle, end_handle)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
for op in range(4):
print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_READ_DONE, TIMEOUT_MS)
print("gattc_write")
ble.gattc_write(conn_handle, value_handle, "{}".format(op), 1)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
# Disconnect.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, 2 * TIMEOUT_MS)
finally:
ble.active(0)
log_file.close()
os.unlink(LOG_PATH_INSTANCE1)
ble = bluetooth.BLE()

View File

@@ -0,0 +1,84 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_CENTRAL_DISCONNECT
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_GATTS_READ_REQUEST
_IRQ_GATTS_WRITE
_IRQ_CENTRAL_DISCONNECT
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_discover_services
_IRQ_GATTC_SERVICE_RESULT UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
_IRQ_GATTC_SERVICE_DONE
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_read
_IRQ_GATTC_READ_RESULT b''
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_read
_IRQ_GATTC_READ_RESULT b'0'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_read
_IRQ_GATTC_READ_RESULT b'1'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_read
_IRQ_GATTC_READ_RESULT b'2'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_connect
_IRQ_PERIPHERAL_CONNECT
gattc_discover_services
_IRQ_GATTC_SERVICE_RESULT UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
_IRQ_GATTC_SERVICE_DONE
gattc_discover_characteristics
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gattc_read
_IRQ_GATTC_READ_RESULT b'3'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_read
_IRQ_GATTC_READ_RESULT b'0'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_read
_IRQ_GATTC_READ_RESULT b'1'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gattc_read
_IRQ_GATTC_READ_RESULT b'2'
_IRQ_GATTC_READ_DONE 0
gattc_write
_IRQ_GATTC_WRITE_DONE 0
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT