Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
SweatOfRa committed Mar 15, 2020
0 parents commit 88131c0
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 0 deletions.
4 changes: 4 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from dataprocessor.data_processor import DataProcessor
from dataprocessor.feed_filter import TimeFreqFilter
from dataprocessor.feed_filter import TimeIndexing
from dataprocessor.constants import TimePeriod
19 changes: 19 additions & 0 deletions constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from enum import Enum

class TimePeriod(Enum):
    """Time-bucket granularities, valued as pandas offset aliases.

    Member values are interpolated into the ``freq`` string passed to
    ``pandas.date_range`` (see TimeFreqFilter.apply); CONTINUOUS (empty
    string) is a sentinel meaning "no bucketing at all".
    NOTE(review): several single-letter aliases used here ('H', 'T', 'S',
    'L', 'U', 'M') are deprecated in pandas >= 2.2 -- confirm the pandas
    version this project pins before upgrading.
    """
    # https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases
    DAY = 'D'
    BUSINESS_DAY = 'B'
    WEEK = 'W'
    MONTH_END = 'M'
    BUSINESS_MONTH_END = 'BM'
    SEMI_MONTH_END = 'SM'
    QUARTER = 'Q'
    HOUR = 'H'
    BUSINESS_HOUR = 'BH'
    MINUTE = 'T'
    SECOND = 'S'
    MILLISECOND = 'L'
    MICROSECOND = 'U'
    CONTINUOUS = ''  # sentinel: keep every observation (handled specially by TimeFreqFilter)

103 changes: 103 additions & 0 deletions data_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from utils import apply_func, summarize
from feed_filter import TimeFreqFilter, FilterInterface
from functools import partial

import pandas as pd
import functools


