diff --git a/README.md b/README.md
index f492c57..efb665a 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@
Factor Values |
-(TSLogReturn 30 :close) |
+(LogReturn 30 :close) |
+ |
2019-12-27~2020-01-14.pq |
= |
@@ -73,7 +73,7 @@ For example, on a daily OHLC dataset, the 30 days log return on the column `clos
```python
from factor_expr import Factor
-Factor("(TSLogReturn 30 :close)")
+Factor("(LogReturn 30 :close)")
```
Note, in `Factor Expr`, column names are referred by the `:column-name` syntax.
@@ -87,7 +87,7 @@ from factor_expr import Factor, replay
result = await replay(
["data.pq"],
- [Factor("(TSLogReturn 30 :close)")]
+ [Factor("(LogReturn 30 :close)")]
)
```
@@ -99,7 +99,7 @@ In case of multiple datasets are passed in, the results will be concatenated wit
For example, the code above will give you a DataFrame looks similar to this:
-| index | (TSLogReturn 30 :close) |
+| index | (LogReturn 30 :close) |
| ----- | ----------------------- |
| 0 | 0.23 |
| ... | ... |
@@ -150,24 +150,24 @@ Any `` larger than 0 are treated as `true`.
All the window functions take a window size as the first argument. The computation will be done on the look-back window with the size given in ``.
-* Sum of the window elements: `(TSSum )`
-* Mean of the window elements: `(TSMean )`
-* Min of the window elements: `(TSMin )`
-* Max of the window elements: `(TSMax )`
-* The index of the min of the window elements: `(TSArgMin )`
-* The index of the max of the window elements: `(TSArgMax )`
-* Stdev of the window elements: `(TSStd )`
-* Skew of the window elements: `(TSSkew )`
-* The rank (ascending) of the current element in the window: `(TSRank )`
+* Sum of the window elements: `(Sum )`
+* Mean of the window elements: `(Mean )`
+* Min of the window elements: `(Min )`
+* Max of the window elements: `(Max )`
+* The index of the min of the window elements: `(ArgMin )`
+* The index of the max of the window elements: `(ArgMax )`
+* Stdev of the window elements: `(Std )`
+* Skew of the window elements: `(Skew )`
+* The rank (ascending) of the current element in the window: `(Rank )`
* The value `` ticks back: `(Delay )`
-* The log return of the value `` ticks back to current value: `(TSLogReturn )`
-* Rolling correlation between two series: `(TSCorrelation )`
-* Rolling quantile of a series: `(TSQuantile )`, e.g. `(TSQuantile 100 0.5 )` computes the median of a window sized 100.
+* The log return of the value `` ticks back to current value: `(LogReturn )`
+* Rolling correlation between two series: `(Correlation )`
+* Rolling quantile of a series: `(Quantile )`, e.g. `(Quantile 100 0.5 )` computes the median of a window sized 100.
#### Warm-up Period for Window Functions
Factors containing window functions require a warm-up period. For example, for
-`(TSSum 10 :close)`, it will not generate data until the 10th tick is replayed.
+`(Sum 10 :close)`, it will not generate data until the 10th tick is replayed.
In this case, `replay` will write `NaN` into the result during the warm-up period, until the factor starts to produce data.
This ensures the length of the factor output will be as same as the length of the input dataset. You can use the `trim`
parameter to let replay trim off the warm-up period before it returns.
@@ -194,7 +194,7 @@ pd.DataFrame({
result = await replay(
["data.pq"],
- [Factor("(TSLogReturn 30 :close)")],
+ [Factor("(LogReturn 30 :close)")],
index_col="time",
)
```
@@ -294,13 +294,11 @@ async def replay(
files: Iterable[str],
factors: List[Factor],
*,
- predicate: Optional[Factor] = None,
+ reset: bool = True,
batch_size: int = 40960,
n_data_jobs: int = 1,
n_factor_jobs: int = 1,
pbar: bool = True,
- trim: bool = False,
- index_col: Optional[str] = None,
verbose: bool = False,
output: Literal["pandas", "pyarrow", "raw"] = "pandas",
) -> Union[pd.DataFrame, pa.Table]:
@@ -309,12 +307,13 @@ async def replay(
Parameters
----------
- files: Iterable[str]
- Paths to the datasets. Currently, only parquet format is supported.
+ files: Iterable[str | pa.Table]
+ Paths to the datasets. Or already read pyarrow Tables.
factors: List[Factor]
- A list of Factors to replay on the given set of files.
- predicate: Optional[Factor] = None
- Use a predicate to pre-filter the replay result. Any value larger than 0 is treated as True.
+ A list of Factors to replay.
+ reset: bool = True
+ Whether to reset the factors. Factors carries memory about the data they already replayed. If you are calling
+ replay multiple times and the factors should not starting from fresh, set this to False.
batch_size: int = 40960
How many rows to replay at one time. Default is 40960 rows.
n_data_jobs: int = 1
@@ -324,14 +323,10 @@ async def replay(
e.g. if `n_data_jobs=3` and `n_factor_jobs=5`, you will have 3 * 5 threads running concurrently.
pbar: bool = True
Whether to show the progress bar using tqdm.
- trim: bool = False
- Whether to trim the warm up period off from the result.
- index_col: Optional[str] = None
- Set the index column.
verbose: bool = False
If True, failed factors will be printed out in stderr.
- output: Literal["pandas" | "pyarrow" | "raw"] = "pandas"
- The return format, can be pandas DataFrame ("pandas") or pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw").
+ output: Literal["pyarrow" | "raw"] = "pyarrow"
+ The return format, can be pyarrow Table ("pyarrow") or un-concatenated pyarrow Tables ("raw").
"""
```
diff --git a/native/src/lib.rs b/native/src/lib.rs
index 73945f9..ef30da7 100644
--- a/native/src/lib.rs
+++ b/native/src/lib.rs
@@ -21,6 +21,7 @@ fn _lib(py: Python, m: &PyModule) -> PyResult<()> {
)?;
m.add_class::()?;
m.add_function(wrap_pyfunction!(python::replay, m)?)?;
+ m.add_function(wrap_pyfunction!(python::replay_file, m)?)?;
Ok(())
}
diff --git a/native/src/ops/arithmetic.rs b/native/src/ops/arithmetic.rs
index 2e73d70..44b0d2e 100644
--- a/native/src/ops/arithmetic.rs
+++ b/native/src/ops/arithmetic.rs
@@ -30,6 +30,12 @@ macro_rules! impl_arithmetic_bivariate {
}
impl Operator for $op {
+ fn reset(&mut self) {
+ self.l.reset();
+ self.r.reset();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let (l, r) = (&mut self.l, &mut self.r);
@@ -188,6 +194,11 @@ macro_rules! impl_arithmetic_univariate {
}
impl Operator for $op {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
@@ -301,20 +312,20 @@ macro_rules! impl_arithmetic_univariate_1arg {
($([$name:tt => $op:ident: $($func:tt)+])+) => {
$(
pub struct $op {
- s: BoxOp,
+ inner: BoxOp,
p: f64,
i: usize,
}
impl Clone for $op {
fn clone(&self) -> Self {
- Self::new(self.p, self.s.clone())
+ Self::new(self.p, self.inner.clone())
}
}
impl $op {
- pub fn new(p: f64, s: BoxOp) -> Self {
- Self { p, s, i: 0 }
+ pub fn new(p: f64, inner: BoxOp) -> Self {
+ Self { p, inner, i: 0 }
}
}
@@ -323,16 +334,21 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
impl Operator for $op {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
- let vals = &*self.s.update(tb)?;
+ let vals = &*self.inner.update(tb)?;
#[cfg(feature = "check")]
assert_eq!(tb.len(), vals.len());
let mut results = Vec::with_capacity(tb.len());
for &val in vals {
- if self.i < self.s.ready_offset() {
+ if self.i < self.inner.ready_offset() {
#[cfg(feature = "check")]
assert!(val.is_nan());
results.push(f64::NAN);
@@ -348,19 +364,19 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
fn ready_offset(&self) -> usize {
- self.s.ready_offset()
+ self.inner.ready_offset()
}
fn to_string(&self) -> String {
- format!("({} {} {})", Self::NAME, self.p, self.s.to_string())
+ format!("({} {} {})", Self::NAME, self.p, self.inner.to_string())
}
fn depth(&self) -> usize {
- 1 + self.s.depth()
+ 1 + self.inner.depth()
}
fn len(&self) -> usize {
- self.s.len() + 1
+ self.inner.len() + 1
}
fn child_indices(&self) -> Vec {
@@ -368,7 +384,7 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
fn columns(&self) -> Vec {
- self.s.columns()
+ self.inner.columns()
}
#[throws(as Option)]
@@ -378,10 +394,10 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
let i = i - 1;
- let ns = self.s.len();
+ let ns = self.inner.len();
if i < ns {
- self.s.get(i)?
+ self.inner.get(i)?
} else {
throw!()
}
@@ -394,13 +410,13 @@ macro_rules! impl_arithmetic_univariate_1arg {
}
let i = i - 1;
- let ns = self.s.len();
+ let ns = self.inner.len();
if i < ns {
if i == 0 {
- return mem::replace(&mut self.s, op) as BoxOp;
+ return mem::replace(&mut self.inner, op) as BoxOp;
}
- self.s.insert(i, op)?
+ self.inner.insert(i, op)?
} else {
throw!()
}
diff --git a/native/src/ops/constant.rs b/native/src/ops/constant.rs
index 2e8ccdb..05c425a 100644
--- a/native/src/ops/constant.rs
+++ b/native/src/ops/constant.rs
@@ -5,6 +5,8 @@ use fehler::{throw, throws};
use std::borrow::Cow;
impl Operator for f64 {
+ fn reset(&mut self) {}
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
vec![*self; tb.len()].into()
diff --git a/native/src/ops/getter.rs b/native/src/ops/getter.rs
index 9e51393..f5710c3 100644
--- a/native/src/ops/getter.rs
+++ b/native/src/ops/getter.rs
@@ -24,6 +24,8 @@ impl Named for Getter {
}
impl Operator for Getter {
+ fn reset(&mut self) {}
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
if matches!(self.idx, None) {
diff --git a/native/src/ops/logic.rs b/native/src/ops/logic.rs
index e9715f8..c3e093b 100644
--- a/native/src/ops/logic.rs
+++ b/native/src/ops/logic.rs
@@ -34,6 +34,13 @@ impl Named for If {
}
impl Operator for If {
+ fn reset(&mut self) {
+ self.cond.reset();
+ self.btrue.reset();
+ self.bfalse.reset();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let cond = &mut self.cond;
@@ -224,6 +231,12 @@ macro_rules! impl_logic_bivariate {
impl Operator for $op
{
+ fn reset(&mut self) {
+ self.l.reset();
+ self.r.reset();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let (l, r) = (&mut self.l, &mut self.r);
@@ -384,6 +397,11 @@ impl Named for Not {
}
impl Operator for Not {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/mod.rs b/native/src/ops/mod.rs
index b68e318..8e5c6e8 100644
--- a/native/src/ops/mod.rs
+++ b/native/src/ops/mod.rs
@@ -33,6 +33,7 @@ where
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]>;
fn ready_offset(&self) -> usize; // A.K.A. at offset the output of factor is first time not nan
fn to_string(&self) -> String;
+ fn reset(&mut self);
fn len(&self) -> usize;
fn depth(&self) -> usize;
diff --git a/native/src/ops/overlap_studies.rs b/native/src/ops/overlap_studies.rs
index 5986826..b3d8f94 100644
--- a/native/src/ops/overlap_studies.rs
+++ b/native/src/ops/overlap_studies.rs
@@ -23,14 +23,14 @@ impl Clone for SMA {
}
impl SMA {
- pub fn new(inner: BoxOp, n: usize) -> Self {
+ pub fn new(inner: BoxOp, win_size: usize) -> Self {
Self {
- window: VecDeque::with_capacity(n),
+ inner,
+ win_size,
+
+ window: VecDeque::with_capacity(win_size),
sum: 0.,
i: 0,
-
- inner,
- win_size: n,
}
}
}
@@ -40,6 +40,13 @@ impl Named for SMA {
}
impl Operator for SMA {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.sum = 0.;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/correlation.rs b/native/src/ops/window/correlation.rs
index 3220c09..7ba58f2 100644
--- a/native/src/ops/window/correlation.rs
+++ b/native/src/ops/window/correlation.rs
@@ -4,31 +4,15 @@ use anyhow::{anyhow, Error, Result};
use fehler::{throw, throws};
use std::{borrow::Cow, cmp::max, collections::VecDeque, iter::FromIterator, mem};
-#[derive(Clone)]
-struct Cache {
- history: VecDeque<(f64, f64)>,
-
- x: f64,
- y: f64,
-}
-
-impl Cache {
- fn new() -> Cache {
- Cache {
- history: VecDeque::new(),
-
- x: 0.,
- y: 0.,
- }
- }
-}
-
pub struct Correlation {
win_size: usize,
x: BoxOp,
y: BoxOp,
- cache: Cache,
+ window: VecDeque<(f64, f64)>,
+
+ xsum: f64,
+ ysum: f64,
i: usize,
}
@@ -45,7 +29,9 @@ impl Correlation {
x,
y,
- cache: Cache::new(),
+ window: VecDeque::new(),
+ xsum: 0.,
+ ysum: 0.,
i: 0,
}
}
@@ -56,6 +42,15 @@ impl Named for Correlation {
}
impl Operator for Correlation {
+ fn reset(&mut self) {
+ self.x.reset();
+ self.y.reset();
+ self.window.clear();
+ self.xsum = 0.;
+ self.ysum = 0.;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let (x, y) = (&mut self.x, &mut self.y);
@@ -77,30 +72,27 @@ impl Operator for Correlation {
continue;
}
- self.cache.history.push_back((xval, yval));
- self.cache.x += xval;
- self.cache.y += yval;
+ self.window.push_back((xval, yval));
+ self.xsum += xval;
+ self.ysum += yval;
- let val = if self.cache.history.len() == self.win_size {
- let n = self.cache.history.len() as f64; // this should be equal to self.win_size
- let xbar = self.cache.x / n;
- let ybar = self.cache.y / n;
+ let val = if self.window.len() == self.win_size {
+ let n = self.window.len() as f64; // this should be equal to self.win_size
+ let xbar = self.xsum / n;
+ let ybar = self.ysum / n;
let nom = self
- .cache
- .history
+ .window
.iter()
.map(|(x, y)| (x - xbar) * (y - ybar))
.sum::();
let denomx = self
- .cache
- .history
+ .window
.iter()
.map(|(x, _)| (x - xbar).powf(2.))
.sum::()
.sqrt();
let denomy = self
- .cache
- .history
+ .window
.iter()
.map(|(_, y)| (y - ybar).powf(2.))
.sum::()
@@ -113,9 +105,9 @@ impl Operator for Correlation {
} else {
self.fchecked(nom / denom)?
};
- let (xval, yval) = self.cache.history.pop_front().unwrap();
- self.cache.x -= xval;
- self.cache.y -= yval;
+ let (xval, yval) = self.window.pop_front().unwrap();
+ self.xsum -= xval;
+ self.ysum -= yval;
val
} else {
f64::NAN
diff --git a/native/src/ops/window/delay.rs b/native/src/ops/window/delay.rs
index 0b9643c..ee034fa 100644
--- a/native/src/ops/window/delay.rs
+++ b/native/src/ops/window/delay.rs
@@ -34,6 +34,12 @@ impl Named for Delay {
}
impl Operator for Delay {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/mean.rs b/native/src/ops/window/mean.rs
index a3a3330..c410e25 100644
--- a/native/src/ops/window/mean.rs
+++ b/native/src/ops/window/mean.rs
@@ -37,6 +37,13 @@ impl Named for Mean {
}
impl Operator for Mean {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.sum = 0.;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/minmax.rs b/native/src/ops/window/minmax.rs
index 7f051d1..b27f470 100644
--- a/native/src/ops/window/minmax.rs
+++ b/native/src/ops/window/minmax.rs
@@ -4,21 +4,6 @@ use anyhow::{anyhow, Error, Result};
use fehler::{throw, throws};
use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem};
-#[derive(Clone)]
-struct Cache {
- history: VecDeque<(usize, f64)>,
- seq: usize,
-}
-
-impl Cache {
- fn new() -> Cache {
- Cache {
- history: VecDeque::new(),
- seq: 0,
- }
- }
-}
-
macro_rules! impl_minmax {
($($op:ident $cmp:tt {$($vfunc:tt)+})+) => {
$(
@@ -26,7 +11,8 @@ macro_rules! impl_minmax {
win_size: usize,
inner: BoxOp,
- cache: Cache,
+ window: VecDeque<(usize, f64)>,
+ seq: usize,
i: usize,
}
@@ -42,7 +28,8 @@ macro_rules! impl_minmax {
win_size,
inner,
- cache: Cache::new(),
+ window: VecDeque::new(),
+ seq: 0,
i: 0,
}
}
@@ -53,6 +40,13 @@ macro_rules! impl_minmax {
}
impl Operator for $op {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.seq = 0;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
@@ -70,28 +64,28 @@ macro_rules! impl_minmax {
continue;
}
- self.cache.seq += 1;
+ self.seq += 1;
- while let Some((seq_old, _)) = self.cache.history.front() {
- if seq_old + self.win_size <= self.cache.seq {
- self.cache.history.pop_front();
+ while let Some((seq_old, _)) = self.window.front() {
+ if seq_old + self.win_size <= self.seq {
+ self.window.pop_front();
} else {
break;
}
}
- while let Some((_, last_val)) = self.cache.history.back() {
+ while let Some((_, last_val)) = self.window.back() {
if val $cmp *last_val {
- self.cache.history.pop_back();
+ self.window.pop_back();
} else {
break;
}
}
- self.cache.history.push_back((self.cache.seq, val));
+ self.window.push_back((self.seq, val));
- let val = if self.cache.history.len() == self.win_size {
- let val = ($($vfunc)+) (&self.cache, self.win_size);
+ let val = if self.window.len() == self.win_size {
+ let val = ($($vfunc)+) (&self.window, self.seq, self.win_size);
val
} else {
f64::NAN
@@ -182,8 +176,8 @@ macro_rules! impl_minmax {
}
impl_minmax! {
- Min < { |cache: &Cache, _: usize| cache.history.front().unwrap().1 }
- Max > { |cache: &Cache, _: usize| cache.history.front().unwrap().1 }
- ArgMin < { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 }
- ArgMax > { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 }
+ Min < { |window: &VecDeque<(usize, f64)>, _: usize, _: usize| window.front().unwrap().1 }
+ Max > { |window: &VecDeque<(usize, f64)>, _: usize, _: usize| window.front().unwrap().1 }
+ ArgMin < { |window: &VecDeque<(usize, f64)>, seq: usize, win_size: usize| (window.front().unwrap().0 + win_size - seq - 1) as f64 }
+ ArgMax > { |window: &VecDeque<(usize, f64)>, seq: usize, win_size: usize| (window.front().unwrap().0 + win_size - seq - 1) as f64 }
}
diff --git a/native/src/ops/window/quantile.rs b/native/src/ops/window/quantile.rs
index bd694fa..b2a687e 100644
--- a/native/src/ops/window/quantile.rs
+++ b/native/src/ops/window/quantile.rs
@@ -33,6 +33,7 @@ impl Quantile {
inner,
quantile,
r: ((win_size - 1) as f64 * quantile).floor() as usize,
+
window: VecDeque::with_capacity(win_size),
ostree: OSTree::new(),
i: 0,
@@ -45,6 +46,13 @@ impl Named for Quantile {
}
impl Operator for Quantile {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.ostree.clear();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/rank.rs b/native/src/ops/window/rank.rs
index cdb4ee7..00471a9 100644
--- a/native/src/ops/window/rank.rs
+++ b/native/src/ops/window/rank.rs
@@ -41,6 +41,13 @@ impl Named for Rank {
}
impl Operator for Rank {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.ostree.clear();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/returns.rs b/native/src/ops/window/returns.rs
index b06ab74..c081be5 100644
--- a/native/src/ops/window/returns.rs
+++ b/native/src/ops/window/returns.rs
@@ -23,6 +23,7 @@ impl LogReturn {
Self {
win_size,
inner,
+
window: VecDeque::with_capacity(win_size + 1),
i: 0,
}
@@ -34,6 +35,12 @@ impl Named for LogReturn {
}
impl Operator for LogReturn {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/skew.rs b/native/src/ops/window/skew.rs
index e72c87f..7266a6e 100644
--- a/native/src/ops/window/skew.rs
+++ b/native/src/ops/window/skew.rs
@@ -37,6 +37,13 @@ impl Named for Skew {
}
impl Operator for Skew {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.sum = 0.;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/stdev.rs b/native/src/ops/window/stdev.rs
index 0ea5826..726aef4 100644
--- a/native/src/ops/window/stdev.rs
+++ b/native/src/ops/window/stdev.rs
@@ -37,6 +37,13 @@ impl Named for Stdev {
}
impl Operator for Stdev {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.sum = 0.;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/ops/window/sum.rs b/native/src/ops/window/sum.rs
index 4cde8d2..30db1cc 100644
--- a/native/src/ops/window/sum.rs
+++ b/native/src/ops/window/sum.rs
@@ -37,6 +37,13 @@ impl Named for Sum {
}
impl Operator for Sum {
+ fn reset(&mut self) {
+ self.inner.reset();
+ self.window.clear();
+ self.sum = 0.;
+ self.i = 0;
+ }
+
#[throws(Error)]
fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> {
let vals = &*self.inner.update(tb)?;
diff --git a/native/src/python.rs b/native/src/python.rs
index 05df01f..7859213 100644
--- a/native/src/python.rs
+++ b/native/src/python.rs
@@ -44,6 +44,10 @@ impl Factor {
self.op.ready_offset()
}
+ pub fn reset(&mut self) {
+ self.op.reset()
+ }
+
pub fn replace<'p>(&self, i: usize, other: PyRef<'p, Factor>) -> PyResult {
if i == 0 {
return Ok(Factor {
@@ -193,3 +197,42 @@ pub fn replay<'py>(
.collect(),
})
}
+
+#[pyfunction]
+pub fn replay_file<'py>(
+ py: Python<'py>,
+ file: &str,
+ mut ops: Vec>,
+ njobs: usize,
+) -> PyResult {
+ let mut ops: Vec<_> = ops.iter_mut().map(|f| f.borrow_mut(py)).collect();
+ let ops = ops
+ .iter_mut()
+ .map(|f| (&mut *f.op) as &mut dyn Operator)
+ .collect();
+
+ let (succeeded, failed) = py
+ .allow_threads(|| -> Result<_> {
+ let pool = rayon::ThreadPoolBuilder::new().num_threads(njobs).build()?;
+ Ok(pool.install(|| crate::replay::replay_file(file, ops, None))?)
+ })
+ .map_err(|e| PyValueError::new_err(format!("{}", e)))?;
+
+ Ok(ReplayResult {
+ succeeded: succeeded
+ .into_iter()
+ .map(|(k, v)| {
+ let data = v.into_data();
+ let (array, schema) = ffi::to_ffi(&data).unwrap();
+ let array = Box::into_raw(Box::new(array));
+ let schema = Box::into_raw(Box::new(schema));
+
+ (k, (array as usize, schema as usize))
+ })
+ .collect(),
+ failed: failed
+ .into_iter()
+ .map(|(k, v)| (k, format!("{}", v)))
+ .collect(),
+ })
+}
diff --git a/native/src/replay.rs b/native/src/replay.rs
index b6ced18..2d7dbd7 100644
--- a/native/src/replay.rs
+++ b/native/src/replay.rs
@@ -75,7 +75,7 @@ pub fn replay_file(
path: &str,
ops: Vec<&mut (dyn Operator)>,
batch_size: O,
-) -> (usize, HashMap, HashMap)
+) -> (HashMap, HashMap)
where
O: Into