Skip to content

Commit

Permalink
📝 Updated notebooks
Browse files Browse the repository at this point in the history
  • Loading branch information
ronaldokun committed Aug 10, 2024
1 parent 5237a4a commit 56d9595
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 116 deletions.
1 change: 0 additions & 1 deletion fiscaliza/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ def extract_value(field: str | int | dict | list) -> str | int | list:

elif isinstance(field, (int, float)) or field is None:
return field

else:
raise TypeError(
f"O tipo de campo {type(field)} não é suportado. "
Expand Down
207 changes: 149 additions & 58 deletions nbs/00_main.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@
"#| export\n",
"load_dotenv(override=True)\n",
"\n",
"UTFCHARS = re.compile(r\"[!\\\"#$%&'\\(\\)*+\\,\\-\\.\\/:;<=>\\?@\\[\\\\\\]\\^`_\\{\\|\\}~]\")\n"
"UTFCHARS = re.compile(r\"[!\\\"#$%&'\\(\\)*+\\,\\-\\.\\/:;<=>\\?@\\[\\\\\\]\\^`_\\{\\|\\}~]\")\n",
"\n",
"CONDITIONAL_FIELDS = {\n",
" k: v.reset(v) for k, v in FIELDS.items() if getattr(v, \"mapping\", False)\n",
"}\n"
]
},
{
Expand Down Expand Up @@ -112,8 +116,8 @@
" ) from e\n",
" return fiscaliza\n",
"\n",
" def get_issue(self, issue: str) -> dict:\n",
" return Issue(self.client, issue)\n"
" def get_issue(self, issue_id: str | int):\n",
" return Issue(self.client, issue_id)\n"
]
},
{
Expand All @@ -138,12 +142,23 @@
" )\n",
" self._ascii2utf = {}\n",
"\n",
" @property\n",
" def current_user(self):\n",
" user = dict(list(self.client.user.get(\"current\")))\n",
" return f'{user.get(\"firstname\", \"\")} {user.get(\"lastname\", \"\")}'\n",
"\n",
" @staticmethod\n",
" def __format_json_string(field: str) -> str:\n",
" \"\"\"Recebe uma string formatada como json e retorna a mesma string formatada como json\"\"\"\n",
" string = field.replace(\"'\", '\"').replace(\"=>\", \": \")\n",
"\n",
" def force_strings(obj):\n",
" if isinstance(obj, dict):\n",
" return {k: str(v) for k, v in obj.items()}\n",
" return obj\n",
"\n",
" try:\n",
" return json.loads(string)\n",
" return json.loads(string, object_hook=force_strings)\n",
" except (json.JSONDecodeError, TypeError):\n",
" return string\n",
"\n",
Expand All @@ -152,7 +167,9 @@
" \"\"\"Recebe uma string formatada como json e extrai os valores das chaves de acordo com o tipo de campo\"\"\"\n",
" if isinstance(field, str):\n",
" json_obj = Issue.__format_json_string(field)\n",
" if isinstance(json_obj, (str, int, float)):\n",
" if isinstance(json_obj, (int, float)):\n",
" return str(json_obj)\n",
" elif isinstance(json_obj, str):\n",
" return json_obj\n",
" return Issue.extract_value(json_obj)\n",
"\n",
Expand All @@ -166,7 +183,6 @@
"\n",
" elif isinstance(field, (int, float)) or field is None:\n",
" return field\n",
"\n",
" else:\n",
" raise TypeError(\n",
" f\"O tipo de campo {type(field)} não é suportado. \"\n",
Expand Down Expand Up @@ -285,8 +301,8 @@
" except ValueError:\n",
" pass\n",
" attrs |= {\n",
" \"latitude_coordenadas\": lat,\n",
" \"longitude_coordenadas\": long,\n",
" \"latitude_coordenadas\": str(lat),\n",
" \"longitude_coordenadas\": str(long),\n",
" }\n",
" if coords := attrs.pop(\"coordenadas_estacao\", None):\n",
" coords = Issue.__format_json_string(coords)\n",
Expand All @@ -296,8 +312,8 @@
" except ValueError:\n",
" pass\n",
" attrs |= {\n",
" \"latitude_da_estacao\": lat,\n",
" \"longitude_da_estacao\": long,\n",
" \"latitude_da_estacao\": str(lat),\n",
" \"longitude_da_estacao\": str(long),\n",
" }\n",
" return attrs\n",
"\n",
Expand Down Expand Up @@ -349,14 +365,14 @@
" return {k: attrs[k] for k in sorted(attrs)}\n",
"\n",
" @staticmethod\n",
" def _append_irregularity_options(\n",
" tipo_de_inspecao: str, editable_fields: dict\n",
" ) -> dict:\n",
" def _append_irregularity_options(editable_fields: dict) -> dict:\n",
" \"\"\"\n",
" Appends the options of the 'irregularidade' field to the editable_fields dictionary.\n",
" \"\"\"\n",
" if editable_fields.get(\"irregularidade\"):\n",
" match tipo_de_inspecao:\n",
" if (\n",
" \"irregularidade\" in editable_fields\n",
" ): # checking only the existence of the key, because it can be an empty list\n",
" match editable_fields[\"tipo_de_inspecao\"].value:\n",
" case \"Certificação\":\n",
" options = [\n",
" \"Comercialização de produtos\",\n",
Expand All @@ -375,20 +391,43 @@
" \"Outras irregularidades técnicas (especificar)\",\n",
" \"Potência diversa da autorizada\",\n",
" ]\n",
" case \"Uso do Espectro - Não Outorgado\":\n",
" options = [\"Entidade não outorgada\"]\n",
" case __:\n",
" options = []\n",
"\n",
" editable_fields[\"irregularidade\"].options = options\n",
" return editable_fields\n",
"\n",
" # @staticmethod\n",
" # def _cast_and_set(field, value):\n",
" # try:\n",
" # match field.dtype:\n",
" # case \"string\":\n",
" # value = str(value)\n",
" # case \"int\":\n",
" # value = int(value)\n",
" # case \"float\":\n",
" # value = float(value)\n",
" # case \"list\":\n",
" # value = listify(value)\n",
" # case _:\n",
" # print(f\"Unknown dtype {field.dtype}, casting skipped...\")\n",
" # except ValueError as e:\n",
" # print(\n",
" # f\"Error casting {field.name} value {value} to dtype {field.dtype}: {e}\"\n",
" # )\n",
" # setattr(field, \"value\", value)\n",
" # return field\n",
"\n",
" @cached_property\n",
" def editable_fields(self) -> dict:\n",
" \"\"\"Retrieves the editable fields of an issue as a dictionary.\"\"\"\n",
" editable_fields = {}\n",
" keys_by_id = sorted(FIELDS.keys(), key=lambda x: getattr(FIELDS[x], \"id\", 0))\n",
" fields = {k: FIELDS[k] for k in keys_by_id}\n",
" for key, field in fields.items():\n",
" if key in self.attrs:\n",
" fields = {k: FIELDS[k].reset(FIELDS[k]) for k in keys_by_id}\n",
" for key in self.attrs:\n",
" if field := fields.get(key):\n",
" if hasattr(field, \"options\"):\n",
" if field.multiple:\n",
" self.attrs[key] = [str(k) for k in self.attrs[key]]\n",
Expand All @@ -400,11 +439,8 @@
" elif key == \"fiscal_responsavel\":\n",
" setattr(field, \"options\", [\"\"] + self.attrs[\"membros\"])\n",
" editable_fields[key] = field\n",
" if tipo_de_inspecao := self.attrs.get(\"tipo_de_inspecao\"):\n",
" editable_fields = self._append_irregularity_options(\n",
" tipo_de_inspecao, editable_fields\n",
" )\n",
" return editable_fields\n",
" editable_fields = self._append_irregularity_options(editable_fields)\n",
" return self._update_fields(self.attrs, editable_fields)\n",
"\n",
" def mandatory_fields(self) -> dict:\n",
" return {\n",
Expand All @@ -420,43 +456,89 @@
" if getattr(v, \"mapping\", False)\n",
" }\n",
"\n",
" def update_fields(self, dados: dict) -> dict:\n",
" @staticmethod\n",
" def _fields_derived_from_select_conditional(key, value) -> dict:\n",
" \"\"\"\n",
" Check if the data to be submitted to the Fiscaliza server is complete and valid.\n",
" Fill in the fields resulted from values on other fields.\n",
" \"\"\"\n",
" dependent_fields = {}\n",
" if field := CONDITIONAL_FIELDS.get(key):\n",
" for val in listify(\n",
" value\n",
" ): # abusing use of empty defaults from dict.get to avoid if clauses\n",
" val = str(val)\n",
" if field.options:\n",
" assert (\n",
" val in field.options\n",
" ), f\"Opção inválida para o campo {key}: {val}\"\n",
" for new_key in field.mapping.get(val, []):\n",
" if new_key not in dependent_fields:\n",
" dependent_fields[new_key] = FIELDS[new_key].reset(\n",
" FIELDS[new_key]\n",
" )\n",
"\n",
" for key, field in self.conditional_fields().items():\n",
" if key in dados:\n",
" if hasattr(self, \"editable_fields\"):\n",
" del self.editable_fields\n",
" new_fields = set()\n",
" all_fields = {\n",
" item for values in field.mapping.values() for item in values\n",
" }\n",
" for option in listify(dados[key]):\n",
" if field.options:\n",
" assert (\n",
" option in field.options\n",
" ), f\"Opção inválida para o campo {key}: {option}\"\n",
"\n",
" if fields := field.mapping.get(option):\n",
" new_fields.update(fields)\n",
"\n",
" self.editable_fields = {\n",
" k: v\n",
" for k, v in self.editable_fields.items()\n",
" if k not in all_fields.difference(new_fields)\n",
" }\n",
" return dependent_fields\n",
"\n",
" self.editable_fields |= {k: FIELDS[k] for k in new_fields}\n",
" @staticmethod\n",
" def _keys_unrelated_from_select_conditional(key, value) -> dict:\n",
" \"\"\"\n",
" Fill in the fields resulted from values on other fields.\n",
" \"\"\"\n",
" unrelated_fields = set()\n",
" if field := CONDITIONAL_FIELDS.get(key):\n",
" for val in listify(\n",
" value\n",
" ): # abusing use of empty defaults from dict.get to avoid if clauses\n",
" val = str(val)\n",
" if field.options:\n",
" assert (\n",
" val in field.options\n",
" ), f\"Opção inválida para o campo {key}: {val}\"\n",
" for options in field.mapping.values():\n",
" for option in options:\n",
" if option not in field.mapping.get(val, []):\n",
" unrelated_fields.add(option)\n",
" return unrelated_fields\n",
"\n",
" @staticmethod\n",
" def _update_options_for_each_conditional(dados: dict) -> tuple[dict, set]:\n",
" dependent_fields = {}\n",
" unrelated_keys = set()\n",
" for key, value in dados.items():\n",
" dependent_fields |= Issue._fields_derived_from_select_conditional(\n",
" key, value\n",
" )\n",
" unrelated_keys.difference_update(dependent_fields)\n",
" unrelated_keys.update(\n",
" Issue._keys_unrelated_from_select_conditional(key, value)\n",
" )\n",
" return dependent_fields, unrelated_keys\n",
"\n",
" @staticmethod\n",
" def _update_fields(dados: dict, editable_fields: dict) -> dict:\n",
" \"\"\"\n",
" Check if the data to be submitted to the Fiscaliza server is complete and valid.\n",
" \"\"\"\n",
" insert, delete = Issue._update_options_for_each_conditional(dados)\n",
" for key, value in insert.items():\n",
" if key not in editable_fields:\n",
" editable_fields[key] = value\n",
" for key in delete:\n",
" if key in editable_fields:\n",
" del editable_fields[key]\n",
" return editable_fields\n",
"\n",
" def update_fields(self, dados: dict):\n",
" \"\"\"\n",
" Check if the data to be submitted to the Fiscaliza server is complete and valid.\n",
" \"\"\"\n",
" self.editable_fields = self._update_fields(dados, self.editable_fields)\n",
"\n",
" for key, value in dados.items():\n",
" if key in self.editable_fields:\n",
" self.editable_fields[key](value)\n",
" if key == \"tipo_de_inspecao\":\n",
" self.editable_fields = self._append_irregularity_options(\n",
" value, self.editable_fields\n",
" )\n",
"\n",
" self.editable_fields = self._append_irregularity_options(self.editable_fields)\n",
"\n",
" def _get_id_only_fields(self, data: dict) -> dict:\n",
" if status := data.get(\"status\"):\n",
Expand All @@ -474,7 +556,9 @@
" (\"latitude_coordenadas\" in data) and (\"longitude_coordenadas\" in data)\n",
" ): # Don't use numeric data that could be zero in clauses, that why the 'in' is here and not := dados.get(...)\n",
" newkey = \"coordenadas_geograficas\"\n",
" self.editable_fields[newkey] = SPECIAL_FIELDS[newkey]\n",
" self.editable_fields[newkey] = SPECIAL_FIELDS[newkey].reset(\n",
" SPECIAL_FIELDS[newkey]\n",
" )\n",
" self.editable_fields.pop(\"latitude_coordenadas\", None)\n",
" self.editable_fields.pop(\"longitude_coordenadas\", None)\n",
" data[newkey] = (\n",
Expand All @@ -488,7 +572,9 @@
" )\n",
" if (\"latitude_da_estacao\" in data) and (\"longitude_da_estacao\" in data):\n",
" newkey = \"coordenadas_estacao\"\n",
" self.editable_fields[newkey] = SPECIAL_FIELDS[newkey]\n",
" self.editable_fields[newkey] = SPECIAL_FIELDS[newkey].reset(\n",
" SPECIAL_FIELDS[newkey]\n",
" )\n",
" self.editable_fields.pop(\"latitude_da_estacao\", None)\n",
" self.editable_fields.pop(\"longitude_da_estacao\", None)\n",
" data[newkey] = (\n",
Expand All @@ -512,7 +598,9 @@
" raise ValueError(\n",
" \"Para gerar o PLAI é necessário fornecer o tipo do processo e as coordenação da FI\"\n",
" )\n",
" self.editable_fields[newkey] = SPECIAL_FIELDS[newkey]\n",
" self.editable_fields[newkey] = SPECIAL_FIELDS[newkey].reset(\n",
" SPECIAL_FIELDS[newkey]\n",
" )\n",
" data[newkey] = (tipo_processo_plai, coords_fi_plai)\n",
"\n",
" return data\n",
Expand Down Expand Up @@ -608,10 +696,11 @@
" ],\n",
" )\n",
"\n",
" def update(self, dados: dict):\n",
" def update(self, dados: dict) -> str:\n",
" \"\"\"Updates an issue with the given data.\"\"\"\n",
" self.refresh()\n",
" status = self.editable_fields[\"status\"].value\n",
" message = \"\"\n",
" for new_status in FLOW[status]:\n",
" status_id = STATUS[new_status]\n",
" if subset := STATES.get(new_status):\n",
Expand All @@ -621,8 +710,10 @@
" else:\n",
" data = self._parse_value_dict(dados)\n",
" self.client.issue.update(self.id, status_id=status_id, **data)\n",
" print(f\"Atualizado para o status {new_status}\")\n",
" self.refresh()"
" message = f'A Inspeção nº {self.id} foi atualizada. O seu estado atual é \"{new_status}\".'\n",
" self.refresh()\n",
"\n",
" return message"
]
},
{
Expand Down
Loading

0 comments on commit 56d9595

Please sign in to comment.