vector ingestion example updates (#29)
* upgrade vector ingestion example

- upgrade to Flink 1.19.1 and Python 3.11
- simplify config by using single json object with all settings
- add local model inference with embedded sentence transformer
- make model inference configurable (remote vs. local)
- adapt readme

* fix readme and links

* remove conflicting numpy from python-libs folder before zipping
hpgrahsl authored Nov 8, 2024
1 parent fad63a0 commit 4345a7c
Showing 15 changed files with 172 additions and 170 deletions.
95 changes: 67 additions & 28 deletions pyflink-vector-embeddings/README.md
@@ -76,12 +76,9 @@ Connections ttl opn rt1 rt5 p50 p90

Since all locally running services are containerized applications, you need to have either [Docker](https://www.docker.com/) or [Podman](https://podman-desktop.io/) installed on your system.

> [!IMPORTANT]
> Before running the containers make sure to set your own MongoDB Connection String in the `compose.yaml` (see https://github.com/hpgrahsl/pyflink-vector-ingestion-decodable/blob/main/compose.yaml#L35).

-> _OPTIONAL:_ The app for model inference supports two REST endpoints, one for local inference and another to delegate requests to a hosted model on HuggingFace.
-For the latter, you'd need an API key and set it in the `compose.yaml` [here](./compose.yaml#L63).
+> [!NOTE]
+> Optional: The model inference supports two REST endpoints, one for local inference and another to delegate requests to a hosted model on HuggingFace.
+> For the latter, you'd need an API key and set it in the `compose.yaml` (see https://github.com/hpgrahsl/pyflink-vector-ingestion-decodable/blob/main/compose.yaml#L48).
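To sanity-check the endpoint before wiring it into the job, you can POST a short text snippet directly, mirroring what the `RemoteEmbedding` UDF (shown further below) does. A minimal sketch, assuming the model serving app is reachable at `http://localhost:8080/local/embedding`; substitute the host/port from your `compose.yaml` or your ngrok tunnel:

```python
# minimal smoke test for the embedding endpoint; the URL is an assumption --
# substitute the host/port from your compose.yaml or ngrok tunnel
import json
import requests

MODEL_SERVER_URL = "http://localhost:8080/local/embedding"  # hypothetical host/port

response = requests.post(
    MODEL_SERVER_URL,
    data="this product exceeded my expectations".encode("utf-8"),
    headers={"Content-type": "text/plain; charset=utf-8"},
)
embedding = json.loads(response.text)
print(len(embedding))  # all-mpnet-base-v2 produces 768-dimensional vectors
```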

Next, switch to a terminal and run the whole demo stack like so:

@@ -161,38 +158,68 @@ Find the most important code snippets of the PyFlink job below:
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
-'hostname' = '{mysql_host}',
-'port' = '{mysql_port}',
-'username' = '{mysql_user}',
-'password' = '{mysql_password}',
+'hostname' = '{job_config['mysql_host']}',
+'port' = '{job_config['mysql_port']}',
+'username' = '{job_config['mysql_user']}',
+'password' = '{job_config['mysql_password']}',
'jdbc.properties.maxAllowedPacket' = '16777216',
'server-time-zone' = 'UTC',
'database-name' = 'vector_demo',
'table-name' = 'Review');
""")
```

```python
-# define scalar user-defined function
-class Embedding(ScalarFunction):
+# define scalar user-defined functions
+class RemoteEmbedding(ScalarFunction):

    def __init__(self, url):
        self.modelServerApiUrl = url

    def eval(self, text):
-        response = requests.post(self.modelServerApiUrl,text)
+        response = requests.post(self.modelServerApiUrl,
+            data=text.encode('utf-8'),
+            headers={'Content-type': 'text/plain; charset=utf-8'})
        return json.loads(response.text)

+class LocalEmbedding(ScalarFunction):
+
+    model = None
+
+    def __init__(self):
+        self.model = SentenceTransformer(model_name_or_path='all-mpnet-base-v2', cache_folder='/opt/flink-libs/hf-hub-cache')
+
+    def open(self, function_context: FunctionContext):
+        # recreate the model on the worker once the UDF instance has been deserialized
+        self.model = SentenceTransformer(model_name_or_path='all-mpnet-base-v2', cache_folder='/opt/flink-libs/hf-hub-cache')
+
+    def eval(self, text):
+        return self.model.encode(text, None, None, 32, None, 'sentence_embedding', 'float32', True, False).tolist()
+
+    def __getstate__(self):
+        # exclude the loaded model from the pickled UDF state
+        state = dict(self.__dict__)
+        del state['model']
+        return state
```
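The `__getstate__` hook exists because PyFlink pickles UDF instances when shipping them to the task managers; the loaded `SentenceTransformer` does not serialize cleanly, so it is dropped from the pickled state and recreated in `open()` on the worker. The embedding logic itself can be sanity-checked outside of Flink; a quick sketch reusing the class directly:

```python
# standalone sanity check of the local embedding path, outside any Flink job;
# note: the class caches the model under /opt/flink-libs/hf-hub-cache, so run
# this somewhere that path exists and is writable
fn = LocalEmbedding()
vector = fn.eval("great battery life, sturdy build")
print(len(vector))  # all-mpnet-base-v2 produces 768-dimensional embeddings
```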

```python
-# register UDF and specify I/O data types
-t_env.create_temporary_system_function(
-    "embedding",
-    udf(
-        Embedding(model_server),
-        input_types=[DataTypes.STRING()],
-        result_type=DataTypes.ARRAY(DataTypes.DOUBLE())
-    )
-)
+# register UDFs and specify I/O data types
+t_env.create_temporary_system_function(
+    "remote_embedding",
+    udf(
+        RemoteEmbedding(job_config['model_server']),
+        input_types=[DataTypes.STRING()],
+        result_type=DataTypes.ARRAY(DataTypes.DOUBLE())
+    )
+)
+
+t_env.create_temporary_system_function(
+    "local_embedding",
+    udf(
+        LocalEmbedding(),
+        input_types=[DataTypes.STRING()],
+        result_type=DataTypes.ARRAY(DataTypes.DOUBLE())
+    )
+)
```
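Once registered, both functions are callable by name from Flink SQL. A hypothetical ad-hoc check via the Table API, with table and column names following the snippets above:

```python
# preview embeddings straight from the CDC source table;
# note: against an unbounded CDC source this runs as a streaming job until cancelled
t_env.sql_query(
    "SELECT id, LOCAL_EMBEDDING(reviewText) AS embedding FROM reviews"
).execute().print()
```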

@@ -214,16 +241,28 @@ t_env.execute_sql(f"""

```python
-# read from the source table, calculate vector embeddings by means of the UDF, and insert into sink table
-t_env.execute_sql("""
+# switch between local or remote embedding calculation according to configuration
+calc_embedding = ""
+if job_config['embedding_mode'] == 'remote':
+    calc_embedding = 'REMOTE_EMBEDDING(reviewText) AS embedding'
+elif job_config['embedding_mode'] == 'local':
+    calc_embedding = 'LOCAL_EMBEDDING(reviewText) AS embedding'
+else:
+    print(f"error: embedding_mode must be either [local | remote] but was '{job_config['embedding_mode']}'")
+    sys.exit(-1)
+
+# read from the source table, calculate vector embeddings according to the chosen mode, and insert into the sink table
+t_env.execute_sql(
+    f"""
     INSERT INTO review_embeddings
     SELECT
       id AS _id,
       itemId,
       reviewText,
-      EMBEDDING(reviewText) AS embedding
+      {calc_embedding}
     FROM reviews
-    """)
+    """
+)
```

#### Build PyFlink Job
@@ -247,14 +286,14 @@ In the `pyflink-app` folder there is a YAML file `decodable-resources.yaml` whi
> [!IMPORTANT]
> Before you deploy this using the Decodable CLI you have to specify all the necessary secrets matching your setup.
-Four secrets must be modified which are all found in the `pyflink-app/.secrets` folder:
+The secret named `job_config`, found in the `pyflink-app/.secrets` folder, must be modified. Adapt all placeholders in the following fields of the JSON:

* `model_server`: adapt the placeholder with your **ngrok hostname from the HTTPS tunnel** for the model serving app
* `mongodb_uri`: adapt the placeholder with your **ngrok hostname and port for the MongoDB TCP tunnel**
* `mysql_host`: adapt the placeholder with your **ngrok hostname from the MySQL TCP tunnel**
* `mysql_port`: adapt the placeholder with your **ngrok port from the MySQL TCP tunnel**

-Make sure all files are updated accordingly and saved.
+Make sure all changes to this file are saved.

From the terminal within the `pyflink-app` folder use the [Decodable CLI](https://docs.decodable.co/cli.html) to deploy the PyFlink job as follows:

@@ -264,7 +303,7 @@ decodable apply decodable-resources.yaml

First, the CLI uploads and validates the self-contained ZIP archive. Right after that, the other Decodable resources (secrets + pipeline) specified in the YAML manifest are created and the pipeline is started automatically.

-Once the pipeline is up and running, the MySQL CDC connector performs a so-called initial snapshot of the existing user reviews in the corresponding source database table. The PyFlink job transforms the textual user reviews by means of the custom `EMBEDDING` UDF which calls out to the Model Serving App to retrieve the vector embeddings. The results are written into the collection using the MongoDB connector.
+Once the pipeline is up and running, the MySQL CDC connector performs a so-called initial snapshot of the existing user reviews in the corresponding source database table. The PyFlink job transforms the textual user reviews by means of a custom UDF, either the `LOCAL_EMBEDDING` or the `REMOTE_EMBEDDING` function, to calculate the vector embeddings. The results are written into the collection using the MongoDB connector.
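To confirm that documents actually arrive together with their vectors, a quick look into MongoDB helps. A sketch using `pymongo`; the database and collection names below are assumptions, so adjust them to whatever your MongoDB sink table maps to:

```python
# hypothetical verification query -- database/collection names are assumptions
from pymongo import MongoClient

client = MongoClient("mongodb://demousr:demopwd@localhost:27017/")  # or your ngrok URI
doc = client["vector_demo"]["review_embeddings"].find_one()
print(doc["_id"], len(doc["embedding"]))  # expect a 768-element embedding array
```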

### User Reviews Generator

1 change: 1 addition & 0 deletions pyflink-vector-embeddings/pyflink-app/.secrets/job_config
@@ -0,0 +1 @@
{ "embedding_mode": "remote", "model_server": "https://<your_ngrok_model_server_hostname_here>/local/embedding", "mongodb_uri": "mongodb://demousr:demopwd@<your_ngrok_tcp_mongodb_hostname_and_port_here>/?retryWrites=true&w=majority", "mysql_host": "<your_ngrok_tcp_mysql_hostname_here>", "mysql_password": "123456", "mysql_port": "<your_ngrok_tcp_mysql_port_here>", "mysql_user": "root" }

1 change: 0 additions & 1 deletion pyflink-vector-embeddings/pyflink-app/.secrets/model_server

This file was deleted.

1 change: 0 additions & 1 deletion pyflink-vector-embeddings/pyflink-app/.secrets/mongodb_uri

This file was deleted.

1 change: 0 additions & 1 deletion pyflink-vector-embeddings/pyflink-app/.secrets/mysql_host

This file was deleted.

1 change: 0 additions & 1 deletion pyflink-vector-embeddings/pyflink-app/.secrets/mysql_password

This file was deleted.

1 change: 0 additions & 1 deletion pyflink-vector-embeddings/pyflink-app/.secrets/mysql_port

This file was deleted.

1 change: 0 additions & 1 deletion pyflink-vector-embeddings/pyflink-app/.secrets/mysql_user

This file was deleted.

23 changes: 0 additions & 23 deletions pyflink-vector-embeddings/pyflink-app/Dockerfile.app

This file was deleted.

14 changes: 0 additions & 14 deletions pyflink-vector-embeddings/pyflink-app/Dockerfile.base

This file was deleted.

2 changes: 1 addition & 1 deletion pyflink-vector-embeddings/pyflink-app/Dockerfile.builder
@@ -1,4 +1,4 @@
-FROM --platform=linux/amd64 python:3.10.14-slim-bullseye
+FROM --platform=linux/arm64 python:3.11.10-slim-bullseye

RUN apt-get update -y && \
apt-get install -y make && \
18 changes: 10 additions & 8 deletions pyflink-vector-embeddings/pyflink-app/Makefile
@@ -8,14 +8,15 @@ PY = python3
VENV = $(TARGET)/venv
BIN=$(VENV)/bin

-build: $(TARGET) $(LIBS)/mysql-connector-java-8.0.30.jar $(LIBS)/flink-sql-connector-mysql-cdc-3.1.0.jar $(LIBS)/flink-sql-connector-mongodb-1.2.0-1.18.jar $(LIBS)/flink-python-1.18.1.jar $(TARGET)/pyflink-job.zip $(VENV)
+build: $(TARGET) $(LIBS)/mysql-connector-java-8.0.30.jar $(LIBS)/flink-sql-connector-mysql-cdc-3.1.0.jar $(LIBS)/flink-sql-connector-mongodb-1.2.0-1.19.jar $(LIBS)/flink-python-1.19.1.jar $(TARGET)/pyflink-job.zip $(VENV)

$(TARGET):
mkdir $(TARGET)

$(VENV): requirements.txt
$(PY) -m venv $(VENV)
-$(BIN)/pip install -r requirements.txt --target=${PYTHON_LIBS}
+# NOTE: Decodable infra for custom pipelines runs on ARM CPUs by default hence the platform specifier
+$(BIN)/pip install -r requirements.txt --platform manylinux2014_aarch64 --only-binary=:all: --target=${PYTHON_LIBS}
touch $(VENV)

$(LIBS)/mysql-connector-java-8.0.30.jar: | $(TARGET)
@@ -26,17 +27,18 @@ $(LIBS)/flink-sql-connector-mysql-cdc-3.1.0.jar: | $(TARGET)
mkdir -p $(LIBS)
wget -N -P $(LIBS) https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.0/flink-sql-connector-mysql-cdc-3.1.0.jar

-$(LIBS)/flink-sql-connector-mongodb-1.2.0-1.18.jar: | $(TARGET)
+$(LIBS)/flink-sql-connector-mongodb-1.2.0-1.19.jar: | $(TARGET)
mkdir -p $(LIBS)
-wget -N -P $(LIBS) https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-mongodb/1.2.0-1.18/flink-sql-connector-mongodb-1.2.0-1.18.jar
+wget -N -P $(LIBS) https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-mongodb/1.2.0-1.19/flink-sql-connector-mongodb-1.2.0-1.19.jar

-$(LIBS)/flink-python-1.18.1.jar: | $(TARGET)
+$(LIBS)/flink-python-1.19.1.jar: | $(TARGET)
mkdir -p $(LIBS)
-wget -N -P $(LIBS) https://repo1.maven.org/maven2/org/apache/flink/flink-python/1.18.1/flink-python-1.18.1.jar
+wget -N -P $(LIBS) https://repo1.maven.org/maven2/org/apache/flink/flink-python/1.19.1/flink-python-1.19.1.jar

-$(TARGET)/pyflink-job.zip: main.py $(LIBS)/mysql-connector-java-8.0.30.jar $(LIBS)/flink-sql-connector-mysql-cdc-3.1.0.jar $(LIBS)/flink-sql-connector-mongodb-1.2.0-1.18.jar $(LIBS)/flink-python-1.18.1.jar $(VENV)
+$(TARGET)/pyflink-job.zip: main.py $(LIBS)/mysql-connector-java-8.0.30.jar $(LIBS)/flink-sql-connector-mysql-cdc-3.1.0.jar $(LIBS)/flink-sql-connector-mongodb-1.2.0-1.19.jar $(LIBS)/flink-python-1.19.1.jar $(VENV)
cp main.py $(TARGET)
-cd $(TARGET) && zip -r pyflink-job.zip main.py libs python-libs
+# NOTE: workaround due to conflicting numpy dependency -> explicitly remove the corresponding packages from the python-libs folder
+cd $(TARGET) && rm -rf python-libs/numpy* && zip -r pyflink-job.zip main.py libs python-libs

clean:
@rm -rf $(TARGET)
53 changes: 6 additions & 47 deletions pyflink-vector-embeddings/pyflink-app/decodable-resources.yaml
@@ -1,64 +1,23 @@
 ---
-kind: secret
-metadata:
-  name: vector_ingest_mysql_host
-spec_version: v1
-spec:
-  value_file: ./.secrets/mysql_host
----
-kind: secret
-metadata:
-  name: vector_ingest_mysql_port
-spec_version: v1
-spec:
-  value_file: ./.secrets/mysql_port
----
-kind: secret
-metadata:
-  name: vector_ingest_mysql_user
-spec_version: v1
-spec:
-  value_file: ./.secrets/mysql_user
----
-kind: secret
-metadata:
-  name: vector_ingest_mysql_password
-spec_version: v1
-spec:
-  value_file: ./.secrets/mysql_password
----
-kind: secret
-metadata:
-  name: vector_ingest_mongodb_uri
-spec_version: v1
-spec:
-  value_file: ./.secrets/mongodb_uri
----
 kind: secret
 metadata:
-  name: vector_ingest_model_server
+  name: vector_ingest_job_config
 spec_version: v1
 spec:
-  value_file: ./.secrets/model_server
+  value_file: ./.secrets/job_config
 ---
 kind: pipeline
 metadata:
-  name: webinar_vector_ingestion_fallback
+  name: example-vector-ingestion
 spec_version: v2
 spec:
   type: PYTHON
   job_file_path: ./target/pyflink-job.zip
   properties:
     secrets:
-      - vector_ingest_mysql_host
-      - vector_ingest_mysql_port
-      - vector_ingest_mysql_user
-      - vector_ingest_mysql_password
-      - vector_ingest_mongodb_uri
-      - vector_ingest_model_server
-    flink_version: 1.18-python310
+      - vector_ingest_job_config
+    flink_version: 1.19-python311
     additional_metrics: []
   execution:
     active: true
-    task_size: M
+    task_size: L
     task_count: 1
