From 902fce4cc0221627104a3e046174fd99df5a72cc Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Wed, 10 Apr 2019 09:58:28 +0200 Subject: [PATCH] Drop device provider concept --- .../hono/simulator/http/Application.java | 159 +++++++++---- .../dentrassi/hono/simulator/http/Device.java | 159 +++++-------- .../hono/simulator/http/DeviceProvider.java | 26 -- .../hono/simulator/http/Response.java | 25 -- .../http/provider/DefaultProvider.java | 53 ----- .../simulator/http/provider/VertxDevice.java | 225 ------------------ ...ntrassi.hono.simulator.http.DeviceProvider | 1 - 7 files changed, 165 insertions(+), 483 deletions(-) delete mode 100644 simulator-http/src/main/java/de/dentrassi/hono/simulator/http/DeviceProvider.java delete mode 100644 simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Response.java delete mode 100644 simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/DefaultProvider.java delete mode 100644 simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/VertxDevice.java delete mode 100644 simulator-http/src/main/resources/META-INF/services/de.dentrassi.hono.simulator.http.DeviceProvider diff --git a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Application.java b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Application.java index 035c5fa..bb99430 100644 --- a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Application.java +++ b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Application.java @@ -10,38 +10,37 @@ *******************************************************************************/ package de.dentrassi.hono.simulator.http; +import static de.dentrassi.hono.demo.common.Select.oneOf; +import static io.glutamate.lang.Environment.consumeAs; import static io.glutamate.lang.Environment.getAs; import java.util.Random; -import java.util.ServiceLoader; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import de.dentrassi.hono.demo.common.AppRuntime; import de.dentrassi.hono.demo.common.ProducerConfig; import de.dentrassi.hono.demo.common.DeadlockDetector; -import de.dentrassi.hono.demo.common.EventWriter; import de.dentrassi.hono.demo.common.Payload; import de.dentrassi.hono.demo.common.Register; import de.dentrassi.hono.demo.common.Tenant; import de.dentrassi.hono.demo.common.Tls; +import io.glutamate.lang.Environment; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import io.netty.handler.ssl.OpenSsl; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.OpenSSLEngineOptions; +import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import okhttp3.Credentials; public class Application { - private static final Logger logger = LoggerFactory.getLogger(Application.class); - private static final ProducerConfig config = ProducerConfig.fromEnv(); - private static final String DEVICE_PROVIDER = System.getenv().getOrDefault("DEVICE_PROVIDER", "VERTX"); + private static final String METHOD = Environment.get("HTTP_METHOD").orElse("PUT"); + + private static final boolean NOAUTH = Environment.getAs("HTTP_NOAUTH", false, Boolean::parseBoolean); public static void main(final String[] args) throws Exception { @@ -54,9 +53,6 @@ public static void main(final String[] args) throws Exception { } private static void runSimulator(final AppRuntime runtime) throws InterruptedException { - logger.info("Using Device implementation: {}", DEVICE_PROVIDER); - - final DeviceProvider provider = locateProvider(); final int numberOfDevices = getAs("NUM_DEVICES", 10, Integer::parseInt); final int numberOfThreads = getAs("NUM_THREADS", 10, Integer::parseInt); @@ -80,58 +76,117 @@ private static void runSimulator(final AppRuntime runtime) throws InterruptedExc final Tags commonTags = Tags.of( Tag.of("protocol", "http"), Tag.of("tenant", Tenant.TENANT), - config.getType().asTag() - ); + config.getType().asTag()); final Statistics stats = new Statistics(registry, commonTags); - final ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor(); - final TickExecutor tickExecutor = new TickExecutor(); - final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); - final Random r = new Random(); - try { + final var webClient = createWebClient(runtime.getVertx()); + + for (int i = 0; i < numberOfDevices; i++) { + + final String username = String.format("user-%s-%s", deviceIdPrefix, i); + final String deviceId = String.format("%s-%s", deviceIdPrefix, i); - for (int i = 0; i < numberOfDevices; i++) { + final var request = createRequest(webClient, config, Payload.payload(), Tenant.TENANT, deviceIdPrefix, + username, "hono-secret"); - final String username = String.format("user-%s-%s", deviceIdPrefix, i); - final String deviceId = String.format("%s-%s", deviceIdPrefix, i); + final Device device = new Device(() -> request, username, deviceId, Tenant.TENANT, + "hono-secret", register, Payload.payload(), stats); - final Device device = provider.createDevice(executor, username, deviceId, Tenant.TENANT, - "hono-secret", register, Payload.payload(), stats, - EventWriter.nullWriter()); + tickExecutor.scheduleAtFixedRate(device::tick, r.nextInt((int) config.getPeriod().toMillis()), + config.getPeriod().toMillis()); - final Supplier> ticker; - switch (config.getType()) { - case EVENT: - ticker = device::tickEvent; - break; - default: - ticker = device::tickTelemetry; - break; - } + } + + Thread.sleep(Long.MAX_VALUE); + + } + + private static WebClient createWebClient(final Vertx vertx) { + final WebClientOptions clientOptions = new WebClientOptions(); - tickExecutor.scheduleAtFixedRate(ticker, r.nextInt((int) config.getPeriod().toMillis()), - config.getPeriod().toMillis()); + consumeAs("VERTX_KEEP_ALIVE", Boolean::parseBoolean, clientOptions::setKeepAlive); + consumeAs("VERTX_MAX_POOL_SIZE", Integer::parseInt, clientOptions::setMaxPoolSize); + consumeAs("VERTX_POOLED_BUFFERS", Boolean::parseBoolean, clientOptions::setUsePooledBuffers); - } + clientOptions.setConnectTimeout(getAs("VERTX_CONNECT_TIMEOUT", 5_000, Integer::parseInt)); + clientOptions.setIdleTimeout(getAs("VERTX_IDLE_TIMEOUT", 5_000, Integer::parseInt)); - Thread.sleep(Long.MAX_VALUE); - } finally { - executor.shutdown(); - statsExecutor.shutdown(); + if (Tls.insecure()) { + clientOptions.setVerifyHost(false); + clientOptions.setTrustAll(true); } + + if (vertx.isNativeTransportEnabled() + && OpenSsl.isAvailable() + && OpenSsl.supportsKeyManagerFactory() + && OpenSsl.supportsHostnameValidation()) { + clientOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions()); + } + + return WebClient.create(vertx, clientOptions); } - private static DeviceProvider locateProvider() { - for (final DeviceProvider provider : ServiceLoader.load(DeviceProvider.class)) { - if (provider.getName().equals(DEVICE_PROVIDER)) { - return provider; - } + private static HttpRequest createRequest(final WebClient client, final ProducerConfig config, + final Payload payload, final String tenant, final String deviceId, final String user, + final String password) { + + final var auth = Credentials.basic(user + "@" + tenant, password); + final var url = buildUrl(config, tenant, deviceId); + + final HttpRequest request; + + switch (METHOD) { + case "POST": + request = client.postAbs(url); + break; + default: + request = client.putAbs(url); + break; } - throw new IllegalArgumentException(String.format("Unable to find device provider: '%s'", DEVICE_PROVIDER)); + + if (!NOAUTH) { + request.putHeader("Authorization", auth); + } + + request.putHeader("Content-Type", payload.getContentType()); + + return request; } + private static String buildUrl(final ProducerConfig config, final String tenant, final String deviceId) { + + final String url = oneOf(System.getenv("HONO_HTTP_URL")); + if (url == null || url.isBlank()) { + throw new IllegalArgumentException("'HONO_HTTP_URL' is missing or blank"); + } + + final var builder = new StringBuilder(url); + + if (!url.endsWith("/")) { + builder.append("/"); + } + + switch (config.getType()) { + case EVENT: + builder.append("event"); + break; + case TELEMETRY: + builder.append("telemetry"); + break; + } + + switch (METHOD) { + case "POST": + break; + default: + builder.append("/").append(tenant).append("/").append(deviceId); + break; + } + + return builder.toString(); + } } diff --git a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Device.java b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Device.java index ee08d4c..d295600 100644 --- a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Device.java +++ b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Device.java @@ -11,111 +11,58 @@ package de.dentrassi.hono.simulator.http; import static de.dentrassi.hono.demo.common.Register.shouldRegister; -import static de.dentrassi.hono.demo.common.Select.oneOf; - +import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import de.dentrassi.hono.demo.common.Payload; import de.dentrassi.hono.demo.common.Register; -import okhttp3.Credentials; -import okhttp3.HttpUrl; +import io.glutamate.lang.Environment; +import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; -public abstract class Device { +public class Device { private static final Logger logger = LoggerFactory.getLogger(Device.class); - private static final String METHOD = System.getenv().get("HTTP_METHOD"); - - protected static final boolean AUTO_REGISTER = Boolean - .parseBoolean(System.getenv().getOrDefault("AUTO_REGISTER", "true")); - - protected static final boolean NOAUTH = Boolean.parseBoolean(System.getenv().getOrDefault("HTTP_NOAUTH", "false")); + private static final boolean AUTO_REGISTER = Environment.getAs("AUTO_REGISTER", true, Boolean::parseBoolean); - protected final String auth; + private final Register register; - protected final Register register; + private final String user; - protected final String user; + private final String deviceId; - protected final String deviceId; + private final String password; - protected final String password; + private final Statistics statistics; - protected final String tenant; + private final Supplier> requestProvider; - protected final Statistics statistics; + private final Payload payload; - protected final String method; + public Device(final Supplier> requestProvider, final String user, final String deviceId, + final String tenant, final String password, final Register register, final Payload payload, + final Statistics statistics) { - protected final boolean enabled; - - public Device(final String user, final String deviceId, final String tenant, final String password, - final Register register, final Statistics statistics) { + Objects.requireNonNull(requestProvider); + Objects.requireNonNull(payload); this.register = register; this.user = user; this.deviceId = deviceId; - this.tenant = tenant; this.password = password; this.statistics = statistics; - this.auth = Credentials.basic(user + "@" + tenant, password); - - this.method = METHOD != null ? METHOD : "PUT"; - - this.enabled = getHonoHttpUrl() != null; - } - - protected HttpUrl getHonoHttpUrl() { - - String url = oneOf(System.getenv("HONO_HTTP_URL")); - - final String envProto = System.getenv("HONO_HTTP_PROTO"); - final String envHost = oneOf(System.getenv("HONO_HTTP_HOST")); - final String envPort = System.getenv("HONO_HTTP_PORT"); - - if (url == null && envHost != null && envPort != null) { - final String proto = envProto != null ? envProto : "http"; - url = String.format("%s://%s:%s", proto, envHost, envPort); - } - - if (url != null) { - return HttpUrl.parse(url); - } else { - return null; - } - } - - protected HttpUrl createUrl(final String type) { - if ("POST".equals(this.method)) { - return createPostUrl(type); - } else { - return createPutUrl(type); - } - } - - protected HttpUrl createPostUrl(final String type) { - if (!this.enabled) { - return null; - } - - return getHonoHttpUrl().resolve("/" + type); - } - - protected HttpUrl createPutUrl(final String type) { - if (!this.enabled) { - return null; - } + this.requestProvider = requestProvider; + this.payload = payload; - return getHonoHttpUrl().newBuilder() - .addPathSegment(type) - .addPathSegment(this.tenant) - .addPathSegment(this.deviceId) - .build(); } public void register() throws Exception { @@ -124,23 +71,7 @@ public void register() throws Exception { } } - protected abstract ThrowingSupplier, Exception> tickTelemetryProvider(); - - protected abstract ThrowingSupplier, Exception> tickEventProvider(); - - public CompletableFuture tickTelemetry() { - return tick(() -> tickTelemetryProvider().get()); - } - - public CompletableFuture tickEvent() { - return tick(() -> tickEventProvider().get()); - } - - protected CompletableFuture tick(final ThrowingSupplier, Exception> runnable) { - - if (!this.enabled) { - return CompletableFuture.completedFuture(null); - } + public CompletableFuture tick() { this.statistics.scheduled(); final Instant start = Instant.now(); @@ -148,7 +79,7 @@ protected CompletableFuture tick(final ThrowingSupplier, final CompletableFuture future; try { - future = runnable.get(); + future = process(); } catch (final Exception e) { this.statistics.failed(); return CompletableFuture.completedFuture(null); @@ -157,13 +88,14 @@ protected CompletableFuture tick(final ThrowingSupplier, return future.whenComplete((r, ex) -> { if (ex != null) { - statistics.failed(); + this.statistics.failed(); logger.debug("Failed to publish", ex); } final Duration dur = Duration.between(start, Instant.now()); this.statistics.duration(dur); }); + } protected void handleSuccess() { @@ -175,16 +107,16 @@ protected void handleException(final Throwable e) { this.statistics.error(0); } - protected void handleFailure(final Response response) { + protected void handleFailure(final HttpResponse response) { this.statistics.failed(); - this.statistics.error(response.code()); + this.statistics.error(response.statusCode()); if (logger.isDebugEnabled()) { - logger.debug("handleFailure - code: {}, body: {}", response.code(), response.bodyAsString()); + logger.debug("handleFailure - code: {}, body: {}", response.statusCode(), response.bodyAsString()); } try { - switch (response.code()) { + switch (response.statusCode()) { case 401: case 403: //$FALL-THROUGH$ if (AUTO_REGISTER && shouldRegister()) { @@ -197,8 +129,9 @@ protected void handleFailure(final Response response) { } } - protected void handleResponse(final Response response) { - final int code = response.code(); + protected void handleResponse(final HttpResponse response) { + + final int code = response.statusCode(); if (code < 200 || code > 299) { handleFailure(response); } else { @@ -206,4 +139,28 @@ protected void handleResponse(final Response response) { } } + + protected CompletableFuture process() throws IOException { + + final CompletableFuture result = new CompletableFuture<>(); + + this.requestProvider + .get() + .sendBuffer(this.payload.getBuffer(), ar -> { + + final HttpResponse response = ar.result(); + + if (ar.succeeded()) { + handleResponse(response); + result.complete(null); + } else { + handleException(ar.cause()); + result.completeExceptionally(ar.cause()); + } + + }); + + return result; + } + } diff --git a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/DeviceProvider.java b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/DeviceProvider.java deleted file mode 100644 index c6f21a2..0000000 --- a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/DeviceProvider.java +++ /dev/null @@ -1,26 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2018, 2019 Red Hat Inc and others. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Jens Reimann - initial API and implementation - *******************************************************************************/ - -package de.dentrassi.hono.simulator.http; - -import java.util.concurrent.Executor; - -import de.dentrassi.hono.demo.common.EventWriter; -import de.dentrassi.hono.demo.common.Payload; -import de.dentrassi.hono.demo.common.Register; - -public interface DeviceProvider { - - String getName(); - - Device createDevice(Executor executor, String user, String deviceId, String tenant, String password, - Register register, Payload payload, Statistics statistics, EventWriter eventWriter); -} diff --git a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Response.java b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Response.java deleted file mode 100644 index e1dbaac..0000000 --- a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/Response.java +++ /dev/null @@ -1,25 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2018 Red Hat Inc and others. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Jens Reimann - initial API and implementation - *******************************************************************************/ -package de.dentrassi.hono.simulator.http; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -public interface Response { - - int code(); - - String bodyAsString(Charset charset); - - default String bodyAsString() { - return bodyAsString(StandardCharsets.UTF_8); - } -} diff --git a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/DefaultProvider.java b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/DefaultProvider.java deleted file mode 100644 index 9bcbffc..0000000 --- a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/DefaultProvider.java +++ /dev/null @@ -1,53 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2017, 2019 Red Hat Inc and others. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Jens Reimann - initial API and implementation - *******************************************************************************/ -package de.dentrassi.hono.simulator.http.provider; - -import java.util.concurrent.Executor; - -import de.dentrassi.hono.demo.common.EventWriter; -import de.dentrassi.hono.demo.common.Payload; -import de.dentrassi.hono.demo.common.Register; -import de.dentrassi.hono.simulator.http.Device; -import de.dentrassi.hono.simulator.http.DeviceProvider; -import de.dentrassi.hono.simulator.http.Statistics; - -public class DefaultProvider implements DeviceProvider { - - @FunctionalInterface - public interface Constructor { - - Device construct(Executor executor, String user, String deviceId, String tenant, String password, - Register register, Payload payload, Statistics statistics, - EventWriter eventWriter); - } - - private final String name; - private final DefaultProvider.Constructor constructor; - - public DefaultProvider(final String name, final Constructor constructor) { - this.name = name; - this.constructor = constructor; - } - - @Override - public String getName() { - return this.name; - } - - @Override - public Device createDevice(final Executor executor, final String user, final String deviceId, final String tenant, - final String password, final Register register, final Payload payload, - final Statistics statistics, final EventWriter eventWriter) { - return this.constructor.construct(executor, user, deviceId, tenant, password, register, payload, - statistics, eventWriter); - } - -} \ No newline at end of file diff --git a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/VertxDevice.java b/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/VertxDevice.java deleted file mode 100644 index 64a657f..0000000 --- a/simulator-http/src/main/java/de/dentrassi/hono/simulator/http/provider/VertxDevice.java +++ /dev/null @@ -1,225 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2018, 2019 Red Hat Inc and others. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Jens Reimann - initial API and implementation - *******************************************************************************/ -package de.dentrassi.hono.simulator.http.provider; - -import static io.glutamate.lang.Environment.*; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.glutamate.lang.Environment; -import de.dentrassi.hono.demo.common.EventWriter; -import de.dentrassi.hono.demo.common.Payload; -import de.dentrassi.hono.demo.common.Register; -import de.dentrassi.hono.demo.common.Tls; -import de.dentrassi.hono.simulator.http.Device; -import de.dentrassi.hono.simulator.http.Response; -import de.dentrassi.hono.simulator.http.Statistics; -import de.dentrassi.hono.simulator.http.ThrowingSupplier; -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.dns.AddressResolverOptions; -import io.vertx.ext.web.client.HttpRequest; -import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; - -public class VertxDevice extends Device { - - private static final Logger logger = LoggerFactory.getLogger(VertxDevice.class); - - public static class Provider extends DefaultProvider { - - public Provider() { - super("VERTX", VertxDevice::new); - } - - } - - private static Vertx vertx; - - private static final AtomicReference client = new AtomicReference<>(); - - private static void initialize(final EventWriter eventWriter) { - - if (vertx != null) { - return; - } - - final VertxOptions options = new VertxOptions(); - - options.setPreferNativeTransport(true); - - final AddressResolverOptions addressResolverOptions = new AddressResolverOptions(); - consumeAs("VERTX_DNS_MAX_TTL", Integer::parseInt, addressResolverOptions::setCacheMaxTimeToLive); - addressResolverOptions - .setCacheNegativeTimeToLive( - Environment.getAs("VERTX_DNS_CACHE_NEGATIVE_TTL_SECONDS", 1, Integer::parseInt)); - options.setAddressResolverOptions(addressResolverOptions); - - vertx = Vertx.factory.vertx(options); - - final boolean usingNative = vertx.isNativeTransportEnabled(); - System.out.println("VERTX: Running with native: " + usingNative); - - createWebClient(eventWriter, 1); - - Environment.consumeAs("VERTX_RECREATE_CLIENT", Long::parseLong, period -> { - if (period > 0) { - vertx.setPeriodic(period, t -> createWebClient(eventWriter, period)); - } - }); - - } - - private static void createWebClient(final EventWriter eventWriter, final long period) { - logger.info("Creating new web client"); - - if (eventWriter != null) { - eventWriter.writeEvent("Web Client", "Creating new vertx web clients"); - } - - final WebClientOptions clientOptions = new WebClientOptions(); - - consumeAs("VERTX_KEEP_ALIVE", Boolean::parseBoolean, clientOptions::setKeepAlive); - consumeAs("VERTX_MAX_POOL_SIZE", Integer::parseInt, clientOptions::setMaxPoolSize); - consumeAs("VERTX_POOLED_BUFFERS", Boolean::parseBoolean, clientOptions::setUsePooledBuffers); - - clientOptions.setConnectTimeout(getAs("VERTX_CONNECT_TIMEOUT", 5_000, Integer::parseInt)); - clientOptions.setIdleTimeout(getAs("VERTX_IDLE_TIMEOUT", 5_000, Integer::parseInt)); - - if (Tls.insecure()) { - clientOptions.setVerifyHost(false); - clientOptions.setTrustAll(true); - } - - final WebClient oldClient = client.getAndSet(WebClient.create(vertx, clientOptions)); - if (oldClient != null) { - vertx.setTimer(period, t -> oldClient.close()); - } - } - - private final Payload payload; - - private final Buffer payloadBuffer; - - private final String telemetryUrl; - - private final String eventUrl; - - public VertxDevice(final Executor executor, final String user, final String deviceId, final String tenant, - final String password, final Register register, final Payload payload, - final Statistics statistics, final EventWriter eventWriter) { - super(user, deviceId, tenant, password, register, statistics); - - initialize(eventWriter); - - this.payload = payload; - this.payloadBuffer = Buffer.factory.buffer(this.payload.getBytes()); - - this.telemetryUrl = Objects.toString(createUrl("telemetry"), null); - this.eventUrl = Objects.toString(createUrl("event"), null); - } - - private HttpRequest createRequest(final String url) { - - final HttpRequest request; - - if (this.method.equals("POST")) { - request = VertxDevice.client.get().postAbs(url); - } else { - request = VertxDevice.client.get().putAbs(url); - } - - if (!NOAUTH) { - request.putHeader("Authorization", this.auth); - } - - request.putHeader("Content-Type", this.payload.getContentType()); - - return request; - } - - private HttpRequest createTelemetryRequest() { - return createRequest(this.telemetryUrl); - } - - private HttpRequest createEventRequest() { - return createRequest(this.eventUrl); - } - - protected CompletableFuture process(final Supplier> request) - throws IOException { - - final CompletableFuture result = new CompletableFuture<>(); - - request - .get() - .sendBuffer(this.payloadBuffer, ar -> { - - final HttpResponse response = ar.result(); - - if (ar.succeeded()) { - handleResponse(convertRequest(response)); - result.complete(null); - } else { - handleException(ar.cause()); - result.completeExceptionally(ar.cause()); - } - - }); - - return result; - } - - private static Response convertRequest(final HttpResponse response) { - return new Response() { - - @Override - public int code() { - return response.statusCode(); - } - - @Override - public String bodyAsString() { - // vertx decodes bodies always using UTF-8 - // but we do save a lookup of the encoder that way. - return response.bodyAsString(); - } - - @Override - public String bodyAsString(final Charset charset) { - return response.bodyAsString(charset.name()); - } - - }; - } - - @Override - protected ThrowingSupplier, Exception> tickTelemetryProvider() { - return () -> process(this::createTelemetryRequest); - } - - @Override - protected ThrowingSupplier, Exception> tickEventProvider() { - return () -> process(this::createEventRequest); - } - -} diff --git a/simulator-http/src/main/resources/META-INF/services/de.dentrassi.hono.simulator.http.DeviceProvider b/simulator-http/src/main/resources/META-INF/services/de.dentrassi.hono.simulator.http.DeviceProvider deleted file mode 100644 index 675c03c..0000000 --- a/simulator-http/src/main/resources/META-INF/services/de.dentrassi.hono.simulator.http.DeviceProvider +++ /dev/null @@ -1 +0,0 @@ -de.dentrassi.hono.simulator.http.provider.VertxDevice$Provider \ No newline at end of file