Skip to content

Commit

Permalink
分钟处理示例
Browse files Browse the repository at this point in the history
  • Loading branch information
wukan1986 committed Dec 30, 2024
1 parent 41b8a37 commit 2df86c9
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions examples/demo_min.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,73 @@
如果分钟数据已经按日期分好了文件,也可以直接多进程并行处理,就没这么麻烦
"""
import sys
from datetime import datetime

import numpy as np
import pandas as pd
import polars as pl
from loguru import logger

from expr_codegen.tool import codegen_exec
from expr_codegen import codegen_exec # noqa

np.random.seed(42)

ASSET_COUNT = 500
DATE_COUNT = 250 * 20
DATE = pd.date_range(datetime(2020, 1, 1), periods=DATE_COUNT, freq='2h').repeat(ASSET_COUNT)
DATE_COUNT = 250 * 24 * 60 * 1
DATE = pd.date_range(datetime(2020, 1, 1), periods=DATE_COUNT, freq='1min').repeat(ASSET_COUNT)
ASSET = [f'A{i:04d}' for i in range(ASSET_COUNT)] * DATE_COUNT

df = pl.DataFrame(
{
'datetime': DATE,
'asset': ASSET,
"OPEN": np.random.rand(DATE_COUNT * ASSET_COUNT),
"HIGH": np.random.rand(DATE_COUNT * ASSET_COUNT),
"LOW": np.random.rand(DATE_COUNT * ASSET_COUNT),
"CLOSE": np.random.rand(DATE_COUNT * ASSET_COUNT),
"VOLUME": np.random.rand(DATE_COUNT * ASSET_COUNT),
"OPEN_INTEREST": np.random.rand(DATE_COUNT * ASSET_COUNT),
"FILTER": np.tri(DATE_COUNT, ASSET_COUNT, k=-2).reshape(-1),
}
).lazy()

df = df.filter(pl.col('FILTER') == 1)

logger.info('时间戳调整开始')
# 交易日,期货夜盘属于下一个交易日,后移4小时夜盘日期就一样了
df = df.with_columns(trading_day=pl.col('datetime').dt.offset_by("4h"))
# 周五晚已经变成了周六,双修要移动到周一
# 周五晚已经变成了周六,双休要移动到周一
df = df.with_columns(trading_day=pl.when(pl.col('trading_day').dt.weekday() > 5)
.then(pl.col("trading_day").dt.offset_by("2d"))
.otherwise(pl.col("trading_day")))
df = df.with_columns(
# 交易日
trading_day=pl.col("trading_day").dt.truncate("1d"),
trading_day=pl.col("trading_day").dt.date(),
# 工作日
action_day=pl.col('datetime').dt.truncate('1d'),
action_day=pl.col('datetime').dt.date(),
)


def _code_block_1():
OPEN_1 = ts_delay(OPEN, 1)
OPEN_RANK = cs_rank(OPEN_1)


df = df.collect()
logger.info('时间戳调整完成')
# ---
# !!! 重要代码,生成复合字段,用来ts_排序
# _asset_date以下划线开头,会自动删除,如要保留,可去了下划线
# 股票用action_day,期货用trading_day
df = df.with_columns(_asset_date=pl.struct("asset", "trading_day"))
print(df.tail(5))
df = codegen_exec(df, _code_block_1, output_file=sys.stdout, # 打印代码
df = codegen_exec(df, """OPEN_RANK = cs_rank(OPEN[1]) # 仅演示""",
# !!!使用时一定要分清分组是用哪个字段
date='datetime', asset='_asset_date')
# 演示中间某天的数据
df = df.filter(pl.col('asset') == 'A0000', pl.col('trading_day') == pl.datetime(2020, 1, 6))

print(df.collect())
# ---
logger.info('1分钟转日线开始')
df = df.sort('asset', 'datetime').group_by('asset', 'trading_day', maintain_order=True).agg(
open_dt=pl.first("datetime"),
close_dt=pl.last("datetime"),
OPEN=pl.first("OPEN"),
HIGH=pl.max("HIGH"),
LOW=pl.min("LOW"),
CLOSE=pl.last("CLOSE"),
VOLUME=pl.sum("VOLUME"),
OPEN_INTEREST=pl.last("OPEN_INTEREST"),
)
logger.info('1分钟转日线结束')
print(df)
# df.write_csv('output.csv')

0 comments on commit 2df86c9

Please sign in to comment.