Skip to content

Commit

Permalink
Merge branch 'main' into 1.4.4
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisOptions.java
#	src/main/java/org/apache/flink/streaming/connectors/redis/mapper/RowRedisSinkMapper.java
#	src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLInsertTest.java
  • Loading branch information
jeff-zou committed Jan 28, 2025
2 parents 1e338fb + 93d0d93 commit 0f6ded5
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 15 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/build-redis-connector.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
---
name: Build Redis Connector
on:
pull_request:
push:

jobs:
build-extension:
name: "Build Connector"
runs-on: ubuntu-latest
defaults:
run:
shell: bash
steps:
- name: Checkout
uses: actions/checkout@master

- name: Setup java
uses: actions/setup-java@v2
with:
distribution: adopt
java-version: '8'

- name: Build flink connector 1.15
run: |
mvn clean package -DskipTests \
-Dflink.version=1.15.0
46 changes: 46 additions & 0 deletions .github/workflows/checkstyle.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
---
name: Code Style Checker

on:
pull_request:
push:
branches:
- dev

jobs:
java-checkstyle:
name: "CheckStyle"
runs-on: ubuntu-24.04
steps:
- name: Checkout
uses: actions/checkout@v3
with:
persist-credentials: false
submodules: recursive

- name: Setup java
uses: actions/setup-java@v2
with:
distribution: adopt
java-version: '8'

- name: Run java checkstyle
run:
mvn clean compile checkstyle:checkstyle
36 changes: 36 additions & 0 deletions .github/workflows/license-eyes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
#Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
---
name: License Check
on:
pull_request:
push:
branches:
- dev
jobs:
license-check:
name: "License Check"
runs-on: ubuntu-24.04
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v2
- name: Check License
uses: apache/skywalking-eyes@v0.2.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
3 changes: 2 additions & 1 deletion README-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ key: name, field:subject, value: name\01subject\01score.
| scan.range.start | (none) | Integer | lrange start |
| scan.range.stop | (none) | Integer | lrange start |
| scan.count | (none) | Integer | srandmember count |
| zset.zremrangeby | (none) | String | After executing zadd, whether to execute zremrangeby,Valid values are:SCORE、LEX、RANK |
| zset.zremrangeby | (none) | String | After executing zadd, whether to execute zremrangeby,Valid values are:SCORE、LEX、RANK |
| audit.log | false | Boolean | Turn on the audit log switch |


##### sink with ttl parameters
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ on j.name = 'test'
| scan.range.start | (none) | Integer | 查询list结构时指定lrange start |
| scan.range.stop | (none) | Integer | 查询list结构时指定lrange start |
| scan.count | (none) | Integer | 查询set结构时指定srandmember count |
| zset.zremrangeby | (none) | String | 执行zadd之后,是否执行zremrangeby,取值:SCORE、LEX、RANK |
| zset.zremrangeby | (none) | String | 执行zadd之后,是否执行zremrangeby,取值:SCORE、LEX、RANK |
| audit.log | false | Boolean | 打开sink日志 |

### 3.1.1 command值与redis命令对应关系:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,29 @@ public class RedisCommandDescription extends RedisCommandBaseDescription impleme

private static final long serialVersionUID = 1L;

private Integer ttl;
private final Integer ttl;

private Boolean setIfAbsent;
private final Boolean setIfAbsent;

private LocalTime expireTime;
private final LocalTime expireTime;

private boolean ttlKeyNotAbsent;
private final Boolean ttlKeyNotAbsent;

private final boolean auditLog;

public RedisCommandDescription(
RedisCommand redisCommand,
Integer ttl,
LocalTime expireTime,
Boolean setIfAbsent,
Boolean ttlKeyNotAbsent) {
Boolean ttlKeyNotAbsent,
Boolean auditLog) {
super(redisCommand);
this.expireTime = expireTime;
this.ttl = ttl;
this.setIfAbsent = setIfAbsent;
this.ttlKeyNotAbsent = ttlKeyNotAbsent;
this.auditLog = auditLog;
}

