Add streaming write support
nikitakuklev committed Oct 7, 2024
1 parent f5dfbe1 commit 11c57d7
Showing 2 changed files with 525 additions and 39 deletions.
156 changes: 153 additions & 3 deletions pysdds/structures/structures.py
@@ -8,10 +8,13 @@
"SDDSFile",
]

import copy
import io
import logging
import math
import sys
from typing import List, Optional, Literal, Dict
from pathlib import Path
from typing import List, Optional, Literal, Dict, Union

import numpy as np
import pandas as pd
@@ -757,12 +760,15 @@ class SDDSFile:
"n_pages",
)

def __init__(self):
def __init__(self, add_data_nm=False):
self.description: Optional[Description] = None
self.parameters: List[Parameter] = []
self.arrays: List[Array] = []
self.columns: List[Column] = []
self.data: Optional[Data] = None
if add_data_nm:
self.data: Optional[Data] = Data()
else:
self.data: Optional[Data] = None
self.associates: List[Associate] = []

self._mode: Literal["binary", "ascii"] = "binary"
@@ -829,6 +835,42 @@ def __eq__(self, other):
"""
self.compare(other, eps=None)

def copy(self, data=True, deep=True):
"""
Create a copy of the SDDSFile object.

Parameters
----------
data : bool
If True, data is copied; otherwise the copy contains no data
deep : bool
If True, a deep copy is made; otherwise a shallow copy is made

Returns
-------
SDDSFile
A copy of the current object
"""
if deep:
sdds2 = copy.deepcopy(self)
else:
sdds2 = copy.copy(self)

if not data:
for p in sdds2.parameters:
p.data = []
for a in sdds2.arrays:
a.data = []
for c in sdds2.columns:
c.data = []
sdds2.n_pages = 0

return sdds2
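
A quick usage sketch for copy(); `sdds` is a hypothetical, previously loaded SDDSFile:

# `sdds`: hypothetical, previously loaded SDDSFile instance
backup = sdds.copy(deep=True)      # fully independent duplicate
template = sdds.copy(data=False)   # same header definitions, but no page data
assert template.is_empty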

@property
def is_empty(self):
"""True if the file contains no data pages."""
return self.n_pages == 0

def describe(self):
line = ""
line += (
@@ -987,6 +1029,7 @@ def mode(self, value):
self.set_mode(value)

# @columns.setter

# def columns(self, value):
# self._columns = value
# #self._columns_dict = {c.name: c for c in value} if value is not None else None
@@ -1023,6 +1066,64 @@ def set_endianness(self, endianness: Literal["big", "little"]):
if self.mode == "binary" and self.data is not None:
self.data.nm["endian"] = endianness

def add_parameter(self, name: str, type: str, symbol: str = None, units: str = None, description: str = None,
fixed_value: Union[int, float, str] = None, data: Optional[List[Union[int, float, str]]] = None):
"""Create a Parameter from the given namelist fields, append it to this file, and return it."""
# STRING name = NULL
# STRING symbol = NULL
# STRING units = NULL
# STRING description = NULL
# STRING type = NULL
# STRING fixed_value = NULL
nm = {'name': name, 'type': type}
if symbol is not None:
nm['symbol'] = symbol
if units is not None:
nm['units'] = units
if description is not None:
nm['description'] = description
if fixed_value is not None:
nm['fixed_value'] = fixed_value
par = Parameter(nm, self)
if data is not None:
par.data = data
self.parameters.append(par)
self.n_parameters += 1
return par

def add_column(self, name: str, type: str, symbol: str = None, units: str = None, description: str = None,
format_string: str = None, group_name: str = None, field_length: int = 0, dimensions: int = 1,
data: Optional[List[np.ndarray]] = None):
"""Create a Column from the given namelist fields, append it to this file, and return it."""
# STRING name = NULL
# STRING symbol = NULL
# STRING units = NULL
# STRING description = NULL
# STRING format_string = NULL
# STRING type = NULL
# STRING group_name = NULL
# long field_length = 0
# long dimensions = 1
nm = {'name': name, 'type': type}
if symbol is not None:
nm['symbol'] = symbol
if units is not None:
nm['units'] = units
if description is not None:
nm['description'] = description
if format_string is not None:
nm['format_string'] = format_string
if group_name is not None:
nm['group_name'] = group_name
if field_length != 0:
nm['field_length'] = field_length
if dimensions != 1:
nm['dimensions'] = dimensions
col = Column(nm, self)
if data is not None:
col.data = data
self.columns.append(col)
self.n_columns += 1
return col
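
A sketch of how the two builder methods above might be combined; all names, values, and the manual page count are illustrative assumptions:

# Illustrative sketch: build a small one-page SDDS file from scratch.
sdds = SDDSFile(add_data_nm=True)
sdds.add_parameter("RunID", "long", data=[42])
sdds.add_column("Energy", "double", units="MeV", data=[np.array([1.0, 2.5, 3.7])])
sdds.add_column("Charge", "double", units="nC", data=[np.array([0.1, 0.2, 0.3])])
sdds.n_pages = 1  # assumption: the page count is tracked manually when building by hand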

@staticmethod
def from_df(
df_list: List[pd.DataFrame],
@@ -1142,6 +1243,9 @@ def columns_to_df(self, page: int = 0) -> pd.DataFrame:
df = pd.DataFrame(data=data, columns=column_names)
return df

def cdf(self, page: int = 0) -> pd.DataFrame:
"""Shorthand alias for columns_to_df()."""
return self.columns_to_df(page)

def parameters_to_df(self) -> pd.DataFrame:
"""
Retrieve parameters as a pandas dataframe. Indices correspond to pages, and column labels to parameter names.
@@ -1157,6 +1261,9 @@ def parameters_to_df(self) -> pd.DataFrame:
df.index.name = "Page"
return df

def pdf(self) -> pd.DataFrame:
"""Shorthand alias for parameters_to_df()."""
return self.parameters_to_df()

def page_to_df(self, page=0) -> pd.DataFrame:
"""
Transforms parameters and columns from single page into a single dataframe, expanding parameter values to
@@ -1235,3 +1342,46 @@ def validate_data(self):
raise Exception(
f"dtype {v.dtype} does not match expected {expected_dtype}"
)

def to_sdds(self):
"""Convert SDDS object to a bytes object, ready for writing to file"""
from .. import write
buf = io.BytesIO()
write(self, buf)
return buf.getvalue()

def write(self,
filename: Union[Path, str, io.BytesIO],
compression: Optional[str] = None,
overwrite: Optional[bool] = False,
use_best_settings: bool = True
):
"""
Write the SDDS object to a file.
This is syntactic sugar for the pysdds.write function; the same arguments apply.
"""
from .. import write
# Pass the target through unchanged so pysdds.write can handle Path/str/BytesIO targets
# and honor the overwrite and compression options itself.
write(self, filepath=filename, compression=compression, overwrite=overwrite, use_best_settings=use_best_settings)
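
A brief usage sketch for the two convenience wrappers above; the file name and the `sdds` object are illustrative:

# `sdds`: hypothetical SDDSFile; "out.sdds" is an illustrative path
raw_bytes = sdds.to_sdds()              # serialize to bytes in memory
sdds.write("out.sdds", overwrite=True)  # write directly to disk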

def get_streaming_writer(self,
filename: Union[Path, str, io.BytesIO],
compression: Optional[str] = None,
overwrite: Optional[bool] = False,
use_best_settings: bool = True
):
"""
Create a write stream for incremental, row-by-row writing. All header namelists must be fully
defined before calling this method, but the data structures must be empty.
The returned writer exposes begin(), write_row(), next_page(), and end() methods; each call
writes to the file immediately.
Note that column_major_order is not supported in streaming mode.
"""
if not self.is_empty:
raise ValueError("Cannot mix streaming with regular writes - data object must be empty")
from ..writers.writers import IncrementalWriter
streamer = IncrementalWriter(self, filename, compression, overwrite, use_best_settings)
return streamer
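
Based on the method names given in the docstring, a streaming write might look like the sketch below; the exact write_row() argument convention and the file name are assumptions:

# Assumes `sdds` has all header namelists defined but holds no data yet.
writer = sdds.get_streaming_writer("stream_out.sdds", overwrite=True)
writer.begin()
for energy, charge in [(1.0, 0.1), (2.5, 0.2), (3.7, 0.3)]:
    writer.write_row(energy, charge)  # assumed: one value per defined column
writer.next_page()                    # start another page if needed
writer.write_row(4.2, 0.4)
writer.end()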