golang

Как я реверс-инженерил китайский блютуз контроллер

  • суббота, 18 ноября 2023 г. в 00:00:17
https://habr.com/ru/articles/774588/

С чего все началось

Мы с друзьями купили лед ленту с управлением по блютузу и повесили её в зале. В приложении была прикольная функция цветомузыки, но она была недостаточно хороша. Так что я решил реализовать её самому.

Шаги к реализации

  • Научиться отправлять свои данные на блютуз контроллер

  • Анализировать аудиопоток, находит резкие изменения в бите.

  • Скрестить два скрипта

  • Написать обертку для удобной работы

Как понять что отправлять на блютуз контроллер?

У нас есть рабочее приложение для контроля лед ленты через блютуз. На каждую кнопку в интерфейсе отправляется некий запрос. Нам надо "перехватить" его и расшифровать.

Скриншот из MohuanLED
Скриншот из MohuanLED

Получаем информацию о устройстве

Чтобы разобраться в логах - нам надо вытащить мак адрес устройства.
Идем в настройки блютуза, ищем лед ленту и копируем мак адрес:

Пора доставать логи!
В инструментах разработчика находим пункт "Enable Bluetooth HCI snoop log"

Перезагружаем блютуз, теперь все блютуз запросы будут логироваться.

Получение логов

К сожалению вы не можете просто достать напрямую файл с логами, это придётся делать окольными путями.

Подключите телефон к компьютеру, поставьте adb на пк и включите отладку по юсб в настройках подключения.

Теперь надо сгенерировать «багрепорт» — это зип архив с логами системы. Команда для генерации: adb bugreport btsnoop_hci.log.

Теперь на устройстве хранится зип архив с данными о логах, выкачиваем его по имени которое появилось в выводе:

adb pull /data/user_de/0/com.android.shell/files/bugreports/bugreport-flame-TP1A.221005.002.B2-2023-11-02-19-00-13.zip

