Skip to content

Commit

Permalink
Added logic to validate token and check priority permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
val500 committed Sep 30, 2024
1 parent e27259c commit 6719eb5
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 4 deletions.
3 changes: 3 additions & 0 deletions docs/reference/job-schema.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ The following table lists the key elements that a job definition file should con
- string
- /
- | (Optional) URL to send job status updates to. These updates originate from the agent and get posted to the server which then posts the update to the webhook. If no webhook is specified, these updates will not be generated.
* - ``job_priority``
- integer
- | (Optional) Integer specifying how much priority this job has. Jobs with higher job_priority will be selected by agents before other jobs. Specifying a job priority requires authorization in the form of a JWT obtained by sending a GET request to /v1/authenticate/<client-id> with a valid client_id and client-key.

Example jobs in YAML
----------------------------
Expand Down
1 change: 1 addition & 0 deletions server/src/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class Job(Schema):
allocate_data = fields.Dict(required=False)
reserve_data = fields.Dict(required=False)
job_status_webhook = fields.String(required=False)
job_priority = fields.Integer(required=False)


class JobId(Schema):
Expand Down
56 changes: 52 additions & 4 deletions server/src/api/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import os
import uuid
from datetime import datetime, timezone, timedelta

import pkg_resources
from apiflask import APIBlueprint, abort
from flask import jsonify, request, send_file
Expand Down Expand Up @@ -75,9 +74,9 @@ def job_post(json_data: dict):
job_queue = ""
if not job_queue:
abort(422, message="Invalid data or no job_queue specified")

auth_token = request.headers.get("Authorization")
try:
job = job_builder(json_data)
job = job_builder(json_data, auth_token)
except ValueError:
abort(400, message="Invalid job_id specified")

Expand All @@ -101,7 +100,33 @@ def has_attachments(data: dict) -> bool:
)


def job_builder(data):
def check_token_permission(
auth_token: str, secret_key: str, priority: int, queue: str
) -> bool:
"""
Validates token received from client and checks if it can
push a job to the queue with the requested priority
"""
if auth_token is None:
abort(401, "No authentication token specified")
else:
try:
decoded_jwt = jwt.decode(
auth_token,
secret_key,
algorithms="HS256",
options={"require": ["exp", "iat", "sub", "max_priority"]},
)
except jwt.exceptions.ExpiredSignatureError:
abort(403, "Token has expired")
except jwt.exceptions.InvalidTokenError:
abort(403, "Invalid Token")

max_priority = decoded_jwt["max_priority"].get(queue, 0)
return max_priority >= priority


def job_builder(data: dict, auth_token: str):
"""Build a job from a dictionary of data"""
job = {
"created_at": datetime.now(timezone.utc),
Expand All @@ -124,6 +149,27 @@ def job_builder(data):
if has_attachments(data):
data["attachments_status"] = "waiting"

if "job_priority" in data:
priority_level = data["job_priority"]
job_queue = data["job_queue"]
allowed = check_token_permission(
auth_token,
SECRET_KEY,
priority_level,
job_queue,
)
if not allowed:
abort(
403,
(
f"Not enough permissions to push to {job_queue}",
f"with priority {priority_level}",
),
)
job["job_priority"] = priority_level
data.pop("job_priority")
else:
job["job_priority"] = 0
job["job_id"] = job_id
job["job_data"] = data
return job
Expand Down Expand Up @@ -676,6 +722,8 @@ def validate_client_key_pair(client_id: str, client_key: str):
"""
Checks client_id and key pair for validity and returns their permissions
"""
if client_key is None:
return None
client_key_bytes = client_key.encode("utf-8")
client_permissions_entry = database.mongo.db.client_permissions.find_one(
{
Expand Down
41 changes: 41 additions & 0 deletions server/tests/test_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,44 @@ def test_authenticate_invalid_client_key(mongo_app_with_permissions):
headers=create_auth_header(client_id, client_key),
)
assert output.status_code == 401


def test_job_with_priority(mongo_app_with_permissions):
"""Tests submission of priority job with valid token"""
app, _, client_id, client_key, _ = mongo_app_with_permissions
v1.SECRET_KEY = "my_secret_key"
authenticate_output = app.post(
"/v1/oauth2/token",
headers=create_auth_header(client_id, client_key),
)
token = authenticate_output.data.decode("utf-8")
job = {"job_queue": "myqueue2", "job_priority": 200}
job_response = app.post(
"/v1/job", json=job, headers={"Authorization": token}
)
assert 200 == job_response.status_code


def test_priority_no_token(mongo_app_with_permissions):
"""Tests rejection of priority job with no token"""
app, _, _, _, _ = mongo_app_with_permissions
v1.SECRET_KEY = "my_secret_key"
job = {"job_queue": "myqueue2", "job_priority": 200}
job_response = app.post("/v1/job", json=job)
assert 401 == job_response.status_code


def test_priority_invalid_queue(mongo_app_with_permissions):
"""Tests rejection of priority job with invalid queue"""
app, _, client_id, client_key, _ = mongo_app_with_permissions
v1.SECRET_KEY = "my_secret_key"
authenticate_output = app.post(
"/v1/oauth2/token",
headers=create_auth_header(client_id, client_key),
)
token = authenticate_output.data.decode("utf-8")
job = {"job_queue": "myinvalidqueue", "job_priority": 200}
job_response = app.post(
"/v1/job", json=job, headers={"Authorization": token}
)
assert 403 == job_response.status_code

0 comments on commit 6719eb5

Please sign in to comment.