Skip to content

Interactive Meshtastic Chat Client which chats on the Primary Channel over BLE.

License

Notifications You must be signed in to change notification settings

mytechnotalent/bimcc

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

image

FREE Reverse Engineering Self-Study Course HERE



BLE Interactive Meshtastic Chat Client

Interactive Meshtastic Chat Client which chats on the Primary Channel over BLE.



STEP 1: python3 -m venv venv

STEP 2: source venv/bin/activate

STEP 3: pip install -r requirements.txt

STEP 4: ./bimcc.py "<BLE_DEVICE_ADDRESS>"

SOURCE

#!/usr/bin/env python3

"""
BLE Interactive Meshtastic Chat Client 0.1.0

Usage:
    python bimcc.py "<BLE_DEVICE_ADDRESS>"

This script sends and receives text messages over the BLE interface.
It uses PyPubSub to subscribe to text messages.
"""

import sys
import time
import asyncio
from pubsub import pub
from meshtastic.ble_interface import BLEInterface, BLEClient


def custom_find_device(address):
    """
    Look up a BLE device by address among the results of a BLE scan.

    The function asks `BLEInterface.scan()` for the currently visible
    devices, prints every one of them for debugging, and then returns the
    first device whose address equals the requested address when both are
    compared in lowercase.

    Parameters:
        address (str): The BLE address to look for. It is lowercased before
                       comparison, as is the address of every scanned device.

    Side Effects:
        - Prints a "Discovered Devices:" header followed by one line per
          scanned device showing its address and name.

    Raises:
        Exception: If none of the scanned devices has an address matching
                   (case-insensitively) the requested one. The message
                   suggests trying `--ble-scan`.

    Returns:
        The first scanned device object whose address matches.
    """
    found = BLEInterface.scan()

    # dump everything the scan returned before attempting the match
    print("Discovered Devices:")
    for dev in found:
        print(f"  Address: {dev.address}, Name: {dev.name}")

    wanted = address.lower()
    match = next((dev for dev in found if dev.address.lower() == wanted), None)
    if match is None:
        raise Exception(f"No BLE peripheral with address '{address}' found. Try --ble-scan.")
    return match


def custom_connect(self, address=None):
    """
    Replacement for `BLEInterface.connect` that connects directly to a known
    address instead of going through the original connect logic.

    With a non-empty address, a BLEClient is built for that address, its
    `connect()` and `discover()` methods are called in that order, and the
    client is returned. With no address (None or an empty string), the call
    is forwarded unchanged to `_original_connect`, the connect implementation
    saved by the script's `__main__` section before patching.

    Parameters:
        address (str, optional): BLE address to connect to directly. When
                                 omitted, None or empty, the saved original
                                 connect method handles the request.

    Side Effects:
        - Creates a BLEClient and calls `connect()` and `discover()` on it
          when an address is given.
        - Otherwise whatever `_original_connect` does.

    Raises:
        Exception: Anything raised by the BLEClient constructor, `connect()`,
                   `discover()`, or by `_original_connect`, is propagated.

    Returns:
        BLEClient: The client created here, or the value returned by
                   `_original_connect` on the fallback path.
    """
    if not address:
        # nothing to connect to directly: defer to the saved original method
        return _original_connect(self, address)

    # NOTE(review): this callback returns the bound `self.close` without
    # calling it, so a disconnect does not actually close the interface.
    # Confirm whether `self.close()` was intended before changing it.
    ble_client = BLEClient(address, disconnected_callback=lambda _: self.close)
    ble_client.connect()  # connect first ...
    ble_client.discover()  # ... then discover services/characteristics
    return ble_client


