Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rebased PR #267 #1

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Install protocol buffer compiler
uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v1
- name: Build
run: cargo build
run: cargo build --features protobuf-src
- name: Check Clippy
run: cargo clippy --tests --all-features -- -D warnings
run: |
cargo clippy --tests --features telemetry,protobuf-src -- -D warnings
cargo clippy --tests --no-default-features --features compression,tokio-rustls-runtime,async-std-rustls-runtime,auth-oauth2,telemetry,protobuf-src -- -D warnings
- name: Install nightly rustfmt
run: rustup toolchain install nightly --component rustfmt
- name: Check format
Expand All @@ -29,17 +27,11 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
pulsar-version: [ 2.8.4.1, 2.9.3.11, 2.10.1.8 ]
pulsar-version: [ 2.10.4.3, 2.11.1.2, 3.0.0.1 ]
steps:
- name: Install protocol buffer compiler
uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Start Pulsar Standalone Container
run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true streamnative/pulsar:${{ matrix.pulsar-version }} /pulsar/bin/pulsar standalone
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v1
- name: Build
run: cargo build
- name: Run tests
run: cargo test -- --nocapture
run: cargo test --features protobuf-src -- --nocapture
9 changes: 0 additions & 9 deletions CONTRIBUTING.md

This file was deleted.

82 changes: 44 additions & 38 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,71 +1,77 @@
[package]
name = "pulsar"
version = "5.1.0"
version = "6.0.1"
edition = "2021"
authors = [
"Colin Stearns <cstearns@developers.wyyerd.com>",
"Kevin Stenerson <kstenerson@developers.wyyerd.com>",
"Geoffroy Couprie <contact@geoffroycouprie.com>",
]

license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
readme = "./README.md"
repository = "https://github.com/streamnative/pulsar-rs"
documentation = "https://docs.rs/pulsar"
description = "Rust client for Apache Pulsar"
keywords = ["pulsar", "api", "client"]

[dependencies]
bytes = "^1.2.1"
crc = "^3.0.0"
nom = { version="^7.1.1", default-features=false, features=["alloc"] }
prost = "^0.11.0"
prost-derive = "^0.11.0"
bytes = "^1.4.0"
crc = "^3.0.1"
nom = { version="^7.1.3", default-features=false, features=["alloc"] }
prost = "^0.11.9"
prost-derive = "^0.11.9"
rand = "^0.8.5"
chrono = "^0.4.22"
chrono = { version = "^0.4.26", default-features = false, features = ["clock", "std"] }
futures-timer = "^3.0.2"
log = "^0.4.17"
url = "^2.3.1"
regex = "^1.6.0"
log = "^0.4.19"
url = "^2.4.0"
regex = "^1.9.1"
bit-vec = "^0.6.3"
futures = "^0.3.25"
futures-io = "^0.3.25"
native-tls = "^0.2.10"
pem = "^1.1.0"
tokio = { version = "^1.21.2", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.4", features = ["codec"], optional = true }
tokio-native-tls = { version = "^0.3.0", optional = true }
async-std = {version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true }
asynchronous-codec = { version = "^0.6.0", optional = true }
async-native-tls = { version = "^0.4.0", optional = true }
futures = "^0.3.28"
futures-io = "^0.3.28"
native-tls = { version = "^0.2.11", optional = true }
rustls = { version = "^0.21.6", optional = true }
webpki-roots = { version = "^0.25.1", optional = true }
pem = "^3.0.0"
tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.8", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.24.1", optional = true }
tokio-native-tls = { version = "^0.3.1", optional = true }
async-std = { version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true }
asynchronous-codec = { version = "^0.6.2", optional = true }
async-rustls = { version = "^0.4.0", optional = true }
async-native-tls = { version = "^0.5.0", optional = true }
lz4 = { version = "^1.24.0", optional = true }
flate2 = { version = "^1.0.24", optional = true }
zstd = { version = "^0.11.2", optional = true }
snap = { version = "^1.0.5", optional = true }
openidconnect = { version = "^2.4.0", optional = true }
oauth2 = { version = "^4.2.3", optional = true }
serde = { version = "^1.0.147", features = ["derive"], optional = true }
serde_json = { version = "^1.0.87", optional = true }
flate2 = { version = "^1.0.26", optional = true }
zstd = { version = "^0.12.4", optional = true }
snap = { version = "^1.1.0", optional = true }
openidconnect = { version = "^3.3.0", optional = true }
oauth2 = { version = "^4.4.1", optional = true }
serde = { version = "^1.0.175", features = ["derive"], optional = true }
serde_json = { version = "^1.0.103", optional = true }
tracing = { version = "^0.1.37", optional = true }
async-trait = "^0.1.58"
data-url = { version = "^0.2.0", optional = true }
uuid = {version = "^1.2.1", features = ["v4", "fast-rng"] }
async-trait = "^0.1.72"
data-url = { version = "^0.3.0", optional = true }
uuid = { version = "^1.4.1", features = ["v4", "fast-rng"] }

