Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36188] Fix disable buffer flush lose efficacy #49

Merged

Conversation

MOBIN-F
Copy link
Contributor

@MOBIN-F MOBIN-F commented Aug 30, 2024

The user lookup joins the hbase table, adds 1 to the col value, and writes it back to hbase

@Test
void testTableSinkDisabledBufferFlush() throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);        
            tEnv.executeSql(
                "CREATE TABLE hTableForSink ("
                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                        + " family1 ROW<col1 INT>"
                        + ") WITH ("
                        + " 'connector' = 'hbase-2.2',"
                        + " 'sink.buffer-flush.max-size' = '0',"
                        + " 'sink.buffer-flush.max-rows' = '0',"
                        + " 'table-name' = '"
                        + TEST_TABLE_6
                        + "',"
                        + " 'zookeeper.quorum' = '"
                        + getZookeeperQuorum()
                        + "'"
                        + ")");       
            String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
            tEnv.executeSql(insert).await();        

            tEnv.executeSql(
                "CREATE VIEW user_click AS "
                        + " SELECT user_id, proctime() AS proc_time"
                        + " FROM ( "
                        + " VALUES(1), (1), (1), (1), (1)"
                        + " ) AS t (user_id);");    
    
            tEnv.executeSql(
                "INSERT INTO hTableForSink SELECT "
                        + "    user_id as rowkey,"
                        + "    ROW(CAST(family1.col1 + 1 AS INT))"
                        + " FROM user_click INNER JOIN hTableForSink"
                        + " FOR SYSTEM_TIME AS OF user_click.proc_time"
                        + " ON hTableForSink.rowkey = user_click.user_id;");        

            tEnv.executeSql(
                "CREATE TABLE hTableForQuery ("
                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                        + " family1 ROW<col1 INT>"
                        + ") WITH ("
                        + " 'connector' = 'hbase-2.2',"
                        + " 'table-name' = '"
                        + TEST_TABLE_6
                        + "',"
                        + " 'zookeeper.quorum' = '"
                        + getZookeeperQuorum()
                        + "'"
                        + ")");
        String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";        
            TableResult firstResult = tEnv.executeSql(query);
        List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());
        String firstExpected = "+I[1, 6]";
        TestBaseUtils.compareResultAsText(firstResults, firstExpected);
    } 

test failed

org.junit.ComparisonFailure: Different elements in arrays: expected 1 elements and received 1
 expected: [+I[1, 6]]
 received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
Expected :+I[1, 6]
Actual   :+I[1, 2] 

@MOBIN-F MOBIN-F changed the title [FLINK-36188] fix disable buffer flush lose efficacy [FLINK-36188] Fix disable buffer flush lose efficacy Aug 30, 2024
@ferenc-csaky
Copy link
Contributor

Thanks for submitting this fix! Pls. check the test the change broke. And since new changes will be necessary anyways, pls. reword the commit msg as well to contain the jira key to make "Title Validator" happy.

@MOBIN-F MOBIN-F force-pushed the fix-disable-buffer-flush-lose-efficacy branch from ffc62d7 to be126cf Compare August 30, 2024 11:25
@MOBIN-F
Copy link
Contributor Author

MOBIN-F commented Sep 2, 2024

Thanks for submitting this fix! Pls. check the test the change broke. And since new changes will be necessary anyways, pls. reword the commit msg as well to contain the jira key to make "Title Validator" happy.

done,Could you please review it again? thank you @ferenc-csaky

@MOBIN-F
Copy link
Contributor Author

MOBIN-F commented Sep 23, 2024

@MartijnVisser can you help to trigger a CI run, thanks~

Copy link
Contributor

@ferenc-csaky ferenc-csaky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@ferenc-csaky ferenc-csaky merged commit c6eb87f into apache:main Sep 25, 2024
9 checks passed
Copy link

boring-cyborg bot commented Sep 25, 2024

Awesome work, congrats on your first merged pull request!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants