Skip to content

Commit

Permalink
Bump RabbitMQ adaptor version (#40)
Browse files Browse the repository at this point in the history
* Bump RabbitMQ adaptor version

* Use adaptor 0.3.3 version

* Fix serialization format
  • Loading branch information
mexskican authored Sep 27, 2019
1 parent a5924f3 commit f8c5cf7
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 93 deletions.
222 changes: 153 additions & 69 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 6 additions & 13 deletions parity/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,9 @@ usage! {
"Specify custom API set available via JSON-RPC over IPC using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",

["API and Console Options – RabbitMQ"]
ARG arg_rabbitmq_hostname: (String) = "localhost", or |c: &Config| c.rabbitmq.as_ref()?.hostname.clone(),
"--rabbitmq-hostname=[IP]",
"Specify the RabbitMQ server hostname",

ARG arg_rabbitmq_port: (u16) = 5672u16, or |c: &Config| c.rabbitmq.as_ref()?.port.clone(),
"--rabbitmq-port=[PORT]",
"Specify the RabbitMQ server port",
ARG arg_rabbitmq_uri: (String) = "amqp://localhost:5672", or |c: &Config| c.rabbitmq.as_ref()?.uri.clone(),
"--rabbitmq-uri=[URI]",
"Specify the RabbitMQ server uri",

["API and Console Options – IPFS"]
FLAG flag_ipfs_api: (bool) = false, or |c: &Config| c.ipfs.as_ref()?.enable.clone(),
Expand Down Expand Up @@ -1281,8 +1277,7 @@ struct Ipc {
#[derive(Default, Debug, PartialEq, Deserialize)]
#[serde(deny_unknown_fields)]
struct RabbitMQ {
hostname: Option<String>,
port: Option<u16>,
uri: Option<String>,
}

#[derive(Default, Debug, PartialEq, Deserialize)]
Expand Down Expand Up @@ -1750,8 +1745,7 @@ mod tests {
arg_ipc_apis: "web3,eth,net,parity,parity_accounts,personal,traces,rpc,secretstore".into(),

// RabbitMQ
arg_rabbitmq_hostname: "localhost".into(),
arg_rabbitmq_port: 5672u16,
arg_rabbitmq_uri: "amqp://localhost:5672".into(),

// DAPPS
arg_dapps_path: Some("$HOME/.parity/dapps".into()),
Expand Down Expand Up @@ -2025,8 +2019,7 @@ mod tests {
apis: Some(vec!["rpc".into(), "eth".into()]),
}),
rabbitmq: Some(RabbitMQ {
hostname: Some("localhost".into()),
port: Some(5672),
uri: Some("amqp://localhost:5672".into()),
}),
dapps: Some(Dapps {
_legacy_disable: None,
Expand Down
3 changes: 1 addition & 2 deletions parity/cli/tests/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ port = 8180
apis = ["rpc", "eth"]

[rabbitmq]
hostname = "localhost"
port = 5672
uri = "amqp://localhost:5672"

[dapps]
port = 8080
Expand Down
6 changes: 2 additions & 4 deletions parity/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,7 @@ impl Configuration {

fn rabbitmq_config(&self) -> RabbitMqConfig {
RabbitMqConfig {
hostname: self.args.arg_rabbitmq_hostname.clone(),
port: self.args.arg_rabbitmq_port
uri: self.args.arg_rabbitmq_uri.clone()
}
}

Expand Down Expand Up @@ -1423,8 +1422,7 @@ mod tests {
ipc_conf: Default::default(),
net_conf: default_network_config(),
rabbitmq_conf: RabbitMqConfig {
hostname: "localhost".into(),
port: 5672,
uri: "amqp://localhost:5672".into(),
},
network_id: None,
warp_sync: true,
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ common-types = { path = "../ethcore/types" }
ethereum-types = "0.4"
futures-executor = "0.2.1"
parity-runtime = { path = "../util/runtime" }
rabbitmq_adaptor = {version = "0.3.1", registry = "chronicled-platform-v2"}
rabbitmq_adaptor = {version = "0.3.3", registry = "chronicled-platform-v2"}
rlp = { version = "0.3.0", features = ["ethereum"] }

[dev-dependencies]
Expand Down
8 changes: 4 additions & 4 deletions rabbitmq/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use failure::{format_err, Error};
use futures::future::{err, lazy};
use handler::{Handler, Sender};
use parity_runtime::Executor;
use rabbitmq_adaptor::{ConsumerResult, DeliveryExt, RabbitConnection, RabbitExt};
use rabbitmq_adaptor::{ConfigUri, ConsumerResult, DeliveryExt, RabbitConnection, RabbitExt};
use serde::Deserialize;
use serde_json;
use std::sync::Arc;
Expand All @@ -29,8 +29,7 @@ use TX_ERROR_ROUTING_KEY;

#[derive(Debug, Default, Clone, PartialEq)]
pub struct RabbitMqConfig {
pub hostname: String,
pub port: u16,
pub uri: String,
}

/// Eth PubSub implementation.
Expand Down Expand Up @@ -67,9 +66,10 @@ impl<C: 'static + miner::BlockChainClient + BlockChainClient> PubSubClient<C> {
) -> Result<Self, Error> {
let (sender, receiver) = channel::<Vec<u8>>(DEFAULT_CHANNEL_SIZE);
let sender_handler = Box::new(Sender::new(client.clone(), miner.clone()));
let config_uri = ConfigUri::Uri(config.uri);

executor.spawn(lazy(move || {
let rabbit = RabbitConnection::new(&config.hostname, config.port, DEFAULT_REPLY_QUEUE);
let rabbit = RabbitConnection::new(config_uri, None, DEFAULT_REPLY_QUEUE);
// Consume to public transaction messages
try_spawn(
rabbit
Expand Down

0 comments on commit f8c5cf7

Please sign in to comment.