class DataProcessor(object):
    """Fluent, chainable wrapper around a time-indexed pandas DataFrame.

    Every transforming operation returns a *new* DataProcessor wrapping the
    resulting DataFrame, so operations can be chained::

        DataProcessor(df)(TimeFreqFilter(TimePeriod.HOUR))("between_time", '08:30', '16:30').data
    """

    RETURN_INDEX = 'ret'
    ADV_INDEX = 'adv'

    def __init__(self, *args):
        """Wrap a DataFrame, keeping it sorted by its index.

        :param args: args[0] must be a pandas DataFrame.
        :raises ValueError: if args[0] is not a DataFrame.
        """
        if isinstance(args[0], pd.DataFrame):
            self._data = args[0].sort_index()
        else:
            raise ValueError(f'Unable to interpret DataProcessor arguments: {str(args)}')

    def __getattr__(self, item):
        # Delegate unknown attribute access to the wrapped DataFrame.
        # Guard against infinite recursion when '_data' is looked up before
        # __init__ has set it (e.g. during unpickling), and let AttributeError
        # propagate for genuinely missing attributes instead of silently
        # returning None as the original code did (that hid caller typos).
        if item == '_data':
            raise AttributeError(item)
        return getattr(self._data, item)

    def __call__(self, func, *args, **kwargs):
        """Apply a filter, a callable, or a DataFrame method name.

        :param func: a FilterInterface (row selection) or anything accepted
            by utils.apply_func (a callable or a DataFrame method name).
        :returns: a new DataProcessor wrapping the transformed data.
        :raises TypeError: if the transformation does not return a DataFrame.
        """
        if isinstance(func, FilterInterface):
            return DataProcessor(self._data.loc[func.apply(self._data)])
        ret_value = apply_func(self._data, func, *args, **kwargs)
        if not isinstance(ret_value, type(self._data)):
            raise TypeError(
                f'Call to DataProcessor should return type {type(self._data)} but returned {type(ret_value)}')
        return DataProcessor(ret_value)

    def __getitem__(self, tuple_of_arguments):
        """Summarize the data over consecutive filter-generated intervals.

        :param tuple_of_arguments: (filter, funcs[, column_names]) -- the
            filter supplies the fixed interval boundaries, funcs are the
            summary functions applied per interval, and column_names
            (optional) restricts the summarized columns (default: all).
        :returns: a new DataProcessor indexed by each interval's start time.
        :raises TypeError: if the combined summary is not a DataFrame.
        """
        filter_applied = tuple_of_arguments[0]
        funcs = tuple_of_arguments[1]

        # Temporarily force the filter to also hand back the fixed interval
        # boundaries, restoring its original setting afterwards.
        old_return_fixed_indices = filter_applied.return_fixed_indices
        filter_applied.return_fixed_indices = True
        indices_that_exist, fixed_indices = filter_applied.apply(self._data)  # first element intentionally unused
        filter_applied.return_fixed_indices = old_return_fixed_indices

        column_names = tuple_of_arguments[2] if len(tuple_of_arguments) > 2 else None
        if column_names is None:
            column_names = list(self._data.columns.values)

        # One summary row per interval [fixed_indices[i], fixed_indices[i + 1]].
        summaries = [summarize(self._data.loc[x[0]:x[1]][column_names], funcs) for
                     x in zip(fixed_indices[:-1], fixed_indices[1:])]

        # A single pd.concat replaces the original pairwise functools.reduce.
        summary = pd.concat(summaries, ignore_index=False)
        # BUG FIX: interval i starts at fixed_indices[i] and ends at
        # fixed_indices[i + 1] (see the zip above); the original code
        # assigned these two columns the other way around.
        summary["Start_Period"] = fixed_indices[:-1]
        summary["End_Period"] = fixed_indices[1:]

        summary.set_index('Start_Period', inplace=True)

        if not isinstance(summary, type(self._data)):
            raise TypeError(
                f'Interval Call to DataProcessor should return type {type(self._data)} but returned {type(summary)}')

        # if one wishes to rename the column names that can be done through another __call__
        return DataProcessor(summary)

    @property
    def data(self):
        """A defensive copy of the wrapped DataFrame."""
        return self._data.copy()

    @staticmethod
    def _shift(new_column_name, source_column_name, shift_count, df):
        """Add new_column_name to df (in place) as source_column_name shifted by shift_count rows."""
        df[[new_column_name]] = df[[source_column_name]].shift(shift_count)
        return df

    @staticmethod
    def first(x):
        """First element of a sequence; usable as an interval-summary function."""
        return x[0]

    @staticmethod
    def last(x):
        """Last element of a sequence; usable as an interval-summary function."""
        return x[-1]

    # this group of functions are nothing more than convenience functions!!
    # I know, breaks the single interface principle...
    def summarize_intervals(self, time_freq_filter, funcs_list, column_name):
        """Convenience wrapper around __getitem__ interval summarization."""
        return self.__getitem__((time_freq_filter, funcs_list, column_name))

    def time_freq(self, *args, **kwargs):
        """Filter rows onto a time-frequency grid (see TimeFreqFilter)."""
        return self.__call__(TimeFreqFilter(*args, **kwargs))

    def between_time(self, start_time, end_time):
        """Keep rows whose index time-of-day lies between start_time and end_time."""
        return self.__call__("between_time", start_time, end_time)

    def filter_on_column(self, func, column_name):
        """Apply func(column_name, df) as a transformation/filter."""
        return self.__call__(partial(func, column_name))

    def positive_column(self, value_column="Value"):
        """Keep only rows where value_column is strictly positive."""
        return self.filter_on_column(lambda cn, d: d[d[cn] > 0.0], value_column)

    def index(self, start_index, end_index):
        """Slice rows by index labels start_index:end_index (pandas .loc semantics)."""
        return self.__call__(partial(lambda x, y, z: z.loc[x:y], start_index, end_index))

    def rename_columns(self, old_names_list, new_names_list):
        """Rename columns pairwise from old_names_list to new_names_list."""
        return self.__call__(lambda x: x.rename(columns=dict(zip(old_names_list, new_names_list))))

    def shift_to_new_column(self, new_column_name, source_column_name, shift_count):
        """Add a shifted copy of source_column_name as new_column_name."""
        return self.__call__(partial(DataProcessor._shift, new_column_name, source_column_name, shift_count))