В домашней папке появился зип архив. По пути /FS/data/misc/bluetooth/logs/ внутри архива будут логи. Открываем их через WireShark(https://www.wireshark.org/) и начинаем анализ.

Что и как анализировать?

Для начала надо отсеять ненужные логи, вводим
bluetooth.addr==<MAC адресс вашего устройства>

Теперь у нас есть все данные которые мы отправляли на контроллер.

В моем случае все было достаточно просто. Основание запроса было всегда одинаковым - 6996060101, а после - шел некоторый код, очень сильно напоминавший 16-ричное представление цвета. То есть 69960501ffffffff - включит белый цвет, а 69960501fff000 - желтый.

Питон скрипт

Попробуем подключиться к контроллеру через питон.

Я написал простенький скрипт:

import sounddevice as sd
import numpy as np
from bluepy import btle
import sys

class LEDController(btle.Peripheral):
    def __init__(self, device_addr):
        print("Connecting to:", device_addr)
        super().__init__(device_addr)
        self.characteristic = self.getCharacteristics(uuid='0000ee01-0000-1000-8000-00805f9b34fb')[0]
    
    def send_command(self, command):
        self.characteristic.write(bytes.fromhex(command), withResponse=True)

def list_devices():
    scanner = btle.Scanner()
    devices = scanner.scan(5.0)
    device_dict = {}
    for i, device in enumerate(devices):
        print(f"{i}. {device.addr} ({device.getValueText(9)})")
        device_dict[i] = device
    return device_dict


def main():
    devices = list_devices()
    device_list = list(devices.values())  # Convert values to a list
    device_choice = int(input("Choose a device (number): "))
    device_addr = device_list[device_choice].addr
    print(f"Connecting to {device_addr}...")
    device = LEDController(device_addr)
    
    color_code = input("Enter the 16-hex digit color code: ")

    try:
        device.send_command(f'6996060101{color_code}')
    finally:
        device.disconnect()  # Отсоединение устройства при завершении программы


if __name__ == "__main__":
    main()

Он выводит список блютуз устройств и пытается подключиться к выбранному и , а затем - пробует отправить запрос со сменой цвета на выбранный пользователем.

Анализ аудиопотока

Для анализа аудио можно использовать библиотеку sounddevice для питона.

Напишем простой скрипт, который будет читать аудиопоток со стандартного вывода и выводить ON, в пиковых моментах, а OFF в моментах со средней амплитудой звука:

import sounddevice as sd
import numpy as np
import json
previous_volume = None
minVol = 0
maxVol = 0
count_of_zeros = 0
def callback(indata, frames, time, status):
    global minVol
    global maxVol
    global previous_volume
    global count_of_zeros
    if status:
        print(status, file=sys.stderr)
    volume_norm = np.linalg.norm(indata) * 10
    
    if volume_norm == 0:
        return
    if previous_volume is not None and volume_norm < previous_volume / 7:
        count_of_zeros += 1
        if(count_of_zeros > 20):
            minVol = 0
            maxVol = 0
            count_of_zeros = 0
            previous_volume = None
        return
    if maxVol < volume_norm:
        maxVol = volume_norm
    minVol = maxVol - 45
    if minVol < 0:
        minVol = 0
    if volume_norm*10 < minVol:
        minVol = volume_norm*10
        maxVol = minVol + 45
    if volume_norm < minVol:
        print("OFF", volume_norm)
        return
    string_to_print = "ON " * 20
    if count_of_zeros > 0:
        count_of_zeros -= 1  
    
    previous_volume = volume_norm

stream = sd.InputStream(callback=callback)
with stream:
    input("Press Enter to stop recording")

Следующим шагом было совместить два скрипта в один, но это привело к высокой задержке(около 5 секунд), стало понятно, что питон - не самый оптимальный вариант для риалтайм анализа.

Я решил переписать код на го, вот что получилось:

package main

import (
	"fmt"
	"log"
	"math"
	"strings"
	"time"

	"github.com/gordonklaus/portaudio"
	"encoding/hex"
	"flag"
	"tinygo.org/x/bluetooth"
)

// Переменные для настройки параметров аудио и устройства Bluetooth
var (
	SampleRate      float64 = 44100
	Threshold       float64 = 800
	BufferSize      int     = 512
	CutoffFrequency float64 = 250
	delay           int     = 3
	color           string  = "ff0000"
)

// Константы для удобства работы с аудио и состоянием
const (
	NumInputChannels = 1
	StateOff         = "off"
	StateOn          = "on"
)

// Инициализация флагов для настройки параметров программы
func init() {
	flag.Float64Var(&SampleRate, "sampleRate", 44100, "Sample rate of the audio")
	flag.Float64Var(&Threshold, "threshold", 800, "Volume threshold for triggering an action")
	flag.IntVar(&BufferSize, "bufferSize", 256, "Size of the audio buffer")
	flag.Float64Var(&CutoffFrequency, "cutoffFrequency", 250, "Cutoff frequency for the low-pass filter")
	flag.IntVar(&delay, "delay", 3, "Delay between audio buffer reads")
	flag.StringVar(&color, "color", "ff0000", "Color for the LED")
}

// Структура для отслеживания пиков звука
type PeakTracker struct {
	state          string
	isRising       bool
	peakStartLevel float64
	peakEndLevel   float64
	lastState      string
}

// Структура для хранения данных для Bluetooth
type data struct {
	off   []byte
	on    []byte
	color []byte
}

// Заранее определенные данные для Bluetooth
var sampleData = data{
	off: []byte{0x69, 0x96, 0x02, 0x01, 0x00},
	on:  []byte{0x69, 0x96, 0x02, 0x01, 0x01},
	//6996060101ffff to color red
	color: []byte{0x69, 0x96, 0x06, 0x01, 0x01, 0xff, 0xff},
}

// Функция для создания нового объекта PeakTracker
func NewPeakTracker() *PeakTracker {
	return &PeakTracker{
		state:    StateOff,
		isRising: false,
	}
}

// Функция для проверки изменения состояния и выполнения действий при изменении
func (pt *PeakTracker) CheckStateChange() {
	if pt.state != pt.lastState {
		pt.lastState = pt.state // Обновляем lastState после печати
		switch pt.state {
		case StateOff:
			fmt.Println("off")
			_, err := bluetoothChar.WriteWithoutResponse(sampleData.off)
			if err != nil {
				log.Fatalf("can't write characteristic: %s", err)
			}
		case StateOn:
			fmt.Println("on")
			_, err := bluetoothChar.WriteWithoutResponse(getColor(color))
			if err != nil {
				log.Fatalf("can't write characteristic: %s", err)
			}
		}
	}
}

// Структура для отслеживания порога амплитуды звука
type ThresholdTracker struct {
	maxMovingAverage *MovingAverage
	minMovingAverage *MovingAverage
	threshold        float64
}

// Функция для создания нового объекта ThresholdTracker
func NewThresholdTracker(size int) *ThresholdTracker {
	return &ThresholdTracker{
		maxMovingAverage: NewMovingAverage(size),
		minMovingAverage: NewMovingAverage(size),
	}
}

// Функция для обновления порога на основе движущегося среднего максимума и минимума
func (tt *ThresholdTracker) Update(value float64) {
	// Обновляем maxMovingAverage, если текущее значение выше текущего среднего максимума или окно еще не полное.
	if value > tt.maxMovingAverage.Average() || !tt.maxMovingAverage.full {
		tt.maxMovingAverage.Add(value)
	}

	// Обновляем minMovingAverage, если текущее значение ниже текущего среднего минимума или окно еще не полное.
	if value < tt.minMovingAverage.Average() || !tt.minMovingAverage.full {
		tt.minMovingAverage.Add(value)
	}

	// Обновляем порог на основе новых средних максимума и минимума.
	tt.threshold = (tt.maxMovingAverage.Average() + tt.minMovingAverage.Average()) / 2
}

// Функция для вычисления среднего значения в окне
func (ma *MovingAverage) Average() float64 {
	if ma.full {
		return ma.sum / float64(ma.size)
	}
	// Если окно еще не полное, делим на фактическое количество элементов.
	return ma.sum / float64(ma.index)
}

// Функция для обновления состояния PeakTracker с использованием порога
func (pt *PeakTracker) UpdateWithThreshold(amplitude, threshold float64) {
	switch pt.state {
	case StateOff:
		if amplitude > threshold {
			pt.isRising = true
			pt.peakStartLevel = amplitude
		}
		if pt.isRising && amplitude < pt.peakStartLevel-threshold {
			pt.state = StateOn
			pt.peakEndLevel = amplitude
			pt.isRising = false
		}
	case StateOn:
		if amplitude < threshold {
			pt.state = StateOff
			pt.isRising = false
		}
	}
	pt.CheckStateChange()
}

// Функция для обновления состояния PeakTracker с использованием динамического порога
func (pt *PeakTracker) UpdateWithDynamicThreshold(amplitude, threshold float64) {
	switch pt.state {
	case StateOff:
		if amplitude > threshold {
			pt.state = StateOn
		}
	case StateOn:
		if amplitude < threshold {
			pt.state = StateOff
		}
	}
	pt.CheckStateChange()
}

// Структура для применения низкочастотного фильтра
type LowPassFilter struct {
	a        float64
	y, yPrev float64
}

// Создание нового объекта LowPassFilter
func NewLowPassFilter(sampleRate, cutoffFrequency float64) *LowPassFilter {
	dt := 1.0 /