Skip to content

Commit

Permalink
Add optional pledgeinsize function to transcoding protocol (#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored Sep 13, 2024
1 parent 130a1ff commit d3d4197
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/src/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Base.position(::NoopStream)
```@docs
TranscodingStreams.Codec
TranscodingStreams.expectedsize
TranscodingStreams.pledgeinsize
TranscodingStreams.minoutsize
TranscodingStreams.initialize
TranscodingStreams.finalize
Expand Down
35 changes: 30 additions & 5 deletions src/codec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Transcoding proceeds by calling some functions in a specific way. We call this
There are six functions for a codec to implement:
- `expectedsize`: return the expected size of transcoded data
- `pledgeinsize`: tell the codec the total input size
- `minoutsize`: return the minimum output size of `process`
- `initialize`: initialize the codec
- `finalize`: finalize the codec
Expand All @@ -22,7 +23,7 @@ There are six functions for a codec to implement:
These are defined in the `TranscodingStreams` and a new codec type must extend
these methods if necessary. Implementing a `process` method is mandatory but
others are optional. `expectedsize`, `minoutsize`, `initialize`, `finalize`,
others are optional. `expectedsize`, `minoutsize`, `pledgeinsize`, `initialize`, `finalize`,
and `startproc` have a default implementation.
Your codec type is denoted by `C` and its object by `codec`.
Expand All @@ -39,6 +40,18 @@ used as a hint to determine the size of a data buffer when `transcode` is
called. A good hint will reduce the number of buffer resizing and hence result
in better performance.
### `pledgeinsize`
The `pledgeinsize(codec::C, insize::Int64, error::Error)::Symbol` method is used
when `transcode` is called to tell the `codec` the total input size.
This is called after `startproc` and before `process`. Some
compressors can add this total input size to a header, making `expectedsize`
accurate during later decompression. By default this just returns `:ok`.
If there is an error, the return code must be `:error` and the `error` argument
must be set to an exception object. Setting an inaccurate `insize` may cause the
codec to error later on while processing data. A negative `insize` means unknown
content size.
### `minoutsize`
The `minoutsize(codec::C, input::Memory)::Int` method takes `codec` and `input`,
Expand Down Expand Up @@ -71,10 +84,11 @@ the stream will become the close mode for safety.
### `startproc`
The `startproc(codec::C, mode::Symbol, error::Error)::Symbol` method takes
`codec`, `mode` and `error`, and returns a status code. This is called just
before the stream starts reading or writing data. `mode` is either `:read` or
`:write` and then the stream starts reading or writing, respectively. The
return code must be `:ok` if `codec` is ready to read or write data. Otherwise,
`codec`, `mode`, and `error`, and returns a status code. This resets the state
of the codec and is called before the stream starts processing data.
After a call to `startproc`, `pledgeinsize` can be optionally called.
`mode` is either `:read` or `:write`. The
return code must be `:ok` if `codec` is ready to process data. Otherwise,
it must be `:error` and the `error` argument must be set to an exception object.
### `process`
Expand Down Expand Up @@ -112,6 +126,17 @@ function expectedsize(codec::Codec, input::Memory)::Int
return input.size
end

"""
pledgeinsize(codec::Codec, insize::Int64, error::Error)::Symbol
Tell the codec the total input size.
The default method does nothing and returns `:ok`.
"""
function pledgeinsize(codec::Codec, insize::Int64, error::Error)::Symbol
return :ok
end

"""
minoutsize(codec::Codec, input::Memory)::Int
Expand Down
6 changes: 6 additions & 0 deletions src/transcode.jl
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ function unsafe_transcode!(
if code === :error
@goto error
end
if pledgeinsize(codec, Int64(buffersize(input)), error) === :error
@goto error
end
n = GC.@preserve input minoutsize(codec, buffermem(input))
@label process
makemargin!(output, n)
Expand All @@ -168,6 +171,9 @@ function unsafe_transcode!(
if startproc(codec, :write, error) === :error
@goto error
end
if pledgeinsize(codec, Int64(buffersize(input)), error) === :error
@goto error
end
n = GC.@preserve input minoutsize(codec, buffermem(input))
@goto process
end
Expand Down
58 changes: 49 additions & 9 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ struct DoubleFrameEncoder <: TranscodingStreams.Codec
opened::Base.RefValue{Bool}
stopped::Base.RefValue{Bool}
got_stop_msg::Base.RefValue{Bool}
pledged_in_size::Base.RefValue{Int64}
in_size_count::Base.RefValue{Int64}
end

DoubleFrameEncoder() = DoubleFrameEncoder(Ref(false), Ref(false), Ref(false))
DoubleFrameEncoder() = DoubleFrameEncoder(Ref(false), Ref(false), Ref(false), Ref(Int64(-1)), Ref(Int64(0)))

function TranscodingStreams.process(
codec :: DoubleFrameEncoder,
input :: TranscodingStreams.Memory,
output :: TranscodingStreams.Memory,
error_ref :: TranscodingStreams.Error,
)
pledged = codec.pledged_in_size[]
if input.size == 0
codec.got_stop_msg[] = true
end
Expand All @@ -45,26 +48,59 @@ function TranscodingStreams.process(
return 0, 0, :error
elseif !codec.opened[]
output[1] = UInt8('[')
output[2] = UInt8(' ')
if pledged (0:9)
output[2] = UInt8('0'+pledged)
else
output[2] = UInt8(' ')
end
codec.opened[] = true
return 0, 2, :ok
elseif codec.got_stop_msg[]
# check in_size_count against pledged
if pledged (0:9)
if pledged > codec.in_size_count[]
error_ref[] = ErrorException("pledged in size was too big")
return 0, 0, :error
end
end
output[1] = UInt8(' ')
output[2] = UInt8(']')
codec.stopped[] = true
return 0, 2, :end
else
i = j = 0
# check input.size against pledged
if pledged (0:9)
if input.size > pledged || pledged - input.size < codec.in_size_count[]
error_ref[] = ErrorException("pledged in size was too small")
return 0, 0, :error
end
end
while i + 1 lastindex(input) && j + 2 lastindex(output)
b = input[i+1]
i += 1
output[j+1] = output[j+2] = b
j += 2
end
codec.in_size_count[] += i
return i, j, :ok
end
end

function TranscodingStreams.pledgeinsize(
codec::DoubleFrameEncoder,
insize::Int64,
error::Error,
)::Symbol
if codec.opened[]
error[] = ErrorException("pledgeinsize called after opening")
return :error
else
codec.pledged_in_size[] = insize
return :ok
end
end

function TranscodingStreams.expectedsize(
:: DoubleFrameEncoder,
input :: TranscodingStreams.Memory)
Expand All @@ -81,6 +117,8 @@ function TranscodingStreams.startproc(codec::DoubleFrameEncoder, ::Symbol, error
codec.opened[] = false
codec.got_stop_msg[] = false
codec.stopped[] = false
codec.pledged_in_size[] = -1
codec.in_size_count[] = 0
return :ok
end

Expand Down Expand Up @@ -149,7 +187,7 @@ function TranscodingStreams.process(
codec.a[] != UInt8('[') && error("expected [")
@label state2
do_read(codec.a) || return (codec.state[]=2; (Δin, Δout, :ok))
codec.a[] != UInt8(' ') && error("expected space")
codec.a[] (UInt8(' '), UInt8('0'):UInt8('9')...) && error("expected space or size")
while true
@label state3
do_read(codec.a) || return (codec.state[]=3; (Δin, Δout, :ok))
Expand Down Expand Up @@ -189,12 +227,14 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD


@testset "DoubleFrame Codecs" begin
@test transcode(DoubleFrameEncoder, b"") == b"[ ]"
@test transcode(DoubleFrameEncoder, b"a") == b"[ aa ]"
@test transcode(DoubleFrameEncoder, b"ab") == b"[ aabb ]"
@test transcode(DoubleFrameEncoder(), b"") == b"[ ]"
@test transcode(DoubleFrameEncoder(), b"a") == b"[ aa ]"
@test transcode(DoubleFrameEncoder(), b"ab") == b"[ aabb ]"
@test transcode(DoubleFrameEncoder, b"") == b"[0 ]"
@test transcode(DoubleFrameEncoder, b"a") == b"[1aa ]"
@test transcode(DoubleFrameEncoder, b"ab") == b"[2aabb ]"
@test transcode(DoubleFrameEncoder(), b"") == b"[0 ]"
@test transcode(DoubleFrameEncoder(), b"a") == b"[1aa ]"
@test transcode(DoubleFrameEncoder(), b"ab") == b"[2aabb ]"
@test transcode(DoubleFrameEncoder(), ones(UInt8,9)) == [b"[9"; ones(UInt8,18); b" ]";]
@test transcode(DoubleFrameEncoder(), ones(UInt8,10)) == [b"[ "; ones(UInt8,20); b" ]";]

@test_throws Exception transcode(DoubleFrameDecoder, b"")
@test_throws Exception transcode(DoubleFrameDecoder, b" [")
Expand Down

0 comments on commit d3d4197

Please sign in to comment.