From a7689802b5826141df9883b27df390b4b8cc8d2a Mon Sep 17 00:00:00 2001 From: Dmitry Kaukov Date: Mon, 27 Jan 2025 05:29:21 +1100 Subject: [PATCH] USB Serial driver patch (#192) * USB driver patch Refactored SerialInputOutputManager: - Implemented multiple read buffers to improve USB reading efficiency and reduce latency. - Used separate threads for reading and writing, enhancing concurrency and performance. - Removed read timeouts, as they are not necessary for event-driven data reading. These changes enhance the responsiveness, reliability, and robustness of USB communication. * fixed merge * fixed merge * cleanup. * Removed unused import --- .../kv4pht/radio/RadioAudioService.java | 45 +-- .../hoho/android/usbserial/DeviceTest.java | 15 +- .../usbserial/driver/CommonUsbSerialPort.java | 7 +- .../usbserial/driver/UsbSerialPort.java | 2 + .../util/SerialInputOutputManager.java | 277 ++++++++++++------ 5 files changed, 196 insertions(+), 150 deletions(-) diff --git a/android-src/KV4PHT/app/src/main/java/com/vagell/kv4pht/radio/RadioAudioService.java b/android-src/KV4PHT/app/src/main/java/com/vagell/kv4pht/radio/RadioAudioService.java index 089dc26c..da6b4aef 100644 --- a/android-src/KV4PHT/app/src/main/java/com/vagell/kv4pht/radio/RadioAudioService.java +++ b/android-src/KV4PHT/app/src/main/java/com/vagell/kv4pht/radio/RadioAudioService.java @@ -56,7 +56,6 @@ kv4p HT (see http://kv4p.com) import com.google.android.gms.tasks.CancellationTokenSource; import com.google.android.gms.tasks.OnFailureListener; import com.google.android.gms.tasks.OnSuccessListener; -import com.hoho.android.usbserial.driver.SerialTimeoutException; import com.hoho.android.usbserial.driver.UsbSerialDriver; import com.hoho.android.usbserial.driver.UsbSerialPort; import com.hoho.android.usbserial.driver.UsbSerialProber; @@ -389,9 +388,6 @@ public void setMode(int mode) { usbIoManager.stop(); break; default: - if (null != usbIoManager && usbIoManager.getState() == SerialInputOutputManager.State.STOPPED) { - usbIoManager.start(); - } break; } @@ -935,7 +931,8 @@ public void onRunError(Exception e) { } }); usbIoManager.setWriteBufferSize(90000); // Must be large enough that ESP32 can take its time accepting our bytes without overrun. - usbIoManager.setReadTimeout(1000); // Must not be 0 (infinite) or it may block on read() until a write() occurs. + usbIoManager.setReadBufferSize(1024/4); // Must not be 0 (infinite) or it may block on read() until a write() occurs. + usbIoManager.setReadBufferCount(16*4); usbIoManager.start(); checkedFirmwareVersion = false; @@ -1221,43 +1218,7 @@ public synchronized void sendBytesToESP32(byte[] newBytes) { Log.d("DEBUG", "Warning: Attempted to send bytes to ESP32 while in the process of flashing a new firmware."); return; } - - int usbRetries = 0; - try { - // usbIoManager.writeAsync(newBytes); // On MCUs like the ESP32 S2 this causes USB failures with concurrent USB rx/tx. - int bytesWritten = 0; - int totalBytes = newBytes.length; - final int MAX_BYTES_PER_USB_WRITE = 128; - do { - try { - byte[] arrayPart = Arrays.copyOfRange(newBytes, bytesWritten, Math.min(bytesWritten + MAX_BYTES_PER_USB_WRITE, totalBytes)); - serialPort.write(arrayPart, 200); - bytesWritten += MAX_BYTES_PER_USB_WRITE; - usbRetries = 0; - } catch (SerialTimeoutException ste) { - // Do nothing, we'll try again momentarily. ESP32's serial buffer may be full. - usbRetries++; - Log.d("DEBUG", "usbRetries: " + usbRetries); - } - } while (bytesWritten < totalBytes && usbRetries < 10); - // Log.d("DEBUG", "Wrote data: " + Arrays.toString(newBytes)); - } catch (Exception e) { - e.printStackTrace(); - try { - serialPort.close(); - } catch (Exception ex) { - // Ignore. We did our best to close it! - } - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - // Ignore. This should only happen if the app is paused in this brief moment between USB retries, not a serious issue. - } - findESP32Device(); // Attempt to reconnect after the brief pause above. - } - if (usbRetries == 10) { - Log.d("DEBUG", "sendBytesToESP32: Connected to ESP32 via USB serial, but could not send data after 10 retries."); - } + usbIoManager.writeAsync(newBytes); } public static UsbSerialPort getUsbSerialPort() { diff --git a/android-src/KV4PHT/usbSerialForAndroid/src/androidTest/java/com/hoho/android/usbserial/DeviceTest.java b/android-src/KV4PHT/usbSerialForAndroid/src/androidTest/java/com/hoho/android/usbserial/DeviceTest.java index 617afde3..0a9e2b69 100644 --- a/android-src/KV4PHT/usbSerialForAndroid/src/androidTest/java/com/hoho/android/usbserial/DeviceTest.java +++ b/android-src/KV4PHT/usbSerialForAndroid/src/androidTest/java/com/hoho/android/usbserial/DeviceTest.java @@ -1206,7 +1206,6 @@ private int readSpeedInt(int writeSeconds, int readBufferSize, int readTimeout) writeAhead = 50; usb.open(EnumSet.of(UsbWrapper.OpenCloseFlags.NO_IOMANAGER_START)); - usb.ioManager.setReadTimeout(readTimeout); if(readBufferSize > 0) usb.ioManager.setReadBufferSize(readBufferSize); usb.ioManager.start(); @@ -1383,9 +1382,6 @@ public void IoManager() throws Exception { usb.ioManager = new SerialInputOutputManager(usb.serialPort, usb); assertEquals(usb, usb.ioManager.getListener()); - assertEquals(0, usb.ioManager.getReadTimeout()); - usb.ioManager.setReadTimeout(10); - assertEquals(10, usb.ioManager.getReadTimeout()); assertEquals(0, usb.ioManager.getWriteTimeout()); usb.ioManager.setWriteTimeout(11); assertEquals(11, usb.ioManager.getWriteTimeout()); @@ -1399,7 +1395,6 @@ public void IoManager() throws Exception { usb.ioManager.setReadBufferSize(usb.ioManager.getReadBufferSize()); usb.ioManager.setWriteBufferSize(usb.ioManager.getWriteBufferSize()); - usb.ioManager.setReadTimeout(usb.ioManager.getReadTimeout()); usb.ioManager.setWriteTimeout(usb.ioManager.getWriteTimeout()); usb.close(); @@ -1416,7 +1411,7 @@ public void IoManager() throws Exception { } catch (IllegalStateException ignored) { } try { - usb.ioManager.run(); + usb.ioManager.start(); fail("already running error expected"); } catch (IllegalStateException ignored) { } @@ -1424,11 +1419,6 @@ public void IoManager() throws Exception { usb.ioManager.setThreadPriority(Process.THREAD_PRIORITY_LOWEST); fail("setThreadPriority IllegalStateException expected"); } catch (IllegalStateException ignored) {} - try { - usb.ioManager.setReadTimeout(20); - fail("setReadTimeout IllegalStateException expected"); - } catch (IllegalStateException ignored) {} - assertEquals(0, usb.ioManager.getReadTimeout()); usb.ioManager.setWriteTimeout(21); assertEquals(21, usb.ioManager.getWriteTimeout()); usb.ioManager.setReadBufferSize(22); @@ -1484,7 +1474,6 @@ public void IoManager() throws Exception { usb.setParameters(19200, 8, 1, UsbSerialPort.PARITY_NONE); telnet.setParameters(19200, 8, 1, UsbSerialPort.PARITY_NONE); usb.ioManager.setThreadPriority(Process.THREAD_PRIORITY_DEFAULT); - Executors.newSingleThreadExecutor().submit(usb.ioManager); usb.waitForIoManagerStarted(); try { usb.ioManager.start(); @@ -1514,7 +1503,6 @@ public void writeAsync() throws Exception { // with timeout: write after timeout usb.open(EnumSet.of(UsbWrapper.OpenCloseFlags.NO_IOMANAGER_START)); - usb.ioManager.setReadTimeout(100); usb.ioManager.start(); usb.setParameters(19200, 8, 1, UsbSerialPort.PARITY_NONE); telnet.setParameters(19200, 8, 1, UsbSerialPort.PARITY_NONE); @@ -1522,7 +1510,6 @@ public void writeAsync() throws Exception { usb.ioManager.writeAsync(buf); data = telnet.read(2); assertEquals(2, data.length); - usb.ioManager.setReadTimeout(200); } @Test diff --git a/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/CommonUsbSerialPort.java b/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/CommonUsbSerialPort.java index 8cb460e9..57608a51 100644 --- a/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/CommonUsbSerialPort.java +++ b/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/CommonUsbSerialPort.java @@ -33,7 +33,12 @@ public abstract class CommonUsbSerialPort implements UsbSerialPort { protected final UsbDevice mDevice; protected final int mPortNumber; - // non-null when open() + @Override + public UsbDeviceConnection getConnection() { + return mConnection; + } + + // non-null when open() protected UsbDeviceConnection mConnection = null; protected UsbEndpoint mReadEndpoint; protected UsbEndpoint mWriteEndpoint; diff --git a/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/UsbSerialPort.java b/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/UsbSerialPort.java index 45a6e80a..5c517664 100644 --- a/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/UsbSerialPort.java +++ b/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/UsbSerialPort.java @@ -281,4 +281,6 @@ enum ControlLine { RTS, CTS, DTR, DSR, CD, RI } */ boolean isOpen(); + UsbDeviceConnection getConnection(); + } diff --git a/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/util/SerialInputOutputManager.java b/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/util/SerialInputOutputManager.java index 128ba652..39dca4f7 100644 --- a/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/util/SerialInputOutputManager.java +++ b/android-src/KV4PHT/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/util/SerialInputOutputManager.java @@ -6,23 +6,30 @@ package com.hoho.android.usbserial.util; +import android.hardware.usb.UsbRequest; import android.os.Process; import android.util.Log; +import androidx.annotation.VisibleForTesting; import com.hoho.android.usbserial.driver.UsbSerialPort; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; /** - * Utility class which services a {@link UsbSerialPort} in its {@link #run()} method. + * Utility class which services a {@link UsbSerialPort} for reading and writing in background threads. * * @author mike wakerly (opensource@hoho.com) */ -public class SerialInputOutputManager implements Runnable { +public class SerialInputOutputManager { public enum State { STOPPED, + STARTING, + STARTING_STAGE2, RUNNING, STOPPING } @@ -32,22 +39,21 @@ public enum State { private static final String TAG = SerialInputOutputManager.class.getSimpleName(); private static final int BUFSIZ = 4096; - /** - * default read timeout is infinite, to avoid data loss with bulkTransfer API - */ - private int mReadTimeout = 0; private int mWriteTimeout = 0; - private final Object mReadBufferLock = new Object(); private final Object mWriteBufferLock = new Object(); - private ByteBuffer mReadBuffer; // default size = getReadEndpoint().getMaxPacketSize() + private int mReadBufferSize; // default size = getReadEndpoint().getMaxPacketSize() + private int mReadBufferCount = 4; private ByteBuffer mWriteBuffer = ByteBuffer.allocate(BUFSIZ); private int mThreadPriority = Process.THREAD_PRIORITY_URGENT_AUDIO; - private State mState = State.STOPPED; // Synchronized by 'this' + private final AtomicReference mState = new AtomicReference<>(State.STOPPED); + private CountDownLatch mStartuplatch = new CountDownLatch(2); + private CountDownLatch mShutdownlatch = new CountDownLatch(2); private Listener mListener; // Synchronized by 'this' private final UsbSerialPort mSerialPort; + private Supplier mRequestSupplier = UsbRequest::new; public interface Listener { /** @@ -56,20 +62,20 @@ public interface Listener { void onNewData(byte[] data); /** - * Called when {@link SerialInputOutputManager#run()} aborts due to an error. + * Called when service thread aborts due to an error. */ void onRunError(Exception e); } public SerialInputOutputManager(UsbSerialPort serialPort) { mSerialPort = serialPort; - mReadBuffer = ByteBuffer.allocate(serialPort.getReadEndpoint().getMaxPacketSize()); + mReadBufferSize = serialPort.getReadEndpoint().getMaxPacketSize(); } public SerialInputOutputManager(UsbSerialPort serialPort, Listener listener) { mSerialPort = serialPort; mListener = listener; - mReadBuffer = ByteBuffer.allocate(serialPort.getReadEndpoint().getMaxPacketSize()); + mReadBufferSize = serialPort.getReadEndpoint().getMaxPacketSize(); } public synchronized void setListener(Listener listener) { @@ -86,23 +92,27 @@ public synchronized Listener getListener() { * @param threadPriority see {@link Process#setThreadPriority(int)} * */ public void setThreadPriority(int threadPriority) { - if (mState != State.STOPPED) + if (!mState.compareAndSet(State.STOPPED, State.STOPPED)) { throw new IllegalStateException("threadPriority only configurable before SerialInputOutputManager is started"); + } mThreadPriority = threadPriority; } /** - * read/write timeout + * read buffer count */ - public void setReadTimeout(int timeout) { - // when set if already running, read already blocks and the new value will not become effective now - if(mReadTimeout == 0 && timeout != 0 && mState != State.STOPPED) - throw new IllegalStateException("readTimeout only configurable before SerialInputOutputManager is started"); - mReadTimeout = timeout; + public int getReadBufferCount() { + return mReadBufferCount; } - public int getReadTimeout() { - return mReadTimeout; + /** + * read buffer count + */ + public void setReadBufferCount(int mReadBuffeCount) { + if (!mState.compareAndSet(State.STOPPED, State.STOPPED)) { + throw new IllegalStateException("ReadBufferCount only configurable before SerialInputOutputManager is started"); + } + this.mReadBufferCount = mReadBuffeCount; } public void setWriteTimeout(int timeout) { @@ -117,15 +127,21 @@ public int getWriteTimeout() { * read/write buffer size */ public void setReadBufferSize(int bufferSize) { - if (getReadBufferSize() == bufferSize) - return; - synchronized (mReadBufferLock) { - mReadBuffer = ByteBuffer.allocate(bufferSize); + if (getReadBufferSize() != bufferSize) { + if (!mState.compareAndSet(State.STOPPED, State.STOPPED)) { + throw new IllegalStateException("ReadBuffeCount only configurable before SerialInputOutputManager is started"); + } + mReadBufferSize = bufferSize; } } public int getReadBufferSize() { - return mReadBuffer.capacity(); + return mReadBufferSize; + } + + @VisibleForTesting + void setRequestSupplier(Supplier mRequestSupplier) { + this.mRequestSupplier = mRequestSupplier; } public void setWriteBufferSize(int bufferSize) { @@ -154,105 +170,180 @@ public void writeAsync(byte[] data) { } /** - * start SerialInputOutputManager in separate thread + * start SerialInputOutputManager in separate threads */ public void start() { - if(mState != State.STOPPED) + if(mState.compareAndSet(State.STOPPED, State.STARTING)) { + mStartuplatch = new CountDownLatch(2); + mShutdownlatch = new CountDownLatch(2); + new ServiceReadThread(this.getClass().getSimpleName() + "_read").start(); + new ServiceWriteThread(this.getClass().getSimpleName() + "_write").start(); + try { + mStartuplatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { throw new IllegalStateException("already started"); - new Thread(this, this.getClass().getSimpleName()).start(); + } } /** - * stop SerialInputOutputManager thread - * + * stop SerialInputOutputManager threads * when using readTimeout == 0 (default), additionally use usbSerialPort.close() to * interrupt blocking read */ - public synchronized void stop() { - if (getState() == State.RUNNING) { + public void stop() { + if(mState.compareAndSet(State.RUNNING, State.STOPPING)) { Log.i(TAG, "Stop requested"); - mState = State.STOPPING; } } - public synchronized State getState() { - return mState; + public State getState() { + return mState.get(); } - /** - * Continuously services the read and write buffers until {@link #stop()} is - * called, or until a driver exception is raised. - */ - @Override - public void run() { - synchronized (this) { - if (getState() != State.STOPPED) { - Log.d("DEBUG", "Warning: Tried to run() a SerialInputOutputManager that was already running."); - return; + abstract class ServiceThread extends Thread { + + ServiceThread(String name) { + setName(name); + } + + private boolean isStillRunning() { + SerialInputOutputManager.State state = mState.get(); + return ((state == SerialInputOutputManager.State.RUNNING) || (state == SerialInputOutputManager.State.STARTING) || (state == SerialInputOutputManager.State.STARTING_STAGE2)) + && (mShutdownlatch.getCount() == 2) + && !Thread.currentThread().isInterrupted(); + } + + private void notifyErrorListener(Throwable e) { + if (getListener() != null) { + try { + getListener().onRunError(e instanceof Exception ? (Exception) e : new Exception(e)); + } catch (Throwable t) { + Log.w(TAG, "Exception in onRunError: " + t.getMessage(), t); + } } - mState = State.RUNNING; } - Log.i(TAG, "Running ..."); - try { - if(mThreadPriority != Process.THREAD_PRIORITY_DEFAULT) + + private void startThread() { + if (mThreadPriority != Process.THREAD_PRIORITY_DEFAULT) { Process.setThreadPriority(mThreadPriority); - while (true) { - if (getState() != State.RUNNING) { - Log.i(TAG, "Stopping mState=" + getState()); - break; + } + if (!mState.compareAndSet(SerialInputOutputManager.State.STARTING, SerialInputOutputManager.State.STARTING_STAGE2)) { + if (mState.compareAndSet(SerialInputOutputManager.State.STARTING_STAGE2, SerialInputOutputManager.State.RUNNING)) { + Log.i(TAG, getName() + ": Started mState=" + mState.get()); } - step(); } - } catch (Exception e) { - Log.w(TAG, "Run ending due to exception: " + e.getMessage(), e); - final Listener listener = getListener(); - if (listener != null) { - listener.onRunError(e); + mStartuplatch.countDown(); + } + + private void finalizeThread() { + if (!mState.compareAndSet(SerialInputOutputManager.State.RUNNING, SerialInputOutputManager.State.STOPPING)) { + if (mState.compareAndSet(SerialInputOutputManager.State.STOPPING, SerialInputOutputManager.State.STOPPED)) { + Log.i(TAG, getName() + ": Stopped mState=" + mState.get()); + } } - } finally { - synchronized (this) { - mState = State.STOPPED; - Log.i(TAG, "Stopped"); + mShutdownlatch.countDown(); + } + + abstract void init(); + abstract void step() throws IOException; + + @Override + public void run() { + try { + startThread(); + init(); + do { + step(); + } while (isStillRunning()); + Log.i(TAG, getName() + ": Stopping mState=" + mState.get()); + } catch (Throwable e) { + if (Thread.currentThread().isInterrupted()) { + Log.w(TAG, "Thread interrupted, stopping " + getName()); + } else { + Log.w(TAG, getName() + " ending due to exception: " + e.getMessage(), e); + notifyErrorListener(e); + } + } finally { + finalizeThread(); } } } - private void step() throws IOException { - // Handle incoming data. - byte[] buffer; - synchronized (mReadBufferLock) { - buffer = mReadBuffer.array(); + class ServiceReadThread extends ServiceThread { + + ServiceReadThread(String name) { + super(name); } - int len = mSerialPort.read(buffer, mReadTimeout); - if (len > 0) { - if (DEBUG) { - Log.d(TAG, "Read data len=" + len); - } - final Listener listener = getListener(); - if (listener != null) { - final byte[] data = new byte[len]; - System.arraycopy(buffer, 0, data, 0, len); - listener.onNewData(data); + + @Override + void init() { + // Initialize buffers and requests + for (int i = 0; i < mReadBufferCount; i++) { + ByteBuffer buffer = ByteBuffer.allocate(mReadBufferSize); + UsbRequest request = new UsbRequest(); + request.setClientData(buffer); + request.initialize(mSerialPort.getConnection(), mSerialPort.getReadEndpoint()); + request.queue(buffer, buffer.capacity()); } } - // Handle outgoing data. - buffer = null; - synchronized (mWriteBufferLock) { - len = mWriteBuffer.position(); - if (len > 0) { - buffer = new byte[len]; - mWriteBuffer.rewind(); - mWriteBuffer.get(buffer, 0, len); - mWriteBuffer.clear(); + @Override + void step() throws IOException { + // Wait for the request to complete + final UsbRequest completedRequest = mSerialPort.getConnection().requestWait(); + if (completedRequest != null) { + final ByteBuffer completedBuffer = (ByteBuffer) completedRequest.getClientData(); + completedBuffer.flip(); // Prepare for reading + final byte[] data = new byte[completedBuffer.remaining()]; + completedBuffer.get(data); + if ((getListener() != null) && (data.length > 0)) { + getListener().onNewData(data); // Handle data + } + completedBuffer.clear(); // Prepare for reuse + // Requeue the buffer and handle potential failures + if (!completedRequest.queue(completedBuffer, completedBuffer.capacity())) { + Log.e(TAG, "Failed to requeue the buffer"); + throw new IOException("Failed to requeue the buffer"); + } + } else { + Log.e(TAG, "Error waiting for request"); + throw new IOException("Error waiting for request"); } } - if (buffer != null) { - if (DEBUG) { - Log.d(TAG, "Writing data len=" + len); + } + + class ServiceWriteThread extends ServiceThread { + + ServiceWriteThread(String name) { + super(name); + } + + @Override + void init() { + } + + @Override + void step() throws IOException { + // Handle outgoing data. + byte[] buffer = null; + synchronized (mWriteBufferLock) { + int len = mWriteBuffer.position(); + if (len > 0) { + buffer = new byte[len]; + mWriteBuffer.rewind(); + mWriteBuffer.get(buffer, 0, len); + mWriteBuffer.clear(); + } + } + if (buffer != null) { + if (DEBUG) { + Log.d(TAG, "Writing data len=" + buffer.length); + } + mSerialPort.write(buffer, mWriteTimeout); } - mSerialPort.write(buffer, mWriteTimeout); } } - }