Skip to content

Commit

Permalink
Add Pulsar plugin (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
CodePrometheus authored Jun 29, 2024
1 parent 32fbd35 commit 02dc53c
Show file tree
Hide file tree
Showing 17 changed files with 485 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
#
[submodule "protocol"]
path = protocol
url = https://github.com/apache/skywalking-data-collect-protocol
url = https://github.com/apache/skywalking-data-collect-protocol.git
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- Plugins:
- Add neo4j plugin.(#312)
- Add pulsar plugin.(#345)

- Fixes:
- Fix unexpected 'No active span' IllegalStateError (#311)
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ poetry:
ifeq ($(OS),Windows)
-powershell (Invoke-WebRequest -Uri https://install.python-poetry.org -UseBasicParsing).Content | py -
poetry self update
else
else ifeq ($(OS),Darwin)
-curl -sSL https://install.python-poetry.org | python3 -
poetry self update || $(MAKE) poetry-fallback
else
-curl -sSL https://install.python-poetry.org | python3 - --version 1.5.1
endif

.PHONY: gen
Expand Down
1 change: 1 addition & 0 deletions docs/en/setup/Plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!)
| [neo4j](https://neo4j.com/docs/python-manual/5/) | Python >=3.7 - ['5.*']; | `sw_neo4j` |
| [psycopg[binary]](https://www.psycopg.org/) | Python >=3.11 - ['3.1.*']; Python >=3.7 - ['3.0.18', '3.1.*']; | `sw_psycopg` |
| [psycopg2-binary](https://www.psycopg.org/) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['2.9']; | `sw_psycopg2` |
| [pulsar-client](https://github.com/apache/pulsar-client-python) | Python >=3.8 - ['3.3.0']; | `sw_pulsar` |
| [pymongo](https://pymongo.readthedocs.io) | Python >=3.7 - ['3.11.*']; | `sw_pymongo` |
| [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - ['1.0']; | `sw_pymysql` |
| [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0']; | `sw_pyramid` |
Expand Down
2 changes: 1 addition & 1 deletion docs/en/setup/advanced/LogReporter.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Log reporter supports all three protocols including `grpc`, `http` and `kafka`,
If chosen `http` protocol, the logs will be batch-reported to the collector REST endpoint `oap/v3/logs`.

If chosen `kafka` protocol, please make sure to config
[kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/)
[kafka-fetcher](https://skywalking.apache.org/docs/main/v10.0.1/en/setup/backend/kafka-fetcher/)
on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` points to your Kafka brokers.

**Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`**
Expand Down
49 changes: 48 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocol
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ loguru = "^0.6.0"
httpx = "^0.23.3"
confluent-kafka = "^2.0.2"
neo4j = "^5.9.0"
pulsar-client = "3.3.0"

[tool.poetry.group.lint.dependencies]
pylint = '2.13.9'
Expand Down
2 changes: 2 additions & 0 deletions skywalking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class Component(Enum):
KafkaConsumer = 41
RabbitmqProducer = 52
RabbitmqConsumer = 53
PulsarProducer = 73
PulsarConsumer = 74
Elasticsearch = 47
HBase = 94
Neo4j = 112
Expand Down
107 changes: 107 additions & 0 deletions skywalking/plugins/sw_pulsar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from skywalking import Layer, Component
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import TagMqTopic, TagMqBroker

link_vector = ['https://github.com/apache/pulsar-client-python']
support_matrix = {
'pulsar-client': {
'>=3.8': ['3.3.0']
}
}
note = """"""


def install():
from pulsar import Producer
from pulsar import Consumer
from pulsar import Client

__init = Client.__init__
_send = Producer.send
_receive = Consumer.receive
_peer = ''

def get_peer():
return _peer

def set_peer(value):
nonlocal _peer
_peer = value

def _sw_init(self, service_url):
__init(self, service_url)
set_peer(service_url)

def _sw_send_func(_send):
def _sw_send(this, content,
properties=None,
partition_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
topic = this._producer.topic().split('/')[-1]
with get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer', peer=get_peer(),
component=Component.PulsarProducer) as span:
span.tag(TagMqTopic(topic))
span.tag(TagMqBroker(get_peer()))
span.layer = Layer.MQ

carrier = span.inject()
if properties is None:
properties = {}
for item in carrier:
properties[item.key] = item.val

return _send(this, content, properties=properties, partition_key=partition_key,
sequence_id=sequence_id, replication_clusters=replication_clusters,
disable_replication=disable_replication, event_timestamp=event_timestamp,
deliver_at=deliver_at, deliver_after=deliver_after)

return _sw_send

def _sw_receive_func(_receive):
def _sw_receive(this, timeout_millis=None):
res = _receive(this, timeout_millis=timeout_millis)
if res:
topic = res.topic_name().split('/')[-1]
properties = res.properties()
carrier = Carrier()
for item in carrier:
if item.key in properties.keys():
val = res.properties().get(item.key)
if val is not None:
item.val = val

with get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer', carrier=carrier) as span:
span.tag(TagMqTopic(topic))
span.tag(TagMqBroker(get_peer()))
span.layer = Layer.MQ
span.component = Component.PulsarConsumer
return res

return _sw_receive

Client.__init__ = _sw_init
Producer.send = _sw_send_func(_send)
Consumer.receive = _sw_receive_func(_receive)
16 changes: 16 additions & 0 deletions tests/plugin/data/sw_pulsar/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
90 changes: 90 additions & 0 deletions tests/plugin/data/sw_pulsar/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

version: '2.1'

services:
collector:
extends:
service: collector
file: ../../docker-compose.base.yml

pulsar-server:
image: apachepulsar/pulsar:3.2.0
hostname: pulsar-server
ports:
- 6650:6650
- 8080:8080
networks:
- beyond
command: ["bash","-c", "bin/pulsar standalone"]
healthcheck:
test: ["CMD", "nc", "-nz", "127.0.0.1", "8080"]
interval: 5s
timeout: 60s
retries: 120

producer:
extends:
service: agent
file: ../../docker-compose.base.yml
ports:
- 9090:9090
volumes:
- .:/app
command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/producer.py']
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
collector:
condition: service_healthy
pulsar-server:
condition: service_healthy
consumer:
condition: service_healthy
environment:
SW_AGENT_NAME: producer
SW_AGENT_LOGGING_LEVEL: INFO

consumer:
extends:
service: agent
file: ../../docker-compose.base.yml
ports:
- 9091:9091
volumes:
- .:/app
command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py']
healthcheck:
test: ["CMD", "bash", "-c", "ps -ef | grep /app/services/consumer | grep -v grep"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
collector:
condition: service_healthy
pulsar-server:
condition: service_healthy
environment:
SW_AGENT_NAME: consumer
SW_AGENT_LOGGING_LEVEL: INFO

networks:
beyond:
Loading

0 comments on commit 02dc53c

Please sign in to comment.