diff --git a/data_rentgen/db/scripts/create_analytic_views.py b/data_rentgen/db/scripts/create_analytic_views.py
index 3699935..299f4c6 100755
--- a/data_rentgen/db/scripts/create_analytic_views.py
+++ b/data_rentgen/db/scripts/create_analytic_views.py
@@ -21,7 +21,7 @@
 logger = logging.getLogger(__name__)
 
 
-class Depths(str, Enum):
+class Depth(str, Enum):
     DAY = "day"
     WEEK = "week"
     MONTH = "month"
@@ -44,9 +44,9 @@ def get_parser() -> ArgumentParser:
     )
     parser.add_argument(
         "--depths",
-        type=lambda x: Depths(x).value,
-        choices=[item.value for item in Depths],
-        nargs="?",
+        type=Depth,
+        choices=[item.value for item in Depth],
+        nargs="+",
         help="Depth of matherialized view data (created_at filter). Default is day",
     )
     return parser
@@ -62,11 +62,11 @@ def get_statement(base_table: str, type: str) -> str:
             {base_table}.dataset_id as dataset_id
             , u.id as user_id
             , u.name as user_name
-            , max({base_table}.created_at) as last_interaction
+            , max({base_table}.created_at) as last_interaction_dt
             , count(*) as num_of_interactions
-            , sum(num_bytes) as s_bytes
-            , sum(num_rows) as s_rows
-            , sum(num_files) as s_files
+            , sum(num_bytes) as sum_bytes
+            , sum(num_rows) as sum_rows
+            , sum(num_files) as sum_files
         FROM {base_table}
         JOIN run r ON {base_table}.run_id = r.id
         JOIN public.user u ON r.started_by_user_id = u.id
@@ -75,14 +75,15 @@ def get_statement(base_table: str, type: str) -> str:
         )
         SELECT
             d.name as dataset_name
-            , l.name as location_name
+            , l.name as dataset_location
+            , l.type as dataset_location_type
            , agr.user_id
            , agr.user_name
-            , agr.last_interaction
+            , agr.last_interaction_dt
            , agr.num_of_interactions
-            , agr.s_bytes
-            , agr.s_rows
-            , agr.s_files
+            , agr.sum_bytes
+            , agr.sum_rows
+            , agr.sum_files
         FROM aggregates agr
         JOIN dataset d ON agr.dataset_id = d.id
         LEFT JOIN location l ON d.location_id = l.id
@@ -91,19 +92,19 @@ def get_statement(base_table: str, type: str) -> str:
     """
 
 
-async def create_views(depths: Depths, session: AsyncSession):
+async def create_view(depth: Depth, session: AsyncSession):
     for base_table in ("output", "input"):
-        statement = get_statement(base_table, depths)
+        statement = get_statement(base_table, depth)
         logger.debug("Executing statement: %s", statement)
         await session.execute(text(statement))
-        await session.commit()
-        await refresh_view(base_table + view_sufix_map[depths], session)
 
 
-async def refresh_view(view_name: str, session: AsyncSession):
-    logger.info("Refresh view: %s", view_name)
-    statement = f"REFRESH MATERIALIZED VIEW {view_name}"
-    await session.execute(text(statement))
+async def refresh_view(depth: Depth, session: AsyncSession):
+    for base_table in ("output", "input"):
+        view_name = base_table + view_sufix_map[depth]
+        logger.info("Refresh view: %s", view_name)
+        statement = f"REFRESH MATERIALIZED VIEW {view_name}"
+        await session.execute(text(statement))
 
 
 async def main(args: list[str]) -> None:
@@ -112,19 +113,19 @@ async def main(args: list[str]) -> None:
     parser = get_parser()
     params = parser.parse_args(args)
     depths = params.depths
+    if not depths:
+        logger.info("Create views for all depths")
+        depths = Depth
+    else:
+        depths = sorted(set(depths))
 
     db_settings = DatabaseSettings()
     session_factory = create_session_factory(db_settings)
     async with session_factory() as session:
-        if depths:
-            depths = Depths(depths)
-            logger.info("Create views with depths: %s", depths)
-            await create_views(depths, session)
-        else:
-            logger.info("Create all views")
-            for depth in Depths:
-                logger.info("Create views with depths: %s", depth)
-                await create_views(depth, session)
+        for depth in depths:
+            await create_view(depth, session)
+            await refresh_view(depth, session)
+            await session.commit()
 
 
 if __name__ == "__main__":
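After this change, ``--depths`` accepts one or more values (``nargs="+"``) and converts each to a ``Depth`` member; omitting the flag makes ``main()`` fall back to every depth. Below is a minimal, self-contained sketch of that parsing behaviour, assuming only what the hunks above show (the argument values are illustrative):

.. code:: python

    # Standalone sketch of the new --depths parsing; mirrors the diff above.
    from argparse import ArgumentParser
    from enum import Enum


    class Depth(str, Enum):
        DAY = "day"
        WEEK = "week"
        MONTH = "month"


    parser = ArgumentParser()
    parser.add_argument(
        "--depths",
        type=Depth,
        choices=[item.value for item in Depth],
        nargs="+",
    )

    # One or more values are collected into a list of Depth members;
    # main() then deduplicates and orders them, as in the last hunk.
    params = parser.parse_args(["--depths", "week", "day", "week"])
    depths = sorted(set(params.depths))
    print([d.value for d in depths])  # ['day', 'week']

    # Omitting the flag leaves params.depths as None, so main() falls
    # back to iterating over every member of Depth.
    assert parser.parse_args([]).depths is None

Note that ``choices`` holds plain strings while ``type=Depth`` yields enum members; the membership check still passes because ``Depth`` mixes in ``str``, so ``Depth.DAY == "day"``.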
diff --git a/docs/reference/database/index.rst b/docs/reference/database/index.rst
index fbf41d2..164304c 100644
--- a/docs/reference/database/index.rst
+++ b/docs/reference/database/index.rst
@@ -19,14 +19,15 @@ Views based on data in ``output`` and ``input`` tables and has such structure:
 
 .. code:: text
 
     dataset_name - Name of dataset.
-    location_name - Name of dataset location (e.g. clusster name).
+    dataset_location - Name of dataset location (e.g. cluster name).
+    dataset_location_type - Type of dataset location (e.g. hive, hdfs, postgres).
     user_id - Internal user id.
     user_name - Internal user name (e.g. name of user which run spark job).
-    last_interaction - Time when user lat time interact with dataset. Read or write depens on base table.
+    last_interaction_dt - Time of the user's last interaction with the dataset. Read or write depends on the base table.
     num_of_interactions - Number of interactions in given interval.
-    s_bytes - Sum of bytes in given interval. ``num_bytes`` - column.
-    s_rows - Sum of rows in given interval. ``num_rows`` - column.
-    s_files - Sum of files in given interval. ``num_files`` - column.
+    sum_bytes - Sum of bytes in the given interval (``num_bytes`` column).
+    sum_rows - Sum of rows in the given interval (``num_rows`` column).
+    sum_files - Sum of files in the given interval (``num_files`` column).
 
 We provide three types of views: ``day``, ``week`` and ``month``, based on the time period in which the aggregation occur. By default, script creates pair views for all intervals.
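To make the documented structure concrete, here is a sketch of how such a view might be queried through the same SQLAlchemy async machinery the script uses. The view name is an assumption: real names are built as ``base_table + view_sufix_map[depth]``, and ``view_sufix_map`` is not part of this diff, so ``output_daily_stats`` below is a hypothetical placeholder.

.. code:: python

    # Hypothetical query against one of the generated views; only the column
    # names are taken from the documented structure above.
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession


    async def top_writers_by_bytes(session: AsyncSession, limit: int = 10):
        statement = text(
            """
            SELECT user_name, dataset_name, dataset_location, sum_bytes
            FROM output_daily_stats  -- assumed name, see note above
            ORDER BY sum_bytes DESC
            LIMIT :limit
            """
        )
        result = await session.execute(statement, {"limit": limit})
        return result.all()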