Skip to content

Commit

Permalink
Return first version
Browse files Browse the repository at this point in the history
  • Loading branch information
Кирилл committed Dec 19, 2023
1 parent d823f67 commit 219b204
Showing 1 changed file with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,42 @@ class JsonRpcStreamParser(
}

fun streamParse(statusCode: Int, response: Flux<ByteArray>): Mono<out Response> {
val hotResponse = response.publish().autoConnect()
val firstPartSize = AtomicInteger()

val firstAccumulate = hotResponse.bufferUntil {
log.info("Bytes size {}", it.size)
firstPartSize.addAndGet(it.size)
firstPartSize.get() > firstChunkMaxSize
}.next().map { it.reduce { acc, bytes -> acc.plus(bytes) } }

return firstAccumulate.flatMap { firstBytes ->
if (firstBytes.size == 97) {
log.info("97 bytes {}", String(firstBytes))
return response.bufferUntil {
if (firstPartSize.get() > firstChunkMaxSize) {
true
} else {
firstPartSize.addAndGet(it.size)
firstPartSize.get() > firstChunkMaxSize // accumulate bytes until chunk is full
}
if (statusCode != 200) {
aggregateResponse(Flux.concat(Mono.just(firstBytes), hotResponse), statusCode)
}.map {
it.reduce { acc, bytes -> acc.plus(bytes) }
}.switchOnFirst({ first, responseStream ->
if (first.get() == null || statusCode == 200) {
aggregateResponse(responseStream, statusCode)
} else {
val whatCount = AtomicReference<Count>()
val endStream = AtomicBoolean(false)

val firstBytes = first.get()!!

val firstPart: SingleResponse? = parseFirstPart(firstBytes, endStream, whatCount)

if (firstPart == null) {
aggregateResponse(Flux.concat(Mono.just(firstBytes), hotResponse), statusCode)
aggregateResponse(responseStream, statusCode)
} else {
processSingleResponse(firstPart, hotResponse, endStream, whatCount)
processSingleResponse(firstPart, responseStream, endStream, whatCount)
}
}
}
}, false,)
.single()
.onErrorResume {
log.error(it.message)
Mono.just(
SingleResponse(
null,
JsonRpcError(RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE, it.message ?: "Internal error"),
),
)
}.doOnNext {
log.info("Resp type {}", it.javaClass)
}
}

Expand Down Expand Up @@ -117,7 +115,7 @@ class JsonRpcStreamParser(
): Flux<Chunk> {
return Flux.concat(
Mono.just(Chunk(firstBytes, false)),
responseStream
responseStream.skip(1)
.filter { !endStream.get() }
.map { bytes ->
val whatCountValue = whatCount.get()
Expand Down Expand Up @@ -321,4 +319,4 @@ class JsonRpcStreamParser(

fun hasSlash() = count.get() == 2
}
}
}

0 comments on commit 219b204

Please sign in to comment.