106 changes: 106 additions & 0 deletions feed_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import functools
import operator
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Union

import pandas as pd

from constants import TimePeriod


class FilterType(Enum):
    """Broad filter categories.

    NOTE(review): not referenced elsewhere in this module -- the concrete
    filter classes below do not carry a FilterType; confirm it is used by
    callers before removing.
    """
    TIME = 'Time'
    VOLUME = 'Volume'


class FilterInterface(ABC):
    """Abstract base for data filters.

    Implementations select/transform indices of a DataFrame via apply().
    """

    @abstractmethod
    def apply(self, *args, **kwargs):
        """Apply the filter and return the selected indices.

        The original definition was a plain ``pass``, so the ABC did not
        actually enforce the interface; @abstractmethod makes subclasses
        provide an implementation and prevents direct instantiation.
        """
        raise NotImplementedError


class Filtration(object):
    """An ordered collection of FilterInterface objects applied as a group."""

    def __init__(self, filters: Union[FilterInterface, List[FilterInterface]] = None):
        """Accept a single filter, a list of filters (None entries are
        dropped), or None (start empty).

        BUG FIX: the original tested ``isinstance(filter, list)`` -- the
        *builtin* ``filter`` function, which is never a list -- so a list
        argument was wrapped whole into a one-element list instead of being
        taken element-wise.
        """
        self.filters = []
        if filters is not None:
            if isinstance(filters, list):
                self.filters = [x for x in filters if x is not None]
            else:
                self.filters = [filters]

    def add_filter(self, filter: FilterInterface):
        """Append one filter.

        NOTE(review): the parameter name shadows the builtin ``filter``;
        kept unchanged for backward compatibility with keyword callers.
        """
        self.filters.append(filter)

    def __str__(self):
        return '\n'.join([str(f) for f in self.filters])

    def apply(self, *args, **kwargs):
        """Apply every filter in order; returns the list of per-filter results."""
        return [f.apply(*args, **kwargs) for f in self.filters]


class FreqFilter(FilterInterface):
    """Base class for frequency-based filters.

    Holds a period (offset alias), a multiple of that period (`length`), an
    optional fixed starting point, and a flag telling apply() whether to
    also return the generated fixed grid.
    """

    def __init__(self, period, length=1, starting=None, return_fixed_indices=False):
        super(FreqFilter, self).__init__()
        self.length = length
        self.period = period
        self.starting = starting
        self.return_fixed_indices = return_fixed_indices

    def __str__(self):
        # Spell out "NoneType" for a missing length rather than printing
        # the raw <class 'NoneType'> repr.
        length_type = "NoneType" if self.length is None else str(type(self.length))
        pieces = [str(type(self.period)), str(self.period), length_type, str(self.length)]
        return " ".join(pieces)

    def apply(self, *args, **kwargs):
        # Concrete subclasses implement the actual filtering.
        pass


class TimeIndexing(Enum):
    """How TimeFreqFilter snaps a regular grid timestamp to real index entries."""
    BEFORE = 1  # last real entry at or before the grid point (Index.asof)
    AFTER = 2  # first real entry at or after the grid point (get_loc with bfill)
    BEFORE_AND_AFTER = 3  # both neighbours of each grid point


