Skip to content

Commit

Permalink
Drop device provider concept
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Sep 30, 2019
1 parent 6a0f0d1 commit 902fce4
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 483 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,37 @@
*******************************************************************************/
package de.dentrassi.hono.simulator.http;

import static de.dentrassi.hono.demo.common.Select.oneOf;
import static io.glutamate.lang.Environment.consumeAs;
import static io.glutamate.lang.Environment.getAs;
import java.util.Random;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import de.dentrassi.hono.demo.common.AppRuntime;
import de.dentrassi.hono.demo.common.ProducerConfig;
import de.dentrassi.hono.demo.common.DeadlockDetector;
import de.dentrassi.hono.demo.common.EventWriter;
import de.dentrassi.hono.demo.common.Payload;
import de.dentrassi.hono.demo.common.Register;
import de.dentrassi.hono.demo.common.Tenant;
import de.dentrassi.hono.demo.common.Tls;
import io.glutamate.lang.Environment;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.netty.handler.ssl.OpenSsl;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import okhttp3.Credentials;

public class Application {

private static final Logger logger = LoggerFactory.getLogger(Application.class);

private static final ProducerConfig config = ProducerConfig.fromEnv();

private static final String DEVICE_PROVIDER = System.getenv().getOrDefault("DEVICE_PROVIDER", "VERTX");
private static final String METHOD = Environment.get("HTTP_METHOD").orElse("PUT");

private static final boolean NOAUTH = Environment.getAs("HTTP_NOAUTH", false, Boolean::parseBoolean);

public static void main(final String[] args) throws Exception {

Expand All @@ -54,9 +53,6 @@ public static void main(final String[] args) throws Exception {
}

private static void runSimulator(final AppRuntime runtime) throws InterruptedException {
logger.info("Using Device implementation: {}", DEVICE_PROVIDER);

final DeviceProvider provider = locateProvider();

final int numberOfDevices = getAs("NUM_DEVICES", 10, Integer::parseInt);
final int numberOfThreads = getAs("NUM_THREADS", 10, Integer::parseInt);
Expand All @@ -80,58 +76,117 @@ private static void runSimulator(final AppRuntime runtime) throws InterruptedExc
final Tags commonTags = Tags.of(
Tag.of("protocol", "http"),
Tag.of("tenant", Tenant.TENANT),
config.getType().asTag()
);
config.getType().asTag());
final Statistics stats = new Statistics(registry, commonTags);

final ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();

final TickExecutor tickExecutor = new TickExecutor();

final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

final Random r = new Random();

try {
final var webClient = createWebClient(runtime.getVertx());

for (int i = 0; i < numberOfDevices; i++) {

final String username = String.format("user-%s-%s", deviceIdPrefix, i);
final String deviceId = String.format("%s-%s", deviceIdPrefix, i);

for (int i = 0; i < numberOfDevices; i++) {
final var request = createRequest(webClient, config, Payload.payload(), Tenant.TENANT, deviceIdPrefix,
username, "hono-secret");

final String username = String.format("user-%s-%s", deviceIdPrefix, i);
final String deviceId = String.format("%s-%s", deviceIdPrefix, i);
final Device device = new Device(() -> request, username, deviceId, Tenant.TENANT,
"hono-secret", register, Payload.payload(), stats);

final Device device = provider.createDevice(executor, username, deviceId, Tenant.TENANT,
"hono-secret", register, Payload.payload(), stats,
EventWriter.nullWriter());
tickExecutor.scheduleAtFixedRate(device::tick, r.nextInt((int) config.getPeriod().toMillis()),
config.getPeriod().toMillis());

final Supplier<CompletableFuture<?>> ticker;
switch (config.getType()) {
case EVENT:
ticker = device::tickEvent;
break;
default:
ticker = device::tickTelemetry;
break;
}
}

Thread.sleep(Long.MAX_VALUE);

}

private static WebClient createWebClient(final Vertx vertx) {
final WebClientOptions clientOptions = new WebClientOptions();

tickExecutor.scheduleAtFixedRate(ticker, r.nextInt((int) config.getPeriod().toMillis()),
config.getPeriod().toMillis());
consumeAs("VERTX_KEEP_ALIVE", Boolean::parseBoolean, clientOptions::setKeepAlive);
consumeAs("VERTX_MAX_POOL_SIZE", Integer::parseInt, clientOptions::setMaxPoolSize);
consumeAs("VERTX_POOLED_BUFFERS", Boolean::parseBoolean, clientOptions::setUsePooledBuffers);

}
clientOptions.setConnectTimeout(getAs("VERTX_CONNECT_TIMEOUT", 5_000, Integer::parseInt));
clientOptions.setIdleTimeout(getAs("VERTX_IDLE_TIMEOUT", 5_000, Integer::parseInt));

Thread.sleep(Long.MAX_VALUE);
} finally {
executor.shutdown();
statsExecutor.shutdown();
if (Tls.insecure()) {
clientOptions.setVerifyHost(false);
clientOptions.setTrustAll(true);
}

if (vertx.isNativeTransportEnabled()
&& OpenSsl.isAvailable()
&& OpenSsl.supportsKeyManagerFactory()
&& OpenSsl.supportsHostnameValidation()) {
clientOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions());
}

return WebClient.create(vertx, clientOptions);
}

private static DeviceProvider locateProvider() {
for (final DeviceProvider provider : ServiceLoader.load(DeviceProvider.class)) {
if (provider.getName().equals(DEVICE_PROVIDER)) {
return provider;
}
private static HttpRequest<Buffer> createRequest(final WebClient client, final ProducerConfig config,
final Payload payload, final String tenant, final String deviceId, final String user,
final String password) {

final var auth = Credentials.basic(user + "@" + tenant, password);
final var url = buildUrl(config, tenant, deviceId);

final HttpRequest<Buffer> request;

switch (METHOD) {
case "POST":
request = client.postAbs(url);
break;
default:
request = client.putAbs(url);
break;
}
throw new IllegalArgumentException(String.format("Unable to find device provider: '%s'", DEVICE_PROVIDER));

if (!NOAUTH) {
request.putHeader("Authorization", auth);
}

request.putHeader("Content-Type", payload.getContentType());

return request;
}

private static String buildUrl(final ProducerConfig config, final String tenant, final String deviceId) {

final String url = oneOf(System.getenv("HONO_HTTP_URL"));
if (url == null || url.isBlank()) {
throw new IllegalArgumentException("'HONO_HTTP_URL' is missing or blank");
}

final var builder = new StringBuilder(url);

if (!url.endsWith("/")) {
builder.append("/");
}

switch (config.getType()) {
case EVENT:
builder.append("event");
break;
case TELEMETRY:
builder.append("telemetry");
break;
}

switch (METHOD) {
case "POST":
break;
default:
builder.append("/").append(tenant).append("/").append(deviceId);
break;
}

return builder.toString();
}
}
Loading

0 comments on commit 902fce4

Please sign in to comment.