Release/0.0.7rc2 #107

Merged 6 commits on Dec 4, 2024
gen3_tracker/gen3/jobs.py: 128 changes (83 additions, 45 deletions)
@@ -14,40 +14,52 @@
from gen3_tracker import Config
from gen3_tracker.common import Push, Commit
from gen3_tracker.gen3.indexd import write_indexd
from gen3_tracker.git import calculate_hash, DVC, run_command, DVCMeta, DVCItem, modified_date
from gen3_tracker.git import (
calculate_hash,
DVC,
run_command,
DVCMeta,
DVCItem,
modified_date,
)


def _validate_parameters(from_: str) -> pathlib.Path:

assert len(urlparse(from_).scheme) == 0, f"{from_} appears to be an url. url to url cp not supported"
assert (
len(urlparse(from_).scheme) == 0
), f"{from_} appears to be an url. url to url cp not supported"

return from_


def cp(config: Config,
from_: str,
project_id: str,
ignore_state: bool,
auth=None,
user=None,
object_name=None,
bucket_name=None,
metadata: dict = {},
):
def cp(
config: Config,
from_: str,
project_id: str,
ignore_state: bool,
auth=None,
user=None,
object_name=None,
bucket_name=None,
metadata: dict = {},
):
"""Copy meta to bucket, used by etl_pod job"""
from_ = _validate_parameters(str(from_))
if not isinstance(from_, pathlib.Path):
from_ = pathlib.Path(from_)

assert auth, "auth is required"

metadata = dict({'submitter': None, 'metadata_version': '0.0.1', 'is_metadata': True} | metadata)
if not metadata['submitter']:
metadata = dict(
{"submitter": None, "metadata_version": "0.0.1", "is_metadata": True} | metadata
)
if not metadata["submitter"]:
if not user:
user = auth.curl('/user/user').json()
metadata['submitter'] = user['name']
user = auth.curl("/user/user").json()
metadata["submitter"] = user["name"]

program, project = project_id.split('-')
program, project = project_id.split("-")

assert bucket_name, f"could not find bucket for {program}"

@@ -57,27 +69,26 @@ def cp(config: Config,

if not object_name:
now = datetime.now().strftime("%Y%m%d-%H%M%S")
object_name = f'_{project_id}-{now}_meta.zip'
object_name = f"_{project_id}-{now}_meta.zip"

zipfile_path = temp_dir / object_name
with ZipFile(zipfile_path, 'w') as zip_object:
with ZipFile(zipfile_path, "w") as zip_object:
for _ in from_.glob("*.ndjson"):
zip_object.write(_)

stat = zipfile_path.stat()
md5_sum = calculate_hash('md5', zipfile_path)
md5_sum = calculate_hash("md5", zipfile_path)
my_dvc = DVC(
meta=DVCMeta(),
outs=[
DVCItem(
path=object_name,
md5=md5_sum,
hash='md5',
hash="md5",
modified=modified_date(zipfile_path),
size=stat.st_size,

)
]
],
)

metadata = write_indexd(
@@ -92,64 +103,91 @@ def cp(
# document = file_client.upload_file_to_guid(guid=id_, file_name=object_name, bucket=bucket_name)
# print(document, file=sys.stderr)

run_command(f"gen3-client upload-single --bucket {bucket_name} --guid {my_dvc.object_id} --file {zipfile_path} --profile {config.gen3.profile}", no_capture=False)
run_command(
f"gen3-client upload-single --bucket {bucket_name} --guid {my_dvc.object_id} --file {zipfile_path} --profile {config.gen3.profile}",
no_capture=False,
)

return {'msg': f"Uploaded {zipfile_path} to {bucket_name}", "object_id": my_dvc.object_id, "object_name": object_name}
return {
"msg": f"Uploaded {zipfile_path} to {bucket_name}",
"object_id": my_dvc.object_id,
"object_name": object_name,
}
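# --- Illustrative usage sketch (not part of this PR's diff). It mirrors the call that
# --- publish_commits() below makes into cp(); the Config instance, auth session, and
# --- bucket name are assumptions for the example only.
from gen3.auth import Gen3Auth
from gen3_tracker import Config


def _example_cp(config: Config, auth: Gen3Auth) -> None:
    result = cp(
        config=config,
        from_="META",                       # directory of *.ndjson files to zip and upload
        project_id=config.gen3.project_id,  # "<program>-<project>"
        ignore_state=True,
        auth=auth,
        bucket_name="my-upload-bucket",     # hypothetical bucket
    )
    print(result["object_id"], result["object_name"])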


def publish_commits(config: Config, wait: bool, auth: Gen3Auth, bucket_name: str, spinner=None) -> dict:
def publish_commits(
config: Config, wait: bool, auth: Gen3Auth, bucket_name: str, spinner=None
) -> dict:
"""Publish commits to the portal."""

# TODO legacy fhir-import-export job: copies meta to bucket and triggers job,
# meta information is already in git REPO,
# we should consider changing the fhir_import_export job to use the git REPO

user = auth.curl('/user/user').json()
user = auth.curl("/user/user").json()

# copy meta to bucket
upload_result = cp(
config=config,
from_='META',
from_="META",
project_id=config.gen3.project_id,
ignore_state=True,
auth=auth,
user=user,
bucket_name=bucket_name
bucket_name=bucket_name,
)

object_id = upload_result['object_id']
object_id = upload_result["object_id"]

push = Push(config=config)
jobs_client = Gen3Jobs(auth_provider=auth)

# create "legacy" commit object, read by fhir-import-export job
push.commits.append(Commit(object_id=object_id, message='From g3t-git', meta_path=upload_result['object_name'], commit_id=object_id))
args = {'push': push.model_dump(), 'project_id': config.gen3.project_id, 'method': 'put'}
push.commits.append(
Commit(
object_id=object_id,
message="From g3t-git",
meta_path=upload_result["object_name"],
commit_id=object_id,
)
)
args = {
"push": push.model_dump(),
"project_id": config.gen3.project_id,
"method": "put",
}

# capture logging from gen3.jobs
from cdislogging import get_logger # noqa

cdis_logging = get_logger("__name__")
cdis_logging.setLevel(logging.WARN)

if wait:
# async_run_job_and_wait monkeypatched below
_ = asyncio.run(jobs_client.async_run_job_and_wait(job_name='fhir_import_export', job_input=args, spinner=spinner))
_ = asyncio.run(
jobs_client.async_run_job_and_wait(
job_name="fhir_import_export", job_input=args, spinner=spinner
)
)
else:
_ = jobs_client.create_job('fhir_import_export', args)
_ = jobs_client.create_job("fhir_import_export", args)

if not isinstance(_, dict):
_ = {'output': _}
if isinstance(_['output'], str):
_ = {"output": _}
if isinstance(_["output"], str):
try:
_['output'] = json.loads(_['output'])
_["output"] = json.loads(_["output"])
except json.JSONDecodeError:
pass
return _
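# --- Illustrative usage sketch (not part of this PR's diff): driving publish_commits()
# --- end to end. The credentials path and bucket name are assumptions for the example.
from gen3.auth import Gen3Auth
from gen3_tracker import Config


def _example_publish(config: Config) -> dict:
    auth = Gen3Auth(refresh_file="~/.gen3/credentials.json")  # assumed credentials location
    return publish_commits(
        config=config,
        wait=True,                       # block until the fhir_import_export job finishes
        auth=auth,
        bucket_name="my-upload-bucket",  # hypothetical bucket
    )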


# monkey patch for gen3.jobs.Gen3Jobs.async_run_job_and_wait
# make it less noisy and sleep less (max of 30 seconds)
async def async_run_job_and_wait(self, job_name, job_input, spinner=None, _ssl=None, **kwargs):
async def async_run_job_and_wait(
self, job_name, job_input, spinner=None, _ssl=None, **kwargs
):
"""
Asynchronous function to create a job, wait for output, and return. Will
sleep in a linear delay until the job is done, starting with 1 second.
@@ -188,12 +226,12 @@ async def async_run_job_and_wait(self, job_name, job_input, spinner=None, _ssl=None, **kwargs):
if status.get("status") != "Completed":
# write failed output to log file before raising exception
response = await self.async_get_output(job_create_response.get("uid"))
with open("logs/publish.log", 'a') as f:
log_msg = {'timestamp': datetime.now(pytz.UTC).isoformat()}
log_msg.update(response)
f.write(json.dumps(log_msg, separators=(',', ':')))
f.write('\n')
with open("logs/publish.log", "a") as f:
log_msg = {"timestamp": datetime.now(pytz.UTC).isoformat()}
log_msg.update(response)
f.write(json.dumps(log_msg, separators=(",", ":")))
f.write("\n")

raise Exception(f"Job status not complete: {status.get('status')}")

response = await self.async_get_output(job_create_response.get("uid"))
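# --- Illustrative sketch (not part of this PR's diff). The comment above says this
# --- coroutine is a monkey patch for gen3.jobs.Gen3Jobs.async_run_job_and_wait;
# --- applying it would look roughly like this (assumed to happen at import time).
import gen3.jobs

gen3.jobs.Gen3Jobs.async_run_job_and_wait = async_run_job_and_wait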