Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
leodbrs committed Nov 21, 2024
0 parents commit cd88f34
Show file tree
Hide file tree
Showing 27 changed files with 2,884 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.DS_Store
mise.toml
__pycache__
4 changes: 4 additions & 0 deletions .mise.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[tools]
python = "3.10.15"
poetry = {version='latest', pyproject='pyproject.toml'}

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Todo
- [ ] Incorporate a color line feature to display the car battery status during travel
12 changes: 12 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: "3"

tasks:
default:
cmds:
- task: dev
dev:
desc: "Run development environment"
cmds:
- cmd: poetry run fastapi dev oufoaler/app.py
ignore_error: true
silent: true
1 change: 1 addition & 0 deletions markmap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

49 changes: 49 additions & 0 deletions oufoaler/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from oufoaler.config import config
from oufoaler.controllers.car_controller import CarController
from oufoaler.views.api import router as api_router

logging.basicConfig(level=getattr(logging, config.logging_level.upper()))
logger = logging.getLogger(__name__)

# Create FastAPI app
app = FastAPI()

# Include sub-routers
app.include_router(api_router)
#
# Add static files and templates
app.mount("/static", StaticFiles(directory="oufoaler/views/static"), name="static")
templates = Jinja2Templates(directory="oufoaler/views/templates")
#
# Load controllers
car_ctrl = CarController()


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
cars = [car.__dict__ for car in car_ctrl.get_cars()]
return templates.TemplateResponse(
request=request, name="index.html", context={"cars": cars}
)


# Health Endpoint
@app.get("/health", response_class=JSONResponse)
async def health():
return {"status": "ok", "msg": "healthy"}


def run():
uvicorn.run("oufoaler.app:app", host=config.host, port=config.port, reload=False)


if __name__ == "__main__":
run()
6 changes: 6 additions & 0 deletions oufoaler/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from dotenv import load_dotenv

from oufoaler.models.settings import Settings

load_dotenv()
config = Settings() # type: ignore
Empty file.
95 changes: 95 additions & 0 deletions oufoaler/controllers/car_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import requests
from fastapi.logger import logger

from oufoaler.config import config
from oufoaler.models.car import Car


class CarController:
def __init__(self) -> None:
self._cars_cache: list[Car] = list()

def get_cars(self) -> list[Car]:
url = "https://api.chargetrip.io/graphql"
headers = {
"Content-Type": "application/json",
"x-client-id": config.chargetrip_client_id,
"x-app-id": config.chargetrip_app_id,
}

body = '{"query":"query vehicleListAll { vehicleList { id naming { make model version edition chargetrip_version } drivetrain { type } connectors { standard power max_electric_power time speed } adapters { standard power max_electric_power time speed } battery { usable_kwh full_kwh } body { seats } availability { status } range { chargetrip_range { best worst } } media { image { id type url height width thumbnail_url thumbnail_height thumbnail_width } brand { id type url height width thumbnail_url thumbnail_height thumbnail_width } video { id url } } routing { fast_charging_support } connect { providers } } }"}'

try:
response = requests.post(url, headers=headers, data=body)
response.raise_for_status()
response_data = response.json()
# logger.debug(f"GraphQL API Response: {response_data}")
if "errors" in response_data:
error_message = response_data["errors"][0]["message"]
logger.error(f"GraphQL API Error: {error_message}")
raise RuntimeError(f"Failed to fetch cars: {error_message}")

exclude_ids = {"63ef773bc7ac42e426e66301", "63d3e0ce44bd322626dd23f8"}

cars = []
for car in response_data.get("data", {}).get("vehicleList", []):
if car["id"] in exclude_ids:
continue

p_max = 0.0
for connector in car["connectors"]:
p_max = max(p_max, connector["max_electric_power"])

cars.append(
Car(
id=car["id"],
make=car["naming"]["make"],
model=car["naming"]["model"],
version=car["naming"]["chargetrip_version"],
power=p_max,
battery_capacity=float(car["battery"]["usable_kwh"]),
range_best=float(car["range"]["chargetrip_range"]["best"]),
range_worst=float(car["range"]["chargetrip_range"]["worst"]),
image=car["media"]["image"]["url"],
)
)
self._cars_cache = cars
return cars
except Exception as e:
raise RuntimeError(f"Failed to fetch cars: {str(e)}") from e

def calculate_soc_per_km(self, car: Car) -> float:
try:
average_range = (car.range_best + car.range_worst) / 2
if average_range == 0:
raise ZeroDivisionError("Average range cannot be zero.")
soc_per_km = 100.0 / average_range
return soc_per_km
except (TypeError, ZeroDivisionError) as e:
print(f"Error calculating soc_per_km: {e}")
return 0.0

def calculate_max_distance_without_charging(
self, soc_start: float, soc_min: float, soc_per_km: float
) -> float:
try:
if soc_per_km == 0:
raise ValueError("soc_per_km cannot be zero.")
max_distance = (soc_start - soc_min) / soc_per_km
return max_distance
except (TypeError, ValueError) as e:
print(f"Error calculating max distance: {e}")
return 0.0

def get_car_by_id(self, car_id: str) -> Car:
# Force cars fetch if cache is empty
if self._cars_cache.__len__() == 0:
self.get_cars()

print(self._cars_cache)
# Find car in cache
car = next((car for car in self._cars_cache if car.id == car_id), None)
if not car:
raise ValueError(f"Car with id {car_id} not found")

return car
169 changes: 169 additions & 0 deletions oufoaler/controllers/charging_station_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import json
import math
from typing import List

