Skip to content

Commit

Permalink
Update test cases to work with Flink 1.16
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Mar 1, 2023
1 parent b3846fa commit 27d4a84
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 141 deletions.
13 changes: 5 additions & 8 deletions testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,14 @@ Please check whether the directories `lib`, `opt` are available.
You will see the following output, which means that the test has run successfully:
```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % python3 -m unittest test_table_api.TableTests.test_scalar_function
Using %s as FLINK_HOME... /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink
Skipped download /Users/dianfu/code/src/github/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar since it already exists.
/Users/dianfu/miniconda3/lib/python3.8/subprocess.py:946: ResourceWarning: subprocess 71018 is still running
_warn("subprocess %s is still running" % self.pid,
Using %s as FLINK_HOME... /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.7/site-packages/pyflink
Skipped download /Users/dianfu/code/src/github/pyflink-faq/testing/flink-python-1.16.1-tests.jar since it already exists.
/Users/dianfu/opt/miniconda3/lib/python3.7/subprocess.py:883: ResourceWarning: subprocess 27896 is still running
ResourceWarning, source=self)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
Downloading jar org.apache.flink:flink-table-planner_2.11:1.14.4:jar:tests
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/table/table_environment.py:538: DeprecationWarning: Deprecated in 1.10. Use create_table instead.
warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning)
.
----------------------------------------------------------------------
Ran 1 test in 32.746s
Ran 1 test in 15.313s
OK
```
2 changes: 1 addition & 1 deletion testing/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
setup(
name="pyflink_testing",
version="0.1",
install_requires=['apache-flink==1.14.4']
install_requires=['apache-flink==1.16.1']
)
55 changes: 31 additions & 24 deletions testing/test_table_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.java_gateway import get_gateway
from pyflink.table import DataTypes
import uuid

from pyflink.table import DataTypes, TableDescriptor, Schema
from pyflink.table.udf import udf

from test_utils import PyFlinkStreamTableTestCase, TestAppendSink, results
from test_utils import PyFlinkStreamTableTestCase


class TableTests(PyFlinkStreamTableTestCase):

def get_results(self, table_name):
gateway = get_gateway()
TestValuesTableFactory = gateway.jvm.org.apache.flink.table.planner.factories.TestValuesTableFactory
return TestValuesTableFactory.getResults(table_name)
@staticmethod
def generate_table_name(prefix="Table"):
return "{0}_{1}".format(prefix, str(uuid.uuid1()).replace("-", "_"))

def test_scalar_function(self):
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())

