Skip to content

Commit

Permalink
Fix #1
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabben committed Mar 1, 2023
1 parent 81a3799 commit d4fce28
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 12 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# i2ptransport [![nim-version-img]][nim-version]

[nim-version]: https://nim-lang.org/blog/2020/04/03/version-120-released.html
[nim-version-img]: https://img.shields.io/badge/Nim_-v1.2.0%2B-blue
[nim-version]: https://nim-lang.org/blog/2021/10/19/version-160-released.html
[nim-version-img]: https://img.shields.io/badge/Nim_-v1.6.0%2B-blue

**I2P Transport for** [nim-libp2p](https://github.com/status-im/nim-libp2p)

Expand Down
4 changes: 2 additions & 2 deletions i2ptransport.nimble
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Package

version = "0.1.2"
version = "0.1.3"
author = "Gabben"
description = "I2P Transport for libp2p"
license = "MIT"
Expand All @@ -9,6 +9,6 @@ skipDirs = @["tests", "examples"]

# Dependencies

requires "nim >= 1.2.0",
requires "nim >= 1.6.0",
"libp2p",
"https://github.com/gabbhack/sam_protocol >= 0.1.1"
67 changes: 59 additions & 8 deletions src/i2ptransport.nim
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
{.push raises: [].}

import std/[
strformat,
options,
importutils,
sequtils
]

import std/[strformat, options]
import chronos, chronicles, strutils
import stew/[
byteutils,
Expand All @@ -26,6 +29,7 @@ import libp2p/transports/[
tcptransport
]
import libp2p/upgrademngrs/upgrade

import sam_protocol as sam


Expand Down Expand Up @@ -73,6 +77,10 @@ proc createControlConnection(self: I2PTransport) {.async, gcsafe.}
proc parseI2P(address: MultiAddress): string {.gcsafe.}
proc connectStream(transport: StreamTransport, address: MultiAddress, settings: I2PSessionSettings) {.async, gcsafe.}
proc acceptStream(transport: StreamTransport, settings: I2PSessionSettings) {.async, gcsafe.}
proc connHandler*(self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction, timeout: Duration): Future[Connection] {.async.}


proc init*(
Expand Down Expand Up @@ -232,9 +240,7 @@ proc createControlConnection(self: I2PTransport) {.async, gcsafe.} =
await transport.closeWait()
raise newException(I2PError, fmt"Unsuccessful control session create for nickname `{settings.nickname}`: {answer.session}")

let connection = await self.tcpTransport.connHandler(transport, Opt.none(MultiAddress), Direction.Out)
# The control connection must not disconnect by timeout
connection.timeout = InfiniteDuration
discard await self.tcpTransport.connHandler(transport, Opt.none(MultiAddress), Direction.Out, timeout = ZeroDuration)

proc generateDestination*(
samAddress = initTAddress("127.0.0.1:7656"),
Expand Down Expand Up @@ -323,3 +329,48 @@ proc handlesStart(address: MultiAddress): bool {.gcsafe.} =

proc parseI2P(address: MultiAddress): string {.gcsafe.} =
string.fromBytes(address[multiCodec("dns")].get().protoArgument().get())

proc connHandler*(self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction, timeout: Duration): Future[Connection] {.async.} =
privateAccess(TcpTransport)

trace "Handling tcp connection", address = $observedAddr,
dir = $dir,
clients = self.clients[Direction.In].len +
self.clients[Direction.Out].len

let conn = Connection(
ChronosStream.init(
client = client,
dir = dir,
observedAddr = observedAddr,
timeout = timeout
))

proc onClose() {.async.} =
try:
let futs = @[client.join(), conn.join()]
await futs[0] or futs[1]
for f in futs:
if not f.finished: await f.cancelAndWait() # cancel outstanding join()

trace "Cleaning up client", addrs = $client.remoteAddress,
conn

self.clients[dir].keepItIf( it != client )
await allFuturesThrowing(
conn.close(), client.closeWait())

trace "Cleaned up client", addrs = $client.remoteAddress,
conn

except CatchableError as exc:
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn

self.clients[dir].add(client)
asyncSpawn onClose()

return conn

0 comments on commit d4fce28

Please sign in to comment.