From adcb0aea67eef3e567252993f48adc23a3d0f37e Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:20:27 +0800 Subject: [PATCH 01/15] add workflows (cherry picked from commit 7c54a178544cddae151269db67040b4cecc6eb75) --- .github/workflows/checkstyle.yaml | 36 ++++++++++++++++++++++++++++++ .github/workflows/license-eyes.yml | 36 ++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 .github/workflows/checkstyle.yaml create mode 100644 .github/workflows/license-eyes.yml diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml new file mode 100644 index 0000000..4ba2390 --- /dev/null +++ b/.github/workflows/checkstyle.yaml @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: License Check +on: + pull_request: + push: + branches: + - main +jobs: + license-check: + name: "License Check" + runs-on: ubuntu-latest + steps: + - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" + uses: actions/checkout@v2 + - name: Check License + uses: apache/skywalking-eyes@v0.2.0 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml new file mode 100644 index 0000000..23dbd5d --- /dev/null +++ b/.github/workflows/license-eyes.yml @@ -0,0 +1,36 @@ +# +#Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: License Check +on: + pull_request: + push: + branches: + - main +jobs: + license-check: + name: "License Check" + runs-on: ubuntu-latest + steps: + - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" + uses: actions/checkout@v2 + - name: Check License + uses: apache/skywalking-eyes@v0.2.0 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file From cae1902568e122b75bf15b78959982913b196c76 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:22:37 +0800 Subject: [PATCH 02/15] modify ubuntu version (cherry picked from commit b3f0525f8d06eefa2cd10a44f6174655c59af16e) --- .github/workflows/checkstyle.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index 4ba2390..94f54b1 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -26,7 +26,7 @@ on: jobs: license-check: name: "License Check" - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" uses: actions/checkout@v2 From 484e2f6cc82ec36f3451bfbe90e01f64294a6cc7 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:30:07 +0800 Subject: [PATCH 03/15] test (cherry picked from commit 12eede031d4411cc26f426c50a8eec304d81204e) --- .github/workflows/checkstyle.yaml | 33 ++++++++++++++++++------------ .github/workflows/license-eyes.yml | 2 +- style/spotless-formatter.xml | 17 +++++++++++++++ 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index 94f54b1..b9cddb2 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -17,20 +16,28 @@ # under the License. # --- -name: License Check +name: Code Style Checker + on: pull_request: - push: - branches: - - main + jobs: - license-check: - name: "License Check" + java-checkstyle: + name: "CheckStyle" runs-on: ubuntu-24.04 steps: - - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@v2 - - name: Check License - uses: apache/skywalking-eyes@v0.2.0 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + - name: Checkout + uses: actions/checkout@v3 + with: + persist-credentials: false + submodules: recursive + + - name: Setup java + uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: '8' + + - name: Run java checkstyle + run: + cd flink-doris-redis && mvn clean compile checkstyle:checkstyle \ No newline at end of file diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml index 23dbd5d..5cc6b4f 100644 --- a/.github/workflows/license-eyes.yml +++ b/.github/workflows/license-eyes.yml @@ -26,7 +26,7 @@ on: jobs: license-check: name: "License Check" - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" uses: actions/checkout@v2 diff --git a/style/spotless-formatter.xml b/style/spotless-formatter.xml index ae2631c..d730b94 100644 --- a/style/spotless-formatter.xml +++ b/style/spotless-formatter.xml @@ -1,5 +1,22 @@ + From bc8e9dd173617918221c5a313a4d09e87af23369 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:44:14 +0800 Subject: [PATCH 04/15] format code --- .github/workflows/license-eyes.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml index 5cc6b4f..5339680 100644 --- a/.github/workflows/license-eyes.yml +++ b/.github/workflows/license-eyes.yml @@ -22,7 +22,7 @@ on: pull_request: push: branches: - - main + - dev jobs: license-check: name: "License Check" From 715ce2bb59c8bcd94db80f71736a27a20a01f467 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:47:57 +0800 Subject: [PATCH 05/15] test checkstyle --- .../flink/streaming/connectors/redis/table/SQLExpireTest.java | 4 ++-- .../streaming/connectors/redis/table/SQLLettuceLimitTest.java | 2 +- .../connectors/redis/table/base/TestRedisConfigBase.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java index 3688b79..1b19529 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java @@ -44,7 +44,7 @@ public void testSinkValueWithExpire() throws Exception { String sink = "create table sink_redis(name varchar, level varchar, age varchar) with ( " - + sigleWith() + + singleWith() + "'ttl'='10', '" + REDIS_COMMAND + "'='" @@ -74,7 +74,7 @@ public void testSinkValueWithExpireOnKeyPresent() throws Exception { String dim = "create table sink_redis(name varchar, level varchar, age varchar) with ( " - + sigleWith() + + singleWith() + " 'ttl'='8', 'ttl.key.not.absent'='true', '" + REDIS_COMMAND + "'='" diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java index 9c22adc..0495600 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java @@ -44,7 +44,7 @@ public void testSinkLimitLettucePool() throws Exception { String sink = "create table sink_redis(name varchar, level varchar, age varchar) with ( " - + sigleWith() + + singleWith() + "'ttl'='10', '" + REDIS_COMMAND + "'='" diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java index 88f0ff5..df12fe3 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java @@ -63,7 +63,7 @@ public static void stopSingle() { redisClient.shutdown(); } - protected String sigleWith() { + protected String singleWith() { return "'connector'='redis', " + "'host'='" + REDIS_HOST From 53bf6965130cf9174d2e723db1a2431ea2f33172 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:50:52 +0800 Subject: [PATCH 06/15] code format checkstyle test --- .github/workflows/checkstyle.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index b9cddb2..eadfa0c 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -20,6 +20,9 @@ name: Code Style Checker on: pull_request: + push: + branches: + - dev jobs: java-checkstyle: From 3b60ec5d575fe55fbd133a538862ccc7091518c5 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:53:15 +0800 Subject: [PATCH 07/15] code format checkstyle test --- .github/workflows/checkstyle.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index eadfa0c..f251d7c 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -43,4 +43,5 @@ jobs: - name: Run java checkstyle run: + pwd cd flink-doris-redis && mvn clean compile checkstyle:checkstyle \ No newline at end of file From 5a1ba7f87ad21a939e2ba81984ccf1417860306a Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 19:55:03 +0800 Subject: [PATCH 08/15] code format checkstyle test --- .github/workflows/checkstyle.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index f251d7c..eadfa0c 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -43,5 +43,4 @@ jobs: - name: Run java checkstyle run: - pwd cd flink-doris-redis && mvn clean compile checkstyle:checkstyle \ No newline at end of file From ba3ce1a9e906b14bf35beb90178f83a0631121de Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 20:02:28 +0800 Subject: [PATCH 09/15] code format checkstyle test --- .github/workflows/checkstyle.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index eadfa0c..c85151f 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -43,4 +43,5 @@ jobs: - name: Run java checkstyle run: - cd flink-doris-redis && mvn clean compile checkstyle:checkstyle \ No newline at end of file + cd flink-doris-redis + mvn clean compile checkstyle:checkstyle \ No newline at end of file From e3c74889eedb5fc63509adb7cff785b95abd25b1 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 20:04:03 +0800 Subject: [PATCH 10/15] code format checkstyle test --- .github/workflows/checkstyle.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml index c85151f..706f0d5 100644 --- a/.github/workflows/checkstyle.yaml +++ b/.github/workflows/checkstyle.yaml @@ -43,5 +43,4 @@ jobs: - name: Run java checkstyle run: - cd flink-doris-redis mvn clean compile checkstyle:checkstyle \ No newline at end of file From 0a6e89a4a8ea08560e3e21098eeb690cfbd5034d Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 20:17:03 +0800 Subject: [PATCH 11/15] add more test workflow --- .github/workflows/approve-label-trigger.yml | 28 +++++++++ .github/workflows/approve-label.yml | 67 +++++++++++++++++++++ .github/workflows/build-redis-connector.yml | 44 ++++++++++++++ 3 files changed, 139 insertions(+) create mode 100644 .github/workflows/approve-label-trigger.yml create mode 100644 .github/workflows/approve-label.yml create mode 100644 .github/workflows/build-redis-connector.yml diff --git a/.github/workflows/approve-label-trigger.yml b/.github/workflows/approve-label-trigger.yml new file mode 100644 index 0000000..fab55d7 --- /dev/null +++ b/.github/workflows/approve-label-trigger.yml @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: Label when reviewed +on: pull_request_review +jobs: + + label-when-reviewed: + name: "Label PRs when reviewed" + runs-on: ubuntu-24.04 + steps: + - name: "Do nothing. Only trigger corresponding workflow_run event" + run: echo \ No newline at end of file diff --git a/.github/workflows/approve-label.yml b/.github/workflows/approve-label.yml new file mode 100644 index 0000000..cd82837 --- /dev/null +++ b/.github/workflows/approve-label.yml @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: Label when approved workflow run +on: + workflow_run: + workflows: ["Label when reviewed"] + types: ['requested'] +permissions: + # All other permissions are set to none + checks: write + contents: read + pull-requests: write + +jobs: + + label-when-approved: + name: "Label when approved" + runs-on: ubuntu-24.04 + outputs: + isApprovedByCommiters: ${{ steps.label-when-approved-by-commiters.outputs.isApproved }} + isApprovedByAnyone: ${{ steps.label-when-approved-by-anyone.outputs.isApproved }} + steps: + - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" + uses: actions/checkout@v2 + with: + persist-credentials: false + submodules: recursive + - name: "Get information about the original trigger of the run" + uses: ./.github/actions/get-workflow-origin + id: source-run-info + with: + token: ${{ secrets.GITHUB_TOKEN }} + sourceRunId: ${{ github.event.workflow_run.id }} + - name: Label when approved by commiters + uses: ./.github/actions/label-when-approved-action + id: label-when-approved-by-commiters + with: + token: ${{ secrets.GITHUB_TOKEN }} + label: 'approved' + require_committers_approval: 'true' + remove_label_when_approval_missing: 'true' + pullRequestNumber: ${{ steps.source-run-info.outputs.pullRequestNumber }} + comment: 'PR approved by at least one committer and no changes requested.' + - name: Label when approved by anyone + uses: ./.github/actions/label-when-approved-action + id: label-when-approved-by-anyone + with: + token: ${{ secrets.GITHUB_TOKEN }} + label: 'reviewed' + pullRequestNumber: ${{ steps.source-run-info.outputs.pullRequestNumber }} + comment: 'PR approved by anyone and no changes requested.' \ No newline at end of file diff --git a/.github/workflows/build-redis-connector.yml b/.github/workflows/build-redis-connector.yml new file mode 100644 index 0000000..49d62e3 --- /dev/null +++ b/.github/workflows/build-redis-connector.yml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: Build Connector +on: + pull_request: + push: + +jobs: + build-extension: + name: "Build Connector" + runs-on: ubuntu-latest + defaults: + run: + shell: bash + steps: + - name: Checkout + uses: actions/checkout@master + + - name: Setup java + uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: '8' + + - name: Build flink connector 1.15 + run: | + mvn clean package \ + -Dflink.version=1.15.0 \ No newline at end of file From 9dc34e7d08a551a229878fc068b16612059d4d42 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 20:18:58 +0800 Subject: [PATCH 12/15] skip unit test --- .github/workflows/build-redis-connector.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-redis-connector.yml b/.github/workflows/build-redis-connector.yml index 49d62e3..deeddaa 100644 --- a/.github/workflows/build-redis-connector.yml +++ b/.github/workflows/build-redis-connector.yml @@ -40,5 +40,5 @@ jobs: - name: Build flink connector 1.15 run: | - mvn clean package \ + mvn clean package -DskipTests \ -Dflink.version=1.15.0 \ No newline at end of file From c152bfa6378a2205faab7c0c54540f3f82c19b83 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 20:21:25 +0800 Subject: [PATCH 13/15] rename workflow name --- .github/workflows/build-redis-connector.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-redis-connector.yml b/.github/workflows/build-redis-connector.yml index deeddaa..c0da65f 100644 --- a/.github/workflows/build-redis-connector.yml +++ b/.github/workflows/build-redis-connector.yml @@ -16,7 +16,7 @@ # under the License. # --- -name: Build Connector +name: Build Redis Connector on: pull_request: push: From 1412fea127af577115fb96ba1086c8974bb52b59 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 2 Jan 2025 20:38:04 +0800 Subject: [PATCH 14/15] delete approve workflow --- .github/workflows/approve-label-trigger.yml | 28 --------- .github/workflows/approve-label.yml | 67 --------------------- 2 files changed, 95 deletions(-) delete mode 100644 .github/workflows/approve-label-trigger.yml delete mode 100644 .github/workflows/approve-label.yml diff --git a/.github/workflows/approve-label-trigger.yml b/.github/workflows/approve-label-trigger.yml deleted file mode 100644 index fab55d7..0000000 --- a/.github/workflows/approve-label-trigger.yml +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# ---- -name: Label when reviewed -on: pull_request_review -jobs: - - label-when-reviewed: - name: "Label PRs when reviewed" - runs-on: ubuntu-24.04 - steps: - - name: "Do nothing. Only trigger corresponding workflow_run event" - run: echo \ No newline at end of file diff --git a/.github/workflows/approve-label.yml b/.github/workflows/approve-label.yml deleted file mode 100644 index cd82837..0000000 --- a/.github/workflows/approve-label.yml +++ /dev/null @@ -1,67 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# ---- -name: Label when approved workflow run -on: - workflow_run: - workflows: ["Label when reviewed"] - types: ['requested'] -permissions: - # All other permissions are set to none - checks: write - contents: read - pull-requests: write - -jobs: - - label-when-approved: - name: "Label when approved" - runs-on: ubuntu-24.04 - outputs: - isApprovedByCommiters: ${{ steps.label-when-approved-by-commiters.outputs.isApproved }} - isApprovedByAnyone: ${{ steps.label-when-approved-by-anyone.outputs.isApproved }} - steps: - - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@v2 - with: - persist-credentials: false - submodules: recursive - - name: "Get information about the original trigger of the run" - uses: ./.github/actions/get-workflow-origin - id: source-run-info - with: - token: ${{ secrets.GITHUB_TOKEN }} - sourceRunId: ${{ github.event.workflow_run.id }} - - name: Label when approved by commiters - uses: ./.github/actions/label-when-approved-action - id: label-when-approved-by-commiters - with: - token: ${{ secrets.GITHUB_TOKEN }} - label: 'approved' - require_committers_approval: 'true' - remove_label_when_approval_missing: 'true' - pullRequestNumber: ${{ steps.source-run-info.outputs.pullRequestNumber }} - comment: 'PR approved by at least one committer and no changes requested.' - - name: Label when approved by anyone - uses: ./.github/actions/label-when-approved-action - id: label-when-approved-by-anyone - with: - token: ${{ secrets.GITHUB_TOKEN }} - label: 'reviewed' - pullRequestNumber: ${{ steps.source-run-info.outputs.pullRequestNumber }} - comment: 'PR approved by anyone and no changes requested.' \ No newline at end of file From 73ec00266212e400def3c93d13e11f1d4dd102ba Mon Sep 17 00:00:00 2001 From: jeff-zou Date: Tue, 28 Jan 2025 12:00:37 +0800 Subject: [PATCH 15/15] add audit log --- README-en.md | 3 +- README.md | 3 +- .../command/RedisCommandDescription.java | 18 ++- .../connectors/redis/config/RedisOptions.java | 44 +---- .../redis/mapper/RowRedisSinkMapper.java | 13 +- .../redis/table/RedisDynamicTableFactory.java | 1 + .../redis/table/RedisSinkFunction.java | 117 +++++++------- .../connectors/redis/table/SQLInsertTest.java | 150 +++++++++--------- 8 files changed, 174 insertions(+), 175 deletions(-) diff --git a/README-en.md b/README-en.md index 257ff41..ef6824c 100644 --- a/README-en.md +++ b/README-en.md @@ -112,7 +112,8 @@ key: name, field:subject, value: name\01subject\01score. | scan.range.start | (none) | Integer | lrange start | | scan.range.stop | (none) | Integer | lrange start | | scan.count | (none) | Integer | srandmember count | -| zset.zremrangeby | (none) | String | After executing zadd, whether to execute zremrangeby,Valid values are:SCORE、LEX、RANK | +| zset.zremrangeby | (none) | String | After executing zadd, whether to execute zremrangeby,Valid values are:SCORE、LEX、RANK | +| audit.log | false | Boolean | Turn on the audit log switch | ##### sink with ttl parameters diff --git a/README.md b/README.md index ed30a9e..f18257d 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,8 @@ on j.name = 'test' | scan.range.start | (none) | Integer | 查询list结构时指定lrange start | | scan.range.stop | (none) | Integer | 查询list结构时指定lrange start | | scan.count | (none) | Integer | 查询set结构时指定srandmember count | -| zset.zremrangeby | (none) | String | 执行zadd之后,是否执行zremrangeby,取值:SCORE、LEX、RANK | +| zset.zremrangeby | (none) | String | 执行zadd之后,是否执行zremrangeby,取值:SCORE、LEX、RANK | +| audit.log | false | Boolean | 打开sink日志 | ### 3.1.1 command值与redis命令对应关系: diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/command/RedisCommandDescription.java b/src/main/java/org/apache/flink/streaming/connectors/redis/command/RedisCommandDescription.java index 3c0918b..dc7d71c 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/command/RedisCommandDescription.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/command/RedisCommandDescription.java @@ -26,25 +26,29 @@ public class RedisCommandDescription extends RedisCommandBaseDescription impleme private static final long serialVersionUID = 1L; - private Integer ttl; + private final Integer ttl; - private Boolean setIfAbsent; + private final Boolean setIfAbsent; - private LocalTime expireTime; + private final LocalTime expireTime; - private boolean ttlKeyNotAbsent; + private final Boolean ttlKeyNotAbsent; + + private final boolean auditLog; public RedisCommandDescription( RedisCommand redisCommand, Integer ttl, LocalTime expireTime, Boolean setIfAbsent, - Boolean ttlKeyNotAbsent) { + Boolean ttlKeyNotAbsent, + Boolean auditLog) { super(redisCommand); this.expireTime = expireTime; this.ttl = ttl; this.setIfAbsent = setIfAbsent; this.ttlKeyNotAbsent = ttlKeyNotAbsent; + this.auditLog = auditLog; } public Integer getTTL() { @@ -62,4 +66,8 @@ public Boolean getSetIfAbsent() { public boolean getTtlKeyNotAbsent() { return ttlKeyNotAbsent; } + + public boolean isAuditLog() { + return auditLog; + } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisOptions.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisOptions.java index 78b9aee..3e8f953 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisOptions.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisOptions.java @@ -24,202 +24,167 @@ /** Created by jeff.zou on 2020/9/10. */ public class RedisOptions { - private RedisOptions() { - } - public static final ConfigOption TIMEOUT = ConfigOptions.key("timeout") .intType() .defaultValue(2000) .withDescription("Optional timeout for connect to redis"); - public static final ConfigOption MAXTOTAL = ConfigOptions.key("maxTotal") .intType() .defaultValue(2) .withDescription("Optional maxTotal for connect to redis"); - public static final ConfigOption MAXIDLE = ConfigOptions.key("maxIdle") .intType() .defaultValue(2) .withDescription("Optional maxIdle for connect to redis"); - public static final ConfigOption MINIDLE = ConfigOptions.key("minIdle") .intType() .defaultValue(1) .withDescription("Optional minIdle for connect to redis"); - public static final ConfigOption PASSWORD = ConfigOptions.key("password") .stringType() .noDefaultValue() .withDescription("Optional password for connect to redis"); - public static final ConfigOption PORT = ConfigOptions.key("port") .intType() .defaultValue(6379) .withDescription("Optional port for connect to redis"); - public static final ConfigOption HOST = ConfigOptions.key("host") .stringType() .noDefaultValue() .withDescription("Optional host for connect to redis"); - public static final ConfigOption CLUSTERNODES = ConfigOptions.key("cluster-nodes") .stringType() .noDefaultValue() .withDescription("Optional nodes for connect to redis cluster"); - public static final ConfigOption DATABASE = ConfigOptions.key("database") .intType() .defaultValue(0) .withDescription("Optional database for connect to redis"); - public static final ConfigOption COMMAND = ConfigOptions.key("command") .stringType() .noDefaultValue() .withDescription("Optional command for connect to redis"); - public static final ConfigOption REDISMODE = ConfigOptions.key("redis-mode") .stringType() .noDefaultValue() .withDescription("Optional redis-mode for connect to redis"); - public static final ConfigOption REDIS_MASTER_NAME = ConfigOptions.key("master.name") .stringType() .noDefaultValue() .withDescription("Optional master.name for connect to redis sentinels"); - public static final ConfigOption SENTINELS_INFO = ConfigOptions.key("sentinels.info") .stringType() .noDefaultValue() .withDescription("Optional sentinels.info for connect to redis sentinels"); - public static final ConfigOption SENTINELS_PASSWORD = ConfigOptions.key("sentinels.password") .stringType() .noDefaultValue() .withDescription("Optional sentinels.password for connect to redis sentinels"); - public static final ConfigOption TTL = ConfigOptions.key("ttl") .intType() .noDefaultValue() .withDescription("Optional ttl for insert to redis"); - public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows") .longType() .defaultValue(-1L) .withDescription("Optional max rows of cache for query redis"); - public static final ConfigOption LOOKUP_CHCHE_TTL = ConfigOptions.key("lookup.cache.ttl") .longType() .defaultValue(-1L) .withDescription("Optional ttl of cache for query redis"); - public static final ConfigOption LOOKUP_CACHE_LOAD_ALL = ConfigOptions.key("lookup.cache.load-all") .booleanType() .defaultValue(false) .withDescription("Optional if load all elements into cache for query"); - public static final ConfigOption MAX_RETRIES = ConfigOptions.key("max.retries") .intType() .defaultValue(1) .withDescription("Optional max retries of cache sink"); - public static final ConfigOption SINK_PARALLELISM = ConfigOptions.key("sink.parallelism") .intType() .defaultValue(1) .withDescription("Optional parrallelism for sink"); - public static final ConfigOption SINK_LIMIT = ConfigOptions.key("sink.limit") .booleanType() .defaultValue(false) .withDescription("Optional if open the limit for sink "); - public static final ConfigOption SINK_LIMIT_MAX_NUM = ConfigOptions.key("sink.limit.max-num") .intType() .defaultValue(10000) .withDescription("Optional the max num of writes for limited sink"); - public static final ConfigOption SINK_LIMIT_INTERVAL = ConfigOptions.key("sink.limit.interval") .longType() .defaultValue(100L) .withDescription( "Optional the millisecond interval between each write for limited sink"); - public static final ConfigOption SINK_LIMIT_MAX_ONLINE = ConfigOptions.key("sink.limit.max-online") .longType() .defaultValue(30 * 60 * 1000L) .withDescription("Optional the max online milliseconds for limited sink"); - public static final ConfigOption VALUE_DATA_STRUCTURE = ConfigOptions.key("value.data.structure") .enumType(RedisValueDataStructure.class) .defaultValue(RedisValueDataStructure.column) .withDescription("Optional redis value data structure."); - public static final ConfigOption EXPIRE_ON_TIME = ConfigOptions.key("ttl.on.time") .stringType() .noDefaultValue() .withDescription("Optional redis key expire on time, eg: 10:00 12:12:01"); - public static final ConfigOption SET_IF_ABSENT = ConfigOptions.key("set.if.absent") .booleanType() .defaultValue(false) .withDescription("Optional setIfAbsent for insert(set/hset) to redis"); - public static final ConfigOption TTL_KEY_NOT_ABSENT = ConfigOptions.key("ttl.key.not.absent") .booleanType() .defaultValue(false) .withDescription("Optional set ttl when key not absent"); - public static final ConfigOption NETTY_IO_POOL_SIZE = ConfigOptions.key("io.pool.size") .intType() .defaultValue(null) .withDescription("Optional set io pool size for netty of lettuce"); - public static final ConfigOption NETTY_EVENT_POOL_SIZE = ConfigOptions.key("event.pool.size") .intType() .defaultValue(null) .withDescription("Optional set event pool size for netty of lettuce"); - public static final ConfigOption SCAN_KEY = ConfigOptions.key("scan.key") .stringType() .defaultValue(null) .withDescription("Optional set key for query"); - public static final ConfigOption SCAN_ADDITION_KEY = ConfigOptions.key("scan.addition.key") .stringType() .defaultValue(null) .withDescription("Optional set addition key for query"); - public static final ConfigOption SCAN_RANGE_START = ConfigOptions.key("scan.range.start") .intType() @@ -230,16 +195,21 @@ private RedisOptions() { .intType() .defaultValue(null) .withDescription("Optional set range stop for lrange query"); - public static final ConfigOption SCAN_COUNT = ConfigOptions.key("scan.count") .intType() .defaultValue(null) .withDescription("Optional set count for srandmember query"); - public static final ConfigOption ZREM_RANGEBY = ConfigOptions.key("zset.zremrangeby") .stringType() .defaultValue(null) .withDescription("Remove related elements,Valid values: LEX,RANK,SCORE"); + public static final ConfigOption AUDIT_LOG = + ConfigOptions.key("audit.log") + .booleanType() + .defaultValue(false) + .withDescription("Optional turn on the audit log switch."); + + private RedisOptions() {} } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/mapper/RowRedisSinkMapper.java b/src/main/java/org/apache/flink/streaming/connectors/redis/mapper/RowRedisSinkMapper.java index 45f0640..fdfcc31 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/mapper/RowRedisSinkMapper.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/mapper/RowRedisSinkMapper.java @@ -33,21 +33,24 @@ /** base row redis mapper implement. */ public class RowRedisSinkMapper implements RedisSinkMapper { - private Integer ttl; + private final Integer ttl; private LocalTime expireTime; - private RedisCommand redisCommand; + private final RedisCommand redisCommand; - private Boolean setIfAbsent; + private final Boolean setIfAbsent; - private Boolean ttlKeyNotAbsent; + private final Boolean ttlKeyNotAbsent; + + private final Boolean auditLog; public RowRedisSinkMapper(RedisCommand redisCommand, ReadableConfig config) { this.redisCommand = redisCommand; this.ttl = config.get(RedisOptions.TTL); this.setIfAbsent = config.get(RedisOptions.SET_IF_ABSENT); this.ttlKeyNotAbsent = config.get(RedisOptions.TTL_KEY_NOT_ABSENT); + this.auditLog = config.get(RedisOptions.AUDIT_LOG); String expireOnTime = config.get(RedisOptions.EXPIRE_ON_TIME); if (!StringUtils.isNullOrWhitespaceOnly(expireOnTime)) { this.expireTime = LocalTime.parse(expireOnTime); @@ -57,7 +60,7 @@ public RowRedisSinkMapper(RedisCommand redisCommand, ReadableConfig config) { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription( - redisCommand, ttl, expireTime, setIfAbsent, ttlKeyNotAbsent); + redisCommand, ttl, expireTime, setIfAbsent, ttlKeyNotAbsent, auditLog); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java index 8067b51..fbf9031 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java @@ -117,6 +117,7 @@ public Set> optionalOptions() { options.add(RedisOptions.SCAN_RANGE_START); options.add(RedisOptions.SCAN_COUNT); options.add(RedisOptions.ZREM_RANGEBY); + options.add(RedisOptions.AUDIT_LOG); return options; } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java index f1044e1..349dbea 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java @@ -67,6 +67,7 @@ public class RedisSinkFunction extends RichSinkFunction { private final List columnDataTypes; private final RedisValueDataStructure redisValueDataStructure; private final String zremrangeby; + private final boolean auditLog; protected Integer ttl; protected int expireTimeSeconds = -1; private transient RedisCommandsContainer redisCommandsContainer; @@ -98,6 +99,7 @@ public RedisSinkFunction( this.ttl = redisCommandDescription.getTTL(); this.ttlKeyNotAbsent = redisCommandDescription.getTtlKeyNotAbsent(); this.setIfAbsent = redisCommandDescription.getSetIfAbsent(); + this.auditLog = redisCommandDescription.isAuditLog(); if (redisCommandDescription.getExpireTime() != null) { this.expireTimeSeconds = redisCommandDescription.getExpireTime().toSecondOfDay(); } @@ -109,8 +111,8 @@ public RedisSinkFunction( /** * Called when new data arrives to the sink, and forwards it to Redis channel. Depending on the - * specified Redis data type, a different Redis command will be - * applied. Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD, HSET, ZADD. + * specified Redis data type, a different Redis command will be applied. Available commands are + * RPUSH, LPUSH, SADD, PUBLISH, SET, SETEX, PFADD, HSET, ZADD. * * @param input The incoming data */ @@ -135,6 +137,9 @@ public void invoke(IN input, Context context) throws Exception { } startSink(params, kind); + if (auditLog) { + LOG.info("{}", rowData); + } } /** @@ -187,19 +192,20 @@ private RedisFuture sink(String[] params) { case SADD: redisFuture = this.redisCommandsContainer.sadd(params[0], params[1]); break; - case SET: { - if (!this.setIfAbsent) { - redisFuture = this.redisCommandsContainer.set(params[0], params[1]); - } else { - redisFuture = this.redisCommandsContainer.exists(params[0]); - redisFuture.whenComplete( - (existsVal, throwable) -> { - if ((int) existsVal == 0) { - this.redisCommandsContainer.set(params[0], params[1]); - } - }); + case SET: + { + if (!this.setIfAbsent) { + redisFuture = this.redisCommandsContainer.set(params[0], params[1]); + } else { + redisFuture = this.redisCommandsContainer.exists(params[0]); + redisFuture.whenComplete( + (existsVal, throwable) -> { + if ((int) existsVal == 0) { + this.redisCommandsContainer.set(params[0], params[1]); + } + }); + } } - } break; case PFADD: redisFuture = this.redisCommandsContainer.pfadd(params[0], params[1]); @@ -251,46 +257,48 @@ private RedisFuture sink(String[] params) { case SREM: redisFuture = this.redisCommandsContainer.srem(params[0], params[1]); break; - case HSET: { - if (!this.setIfAbsent) { - redisFuture = - this.redisCommandsContainer.hset(params[0], params[1], params[2]); - } else { - redisFuture = this.redisCommandsContainer.hexists(params[0], params[1]); - redisFuture.whenComplete( - (exist, throwable) -> { - if (!(Boolean) exist) { - this.redisCommandsContainer.hset( - params[0], params[1], params[2]); - } - }); + case HSET: + { + if (!this.setIfAbsent) { + redisFuture = + this.redisCommandsContainer.hset(params[0], params[1], params[2]); + } else { + redisFuture = this.redisCommandsContainer.hexists(params[0], params[1]); + redisFuture.whenComplete( + (exist, throwable) -> { + if (!(Boolean) exist) { + this.redisCommandsContainer.hset( + params[0], params[1], params[2]); + } + }); + } } - } break; - case HMSET: { - if (params.length < 2) { - throw new RuntimeException("params length must be greater than 2"); - } - if (params.length % 2 != 1) { - throw new RuntimeException("params length must be odd"); - } - // 遍历把params第一个下标作为key,从第二个下标作为value,存进map中 - Map hashField = new HashMap<>(); - for (int i = 1; i < params.length; i++) { - hashField.put(params[i], params[++i]); - } - if (!this.setIfAbsent) { - redisFuture = this.redisCommandsContainer.hmset(params[0], hashField); - } else { - redisFuture = this.redisCommandsContainer.exists(params[0]); - redisFuture.whenComplete( - (exist, throwable) -> { - if (!(Boolean) exist) { - this.redisCommandsContainer.hmset(params[0], hashField); - } - }); + case HMSET: + { + if (params.length < 2) { + throw new RuntimeException("params length must be greater than 2"); + } + if (params.length % 2 != 1) { + throw new RuntimeException("params length must be odd"); + } + // 遍历把params第一个下标作为key,从第二个下标作为value,存进map中 + Map hashField = new HashMap<>(); + for (int i = 1; i < params.length; i++) { + hashField.put(params[i], params[++i]); + } + if (!this.setIfAbsent) { + redisFuture = this.redisCommandsContainer.hmset(params[0], hashField); + } else { + redisFuture = this.redisCommandsContainer.exists(params[0]); + redisFuture.whenComplete( + (exist, throwable) -> { + if (!(Boolean) exist) { + this.redisCommandsContainer.hmset(params[0], hashField); + } + }); + } } - } break; case HINCRBY: redisFuture = @@ -349,9 +357,10 @@ private RedisFuture rowKindDelete(String[] params) { Double d = -Double.valueOf(params[1]); redisFuture = this.redisCommandsContainer.zincrBy(params[0], d, params[2]); break; - case HDEL: { - redisFuture = this.redisCommandsContainer.hdel(params[0], params[1]); - } + case HDEL: + { + redisFuture = this.redisCommandsContainer.hdel(params[0], params[1]); + } break; case HINCRBY: redisFuture = diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLInsertTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLInsertTest.java index 11392bb..60db8c5 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLInsertTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLInsertTest.java @@ -18,6 +18,10 @@ package org.apache.flink.streaming.connectors.redis.table; +import static org.apache.flink.streaming.connectors.redis.config.RedisValidator.REDIS_COMMAND; + +import io.lettuce.core.Range; + import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.command.RedisCommand; import org.apache.flink.streaming.connectors.redis.table.base.TestRedisConfigBase; @@ -27,10 +31,6 @@ import org.junit.jupiter.api.Test; import org.junit.platform.commons.util.Preconditions; -import io.lettuce.core.Range; - -import static org.apache.flink.streaming.connectors.redis.config.RedisValidator.REDIS_COMMAND; - /** Created by jeff.zou on 2020/9/10. */ public class SQLInsertTest extends TestRedisConfigBase { @@ -51,7 +51,7 @@ public void testSetSQL() throws Exception { + REDIS_COMMAND + "'='" + RedisCommand.SET - + "')"; + + "', 'audit.log'='true')"; tEnv.executeSql(ddl); String sql = @@ -107,7 +107,8 @@ public void testHmsetSQL() throws Exception { + "', 'minIdle'='1' )"; tEnv.executeSql(ddl); - String sql = " insert into sink_redis select * from (values ('test_hash', '3', '18', 'city', '广州'))"; + String sql = + " insert into sink_redis select * from (values ('test_hash', '3', '18', 'city', '广州'))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); Preconditions.condition(singleRedisCommands.hget("test_hash", "city").equals("广州"), ""); @@ -163,7 +164,7 @@ public void testSRem() throws Exception { TableResult tableResult = tEnv.executeSql("insert into redis_sink select * from (values('set', 'test2'))"); tableResult.getJobClient().get().getJobExecutionResult().get(); - Preconditions.condition(singleRedisCommands.sismember("set", "test2") == false, ""); + Preconditions.condition(!singleRedisCommands.sismember("set", "test2"), ""); } @Test @@ -274,13 +275,11 @@ public void testSinkValueFrom() throws Exception { TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); String s = - new StringBuilder() - .append("test") - .append(RedisDynamicTableFactory.CACHE_SEPERATOR) - .append("11.3") - .append(RedisDynamicTableFactory.CACHE_SEPERATOR) - .append("10.3") - .toString(); + "test" + + RedisDynamicTableFactory.CACHE_SEPERATOR + + "11.3" + + RedisDynamicTableFactory.CACHE_SEPERATOR + + "10.3"; Preconditions.condition(singleRedisCommands.get("test").equals(s), ""); } @@ -473,21 +472,22 @@ public void testZaddAndzremRangeByScore() throws Exception { EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); singleRedisCommands.del("test_zadd"); - String datagen = "CREATE TABLE datagen (" + - " score INT," + - " mm VARCHAR" + - " ) WITH (" + - " 'connector' = 'datagen'," + - " 'rows-per-second'='1'," + - " 'fields.score.kind'='sequence'," + - " 'fields.score.start'='1'," + - " 'fields.score.end'='10'," + - " 'fields.mm.length'='10'" + - " )"; + String datagen = + "CREATE TABLE datagen (" + + " score INT," + + " mm VARCHAR" + + " ) WITH (" + + " 'connector' = 'datagen'," + + " 'rows-per-second'='1'," + + " 'fields.score.kind'='sequence'," + + " 'fields.score.start'='1'," + + " 'fields.score.end'='10'," + + " 'fields.mm.length'='10'" + + " )"; String ddl = - "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " + - "rem_min_score DOUBLE, rem_max_score DOUBLE) with ( " + - "'connector'='redis', " + "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " + + "rem_min_score DOUBLE, rem_max_score DOUBLE) with ( " + + "'connector'='redis', " + "'host'='" + REDIS_HOST + "','port'='" @@ -503,13 +503,15 @@ public void testZaddAndzremRangeByScore() throws Exception { tEnv.executeSql(datagen); tEnv.executeSql(ddl); - String sql = " insert into sink_redis select 'test_zadd' AS key, score, mm, 1 as rem_min_score ,5 as " + - "rem_max_score " + - "from datagen"; + String sql = + " insert into sink_redis select 'test_zadd' AS key, score, mm, 1 as rem_min_score ,5 as " + + "rem_max_score " + + "from datagen"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); Range range = Range.create(1, 5); - Preconditions.condition(singleRedisCommands.zrangebyscore("test_zadd", range).size() == 0, ""); + Preconditions.condition( + singleRedisCommands.zrangebyscore("test_zadd", range).size() == 0, ""); } @Test @@ -520,23 +522,24 @@ public void testZaddAndzremRangeByLex() throws Exception { EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); singleRedisCommands.del("test_zadd"); - String datagen = "CREATE TABLE datagen (" + - " index INT," + - " mm VARCHAR" + - " ) WITH (" + - " 'connector' = 'datagen'," + - " 'rows-per-second'='1'," + - " 'fields.index.kind'='sequence'," + - " 'fields.index.start'='1'," + - " 'fields.index.end'='10'," + - " 'fields.mm.length'='5'" + - " )"; + String datagen = + "CREATE TABLE datagen (" + + " index INT," + + " mm VARCHAR" + + " ) WITH (" + + " 'connector' = 'datagen'," + + " 'rows-per-second'='1'," + + " 'fields.index.kind'='sequence'," + + " 'fields.index.start'='1'," + + " 'fields.index.end'='10'," + + " 'fields.mm.length'='5'" + + " )"; String ddl = - "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " + - "rem_begin VARCHAR, rem_end VARCHAR" + - ",primary key (key) not ENFORCED" + - ") with ( " + - "'connector'='redis', " + "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " + + "rem_begin VARCHAR, rem_end VARCHAR" + + ",primary key (key) not ENFORCED" + + ") with ( " + + "'connector'='redis', " + "'host'='" + REDIS_HOST + "','port'='" @@ -552,13 +555,15 @@ public void testZaddAndzremRangeByLex() throws Exception { tEnv.executeSql(datagen); tEnv.executeSql(ddl); - String sql = " insert into sink_redis select 'test_zadd' AS key, 100 as score, concat('aa', mm) mm, 'aa' as " + - "rem_begin ,'bb' as rem_end " + - "from datagen"; + String sql = + " insert into sink_redis select 'test_zadd' AS key, 100 as score, concat('aa', mm) mm, 'aa' as " + + "rem_begin ,'bb' as rem_end " + + "from datagen"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); Range range = Range.create("aa", "bb"); - Preconditions.condition(singleRedisCommands.zrangebylex("test_zadd", range).size() == 0, ""); + Preconditions.condition( + singleRedisCommands.zrangebylex("test_zadd", range).size() == 0, ""); } @Test @@ -569,23 +574,24 @@ public void testZaddAndzremRangeByRank() throws Exception { EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); singleRedisCommands.del("test_zadd"); - String datagen = "CREATE TABLE datagen (" + - " index INT," + - " mm VARCHAR" + - " ) WITH (" + - " 'connector' = 'datagen'," + - " 'rows-per-second'='1'," + - " 'fields.index.kind'='sequence'," + - " 'fields.index.start'='1'," + - " 'fields.index.end'='10'," + - " 'fields.mm.length'='5'" + - " )"; + String datagen = + "CREATE TABLE datagen (" + + " index INT," + + " mm VARCHAR" + + " ) WITH (" + + " 'connector' = 'datagen'," + + " 'rows-per-second'='1'," + + " 'fields.index.kind'='sequence'," + + " 'fields.index.start'='1'," + + " 'fields.index.end'='10'," + + " 'fields.mm.length'='5'" + + " )"; String ddl = - "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " + - "rem_begin INT, rem_end INT" + - ",primary key (key) not ENFORCED" + - ") with ( " + - "'connector'='redis', " + "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " + + "rem_begin INT, rem_end INT" + + ",primary key (key) not ENFORCED" + + ") with ( " + + "'connector'='redis', " + "'host'='" + REDIS_HOST + "','port'='" @@ -601,12 +607,12 @@ public void testZaddAndzremRangeByRank() throws Exception { tEnv.executeSql(datagen); tEnv.executeSql(ddl); - String sql = " insert into sink_redis select 'test_zadd' AS key, 100 as score, concat('aa', mm) mm, 0 as " + - "rem_begin ,9 as rem_end " + - "from datagen"; + String sql = + " insert into sink_redis select 'test_zadd' AS key, 100 as score, concat('aa', mm) mm, 0 as " + + "rem_begin ,9 as rem_end " + + "from datagen"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); Preconditions.condition(singleRedisCommands.zrange("test_zadd", 0, -1).size() == 0, ""); } - }