Skip to content

Commit

Permalink
Add writer and writer_mut methods to SerializerState
Browse files Browse the repository at this point in the history
  • Loading branch information
Ten0 committed Apr 7, 2024
1 parent 3b88057 commit a2b3039
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
24 changes: 12 additions & 12 deletions serde_avro_fast/src/object_container_file_encoding/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl<'c, 's, W: Write> Writer<'c, 's, W> {
/// Serialize one value as an object in the object container file
pub fn serialize<T: Serialize>(&mut self, value: T) -> Result<(), SerError> {
self.flush_finished_block()?;
if self.inner.serializer_state.writer.len() >= self.inner.aprox_block_size as usize {
if self.inner.serializer_state.writer().len() >= self.inner.aprox_block_size as usize {
self.finish_block()?;
}
self.inner.serialize(value)?;
Expand Down Expand Up @@ -355,7 +355,7 @@ impl<'c, 's, W: Write> Writer<'c, 's, W> {
n_objects: u64,
) -> Result<(), SerError> {
self.flush_finished_block()?;
if self.inner.serializer_state.writer.len() >= self.inner.aprox_block_size as usize {
if self.inner.serializer_state.writer().len() >= self.inner.aprox_block_size as usize {
self.finish_block()?;
}
self.inner.push_serialized(serialized_objects, n_objects)?;
Expand Down Expand Up @@ -408,7 +408,7 @@ impl<'c, 's, W: Write> Writer<'c, 's, W> {
)
.map_err(SerError::io)?;
self.inner.block_header_size = None; // Mark that we have flushed
self.inner.serializer_state.writer.clear();
self.inner.serializer_state.writer_mut().clear();
}
}

Expand Down Expand Up @@ -500,18 +500,18 @@ struct WriterInner<'c, 's> {

impl<'c, 's> WriterInner<'c, 's> {
fn serialize<T: Serialize>(&mut self, value: T) -> Result<(), SerError> {
let buf_len_before_attempt = self.serializer_state.writer.len();
let buf_len_before_attempt = self.serializer_state.writer().len();
value
.serialize(self.serializer_state.serializer())
.map_err(|e| {
// If the flush is going wrong though there's nothing we can do
self.serializer_state
.writer
.writer_mut()
.truncate(buf_len_before_attempt);
e
})?;
self.n_elements_in_block += 1;
if self.serializer_state.writer.len() >= self.aprox_block_size as usize {
if self.serializer_state.writer().len() >= self.aprox_block_size as usize {
self.finish_block()?;
}
Ok(())
Expand All @@ -522,14 +522,14 @@ impl<'c, 's> WriterInner<'c, 's> {
serialized_objects: &[u8],
n_objects: u64,
) -> Result<(), SerError> {
let buf_len_before_attempt = self.serializer_state.writer.len();
let buf_len_before_attempt = self.serializer_state.writer().len();
self.serializer_state
.writer
.writer_mut()
.write_all(serialized_objects)
.map_err(|e| {
// If the flush is going wrong though there's nothing we can do
self.serializer_state
.writer
.writer_mut()
.truncate(buf_len_before_attempt);
SerError::io(e)
})?;
Expand All @@ -539,7 +539,7 @@ impl<'c, 's> WriterInner<'c, 's> {
.ok_or_else(|| {
SerError::new("Provided incorrect n_elements to write_serialized (too big)")
})?;
if self.serializer_state.writer.len() >= self.aprox_block_size as usize {
if self.serializer_state.writer().len() >= self.aprox_block_size as usize {
self.finish_block()?;
}
Ok(())
Expand All @@ -553,7 +553,7 @@ impl<'c, 's> WriterInner<'c, 's> {
);

self.compression_codec_state
.encode(self.serializer_state.writer.as_slice())?;
.encode(self.serializer_state.writer().as_slice())?;

let n = <i64 as integer_encoding::VarInt>::encode_var(
self.n_elements_in_block as i64,
Expand All @@ -577,7 +577,7 @@ impl<'c, 's> WriterInner<'c, 's> {
.compressed_buffer()
.unwrap_or_else(|| {
// No compression codec, use the serializer's buffer directly
self.serializer_state.writer.as_slice()
self.serializer_state.writer().as_slice()
})
}
}
18 changes: 17 additions & 1 deletion serde_avro_fast/src/ser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use {integer_encoding::VarIntWriter, serde::ser::*, std::io::Write};
/// Does not implement [`Serializer`] directly (use
/// [`.serializer`](Self::serializer) to obtain that).
pub struct SerializerState<'c, 's, W> {
pub(crate) writer: W,
writer: W,
/// Storing these here for reuse so that we can bypass the allocation,
/// and statistically obtain buffers that are already the proper length
/// (since we have used them for previous records)
Expand Down Expand Up @@ -205,6 +205,22 @@ impl<W> SerializerState<'_, '_, W> {
pub fn into_writer(self) -> W {
self.writer
}

/// Get writer by reference
///
/// This may be useful to observe the state of the inner buffer,
/// notably when re-using a `SerializerState` to write multiple objects.
pub fn writer(&self) -> &W {
&self.writer
}

/// Get writer by mutable reference
///
/// This may be useful to clear the inner buffer, when re-using a
/// `SerializerState`.
pub fn writer_mut(&mut self) -> &mut W {
&mut self.writer
}
}

/// Buffers used during serialization, for reuse across serializations
Expand Down

0 comments on commit a2b3039

Please sign in to comment.