Skip to content

Commit

Permalink
Merge pull request #14 from Bayer-Group/namer
Browse files — browse the repository at this point in the history
Namer
  • Loading branch information
jack-e-tabaska authored Sep 26, 2022
2 parents 3d247c4 + d8d74cf commit 829affa
Show file tree
Hide file tree
Showing 36 changed files with 494 additions and 144 deletions.
2 changes: 1 addition & 1 deletion bclaw_runner/Dockerfile.alpine
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/python:3.9.9-alpine3.15 AS base
FROM public.ecr.aws/docker/library/python:3.10.7-alpine3.16 AS base

LABEL maintainer="jack.tabaska@bayer.com"

Expand Down
1 change: 0 additions & 1 deletion bclaw_runner/src/runner/custom_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def format(self, record: logging.LogRecord):
"bucket": os.environ.get("BC_LAUNCH_BUCKET"),
"key": os.environ.get("BC_LAUNCH_KEY"),
"version": os.environ.get("BC_LAUNCH_VERSION"),
"s3_request_id": os.environ.get("BC_LAUNCH_S3_REQUEST_ID"),
},
"sfn_execution_id": os.environ.get("BC_EXECUTION_ID"),
"branch": os.environ.get("BC_BRANCH_IDX"),
Expand Down
10 changes: 6 additions & 4 deletions bclaw_runner/src/runner/dind.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ def run_child_container(image_tag: str, command: str, parent_workspace: str, par
ret = 255
with closing(docker.client.from_env()) as docker_client:
child_image = pull_image(docker_client, image_tag)

logger.info("---------- starting user command block ----------")
container = docker_client.containers.run(child_image.tags[0], command,
cpu_shares=cpu_shares,
detach=True,
Expand All @@ -132,15 +134,15 @@ def run_child_container(image_tag: str, command: str, parent_workspace: str, par
with closing(container.logs(stream=True)) as fp:
for line in fp:
logger.info(line.decode("utf-8"))
logger.info("subprocess exited")

except Exception:
logger.exception("error during subprocess logging: ")
logger.exception("----- error during subprocess logging: ")
container.reload()
logger.info(f"subprocess status is {container.status}")
logger.warning("continuing without subprocess logging")
logger.info(f"----- subprocess status is {container.status}")
logger.warning("----- continuing without subprocess logging")

finally:
logger.info("---------- end of user command block ----------")
response = container.wait()
container.remove()
ret = response.get("StatusCode", 1)
Expand Down
24 changes: 17 additions & 7 deletions bclaw_runner/src/runner/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,25 @@ def check_for_previous_run(self) -> bool:
Returns:
True if this step has been run before
"""
ret = self._s3_file_exists(self.qualify(self.run_status_obj))
try:
ret = self._s3_file_exists(self.qualify(self.run_status_obj))
except Exception:
logger.warning("unable to query previous run status, assuming none")
ret = False
return ret

def clear_run_status(self) -> None:
s3 = boto3.resource("s3")
status_obj = s3.Object(self.bucket, self.qualify(self.run_status_obj))
status_obj.delete()
try:
s3 = boto3.resource("s3")
status_obj = s3.Object(self.bucket, self.qualify(self.run_status_obj))
status_obj.delete()
except Exception:
logger.warning("unable to clear previous run status")

def put_run_status(self) -> None:
s3 = boto3.resource("s3")
status_obj = s3.Object(self.bucket, self.qualify(self.run_status_obj))
status_obj.put(Body=b"", Metadata=_file_metadata())
try:
s3 = boto3.resource("s3")
status_obj = s3.Object(self.bucket, self.qualify(self.run_status_obj))
status_obj.put(Body=b"", Metadata=_file_metadata())
except Exception:
logger.warning("failed to upload run status")
7 changes: 2 additions & 5 deletions bclaw_runner/src/runner/runner_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ def main(commands: List[str],

# mark job complete on success
if status == 0:
try:
repo.put_run_status()
except RuntimeError:
logger.warning("failed to upload run status")
repo.put_run_status()

except Exception as e:
logger.exception("runner failed")
Expand All @@ -115,7 +112,7 @@ def cli() -> int:
with spot_termination_checker():
args = docopt(__doc__, version=VERSION)

logger.info(json.dumps(args, indent=4))
logger.info(f"{args = }")

commands = json.loads(args["--cmd"])
image = args["--image"]
Expand Down
2 changes: 0 additions & 2 deletions bclaw_runner/tests/test_custom_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def test_JSONFormatter(monkeypatch):
monkeypatch.setenv("BC_LAUNCH_KEY", "testJobFile")
monkeypatch.setenv("BC_LAUNCH_VERSION", "testVersion")
monkeypatch.setenv("BC_LAUNCH_BUCKET", "testLaunchBucket")
monkeypatch.setenv("BC_LAUNCH_S3_REQUEST_ID", "1234567890")
monkeypatch.setenv("BC_STEP_NAME", "testStepName")
monkeypatch.setenv("BC_WORKFLOW_NAME", "testWorkflowName")
monkeypatch.setenv("AWS_BATCH_JOB_ID", "0987654321")
Expand All @@ -38,7 +37,6 @@ def test_JSONFormatter(monkeypatch):
"bucket": "testLaunchBucket",
"key": "testJobFile",
"version": "testVersion",
"s3_request_id": "1234567890",
},
"level": logging.getLevelName(10),
"message": "test message",
Expand Down
2 changes: 0 additions & 2 deletions bclaw_runner/tests/test_dind.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,3 @@ def mock_from_env():

if logging_crash:
assert "continuing without subprocess logging" in caplog.text
else:
assert "subprocess exited" in caplog.text
41 changes: 38 additions & 3 deletions bclaw_runner/tests/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,17 +452,52 @@ def test_check_for_previous_run(monkeypatch, mock_buckets, step_name, expect):
assert result == expect


def test_clear_run_status(monkeypatch, mock_buckets):
monkeypatch.setenv("BC_STEP_NAME", "test_step")
def failing_thing(*args, **kwargs):
raise RuntimeError("failed")


def test_check_for_previous_run_fail(monkeypatch, mock_buckets, caplog):
monkeypatch.setenv("BC_STEP_NAME", "nothing")
repo = Repository("s3://non_bucket/repo/path")
monkeypatch.setattr(repo, "_s3_file_exists", failing_thing)
result = repo.check_for_previous_run()
assert "unable to query previous run status" in caplog.text
assert result == False


@pytest.mark.parametrize("step_name, expect", [
("test_step", True),
("un_step", False),
])
def test_clear_run_status(monkeypatch, mock_buckets, step_name, expect):
monkeypatch.setenv("BC_STEP_NAME", step_name)
repo = Repository(f"s3://{TEST_BUCKET}/repo/path")
assert repo.check_for_previous_run() is True
assert repo.check_for_previous_run() == expect
repo.clear_run_status()
assert repo.check_for_previous_run() is False


def test_clear_run_status_fail(monkeypatch, mock_buckets, caplog):
monkeypatch.setenv("BC_STEP_NAME", "nothing")
repo = Repository("s3://non_bucket/repo/path")
monkeypatch.setattr(repo, "qualify", failing_thing)
repo.clear_run_status()
assert "unable to clear previous run status" in caplog.text


def test_put_run_status(monkeypatch, mock_buckets):
monkeypatch.setenv("BC_STEP_NAME", "test_step_two")
repo = Repository(f"s3://{TEST_BUCKET}/repo/path")
assert repo.check_for_previous_run() is False
repo.put_run_status()
assert repo.check_for_previous_run() is True


def test_put_run_status_fail(monkeypatch, mock_buckets, caplog):
monkeypatch.setenv("BC_STEP_NAME", "nothing")
repo = Repository("s3://non_bucket/repo/path")
monkeypatch.setattr(repo, "qualify", failing_thing)
repo.put_run_status()
assert "failed to upload run status" in caplog.text


9 changes: 9 additions & 0 deletions cloudformation/bc_batch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ Resources:
- "arn:aws:iam::aws:policy/AmazonEC2FullAccess"
- "arn:aws:iam::aws:policy/AmazonS3FullAccess"
- "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
- "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"

EcsInstanceProfile:
Type: AWS::IAM::InstanceProfile
Expand Down Expand Up @@ -242,6 +243,12 @@ Resources:
SecurityGroupIds: !Ref SecurityGroupIds
Subnets: !Ref Subnets

# 20220909: save for v1.2
# SchedulingPolicy:
# Type: AWS::Batch::SchedulingPolicy
# Properties:
# Name: !Sub "${QueueName}-scheduling-policy"

BatchQueue:
Type: AWS::Batch::JobQueue
Properties:
Expand All @@ -251,6 +258,8 @@ Resources:
ComputeEnvironmentOrder:
- Order: 1
ComputeEnvironment: !Ref ComputeEnvironment
# 20220909: save for v1.2
# SchedulingPolicyArn: !Ref SchedulingPolicy

Outputs:
BatchQueueArn:
Expand Down
42 changes: 42 additions & 0 deletions cloudformation/bc_core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,42 @@ Resources:
LogGroupName: !Sub "/aws/lambda/${SubpipesLambda}"
RetentionInDays: !Ref LogRetentionDays

NamerLambda:
Type: AWS::Serverless::Function
Properties:
Handler: namer.lambda_handler
Runtime: python3.9
CodeUri: lambda/src/sfn_namer
MemorySize: 128
Timeout: 5
Layers:
- !Ref CommonLayer
Policies:
-
Version: "2012-10-17"
Statement:
-
Effect: Allow
Action:
- "states:StartExecution"
Resource: "*"
DeploymentPreference:
Enabled: False

PermissionToInvokeNamer:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref NamerLambda
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceAccount: !Ref AWS::AccountId

NamerLambdaLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${NamerLambda}"
RetentionInDays: !Ref LogRetentionDays

EventHandlerLambda:
Type: AWS::Serverless::Function
Properties:
Expand Down Expand Up @@ -681,6 +717,12 @@ Outputs:
LogRetentionDays:
Value: !Ref LogRetentionDays

NamerLambdaArn:
Value: !GetAtt NamerLambda.Arn

NamerLambdaLogGroupName:
Value: !Ref NamerLambdaLogGroup

OnDemandQueueArn:
Value: !GetAtt OnDemandQueueStack.Outputs.BatchQueueArn

Expand Down
49 changes: 0 additions & 49 deletions cloudformation/bc_installer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,51 +187,6 @@ Resources:
VersioningConfiguration:
Status: Enabled

AdminSNSTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub "${InstallationName}-admin-notifs"
KmsMasterKeyId: "alias/aws/sns"

AdminSNSTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: 2012-10-17
Id: __default_policy_ID
Statement:
-
Effect: Allow
Sid: __default_statement_ID
Principal:
AWS: "*"
Action:
- SNS:AddPermission
- SNS:DeleteTopic
- SNS:GetTopicAttributes
- SNS:ListSubscriptionsByTopic
- SNS:Publish
- SNS:Receive
- SNS:RemovePermission
- SNS:SetTopicAttributes
- SNS:Subscribe
Resource:
!Ref AdminSNSTopic
Condition:
StringEquals:
AWS:SourceOwner:
Ref: AWS::AccountId
-
Effect: Allow
Sid: yada_yada
Principal:
Service:
- codestar-notifications.amazonaws.com
Action: "SNS:Publish"
Resource: !Ref AdminSNSTopic
Topics:
- !Ref AdminSNSTopic

CoreStackCodebuildProject:
Type: AWS::CodeBuild::Project
Properties:
Expand Down Expand Up @@ -414,7 +369,3 @@ Resources:
- "ecr:*"
Effect: Allow
Resource: "*"

Outputs:
AdminSNSTopicName:
Value: !GetAtt AdminSNSTopic.TopicName
Loading

0 comments on commit 829affa

Please sign in to comment.