# Flink Standalone Cluster

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

## Support Matrix

| LakeSoul    | Flink |
|-------------|-------|
| 2.4.x       | 1.17  |
| 2.1.x-2.3.x | 1.14  |
## PG Configuration

Add the following configuration to `$FLINK_HOME/conf/flink-conf.yaml`:
```yaml
containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
```
Note that both the master and taskmanager environment variables need to be set.

:::tip
Modify the PostgreSQL connection information, username, and password according to your actual deployment.
:::

:::caution
If you start a job in Session mode, i.e. submit it to the Flink Standalone Cluster as a client, `flink run` (acting as the client) will not read the configuration above, so you need to set the environment variables separately:

```bash
export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root
```
:::

## SQL
### Download LakeSoul Flink Jar
It can be downloaded from the LakeSoul Release page: https://github.com/lakesoul-io/LakeSoul/releases/download/v2.4.0/lakesoul-flink-2.4.0-flink-1.17.jar.
### Start SQL Client
```bash
# Start Flink SQL Client
bin/sql-client.sh embedded -j lakesoul-flink-2.4.0-flink-1.17.jar
```
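Once the client is running, a quick sanity check can confirm the LakeSoul catalog is reachable. This is a sketch using standard Flink SQL statements; the catalog name `lakesoul` is assumed to match the tables used throughout this page, so adjust it if your deployment registers the catalog differently:
```sql
-- List the available catalogs and switch to the LakeSoul catalog
SHOW CATALOGS;
USE CATALOG lakesoul;
SHOW DATABASES;
```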
## Create Table

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
String createUserSql = "create table user_info (" +
        "`id` INT," +
        "name STRING," +
        "score INT," +
        "`date` STRING," +
        "region STRING," +
        " PRIMARY KEY (`id`,`name`) NOT ENFORCED" +
        ") PARTITIONED BY (`region`,`date`)" +
        " WITH (" +
        " 'connector'='lakesoul'," +
        " 'hashBucketNum'='4'," +
        " 'use_cdc'='true'," +
        " 'path'='/tmp/lakesoul/flink/sink/test' )";
tEnv.executeSql(createUserSql);
```

</TabItem>
<TabItem value="SQL">

```sql
-- Create test_table with (id, name) as the composite primary key and (region, date) as a two-level range partition, in catalog lakesoul, database default
create table `lakesoul`.`default`.test_table (
    `id` INT,
    name STRING,
    score INT,
    `date` STRING,
    region STRING,
    PRIMARY KEY (`id`,`name`) NOT ENFORCED
) PARTITIONED BY (`region`,`date`)
WITH (
    'connector'='lakesoul',
    'hashBucketNum'='4',
    'use_cdc'='true',
    'path'='file:///tmp/lakesoul/flink/sink/test');
```

</TabItem>
</Tabs>
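
After the DDL runs, standard Flink SQL statements can confirm that the table was registered and show its schema. This is a sketch; the exact output formatting depends on your Flink version:
```sql
-- Confirm the table exists and inspect its columns
SHOW TABLES;
DESCRIBE `lakesoul`.`default`.test_table;
```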

:::tip
Meaning of the table-creation parameters:

| Parameter | Explanation | Value Format |
| -------------- | ---------------------------------- | -------------------------------------------- |
| PARTITIONED BY | specifies the range partition fields of the table; omit it if there are none | PARTITIONED BY (`date`) |
| PRIMARY KEY | specifies one or more primary key columns | PRIMARY KEY (`id`, `name`) NOT ENFORCED |
| connector | data source connector, used to specify the data source type | 'connector'='lakesoul' |
| hashBucketNum | a table with primary key(s) must set this property to a number >= 0 | 'hashBucketNum'='4' |
| path | specifies the storage path of the table | 'path'='file:///tmp/lakesoul/flink/sink/test' |
| use_cdc | whether the table is in CDC format (refer to [CDC Table Format](../03-Usage%20Docs/04-cdc-ingestion-table.mdx)) | 'use_cdc'='true' |
:::
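To illustrate the optional parameters, here is a minimal sketch (the table name `events` is hypothetical) of a table with no range partitions, so `PARTITIONED BY` is omitted entirely:
```sql
-- No range partition fields, so PARTITIONED BY is left out
create table `lakesoul`.`default`.events (
    `id` INT,
    payload STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'='lakesoul',
    'hashBucketNum'='2',
    'path'='file:///tmp/lakesoul/flink/sink/events');
```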
## Drop Table

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("DROP TABLE IF EXISTS test_table");
```

</TabItem>
<TabItem value="SQL">

```sql
DROP TABLE IF EXISTS test_table;
```

</TabItem>
</Tabs>

## Insert Data

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("insert into `lakesoul`.`default`.test_table values (1, 'AAA', 98, '2023-05-10', 'China')").await();
```

</TabItem>
<TabItem value="SQL">

Batch mode: insert data directly.
```sql
insert into `lakesoul`.`default`.test_table values (1,'AAA', 98, '2023-05-10', 'China');
```
Streaming mode: build a streaming job that reads data from another streaming source and writes it into the LakeSoul table. If the upstream data is in CDC format, the target LakeSoul table must also be created as a CDC table.
```sql
-- Insert all data from `lakesoul`.`cdcsink`.soure_table into `lakesoul`.`default`.test_table
insert into `lakesoul`.`default`.test_table select * from `lakesoul`.`cdcsink`.soure_table;
```
</TabItem>
</Tabs>

:::caution
1. Both streaming and batch writes must enable the `execution.checkpointing.checkpoints-after-tasks-finish.enabled` option;
2. For streaming writes, a checkpoint interval must be set; at least 1 minute is recommended;
3. Set the time zone appropriate for your environment:

```sql
SET 'table.local-time-zone' = 'Asia/Shanghai';
SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';
-- Set the checkpointing interval
SET 'execution.checkpointing.interval' = '2min';
```
:::
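Putting these together, a streaming-write session in the SQL client would issue the settings before submitting the job, since `SET` only affects subsequently submitted statements (the interval and time zone values here are illustrative):
```sql
SET 'table.local-time-zone' = 'Asia/Shanghai';
SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';
SET 'execution.checkpointing.interval' = '2min';
-- Submitted after the settings, so the job picks them up
insert into `lakesoul`.`default`.test_table select * from `lakesoul`.`cdcsink`.soure_table;
```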

## Update Data

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1").await();
```

</TabItem>
<TabItem value="SQL">

```sql
UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1;
```
</TabItem>
</Tabs>

:::caution
Note that `UPDATE` must not modify primary key or partition columns. In streaming execution mode, LakeSoul supports ChangeLog semantics, covering inserts, updates, and deletes.
:::
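As a sketch of what this restriction means against `test_table`, whose primary key is (`id`, `name`) and whose partition columns are `region` and `date`:
```sql
-- Allowed: score is a regular column
UPDATE `lakesoul`.`default`.test_table set score = 95 where id = 1;
-- Not allowed: name is a primary key column
-- UPDATE `lakesoul`.`default`.test_table set name = 'BBB' where id = 1;
-- Not allowed: region is a partition column
-- UPDATE `lakesoul`.`default`.test_table set region = 'US' where id = 1;
```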
## Delete Data

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("DELETE FROM `lakesoul`.`default`.test_table where id = 1").await();
```

</TabItem>
<TabItem value="SQL">

```sql
DELETE FROM `lakesoul`.`default`.test_table where id = 1;
```
</TabItem>
</Tabs>

:::caution
For `DELETE`, partition columns are not allowed in the condition. In streaming execution mode, LakeSoul supports ChangeLog semantics, covering inserts, updates, and deletes.
:::
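Again as a sketch against `test_table` (partition columns `region` and `date`):
```sql
-- Allowed: the condition uses only non-partition columns
DELETE FROM `lakesoul`.`default`.test_table where score < 60;
-- Not allowed: region is a partition column
-- DELETE FROM `lakesoul`.`default`.test_table where region = 'China';
```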
## Query Data
### Full Read

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
// Read the full table (filtered here to one partition) and print the result
tEnv.executeSql("SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10'").print();
```

</TabItem>
<TabItem value="SQL">

```sql
SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10';
```
</TabItem>
</Tabs>

### Snapshot Batch Read
LakeSoul supports snapshot reads: by specifying partition information and an end timestamp, users can query all data committed before that end timestamp.

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
```

</TabItem>
<TabItem value="SQL">

```sql
-- Snapshot read of test_table in the region=China partition, up to 2023-05-01 15:20:15, time zone Asia/Shanghai
SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
```
</TabItem>
</Tabs>

### Incremental Range Read
LakeSoul supports incremental range reads: by specifying partition information, a start timestamp, and an end timestamp, users can query the incremental data within that time range.

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental', 'readstarttime'='2023-05-01 15:15:15', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
```

</TabItem>
<TabItem value="SQL">

```sql
-- Incremental read of test_table in the region=China partition, from 2023-05-01 15:15:15 to 2023-05-01 15:20:15, time zone Asia/Shanghai
SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental', 'readstarttime'='2023-05-01 15:15:15', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
```
</TabItem>
</Tabs>

### Streaming Read
LakeSoul tables support streaming reads in Flink. Streaming read is based on incremental read: by specifying a start timestamp and partition information, users can continuously read new data committed after that start timestamp.
If no start timestamp is specified, reading starts from the earliest data.

<Tabs
defaultValue="SQL"
values={[
{label: 'Java', value: 'Java'},
{label: 'SQL', value: 'SQL'},
]}>
<TabItem value="Java">

```java
tEnv.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
```

</TabItem>
<TabItem value="SQL">

```sql
-- Streaming read of test_table in the region=China partition, time zone Asia/Shanghai
SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China';
```
</TabItem>
</Tabs>
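
A sketch combining streaming mode with the read options documented at the end of this page, starting from a given timestamp and polling for new data every 10 seconds (the timestamp and interval values are illustrative):
```sql
SET execution.runtime-mode = streaming;
-- Continuously read data committed after the start timestamp
SELECT * FROM `lakesoul`.`default`.test_table
  /*+ OPTIONS('readstarttime'='2023-05-01 15:15:15', 'timezone'='Asia/Shanghai', 'discoveryinterval'='10000')*/
WHERE region='China';
```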

LakeSoul fully supports Flink Changelog Stream semantics when streaming. For a LakeSoul CDC table, incremental reads still produce CDC-format results, i.e. they contain `insert`, `update`, and `delete` events. These events are automatically mapped to the corresponding `RowKind` values of Flink's `RowData` objects, enabling incremental pipeline computation in Flink.

### Lookup Join

LakeSoul supports Flink SQL Lookup Join operations. A lookup join caches the right-hand table in memory, which greatly improves join speed, and is suitable for joining relatively small dimension tables. LakeSoul refreshes the cache every 60 seconds by default; you can change this by setting the `'lookup.join.cache.ttl'` property when creating the dimension table.
```sql
CREATE TABLE `lakesoul`.`default`.customers (
            `c_id` INT,
            `name` STRING,
        PRIMARY KEY (`c_id`) NOT ENFORCED)
        WITH (
            'connector'='lakesoul',
            'hashBucketNum'='1',
            'path'='file:///tmp/lakesoul/flink/sink/customers'
);
CREATE TABLE `lakesoul`.`default`.orders (
            `o_id` INT,
            `o_c_id` INT,
        PRIMARY KEY (`o_id`) NOT ENFORCED)
        WITH (
            'connector'='lakesoul',
            'hashBucketNum'='1',
            'path'='file:///tmp/lakesoul/flink/sink/orders',
            'lookup.join.cache.ttl'='60s'
);
SELECT `o_id`, `c_id`, `name`
FROM
(SELECT *, PROCTIME() as proctime FROM `lakesoul`.`default`.orders) as o
JOIN `lakesoul`.`default`.customers FOR SYSTEM_TIME AS OF o.proctime
ON c_id = o_c_id;
```
The orders table is enriched with data from the customers table. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing-time attribute ensures that each row of the orders table is joined with the customers rows that match the join predicate at the moment the orders row is processed by the join operator. It also prevents the join result from being updated when a joined customers row changes later. A lookup join also requires a mandatory equality join predicate, `o_c_id = c_id` in the example above.
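
To make the behavior concrete, suppose the two tables hold the following hypothetical rows; the lookup join then enriches each order with the customer name, and rows added to `customers` later only become visible once the cache TTL (60 seconds above) expires:
```sql
-- Hypothetical data for the tables defined above
INSERT INTO `lakesoul`.`default`.customers VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO `lakesoul`.`default`.orders VALUES (10, 1), (11, 2);
-- The join then emits (10, 1, 'Alice') and (11, 2, 'Bob')
```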

:::tip
LakeSoul tables can be read in both batch and streaming mode. You can switch between the two execution modes in the Flink SQL Client:
```sql
-- Run Flink jobs in streaming mode
SET execution.runtime-mode = streaming;
SET 'execution.checkpointing.interval' = '1min';
-- Run Flink jobs in batch mode
SET execution.runtime-mode = batch;
```
With Flink SQL, a conditional query takes the form `SELECT * FROM test_table /*+ OPTIONS('key'='value')*/ WHERE partition=somevalue`. In all of the read modes described above, you can optionally constrain partitions in the `WHERE` clause, giving either all partition values exactly or just a subset; LakeSoul will find the partitions that match the filters.
In the query, `/*+ OPTIONS() */` carries the query hints. Hints must be placed directly after the table name (before any other clause), and LakeSoul supports the following read options:

| Parameter | Explanation | Value Format |
| ----------------- | ----------- | ------------ |
| readtype | read type: `incremental` for incremental read, `snapshot` for snapshot read; defaults to full read if unspecified | 'readtype'='incremental' |
| discoveryinterval | interval in milliseconds for discovering new data in streaming incremental read; default 30000 | 'discoveryinterval'='10000' |
| readstarttime | start timestamp for reading; if unspecified, reading starts from the earliest version | 'readstarttime'='2023-05-01 15:15:15' |
| readendtime | end timestamp for reading; if unspecified, reads up to the current latest version | 'readendtime'='2023-05-01 15:20:15' |
| timezone | time zone of the timestamps; if unspecified, the local time zone is used | 'timezone'='Asia/Shanghai' |
:::