# TODO improve the speed
class TimeFreqFilter(FreqFilter):
    """Selects, from a DataFrame's DatetimeIndex, the entries nearest to a
    regular time grid (every ``length`` x ``period``, optionally anchored at
    ``starting``); TimePeriod.CONTINUOUS returns the de-duplicated index as-is.
    """

    def __init__(self, period, length=None, starting=None, indexing=TimeIndexing.BEFORE, return_fixed_indices=False):
        """:param period: a TimePeriod member (asserted).
        :param length: grid spacing in multiples of ``period``; defaults to 1
            for any non-continuous period.
        :param starting: optional fixed grid anchor (default: first index entry).
        :param indexing: how grid points are snapped to real index entries.
        :param return_fixed_indices: when True, apply() also returns the grid.
        """
        assert isinstance(period, TimePeriod)
        if length is None:
            if not period == TimePeriod.CONTINUOUS:
                length = 1
        elif period == TimePeriod.CONTINUOUS:
            # NOTE(review): an explicit length combined with CONTINUOUS sets
            # period to None, which apply() cannot handle (it dereferences
            # self.period.value) -- looks like an unsupported combination;
            # confirm intent.
            period = None
        super(TimeFreqFilter, self).__init__(period, length, starting, return_fixed_indices)
        self.time_indexing = indexing

    def apply(self, *args, **kwargs):
        """Return the sorted, de-duplicated index entries snapped to the grid
        (or an (entries, grid) tuple when return_fixed_indices is True).

        args[0] is either a DataFrame or a sequence whose first element is one.
        """
        dfi = args[0][0].index if not isinstance(args[0], pd.DataFrame) else args[0].index
        # it is possible that we get duplicated indices in
        # (that is OK, multiple data points at the same instance)
        # but we need unique data points when filtering
        dfi = dfi[~dfi.duplicated(keep='first')]
        # TODO: check that the latter did not unnecessarily clean indices in the original data
        if self.period == TimePeriod.CONTINUOUS:
            return dfi

        used_starting = self.starting if self.starting is not None else dfi[0]
        used_range = pd.date_range(used_starting,
                                   dfi[-1].to_pydatetime(),
                                   freq=f'{self.length}{self.period.value}')

        return_fixed_range = self.return_fixed_indices
        # comparing interval to retrieved time index:
        # [used_range[2],
        #  dfi.asof(used_range[2]),
        #  dfi.to_series().truncate(before=used_range[2])[0],
        #  dfi[dfi.get_loc(used_range[2], method='bfill')]]
        # asof() snaps each grid point to the last real entry at or before it;
        # get_loc(..., method='bfill') snaps to the first entry at or after it.
        # NOTE(review): the `method=` argument of Index.get_loc was removed in
        # pandas 2.0 -- this code requires an older pandas; confirm the pin.
        if self.time_indexing == TimeIndexing.BEFORE:
            indices_to_return = sorted(list(set([dfi.asof(x) for x in used_range])))
        elif self.time_indexing == TimeIndexing.AFTER:
            indices_to_return = sorted(list(set([dfi[dfi.get_loc(x, method='bfill')] for x in used_range])))
        elif self.time_indexing == TimeIndexing.BEFORE_AND_AFTER:
            # https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
            indices_to_return = sorted(list(set(functools.reduce(operator.iconcat,
                                                                 [[dfi.asof(x), dfi[dfi.get_loc(x, method='bfill')]]
                                                                  for x in used_range], []))))
        if return_fixed_range:
            indices_to_return = (indices_to_return, used_range)

        return indices_to_return

pass
Empty file added tests/__init__.py
Empty file.
112 changes: 112 additions & 0 deletions tests/test_data_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from feed_filter import TimeFreqFilter
from constants import TimePeriod
from data_processor import DataProcessor

from pprint import pprint as pp
from functools import partial
from datetime import datetime
import pandas as pd
import numpy as np

from faker import Faker

fake = Faker()  # module-level fake-data generator used to build the random timestamp index below


def duplicate_col(source_col_name, target_col_name, df):
    """Copy column *source_col_name* of *df* into *target_col_name* (in place) and return *df*."""
    source = df[[source_col_name]]
    df[[target_col_name]] = source
    return df