def onReceive(packet=None, interface=None):
    """
    PyPubSub listener for the "meshtastic.receive.text" topic that echoes an
    incoming text message to the console.

    Both arguments are optional so the listener can be invoked with either,
    both, or neither keyword supplied by the publisher.

    Parameters:
        packet (dict, optional): The received packet. Only two keys are read:
              - 'decoded': a mapping; the message is taken from its 'text'
                           key when that key is present.
              - 'fromId': the sender identifier; "unknown" is shown when the
                          key is absent.
        interface (optional): Accepted for signature compatibility with the
            publisher and never used by this function.

    Side Effects:
        - When 'decoded' contains 'text', prints a newline followed by
          "<sender>: <message>", then re-prints the "Ch0> " prompt without a
          trailing newline and flushes standard output.
        - Prints nothing for a missing/empty packet or a packet without text.

    Returns:
        None
    """
    if not packet:
        return  # nothing was delivered

    payload = packet.get("decoded", {})
    if "text" not in payload:
        return  # not a text message

    origin = packet.get("fromId", "unknown")
    print(f"\n{origin}: {payload['text']}")
    # restore the input prompt that the incoming message interrupted
    print("Ch0> ", end="", flush=True)


async def main():
    """
    Run the interactive chat session: connect over BLE, then forward every
    non-empty line typed by the user to channel index 0 until interrupted.

    The BLE address is taken from the first command-line argument (with
    surrounding whitespace stripped). Input is read with `input()` in the
    event loop's default executor so the coroutine itself does not block
    while waiting for the user.

    Side Effects:
        - Prints usage, connection status, a banner and the "Ch0> " prompt.
        - Creates a BLEInterface for the given address and closes it when the
          session ends for any reason.
        - Calls `sendText(msg, channelIndex=0)` for each non-empty line.

    Raises:
        SystemExit: With status 1 when the address argument is missing or the
                    BLE interface cannot be created; with status 0 after the
                    session is ended by Ctrl+C, task cancellation or
                    end-of-input (Ctrl+D).
        Exception: Any other error raised while sending is propagated after
                   the interface has been closed, instead of being replaced
                   by a successful exit status.
    """
    if len(sys.argv) < 2:
        print("Usage: python bimcc.py <BLE_DEVICE_ADDRESS>")
        sys.exit(1)

    address = sys.argv[1].strip()
    print(f"Attempting to connect to BLE device at address: {address}")

    try:
        # creating BLEInterface automatically attempts to connect
        ble_iface = BLEInterface(address=address)
    except Exception as e:
        print("Error initializing BLE interface:", e)
        sys.exit(1)
    print("Connected to BLE device!")

    loop = asyncio.get_running_loop()
    try:
        # Wait a moment to stabilize; awaited (not time.sleep) so the event
        # loop is not blocked, and inside the try so the interface is still
        # closed if the user interrupts during the wait.
        await asyncio.sleep(2)

        print("BLE Interactive Meshtastic Chat Client 0.1.0")
        print("--------------------------------------------")
        print("Type your message and press Enter to send.")
        print("Press Ctrl+C to exit...")
        print("")

        while True:
            # non-blocking input
            msg = await loop.run_in_executor(None, input, "Ch0> ")
            if msg:
                ble_iface.sendText(msg, channelIndex=0)
                await asyncio.sleep(0.1)
    except (KeyboardInterrupt, asyncio.CancelledError, EOFError):
        # Under asyncio.run() a Ctrl+C normally reaches this coroutine as a
        # CancelledError rather than KeyboardInterrupt; EOFError covers the
        # user closing standard input.
        print("\nExiting...")
    finally:
        ble_iface.close()

    # Only reached through the handler above (the loop never ends normally).
    # Exiting here rather than inside `finally` keeps unexpected exceptions
    # from being masked by a successful exit status.
    sys.exit(0)


if __name__ == "__main__":
    # Patch BLEInterface before any instance is created.
    #
    # custom_find_device takes only the address, so it is installed as a
    # staticmethod: a plain function stored on the class would be bound on
    # instance access and receive the interface as its `address` argument.
    BLEInterface.find_device = staticmethod(custom_find_device)

    # keep a reference to the library's connect so custom_connect can fall
    # back to it when no address is supplied
    _original_connect = BLEInterface.connect
    BLEInterface.connect = custom_connect

    # subscribe to the text messages topic, using our callback that accepts both 'packet' and 'interface'
    pub.subscribe(onReceive, "meshtastic.receive.text")

    asyncio.run(main())

License

Apache License, Version 2.0

Packages

No packages published

Languages