public Integer getTTL() {
Expand All @@ -62,4 +66,8 @@ public Boolean getSetIfAbsent() {
public boolean getTtlKeyNotAbsent() {
return ttlKeyNotAbsent;
}

public boolean isAuditLog() {
return auditLog;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Created by Jeff Zou on 2020/9/10. */
/** Created by jeff.zou on 2020/9/10. */
public class RedisOptions {

public static final ConfigOption<Integer> TIMEOUT =
Expand Down Expand Up @@ -205,6 +205,11 @@ public class RedisOptions {
.stringType()
.defaultValue(null)
.withDescription("Remove related elements,Valid values: LEX,RANK,SCORE");
public static final ConfigOption<Boolean> AUDIT_LOG =
ConfigOptions.key("audit.log")
.booleanType()
.defaultValue(false)
.withDescription("Optional turn on the audit log switch.");

private RedisOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ public class RowRedisSinkMapper implements RedisSinkMapper<GenericRowData> {

private final Boolean ttlKeyNotAbsent;

private final Boolean auditLog;

public RowRedisSinkMapper(RedisCommand redisCommand, ReadableConfig config) {
this.redisCommand = redisCommand;
this.ttl = config.get(RedisOptions.TTL);
this.setIfAbsent = config.get(RedisOptions.SET_IF_ABSENT);
this.ttlKeyNotAbsent = config.get(RedisOptions.TTL_KEY_NOT_ABSENT);
this.auditLog = config.get(RedisOptions.AUDIT_LOG);
String expireOnTime = config.get(RedisOptions.EXPIRE_ON_TIME);
if (!StringUtils.isNullOrWhitespaceOnly(expireOnTime)) {
this.expireTime = LocalTime.parse(expireOnTime);
Expand All @@ -57,14 +60,28 @@ public RowRedisSinkMapper(RedisCommand redisCommand, ReadableConfig config) {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(
redisCommand, ttl, expireTime, setIfAbsent, ttlKeyNotAbsent);
redisCommand, ttl, expireTime, setIfAbsent, ttlKeyNotAbsent, auditLog);
}

@Override
public String getKeyFromData(RowData rowData, LogicalType logicalType, Integer keyIndex) {
return RedisRowConverter.rowDataToString(logicalType, rowData, keyIndex);
}

@Override
public String getValueFromData(RowData rowData, LogicalType logicalType, Integer valueIndex) {
return RedisRowConverter.rowDataToString(logicalType, rowData, valueIndex);
}

@Override
public String getFieldFromData(RowData rowData, LogicalType logicalType, Integer fieldIndex) {
return RedisRowConverter.rowDataToString(logicalType, rowData, fieldIndex);
}

public RedisCommand getRedisCommand() {
return redisCommand;
}

@Override
public boolean equals(Object obj) {
RedisCommand redisCommand = ((RowRedisSinkMapper) obj).redisCommand;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(RedisOptions.SCAN_RANGE_START);
options.add(RedisOptions.SCAN_COUNT);
options.add(RedisOptions.ZREM_RANGEBY);
options.add(RedisOptions.AUDIT_LOG);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class RedisSinkFunction<IN> extends RichSinkFunction<IN> {
private final List<DataType> columnDataTypes;
private final RedisValueDataStructure redisValueDataStructure;
private final String zremrangeby;
private final boolean auditLog;
protected Integer ttl;
protected int expireTimeSeconds = -1;
private transient RedisCommandsContainer redisCommandsContainer;
Expand Down Expand Up @@ -99,6 +100,7 @@ public RedisSinkFunction(
this.ttl = redisCommandDescription.getTTL();
this.ttlKeyNotAbsent = redisCommandDescription.getTtlKeyNotAbsent();
this.setIfAbsent = redisCommandDescription.getSetIfAbsent();
this.auditLog = redisCommandDescription.isAuditLog();
if (redisCommandDescription.getExpireTime() != null) {
this.expireTimeSeconds = redisCommandDescription.getExpireTime().toSecondOfDay();
}
Expand Down Expand Up @@ -136,6 +138,9 @@ public void invoke(IN input, Context context) throws Exception {
}

startSink(params, kind);
if (auditLog) {
LOG.info("{}", rowData);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testSinkValueWithExpire() throws Exception {

String sink =
"create table sink_redis(name varchar, level varchar, age varchar) with ( "
+ sigleWith()
+ singleWith()
+ "'ttl'='10', '"
+ REDIS_COMMAND
+ "'='"
Expand Down Expand Up @@ -74,7 +74,7 @@ public void testSinkValueWithExpireOnKeyPresent() throws Exception {

String dim =
"create table sink_redis(name varchar, level varchar, age varchar) with ( "
+ sigleWith()
+ singleWith()
+ " 'ttl'='8', 'ttl.key.not.absent'='true', '"
+ REDIS_COMMAND
+ "'='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.Preconditions;

/** Created by Jeff Zou on 2020/9/10. */
/** Created by jeff Zou on 2020/9/10. */
public class SQLInsertTest extends TestRedisConfigBase {

@Test
Expand All @@ -51,7 +51,7 @@ public void testSetSQL() throws Exception {
+ REDIS_COMMAND
+ "'='"
+ RedisCommand.SET
+ "')";
+ "', 'audit.log'='true')";

tEnv.executeSql(ddl);
String sql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testSinkLimitLettucePool() throws Exception {

String sink =
"create table sink_redis(name varchar, level varchar, age varchar) with ( "
+ sigleWith()
+ singleWith()
+ "'ttl'='10', '"
+ REDIS_COMMAND
+ "'='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void stopSingle() {
redisClient.shutdown();
}

protected String sigleWith() {
protected String singleWith() {
return "'connector'='redis', "
+ "'host'='"
+ REDIS_HOST
Expand Down
17 changes: 17 additions & 0 deletions style/spotless-formatter.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<profiles version="13">
<profile kind="CodeFormatterProfile" name="'Flink Connector for Redis'" version="13">
<setting id="org.eclipse.jdt.core.compiler.source" value="1.8"/>
Expand Down

0 comments on commit 0f6ded5

Please sign in to comment.