diff --git a/praice/cli.py b/praice/cli.py index 239dbbe..9e5a752 100644 --- a/praice/cli.py +++ b/praice/cli.py @@ -2,33 +2,16 @@ from typing import Optional import typer +from peewee import DoesNotExist, IntegrityError from rich import print as rprint from rich.table import Table +from praice.data_handling.collectors import price_collector from praice.data_handling.collectors.news_collector import ( collect_news_articles, collect_news_headlines, ) -from praice.data_handling.collectors.price_collector import ( - collect_historical_prices, - update_all_symbols_prices, - update_historical_prices, -) -from praice.data_handling.db_ops.crud import ( - add_scraping_url, - add_symbol, - delete_scraping_url, - delete_symbol, - get_historical_prices, - list_scraping_urls, - list_symbols, - update_scraping_url, - update_symbol, -) -from praice.data_handling.db_ops.news_helpers import ( - count_news_by_symbol, - count_news_with_null_content, -) +from praice.data_handling.db_ops import crud, news_helpers from praice.data_handling.models import db from praice.utils import logging @@ -37,11 +20,13 @@ app = typer.Typer() symbol_app = typer.Typer() +symbol_config_app = typer.Typer() scraping_url_app = typer.Typer() news_app = typer.Typer() price_app = typer.Typer() app.add_typer(symbol_app, name="symbol") +app.add_typer(symbol_config_app, name="symbol-config") app.add_typer(scraping_url_app, name="scraping-url") app.add_typer(news_app, name="news") app.add_typer(price_app, name="price") @@ -50,6 +35,8 @@ # ################# # Symbol commands # ################# + + @symbol_app.command("add") def cli_add_symbol( symbol: str = typer.Option(..., prompt=True), @@ -75,7 +62,9 @@ def cli_add_symbol( exchange = exchange or None try: - new_symbol = add_symbol(symbol, name, asset_class, sector, industry, exchange) + new_symbol = crud.add_symbol( + symbol, name, asset_class, sector, industry, exchange + ) rprint(f"[green]Symbol {new_symbol.symbol} added successfully.[/green]") except Exception as e: rprint(f"[red]Error adding symbol: {str(e)}[/red]") @@ -84,7 +73,7 @@ def cli_add_symbol( @symbol_app.command("list") def cli_list_symbols(): """List all symbols in the database.""" - symbols = list_symbols() + symbols = crud.list_symbols() table = Table(title="Symbols") table.add_column("Symbol", style="cyan") table.add_column("Name", style="magenta") @@ -117,7 +106,7 @@ def cli_update_symbol( ): """Update an existing symbol in the database.""" try: - update_symbol(symbol, name, asset_class, sector, industry, exchange) + crud.update_symbol(symbol, name, asset_class, sector, industry, exchange) rprint(f"[green]Symbol {symbol} updated successfully.[/green]") except Exception as e: rprint(f"[red]Error updating symbol: {str(e)}[/red]") @@ -127,7 +116,7 @@ def cli_update_symbol( def cli_delete_symbol(symbol: str = typer.Argument(..., help="Symbol to delete")): """Delete a symbol from the database.""" try: - if delete_symbol(symbol): + if crud.delete_symbol(symbol): rprint(f"[green]Symbol {symbol} deleted successfully.[/green]") else: rprint(f"[red]Symbol {symbol} not found.[/red]") @@ -135,9 +124,148 @@ def cli_delete_symbol(symbol: str = typer.Argument(..., help="Symbol to delete") rprint(f"[red]Error deleting symbol: {str(e)}[/red]") +# ################# +# Symbol Config commands +# ################# + + +@symbol_config_app.command("show") +def show_symbol_config(symbol: str): + """ + Show the configuration for a specific symbol. + """ + try: + config = crud.get_symbol_config(symbol) + table = Table(title=f"Configuration for {symbol}") + table.add_column("Setting", style="cyan") + table.add_column("Value", style="magenta") + + table.add_row("Collect Price Data", str(config.collect_price_data)) + table.add_row("Collect YFinance News", str(config.collect_yfinance_news)) + table.add_row( + "Collect Technical Indicators", str(config.collect_technical_indicators) + ) + table.add_row("Collect Fundamental Data", str(config.collect_fundamental_data)) + table.add_row("Custom Settings", config.custom_settings or "None") + + rprint(table) + except DoesNotExist: + rprint(f"[red]No configuration found for symbol {symbol}[/red]") + + +@symbol_config_app.command("create") +def create_symbol_config_cli( + symbol: str, + collect_price_data: bool = typer.Option( + True, "--price/--no-price", help="Collect price data" + ), + collect_yfinance_news: bool = typer.Option( + True, "--news/--no-news", help="Collect YFinance news" + ), + collect_technical_indicators: bool = typer.Option( + True, "--tech/--no-tech", help="Collect technical indicators" + ), + collect_fundamental_data: bool = typer.Option( + True, "--fund/--no-fund", help="Collect fundamental data" + ), +): + """ + Create a new configuration for a symbol. + """ + try: + crud.create_symbol_config( + symbol, + collect_price_data=collect_price_data, + collect_yfinance_news=collect_yfinance_news, + collect_technical_indicators=collect_technical_indicators, + collect_fundamental_data=collect_fundamental_data, + ) + rprint(f"[green]Configuration created successfully for {symbol}[/green]") + except IntegrityError: + rprint(f"[red]Configuration already exists for symbol {symbol}[/red]") + except DoesNotExist: + rprint(f"[red]Symbol {symbol} not found in the database[/red]") + + +@symbol_config_app.command("update") +def update_symbol_config_cli( + symbol: str, + collect_price_data: Optional[bool] = typer.Option( + None, "--price/--no-price", help="Collect price data" + ), + collect_yfinance_news: Optional[bool] = typer.Option( + None, "--news/--no-news", help="Collect YFinance news" + ), + collect_technical_indicators: Optional[bool] = typer.Option( + None, "--tech/--no-tech", help="Collect technical indicators" + ), + collect_fundamental_data: Optional[bool] = typer.Option( + None, "--fund/--no-fund", help="Collect fundamental data" + ), +): + """ + Update the configuration for a symbol. + """ + update_data = {} + if collect_price_data is not None: + update_data["collect_price_data"] = collect_price_data + if collect_yfinance_news is not None: + update_data["collect_yfinance_news"] = collect_yfinance_news + if collect_technical_indicators is not None: + update_data["collect_technical_indicators"] = collect_technical_indicators + if collect_fundamental_data is not None: + update_data["collect_fundamental_data"] = collect_fundamental_data + + if update_data: + if crud.update_symbol_config(symbol, **update_data): + rprint(f"[green]Configuration updated successfully for {symbol}[/green]") + else: + rprint(f"[red]Failed to update configuration for {symbol}[/red]") + else: + rprint("[yellow]No updates specified[/yellow]") + + +@symbol_config_app.command("delete") +def delete_symbol_config_cli(symbol: str): + """ + Delete the configuration for a symbol. + """ + if crud.delete_symbol_config(symbol): + rprint(f"[green]Configuration deleted successfully for {symbol}[/green]") + else: + rprint(f"[red]Failed to delete configuration for {symbol}[/red]") + + +@symbol_config_app.command("list") +def list_symbol_configs(): + """ + List all symbol configurations. + """ + configs = crud.list_symbol_configs() + table = Table(title="Symbol Configurations") + table.add_column("Symbol", style="cyan") + table.add_column("Collect Price Data", style="magenta") + table.add_column("Collect YFinance News", style="green") + table.add_column("Collect Technical Indicators", style="yellow") + table.add_column("Collect Fundamental Data", style="blue") + + for config in configs: + table.add_row( + config.symbol.symbol, + str(config.collect_price_data), + str(config.collect_yfinance_news), + str(config.collect_technical_indicators), + str(config.collect_fundamental_data), + ) + + rprint(table) + + # ################# # Scraping URL commands # ################# + + @scraping_url_app.command("add") def cli_add_scraping_url( symbol: str = typer.Option(..., prompt=True), @@ -146,7 +274,7 @@ def cli_add_scraping_url( ): """Add a new scraping URL for a symbol.""" try: - new_url = add_scraping_url(symbol, url, source) + new_url = crud.add_scraping_url(symbol, url, source) rprint( f"[green]Scraping URL for {new_url.symbol.symbol} added successfully.[/green]" ) @@ -154,12 +282,54 @@ def cli_add_scraping_url( rprint(f"[red]Error adding scraping URL: {str(e)}[/red]") +@scraping_url_app.command("add-yfinance") +def cli_add_yfinance_scraping_url( + symbol: str = typer.Option(..., prompt=True), +): + """Add a new scraping URL for a symbol using Yahoo Finance.""" + try: + symbol_obj = crud.get_symbol(symbol) + url = f"https://finance.yahoo.com/quote/{symbol_obj.symbol}/news/" + new_url = crud.add_scraping_url(symbol, url, "yfinance") + rprint( + f"[green]Scraping URL for {new_url.symbol.symbol} added successfully.[/green]" + ) + except DoesNotExist: + rprint(f"[red]Symbol {symbol} not found in the database[/red]") + except IntegrityError: + rprint(f"[red]Scraping URL already exists for symbol {symbol}[/red]") + except Exception as e: + rprint(f"[red]Error adding scraping URL: {str(e)}[/red]") + + +@scraping_url_app.command("add-yfinance-all") +def cli_add_yfinance_scraping_url_all_symbol_configs(): + """ + Add scraping URLs for all symbols that have yfinance news collection enabled in `symbol-config` table. + """ + configs = crud.list_symbol_configs() + for config in configs: + if config.collect_yfinance_news: + try: + url = f"https://finance.yahoo.com/quote/{config.symbol.symbol}/news/" + new_url = crud.add_scraping_url(config.symbol.symbol, url, "yfinance") + rprint( + f"[green]Scraping URL for {new_url.symbol.symbol} added successfully.[/green]" + ) + except IntegrityError: + rprint( + f"[red]Scraping URL already exists for symbol {config.symbol.symbol}[/red]" + ) + except Exception as e: + rprint(f"[red]Error adding scraping URL: {str(e)}[/red]") + + @scraping_url_app.command("list") def cli_list_scraping_urls( symbol: Optional[str] = typer.Option(None, help="Filter by symbol"), ): """List scraping URLs, optionally filtered by symbol.""" - urls = list_scraping_urls(symbol) + urls = crud.list_scraping_urls(symbol) table = Table(title="Scraping URLs") table.add_column("ID", style="cyan") table.add_column("Symbol", style="magenta") @@ -190,7 +360,7 @@ def cli_update_scraping_url( ): """Update an existing scraping URL.""" try: - update_scraping_url(id, url, source, is_active) + crud.update_scraping_url(id, url, source, is_active) rprint(f"[green]Scraping URL (ID: {id}) updated successfully.[/green]") except Exception as e: rprint(f"[red]Error updating scraping URL: {str(e)}[/red]") @@ -202,7 +372,7 @@ def cli_delete_scraping_url( ): """Delete a scraping URL.""" try: - if delete_scraping_url(id): + if crud.delete_scraping_url(id): rprint(f"[green]Scraping URL (ID: {id}) deleted successfully.[/green]") else: rprint(f"[red]Scraping URL with ID {id} not found.[/red]") @@ -213,6 +383,8 @@ def cli_delete_scraping_url( # ################# # News commands # ################# + + @news_app.command("collect-headlines") def cli_collect_news_headlines( symbol: str = typer.Argument(..., help="Symbol to collect news for"), @@ -260,7 +432,7 @@ def cli_collect_news_articles( @news_app.command("count-null-content") def cli_count_news_with_null_content(): """Count the number of news articles with null content.""" - count = count_news_with_null_content() + count = news_helpers.count_news_with_null_content() rprint(f"[cyan]Number of news articles with null content: {count}[/cyan]") @@ -269,7 +441,7 @@ def cli_count_news_by_symbol( n: int = typer.Option(-1, "-n", help="Number of symbols to return"), ): """Count the number of news articles for each symbol.""" - counts = count_news_by_symbol(n)["news_count_by_symbol"] + counts = news_helpers.count_news_by_symbol(n)["news_count_by_symbol"] table = Table(title="News Counts by Symbol") table.add_column("Symbol", style="cyan") @@ -284,6 +456,8 @@ def cli_count_news_by_symbol( # ################# # Price commands # ################# + + @price_app.command("collect") def cli_collect_prices( symbol: str = typer.Argument(..., help="The stock symbol to collect data for"), @@ -303,11 +477,13 @@ def cli_collect_prices( if days: end_date = datetime.now(UTC) start_date = end_date - timedelta(days=days) - price_data = collect_historical_prices( + price_data = price_collector.collect_historical_prices( symbol=symbol, start_date=start_date, end_date=end_date ) else: - price_data = collect_historical_prices(symbol=symbol, period=period) + price_data = price_collector.collect_historical_prices( + symbol=symbol, period=period + ) if not price_data: rprint(f"[red]No price data collected for {symbol}[/red]") @@ -316,6 +492,36 @@ def cli_collect_prices( rprint(f"[green]Collected {len(price_data)} price records for {symbol}[/green]") +@price_app.command("collect-all") +def cli_collect_all_prices( + days: Optional[int] = typer.Option(None, help="Number of days to collect data for"), + period: Optional[str] = typer.Option( + "max", + help=( + "Period to collect data for. " + "Choices: 1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max" + ), + ), +): + """ + Collect historical price data for all symbols + that have price data collection enabled in `symbol_configs`. + """ + try: + if days: + end_date = datetime.now(UTC) + start_date = end_date - timedelta(days=days) + results = price_collector.collect_historical_prices_all( + start_date=start_date, end_date=end_date + ) + else: + results = price_collector.collect_historical_prices_all(period=period) + + rprint(f"[green]Collected prices for {len(results)} symbols[/green]") + except Exception as e: + rprint(f"[red]Error collecting prices: {str(e)}[/red]") + + @price_app.command("update") def cli_update_prices( symbol: str = typer.Argument(..., help="The stock symbol to update data for"), @@ -326,7 +532,7 @@ def cli_update_prices( """ Update historical prices for a given symbol in the database. """ - updated_count = update_historical_prices(symbol, days) + updated_count = price_collector.update_historical_prices(symbol, days) rprint(f"[green]Updated {updated_count} price records for {symbol}[/green]") @@ -339,7 +545,7 @@ def cli_update_all_prices( """ Update historical prices for all active symbols in the database. """ - results = update_all_symbols_prices(days) + results = price_collector.update_all_symbols_prices(days) total_updated = sum(results.values()) rprint( f"[green]Updated prices for {len(results)} symbols. Total records updated: {total_updated}[/green]" @@ -357,7 +563,7 @@ def cli_show_prices( end_date = datetime.now(UTC).date() start_date = end_date - timedelta(days=days) - prices = get_historical_prices(symbol, start_date, end_date) + prices = crud.get_historical_prices(symbol, start_date, end_date) if not prices: rprint(f"[red]No price data found for {symbol} in the last {days} days[/red]") diff --git a/praice/data_handling/collectors/price_collector.py b/praice/data_handling/collectors/price_collector.py index 5f54962..5204b40 100644 --- a/praice/data_handling/collectors/price_collector.py +++ b/praice/data_handling/collectors/price_collector.py @@ -4,7 +4,7 @@ import yfinance as yf from loguru import logger -from praice.data_handling.db_ops.crud import bulk_upsert_historical_prices +from praice.data_handling.db_ops import crud from praice.data_handling.db_ops.symbol_helpers import get_active_symbols @@ -42,7 +42,7 @@ def collect_historical_prices( hist = hist.rename(columns=str.lower) hist = hist.rename(columns={"stock splits": "stock_splits"}) price_data = hist.to_dict(orient="records") - bulk_upsert_historical_prices(symbol, price_data) + crud.bulk_upsert_historical_prices(symbol, price_data) return price_data except Exception as e: @@ -50,6 +50,51 @@ def collect_historical_prices( return [] +def collect_historical_prices_all( + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + period: str = "max", +) -> Dict[str, List[Dict]]: + """ + Collects historical prices for all symbols that have price data + collection enabled in `symbol_configs`. + + Args: + start_date (Optional[datetime]): The start date for collecting historical prices. + Defaults to None. + end_date (Optional[datetime]): The end date for collecting historical prices. + Defaults to None. + period (str): The period for collecting historical prices. + Defaults to "max". + + Returns: + Dict[str, List[Dict]]: A dictionary containing historical price data for each symbol. + + """ + results = {} + configs = crud.list_symbol_configs() + for config in configs: + if config.collect_price_data: + try: + if start_date and end_date: + price_data = collect_historical_prices( + symbol=config.symbol.symbol, + start_date=start_date, + end_date=end_date, + ) + else: + price_data = collect_historical_prices( + symbol=config.symbol.symbol, period=period + ) + + results[config.symbol.symbol] = price_data + except Exception as e: + logger.error( + f"Error collecting historical prices for {config.symbol.symbol}: {str(e)}" + ) + return results + + def update_historical_prices(symbol: str, lookback_days: int = 30) -> int: """ Update historical prices for a given symbol. @@ -73,7 +118,7 @@ def update_historical_prices(symbol: str, lookback_days: int = 30) -> int: return 0 try: - updated_count = bulk_upsert_historical_prices(symbol, price_data) + updated_count = crud.bulk_upsert_historical_prices(symbol, price_data) logger.info(f"Updated {updated_count} historical price records for {symbol}") return updated_count diff --git a/praice/data_handling/db_ops/crud.py b/praice/data_handling/db_ops/crud.py index 3b05b4b..d2bc476 100644 --- a/praice/data_handling/db_ops/crud.py +++ b/praice/data_handling/db_ops/crud.py @@ -9,14 +9,16 @@ NewsSymbol, ScrapingUrl, Symbol, + SymbolConfig, db, ) from praice.utils import helpers - # ############################ # Symbol CRUD operations # ############################ + + def get_symbol(symbol: str) -> Symbol: """ Retrieves a Symbol object based on the given symbol. @@ -33,6 +35,16 @@ def get_symbol(symbol: str) -> Symbol: return Symbol.get(Symbol.symbol == symbol.upper()) +def _ensure_symbol(symbol: Union[Symbol, str]) -> Symbol: + """ + Ensure that we have a Symbol object. + If a string is provided, fetch the corresponding Symbol object. + """ + if isinstance(symbol, str): + return get_symbol(symbol) + return symbol + + def add_symbol( symbol: str, name: str, @@ -111,9 +123,77 @@ def get_asset_class(info: Dict[str, any]) -> str: return "stock" +# ############################ +# SymbolConfig CRUD operations +# ############################ + + +def create_symbol_config( + symbol: Union[Symbol, str], + collect_price_data: bool = True, + collect_yfinance_news: bool = True, + collect_technical_indicators: bool = True, + collect_fundamental_data: bool = True, +) -> SymbolConfig: + """ + Create a new symbol configuration. + """ + symbol_obj = _ensure_symbol(symbol) + return SymbolConfig.create( + symbol=symbol_obj, + collect_price_data=collect_price_data, + collect_yfinance_news=collect_yfinance_news, + collect_technical_indicators=collect_technical_indicators, + collect_fundamental_data=collect_fundamental_data, + ) + + +def get_symbol_config(symbol: Union[Symbol, str]) -> SymbolConfig: + """ + Retrieve the configuration for a specific symbol. + """ + symbol_obj = _ensure_symbol(symbol) + return SymbolConfig.get(SymbolConfig.symbol == symbol_obj) + + +def update_symbol_config(symbol: Union[Symbol, str], **kwargs) -> bool: + """ + Update the configuration for a specific symbol. + """ + symbol_obj = _ensure_symbol(symbol) + query = SymbolConfig.update(**kwargs).where(SymbolConfig.symbol == symbol_obj) + return query.execute() > 0 + + +def delete_symbol_config(symbol: Union[Symbol, str]) -> bool: + """ + Delete the configuration for a specific symbol. + """ + symbol_obj = _ensure_symbol(symbol) + query = SymbolConfig.delete().where(SymbolConfig.symbol == symbol_obj) + return query.execute() > 0 + + +def get_or_create_symbol_config( + symbol: Union[Symbol, str], +) -> Tuple[SymbolConfig, bool]: + """ + Get the existing symbol configuration or create a new one with default values. + """ + symbol_obj = _ensure_symbol(symbol) + return SymbolConfig.get_or_create(symbol=symbol_obj) + + +def list_symbol_configs() -> List[SymbolConfig]: + """List all symbol configurations in the database.""" + return list(SymbolConfig.select()) + + # ############################ # ScrapingUrl CRUD operations # ############################ + + def add_scraping_url(symbol: str, url: str, source: str) -> ScrapingUrl: """Add a new scraping URL for a symbol.""" with db.atomic(): @@ -156,6 +236,8 @@ def delete_scraping_url(id: int) -> bool: # ############################ # News CRUD operations # ############################ + + def create_news( title: str, url: str, @@ -274,6 +356,8 @@ def delete_news(news_id: int) -> bool: # ############################ # NewsSymbol CRUD operations # ############################ + + def create_news_symbol(news: News, symbol: Symbol) -> NewsSymbol: """ Create a NewsSymbol object. @@ -307,16 +391,6 @@ def delete_news_symbol(news_symbol_id: int) -> bool: # ############################ -def _ensure_symbol(symbol: Union[Symbol, str]) -> Symbol: - """ - Ensure that we have a Symbol object. - If a string is provided, fetch the corresponding Symbol object. - """ - if isinstance(symbol, str): - return get_symbol(symbol) - return symbol - - def create_historical_price( symbol: Union[Symbol, str], date: date, diff --git a/praice/data_handling/models.py b/praice/data_handling/models.py index 3d94c5f..a4fcf86 100644 --- a/praice/data_handling/models.py +++ b/praice/data_handling/models.py @@ -151,6 +151,28 @@ def create(cls, **query): return super(Symbol, cls).create(**query) +class SymbolConfig(BaseModel): + """ + Represents the configuration for a symbol. + + Attributes: + symbol (ForeignKeyField): The foreign key to the Symbol model. + collect_price_data (BooleanField): Indicates whether to collect price data. + collect_yfinance_news (BooleanField): Indicates whether to collect news from Yahoo Finance. + collect_technical_indicators (BooleanField): Indicates whether to collect technical indicators. + collect_fundamental_data (BooleanField): Indicates whether to collect fundamental data. + """ + + symbol = ForeignKeyField(Symbol, backref="config", unique=True) + collect_price_data = BooleanField(default=True) + collect_yfinance_news = BooleanField(default=True) + collect_technical_indicators = BooleanField(default=True) + collect_fundamental_data = BooleanField(default=True) + + class Meta: + table_name = "symbol_configs" + + class News(BaseModel): """ Represents a news article. @@ -257,7 +279,9 @@ def create_tables(): This function uses the `db` connection to create tables in the database. It takes no arguments. """ with db: - db.create_tables([Symbol, News, NewsSymbol, ScrapingUrl, HistoricalPrice1D]) + db.create_tables( + [Symbol, SymbolConfig, News, NewsSymbol, ScrapingUrl, HistoricalPrice1D] + ) if __name__ == "__main__": diff --git a/praice/scheduler.py b/praice/scheduler.py index 9ddfc9f..57604ba 100644 --- a/praice/scheduler.py +++ b/praice/scheduler.py @@ -1,11 +1,10 @@ +import pytz from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger -from praice.data_handling.collectors.news_collector import ( - collect_news_articles, - collect_news_headlines_by_source, -) +from praice.data_handling.collectors import news_collector, price_collector from praice.utils.logging import get_scheduler_logger # Get the scheduler-specific logger @@ -27,16 +26,10 @@ def collect_headlines_by_source_job(source: str): Args: source (str): The source from which to collect headlines. - - Returns: - None - - Raises: - Exception: If there is an error in the headline collection job. """ logger.info(f"Starting headline collection job from source {source}") try: - collect_news_headlines_by_source(source) + news_collector.collect_news_headlines_by_source(source) logger.info("Headline collection job completed successfully") except Exception as e: logger.error(f"Error in headline collection job: {str(e)}") @@ -45,22 +38,27 @@ def collect_headlines_by_source_job(source: str): def collect_articles_job(): """ Executes the article collection job. - - Returns: - None - - Raises: - Exception: If an error occurs during the article collection job. - """ logger.info("Starting article collection job") try: - collect_news_articles(limit=100) + news_collector.collect_news_articles(limit=100) logger.info("Article collection job completed successfully") except Exception as e: logger.error(f"Error in article collection job: {str(e)}") +def collect_price_data_job(): + """ + Executes the price data collection job. + """ + logger.info("Starting price data collection job") + try: + price_collector.collect_historical_prices_all(period="5d") + logger.info("Price data collection job completed successfully") + except Exception as e: + logger.error(f"Error in price data collection job: {str(e)}") + + def init_scheduler(): """ Initializes and starts the scheduler. @@ -76,7 +74,7 @@ def init_scheduler(): # Add jobs to the scheduler scheduler.add_job( collect_headlines_by_source_job, - "interval", + trigger="interval", minutes=80, id="collect_yfinance_headlines", kwargs={"source": "yfinance"}, @@ -84,10 +82,20 @@ def init_scheduler(): logger.info("Added job: collect_yfinance_headlines") scheduler.add_job( - collect_articles_job, "interval", minutes=170, id="collect_articles" + collect_articles_job, + trigger="interval", + minutes=170, + id="collect_articles", ) logger.info("Added job: collect_articles") + scheduler.add_job( + collect_price_data_job, + trigger=CronTrigger(hour=18, minute=0, timezone=pytz.timezone("US/Eastern")), + id="collect_price_data", + ) + logger.info("Added job: collect_price_data (runs daily at 6:00 PM ET)") + # Start the scheduler scheduler.start() logger.info("Scheduler started") diff --git a/requirements.txt b/requirements.txt index 03d1fbd..696935c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ peewee==3.17.6 psycopg2-binary==2.9.9 pydantic-settings==2.4.0 pydantic==2.8.2 +pytz==2024.1 requests==2.32.3 rich==13.7.1 tqdm==4.66.5