import requests
from shapely import Polygon, box, unary_union

from oufoaler.controllers.itinerary_controller import ItineraryController
from oufoaler.models.car import Car


class ChargingStationsController:
def simplify_geometry(self, area):
simplified_geom = area.simplify(0.01)
return simplified_geom

def round_coordinates(self, area, decimal_places=3):
def round_point(coords):
return tuple(round(coord, decimal_places) for coord in coords)

if area.geom_type == "Polygon":
exterior = [round_point(pt) for pt in area.exterior.coords]
interiors = [
[round_point(pt) for pt in interior.coords]
for interior in area.interiors
]
rounded_geom = Polygon(exterior, interiors)
elif area.geom_type == "MultiPolygon":
rounded_geom = unary_union(
[self.round_coordinates(p, decimal_places) for p in area.geoms]
)
else:
rounded_geom = area
return rounded_geom

def fetch_charging_stations(
self, area: str, car: Car, session: requests.Session
) -> List[dict]:
"""Fetch charging stations from the API for the given search area."""
stations = []
offset = 0
total_count = 1
where_clause = (
f"type_prise like '*T2*' AND "
f"within(geo_point_borne, geom'{area}') AND "
f"puiss_max <= {car.power}"
)

try:
while offset < total_count:
params = {
"select": "*",
"where": where_clause,
"limit": 100,
"offset": offset,
}
response = session.get(
"https://odre.opendatasoft.com/api/explore/v2.1/catalog/datasets/bornes-irve/records",
params=params,
timeout=60,
)
response.raise_for_status()
data = response.json()
batch = data.get("results", [])
stations.extend(batch)
total_count = data.get("total_count", 0)
offset += 100
except requests.exceptions.HTTPError as http_err:
raise http_err
except requests.exceptions.RequestException as req_err:
raise req_err
except json.JSONDecodeError as json_err:
raise json_err
except Exception as e:
raise e
return stations

def split_polygon_into_grid(
self, buffered_projected: Polygon, grid_size: int
) -> List[Polygon]:
minx, miny, maxx, maxy = buffered_projected.bounds
grid_polygons = []
x_steps = math.ceil((maxx - minx) / grid_size)
y_steps = math.ceil((maxy - miny) / grid_size)

for i in range(x_steps):
for j in range(y_steps):
grid_minx = minx + i * grid_size
grid_miny = miny + j * grid_size
grid_maxx = min(grid_minx + grid_size, maxx)
grid_maxy = min(grid_miny + grid_size, maxy)
grid_cell = box(grid_minx, grid_miny, grid_maxx, grid_maxy)
sub_polygon = buffered_projected.intersection(grid_cell)
if not sub_polygon.is_empty:
grid_polygons.append(sub_polygon)
return grid_polygons

def find_charging_stations_near_route(
self, waypoints: list, car: Car, session: requests.Session
) -> List[dict]:
BUFFER_DISTANCE = 20000 # in meters
SUB_POLYGON_SIZE = 300000 # in meters

itinerary_ctrl = ItineraryController()

# Step 1: Create LineString from the points
line = itinerary_ctrl.create_linestring_from_points(waypoints)
if not line:
raise ValueError("Failed to create LineString from waypoints.")

# Step 2: Project to metric CRS (EPSG:3857) for buffering
line_proj = itinerary_ctrl.project_geometry(line)
if not line_proj:
raise ValueError("Failed to project LineString.")

# Step 3: Buffer the LineString
buffered = line_proj.buffer(BUFFER_DISTANCE)

# Step 4: Re-project back to WGS84 (EPSG:4326)
buffered_wgs84 = itinerary_ctrl.project_geometry(
buffered, src_crs="epsg:3857", dst_crs="epsg:4326"
)
if not buffered_wgs84:
raise ValueError("Failed to reproject buffered polygon to WGS84.")

buffered_wgs84 = self.simplify_geometry(buffered_wgs84)
buffered_wgs84 = self.round_coordinates(buffered_wgs84)

# Step 5: Project buffered polygon to metric CRS (EPSG:3857)
buffered_projected = itinerary_ctrl.project_geometry(
buffered_wgs84, src_crs="epsg:4326", dst_crs="epsg:3857"
)
if not buffered_projected:
raise ValueError("Failed to project buffered polygon to metric CRS.")

# Step 6: Split buffered polygon into sub-polygons
if not isinstance(buffered_projected, Polygon):
raise ValueError("Expected a Polygon geometry")
sub_polygons_projected = self.split_polygon_into_grid(
buffered_projected, SUB_POLYGON_SIZE
)

if not sub_polygons_projected:
raise ValueError("No sub-polygons created. Check buffer and grid size.")

# Step 7: Re-project sub-polygons back to WGS84
sub_polygons_wgs84 = [
itinerary_ctrl.project_geometry(
sub_poly, src_crs="epsg:3857", dst_crs="epsg:4326"
)
for sub_poly in sub_polygons_projected
if sub_poly and not sub_poly.is_empty
]

# Step 8: Fetch charging stations for each sub-polygon
all_stations = []
for sub_poly in sub_polygons_wgs84:
wkt = sub_poly.wkt
stations = self.fetch_charging_stations(wkt, car, session)
all_stations.extend(stations)

# Deduplicate stations based on 'id_station'
unique_stations = {
station.get("id_station"): station
for station in all_stations
if station.get("id_station")
}

return list(unique_stations.values())
Loading

0 comments on commit cd88f34

Please sign in to comment.