diff --git a/app/build.gradle b/app/build.gradle index cd5faed..335f4e6 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -69,7 +69,7 @@ repositories { jcenter() } -def zitiVersion = "0.15.0" +def zitiVersion = "0.16.1" dependencies { implementation fileTree(include: ['*.jar'], dir: 'libs') diff --git a/app/src/main/java/org/openziti/mobile/Tunnel.kt b/app/src/main/java/org/openziti/mobile/Tunnel.kt index d82999f..e71cee3 100644 --- a/app/src/main/java/org/openziti/mobile/Tunnel.kt +++ b/app/src/main/java/org/openziti/mobile/Tunnel.kt @@ -6,40 +6,34 @@ package org.openziti.mobile import android.os.ParcelFileDescriptor import android.util.Log -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.consumeAsFlow import org.openziti.mobile.net.PacketRouter import org.openziti.mobile.net.TUNNEL_MTU import java.nio.ByteBuffer import java.nio.channels.ClosedByInterruptException import java.util.concurrent.Executors -class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter) { +class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter, val toPeerChannel: ReceiveChannel) { + val output = ParcelFileDescriptor.AutoCloseOutputStream(fd).channel val input = ParcelFileDescriptor.AutoCloseInputStream(fd).channel val readerThread = Thread(this::reader, "tunnel-read") - val toPeerChannel = Channel(Channel.UNLIMITED) - val writeDispatcher = Executors.newSingleThreadExecutor{ - r -> Thread(r, "tunner-write") - }.asCoroutineDispatcher() - fun start() { + val writer: Job + init { readerThread.start() - GlobalScope.launch(writeDispatcher){ - writer() + writer = GlobalScope.launch(writeDispatcher) { + toPeerChannel.consumeAsFlow().collect { + Log.v(TAG, "writing ${it.remaining()} on t[${Thread.currentThread().name}]") + output.write(it) + } } - } - - suspend fun writer() { - for (b in toPeerChannel) { - Log.v(TAG, "writing ${b.remaining()} on t[${Thread.currentThread().name}]") - output.write(b) + writer.invokeOnCompletion { + Log.i(TAG, "writer() finished ${it?.localizedMessage}") } - - Log.i(TAG, "writer() finished") } fun reader() { @@ -67,16 +61,20 @@ class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter) { } } - fun onInbound(b: ByteBuffer) = - runBlocking { toPeerChannel.send(b) } +// fun onInbound(b: ByteBuffer): Unit = runBlocking { +// toPeerChannel.runCatching { +// toPeerChannel.send(b) +// }.onFailure { +// Log.w(TAG, "") +// } +// } fun close(ex: Throwable? = null) { ex?.let { Log.e(TAG, "closing with exception: $it") } - toPeerChannel.close() - writeDispatcher.close() + runBlocking { writer.cancelAndJoin() } readerThread.interrupt() output.close() input.close() @@ -84,5 +82,8 @@ class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter) { companion object { val TAG = Tunnel::class.java.simpleName + val writeDispatcher = Executors.newSingleThreadExecutor{ + r -> Thread(r, "tunnel-write").apply { isDaemon = true } + }.asCoroutineDispatcher() } } \ No newline at end of file diff --git a/app/src/main/java/org/openziti/mobile/ZitiContextModel.kt b/app/src/main/java/org/openziti/mobile/ZitiContextModel.kt index b18fc69..4edecbf 100644 --- a/app/src/main/java/org/openziti/mobile/ZitiContextModel.kt +++ b/app/src/main/java/org/openziti/mobile/ZitiContextModel.kt @@ -10,7 +10,6 @@ import androidx.lifecycle.ViewModel import androidx.lifecycle.ViewModelProvider import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.launch import org.openziti.ZitiContext import org.openziti.android.Ziti @@ -31,13 +30,13 @@ class ZitiContextModel(val ctx: ZitiContext): ViewModel() { init { GlobalScope.launch { - statusSub.consumeAsFlow().collect { + statusSub.collect { statusLive.postValue(it) } } GlobalScope.launch { - serviceSub.consumeAsFlow().collect { + serviceSub.collect { when(it.type) { ZitiContext.ServiceUpdate.Available -> { servicesMap.put(it.service.name, it.service) @@ -76,7 +75,5 @@ class ZitiContextModel(val ctx: ZitiContext): ViewModel() { } override fun onCleared() { - statusSub.cancel() - serviceSub.cancel() } } \ No newline at end of file diff --git a/app/src/main/java/org/openziti/mobile/ZitiVPNService.kt b/app/src/main/java/org/openziti/mobile/ZitiVPNService.kt index d140d9e..91384bf 100644 --- a/app/src/main/java/org/openziti/mobile/ZitiVPNService.kt +++ b/app/src/main/java/org/openziti/mobile/ZitiVPNService.kt @@ -16,11 +16,13 @@ import android.os.IBinder import android.system.OsConstants import android.util.Log import androidx.localbroadcastmanager.content.LocalBroadcastManager +import kotlinx.coroutines.channels.Channel import org.openziti.android.Ziti import org.openziti.mobile.net.PacketRouter import org.openziti.mobile.net.PacketRouterImpl import org.openziti.mobile.net.TUNNEL_MTU import org.openziti.net.dns.DNSResolver +import java.nio.ByteBuffer class ZitiVPNService : VpnService() { @@ -31,6 +33,7 @@ class ZitiVPNService : VpnService() { private val TAG: String = javaClass.simpleName val dnsAddr = "169.254.0.2" + private val peerChannel = Channel(128) private var tunnel: Tunnel? = null lateinit var packetRouter: PacketRouter lateinit var dnsResolver: DNSResolver @@ -73,7 +76,7 @@ class ZitiVPNService : VpnService() { dnsResolver = Ziti.getDnsResolver() packetRouter = PacketRouterImpl(dnsResolver, dnsAddr) {buf -> - tunnel?.onInbound(buf) ?: Log.w(TAG, "failed to send packet because tunnel was closed") + peerChannel.send(buf) } LocalBroadcastManager.getInstance(applicationContext).registerReceiver(receiver, @@ -153,9 +156,8 @@ class ZitiVPNService : VpnService() { val fd = builder.establish()!! Log.i(TAG, "starting tunnel for fd=$fd") - return Tunnel(fd, packetRouter).let { + return Tunnel(fd, packetRouter, peerChannel).let { tunnel = it - it.start() } } catch (ex: Throwable) { Log.wtf(TAG, ex) diff --git a/app/src/main/java/org/openziti/mobile/net/PacketRouterImpl.kt b/app/src/main/java/org/openziti/mobile/net/PacketRouterImpl.kt index f1f2299..a2727d3 100644 --- a/app/src/main/java/org/openziti/mobile/net/PacketRouterImpl.kt +++ b/app/src/main/java/org/openziti/mobile/net/PacketRouterImpl.kt @@ -5,6 +5,7 @@ package org.openziti.mobile.net import android.util.Log +import kotlinx.coroutines.runBlocking import org.openziti.net.dns.DNSResolver import org.pcap4j.packet.IpPacket import org.pcap4j.packet.IpSelector @@ -18,7 +19,7 @@ import java.util.concurrent.ConcurrentHashMap import kotlin.concurrent.timer -class PacketRouterImpl(resolver: DNSResolver, val dnsAddr: String, val inbound: (b: ByteBuffer) -> Unit) : PacketRouter { +class PacketRouterImpl(resolver: DNSResolver, val dnsAddr: String, val inbound: suspend (b: ByteBuffer) -> Unit) : PacketRouter { val TAG = "routing" val tcpConnections = ConcurrentHashMap() val zitiNameserver = ZitiNameserver(dnsAddr, resolver) @@ -37,7 +38,7 @@ class PacketRouterImpl(resolver: DNSResolver, val dnsAddr: String, val inbound: if (packet.header.dstAddr.hostAddress == dnsAddr) { zitiNameserver.process(packet)?.let { - inbound(ByteBuffer.wrap(it.rawData)) + runBlocking { inbound(ByteBuffer.wrap(it.rawData)) } } return diff --git a/app/src/main/java/org/openziti/mobile/net/ZitiTunnelConnection.kt b/app/src/main/java/org/openziti/mobile/net/ZitiTunnelConnection.kt index 0954518..dbaaf16 100644 --- a/app/src/main/java/org/openziti/mobile/net/ZitiTunnelConnection.kt +++ b/app/src/main/java/org/openziti/mobile/net/ZitiTunnelConnection.kt @@ -32,7 +32,7 @@ import kotlin.coroutines.CoroutineContext * */ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSocketAddress, synPack: IpV4Packet, - val onInbound: (ByteBuffer) -> Unit) : CoroutineScope { + val onInbound: suspend (ByteBuffer) -> Unit) : CoroutineScope { val supervisor = SupervisorJob() override val coroutineContext: CoroutineContext @@ -61,9 +61,11 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock tcpConn.init(tcp) try { - val c = conn.await() + val c = withTimeout(3000) { + conn.await() + } sendToPeer(tcpConn.accept().toList()) - processPeerPackets() + processPeerPackets(c as AsynchronousSocketChannel) readZitiConn(c) } catch (ex: Exception) { Log.e(info, "failed to connect to Ziti Service: $ex") @@ -72,7 +74,6 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock } } - fun readZitiConn(conn: ZitiConnection) = launch { try { val buf = ByteArray(1024 * 16) @@ -111,27 +112,33 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock fun isClosed() = tcpConn.isClosed() - fun processPeerPackets() = launch { - - val zitiConn = conn.await() as AsynchronousSocketChannel + fun processPeerPackets(zitiConn: AsynchronousSocketChannel) = launch { inboundPackets.receiveAsFlow().collect { packet -> val out = mutableListOf() val payload = tcpConn.fromPeer(packet.payload as TcpPacket, out) - payload?.let { - Log.v(info, "sending ${it.size} bytes to ziti backend") - zitiConn.writeCompletely(ByteBuffer.wrap(it)) - } - - when (tcpConn.state) { - TCP.State.LAST_ACK, TCP.State.Closed -> { - zitiConn.close() + runCatching { + payload?.let { + Log.v(info, "sending ${it.size} bytes to ziti backend") + zitiConn.writeCompletely(ByteBuffer.wrap(it)) + } + }.onSuccess { + when (tcpConn.state) { + TCP.State.LAST_ACK, TCP.State.Closed -> { + zitiConn.close() + } + TCP.State.FIN_WAIT_1 -> { + zitiConn.shutdownOutput() + } + else -> { + } } - TCP.State.FIN_WAIT_1 -> { - zitiConn.shutdownOutput() + }.onFailure { exc -> + Log.w(info, "failed to deliver date to Ziti side", exc) + tcpConn.close()?.let { it -> + out.add(it) } - else -> {} } sendToPeer(out) @@ -154,7 +161,7 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock override fun toString(): String = info - fun sendToPeer(packets: List) { + suspend fun sendToPeer(packets: List) { for (p in packets) { val ipPack = IpV4Packet.Builder() .version(IpVersion.IPV4) diff --git a/build.gradle b/build.gradle index f51fdf7..cd044d7 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ buildscript { jcenter() } dependencies { - classpath 'com.android.tools.build:gradle:4.1.0' + classpath 'com.android.tools.build:gradle:4.1.1' classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" // NOTE: Do not place your application dependencies here; they belong