diff --git a/mlflow/R/mlflow/DESCRIPTION b/mlflow/R/mlflow/DESCRIPTION index fee5a0eaadea6..cc758338b7cc9 100644 --- a/mlflow/R/mlflow/DESCRIPTION +++ b/mlflow/R/mlflow/DESCRIPTION @@ -1,7 +1,7 @@ Type: Package Package: mlflow Title: Interface to 'MLflow' -Version: 1.21.1 +Version: 1.22.0 Authors@R: c(person(given = "Matei", family = "Zaharia", diff --git a/mlflow/java/client/pom.xml b/mlflow/java/client/pom.xml index 077c81d094ea9..122ea05964517 100644 --- a/mlflow/java/client/pom.xml +++ b/mlflow/java/client/pom.xml @@ -5,7 +5,7 @@ org.mlflow mlflow-parent - 1.21.1-SNAPSHOT + 1.22.0 ../pom.xml diff --git a/mlflow/java/pom.xml b/mlflow/java/pom.xml index 63a137ab7fcd6..8dfa24d24f22f 100644 --- a/mlflow/java/pom.xml +++ b/mlflow/java/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.mlflow mlflow-parent - 1.21.1-SNAPSHOT + 1.22.0 pom MLflow Parent POM http://mlflow.org @@ -59,7 +59,7 @@ - 1.21.1-SNAPSHOT + 1.22.0 1.8 1.8 2.11.12 diff --git a/mlflow/java/scoring/pom.xml b/mlflow/java/scoring/pom.xml index d24e4f7482e01..d3f561a0bd852 100644 --- a/mlflow/java/scoring/pom.xml +++ b/mlflow/java/scoring/pom.xml @@ -5,7 +5,7 @@ org.mlflow mlflow-parent - 1.21.1-SNAPSHOT + 1.22.0 ../pom.xml diff --git a/mlflow/java/spark/pom.xml b/mlflow/java/spark/pom.xml index 707e3c02363a0..73791b8be426b 100644 --- a/mlflow/java/spark/pom.xml +++ b/mlflow/java/spark/pom.xml @@ -1,7 +1,7 @@ 4.0.0 mlflow-spark - 1.21.1-SNAPSHOT + 1.22.0 ${project.artifactId} 1.8 @@ -15,7 +15,7 @@ org.mlflow mlflow-parent - 1.21.1-SNAPSHOT + 1.22.0 ../pom.xml diff --git a/mlflow/pyfunc/scoring_server/__init__.py b/mlflow/pyfunc/scoring_server/__init__.py index e323a28fdcb70..c254dbe26ae64 100644 --- a/mlflow/pyfunc/scoring_server/__init__.py +++ b/mlflow/pyfunc/scoring_server/__init__.py @@ -20,6 +20,12 @@ import pandas as pd import sys import traceback +from pydantic import BaseModel +from fastapi import FastAPI, APIRouter, Request, HTTPException, Response, Header, status +from typing import List, Optional, Dict +import uvicorn +import asyncio +import json # NB: We need to be careful what we import form mlflow here. Scoring server is used from within # model's conda environment. The version of mlflow doing the serving (outside) and the version of @@ -65,13 +71,24 @@ CONTENT_TYPE_FORMAT_RECORDS_ORIENTED = "pandas-records" CONTENT_TYPE_FORMAT_SPLIT_ORIENTED = "pandas-split" +CONTENT_TYPE_RAW_JSON = "raw-json" -FORMATS = [CONTENT_TYPE_FORMAT_RECORDS_ORIENTED, CONTENT_TYPE_FORMAT_SPLIT_ORIENTED] +FORMATS = [CONTENT_TYPE_FORMAT_RECORDS_ORIENTED, CONTENT_TYPE_FORMAT_SPLIT_ORIENTED, CONTENT_TYPE_RAW_JSON] PREDICTIONS_WRAPPER_ATTR_NAME_ENV_KEY = "PREDICTIONS_WRAPPER_ATTR_NAME" _logger = logging.getLogger(__name__) +class RequestData(BaseModel): + columns: List[str] = [] + data: list = [] + + def is_valid(self): + return True + + def get_dataframe(self): + df = pd.DataFrame(data = self.data, columns = self.columns) + return df def infer_and_parse_json_input(json_input, schema: Schema = None): """ @@ -205,38 +222,38 @@ def _handle_serving_error(error_message, error_code, include_traceback=True): e = MlflowException(message=error_message, error_code=error_code) reraise(MlflowException, e) - def init(model: PyFuncModel): """ Initialize the server. Loads pyfunc model from the path. """ - app = flask.Flask(__name__) + fast_app = FastAPI(title= __name__, version= "v1") + fast_app.include_router(APIRouter()) input_schema = model.metadata.get_input_schema() - @app.route("/ping", methods=["GET"]) + @fast_app.get("/ping") def ping(): # pylint: disable=unused-variable """ Determine if the container is working and healthy. We declare it healthy if we can load the model successfully. """ - health = model is not None - status = 200 if health else 404 - return flask.Response(response="\n", status=status, mimetype="application/json") + if model is None: + raise HTTPException(status_code=404, detail="Model not loaded properly") + return {"message": "OK"} - @app.route("/invocations", methods=["POST"]) - @catch_mlflow_exception - def transformation(): # pylint: disable=unused-variable + @fast_app.post("/invocations") + def transformation(request_data: RequestData, content_type: Optional[str] = Header(None)): # pylint: disable=unused-variable """ Do an inference on a single batch of data. In this sample server, we take data as CSV or json, convert it to a Pandas DataFrame or Numpy, generate predictions and convert them back to json. """ + # data = _dataframe_from_json(request_data.json()) # Content-Type can include other attributes like CHARSET # Content-type RFC: https://datatracker.ietf.org/doc/html/rfc2045#section-5.1 # TODO: Suport ";" in quoted parameter values - type_parts = flask.request.content_type.split(";") + type_parts = content_type.split(";") type_parts = list(map(str.strip, type_parts)) mime_type = type_parts[0] parameter_value_pairs = type_parts[1:] @@ -247,27 +264,31 @@ def transformation(): # pylint: disable=unused-variable charset = parameter_values.get("charset", "utf-8").lower() if charset != "utf-8": - return flask.Response( - response="The scoring server only supports UTF-8", - status=415, - mimetype="text/plain", + return Response( + content="The scoring server only supports UTF-8", + status_code=415, + media_type="text/plain" ) content_format = parameter_values.get("format") # Convert from CSV to pandas if mime_type == CONTENT_TYPE_CSV and not content_format: - data = flask.request.data.decode("utf-8") + data = request_data.json() csv_input = StringIO(data) data = parse_csv_input(csv_input=csv_input) + elif mime_type == CONTENT_TYPE_JSON and content_format == CONTENT_TYPE_RAW_JSON: + if len(request_data.data) != 0: + data = dict(zip(request_data.columns, request_data.data[0])) + else: + data = {} elif mime_type == CONTENT_TYPE_JSON and not content_format: - json_str = flask.request.data.decode("utf-8") - data = infer_and_parse_json_input(json_str, input_schema) + data = infer_and_parse_json_input(request_data.json(), input_schema) elif ( mime_type == CONTENT_TYPE_JSON and content_format == CONTENT_TYPE_FORMAT_SPLIT_ORIENTED ): data = parse_json_input( - json_input=StringIO(flask.request.data.decode("utf-8")), + json_input=StringIO(request_data.json()), orient="split", schema=input_schema, ) @@ -276,29 +297,25 @@ def transformation(): # pylint: disable=unused-variable and content_format == CONTENT_TYPE_FORMAT_RECORDS_ORIENTED ): data = parse_json_input( - json_input=StringIO(flask.request.data.decode("utf-8")), + json_input=StringIO(request_data.json()), orient="records", schema=input_schema, ) elif mime_type == CONTENT_TYPE_JSON_SPLIT_NUMPY and not content_format: - data = parse_split_oriented_json_input_to_numpy(flask.request.data.decode("utf-8")) + data = parse_split_oriented_json_input_to_numpy(request_data.json()) else: - return flask.Response( - response=( - "This predictor only supports the following content types and formats:" + return Response( + content="This predictor only supports the following content types and formats:" " Types: {supported_content_types}; Formats: {formats}." " Got '{received_content_type}'.".format( supported_content_types=CONTENT_TYPES, formats=FORMATS, - received_content_type=flask.request.content_type, - ) - ), - status=415, - mimetype="text/plain", + received_content_type=content_type, + ), + status_code=415, + media_type="text/plain" ) - # Do the prediction - try: raw_predictions = model.predict(data) except MlflowException as e: @@ -314,11 +331,10 @@ def transformation(): # pylint: disable=unused-variable ), error_code=BAD_REQUEST, ) - result = StringIO() - predictions_to_json(raw_predictions, result) - return flask.Response(response=result.getvalue(), status=200, mimetype="application/json") + predictions = _get_jsonable_obj(raw_predictions, pandas_orient="records") + return predictions - return app + return fast_app def _predict(model_uri, input_path, output_path, content_type, json_format): @@ -342,8 +358,8 @@ def _predict(model_uri, input_path, output_path, content_type, json_format): def _serve(model_uri, port, host): pyfunc_model = load_model(model_uri) - init(pyfunc_model).run(port=port, host=host) - + fast_app = init(pyfunc_model) + uvicorn.run(fast_app, host=host, port=port, log_level="info") def get_cmd( model_uri: str, port: int = None, host: int = None, nworkers: int = None @@ -362,8 +378,7 @@ def get_cmd( args.append(f"-w {nworkers}") command = ( - f"gunicorn {' '.join(args)} ${{GUNICORN_CMD_ARGS}}" - " -- mlflow.pyfunc.scoring_server.wsgi:app" + "gunicorn mlflow.pyfunc.scoring_server.wsgi:app --worker-class uvicorn.workers.UvicornWorker" ) else: args = [] diff --git a/mlflow/version.py b/mlflow/version.py index 5b7e90e25b18b..168af631a57c5 100644 --- a/mlflow/version.py +++ b/mlflow/version.py @@ -2,7 +2,7 @@ import re -VERSION = "1.21.1.dev0" +VERSION = "1.22.0" def is_release_version(): diff --git a/setup.py b/setup.py index 2365b1a8d1878..7720325472cbd 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,8 @@ def package_files(directory): "alembic<=1.4.1", # Required "docker>=4.0.0", + "fastapi", + "uvicorn", "Flask", "gunicorn; platform_system != 'Windows'", "numpy",