[dev-dependencies]
serde = { version = "^1.0.145", features = ["derive"] }
serde_json = "^1.0.85"
env_logger = "^0.9.1"
tokio = { version = "^1.21.2", features = ["macros", "rt-multi-thread"] }
serde = { version = "^1.0.175", features = ["derive"] }
serde_json = "^1.0.103"
env_logger = "^0.10.0"
tokio = { version = "^1.29.1", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
prost-build = "^0.11.1"
prost-build = "^0.11.9"
protobuf-src = { version = "1.1.0", optional = true }

[features]
default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2" ]
compression = [ "lz4", "flate2", "zstd", "snap" ]
tokio-runtime = [ "tokio", "tokio-util", "tokio-native-tls" ]
async-std-runtime = [ "async-std", "asynchronous-codec", "async-native-tls" ]
tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ]
tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ]
async-std-runtime = [ "async-std", "asynchronous-codec", "native-tls", "async-native-tls" ]
async-std-rustls-runtime = ["async-std", "asynchronous-codec", "async-rustls", "rustls", "webpki-roots" ]
auth-oauth2 = [ "openidconnect", "oauth2", "serde", "serde_json", "data-url" ]
telemetry = ["tracing"]
protobuf-src = ["dep:protobuf-src"]
38 changes: 23 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
## pulsar-rs: Future-based Rust client for [Apache Pulsar](https://pulsar.apache.org/)
# pulsar-rs: Future-based Rust client for [Apache Pulsar](https://pulsar.apache.org/)

[![crates](https://img.shields.io/crates/v/pulsar.svg)](https://crates.io/crates/pulsar)
[![docs](https://img.shields.io/docsrs/pulsar)](https://docs.rs/pulsar)

[Documentation](https://docs.rs/pulsar)

This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with [Tokio](https://tokio.rs/) and [async-std](https://async.rs/).

Features:

- URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup
- multi topic consumers (based on a regex or list)
- TLS connection
- configurable executor (Tokio or async-std)
- automatic reconnection with exponential back off
- message batching
- compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)
- telemetry using [tracing](https://github.com/tokio-rs/tracing) crate (can be activated with Cargo features)
- URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup;
- Multi topic consumers (based on a regex or list);
- TLS connection;
- Configurable executor (Tokio or async-std);
- Automatic reconnection with exponential back off;
- Message batching;
- Compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features);
- Telemetry using [tracing](https://github.com/tokio-rs/tracing) crate (can be activated with Cargo features).

### Getting Started
## Getting Started

Add the following dependencies in your `Cargo.toml`:

Expand All @@ -34,7 +32,7 @@ Try out [examples](examples):
- [consumer](examples/consumer.rs)
- [reader](examples/reader.rs)

### Project Maintainers
## Project Maintainers

- [@CleverAkanoa](https://github.com/CleverAkanoa)
- [@DonghunLouisLee](https://github.com/DonghunLouisLee)
Expand All @@ -45,7 +43,7 @@ Try out [examples](examples):
- [@stearnsc](https://github.com/stearnsc)
- [@tisonkun](https://github.com/tisonkun)

### Contribution
## Contribution

This project welcomes your PR and issues. For example, refactoring, adding features, correcting English, etc.

Expand All @@ -55,8 +53,18 @@ Thanks to all the people who already contributed!
<img src="https://contributors-img.web.app/image?repo=streamnative/pulsar-rs" />
</a>

### License
## License

This library is licensed under the terms of both the MIT license and the Apache License (Version 2.0), and may include packages written by third parties which carry their own copyright notices and license terms.

See [LICENSE-APACHE](LICENSE-APACHE), [LICENSE-MIT](LICENSE-MIT), and [COPYRIGHT](COPYRIGHT) for details.

## History

This project is originally created by [@stearnsc](https://github.com/stearnsc) and others at [Wyyerd](https://github.com/wyyerd) at 2018. Later at 2022, the orginal creators [decided to transfer the repository to StreamNative](https://github.com/streamnative-oss/sn-pulsar-rs/issues/20).

Currently, this project is actively maintained under the StreamNative organization with a diverse [maintainers group](#project-maintainers).

## About StreamNative

Founded in 2019 by the original creators of Apache Pulsar, [StreamNative](https://streamnative.io/) is one of the leading contributors to the open-source Apache Pulsar project. We have helped engineering teams worldwide make the move to Pulsar with [StreamNative Cloud](https://streamnative.io/product), a fully managed service to help teams accelerate time-to-production.
39 changes: 39 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,25 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
self
}

/// add a certificate and private key to authenticate the client in TLS connections
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_identity(mut self, certificate: Vec<u8>, private_key: Vec<u8>) -> Self {
match &mut self.tls_options {
Some(tls) => {
tls.certificate = Some(certificate);
tls.private_key = Some(private_key);
}
None => {
self.tls_options = Some(TlsOptions {
certificate: Some(certificate),
private_key: Some(private_key),
..Default::default()
})
}
}
self
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
match &mut self.tls_options {
Expand Down Expand Up @@ -549,6 +568,26 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
Ok(self.with_certificate_chain(v))
}

/// add a certificate and private key to authenticate the client in TLS connections
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_identity_files<P: AsRef<std::path::Path>>(
self,
certificate_path: P,
private_key_path: P,
) -> Result<Self, std::io::Error> {
use std::io::Read;

let mut file = std::fs::File::open(certificate_path)?;
let mut certificate = vec![];
file.read_to_end(&mut certificate)?;

let mut file = std::fs::File::open(private_key_path)?;
let mut private_key = vec![];
file.read_to_end(&mut private_key)?;

Ok(self.with_identity(certificate, private_key))
}

/// creates the Pulsar client and connects it
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn build(self) -> Result<Pulsar<Exe>, Error> {
Expand Down
Loading