diff --git a/Justfile b/Justfile index 70e7057..8ee9aa0 100644 --- a/Justfile +++ b/Justfile @@ -4,7 +4,7 @@ bootstrap-python: rm python/README.md build-extension: - cd native && cargo build --release + cd native && cargo build --release --features check ls native/target/release cd python && poetry run python ../scripts/python-helper.py copy-extension @@ -30,4 +30,4 @@ release: git checkout release git merge prerelease git push - git checkout master \ No newline at end of file + git checkout master diff --git a/native/Cargo.toml b/native/Cargo.toml index 9dad58c..42e02a2 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -1,5 +1,5 @@ [package] -authors = ["Weiyuan Wu "] +authors = ["Weiyuan Wu "] edition = "2018" name = "factor-expr" version = "0.2.3" @@ -8,6 +8,7 @@ version = "0.2.3" crate-type = ["rlib", "cdylib"] name = "factor_expr" + [dependencies] anyhow = "1" arrow = { version = "50", features = [ "ffi" ] } @@ -38,3 +39,4 @@ built = {version = "0.7", features = ["chrono"]} default = ["extension"] executable = ["pyo3/auto-initialize"] extension = ["pyo3/extension-module"] +check = [] diff --git a/native/rustfmt.toml b/native/rustfmt.toml new file mode 100644 index 0000000..2f99016 --- /dev/null +++ b/native/rustfmt.toml @@ -0,0 +1,2 @@ +imports_granularity = "Crate" +unstable_features = true diff --git a/native/src/float.rs b/native/src/float.rs index 21f75d4..1314629 100644 --- a/native/src/float.rs +++ b/native/src/float.rs @@ -1,8 +1,10 @@ /// copied from float_ord library use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; -use std::fmt::{Debug, Display}; -use std::hash::{Hash, Hasher}; -use std::marker::PhantomData; +use std::{ + fmt::{Debug, Display}, + hash::{Hash, Hasher}, + marker::PhantomData, +}; pub trait SortOrder { fn convert(f: f64) -> u64; @@ -104,11 +106,12 @@ impl Into> for f64 { #[cfg(test)] mod tests { use super::{Ascending, Descending, Float}; - use rand::distributions::Uniform; - use rand::{thread_rng, Rng}; - use std::collections::hash_map::DefaultHasher; - use std::f64::{INFINITY, NAN}; - use std::hash::{Hash, Hasher}; + use rand::{distributions::Uniform, thread_rng, Rng}; + use std::{ + collections::hash_map::DefaultHasher, + f64::{INFINITY, NAN}, + hash::{Hash, Hasher}, + }; #[test] fn test_ord() { diff --git a/native/src/ops/arithmetic.rs b/native/src/ops/arithmetic.rs index dd1306d..2e73d70 100644 --- a/native/src/ops/arithmetic.rs +++ b/native/src/ops/arithmetic.rs @@ -2,10 +2,7 @@ use super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::cmp::max; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, cmp::max, iter::FromIterator, mem}; macro_rules! impl_arithmetic_bivariate { ($([$name:tt => $op:ident: $($func:tt)+])+) => { @@ -38,13 +35,17 @@ macro_rules! impl_arithmetic_bivariate { let (l, r) = (&mut self.l, &mut self.r); let (ls, rs) = rayon::join(|| l.update(tb), || r.update(tb)); let (ls, rs) = (&*ls?, &*rs?); + #[cfg(feature = "check")] assert_eq!(tb.len(), ls.len()); + #[cfg(feature = "check")] assert_eq!(tb.len(), rs.len()); let mut results = Vec::with_capacity(tb.len()); for (&lval, &rval) in ls.into_iter().zip(rs) { if self.i < self.l.ready_offset() || self.i < self.r.ready_offset() { + #[cfg(feature = "check")] + assert!(lval.is_nan() || rval.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -190,12 +191,15 @@ macro_rules! impl_arithmetic_univariate { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -322,12 +326,15 @@ macro_rules! impl_arithmetic_univariate_1arg { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.s.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.s.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; diff --git a/native/src/ops/logic.rs b/native/src/ops/logic.rs index 07c0bdc..e9715f8 100644 --- a/native/src/ops/logic.rs +++ b/native/src/ops/logic.rs @@ -2,10 +2,7 @@ use super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::cmp::max; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, cmp::max, iter::FromIterator, mem}; // #[derive(Clone)] pub struct If { @@ -49,14 +46,19 @@ impl Operator for If { ); let (conds, btrues, bfalses) = (&*conds?, &*btrues?, &*bfalses?); + #[cfg(feature = "check")] assert_eq!(tb.len(), conds.len()); + #[cfg(feature = "check")] assert_eq!(tb.len(), btrues.len()); + #[cfg(feature = "check")] assert_eq!(tb.len(), bfalses.len()); let mut results = Vec::with_capacity(tb.len()); for ((&cond, &tval), &fval) in conds.into_iter().zip(btrues).zip(bfalses) { if self.i < self.ready_offset() { + #[cfg(feature = "check")] + assert!(cond.is_nan() || tval.is_nan() || fval.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -227,13 +229,17 @@ macro_rules! impl_logic_bivariate { let (l, r) = (&mut self.l, &mut self.r); let (ls, rs) = rayon::join(|| l.update(tb), || r.update(tb)); let (ls, rs) = (&*ls?, &*rs?); + #[cfg(feature = "check")] assert_eq!(tb.len(), ls.len()); + #[cfg(feature = "check")] assert_eq!(tb.len(), rs.len()); let mut results = Vec::with_capacity(tb.len()); for (&lval, &rval) in ls.into_iter().zip(rs) { if self.i < self.l.ready_offset() || self.i < self.r.ready_offset() { + #[cfg(feature = "check")] + assert!(lval.is_nan() || rval.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -381,12 +387,15 @@ impl Operator for Not { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; diff --git a/native/src/ops/overlap_studies.rs b/native/src/ops/overlap_studies.rs index a32c881..5986826 100644 --- a/native/src/ops/overlap_studies.rs +++ b/native/src/ops/overlap_studies.rs @@ -1,6 +1,4 @@ -use std::collections::VecDeque; -use std::mem; -use std::{borrow::Cow, iter::FromIterator}; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; @@ -45,12 +43,15 @@ impl Operator for SMA { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; diff --git a/native/src/ops/parser.rs b/native/src/ops/parser.rs index 34db90b..31cfdea 100644 --- a/native/src/ops/parser.rs +++ b/native/src/ops/parser.rs @@ -115,19 +115,19 @@ fn visit(sexpr: Cons) -> BoxOp { Not::::NAME => Result::>::from_iter(params)?.boxed(), // windows - TSSum::::NAME => Result::>::from_iter(params)?.boxed(), - TSMean::::NAME => Result::>::from_iter(params)?.boxed(), - TSCorrelation::::NAME => Result::>::from_iter(params)?.boxed(), - TSMin::::NAME => Result::>::from_iter(params)?.boxed(), - TSMax::::NAME => Result::>::from_iter(params)?.boxed(), - TSArgMin::::NAME => Result::>::from_iter(params)?.boxed(), - TSArgMax::::NAME => Result::>::from_iter(params)?.boxed(), - TSStdev::::NAME => Result::>::from_iter(params)?.boxed(), - TSSkew::::NAME => Result::>::from_iter(params)?.boxed(), + Sum::::NAME => Result::>::from_iter(params)?.boxed(), + Mean::::NAME => Result::>::from_iter(params)?.boxed(), + Correlation::::NAME => Result::>::from_iter(params)?.boxed(), + Min::::NAME => Result::>::from_iter(params)?.boxed(), + Max::::NAME => Result::>::from_iter(params)?.boxed(), + ArgMin::::NAME => Result::>::from_iter(params)?.boxed(), + ArgMax::::NAME => Result::>::from_iter(params)?.boxed(), + Stdev::::NAME => Result::>::from_iter(params)?.boxed(), + Skew::::NAME => Result::>::from_iter(params)?.boxed(), Delay::::NAME => Result::>::from_iter(params)?.boxed(), - TSRank::::NAME => Result::>::from_iter(params)?.boxed(), - TSQuantile::::NAME => Result::>::from_iter(params)?.boxed(), - TSLogReturn::::NAME => Result::>::from_iter(params)?.boxed(), + Rank::::NAME => Result::>::from_iter(params)?.boxed(), + Quantile::::NAME => Result::>::from_iter(params)?.boxed(), + LogReturn::::NAME => Result::>::from_iter(params)?.boxed(), // overla_studies SMA::::NAME => Result::>::from_iter(params)?.boxed(), diff --git a/native/src/ops/window/correlation.rs b/native/src/ops/window/correlation.rs index b345ea6..3220c09 100644 --- a/native/src/ops/window/correlation.rs +++ b/native/src/ops/window/correlation.rs @@ -2,11 +2,7 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::cmp::max; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, cmp::max, collections::VecDeque, iter::FromIterator, mem}; #[derive(Clone)] struct Cache { @@ -27,7 +23,7 @@ impl Cache { } } -pub struct TSCorrelation { +pub struct Correlation { win_size: usize, x: BoxOp, y: BoxOp, @@ -36,13 +32,13 @@ pub struct TSCorrelation { i: usize, } -impl Clone for TSCorrelation { +impl Clone for Correlation { fn clone(&self) -> Self { Self::new(self.win_size, self.x.clone(), self.y.clone()) } } -impl TSCorrelation { +impl Correlation { pub fn new(win_size: usize, x: BoxOp, y: BoxOp) -> Self { Self { win_size, @@ -55,23 +51,27 @@ impl TSCorrelation { } } -impl Named for TSCorrelation { - const NAME: &'static str = "TSCorr"; +impl Named for Correlation { + const NAME: &'static str = "Corr"; } -impl Operator for TSCorrelation { +impl Operator for Correlation { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let (x, y) = (&mut self.x, &mut self.y); let (xs, ys) = rayon::join(|| x.update(tb), || y.update(tb)); let (xs, ys) = (&*xs?, &*ys?); + #[cfg(feature = "check")] assert_eq!(tb.len(), xs.len()); + #[cfg(feature = "check")] assert_eq!(tb.len(), ys.len()); let mut results = Vec::with_capacity(tb.len()); for (&xval, &yval) in xs.into_iter().zip(ys) { if self.i < self.x.ready_offset() || self.i < self.y.ready_offset() { + #[cfg(feature = "check")] + assert!(xval.is_nan() || yval.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -206,14 +206,14 @@ impl Operator for TSCorrelation { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSCorrelation { + fn from_iter>>(iter: A) -> Correlation { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 3 { throw!(anyhow!( "{} expect a constant and two series, got {:?}", - TSCorrelation::::NAME, + Correlation::::NAME, params )) } @@ -221,10 +221,10 @@ impl FromIterator> for Result> { let k2 = params.remove(0).to_operator(); let k3 = params.remove(0).to_operator(); match (k1, k2, k3) { - (Parameter::Constant(c), Some(sx), Some(sy)) => TSCorrelation::new(c as usize, sx, sy), + (Parameter::Constant(c), Some(sx), Some(sy)) => Correlation::new(c as usize, sx, sy), _ => throw!(anyhow!( "{} expect a constant and two series", - TSCorrelation::::NAME, + Correlation::::NAME, )), } } diff --git a/native/src/ops/window/delay.rs b/native/src/ops/window/delay.rs index 75ce3bb..0b9643c 100644 --- a/native/src/ops/window/delay.rs +++ b/native/src/ops/window/delay.rs @@ -2,10 +2,7 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; pub struct Delay { win_size: usize, @@ -40,12 +37,15 @@ impl Operator for Delay { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; diff --git a/native/src/ops/window/mean.rs b/native/src/ops/window/mean.rs index d904cbb..a3a3330 100644 --- a/native/src/ops/window/mean.rs +++ b/native/src/ops/window/mean.rs @@ -2,12 +2,9 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSMean { +pub struct Mean { win_size: usize, inner: BoxOp, @@ -16,13 +13,13 @@ pub struct TSMean { i: usize, } -impl Clone for TSMean { +impl Clone for Mean { fn clone(&self) -> Self { Self::new(self.win_size, self.inner.clone()) } } -impl TSMean { +impl Mean { pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, @@ -35,20 +32,23 @@ impl TSMean { } } -impl Named for TSMean { - const NAME: &'static str = "TSMean"; +impl Named for Mean { + const NAME: &'static str = "Mean"; } -impl Operator for TSMean { +impl Operator for Mean { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -134,26 +134,26 @@ impl Operator for TSMean { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSMean { + fn from_iter>>(iter: A) -> Mean { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 2 { throw!(anyhow!( "{} expect a constant and a series, got {:?}", - TSMean::::NAME, + Mean::::NAME, params )) } let k1 = params.remove(0); let k2 = params.remove(0); match (k1, k2) { - (Parameter::Constant(c), Parameter::Operator(sub)) => TSMean::new(c as usize, sub), + (Parameter::Constant(c), Parameter::Operator(sub)) => Mean::new(c as usize, sub), (a, b) => throw!(anyhow!( "{name} expect a constant and a series, got ({name} {} {})", a, b, - name = TSMean::::NAME, + name = Mean::::NAME, )), } } diff --git a/native/src/ops/window/minmax.rs b/native/src/ops/window/minmax.rs index 5b771c2..7f051d1 100644 --- a/native/src/ops/window/minmax.rs +++ b/native/src/ops/window/minmax.rs @@ -2,10 +2,7 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; #[derive(Clone)] struct Cache { @@ -59,12 +56,15 @@ macro_rules! impl_minmax { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -182,8 +182,8 @@ macro_rules! impl_minmax { } impl_minmax! { - TSMin < { |cache: &Cache, _: usize| cache.history.front().unwrap().1 } - TSMax > { |cache: &Cache, _: usize| cache.history.front().unwrap().1 } - TSArgMin < { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 } - TSArgMax > { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 } + Min < { |cache: &Cache, _: usize| cache.history.front().unwrap().1 } + Max > { |cache: &Cache, _: usize| cache.history.front().unwrap().1 } + ArgMin < { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 } + ArgMax > { |cache: &Cache, win_size: usize| (cache.history.front().unwrap().0 + win_size - cache.seq - 1) as f64 } } diff --git a/native/src/ops/window/mod.rs b/native/src/ops/window/mod.rs index 1d2183b..5a1a0c2 100644 --- a/native/src/ops/window/mod.rs +++ b/native/src/ops/window/mod.rs @@ -9,13 +9,13 @@ mod skew; mod stdev; mod sum; -pub use correlation::TSCorrelation; +pub use correlation::Correlation; pub use delay::Delay; -pub use mean::TSMean; -pub use minmax::{TSArgMax, TSArgMin, TSMax, TSMin}; -pub use quantile::TSQuantile; -pub use rank::TSRank; -pub use returns::*; -pub use skew::TSSkew; -pub use stdev::TSStdev; -pub use sum::TSSum; +pub use mean::Mean; +pub use minmax::{ArgMax, ArgMin, Max, Min}; +pub use quantile::Quantile; +pub use rank::Rank; +pub use returns::LogReturn; +pub use skew::Skew; +pub use stdev::Stdev; +pub use sum::Sum; diff --git a/native/src/ops/window/quantile.rs b/native/src/ops/window/quantile.rs index 6f60b9c..bd694fa 100644 --- a/native/src/ops/window/quantile.rs +++ b/native/src/ops/window/quantile.rs @@ -1,15 +1,14 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; -use crate::float::{Ascending, Float, IntoFloat}; -use crate::ticker_batch::TickerBatch; +use crate::{ + float::{Ascending, Float, IntoFloat}, + ticker_batch::TickerBatch, +}; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; use order_stats_tree::OSTree; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSQuantile { +pub struct Quantile { win_size: usize, quantile: f64, r: usize, // win_size * quantile @@ -20,13 +19,13 @@ pub struct TSQuantile { i: usize, } -impl Clone for TSQuantile { +impl Clone for Quantile { fn clone(&self) -> Self { Self::new(self.win_size, self.quantile, self.inner.clone()) } } -impl TSQuantile { +impl Quantile { pub fn new(win_size: usize, quantile: f64, inner: BoxOp) -> Self { assert!(0. <= quantile && quantile <= 1.); Self { @@ -41,20 +40,23 @@ impl TSQuantile { } } -impl Named for TSQuantile { - const NAME: &'static str = "TSQuantile"; +impl Named for Quantile { + const NAME: &'static str = "Quantile"; } -impl Operator for TSQuantile { +impl Operator for Quantile { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -145,14 +147,14 @@ impl Operator for TSQuantile { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSQuantile { + fn from_iter>>(iter: A) -> Quantile { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 3 { throw!(anyhow!( "{} expect two constants and one series, got {:?}", - TSQuantile::::NAME, + Quantile::::NAME, params )) } @@ -161,14 +163,14 @@ impl FromIterator> for Result> { let k3 = params.remove(0); match (k1, k2, k3) { (Parameter::Constant(c), Parameter::Constant(c2), Parameter::Operator(s)) => { - TSQuantile::new(c as usize, c2, s) + Quantile::new(c as usize, c2, s) } (a, b, c) => throw!(anyhow!( "{name} expect two constants and a series, got ({name} {} {} {})", a, b, c, - name = TSQuantile::::NAME, + name = Quantile::::NAME, )), } } diff --git a/native/src/ops/window/rank.rs b/native/src/ops/window/rank.rs index b7adc06..cdb4ee7 100644 --- a/native/src/ops/window/rank.rs +++ b/native/src/ops/window/rank.rs @@ -1,15 +1,14 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; -use crate::float::{Ascending, Float, IntoFloat}; -use crate::ticker_batch::TickerBatch; +use crate::{ + float::{Ascending, Float, IntoFloat}, + ticker_batch::TickerBatch, +}; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; use order_stats_tree::OSTree; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSRank { +pub struct Rank { win_size: usize, inner: BoxOp, @@ -18,13 +17,13 @@ pub struct TSRank { i: usize, } -impl Clone for TSRank { +impl Clone for Rank { fn clone(&self) -> Self { Self::new(self.win_size, self.inner.clone()) } } -impl TSRank { +impl Rank { pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, @@ -37,20 +36,23 @@ impl TSRank { } } -impl Named for TSRank { - const NAME: &'static str = "TSRank"; +impl Named for Rank { + const NAME: &'static str = "Rank"; } -impl Operator for TSRank { +impl Operator for Rank { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -140,26 +142,26 @@ impl Operator for TSRank { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSRank { + fn from_iter>>(iter: A) -> Rank { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 2 { throw!(anyhow!( "{} expect a constant and one series, got {:?}", - TSRank::::NAME, + Rank::::NAME, params )) } let k1 = params.remove(0); let k2 = params.remove(0); match (k1, k2) { - (Parameter::Constant(c), Parameter::Operator(s)) => TSRank::new(c as usize, s), + (Parameter::Constant(c), Parameter::Operator(s)) => Rank::new(c as usize, s), (a, b) => throw!(anyhow!( "{name} expect a constant and a series, got ({name} {} {})", a, b, - name = TSRank::::NAME, + name = Rank::::NAME, )), } } diff --git a/native/src/ops/window/returns.rs b/native/src/ops/window/returns.rs index 9ae6df7..b06ab74 100644 --- a/native/src/ops/window/returns.rs +++ b/native/src/ops/window/returns.rs @@ -2,12 +2,9 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSLogReturn { +pub struct LogReturn { win_size: usize, inner: BoxOp, @@ -15,13 +12,13 @@ pub struct TSLogReturn { i: usize, } -impl Clone for TSLogReturn { +impl Clone for LogReturn { fn clone(&self) -> Self { Self::new(self.win_size, self.inner.clone()) } } -impl TSLogReturn { +impl LogReturn { pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, @@ -32,20 +29,22 @@ impl TSLogReturn { } } -impl Named for TSLogReturn { - const NAME: &'static str = "TSLogReturn"; +impl Named for LogReturn { + const NAME: &'static str = "LogReturn"; } -impl Operator for TSLogReturn { +impl Operator for LogReturn { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); - let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -130,26 +129,26 @@ impl Operator for TSLogReturn { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSLogReturn { + fn from_iter>>(iter: A) -> LogReturn { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 2 { throw!(anyhow!( "{} expect a constant and a series, got {:?}", - TSLogReturn::::NAME, + LogReturn::::NAME, params )) } let k1 = params.remove(0); let k2 = params.remove(0); match (k1, k2) { - (Parameter::Constant(c), Parameter::Operator(s)) => TSLogReturn::new(c as usize, s), + (Parameter::Constant(c), Parameter::Operator(s)) => LogReturn::new(c as usize, s), (a, b) => throw!(anyhow!( "{name} expect a constant and a series, got ({name} {} {})", a, b, - name = TSLogReturn::::NAME, + name = LogReturn::::NAME, )), } } diff --git a/native/src/ops/window/skew.rs b/native/src/ops/window/skew.rs index 7ba7e26..e72c87f 100644 --- a/native/src/ops/window/skew.rs +++ b/native/src/ops/window/skew.rs @@ -2,11 +2,9 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::mem; -use std::{collections::VecDeque, iter::FromIterator}; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSSkew { +pub struct Skew { win_size: usize, inner: BoxOp, @@ -15,13 +13,13 @@ pub struct TSSkew { i: usize, } -impl Clone for TSSkew { +impl Clone for Skew { fn clone(&self) -> Self { Self::new(self.win_size, self.inner.clone()) } } -impl TSSkew { +impl Skew { pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, @@ -34,20 +32,23 @@ impl TSSkew { } } -impl Named for TSSkew { - const NAME: &'static str = "TSSkew"; +impl Named for Skew { + const NAME: &'static str = "Skew"; } -impl Operator for TSSkew { +impl Operator for Skew { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -152,9 +153,9 @@ impl Operator for TSSkew { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSSkew { + fn from_iter>>(iter: A) -> Skew { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 2 { throw!(anyhow!( @@ -166,13 +167,11 @@ impl FromIterator> for Result> { let k1 = params.remove(0); let k2 = params.remove(0); match (k1, k2) { - (Parameter::Constant(c), Parameter::Operator(s)) if c >= 3. => { - TSSkew::new(c as usize, s) - } + (Parameter::Constant(c), Parameter::Operator(s)) if c >= 3. => Skew::new(c as usize, s), (Parameter::Constant(c), Parameter::Operator(_)) if c < 3. => { throw!(anyhow!( "{} for requires constant larger than 2, got {}", - TSSkew::::NAME, + Skew::::NAME, c )) } @@ -180,7 +179,7 @@ impl FromIterator> for Result> { "{name} expect a constant and a series, got ({name} {} {})", a, b, - name = TSSkew::::NAME, + name = Skew::::NAME, )), } } diff --git a/native/src/ops/window/stdev.rs b/native/src/ops/window/stdev.rs index 508955c..0ea5826 100644 --- a/native/src/ops/window/stdev.rs +++ b/native/src/ops/window/stdev.rs @@ -2,11 +2,9 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::mem; -use std::{collections::VecDeque, iter::FromIterator}; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSStdev { +pub struct Stdev { win_size: usize, inner: BoxOp, @@ -15,13 +13,13 @@ pub struct TSStdev { i: usize, } -impl Clone for TSStdev { +impl Clone for Stdev { fn clone(&self) -> Self { Self::new(self.win_size, self.inner.clone()) } } -impl TSStdev { +impl Stdev { pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, @@ -34,21 +32,24 @@ impl TSStdev { } } -impl Named for TSStdev { - const NAME: &'static str = "TSStd"; +impl Named for Stdev { + const NAME: &'static str = "Std"; } -impl Operator for TSStdev { +impl Operator for Stdev { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -143,9 +144,9 @@ impl Operator for TSStdev { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSStdev { + fn from_iter>>(iter: A) -> Stdev { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 2 { throw!(anyhow!( @@ -161,16 +162,16 @@ impl FromIterator> for Result> { if c <= 1. { throw!(anyhow!( "win size for {} should larger than 1", - TSStdev::::NAME + Stdev::::NAME )) } - TSStdev::new(c as usize, s) + Stdev::new(c as usize, s) } (a, b) => throw!(anyhow!( "{name} expect a constant and a series, got ({name} {} {})", a, b, - name = TSStdev::::NAME, + name = Stdev::::NAME, )), } } diff --git a/native/src/ops/window/sum.rs b/native/src/ops/window/sum.rs index 5c981a9..4cde8d2 100644 --- a/native/src/ops/window/sum.rs +++ b/native/src/ops/window/sum.rs @@ -2,12 +2,9 @@ use super::super::{parser::Parameter, BoxOp, Named, Operator}; use crate::ticker_batch::TickerBatch; use anyhow::{anyhow, Error, Result}; use fehler::{throw, throws}; -use std::borrow::Cow; -use std::collections::VecDeque; -use std::iter::FromIterator; -use std::mem; +use std::{borrow::Cow, collections::VecDeque, iter::FromIterator, mem}; -pub struct TSSum { +pub struct Sum { win_size: usize, inner: BoxOp, @@ -16,13 +13,13 @@ pub struct TSSum { i: usize, } -impl Clone for TSSum { +impl Clone for Sum { fn clone(&self) -> Self { Self::new(self.win_size, self.inner.clone()) } } -impl TSSum { +impl Sum { pub fn new(win_size: usize, inner: BoxOp) -> Self { Self { win_size, @@ -35,21 +32,24 @@ impl TSSum { } } -impl Named for TSSum { - const NAME: &'static str = "TSSum"; +impl Named for Sum { + const NAME: &'static str = "Sum"; } -impl Operator for TSSum { +impl Operator for Sum { #[throws(Error)] fn update<'a>(&mut self, tb: &'a T) -> Cow<'a, [f64]> { let vals = &*self.inner.update(tb)?; + #[cfg(feature = "check")] assert_eq!(tb.len(), vals.len()); let mut results = Vec::with_capacity(tb.len()); for &val in vals { if self.i < self.inner.ready_offset() { + #[cfg(feature = "check")] + assert!(val.is_nan()); results.push(f64::NAN); self.i += 1; continue; @@ -138,26 +138,26 @@ impl Operator for TSSum { } } -impl FromIterator> for Result> { +impl FromIterator> for Result> { #[throws(Error)] - fn from_iter>>(iter: A) -> TSSum { + fn from_iter>>(iter: A) -> Sum { let mut params: Vec<_> = iter.into_iter().collect(); if params.len() != 2 { throw!(anyhow!( "{} expect a constant and a series, got {:?}", - TSSum::::NAME, + Sum::::NAME, params )) } let k1 = params.remove(0); let k2 = params.remove(0); match (k1, k2) { - (Parameter::Constant(c), Parameter::Operator(sub)) => TSSum::new(c as usize, sub), + (Parameter::Constant(c), Parameter::Operator(sub)) => Sum::new(c as usize, sub), (a, b) => throw!(anyhow!( "{name} expect a constant and a series, got ({name} {} {})", a, b, - name = TSSum::::NAME, + name = Sum::::NAME, )), } } diff --git a/native/src/python.rs b/native/src/python.rs index a6ee0e0..05df01f 100644 --- a/native/src/python.rs +++ b/native/src/python.rs @@ -1,20 +1,21 @@ use super::ops::{from_str, Operator}; use anyhow::Result; -use arrow::array::{make_array, Array}; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::ffi::{self, FFI_ArrowArray, FFI_ArrowSchema}; -use arrow::record_batch::RecordBatch; +use arrow::{ + array::{make_array, Array}, + datatypes::{DataType, Field, Schema}, + ffi::{self, FFI_ArrowArray, FFI_ArrowSchema}, + record_batch::RecordBatch, +}; use dict_derive::IntoPyObject; use fehler::throw; -use pyo3::class::basic::CompareOp; -use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; -use std::borrow::Cow; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; +use pyo3::{class::basic::CompareOp, exceptions::PyValueError, prelude::*}; +use std::{ + borrow::Cow, + collections::{hash_map::DefaultHasher, HashMap}, + convert::TryFrom, + hash::{Hash, Hasher}, + sync::Arc, +}; // *mut FFI_ArrowArray, *mut FFI_ArrowSchema type ArrowFFIPtr = (usize, usize); diff --git a/native/src/replay.rs b/native/src/replay.rs index 08ad671..b6ced18 100644 --- a/native/src/replay.rs +++ b/native/src/replay.rs @@ -5,11 +5,12 @@ use arrow::{ record_batch::RecordBatch, }; use fehler::throws; -use parquet::file::reader::SerializedFileReader; -use parquet::{arrow::arrow_reader::ParquetRecordBatchReader, file::reader::FileReader}; +use parquet::{ + arrow::arrow_reader::ParquetRecordBatchReader, + file::reader::{FileReader, SerializedFileReader}, +}; use rayon::prelude::*; -use std::fs::File; -use std::{borrow::Cow, collections::HashMap}; +use std::{borrow::Cow, collections::HashMap, fs::File}; static DEFAULT_BATCH_SIZE: usize = 2048; diff --git a/native/src/ticker_batch.rs b/native/src/ticker_batch.rs index 6ac88a4..0c9573f 100644 --- a/native/src/ticker_batch.rs +++ b/native/src/ticker_batch.rs @@ -1,5 +1,7 @@ -use arrow::array::{as_primitive_array, Float64Array}; -use arrow::record_batch::RecordBatch; +use arrow::{ + array::{as_primitive_array, Float64Array}, + record_batch::RecordBatch, +}; use std::collections::HashMap; // Tickers should be sync because we will do parallel replay