Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,27 @@ __NOTE:__ On macOS only Data Format 5 works as macOS doesn't advertise MAC-addre

__NOTE:__ On Windows Bleson requires _Python 3.6_. Unfortunately on Windows, Bleson doesn't send any payload for advertised package, so it is still unusable.

## Bluegiga

Use Bluegiga's BGAPI, which is compatible with USB adapters like the BLED112. Bluegiga should work with Linux, macOS and Windows.

Requires pygatt and pexpect, which are not installed automatically with `ruuvitag_sensor` package. You can install those manually e.g. via pip.

```sh
$ pip install pygatt pexpect
```

Add environment variable `RUUVI_BLE_ADAPTER` with value `Bluegiga`. E.g.

```sh
$ export RUUVI_BLE_ADAPTER="Bluegiga"
```
By default, Pygatt will automatically detect USB adapters serial port, but if you have multiple Bluegiga adapters installed or pygatt can not find correct serial port automatically, serial port can be passed with `bt_device` parameter.

```sh
bt_device='/dev/ttyACM0'
```

## Examples

Examples are in [examples](https://github.com/ttu/ruuvitag-sensor/tree/master/examples) directory, e.g.
Expand Down
122 changes: 122 additions & 0 deletions ruuvitag_sensor/adapters/bluegiga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import logging
import os
import sys
import time
import pygatt
import binascii
import threading
import os

from multiprocessing import Manager
from queue import Queue

from ruuvitag_sensor.adapters import BleCommunication

log = logging.getLogger(__name__)

class BleCommunicationBluegiga(BleCommunication):
"""Bluetooth LE communication for Bluegiga"""

@staticmethod
def get_data(mac, bt_device=''):
if not bt_device:
adapter = pygatt.BGAPIBackend()
else:
adapter = pygatt.BGAPIBackend(bt_device)

def scan_received(devices, addr, packet_type):
if mac and mac == addr:
log.debug('Received data from device: %s %s', addr, packet_type)
return True # stop scan

reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True
adapter.start(reset=reset)
log.debug('Start receiving broadcasts (device %s)', bt_device)
try:
devices = adapter.scan(timeout=60, scan_interval=1, scan_window=100, active=False, scan_cb=scan_received)
for dev in devices:
if mac and mac == dev['address']:
log.debug('Result found for device %s', mac )
rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data']
hexa = binascii.hexlify(rawdata).decode("ascii").upper()
log.debug('Data found: %s', hexa)
return hexa
finally:
adapter.stop()

@staticmethod
def get_datas(blacklist=[], bt_device=''):
m = Manager()
q = m.Queue()

# Use Manager dict to share data between processes
shared_data = m.dict()
shared_data['blacklist'] = blacklist
shared_data['stop'] = False

# Start background process
scanner = threading.Thread(
target=BleCommunicationBluegiga._run_get_data_background,
args=[q, shared_data, bt_device])
scanner.start()

try:
while True:
while not q.empty():
data = q.get()
log.debug('Found data: %s', data)
yield data
time.sleep(0.1)
except GeneratorExit:
pass
except KeyboardInterrupt as ex:
pass
except Exception as ex:
log.info(ex)

log.debug('Stop')
shared_data['stop'] = True
scanner.join()
log.debug('Exit')
return

@staticmethod
def _run_get_data_background(queue, shared_data, bt_device):
"""
Attributes:
device (string): BLE device (default auto)
"""

if bt_device:
adapter = pygatt.BGAPIBackend(bt_device)
else:
adapter = pygatt.BGAPIBackend()

reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True
adapter.start(reset=reset)
try:
while True:
try:
if shared_data['stop']:
break
devices = adapter.scan(timeout=0.5, scan_interval=1, scan_window=100, active=False, )
for dev in devices:
log.debug('received: %s', dev)
mac = str(dev['address'])
if mac and mac in shared_data['blacklist']:
log.debug('MAC blacklisted: %s', mac)
continue
try:
rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data']
log.debug('Received manufacturer data from %s: %s', mac, rawdata)
hexa = binascii.hexlify(rawdata).decode("ascii").upper()
queue.put((mac, hexa))
except KeyError:
pass
except GeneratorExit:
return
except KeyboardInterrupt as ex:
return
finally:
log.debug('Stop scan')
adapter.stop()
11 changes: 11 additions & 0 deletions ruuvitag_sensor/ruuvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ def get_raw_bleson(raw, data_format):
DataFormats._parse_raw = get_raw_bleson # pylint: disable=protected-access
from ruuvitag_sensor.adapters.bleson import BleCommunicationBleson
ble = BleCommunicationBleson()
elif os.environ.get('RUUVI_BLE_ADAPTER') == 'Bluegiga':
def get_raw_bluegiga(raw, data_format):
# Bluegiga drops FF from the raw data and it is required for DF 3 and 5
# TODO: Move convert_data to adaptor specific code
if data_format in (2, 4):
return raw
return 'FF' + raw

DataFormats._parse_raw = get_raw_bluegiga # pylint: disable=protected-access
from ruuvitag_sensor.adapters.bluegiga import BleCommunicationBluegiga
ble = BleCommunicationBluegiga()
elif not sys.platform.startswith('linux') or os.environ.get('RUUVI_ENV') == 'CI':
# Use BleCommunicationDummy also for CI as it can't use bluez
from ruuvitag_sensor.adapters.dummy import BleCommunicationDummy
Expand Down