Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

My problem is: how can I add threading to my program, which communicates over BLE from a Raspberry Pi 3?

My program works, but the response is too slow. Please help with this. Thanks.

BMS_reading:

import gatt
import sys
import time
import threading

class AnyDevice(gatt.Device):

    def write(self, characteristic):
        # Clear the receive buffer, then send the request bytes to the BMS.
        self.response = bytearray()
        self.bms_write_characteristic.write_value(bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))

    def services_resolved(self):
        super().services_resolved() 
        device_information_service = next(
            s for s in self.services
            if s.uuid == '0000ff00-0000-1000-8000-00805f9b34fb')

        self.bms_read_characteristic = next(
            c for c in device_information_service.characteristics
            if c.uuid == '0000ff01-0000-1000-8000-00805f9b34fb')
            
        self.bms_write_characteristic = next(
            c for c in device_information_service.characteristics
            if c.uuid == '0000ff02-0000-1000-8000-00805f9b34fb')
            
        self.bms_read_characteristic.enable_notifications()
        self.write(self.bms_read_characteristic)

    def characteristic_value_updated(self, characteristic, value):
        # Replies arrive in chunks; collect them until the end byte 0x77 (b'w').
        self.value = value
        self.response += self.value
        if self.response.endswith(b'w'):
            self.response = self.response[4:]  # drop the first 4 bytes
            self.SoC = int.from_bytes(self.response[19:20], byteorder='big')
            self.manager.stop()

 
# Reading loop (I want to add threading here and read the "SoC" value)
while True:
    address="A4:C1:38:A0:59:EB"
    manager = gatt.DeviceManager(adapter_name='hci0')
    device = AnyDevice(mac_address=address, manager=manager)
    device.connect()
    manager.run()
    print("Capacity is: "+str(device.SoC)+"%")

Terminal output:
<<< Capacity is: 76%
# long delay, which I don't want
<<< Capacity is: 76%

I don't know how to do it. When I put the whole while loop in a thread, the communication does not have time to react and prints wrong numbers or errors.

Please help.

--------------------EDITED--PROGRAM--FOR--NOTIFICATION------UPDATE----------

import gatt
import json
import sys
#from gi.repository import GLib

manager = gatt.DeviceManager(adapter_name='hci0')
class AnyDevice(gatt.Device):
    def connect_succeeded(self):
        super().connect_succeeded()
        print("[%s] Connected" % (self.mac_address))

    def connect_failed(self, error):
        super().connect_failed(error)
        print("[%s] Connection failed: %s" % (self.mac_address, str(error)))

    def disconnect_succeeded(self):
        super().disconnect_succeeded()
        print("[%s] Disconnected" % (self.mac_address))
        self.manager.stop()

    def services_resolved(self):
        super().services_resolved()

        device_information_service = next(
            s for s in self.services
            if s.uuid == '0000ff00-0000-1000-8000-00805f9b34fb')

        self.bms_read_characteristic = next(
            c for c in device_information_service.characteristics
            if c.uuid == '0000ff01-0000-1000-8000-00805f9b34fb')

        self.bms_write_characteristic = next(
            c for c in device_information_service.characteristics
            if c.uuid == '0000ff02-0000-1000-8000-00805f9b34fb')

        print("BMS found")
        self.bms_read_characteristic.enable_notifications()
        

    def characteristic_enable_notifications_succeeded(self, characteristic):
        super().characteristic_enable_notifications_succeeded(characteristic)
        print("BMS request generic data")
        self.response=bytearray()
        self.bms_write_characteristic.write_value(bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))

    def characteristic_enable_notifications_failed(self, characteristic, error):
        super().characteristic_enable_notifications_failed(characteristic, error)
        print("BMS notification failed:",error)

    def characteristic_value_updated(self, characteristic, value):
        self.response+=value
        if (self.response.endswith(b'w')):
            self.response=self.response[4:]
            temperature= (int.from_bytes(self.response[23+1*2:1*2+25],'big')-2731)/10
            print("Temperature is: "+str(temperature) + " C")
         

    def characteristic_write_value_failed(self, characteristic, error):
        print("BMS write failed:",error)

device = AnyDevice(mac_address="A4:C1:38:A0:59:EB", manager=manager)
device.connect()
manager.run()

Terminal print, even if the value changes and the manager is running:

>>>BMS found
>>>BMS request generic data
>>>Temperature is: 19 C
# the program gets stuck here, even though the value is changing

Thank you. I edited the program to use notifications and, as you can see, the device supports them.

But I have a problem: even when the values (temperatures) change and the manager is running in manager.run(), the terminal shows only one value and nothing more, even if I heat the device. When I restart the program, the value changes, but again only one value is printed. Is my code written correctly?

Thank you so much for your time.

question from:https://stackoverflow.com/questions/65649203/threading-bluetooth-communication-raspberry-pi-python-3


1 Answer

My assumption is that you are using the gatt-python library.

The line manager.run() is starting the event loop so you do not need to have a while loop in your code.

If the temperature characteristic supports notifications, then turning them on would be the most efficient way of reading the values when they change.
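
As a rough sketch of handling those notifications, assuming (as the question's code suggests) that one reply may arrive split across several callbacks and ends with the byte 0x77: the helper below collects chunks and clears its buffer after each complete frame, so a later reply starts clean. FrameAssembler, parse_soc and parse_temperature_c are hypothetical names, and the byte offsets are copied from the question rather than checked against the BMS protocol.

```python
END_BYTE = 0x77   # b'w', the terminator the question's code checks for
SKIP_BYTES = 4    # the question drops 4 leading bytes before indexing


class FrameAssembler:
    """Collects notification chunks until a frame ends with END_BYTE."""

    def __init__(self):
        self._buf = bytearray()

    def feed(self, chunk):
        """Add one chunk; return the payload once the frame is complete, else None."""
        self._buf += chunk
        if not self._buf or self._buf[-1] != END_BYTE:
            return None
        payload = bytes(self._buf[SKIP_BYTES:])
        self._buf = bytearray()  # reset so the next reply is parsed on its own
        return payload


def parse_soc(payload):
    # Offset 19 is taken from the question (self.response[19:20]).
    return payload[19]


def parse_temperature_c(payload):
    # Offsets 25..26 and the 2731 offset are taken from the question's formula.
    raw = int.from_bytes(payload[25:27], 'big')
    return (raw - 2731) / 10
```

Inside characteristic_value_updated you would call feed(value) and parse only when it returns a payload. Like the question's endswith(b'w') check, this can end a frame early if a chunk happens to end on a data byte equal to 0x77.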

If the device does not have notifications, then creating a timed event to read the temperature at the frequency you require would be recommended. The documentation for timeout_add_seconds isn't always the easiest to understand, but the import is:

from gi.repository import GLib

Then just before you run the event loop call:

GLib.timeout_add_seconds(2, my_callback_to_read_temperature)
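
A minimal sketch of such a callback, assuming the BMS only sends a reply after it receives the request bytes from the question, so the request has to be sent again on every tick. make_poll_callback and run_with_polling are hypothetical helpers, and device is expected to be an instance of the AnyDevice class from the question.

```python
# Request bytes copied from the question's write_value() call.
REQUEST_BASIC_INFO = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])


def make_poll_callback(device):
    """Build a callback that re-sends the request each time the timer fires."""

    def poll():
        characteristic = getattr(device, "bms_write_characteristic", None)
        if characteristic is not None:  # skip ticks until services are resolved
            device.response = bytearray()  # start a fresh receive buffer
            characteristic.write_value(REQUEST_BASIC_INFO)
        return True  # returning True keeps the GLib timer running

    return poll


def run_with_polling(manager, device, interval_s=2):
    """Connect, schedule the periodic request, then block in the event loop."""
    # Imported here so make_poll_callback can be used without PyGObject installed.
    from gi.repository import GLib

    device.connect()
    GLib.timeout_add_seconds(interval_s, make_poll_callback(device))
    manager.run()
```

With this in place, the last two lines of the edited program would become run_with_polling(manager, device), and the replies still arrive through characteristic_value_updated.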

I expect gi.repository to be installed on the RPi already but if you need the instructions for installing, then they are at: https://pygobject.readthedocs.io/en/latest/getting_started.html#ubuntu-getting-started

