diff --git a/website/docs/01-Getting Started/04-Flink-Guide.mdx b/website/docs/01-Getting Started/04-Flink-Guide.mdx
new file mode 100644
index 000000000..719dfc39f
--- /dev/null
+++ b/website/docs/01-Getting Started/04-Flink-Guide.mdx
@@ -0,0 +1,389 @@
+# Flink Standalone Cluster
+
+
+
+
+## Support Matrix
+
+| LakeSoul | Flink |
+|----------------------------|----------------------------------------------------------|
+| 2.4.x                      | 1.17                                                      |
+| 2.1.x-2.3.x                | 1.14                                                      |
+
+## PG Configuration
+
+Add the following configuration to `$FLINK_HOME/conf/flink-conf.yaml`:
+```yaml
+containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
+containerized.master.env.LAKESOUL_PG_USERNAME: root
+containerized.master.env.LAKESOUL_PG_PASSWORD: root
+containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
+containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
+containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
+containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
+containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
+```
+Note that both the master and taskmanager environment variables need to be set.
+
+:::tip
+The connection information, username and password of the Postgres database need to be modified according to the actual deployment.
+:::
+
+:::caution
+Note that if you start a job in Session mode, i.e. submit it as a client to a Flink Standalone Cluster, `flink run` (acting as the client) will not read the configuration above, so you need to set the environment variables separately:
+
+```bash
+export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
+export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
+export LAKESOUL_PG_USERNAME=root
+export LAKESOUL_PG_PASSWORD=root
+```
+:::
+
+## SQL
+### Download LakeSoul Flink Jar
+It can be downloaded from the LakeSoul Release page: https://github.com/lakesoul-io/LakeSoul/releases/download/v2.4.0/lakesoul-flink-2.4.0-flink-1.17.jar.
+### Start SQL Client
+```bash
+# Start Flink SQL Client
+bin/sql-client.sh embedded -j lakesoul-flink-2.4.0-flink-1.17.jar
+```
+## Create Table
+
+
+
+
+ ```java
+ TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+ String createUserSql = "create table user_info (" +
+ "`id` INT," +
+ "name STRING," +
+ "score INT," +
+ "`date` STRING," +
+ "region STRING," +
+ " PRIMARY KEY (`id`,`name`) NOT ENFORCED"+
+ ") PARTITIONED BY (`region`,`date`)"+
+ " WITH (" +
+ " 'connector'='lakesoul'," +
+ " 'hashBucketNum'='4'," +
+ " 'use_cdc'='true'," +
+ " 'path'='/tmp/lakesoul/flink/sink/test' )";
+ tEnv.executeSql(createUserSql);
+ ```
+
+
+
+
+ ```sql
+ -- Create the test_table table, use id and name as the joint primary key, use region and date as the two-level range partition, catalog is lakesoul, and database is default
+ create table `lakesoul`.`default`.test_table (
+ `id` INT,
+ name STRING,
+ score INT,
+ `date` STRING,
+ region STRING,
+ PRIMARY KEY (`id`,`name`) NOT ENFORCED
+ ) PARTITIONED BY (`region`,`date`)
+ WITH (
+ 'connector'='lakesoul',
+ 'hashBucketNum'='4',
+ 'use_cdc'='true',
+ 'path'='file:///tmp/lakesoul/flink/sink/test');
+ ```
+
+
+
+
+:::tip
+The meaning of the parameters for creating a table
+
+| Parameter | Explanation | Value Format |
+| -------------- | ---------------------------------- | -------------------------------------------- |
+| PARTITIONED BY | used to specify the table's range partition fields; omit it if there are none | PARTITIONED BY (`date`) |
+| PRIMARY KEY | used to specify one or more primary key columns | PRIMARY KEY (`id`, `name`) NOT ENFORCED |
+| connector | data source connector, used to specify the data source type | 'connector'='lakesoul' |
+| hashBucketNum | table with primary key(s) must have this property set to a number >= 0 | 'hashBucketNum'='4' |
+| path | used to specify the storage path of the table | 'path'='file:///tmp/lakesoul/flink/sink/test' |
+| use_cdc | Set whether the table is in CDC format (refer to [CDC Table Format](../03-Usage%20Docs/04-cdc-ingestion-table.mdx) ) | 'use_cdc'='true' |
+:::
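+
+For reference, a minimal sketch of a table with neither primary keys nor CDC format, assuming (per the table above) that `PARTITIONED BY`, `PRIMARY KEY`, `hashBucketNum` and `use_cdc` can then all be omitted; the table name and path are hypothetical:
+```sql
+-- A plain LakeSoul table: only connector and path are required
+create table `lakesoul`.`default`.plain_table (
+    `id` INT,
+    name STRING
+) WITH (
+    'connector'='lakesoul',
+    'path'='file:///tmp/lakesoul/flink/sink/plain_table');
+```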
+## Drop Table
+
+
+
+
+ ```java
+ tEnvs.executeSql("DROP TABLE if exists test_table");
+ ```
+
+
+
+
+ ```sql
+ DROP TABLE if exists test_table;
+ ```
+
+
+
+
+## Insert Data
+
+
+
+
+ ```java
+ tEnvs.executeSql("insert into `lakesoul`.`default`.test_table values (1, 'AAA', 98, '2023-05-10', 'China')"). await();
+ ```
+
+
+
+
+ Batch: insert data directly.
+ ```sql
+ insert into `lakesoul`.`default`.test_table values (1, 'AAA', 98, '2023-05-10', 'China');
+ ```
+ Streaming: build a streaming job that reads from another streaming source and writes into the LakeSoul table. If the upstream data is in CDC format, the target LakeSoul table must also be created as a CDC table.
+ ```sql
+ -- Insert all data from `lakesoul`.`cdcsink`.source_table into `lakesoul`.`default`.test_table
+ insert into `lakesoul`.`default`.test_table select * from `lakesoul`.`cdcsink`.source_table;
+ ```
+
+
+
+:::caution
+1. Both stream and batch writing must enable the `execution.checkpointing.checkpoints-after-tasks-finish.enabled` option;
+2. For stream writing, checkpoint interval needs to be set, and it is recommended to be more than 1 minute;
+3. Set the corresponding time zone according to the environment:
+
+```sql
+SET 'table.local-time-zone' = 'Asia/Shanghai';
+set 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';
+-- Set the checkpointing interval
+set 'execution.checkpointing.interval' = '2min';
+```
+:::
+
+## Update Data
+
+
+
+
+ ```java
+ tEnvs.executeSql("UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1") await();
+ ```
+
+
+
+
+ ```sql
+ UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1;
+ ```
+
+
+
+:::caution
+Note that `update` does not allow changing the values of primary key or partition columns. In stream execution mode, LakeSoul supports ChangeLog semantics and handles inserts, updates, and deletes.
+:::
+## Delete Data
+
+
+
+
+ ```java
+ tEnvs.executeSql("DELETE FROM `lakesoul`.`default`.test_table where id = 1") await();
+ ```
+
+
+
+
+ ```sql
+ DELETE FROM `lakesoul`.`default`.test_table where id = 1;
+ ```
+
+
+
+:::caution
+In the case of `delete`, partition columns are not allowed in the condition. In stream execution mode, LakeSoul supports ChangeLog semantics and handles inserts, updates, and deletes.
+:::
+## Query Data
+### Full Read
+
+
+
+
+ ```java
+ // Query the full table, specifying both partition values
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10'").print();
+ ```
+
+
+
+
+ ```sql
+ SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10';
+ ```
+
+
+
+### Snapshot Batch Read
+LakeSoul supports snapshot reading of tables, and users can query all data before the end timestamp by specifying partition information and end timestamp.
+
+
+
+
+ ```java
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
+ ```
+
+
+
+
+ ```sql
+ -- Execute snapshot read of test_table in the region=China partition, the end timestamp of the read is 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
+ SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
+ ```
+
+
+
+### Incremental Range Read
+LakeSoul supports range incremental reads for tables. Users can query incremental data within this time range by specifying partition information, start timestamp, and end timestamp.
+
+
+
+
+ ```java
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental','readstarttime'='2023-05-01 15:15:15 ', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
+ ```
+
+
+
+
+ ```sql
+ -- Incremental reading of test_table in the region=China partition, the read timestamp range is 2023-05-01 15:15:15 to 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
+ SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental', 'readstarttime'='2023-05-01 15:15:15', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
+ ```
+
+
+
+### Streaming Read
+The LakeSoul table supports streaming reads in Flink. Streaming reads are based on incremental reads. By specifying the start timestamp and partition information, users can continuously and uninterruptedly read new data after the start timestamp.
+If no start timestamp is specified, it reads from the earliest data.
+
+
+
+ ```java
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
+ ```
+
+
+
+
+ ```sql
+ -- Incremental reading of test_table in the region=China partition, the time zone is Asia/Shanghai
+ SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China';
+ ```
+
+
+
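+For example, a sketch of a streaming read that starts from a given timestamp and polls for new data every 10 seconds (the timestamp is illustrative; the hint options are documented in the table at the end of this page):
+```sql
+SET execution.runtime-mode = streaming;
+SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readstarttime'='2023-05-01 15:15:15', 'discoveryinterval'='10000', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
+```
+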
+LakeSoul fully supports Flink Changelog Stream semantics when streaming. For a LakeSoul CDC table, the result of an incremental read is still in CDC format, i.e. it contains `insert`, `update`, and `delete` events; these events are automatically converted to the corresponding RowKind values of Flink's RowData objects, enabling full-pipeline incremental computation in Flink.
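+
+As an illustration, a continuous aggregation over such a streaming read retracts and re-emits its results as `update` and `delete` events arrive; a minimal sketch:
+```sql
+-- Per-region counts stay consistent with upstream inserts, updates and deletes
+SET execution.runtime-mode = streaming;
+SELECT region, count(*) AS cnt
+FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/
+GROUP BY region;
+```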
+
+### Lookup Join
+
+LakeSoul supports Flink SQL's Lookup Join. A Lookup Join caches the right-hand table in memory, which greatly improves join performance; it suits scenarios joining against relatively small dimension tables. LakeSoul tries to refresh the cache every 60 seconds by default; this interval can be changed by setting the `'lookup.join.cache.ttl'` property when creating the dimension table.
+```sql
+CREATE TABLE `lakesoul`.`default`.customers (
+ `c_id` INT,
+ `name` STRING,
+ PRIMARY KEY (`c_id`) NOT ENFORCED)
+ WITH (
+ 'connector'='lakesoul',
+ 'hashBucketNum'='1',
+ 'path'='file:///tmp/lakesoul/flink/sink/customers'
+ );
+CREATE TABLE `lakesoul`.`default`.orders (
+ `o_id` INT,
+ `o_c_id` INT,
+ PRIMARY KEY (`o_id`) NOT ENFORCED)
+ WITH (
+ 'connector'='lakesoul',
+ 'hashBucketNum'='1',
+ 'path'='file:///tmp/lakesoul/flink/sink/orders',
+ 'lookup.join.cache.ttl'='60s'
+ );
+SELECT `o_id`, `c_id`, `name`
+FROM
+(SELECT *, PROCTIME() as proctime FROM `lakesoul`.`default`.orders) as o
+JOIN `lakesoul`.`default`.customers FOR SYSTEM_TIME AS OF o.proctime
+ON c_id = o_c_id;
+```
+The Orders table is enriched with data from the Customers table. The FOR SYSTEM_TIME AS OF clause with the subsequent processing-time attribute ensures that each row of the Orders table is joined with the Customers rows that match the join predicate at the point in time when the Orders row is processed by the join operator. It also prevents the join result from being updated when a joined Customers row changes later. The lookup join also requires a mandatory equality join predicate, `o_c_id = c_id` in the example above.
+
+:::tip
+LakeSoul tables can be read in Flink in both batch and streaming mode. Execute the following commands in the Flink SQL Client to switch between streaming and batch execution modes:
+```sql
+-- Execute Flink tasks according to the stream
+SET execution.runtime-mode = streaming;
+SET 'execution.checkpointing.interval' = '1min';
+-- Execute Flink tasks in batch mode
+SET execution.runtime-mode = batch;
+```
+Using Flink SQL, a conditional query takes the form `SELECT * FROM test_table /*+ OPTIONS('key'='value')*/ WHERE partition=somevalue`. In all of the read modes below, you may optionally constrain partitions in the `WHERE` clause, giving either all partition values or just a subset; LakeSoul finds the partitions matching the partition filters.
+In the query, `/*+ OPTIONS() */` carries the query options (hints). Hints must be placed directly after the table name (before any other clause), and the options LakeSoul supports when reading include:
+
+| Parameter | Explanation of meaning | Parameter filling format |
+| ----------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------ |
+| readtype | read type; specify `incremental` for incremental read or `snapshot` for snapshot read; defaults to full read if unspecified | 'readtype'='incremental' |
+| discoveryinterval | The time interval for discovering new data in streaming incremental read, in milliseconds, the default is 30000 | 'discoveryinterval'='10000' |
+| readstarttime | Start read timestamp, if no start timestamp is specified, it will read from the start version number by default | 'readstarttime'='2023-05-01 15:15:15' |
+| readendtime | End read timestamp, if no end timestamp is specified, the current latest version number will be read by default | 'readendtime'='2023-05-01 15:20:15' |
+| timezone | The time zone of the timestamps; if not specified, the local time zone is used by default | 'timezone'='Asia/Shanghai' |
+
+:::
\ No newline at end of file
diff --git a/website/docs/03-Usage Docs/11-machine-learning-support.md b/website/docs/03-Usage Docs/11-machine-learning-support.md
index aa1371450..60fcb3738 100644
--- a/website/docs/03-Usage Docs/11-machine-learning-support.md
+++ b/website/docs/03-Usage Docs/11-machine-learning-support.md
@@ -179,7 +179,7 @@ More Examples at [LakeSoul/python/examples](https://github.com/lakesoul-io/Lake
## Ray DataSource
LakeSoul implements Ray's [Datasource](https://docs.ray.io/en/latest/data/api/doc/ray.data.Datasource.html). The following is an example of calling code:
```python
-importray.data
+import ray.data
import lakesoul.ray
ds = ray.data.read_lakesoul("table_name", partitions={'split': 'train'})
```
diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/01-Getting Started/04-Flink-Guide.mdx b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/01-Getting Started/04-Flink-Guide.mdx
new file mode 100644
index 000000000..4edacfd96
--- /dev/null
+++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/01-Getting Started/04-Flink-Guide.mdx
@@ -0,0 +1,394 @@
+# Flink单节点集群
+
+
+
+
+## 版本支持
+
+| LakeSoul | Flink |
+|----------------------------|----------------------------------------------------------|
+| 2.4.x                      | 1.17                                                      |
+| 2.1.x-2.3.x                | 1.14                                                      |
+
+## PG配置
+
+添加如下配置到 `$FLINK_HOME/conf/flink-conf.yaml`:
+```yaml
+containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
+containerized.master.env.LAKESOUL_PG_USERNAME: root
+containerized.master.env.LAKESOUL_PG_PASSWORD: root
+containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
+containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
+containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
+containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
+containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
+```
+注意 master 和 taskmanager 的环境变量都需要设置。
+
+:::tip
+Postgres 数据库的连接信息、用户名密码需要根据实际情况修改。
+:::
+
+:::caution
+注意如果使用 Session 模式来启动作业,即将作业以 client 方式提交到 Flink Standalone Cluster,则 `flink run` 作为 client,是不会读取上面配置,因此需要再单独配置环境变量,即:
+
+```bash
+export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
+export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
+export LAKESOUL_PG_USERNAME=root
+export LAKESOUL_PG_PASSWORD=root
+```
+:::
+
+## SQL
+### 下载LakeSoul Flink Jar
+可以在 LakeSoul Release 页面下载: https://github.com/lakesoul-io/LakeSoul/releases/download/v2.4.0/lakesoul-flink-2.4.0-flink-1.17.jar.
+如果访问 Github 有问题,也可以通过这个链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-flink-2.5.0-flink-1.17.jar
+
+### 使用SQL Client
+```bash
+# Start Flink SQL Client
+bin/sql-client.sh embedded -j lakesoul-flink-2.4.0-flink-1.17.jar
+```
+## 创建表
+
+
+
+
+ ```java
+ TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+ String createUserSql = "create table user_info (" +
+ "`id` INT," +
+ "name STRING," +
+ "score INT," +
+ "`date` STRING," +
+ "region STRING," +
+ " PRIMARY KEY (`id`,`name`) NOT ENFORCED"+
+ ") PARTITIONED BY (`region`,`date`)"+
+ " WITH (" +
+ " 'connector'='lakesoul'," +
+ " 'hashBucketNum'='4'," +
+ " 'use_cdc'='true'," +
+ " 'path'='/tmp/lakesoul/flink/sink/test' )";
+ tEnv.executeSql(createUserSql);
+ ```
+
+
+
+
+ ```sql
+ -- Create the test_table table, use id and name as the joint primary key, use region and date as the two-level range partition, catalog is lakesoul, and database is default
+ create table `lakesoul`.`default`.test_table (
+ `id` INT,
+ name STRING,
+ score INT,
+ `date` STRING,
+ region STRING,
+ PRIMARY KEY (`id`,`name`) NOT ENFORCED
+ ) PARTITIONED BY (`region`,`date`)
+ WITH (
+ 'connector'='lakesoul',
+ 'hashBucketNum'='4',
+ 'use_cdc'='true',
+ 'path'='file:///tmp/lakesoul/flink/sink/test');
+ ```
+
+
+
+
+
+:::tip
+建表语句中各个部分参数含义:
+
+| 参数 | 含义说明 | 参数填写格式 |
+| -------------- | ----------------------------------------------------------------------------------------- | --------------------------------------------- |
+| PARTITIONED BY | 用于指定表的 Range 分区字段,如果不存在 range 分区字段,则省略 | PARTITIONED BY (`date`) |
+| PRIMARY KEY    | 用于指定表的主键,可以包含多个列                                                            | PRIMARY KEY (`id`, `name`) NOT ENFORCED       |
+| connector | 数据源连接器,用于指定数据源类型 | 'connector'='lakesoul' |
+| hashBucketNum | 有主键表必须设置哈希分片数 | 'hashBucketNum'='4' |
+| path | 用于指定表的存储路径 | 'path'='file:///tmp/lakesoul/flink/sink/test' |
+| use_cdc | 设置表是否为 CDC 格式 (参考 [CDC 表格式](../03-Usage%20Docs/04-cdc-ingestion-table.mdx) ) | 'use_cdc'='true' |
+
+:::
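+
+作为参考,下面给出一个不带主键、非 CDC 格式表的最简示意(按上表假设此时 `PARTITIONED BY`、`PRIMARY KEY`、`hashBucketNum`、`use_cdc` 均可省略,表名和路径仅为示例):
+```sql
+-- 非 CDC、无分区的 LakeSoul 表,只需指定 connector 和 path
+create table `lakesoul`.`default`.plain_table (
+    `id` INT,
+    name STRING
+) WITH (
+    'connector'='lakesoul',
+    'path'='file:///tmp/lakesoul/flink/sink/plain_table');
+```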
+## 删除表
+
+
+
+
+ ```java
+ tEnvs.executeSql("DROP TABLE if exists test_table");
+ ```
+
+
+
+
+ ```sql
+ DROP TABLE if exists test_table;
+ ```
+
+
+
+
+## 插入数据
+
+
+
+
+ ```java
+ tEnvs.executeSql("insert into `lakesoul`.`default`.test_table values (1, 'AAA', 98, '2023-05-10', 'China')"). await();
+ ```
+
+
+
+
+ 批式:直接插入数据
+ ```sql
+ insert into `lakesoul`.`default`.test_table values (1,'AAA', 98, '2023-05-10', 'China');
+ ```
+ 流式:构建流式任务,从另一个流式数据源中读取数据并写入到 LakeSoul 表中。如果上游数据是 CDC 的格式,则目标写入的 LakeSoul 表也需要设置为 CDC 表。
+ ```sql
+ -- 表示将 `lakesoul`.`cdcsink`.source_table 表中的全部数据,插入到 `lakesoul`.`default`.test_table
+ insert into `lakesoul`.`default`.test_table select * from `lakesoul`.`cdcsink`.source_table;
+ ```
+
+
+
+:::caution
+1. 对流、批写入,都必须开启 `execution.checkpointing.checkpoints-after-tasks-finish.enabled` 选项;
+2. 对流写入,需要设置 checkpoint 间隔,建议为 1 分钟以上;
+3. 根据环境设置相应的时区:
+
+```sql
+SET 'table.local-time-zone' = 'Asia/Shanghai';
+set 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';
+-- 设置 checkpointing 时间间隔
+set 'execution.checkpointing.interval' = '2min';
+```
+:::
+
+## 更新数据
+
+
+
+
+ ```java
+ tEnvs.executeSql("UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1") await();
+ ```
+
+
+
+
+ ```sql
+ UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1;
+ ```
+
+
+
+:::caution
+注意 `update` 情况下,不允许更新主键、分区列的值。
+对于流的执行模式,LakeSoul 已经能够支持 ChangeLog 语义,可以支持增删改。
+:::
+## 删除数据
+
+
+
+
+ ```java
+ tEnvs.executeSql("DELETE FROM `lakesoul`.`default`.test_table where id = 1") await();
+ ```
+
+
+
+
+ ```sql
+ DELETE FROM `lakesoul`.`default`.test_table where id = 1;
+ ```
+
+
+
+:::caution
+`delete` 情况下,不允许条件中带有分区列。
+对于流的执行模式,LakeSoul 已经能够支持 ChangeLog 语义,可以支持增删改。
+:::
+## 查询数据
+### 全量读
+
+
+
+
+ ```java
+ // 指定全部分区值进行全量查询
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10'").print();
+ ```
+
+
+
+
+ ```sql
+ SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10';
+ ```
+
+
+
+### 快照读
+LakeSoul 支持对表执行快照读取,用户通过指定分区信息和结束时间戳,可以查询结束时间戳之前的所有数据。
+
+
+
+ ```java
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
+ ```
+
+
+
+
+ ```sql
+ -- Execute snapshot read of test_table in the region=China partition, the end timestamp of the read is 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
+ SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
+ ```
+
+
+
+### 增量范围读
+LakeSoul 支持对表执行范围增量读取,用户通过指定分区信息和起始时间戳、结束时间戳,可以查询这一时间范围内的增量数据。
+
+
+
+ ```java
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental','readstarttime'='2023-05-01 15:15:15 ', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
+ ```
+
+
+
+
+ ```sql
+ -- Incremental reading of test_table in the region=China partition, the read timestamp range is 2023-05-01 15:15:15 to 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
+ SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental', 'readstarttime'='2023-05-01 15:15:15', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
+ ```
+
+
+
+### 流读
+LakeSoul 表支持在 Flink 中执行流式读取。流式读基于增量读:用户通过指定起始时间戳和分区信息,可以连续不间断地读取自起始时间戳以后的新增数据;若不指定起始时间戳,则从第一条数据开始读取。
+
+
+
+ ```java
+ tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
+ ```
+
+
+
+
+ ```sql
+ -- Incremental reading of test_table in the region=China partition, the time zone is Asia/Shanghai
+ SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China';
+ ```
+
+
+
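+例如,下面是一个从指定时间戳开始、每 10 秒发现一次新数据的流式读示意(时间戳取值仅为示例,hint 选项见本页末尾的表格):
+```sql
+SET execution.runtime-mode = streaming;
+SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readstarttime'='2023-05-01 15:15:15', 'discoveryinterval'='10000', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
+```
+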
+在流式读取时,LakeSoul 完整支持 Flink Changelog Stream 语义。对于 LakeSoul CDC 表,增量读取的结果仍然为 CDC 格式,即包含了 `insert`,`update`,`delete` 事件,这些事件会自动转为 Flink RowData 的 RowKind 字段的对应值,从而在 Flink 中实现了全链路的增量计算。
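+
+作为示意,对这样的流式读做持续聚合时,聚合结果会随上游的 `update`、`delete` 事件自动回撤并重新下发;一个最简示例:
+```sql
+-- 各 region 的计数与上游的增删改保持一致
+SET execution.runtime-mode = streaming;
+SELECT region, count(*) AS cnt
+FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/
+GROUP BY region;
+```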
+
+### Lookup Join
+LakeSoul 表支持 Flink SQL 中的 Lookup Join 操作。Lookup Join 会将待 Join 的右表缓存在内存中,从而大幅提升 Join 速度,可以在较小维表关联的场景中使用以提升性能。
+LakeSoul 默认每隔 60 秒会尝试刷新缓存,这个间隔可以通过在创建维表时设置 `'lookup.join.cache.ttl'='60s'` 表属性来修改。
+
+```sql
+CREATE TABLE `lakesoul`.`default`.customers (
+ `c_id` INT,
+ `name` STRING,
+ PRIMARY KEY (`c_id`) NOT ENFORCED)
+ WITH (
+ 'connector'='lakesoul',
+ 'hashBucketNum'='1',
+ 'path'='file:///tmp/lakesoul/flink/sink/customers'
+ );
+CREATE TABLE `lakesoul`.`default`.orders (
+ `o_id` INT,
+ `o_c_id` INT,
+ PRIMARY KEY (`o_id`) NOT ENFORCED)
+ WITH (
+ 'connector'='lakesoul',
+ 'hashBucketNum'='1',
+ 'path'='file:///tmp/lakesoul/flink/sink/orders',
+ 'lookup.join.cache.ttl'='60s'
+ );
+SELECT `o_id`, `c_id`, `name`
+FROM
+(SELECT *, PROCTIME() as proctime FROM `lakesoul`.`default`.orders) as o
+JOIN `lakesoul`.`default`.customers FOR SYSTEM_TIME AS OF o.proctime
+ON c_id = o_c_id;
+```
+Orders 表通过 Customers 表的数据进行补全。带有处理时间属性的 FOR SYSTEM_TIME AS OF 子句确保:在连接算子处理 Orders 某一行的时刻,该行只与满足连接条件的 Customers 行进行连接;它同时避免已连接的 Customers 行在未来更新时改变连接结果。Lookup join 还要求一个强制的等值连接条件,在上面的示例中是 `o_c_id = c_id`。
+
+:::tip
+LakeSoul 表支持 Flink 的批式和流式读取。在 Flink SQL Client 中执行如下命令,可以在流式和批式执行模式之间切换:
+```sql
+-- 按照流式执行Flink任务
+SET execution.runtime-mode = streaming;
+SET 'execution.checkpointing.interval' = '1min';
+-- 按照批式执行Flink任务
+SET execution.runtime-mode = batch;
+```
+
+使用 Flink SQL,指定条件查询的格式为 `SELECT * FROM test_table /*+ OPTIONS('key'='value')*/ WHERE partition=xxx` 。在任何一种读的模式下,分区可以指定,也可以不指定,也可以只指定一部分分区值,LakeSoul 会自动匹配满足条件的分区。
+
+其中 `/*+ OPTIONS() */` 为查询选项(hints),必须直接跟在表名的后面(在 where 等其他子句的前面),LakeSoul 读取时的 hint 选项包括:
+
+| 参数 | 含义说明 | 参数填写格式 |
+| ----------------- |---------------------------------------------| ------------------------------------- |
+| readtype | 读类型,可以指定增量读incremental,快照读snapshot,不指定默认全量读 | 'readtype'='incremental' |
+| discoveryinterval | 流式增量读的发现新数据时间间隔,单位毫秒,默认为 30000 | 'discoveryinterval'='10000' |
+| readstarttime | 起始读时间戳,如果未指定起始时间戳,则默认从起始版本号开始读取 | 'readstarttime'='2023-05-01 15:15:15' |
+| readendtime | 结束读时间戳,如果未指定结束时间戳,则默认读取到当前最新版本号 | 'readendtime'='2023-05-01 15:20:15' |
+| timezone          | 时间戳的时区信息,如果不指定时间戳的时区信息,则默认为按本机时区处理 | 'timezone'='Asia/Shanghai' |
+:::
\ No newline at end of file