Skip to content

Commit

Permalink
+ GeoJSON stats
Browse files Browse the repository at this point in the history
  • Loading branch information
emi420 committed Oct 31, 2024
1 parent bfe193f commit 28e5865
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 11 deletions.
29 changes: 22 additions & 7 deletions API/api_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from celery import Celery

# Reader imports
from src.app import CustomExport, PolygonStats, RawData, S3FileTransfer
from src.app import CustomExport, PolygonStats, GeoJSONStats, RawData, S3FileTransfer
from src.config import ALLOW_BIND_ZIP_FILTER
from src.config import CELERY_BROKER_URL as celery_broker_uri
from src.config import CELERY_RESULT_BACKEND as celery_backend
Expand Down Expand Up @@ -75,7 +75,7 @@ def create_readme_content(default_readme, polygon_stats):


def zip_binding(
working_dir, exportname_parts, geom_dump, polygon_stats, default_readme
working_dir, exportname_parts, geom_dump, polygon_stats, geojson_stats, default_readme
):
logging.debug("Zip Binding Started!")
upload_file_path = os.path.join(
Expand All @@ -88,6 +88,9 @@ def zip_binding(
),
}

if geojson_stats:
additional_files["stats.json"] = geojson_stats

for name, content in additional_files.items():
temp_path = os.path.join(working_dir, name)
with open(temp_path, "w") as f:
Expand Down Expand Up @@ -165,7 +168,6 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
if os.path.exists(clean_dir):
shutil.rmtree(clean_dir)


@celery.task(
bind=True,
name="process_raw_data",
Expand Down Expand Up @@ -209,11 +211,22 @@ def process_raw_data(self, params, user=None):
file_parts,
)

geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)
inside_file_size = 0
polygon_stats = None
geojson_stats = None

if "include_stats" in params.dict():
if params.include_stats:
geoJSONStats = GeoJSONStats(params.filters)
geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts, geoJSONStats.raw_data_line_stats)
geojson_stats = geoJSONStats.json()
else:
geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)

inside_file_size = 0
if "include_stats" in params.dict():
if params.include_stats:
feature = {
Expand All @@ -222,12 +235,14 @@ def process_raw_data(self, params, user=None):
"properties": {},
}
polygon_stats = PolygonStats(feature).get_summary_stats()

if bind_zip:
upload_file_path, inside_file_size = zip_binding(
working_dir=working_dir,
exportname_parts=exportname_parts,
geom_dump=geom_dump,
polygon_stats=polygon_stats,
geojson_stats=geojson_stats,
default_readme=DEFAULT_README_TEXT,
)

Expand Down
57 changes: 53 additions & 4 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from psycopg2.extras import DictCursor
from slugify import slugify
from tqdm import tqdm
from geojson_stats.stats import Stats

# Reader imports
from src.config import (
Expand Down Expand Up @@ -640,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
os.remove(query_path)

@staticmethod
def query2geojson(con, extraction_query, dump_temp_file_path):
def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn = None):
"""Function written from scratch without being dependent on any library, Provides better performance for geojson binding"""
# creating geojson file
pre_geojson = """{"type": "FeatureCollection","features": ["""
Expand All @@ -660,10 +661,12 @@ def query2geojson(con, extraction_query, dump_temp_file_path):
for row in cursor:
if first:
first = False
f.write(row[0])
else:
f.write(",")
f.write(row[0])
if plugin_fn:
f.write(plugin_fn(row[0]))
else:
f.write((row[0]))
cursor.close() # closing connection to avoid memory issues
# close the writing geojson with last part
f.write(post_geojson)
Expand Down Expand Up @@ -711,7 +714,7 @@ def get_grid_id(geom, cur):
country_export,
)

def extract_current_data(self, exportname):
def extract_current_data(self, exportname, plugin_fn = None):
"""Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
Args:
exportname: takes filename as argument to create geojson file passed from routers
Expand Down Expand Up @@ -777,6 +780,7 @@ def extract_current_data(self, exportname):
country_export=country_export,
),
dump_temp_file_path,
plugin_fn
) # uses own conversion class
if output_type == RawDataOutputType.SHAPEFILE.value:
(
Expand Down Expand Up @@ -2255,3 +2259,48 @@ def get_summary_stats(self, start_date, end_date, group_by):
result = self.cur.fetchall()
self.d_b.close_conn()
return [dict(item) for item in result]

class GeoJSONStats(Stats):
"""Used for collecting stats while processing GeoJSON files line by line"""

def __init__(self, filters, *args, **kwargs):
super().__init__(*args, **kwargs)

self.config.clean = True
self.config.properties_prop = "properties.tags"

if filters and filters.tags:
config_area = ["building"]
config_length = ["highway", "waterway"]

for tag in config_area:
if self.check_filter(filters.tags, tag):
self.config.keys.append(tag)
self.config.value_keys.append(tag)
self.config.area = True
for tag in config_length:
if self.check_filter(filters.tags, tag):
self.config.keys.append(tag)
self.config.value_keys.append(tag)
self.config.length = True

def check_filter(self, tags, tag):
if tags.all_geometry:
if tags.all_geometry.join_or and tag in tags.all_geometry.join_or:
return True
if tags.all_geometry.join_and and tag in tags.all_geometry.join_and:
return True
if tags.polygon:
if tags.polygon.join_or and tag in tags.polygon.join_or:
return True
if tags.polygon.join_and and tag in tags.polygon.join_and:
return True
if tags.line:
if tags.line.join_or and tag in tags.line.join_or:
return True
if tags.line.join_and and tag in tags.line.join_and:
return True

def raw_data_line_stats(self, line: str):
self.process_file_line(line)
return line

0 comments on commit 28e5865

Please sign in to comment.