Skip to content

Commit

Permalink
Merge pull request #33 from reza-zereh/feature/issue-32/collect-histo…
Browse files Browse the repository at this point in the history
…rical-prices

Feature/issue 32/collect historical prices
  • Loading branch information
ironcladgeek authored Aug 24, 2024
2 parents f09c246 + a87fa78 commit 66ef63e
Show file tree
Hide file tree
Showing 6 changed files with 456 additions and 4 deletions.
118 changes: 118 additions & 0 deletions praice/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import UTC, datetime, timedelta
from typing import Optional

import typer
Expand All @@ -8,11 +9,17 @@
collect_news_articles,
collect_news_headlines,
)
from praice.data_handling.collectors.price_collector import (
collect_historical_prices,
update_all_symbols_prices,
update_historical_prices,
)
from praice.data_handling.db_ops.crud import (
add_scraping_url,
add_symbol,
delete_scraping_url,
delete_symbol,
get_historical_prices,
list_scraping_urls,
list_symbols,
update_scraping_url,
Expand All @@ -32,13 +39,17 @@
symbol_app = typer.Typer()
scraping_url_app = typer.Typer()
news_app = typer.Typer()
price_app = typer.Typer()

app.add_typer(symbol_app, name="symbol")
app.add_typer(scraping_url_app, name="scraping-url")
app.add_typer(news_app, name="news")
app.add_typer(price_app, name="price")


# #################
# Symbol commands
# #################
@symbol_app.command("add")
def cli_add_symbol(
symbol: str = typer.Option(..., prompt=True),
Expand Down Expand Up @@ -124,7 +135,9 @@ def cli_delete_symbol(symbol: str = typer.Argument(..., help="Symbol to delete")
rprint(f"[red]Error deleting symbol: {str(e)}[/red]")


# #################
# Scraping URL commands
# #################
@scraping_url_app.command("add")
def cli_add_scraping_url(
symbol: str = typer.Option(..., prompt=True),
Expand Down Expand Up @@ -197,7 +210,9 @@ def cli_delete_scraping_url(
rprint(f"[red]Error deleting scraping URL: {str(e)}[/red]")


# #################
# News commands
# #################
@news_app.command("collect-headlines")
def cli_collect_news_headlines(
symbol: str = typer.Argument(..., help="Symbol to collect news for"),
Expand Down Expand Up @@ -266,6 +281,109 @@ def cli_count_news_by_symbol(
rprint(table)


# #################
# Price commands
# #################
@price_app.command("collect")
def cli_collect_prices(
symbol: str = typer.Argument(..., help="The stock symbol to collect data for"),
days: Optional[int] = typer.Option(None, help="Number of days to collect data for"),
period: Optional[str] = typer.Option(
"max",
help=(
"Period to collect data for. "
"Choices: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max"
),
),
):
"""
Collect historical price data for a given symbol.
"""

if days:
end_date = datetime.now(UTC)
start_date = end_date - timedelta(days=days)
price_data = collect_historical_prices(
symbol=symbol, start_date=start_date, end_date=end_date
)
else:
price_data = collect_historical_prices(symbol=symbol, period=period)

if not price_data:
rprint(f"[red]No price data collected for {symbol}[/red]")
return

rprint(f"[green]Collected {len(price_data)} price records for {symbol}[/green]")


@price_app.command("update")
def cli_update_prices(
symbol: str = typer.Argument(..., help="The stock symbol to update data for"),
days: int = typer.Option(
30, help="Number of days to look back for updating prices"
),
):
"""
Update historical prices for a given symbol in the database.
"""
updated_count = update_historical_prices(symbol, days)
rprint(f"[green]Updated {updated_count} price records for {symbol}[/green]")


@price_app.command("update-all")
def cli_update_all_prices(
days: int = typer.Option(
30, help="Number of days to look back for updating prices"
),
):
"""
Update historical prices for all active symbols in the database.
"""
results = update_all_symbols_prices(days)
total_updated = sum(results.values())
rprint(
f"[green]Updated prices for {len(results)} symbols. Total records updated: {total_updated}[/green]"
)


@price_app.command("show")
def cli_show_prices(
symbol: str = typer.Argument(..., help="The stock symbol to show prices for"),
days: int = typer.Option(30, help="Number of days of price history to show"),
):
"""
Show historical prices for a given symbol.
"""
end_date = datetime.now(UTC).date()
start_date = end_date - timedelta(days=days)

prices = get_historical_prices(symbol, start_date, end_date)

if not prices:
rprint(f"[red]No price data found for {symbol} in the last {days} days[/red]")
return

table = Table(title=f"Historical Prices for {symbol}")
table.add_column("Date", style="cyan")
table.add_column("Open", style="magenta")
table.add_column("High", style="green")
table.add_column("Low", style="red")
table.add_column("Close", style="blue")
table.add_column("Volume", style="yellow")

for price in prices:
table.add_row(
str(price.date),
f"{price.open:.2f}",
f"{price.high:.2f}",
f"{price.low:.2f}",
f"{price.close:.2f}",
f"{price.volume:,}",
)

rprint(table)


if __name__ == "__main__":
db.connect()
app()
Expand Down
105 changes: 105 additions & 0 deletions praice/data_handling/collectors/price_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from datetime import UTC, datetime, timedelta
from typing import Dict, List, Optional

import yfinance as yf
from loguru import logger

from praice.data_handling.db_ops.crud import bulk_upsert_historical_prices
from praice.data_handling.db_ops.symbol_helpers import get_active_symbols


def collect_historical_prices(
symbol: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
period: str = "max",
) -> List[Dict]:
"""
Collect historical price data for a given symbol using yfinance.
Args:
symbol (str): The stock symbol to collect data for.
start_date (Optional[datetime]): The start date for historical data.
end_date (Optional[datetime]): The end date for historical data.
period (str): The period to fetch data for, used if start_date and end_date are None.
Options: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max
Returns:
List[Dict]: A list of dictionaries containing historical price data.
"""
symbol = symbol.upper()
logger.info(f"Collecting historical prices for {symbol}")

ticker = yf.Ticker(symbol)

try:
if start_date and end_date:
hist = ticker.history(start=start_date, end=end_date)
else:
hist = ticker.history(period=period)

hist = hist.reset_index()
hist = hist.rename(columns=str.lower)
hist = hist.rename(columns={"stock splits": "stock_splits"})
price_data = hist.to_dict(orient="records")
bulk_upsert_historical_prices(symbol, price_data)
return price_data

except Exception as e:
logger.error(f"Error collecting historical prices for {symbol}: {str(e)}")
return []


def update_historical_prices(symbol: str, lookback_days: int = 30) -> int:
"""
Update historical prices for a given symbol.
Args:
symbol (str): The stock symbol to update data for.
lookback_days (int): The number of days to look back for updating prices.
Returns:
int: The number of price records updated or inserted.
"""
logger.info(f"Updating historical prices for {symbol}")

end_date = datetime.now(UTC)
start_date = end_date - timedelta(days=lookback_days)

price_data = collect_historical_prices(symbol, start_date, end_date)

if not price_data:
logger.warning(f"No price data collected for {symbol}")
return 0

try:
updated_count = bulk_upsert_historical_prices(symbol, price_data)
logger.info(f"Updated {updated_count} historical price records for {symbol}")
return updated_count

except Exception as e:
logger.error(f"Error updating historical prices for {symbol}: {str(e)}")
return 0


def update_all_symbols_prices(lookback_days: int = 30) -> Dict[str, int]:
"""
Update historical prices for all active symbols in the database.
Args:
lookback_days (int): The number of days to look back for updating prices.
Returns:
Dict[str, int]: A dictionary with symbols as keys and the number of updated records as values.
"""
logger.info("Updating historical prices for all active symbols")

results = {}
active_symbols = get_active_symbols()

for symbol in active_symbols:
updated_count = update_historical_prices(symbol.symbol, lookback_days)
results[symbol.symbol] = updated_count

logger.info(f"Completed updating historical prices for {len(results)} symbols")
return results
Loading

0 comments on commit 66ef63e

Please sign in to comment.