Skip to content

Commit

Permalink
Merge pull request #42 from openziti/fix-tunner-restarts
Browse files Browse the repository at this point in the history
fixes
  • Loading branch information
ekoby authored Nov 11, 2020
2 parents 6ddd045 + 48b7afa commit f3b428b
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 56 deletions.
2 changes: 1 addition & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ repositories {
jcenter()
}

def zitiVersion = "0.15.0"
def zitiVersion = "0.16.1"

dependencies {
implementation fileTree(include: ['*.jar'], dir: 'libs')
Expand Down
51 changes: 26 additions & 25 deletions app/src/main/java/org/openziti/mobile/Tunnel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,34 @@ package org.openziti.mobile

import android.os.ParcelFileDescriptor
import android.util.Log
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import org.openziti.mobile.net.PacketRouter
import org.openziti.mobile.net.TUNNEL_MTU
import java.nio.ByteBuffer
import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.Executors

class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter) {
class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter, val toPeerChannel: ReceiveChannel<ByteBuffer>) {

val output = ParcelFileDescriptor.AutoCloseOutputStream(fd).channel
val input = ParcelFileDescriptor.AutoCloseInputStream(fd).channel
val readerThread = Thread(this::reader, "tunnel-read")
val toPeerChannel = Channel<ByteBuffer>(Channel.UNLIMITED)
val writeDispatcher = Executors.newSingleThreadExecutor{
r -> Thread(r, "tunner-write")
}.asCoroutineDispatcher()

fun start() {
val writer: Job
init {
readerThread.start()
GlobalScope.launch(writeDispatcher){
writer()
writer = GlobalScope.launch(writeDispatcher) {
toPeerChannel.consumeAsFlow().collect {
Log.v(TAG, "writing ${it.remaining()} on t[${Thread.currentThread().name}]")
output.write(it)
}
}
}

suspend fun writer() {
for (b in toPeerChannel) {
Log.v(TAG, "writing ${b.remaining()} on t[${Thread.currentThread().name}]")
output.write(b)
writer.invokeOnCompletion {
Log.i(TAG, "writer() finished ${it?.localizedMessage}")
}

Log.i(TAG, "writer() finished")
}

fun reader() {
Expand Down Expand Up @@ -67,22 +61,29 @@ class Tunnel(fd: ParcelFileDescriptor, val processor: PacketRouter) {
}
}

fun onInbound(b: ByteBuffer) =
runBlocking { toPeerChannel.send(b) }
// fun onInbound(b: ByteBuffer): Unit = runBlocking {
// toPeerChannel.runCatching {
// toPeerChannel.send(b)
// }.onFailure {
// Log.w(TAG, "")
// }
// }

fun close(ex: Throwable? = null) {
ex?.let {
Log.e(TAG, "closing with exception: $it")
}

toPeerChannel.close()
writeDispatcher.close()
runBlocking { writer.cancelAndJoin() }
readerThread.interrupt()
output.close()
input.close()
}

companion object {
val TAG = Tunnel::class.java.simpleName
val writeDispatcher = Executors.newSingleThreadExecutor{
r -> Thread(r, "tunnel-write").apply { isDaemon = true }
}.asCoroutineDispatcher()
}
}
7 changes: 2 additions & 5 deletions app/src/main/java/org/openziti/mobile/ZitiContextModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import androidx.lifecycle.ViewModel
import androidx.lifecycle.ViewModelProvider
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import org.openziti.ZitiContext
import org.openziti.android.Ziti
Expand All @@ -31,13 +30,13 @@ class ZitiContextModel(val ctx: ZitiContext): ViewModel() {

init {
GlobalScope.launch {
statusSub.consumeAsFlow().collect {
statusSub.collect {
statusLive.postValue(it)
}
}

GlobalScope.launch {
serviceSub.consumeAsFlow().collect {
serviceSub.collect {
when(it.type) {
ZitiContext.ServiceUpdate.Available -> {
servicesMap.put(it.service.name, it.service)
Expand Down Expand Up @@ -76,7 +75,5 @@ class ZitiContextModel(val ctx: ZitiContext): ViewModel() {
}

override fun onCleared() {
statusSub.cancel()
serviceSub.cancel()
}
}
8 changes: 5 additions & 3 deletions app/src/main/java/org/openziti/mobile/ZitiVPNService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import android.os.IBinder
import android.system.OsConstants
import android.util.Log
import androidx.localbroadcastmanager.content.LocalBroadcastManager
import kotlinx.coroutines.channels.Channel
import org.openziti.android.Ziti
import org.openziti.mobile.net.PacketRouter
import org.openziti.mobile.net.PacketRouterImpl
import org.openziti.mobile.net.TUNNEL_MTU
import org.openziti.net.dns.DNSResolver
import java.nio.ByteBuffer

class ZitiVPNService : VpnService() {

Expand All @@ -31,6 +33,7 @@ class ZitiVPNService : VpnService() {
private val TAG: String = javaClass.simpleName
val dnsAddr = "169.254.0.2"

private val peerChannel = Channel<ByteBuffer>(128)
private var tunnel: Tunnel? = null
lateinit var packetRouter: PacketRouter
lateinit var dnsResolver: DNSResolver
Expand Down Expand Up @@ -73,7 +76,7 @@ class ZitiVPNService : VpnService() {

dnsResolver = Ziti.getDnsResolver()
packetRouter = PacketRouterImpl(dnsResolver, dnsAddr) {buf ->
tunnel?.onInbound(buf) ?: Log.w(TAG, "failed to send packet because tunnel was closed")
peerChannel.send(buf)
}

LocalBroadcastManager.getInstance(applicationContext).registerReceiver(receiver,
Expand Down Expand Up @@ -153,9 +156,8 @@ class ZitiVPNService : VpnService() {
val fd = builder.establish()!!

Log.i(TAG, "starting tunnel for fd=$fd")
return Tunnel(fd, packetRouter).let {
return Tunnel(fd, packetRouter, peerChannel).let {
tunnel = it
it.start()
}
} catch (ex: Throwable) {
Log.wtf(TAG, ex)
Expand Down
5 changes: 3 additions & 2 deletions app/src/main/java/org/openziti/mobile/net/PacketRouterImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.openziti.mobile.net

import android.util.Log
import kotlinx.coroutines.runBlocking
import org.openziti.net.dns.DNSResolver
import org.pcap4j.packet.IpPacket
import org.pcap4j.packet.IpSelector
Expand All @@ -18,7 +19,7 @@ import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.timer


class PacketRouterImpl(resolver: DNSResolver, val dnsAddr: String, val inbound: (b: ByteBuffer) -> Unit) : PacketRouter {
class PacketRouterImpl(resolver: DNSResolver, val dnsAddr: String, val inbound: suspend (b: ByteBuffer) -> Unit) : PacketRouter {
val TAG = "routing"
val tcpConnections = ConcurrentHashMap<ConnectionKey, ZitiTunnelConnection>()
val zitiNameserver = ZitiNameserver(dnsAddr, resolver)
Expand All @@ -37,7 +38,7 @@ class PacketRouterImpl(resolver: DNSResolver, val dnsAddr: String, val inbound:

if (packet.header.dstAddr.hostAddress == dnsAddr) {
zitiNameserver.process(packet)?.let {
inbound(ByteBuffer.wrap(it.rawData))
runBlocking { inbound(ByteBuffer.wrap(it.rawData)) }
}

return
Expand Down
45 changes: 26 additions & 19 deletions app/src/main/java/org/openziti/mobile/net/ZitiTunnelConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import kotlin.coroutines.CoroutineContext
*
*/
class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSocketAddress, synPack: IpV4Packet,
val onInbound: (ByteBuffer) -> Unit) : CoroutineScope {
val onInbound: suspend (ByteBuffer) -> Unit) : CoroutineScope {

val supervisor = SupervisorJob()
override val coroutineContext: CoroutineContext
Expand Down Expand Up @@ -61,9 +61,11 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock
tcpConn.init(tcp)

try {
val c = conn.await()
val c = withTimeout(3000) {
conn.await()
}
sendToPeer(tcpConn.accept().toList())
processPeerPackets()
processPeerPackets(c as AsynchronousSocketChannel)
readZitiConn(c)
} catch (ex: Exception) {
Log.e(info, "failed to connect to Ziti Service: $ex")
Expand All @@ -72,7 +74,6 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock
}
}


fun readZitiConn(conn: ZitiConnection) = launch {
try {
val buf = ByteArray(1024 * 16)
Expand Down Expand Up @@ -111,27 +112,33 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock

fun isClosed() = tcpConn.isClosed()

fun processPeerPackets() = launch {

val zitiConn = conn.await() as AsynchronousSocketChannel
fun processPeerPackets(zitiConn: AsynchronousSocketChannel) = launch {

inboundPackets.receiveAsFlow().collect { packet ->
val out = mutableListOf<TcpPacket>()
val payload = tcpConn.fromPeer(packet.payload as TcpPacket, out)

payload?.let {
Log.v(info, "sending ${it.size} bytes to ziti backend")
zitiConn.writeCompletely(ByteBuffer.wrap(it))
}

when (tcpConn.state) {
TCP.State.LAST_ACK, TCP.State.Closed -> {
zitiConn.close()
runCatching {
payload?.let {
Log.v(info, "sending ${it.size} bytes to ziti backend")
zitiConn.writeCompletely(ByteBuffer.wrap(it))
}
}.onSuccess {
when (tcpConn.state) {
TCP.State.LAST_ACK, TCP.State.Closed -> {
zitiConn.close()
}
TCP.State.FIN_WAIT_1 -> {
zitiConn.shutdownOutput()
}
else -> {
}
}
TCP.State.FIN_WAIT_1 -> {
zitiConn.shutdownOutput()
}.onFailure { exc ->
Log.w(info, "failed to deliver date to Ziti side", exc)
tcpConn.close()?.let { it ->
out.add(it)
}
else -> {}
}

sendToPeer(out)
Expand All @@ -154,7 +161,7 @@ class ZitiTunnelConnection(val srcAddr: InetSocketAddress, val dstAddr: InetSock

override fun toString(): String = info

fun sendToPeer(packets: List<Packet>) {
suspend fun sendToPeer(packets: List<Packet>) {
for (p in packets) {
val ipPack = IpV4Packet.Builder()
.version(IpVersion.IPV4)
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ buildscript {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:4.1.0'
classpath 'com.android.tools.build:gradle:4.1.1'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"

// NOTE: Do not place your application dependencies here; they belong
Expand Down

0 comments on commit f3b428b

Please sign in to comment.