
Commit

tracing in dev
dmanchon committed Dec 16, 2023
1 parent 48d9c3b commit 040b2af
Showing 2 changed files with 34 additions and 0 deletions.
14 changes: 14 additions & 0 deletions guillotina/content.py
@@ -82,11 +82,13 @@

import guillotina.db.orm.base
import os
import opentelemetry
import pathlib


_zone = tzutc() # utc tz is much faster than local tz info
_marker = object()
tracer = opentelemetry.trace.get_tracer(__name__)


@implementer(IResourceFactory)
@@ -295,6 +297,7 @@ def _get_transaction(self) -> ITransaction:
return self.__txn__
raise TransactionNotFound()

@tracer.start_as_current_span("Folder.async_contains")
async def async_contains(self, key: str) -> bool:
"""
Asynchronously check if key exists inside this folder
@@ -303,6 +306,7 @@ async def async_contains(self, key: str) -> bool:
"""
return await self._get_transaction().contains(self.__uuid__, key)

@tracer.start_as_current_span("Folder.async_set")
async def async_set(self, key: str, value: Resource) -> None:
"""
Asynchronously set an object in this folder
@@ -317,6 +321,7 @@ async def async_set(self, key: str, value: Resource) -> None:
value.__txn__ = trns
trns.register(value)

@tracer.start_as_current_span("Folder.async_get")
async def async_get(self, key: str, default=None, suppress_events=False) -> Optional[IBaseObject]:
"""
Asynchronously get an object inside this folder
@@ -334,6 +339,7 @@ async def async_get(self, key: str, default=None, suppress_events=False) -> Opti
pass
return default

@tracer.start_as_current_span("Folder.async_multi_get")
async def async_multi_get(
self, keys: List[str], default=None, suppress_events=False
) -> AsyncIterator[IBaseObject]:
@@ -346,6 +352,7 @@ async def async_multi_get(
async for item in txn.get_children(self, keys): # type: ignore
yield item

@tracer.start_as_current_span("Folder.async_del")
async def async_del(self, key: str) -> None:
"""
Asynchronously delete object in the folder
@@ -357,18 +364,21 @@ async def async_del(self, key: str) -> None:
if obj is not None:
return txn.delete(obj)

@tracer.start_as_current_span("Folder.async_len")
async def async_len(self) -> int:
"""
Asynchronously calculate the len of the folder
"""
return await self._get_transaction().len(self.__uuid__)

@tracer.start_as_current_span("Folder.async_keys")
async def async_keys(self) -> List[str]:
"""
Asynchronously get the sub object keys in this folder
"""
return await self._get_transaction().keys(self.__uuid__)

@tracer.start_as_current_span("Folder.async_items")
async def async_items(self, suppress_events=False) -> AsyncIterator[Tuple[str, Resource]]:
"""
Asynchronously iterate through contents of folder
@@ -379,6 +389,7 @@ async def async_items(self, suppress_events=False) -> AsyncIterator[Tuple[str, R
await notify(ObjectLoadedEvent(value))
yield key, value

@tracer.start_as_current_span("Folder.async_values")
async def async_values(self, suppress_events=False) -> AsyncIterator[Tuple[Resource]]:
txn = self._get_transaction()
async for _, value in txn.items(self): # type: ignore
@@ -552,6 +563,7 @@ def iter_schemata(obj) -> Iterator[Interface]:


@profilable
@tracer.start_as_current_span("create_content")
async def create_content(type_, **kw) -> IResource:
"""Utility to create a content.
@@ -569,6 +581,7 @@ async def create_content(type_, **kw) -> IResource:


@profilable
@tracer.start_as_current_span("create_content_in_container")
async def create_content_in_container(
parent: Folder, type_: str, id_: str, request: IRequest = None, check_security=True, **kw
) -> Resource:
@@ -640,6 +653,7 @@ def get_all_behavior_interfaces(content) -> list:
return behaviors


@tracer.start_as_current_span("get_all_behaviors")
async def get_all_behaviors(content, create=False, load=True, preload_only=False) -> list:
schemas = get_all_behavior_interfaces(content)
instances = [schema(content) for schema in schemas]
20 changes: 20 additions & 0 deletions guillotina/db/storages/pg.py
@@ -1159,6 +1159,7 @@ async def get_conflicts(self, txn):
async with self.pool.acquire() as conn:
return await conn.fetch(sql, txn._tid)

@tracer.start_as_current_span("commit")
async def commit(self, transaction):
async with watch_lock(transaction._lock, "commit_txn"):
if transaction._db_txn is not None:
@@ -1168,6 +1169,7 @@ async def commit(self, transaction):
log.warning("Do not have db transaction to commit")
return transaction._tid

@tracer.start_as_current_span("abort")
async def abort(self, transaction):
async with watch_lock(transaction._lock, "rollback_txn"):
if transaction._db_txn is not None:
@@ -1182,6 +1184,7 @@ async def abort(self, transaction):
# log.warning('Do not have db transaction to rollback')

# Introspection
@tracer.start_as_current_span("get_page_of_keys")
async def get_page_of_keys(self, txn, oid, page=1, page_size=1000):
keys = []
sql = self._sql.get("BATCHED_GET_CHILDREN_KEYS", self._objects_table_name)
@@ -1191,13 +1194,15 @@ async def get_page_of_keys(self, txn, oid, page=1, page_size=1000):
keys.append(record["id"])
return keys

@tracer.start_as_current_span("keys")
async def keys(self, txn, oid):
sql = self._sql.get("GET_CHILDREN_KEYS", self._objects_table_name)
async with self.acquire(txn) as conn:
with watch("keys"):
result = await conn.fetch(sql, oid)
return result

@tracer.start_as_current_span("get_child")
async def get_child(self, txn, parent_oid, id):
sql = self._sql.get("GET_CHILD", self._objects_table_name)
result = await self.get_one_row(txn, sql, parent_oid, id, metric="get_child")
@@ -1206,6 +1211,7 @@ async def get_child(self, txn, parent_oid, id):
result["state"] = await app_settings["state_reader"](result)
return result

@tracer.start_as_current_span("get_children")
async def get_children(self, txn, parent_oid, ids):
sql = self._sql.get("GET_CHILDREN_BATCH", self._objects_table_name)
async with self.acquire(txn) as conn:
@@ -1217,6 +1223,7 @@ async def get_children(self, txn, parent_oid, ids):
row["state"] = await app_settings["state_reader"](row)
return result

@tracer.start_as_current_span("has_key")
async def has_key(self, txn, parent_oid, id):
sql = self._sql.get("EXIST_CHILD", self._objects_table_name)
result = await self.get_one_row(txn, sql, parent_oid, id, metric="has_key")
@@ -1225,13 +1232,15 @@ async def has_key(self, txn, parent_oid, id):
else:
return True

@tracer.start_as_current_span("len")
async def len(self, txn, oid):
sql = self._sql.get("NUM_CHILDREN", self._objects_table_name)
async with self.acquire(txn) as conn:
with watch("num_children"):
result = await conn.fetchval(sql, oid)
return result

@tracer.start_as_current_span("items")
async def items(self, txn, oid):
sql = self._sql.get("GET_CHILDREN", self._objects_table_name)
async with self.acquire(txn) as conn:
@@ -1251,6 +1260,7 @@ async def items(self, txn, oid):
if len(records) < max_records:
break

@tracer.start_as_current_span("get_annotation")
async def get_annotation(self, txn, oid, id):
sql = self._sql.get("GET_ANNOTATION", self._objects_table_name)
result = await self.get_one_row(txn, sql, oid, id, metric="load_annotation")
@@ -1261,6 +1271,7 @@ async def get_annotation(self, txn, oid, id):
result["state"] = await app_settings["state_reader"](result)
return result

@tracer.start_as_current_span("get_annotations")
async def get_annotations(self, txn, oid, ids):
sql = self._sql.get("GET_ANNOTATIONS", self._objects_table_name)
futures = []
@@ -1277,6 +1288,7 @@ async def get_annotations(self, txn, oid, ids):
annotations[idx]["state"] = state_data[idx]
return {annotation["id"]: annotation for annotation in annotations}

@tracer.start_as_current_span("get_annotation_keys")
async def get_annotation_keys(self, txn, oid):
sql = self._sql.get("GET_ANNOTATIONS_KEYS", self._objects_table_name)
async with self.acquire(txn) as conn:
@@ -1288,6 +1300,7 @@ async def get_annotation_keys(self, txn, oid):
items.append(item)
return items

@tracer.start_as_current_span("write_blob_chunk")
async def write_blob_chunk(self, txn, bid, oid, chunk_index, data):
sql = self._sql.get("HAS_OBJECT", self._objects_table_name)
result = await self.get_one_row(txn, sql, oid, metric="has_object")
@@ -1307,10 +1320,12 @@ async def write_blob_chunk(self, txn, bid, oid, chunk_index, data):
with watch("store_blob_chunk"):
return await conn.execute(sql, bid, oid, chunk_index, data)

@tracer.start_as_current_span("read_blob_chunk")
async def read_blob_chunk(self, txn, bid, chunk=0):
sql = self._sql.get("READ_BLOB_CHUNK", self._blobs_table_name)
return await self.get_one_row(txn, sql, bid, chunk, metric="load_blob_chunk")

@tracer.start_as_current_span("read_blob_chunks")
async def read_blob_chunks(self, txn, bid):
async with self.acquire(txn) as conn:
with watch("read_blob_chunks"):
@@ -1320,26 +1335,30 @@ async def read_blob_chunks(self, txn, bid):
# sub-queries and then you end up with a deadlock
yield record

@tracer.start_as_current_span("del_blob")
async def del_blob(self, txn, bid):
sql = self._sql.get("DELETE_BLOB", self._blobs_table_name)
async with self.acquire(txn) as conn:
with watch("delete_blob_chunk"):
await conn.execute(sql, bid)

@tracer.start_as_current_span("get_total_number_of_objects")
async def get_total_number_of_objects(self, txn):
sql = self._sql.get("NUM_ROWS", self._objects_table_name)
async with self.acquire(txn) as conn:
with watch("total_objects"):
result = await conn.fetchval(sql)
return result

@tracer.start_as_current_span("get_total_number_of_resources")
async def get_total_number_of_resources(self, txn):
sql = self._sql.get("NUM_RESOURCES", self._objects_table_name)
async with self.acquire(txn) as conn:
with watch("total_resources"):
result = await conn.fetchval(sql)
return result

@tracer.start_as_current_span("get_total_resources_of_type")
async def get_total_resources_of_type(self, txn, type_):
sql = self._sql.get("NUM_RESOURCES_BY_TYPE", self._objects_table_name)
async with self.acquire(txn) as conn:
@@ -1348,6 +1367,7 @@ async def get_total_resources_of_type(self, txn, type_):
return result

# Massive treatment without security
@tracer.start_as_current_span("_get_page_resources_of_type")
async def _get_page_resources_of_type(self, txn, type_, page, page_size):
async with self.acquire(txn) as conn:
keys = []
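The decorators added above only create spans through the OpenTelemetry API; until a tracer provider is configured they are effectively no-ops. As a minimal sketch of the "in dev" wiring, assuming the standard opentelemetry-sdk package is available (nothing in this diff shows it being added), the following would print the new Folder.* and storage spans to the console:

# Dev-only sketch (not part of this commit): export spans created via
# opentelemetry.trace.get_tracer(__name__) to stdout.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider = TracerProvider()
# SimpleSpanProcessor flushes each span as it ends -- fine for local
# debugging, too noisy for production use.
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

Run once at application startup; spans such as Folder.async_get, create_content_in_container and the storage-level commit span should then show up on stdout as requests are handled.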
