-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding function under dedicated object (#68)
* move function towards object * adding refresh button per edge * refacto and linting * adding limitation * clean * refreshing by edge * make edges names dynamic * adding logs * remove extra idem ids * remove logs * rollback edge_data_manager * lint * rollback item.id --------- Co-authored-by: gireg.roussel <gireg.roussel@octo.com>
- Loading branch information
1 parent
105b550
commit e104195
Showing
9 changed files
with
217 additions
and
197 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import json | ||
import os | ||
from typing import List, Optional | ||
|
||
from dotenv import load_dotenv | ||
from google.api_core.exceptions import NotFound | ||
from google.cloud.storage import Client | ||
|
||
load_dotenv() | ||
|
||
# Name of the GCS bucket used by GCPBinaryStorage; None if GCP_BUCKET_NAME is unset.
BUCKET_NAME = os.getenv("GCP_BUCKET_NAME")
|
||
|
||
class GCPBinaryStorage:
    """Thin wrapper around a GCS bucket for listing edge prefixes and reading blobs.

    The bucket name comes from the module-level ``BUCKET_NAME`` (the
    ``GCP_BUCKET_NAME`` environment variable).
    """

    def __init__(self, prefix: str = ""):
        # `prefix` narrows which part of the bucket get_edges_names() scans.
        self.prefix = prefix
        self.bucket = Client().bucket(BUCKET_NAME)

    def get_edges_names(self) -> List[str]:
        """Return the top-level "directory" names (edge names) under ``self.prefix``.

        A delimiter listing makes GCS report sub-prefixes instead of every blob.
        The iterator must be drained before ``iterator.prefixes`` is populated;
        this replaces the previous call to the private
        ``_get_next_page_response()``, which was unsupported API and only ever
        saw the first page of results.
        """
        iterator = self.bucket.list_blobs(prefix=self.prefix, delimiter="/")
        for _ in iterator:  # drain so iterator.prefixes is fully populated
            pass
        # prefixes is a set; sort so the result order is deterministic.
        return sorted(prefix.rstrip("/") for prefix in iterator.prefixes)

    def get_text_blob(self, blobname: str) -> Optional[str]:
        """Download ``blobname`` as text; return None (and log) if it does not exist.

        Return annotation fixed to Optional[str]: the NotFound path returns None.
        """
        blob = self.bucket.blob(blobname)
        try:
            return blob.download_as_text()
        except NotFound as e:
            print(f"Blob not found for {blobname}")
            print(f"Error: {e}")
            return None

    def extract_metadata(
        self, edge_name: str, use_case: str, item_id: str
    ) -> Optional[dict]:
        """Load ``<edge>/<use_case>/<item_id>/metadata.json`` as a dict.

        Returns None when the metadata blob is missing. Previously the None
        from get_text_blob() was passed straight to json.loads(), which raises
        TypeError — guard against that here.
        """
        metadata_blobname = f"{edge_name}/{use_case}/{item_id}/metadata.json"
        text = self.get_text_blob(metadata_blobname)
        return json.loads(text) if text is not None else None
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import os | ||
from io import BytesIO | ||
from typing import List, Optional | ||
|
||
from dotenv import load_dotenv | ||
from google.cloud.storage.blob import Blob | ||
from PIL import Image | ||
from pydantic import BaseModel | ||
|
||
from gcp_binary_storage import GCPBinaryStorage | ||
from models.use_case import UseCase | ||
from utils.prediction_boxes import filter_inferences_on_camera_id, plot_predictions | ||
|
||
load_dotenv() | ||
|
||
# Name of the GCS bucket holding edge data; None if the env var is unset.
BUCKET_NAME = os.getenv("GCP_BUCKET_NAME")
# Image file extensions recognized when filtering blobs.
IMG_EXTENSIONS = [".jpg", ".jpeg", ".png"]
# Max number of camera pictures downloaded per item. os.getenv returns a str
# whenever the variable IS set, so coerce to int — the previous int-default /
# env-str mix made `item.number_pictures < NUMBER_CAMERAS` raise TypeError.
NUMBER_CAMERAS = int(os.getenv("NUMBER_CAMERAS", "2"))
|
||
|
||
class EdgeData(BaseModel):
    """In-memory state for one edge device, refreshed from GCS blobs.

    Blobs are expected at ``<edge_name>/<use_case>/<item_id>/<file>`` paths
    (assumed from the splits in ``package``/``extract_item_ids`` — TODO confirm
    against the uploader).
    """

    # Edge name; also the top-level GCS prefix for this edge's blobs.
    name: str
    # Edge IP, read from the `<name>/edge_ip.txt` blob; None until fetched.
    edge_ip: Optional[str] = None
    # Use cases discovered while packaging blobs for this edge.
    use_cases: List[UseCase] = []

    # Upper bound on how many (newest-first) image blobs extract() processes.
    limit_blob_extracted: int = 20

    def add_usecase(self, use_case_name: str, edge_ip: str):
        """Register a new use case and (re)set this edge's IP."""
        self.edge_ip = edge_ip
        self.use_cases.append(UseCase(name=use_case_name))

    def get_use_case_names(self) -> List[str]:
        """Return the names of all registered use cases."""
        return [use_case.name for use_case in self.use_cases]

    def get_use_case(self, name: str) -> Optional[UseCase]:
        """Return the use case with the given name, or None if absent."""
        for use_case in self.use_cases:
            if use_case.name == name:
                return use_case
        return None

    def get_ip(self, gcp_client: GCPBinaryStorage):
        """Fetch this edge's IP from the `<name>/edge_ip.txt` blob into self.edge_ip."""
        print(f"Getting IP for edge: {self.name}")
        ip_blobname = f"{self.name}/edge_ip.txt"
        # get_text_blob may yield None if the blob is missing, leaving edge_ip None.
        self.edge_ip = gcp_client.get_text_blob(ip_blobname)

    def extract(self, gcp_client: GCPBinaryStorage):
        """Refresh this edge's data: list image blobs, keep the newest, package them."""
        print(f"Extracting data for edge: {self.name}")

        blobs = gcp_client.bucket.list_blobs(prefix=self.name)

        # Keep only image blobs (by file extension).
        blobs_images = [
            blob
            for blob in blobs
            if any(blob.name.endswith(extension) for extension in IMG_EXTENSIONS)
        ]

        # Newest first, truncated to the configured extraction limit.
        blobs_sorted = sorted(blobs_images, key=lambda x: x.time_created, reverse=True)[
            : self.limit_blob_extracted
        ]

        self.package(blobs_sorted, gcp_client)

    def extract_item_ids(self, blobs: List[Blob]):
        """Return the item id (third path segment) of each blob.

        Assumes blob names have at least three '/'-separated segments — TODO
        confirm; a shallower blob name would raise IndexError here.
        """
        item_ids = []
        for blob in blobs:
            item_id = blob.name.split("/")[2]
            item_ids.append(item_id)
        return item_ids

    def package(self, blobs: List[Blob], gcp_client: GCPBinaryStorage):
        """Fold the given image blobs into use cases, items, cameras and pictures.

        Also drops items that no longer exist in the cloud (stale items).
        Called from extract() with the newest image blobs for this edge.
        """
        # Item ids currently present in the cloud listing, used to detect
        # stale local items below.
        cloud_item_ids = self.extract_item_ids(blobs)
        for blob in blobs:
            blob_name = blob.name

            # Path layout: <edge>/<use_case>/<item_id>/.../<file>.
            blob_name_split = blob_name.split("/")
            edge_name = blob_name_split[0]
            use_case_name = blob_name_split[1]
            item_id = blob_name_split[2]
            file_name = blob_name_split[-1]
            # Camera id is the file name without its extension.
            camera_id = file_name.split(".")[0]

            if use_case_name not in self.get_use_case_names():
                self.add_usecase(use_case_name, self.edge_ip)

            use_case = self.get_use_case(use_case_name)

            use_case_item_ids = use_case.get_item_ids()
            # Items known locally but absent from the cloud are stale: remove
            # them and skip to the next blob.
            # NOTE(review): the inner loop rebinds `item_id`, shadowing the
            # current blob's item id, and the `continue` skips processing the
            # current (valid) blob entirely; it is only picked up on a later
            # refresh pass. Looks intentional-ish but worth confirming.
            extra_item_ids = list(set(use_case_item_ids) - set(cloud_item_ids))
            if extra_item_ids != []:
                for item_id in extra_item_ids:
                    use_case.remove_item(item_id)
                continue
            if item_id not in use_case_item_ids:
                # New item: attach its metadata.json (may be None if missing).
                metadata = gcp_client.extract_metadata(
                    edge_name, use_case_name, item_id
                )
                use_case.add_item(item_id, blob.time_created, metadata)

            item = use_case.get_item(item_id)
            if camera_id not in item.get_camera_ids():
                item.add_camera(camera_id)

            if any(file_name.endswith(extension) for extension in IMG_EXTENSIONS):
                # Downloading the first NUMBER_CAMERAS pics
                # NOTE(review): os.getenv makes NUMBER_CAMERAS a str when the
                # env var is set, so this `<` comparison would raise TypeError
                # — confirm the env var is unset in production or coerce to int.
                if item.number_pictures < NUMBER_CAMERAS:
                    binary_data = blob.download_as_bytes()
                    picture = Image.open(BytesIO(binary_data))

                    # If metadata is not empty, we plot the predictions
                    if item.contains_predictions(camera_id):
                        camera_inferences_metadata = filter_inferences_on_camera_id(
                            camera_id, item.metadata
                        )
                        if camera_inferences_metadata:
                            picture = plot_predictions(
                                picture, camera_inferences_metadata
                            )

                    camera = item.get_camera(camera_id)
                    camera.add_picture(picture)
                    item.number_pictures += 1
Oops, something went wrong.