From d8974df03e39bea52ce4e19a27707686eb8b6f69 Mon Sep 17 00:00:00 2001 From: zhouzixin Date: Sat, 4 May 2024 19:33:59 +0800 Subject: [PATCH] Add Pulsar plugin --- CHANGELOG.md | 1 + docs/en/setup/Plugins.md | 1 + poetry.lock | 51 +++++++++- protocol | 2 +- pyproject.toml | 1 + skywalking/__init__.py | 2 + skywalking/plugins/sw_pulsar.py | 92 +++++++++++++++++++ tests/plugin/base.py | 6 +- tests/plugin/data/sw_pulsar/__init__.py | 16 ++++ .../plugin/data/sw_pulsar/docker-compose.yml | 90 ++++++++++++++++++ tests/plugin/data/sw_pulsar/expected.data.yml | 82 +++++++++++++++++ .../data/sw_pulsar/services/__init__.py | 16 ++++ .../data/sw_pulsar/services/consumer.py | 32 +++++++ .../data/sw_pulsar/services/producer.py | 47 ++++++++++ tests/plugin/data/sw_pulsar/test_pulsar.py | 36 ++++++++ 15 files changed, 470 insertions(+), 5 deletions(-) create mode 100644 skywalking/plugins/sw_pulsar.py create mode 100644 tests/plugin/data/sw_pulsar/__init__.py create mode 100644 tests/plugin/data/sw_pulsar/docker-compose.yml create mode 100644 tests/plugin/data/sw_pulsar/expected.data.yml create mode 100644 tests/plugin/data/sw_pulsar/services/__init__.py create mode 100644 tests/plugin/data/sw_pulsar/services/consumer.py create mode 100644 tests/plugin/data/sw_pulsar/services/producer.py create mode 100644 tests/plugin/data/sw_pulsar/test_pulsar.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 962083a3..c058b996 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Plugins: - Add neo4j plugin.(#312) + - Add pulsar plugin.(#337) - Fixes: - Fix unexpected 'No active span' IllegalStateError (#311) diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md index ca0fc389..5866f72e 100644 --- a/docs/en/setup/Plugins.md +++ b/docs/en/setup/Plugins.md @@ -36,6 +36,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!) | [neo4j](https://neo4j.com/docs/python-manual/5/) | Python >=3.7 - ['5.*']; | `sw_neo4j` | | [psycopg[binary]](https://www.psycopg.org/) | Python >=3.11 - ['3.1.*']; Python >=3.7 - ['3.0.18', '3.1.*']; | `sw_psycopg` | | [psycopg2-binary](https://www.psycopg.org/) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['2.9']; | `sw_psycopg2` | +| [pulsar-client](https://github.com/apache/pulsar-client-python) | Python >=3.7 - ['3.3.0']; | `sw_pulsar` | | [pymongo](https://pymongo.readthedocs.io) | Python >=3.7 - ['3.11.*']; | `sw_pymongo` | | [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - ['1.0']; | `sw_pymysql` | | [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0']; | `sw_pyramid` | diff --git a/poetry.lock b/poetry.lock index 52f500af..d7dc9b44 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "aiofiles" @@ -1889,6 +1889,53 @@ pytz = "*" numpy = ["numpy (>=1.7.0,<2.0.0)"] pandas = ["numpy (>=1.7.0,<2.0.0)", "pandas (>=1.1.0,<3.0.0)"] +[[package]] +name = "pulsar-client" +version = "3.3.0" +description = "Apache Pulsar Python client library" +optional = false +python-versions = "*" +files = [ + {file = "pulsar_client-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:c31afd3e67a044ff93177df89e08febf214cc965e95ede097d9fe8755af00e01"}, + {file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f66982284571674b215324cc26b5c2f7c56c7043113c47a7084cb70d67a8afb"}, + {file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fe50a06f81c48a75a9b95c27a6446260039adca71d9face273740de96b2efca"}, + {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d4c46a4b96a6e9919cfe220156d69a2ede8053d9ea1add4ada108abcf2ba9775"}, + {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:1e4b5d44b992c9b036286b483f3588c10b89c6047fb59d80c7474445997f4e10"}, + {file = "pulsar_client-3.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:497a59ac6b650835a3b2c502f53477e5c98e5226998ca3f17c0b0a3eb4d67d08"}, + {file = "pulsar_client-3.3.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:386e78ff52058d881780bae1f6e84ac9434ae0b01a8581755ca8cc0dc844a332"}, + {file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3e4ecb780df58bcfd3918590bd3ff31ed79bccfbef3a1a60370642eb1e14a9d2"}, + {file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ce1e215c252f22a6f26ca5e9076826041a04d88dc213b92c86b524be2774a64"}, + {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:88b0fd5be73a4103986b9dbe3a66468cf8829371e34af87ff8f216e3980f4cbe"}, + {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:33656450536d83eed1563ff09692c2c415fb199d88e9ed97d701ca446a119e1b"}, + {file = "pulsar_client-3.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:ce33de700b06583df8777e139d68cb4b4b3d0a2eac168d74278d8935f357fb10"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-macosx_10_15_universal2.whl", hash = "sha256:7b5dd25cf778d6c980d36c53081e843ea272afe7af4f0ad6394ae9513f94641b"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c4e6865fda62a2e460f823dce4d49ac2973a4459b8ff99eda5fdd6aaaebf46"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1810ddc623c8de2675d17405ce47057a9a2b92298e708ce4d9564847f5ad904"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:8259c3b856eb6deaa1f93dce893ab18d99d36d102da5612c8e97a4fb41b70ab1"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:5e7a48b2e505cde758fd51a601b5da0671fa98c9baee38362aaaa3ab2b930c28"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ede264385d47257b2f2b08ecde9181ec5338bea5639cc543d1856f01736778d2"}, + {file = "pulsar_client-3.3.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:0f64c62746ccd5b65a0c505f5f40b9af1f147eb1fa2d8f9c90cd5c8b92dd8597"}, + {file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b84a20c9012e3c4ef1b7085acd7467197118c090b378dec27d773fb79d91556"}, + {file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c4e15fa696e275ccb66d0791fdc19c4dea0420d81349c8055e485b134125e14f"}, + {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:72cbb1bdcba2dd1265296b5ba65331622ee89c16db75edaad46dd7b90c6dd447"}, + {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d54dd12955bf587dd46d9184444af5e853d9da2a14bbfb739ed2c7c3b78ce280"}, + {file = "pulsar_client-3.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:43f98afdf0334b2b957a4d96f97a1fe8a7f7fd1e2631d40c3f00b4162f396485"}, + {file = "pulsar_client-3.3.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:efe7c1e6a96daccc522c3567b6847ffa54c13e0f510d9a427b4aeff9fbebe54b"}, + {file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f28e94420090fceeb38e23fc744f3edf8710e48314ef5927d2b674a1d1e43ee0"}, + {file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:42c8f3eaa98e2351805ecb6efb6d5fedf47a314a3ce6af0e05ea1449ea7244ed"}, + {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5e69750f8ae57e55fddf97b459ce0d8b38b2bb85f464a71e871ee6a86d893be7"}, + {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7e147e5ba460c1818bc05254279a885b4e552bcafb8961d40e31f98d5ff46628"}, + {file = "pulsar_client-3.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:694530af1d6c75fb81456fb509778c1868adee31e997ddece6e21678200182ea"}, +] + +[package.dependencies] +certifi = "*" + +[package.extras] +all = ["apache-bookkeeper-client (>=4.16.1)", "fastavro (==1.7.3)", "grpcio (>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"] +avro = ["fastavro (==1.7.3)"] +functions = ["apache-bookkeeper-client (>=4.16.1)", "grpcio (>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"] + [[package]] name = "packaging" version = "23.1" @@ -3683,4 +3730,4 @@ sync = ["kafka-python", "requests"] [metadata] lock-version = "2.0" python-versions = ">=3.7, <3.12" -content-hash = "016644168e470e2904bc0a4109c0218c7b9ecf0890d17a32e28aa81bcda0e8d0" +content-hash = "9b56b7ae8caf278da134fc9a6539df6134dc28d2c7d4c9736c0e29da51592685" diff --git a/protocol b/protocol index 9b2f4a5f..cc8aa8ca 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 9b2f4a5fb5694381924674d6c15cbead6a388d97 +Subproject commit cc8aa8ca4ee53a44487982a55762f72825824d73 diff --git a/pyproject.toml b/pyproject.toml index 734b0589..031670cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ kafka-python = { version = "*", optional = true } uvloop = { version = "^0.17.0", optional = true } aiokafka = { version = "^0.8.0", optional = true } aiohttp = { version = "^3.7.4", optional = true } +pulsar-client = "3.3.0" [tool.poetry.extras] all=[ diff --git a/skywalking/__init__.py b/skywalking/__init__.py index 1ed0c2b7..ec1dc093 100644 --- a/skywalking/__init__.py +++ b/skywalking/__init__.py @@ -36,6 +36,8 @@ class Component(Enum): KafkaConsumer = 41 RabbitmqProducer = 52 RabbitmqConsumer = 53 + PulsarProducer = 73 + PulsarConsumer = 74 Elasticsearch = 47 HBase = 94 Neo4j = 112 diff --git a/skywalking/plugins/sw_pulsar.py b/skywalking/plugins/sw_pulsar.py new file mode 100644 index 00000000..27c9570a --- /dev/null +++ b/skywalking/plugins/sw_pulsar.py @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from skywalking import Layer, Component +from skywalking.trace.carrier import Carrier +from skywalking.trace.context import get_context +from skywalking.trace.tags import TagMqTopic + +link_vector = ['https://github.com/apache/pulsar-client-python'] +support_matrix = { + 'pulsar-client': { + '>=3.7': ['3.3.0'] + } +} +note = """""" + + +def install(): + from pulsar import Producer + from pulsar import Consumer + + _send = Producer.send + _receive = Consumer.receive + Producer.send = _sw_send_func(_send) + Consumer.receive = _sw_receive_func(_receive) + + +def _sw_send_func(_send): + def _sw_send(this, content, + properties=None, + partition_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ): + topic = this._producer.topic().split('/')[-1] + peer = getattr(this._producer, 'service_url', '') + with get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer', peer=peer, + component=Component.PulsarProducer) as span: + span.tag(TagMqTopic(topic)) + span.layer = Layer.MQ + + carrier = span.inject() + if properties is None: + properties = {} + for item in carrier: + properties[item.key] = item.val + + return _send(this, content, properties=properties, partition_key=partition_key, + sequence_id=sequence_id, replication_clusters=replication_clusters, + disable_replication=disable_replication, event_timestamp=event_timestamp, + deliver_at=deliver_at, deliver_after=deliver_after) + + return _sw_send + + +def _sw_receive_func(_receive): + def _sw_receive(this, timeout_millis=None): + res = _receive(this, timeout_millis=timeout_millis) + if res: + topic = res.topic_name().split('/')[-1] + properties = res.properties() + carrier = Carrier() + for item in carrier: + if item.key in properties.keys(): + val = res.properties().get(item.key) + if val is not None: + item.val = val + + with get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer', carrier=carrier) as span: + span.tag(TagMqTopic(topic)) + span.layer = Layer.MQ + span.component = Component.PulsarConsumer + return res + + return _sw_receive diff --git a/tests/plugin/base.py b/tests/plugin/base.py index 64634e38..e4c3512c 100644 --- a/tests/plugin/base.py +++ b/tests/plugin/base.py @@ -42,16 +42,18 @@ def validate(self, expected_file_name=None): expected_data = os.linesep.join(expected_data_file.readlines()) response = requests.post(url='http://localhost:12800/dataValidate', data=expected_data) - + print('first response.status_code = ', response.status_code) if response.status_code != 200: # heuristically retry once time.sleep(10) response = requests.post(url='http://localhost:12800/dataValidate', data=expected_data) + print('second response.status_code = ', response.status_code) if response.status_code != 200: res = requests.get('http://localhost:12800/receiveData') - + print('receiveData: ', res) actual_data = yaml.dump(yaml.load(res.content, Loader=Loader)) + print('actual_data: ', actual_data) differ = Differ() diff_list = list(differ.compare( diff --git a/tests/plugin/data/sw_pulsar/__init__.py b/tests/plugin/data/sw_pulsar/__init__.py new file mode 100644 index 00000000..b1312a09 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/data/sw_pulsar/docker-compose.yml b/tests/plugin/data/sw_pulsar/docker-compose.yml new file mode 100644 index 00000000..93111206 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/docker-compose.yml @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../../docker-compose.base.yml + + pulsar-server: + image: apachepulsar/pulsar:3.2.0 + hostname: pulsar-server + ports: + - 6650:6650 + - 8080:8080 + networks: + - beyond + command: ["bash","-c", "bin/pulsar standalone"] + healthcheck: + test: ["CMD", "nc", "-nz", "127.0.0.1", "8080"] + interval: 5s + timeout: 60s + retries: 120 + + producer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/producer.py'] + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + collector: + condition: service_healthy + pulsar-server: + condition: service_healthy + consumer: + condition: service_healthy + environment: + SW_AGENT_NAME: producer + SW_AGENT_LOGGING_LEVEL: DEBUG + + consumer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py'] + healthcheck: + test: ["CMD", "bash", "-c", "ps -ef | grep /app/services/consumer | grep -v grep"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + collector: + condition: service_healthy + pulsar-server: + condition: service_healthy + environment: + SW_AGENT_NAME: consumer + SW_AGENT_LOGGING_LEVEL: DEBUG + +networks: + beyond: \ No newline at end of file diff --git a/tests/plugin/data/sw_pulsar/expected.data.yml b/tests/plugin/data/sw_pulsar/expected.data.yml new file mode 100644 index 00000000..f1116f55 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/expected.data.yml @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: producer + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: Pulsar/Topic/sw-topic/Producer + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + tags: + - key: mq.topic + value: sw-topic + startTime: gt 0 + endTime: gt 0 + componentId: 73 + spanType: Exit + peer: '' + skipAnalysis: false + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: GET + - key: http.url + value: http://0.0.0.0:9090/users + - key: http.status_code + value: '200' + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: Pulsar/Topic/sw-topic/Consumer + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + tags: + - key: mq.topic + value: sw-topic + refs: + - parentEndpoint: Pulsar/Topic/sw-topic/Producer + networkAddress: '' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: producer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 74 + spanType: Entry + peer: '' + skipAnalysis: false diff --git a/tests/plugin/data/sw_pulsar/services/__init__.py b/tests/plugin/data/sw_pulsar/services/__init__.py new file mode 100644 index 00000000..b1312a09 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/data/sw_pulsar/services/consumer.py b/tests/plugin/data/sw_pulsar/services/consumer.py new file mode 100644 index 00000000..fc444b12 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/services/consumer.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if __name__ == '__main__': + import pulsar + + client = pulsar.Client(service_url='pulsar://pulsar-server:6650') + consumer = client.subscribe('sw-topic', 'sw-subscription') + + while True: + try: + msg = consumer.receive() + print('Received message = ', str(msg.data().decode('utf-8')), '|message_id = ', msg.message_id()) + consumer.acknowledge(msg) + except pulsar.Interrupted: + break + + client.close() diff --git a/tests/plugin/data/sw_pulsar/services/producer.py b/tests/plugin/data/sw_pulsar/services/producer.py new file mode 100644 index 00000000..03fbf6a3 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/services/producer.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +if __name__ == '__main__': + from flask import Flask, jsonify + import pulsar + from pulsar import BatchingType + + app = Flask(__name__) + client = pulsar.Client(service_url='pulsar://pulsar-server:6650') + producer = client.create_producer( + 'sw-topic', + block_if_queue_full=True, + batching_enabled=True, + batching_max_publish_delay_ms=10, + batching_type=BatchingType.KeyBased + ) + + + @app.route('/users', methods=['POST', 'GET']) + def application(): + try: + producer.send('I love skywalking 3 thousand'.encode('utf-8'), None) + producer.flush() + producer.close() + print("my|producer send ok") + except Exception as e: + print('Failed to send message: %s', e) + return jsonify({'song': 'Nocturne', 'artist': 'Jay Chou'}) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/data/sw_pulsar/test_pulsar.py b/tests/plugin/data/sw_pulsar/test_pulsar.py new file mode 100644 index 00000000..59767e8e --- /dev/null +++ b/tests/plugin/data/sw_pulsar/test_pulsar.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Callable + +import pytest +import requests + +from skywalking.plugins.sw_pulsar import support_matrix +from tests.orchestrator import get_test_vector +from tests.plugin.base import TestPluginBase + + +@pytest.fixture +def prepare(): + # type: () -> Callable + return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5) + + +class TestPlugin(TestPluginBase): + @pytest.mark.parametrize('version', get_test_vector(lib_name='pulsar-client', support_matrix=support_matrix)) + def test_plugin(self, docker_compose, version): + self.validate()