table_sink = TestAppendSink(
['a', 'b'],
[DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
sink_table = self.generate_table_name("Result")
self.t_env.execute_sql(f"""
CREATE TABLE {sink_table} (
a BIGINT,
b BIGINT
) WITH (
'connector' = 'test-sink'
)
""")

t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.select(t.a, add_one(t.a)) \
.execute_insert("Results").wait()
actual = results()
.execute_insert(sink_table).wait()
actual = self.get_results()
self.assert_equals(actual, ["+I[1, 2]", "+I[2, 3]", "+I[3, 4]"])

def test_sink_ddl(self):
def test_scalar_function_using_table_descriptor(self):
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())

self.t_env.execute_sql("""
CREATE TABLE Results(
a BIGINT,
b BIGINT
) with (
'connector' = 'values'
)
""")
sink_table = self.generate_table_name("Result")
self.t_env.create_table(
sink_table,
TableDescriptor.for_connector('test-sink')
.schema(Schema
.new_builder()
.column('a', DataTypes.BIGINT())
.column('b', DataTypes.BIGINT())
.build())
.build())

t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.select(t.a, add_one(t.a)) \
.execute_insert("Results").wait()
actual = self.get_results("Results")
.execute_insert(sink_table).wait()
actual = self.get_results()
self.assert_equals(actual, ["+I[1, 2]", "+I[2, 3]", "+I[3, 4]"])
120 changes: 12 additions & 108 deletions testing/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import unittest
from subprocess import check_output

from py4j.java_gateway import JavaObject, java_import
from py4j.java_gateway import JavaObject
from pyflink import pyflink_gateway_server
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, SinkFunction
from pyflink.find_flink_home import _find_flink_home
from pyflink.java_gateway import get_gateway
from pyflink.pyflink_gateway_server import on_windows
from pyflink.table import TableEnvironment, EnvironmentSettings, TableSink
from pyflink.table.types import _to_java_type
from pyflink.util import java_utils
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.util.java_utils import is_instance_of


Expand All @@ -50,7 +48,7 @@ def setUpClass(cls):

print("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])

testing_jars = [("org.apache.flink", "flink-python_2.11", "1.14.4", "tests")]
testing_jars = [("org.apache.flink", "flink-python", "1.16.1", "tests")]
download_testing_jars(testing_jars)

@classmethod
Expand Down Expand Up @@ -113,9 +111,9 @@ def construct_test_classpath():
return os.path.pathsep.join(test_jars)


pyflink_gateway_server.download_apache_avro = lambda *a, **b: ()
pyflink_gateway_server.construct_test_classpath = construct_test_classpath


###################### Test utilities for Table API & SQL ######################


Expand All @@ -127,110 +125,16 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase):
def setUp(self):
super(PyFlinkStreamTableTestCase, self).setUp()
self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
testing_jars = [("org.apache.flink", "flink-table-planner_2.11", "1.14.4", "tests")]
download_testing_jars(testing_jars)


class TestTableSink(TableSink):
"""
Base class for test table sink.
"""

_inited = False

def __init__(self, j_table_sink, field_names, field_types):
gateway = get_gateway()
j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = java_utils.to_jarray(
gateway.jvm.TypeInformation,
[_to_java_type(field_type) for field_type in field_types])
j_table_sink = j_table_sink.configure(j_field_names, j_field_types)
super(TestTableSink, self).__init__(j_table_sink)

@classmethod
def _ensure_initialized(cls):
if TestTableSink._inited:
return

gateway = get_gateway()
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestAppendSink")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestRetractSink")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestUpsertSink")
java_import(gateway.jvm, "org.apache.flink.table.legacyutils.RowCollector")

TestTableSink._inited = True


class TestAppendSink(TestTableSink):
"""
A test append table sink.
"""

def __init__(self, field_names, field_types):
TestTableSink._ensure_initialized()

gateway = get_gateway()
super(TestAppendSink, self).__init__(
gateway.jvm.TestAppendSink(), field_names, field_types)


class TestRetractSink(TestTableSink):
"""
A test retract table sink.
"""

def __init__(self, field_names, field_types):
TestTableSink._ensure_initialized()

gateway = get_gateway()
super(TestRetractSink, self).__init__(
gateway.jvm.TestRetractSink(), field_names, field_types)


class TestUpsertSink(TestTableSink):
"""
A test upsert table sink.
"""

def __init__(self, field_names, field_types, keys, is_append_only):
TestTableSink._ensure_initialized()

@staticmethod
def get_results():
"""
Retrieves the results from a retract table sink.
"""
gateway = get_gateway()
j_keys = gateway.new_array(gateway.jvm.String, len(keys))
for i in range(0, len(keys)):
j_keys[i] = keys[i]

super(TestUpsertSink, self).__init__(
gateway.jvm.TestUpsertSink(j_keys, is_append_only), field_names, field_types)


def results():
"""
Retrieves the results from an append table sink.
"""
return retract_results()


def retract_results():
"""
Retrieves the results from a retract table sink.
"""
gateway = get_gateway()
results = gateway.jvm.RowCollector.getAndClearValues()
return gateway.jvm.RowCollector.retractResults(results)


def upsert_results(keys):
"""
Retrieves the results from an upsert table sink.
"""
gateway = get_gateway()
j_keys = gateway.new_array(gateway.jvm.int, len(keys))
for i in range(0, len(keys)):
j_keys[i] = keys[i]

results = gateway.jvm.RowCollector.getAndClearValues()
return gateway.jvm.RowCollector.upsertResults(results, j_keys)
results = gateway.jvm.org.apache.flink.table.utils.TestingSinks.RowCollector.getAndClearValues()
return gateway.jvm\
.org.apache.flink.table.utils.TestingSinks.RowCollector.retractResults(results)


###################### Test utilities for DataStream API ######################
Expand Down

0 comments on commit 27d4a84

Please sign in to comment.