Skip to content

Commit

Permalink
Provider information about the amount of payload received
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Apr 4, 2019
1 parent 268f465 commit f6f3d6a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ public Application(final String tenant, final String host, final int port, final
.of("tenant", this.tenant)
.and(type.asTag());

this.consumer = new Consumer(this.runtime.getRegistry().counter("messages.received", commonTags));
this.consumer = new Consumer(
this.runtime.getRegistry().counter("messages.received", commonTags),
this.runtime.getRegistry().counter("payload.received", commonTags));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,35 @@

package de.dentrassi.hono.simulator.consumer;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import io.micrometer.core.instrument.Counter;

public class Consumer {

private final Counter counter;
private final Counter messages;
private final Counter payload;

public Consumer(final Counter counter) {
this.counter = counter;
public Consumer(final Counter messages, final Counter payload) {
this.messages = messages;
this.payload = payload;
}

public void handleMessage(final Message msg) {
this.counter.increment();
this.messages.increment();

final Section body = msg.getBody();
if (body instanceof Data) {
final Binary value = ((Data) body).getValue();
if (value != null) {
this.payload.increment(value.getLength());
}
}
}

public double count() {
return counter.count();
return messages.count();
}
}

0 comments on commit f6f3d6a

Please sign in to comment.