Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from kivy.clock import Clock
from kivy.config import Config
from kivy.properties import BooleanProperty, StringProperty
from kivy.uix.boxlayout import BoxLayout
from kivy.storage.jsonstore import JsonStore
Config.set('kivy', 'log_level', 'debug')
Config.set('kivy', 'log_enable', '1')
class MainLayout(BoxLayout):
pass
class BLETestApp(App):
ble = BluetoothDispatcher()
state = StringProperty('')
test_string = StringProperty('')
rssi = StringProperty('')
notification_value = StringProperty('')
counter_value = StringProperty('')
increment_count_value = StringProperty('')
incremental_interval = StringProperty('100')
counter_max = StringProperty('128')
counter_value = StringProperty('')
counter_state = StringProperty('')
counter_total_time = StringProperty('')
queue_timeout_enabled = BooleanProperty(True)
queue_timeout = StringProperty('1000')
device_name = StringProperty('KivyBLETest')
device_address = StringProperty('')
def update_string_value(self, characteristic, status):
result = 'ERROR'
if status == GATT_SUCCESS:
value = characteristic.getStringValue(0)
if value == 'test':
result = 'OK'
self.test_string = result
def setaddr(self, a):
global address
address = a
def discover(self):
self.start_scan()
self.state = 'scan'
def on_device(self, device, rssi, advertisement):
if self.state != 'scan':
return
Logger.debug("on_device event {}".format(list(advertisement)))
scoot_found = False
name = ''
for ad in advertisement:
if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data:
if ad.data.startswith(self.identity):
scoot_found = True
else:
break
elif ad.ad_type == Advertisement.ad_types.complete_local_name:
name = str(ad.data)
if scoot_found:
self.state = 'found'
self.ble_device = device
Logger.debug("Ninebot detected: {}".format(name))
self.stop_scan()
def write(self, barray):
self.write_characteristic(self.receive_characteristic, barray) #writes packets to receive characteristic
def on_characteristic_changed(self, transmit_characteristic):
Logger.debug("on_device event {}".format(list(advertisement)))
self.addr = device.getAddress()
if self.addr and address.startswith(self.addr):
print(self.addr)
self.ble_device = device
self.scoot_found = True
self.stop_scan()
else:
for ad in advertisement:
print(ad)
if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data:
if ad.data.startswith(self.identity):
scoot_found = True
else:
break
elif ad.ad_type == Advertisement.ad_types.complete_local_name:
name = str(ad.data)
if scoot_found:
self.state = 'found'
print(self.state)
self.ble_device = device
Logger.debug("Scooter detected: {}".format(name))
self.stop_scan()
def on_device(self, device, rssi, advertisement):
global scoot_found
if self.state != 'scan':
return
Logger.debug("on_device event {}".format(list(advertisement)))
self.addr = device.getAddress()
if self.addr and address.startswith(self.addr):
print(self.addr)
self.ble_device = device
self.scoot_found = True
self.stop_scan()
else:
for ad in advertisement:
print(ad)
if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data:
if ad.data.startswith(self.identity):
scoot_found = True
else:
break
elif ad.ad_type == Advertisement.ad_types.complete_local_name:
name = str(ad.data)
if scoot_found:
self.state = 'found'
print(self.state)
self.ble_device = device
Logger.debug("Scooter detected: {}".format(name))
self.stop_scan()
self.start_scan()
self.state = 'scan'
def on_device(self, device, rssi, advertisement):
if self.state != 'scan':
return
Logger.debug("on_device event {}".format(list(advertisement)))
scoot_found = False
name = ''
for ad in advertisement:
if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data:
if ad.data.startswith(self.identity):
scoot_found = True
else:
break
elif ad.ad_type == Advertisement.ad_types.complete_local_name:
name = str(ad.data)
if scoot_found:
self.state = 'found'
self.ble_device = device
Logger.debug("Ninebot detected: {}".format(name))
self.stop_scan()
def write(self, barray):
self.write_characteristic(self.receive_characteristic, barray) #writes packets to receive characteristic
def on_characteristic_changed(self, transmit_characteristic):
txchar = self.read_characteristic(self.transmit_characteristic) #reads packets from transmit characteristic
self.rx_fifo.write(txchar) #into fifo
_write_chunk_size = 20
class Fifo():
def __init__(self):
self.q = queue.Queue()
def write(self, data): # put bytes
for b in data:
self.q.put(b)
def read(self, size=1, timeout=None): # but read string
res = ''
for i in range(size):
res += chr(self.q.get(True, timeout))
return res
class ScootBT(BluetoothDispatcher):
def __init__(self):
super(ScootBT, self).__init__()
self.rx_fifo = Fifo()
self.ble_device = None
self.state = StringProperty()
self.dump = True
self.tx_characteristic = None
self.rx_characteristic = None
self.timeout = SCAN_TIMEOUT
self.set_queue_timeout(self.timeout)
def __enter__(self):
return self
class Fifo():
def __init__(self):
self.q = Queue.Queue()
def write(self, data): # put bytes
for b in data:
self.q.put(b)
def read(self, size=1, timeout=None): # but read string
res = ''
for i in xrange(size):
res += chr(self.q.get(True, timeout))
return res
class NineBLE(BluetoothDispatcher):
device = client_characteristic = receive_characteristic = transmit_characteristic = None
address = ''
def setaddr(self, a):
global address
address = a
def discover(self):
self.start_scan()
self.state = 'scan'
def on_device(self, device, rssi, advertisement):
if self.state != 'scan':
return
Logger.debug("on_device event {}".format(list(advertisement)))
scoot_found = False
"""Turn the alert on Mi Band device
"""
from kivy.app import App
from kivy.uix.button import Button
from able import BluetoothDispatcher, GATT_SUCCESS
from error_message import install_exception_handler
class BLE(BluetoothDispatcher):
device = alert_characteristic = None
def start_alert(self, *args, **kwargs):
if self.alert_characteristic: # alert service is already discovered
self.alert(self.alert_characteristic)
elif self.device: # device is already founded during the scan
self.connect_gatt(self.device) # reconnect
else:
self.stop_scan() # stop previous scan
self.start_scan() # start a scan for devices
def on_device(self, device, rssi, advertisement):
# some device is found during the scan
name = device.getName()
if name and name.startswith('MI'): # is a Mi Band device
self.device = device
def on_services(self, status, services):
services_dict = Services()
if status == GATT_SUCCESS:
for service in services.toArray():
service_uuid = service.getUuid().toString()
Logger.debug("Service discovered: {}".format(service_uuid))
services_dict[service_uuid] = {}
for c in service.getCharacteristics().toArray():
characteristic_uuid = c.getUuid().toString()
Logger.debug("Characteristic discovered: {}".format(
characteristic_uuid))
services_dict[service_uuid][characteristic_uuid] = c
self.dispatcher.dispatch('on_services', status, services_dict)