def shift_colname(column_name, num_shifts, df):
    """Add a lead/lag copy of *column_name* to *df* (in place) and return *df*.

    The new column is named '<column_name>_L<k>' for a lag (num_shifts >= 0)
    or '<column_name>_F<k>' for a lead (num_shifts < 0), where k = |num_shifts|.
    """
    direction = 'F' if num_shifts < 0 else 'L'
    new_column_name = f'{column_name}_{direction}{abs(num_shifts)}'
    df[new_column_name] = df[[column_name]].shift(num_shifts)
    return df


def rolling_mean(x, col_name, n):
    """Trailing n-period mean of x[col_name], lagged by one row, as a DataFrame on x's index."""
    lagged = x[col_name].shift(1)
    window_mean = lagged.rolling(window=n).mean()
    return pd.DataFrame(window_mean, index=x.index)


def test_data_processor():
    """Smoke/integration test driving DataProcessor through chained calls,
    interval summaries and the convenience wrappers over randomly generated
    per-timestamp returns.

    Most expected-output checks are the commented-out pp(...) inspections;
    the test mainly asserts that the pipelines run end to end without
    raising.  NOTE(review): input is random (faker timestamps + np.random),
    so the run is not reproducible -- consider seeding.
    """
    # 2000 random-normal "returns" stamped with random datetimes inside one week.
    num_obs = 2000
    data = pd.DataFrame(np.random.randn(num_obs).tolist(), columns=["Return"], index=[fake.date_time_between_dates(
        datetime_start=datetime(2020, 3, 13, 14, 58, 57), datetime_end=datetime(2020, 3, 20, 14, 58, 57), tzinfo=None)
        for x in range(num_obs)])
    # pp(data.Return['2020-03-13 19:55:49.743080':'2020-03-15 13:00:00.866140'])

    # 15-minute sampling followed by a 5-point rolling mean of Return.
    z = DataProcessor(data)(TimeFreqFilter(TimePeriod.MINUTE, 15))(rolling_mean, col_name="Return", n=5).data
    # pp(z.Return['2020-03-13 19:55:49.743080':'2020-03-15 13:00:00.866140'])

    # Hourly sampling, trading-hours slice, then a column rename via __call__.
    z2 = DataProcessor(data)(TimeFreqFilter(TimePeriod.HOUR, 1))("between_time", '08:30', '16:30')(
        lambda x: x.rename(columns={"Return": "RETURN"})).data
    # pp(z2.head(5))
    # pp(z2.tail(5))

    # Time-of-day slice first, then business-day sampling, then a positive-return filter.
    z3 = DataProcessor(data)("between_time", '15:59', '16:30')(TimeFreqFilter(TimePeriod.BUSINESS_DAY))(
        lambda x: x[x.Return > 0.0])
    # pp(z3.head(5))
    # pp(z3.tail(5))

    # The same hourly/trading-hours pipeline via the convenience wrappers.
    z2 = DataProcessor(data).time_freq(TimePeriod.HOUR, 1). \
        between_time('08:30', '16:30').data
    # pp(z2.Return['2020-03-13 19:55:49.743080':'2020-03-15 13:00:00.866140'])

    # Full chain through __getitem__: label slice, time-of-day slice, positive
    # filter, 5-minute interval summaries, then OHLC-style column names.
    z2 = DataProcessor(data) \
        (partial(lambda x, y, z: z.loc[x:y], '2020-03-13 08:00', '2020-03-17 08:00')) \
        ("between_time", '08:15', '16:30') \
        (lambda x: x[x.Return > 0.0]) \
        [TimeFreqFilter(TimePeriod.MINUTE, 5, starting=datetime(2017, 6, 1, 8, 15, 0)),
         [DataProcessor.first, np.max, np.min, DataProcessor.last, np.median, np.mean, np.std], "Return"] \
        (lambda x: x.rename(columns={'amax': 'HIGH', 'amin': 'LOW', 'mean': 'MEAN',
                                     'median': 'MEDIAN', 'first': 'OPEN', 'last': 'CLOSE', 'std': 'STD'})).data

    # pp(z2['2020-03-13 12:00':'2020-03-16 13:00'])
    # pp(z2.head(5).HIGH - z2.head(5).LOW)
    # pp(z2.columns.values)

    # Lagged-column convenience wrapper.
    z3 = DataProcessor(data).between_time('11:30', '14:00').shift_to_new_column("L1_LOG_RET", "Return", 1).data
    # pp(z3.tail(5))

    # Business-day sampling plus positive-value filter via wrappers.
    z3 = DataProcessor(data).between_time('08:01', '18:30').time_freq(TimePeriod.BUSINESS_DAY).positive_column(
        value_column="Return").data
    # pp(z3.tail(5))

    # summarize_intervals wrapper followed by pairwise column renaming.
    z3 = DataProcessor(data).index('2020-03-13 19:55:49.743080', '2020-03-15 13:00:00.866140'). \
        between_time('08:15', '16:30').positive_column(value_column="Return"). \
        summarize_intervals(TimeFreqFilter(TimePeriod.MINUTE, 5, starting=datetime(2020, 3, 13, 19, 0, 0)),
                            [DataProcessor.first, np.max, np.min, DataProcessor.last, np.median, np.mean, np.std],
                            "Return"). \
        rename_columns(['amax', 'amin', 'mean', 'median', 'first', 'last', 'std'],
                       ['HIGH', 'LOW', 'MEAN', 'MEDIAN', 'OPEN', 'CLOSE', 'STD']).data

    # pp(z3.HIGH - z3.LOW)
    # pp(z3.tail(5))

    # As above with 30-minute intervals, then dropping rows with NaN STD.
    z2 = DataProcessor(data).index('2020-03-13 19:55', '2020-03-15 13:00'). \
        between_time('08:15', '16:30').positive_column(value_column="Return"). \
        summarize_intervals(TimeFreqFilter(TimePeriod.MINUTE, 30, starting=datetime(2020, 3, 14, 8, 0, 0)),
                            [DataProcessor.first, np.max, np.min, DataProcessor.last, np.median, np.mean, np.std],
                            "Return"). \
        rename_columns(['amax', 'amin', 'mean', 'median', 'first', 'last', 'std'],
                       ['HIGH', 'LOW', 'MEAN', 'MEDIAN', 'OPEN', 'CLOSE', 'STD'])(lambda x: x[~np.isnan(x.STD)]).data
    # pp(z2.tail(5))

    # Summary pipeline followed by duplicated + forward-shifted feature
    # columns, keeping only rows where all derived columns are populated.
    z2 = DataProcessor(data) \
        (partial(lambda x, y, z: z.loc[x:y], '2020-03-13 19:55', '2020-03-15 13:00')) \
        ("between_time", '08:15', '16:30') \
        (lambda x: x[x.Return > 0.0]) \
        [TimeFreqFilter(TimePeriod.MINUTE, 30, starting=datetime(2020, 3, 14, 8, 0, 0)),
         [DataProcessor.first, np.max, np.min, DataProcessor.last, np.median, np.mean, np.std], "Return"] \
        (lambda x: x.rename(columns={'amax': 'HIGH', 'amin': 'LOW', 'mean': 'MEAN',
                                     'median': 'MEDIAN', 'first': 'OPEN', 'last': 'CLOSE', 'std': 'STD'})) \
        (partial(duplicate_col, "MEAN", "LogReturn_MEAN")) \
        (partial(duplicate_col, "STD", "LogReturn_STD")) \
        (partial(shift_colname, 'LogReturn_MEAN', -1)) \
        (partial(shift_colname, 'LogReturn_STD', -1)) \
        (lambda x: x[~np.isnan(x.LogReturn_STD) & ~np.isnan(x.STD) & ~np.isnan(x.LogReturn_STD_F1)]).data

    # pp(z2.columns.values)
    # pp(z2.head(10))

Loading

0 comments on commit 88131c0

Please sign in to comment.