Skip to content

Commit

Permalink
Async send to tunneling clients
Browse files Browse the repository at this point in the history
  • Loading branch information
bmalinowsky committed May 12, 2024
1 parent bba8673 commit 10eab60
Showing 1 changed file with 52 additions and 42 deletions.
94 changes: 52 additions & 42 deletions src/io/calimero/server/gateway/KnxServerGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.Iterator;
Expand Down Expand Up @@ -463,7 +462,8 @@ record IpEvent(ServiceContainer sc, FrameEvent event) {}
private static final FrameEvent ResetEvent = new FrameEvent(KnxServerGateway.class, new byte[0]);

// support replaying subnet events for disrupted tunneling connections
private final Map<ServiceContainer, ReplayBuffer<FrameEvent>> subnetEventBuffers = new HashMap<>();
// TODO move to subnet connector?
private final Map<ServiceContainer, ReplayBuffer<FrameEvent>> subnetEventBuffers = new ConcurrentHashMap<>();
private final Map<KNXnetIPConnection, ServiceContainer> waitingForReplay = new ConcurrentHashMap<>();

private final Instant startTime;
Expand Down Expand Up @@ -861,6 +861,8 @@ private void transmitCurrentTime(final SubnetConnector connector, final DPTXlato
try {
logger.log(DEBUG, "dispatch {0}->{1} to all server-side connections", src, dst);
final long eventId = new FrameEvent(this, (CEMI) null).id();
final var apdu = prepareTimestamp(xlator, src, dst);

for (final var conn : serverConnections) {
boolean monitor = false;
if (conn instanceof final DataEndpoint de) {
Expand All @@ -871,14 +873,11 @@ private void transmitCurrentTime(final SubnetConnector connector, final DPTXlato
monitor = de.type() == ConnectionType.Monitor;
}

// always create new timestamp, so we don't send an outdated timestamp if a previous client
// connection was not responsive
final var apdu = prepareTimestamp(xlator, src, dst);
final var f = new CEMILDataEx(CEMILData.MC_LDATA_IND, src, dst, apdu, p);
// if we have a bus monitoring connection, but a subnet connector does not support busmonitor mode,
// we serve that connection by converting cEMI L-Data -> cEMI BusMon
final CEMI send = monitor ? convertToBusmon(f, eventId, connector) : f;
send(sc, conn, send);
asyncSend(sc, conn, send);
}
}
catch (final RuntimeException e) {
Expand Down Expand Up @@ -974,17 +973,19 @@ private void replayPendingSubnetEvents()
final ServiceContainer svcContainer = entry.getValue();
final ReplayBuffer<FrameEvent> replayBuffer = subnetEventBuffers.get(svcContainer);
final Collection<FrameEvent> events = replayBuffer.replay(c);
logger.log(WARNING, "previous connection of {0} got disrupted => replay {1} pending messages", c, events.size());
events.forEach(fe -> {

Executor.execute(() -> {
logger.log(WARNING, "previous connection of {0} got disrupted => replay {1} pending messages", c, events.size());
try {
send(svcContainer, c, fe.getFrame());
for (final var fe : events)
send(svcContainer, c, fe.getFrame());
}
catch (final InterruptedException e) {
logger.log(WARNING, "failed to replay frame event", e);
logger.log(WARNING, "interrupted replay for " + c, e);
}
});
waitingForReplay.remove(c);
logger.log(DEBUG, "replay completed for connection {0}", c);
waitingForReplay.remove(c);
logger.log(DEBUG, "replay completed for connection {0}", c);
}, c + " replay");
}
}

Expand All @@ -1010,15 +1011,11 @@ private String friendlyName() {
}
}

private FrameEvent recordFrameEvent;

private void recordEvent(final SubnetConnector connector, final FrameEvent fe)
{
final ReplayBuffer<FrameEvent> buffer = subnetEventBuffers.get(connector.getServiceContainer());
if (buffer != null) {
if (buffer != null)
buffer.recordEvent(fe);
recordFrameEvent = fe;
}
}

private void onServerFrameReceived(final IpEvent ipEvent) throws InterruptedException {
Expand Down Expand Up @@ -1082,7 +1079,7 @@ private void onServerFrameReceived(final IpEvent ipEvent) throws InterruptedExce
final var routingConfig = routerObj.routingLcConfig(true);
switch (routingConfig) {
case All -> {
dispatchLdataToClients(getSubnetConnector(svcCont.getName()), ldata, fe.id());
dispatchLdataToClients(getSubnetConnector(svcCont.getName()), ldata, fe.id(), null);

final CEMILData send = adjustHopCount(ldata);
if (send == null)
Expand All @@ -1101,7 +1098,7 @@ private void onServerFrameReceived(final IpEvent ipEvent) throws InterruptedExce
try {
final var ind = CEMIFactory.create(null, null,
(CEMILData) CEMIFactory.create(CEMILData.MC_LDATA_IND, null, ldata), false, false);
send(svcCont, dataEndpoint, ind);
asyncSend(svcCont, dataEndpoint, ind);
}
catch (KNXFormatException | RuntimeException e) {
e.printStackTrace();
Expand Down Expand Up @@ -1163,7 +1160,7 @@ private void onServerFrameReceived(final IpEvent ipEvent) throws InterruptedExce
try {
final var ind = CEMIFactory.create(null, null,
(CEMILData) CEMIFactory.create(CEMILData.MC_LDATA_IND, null, ldata), false, false);
send(svcCont, conn, ind);
asyncSend(svcCont, conn, ind);
}
catch (KNXFormatException | RuntimeException e) {
e.printStackTrace();
Expand Down Expand Up @@ -1256,7 +1253,7 @@ else if (subnet.interfaceAddress().isPresent())
switch (config) {
case All -> {
for (final var conn : serverConnections)
send(sc, conn, send);
asyncSend(sc, conn, send);
dispatchToOtherSubnets(send, subnet, false);
}
case Block -> {
Expand All @@ -1269,7 +1266,7 @@ else if (subnet.interfaceAddress().isPresent())
// route to other subnet if indicated by destination
final var otherSubnet = connectorFor(dst);
if (otherSubnet.isPresent()) {
var os = otherSubnet.get();
final var os = otherSubnet.get();
// only forward if dst is actually in a different subnet (never feed back into originating subnet)
if (!os.equals(subnet))
dispatchToSubnet(os, send, fe.systemBroadcast());
Expand Down Expand Up @@ -1314,7 +1311,7 @@ else if (subnet.interfaceAddress().isPresent())
}

recordEvent(subnet, fe);
dispatchLdataToClients(subnet, send, fe.id());
dispatchLdataToClients(subnet, send, fe.id(), fe);
dispatchToOtherSubnets(send, subnet, false);
}
return;
Expand All @@ -1326,12 +1323,8 @@ else if (frame instanceof CEMIBusMon) {

for (final var conn : serverConnections) {
// routing does not support busmonitor mode
if (!(conn instanceof KNXnetIPRouting)) {
try {
send(subnet.getServiceContainer(), conn, frame);
}
catch (final InterruptedException e) {}
}
if (!(conn instanceof KNXnetIPRouting))
asyncSend(subnet.getServiceContainer(), conn, frame, fe);
}
return;
}
Expand Down Expand Up @@ -1523,7 +1516,7 @@ else if (subnet.interfaceType() == InterfaceType.Usb
if (assignedAddress != null) {
logger.log(DEBUG, "dispatch {0}->{1} ({2}) using {3}", f.getSource(), assignedAddress,
dst, connection);
send(sc, connection, CEMIFactory.create(null, assignedAddress, f, false));
asyncSend(sc, connection, CEMIFactory.create(null, assignedAddress, f, false));
}
}
// also dispatch via routing as-is
Expand All @@ -1542,7 +1535,7 @@ else if ((c = findRoutingConnection().orElse(null)) != null) {
logger.log(INFO, "no active KNXnet/IP connection for destination {0}, " +
"dispatch {1}->{2} to all server-side connections", dst, f.getSource(), dst);
for (final var conn : serverConnections)
send(sc, conn, f);
asyncSend(sc, conn, f);
}
}
else {
Expand Down Expand Up @@ -1572,16 +1565,16 @@ else if ((c = findRoutingConnection().orElse(null)) != null) {
return;
}

dispatchLdataToClients(subnet, f, eventId);
dispatchLdataToClients(subnet, f, eventId, null);
}
}
catch (final KnxPropertyException e) {
logger.log(ERROR, "send to server-side failed for " + f, e);
}
}

private void dispatchLdataToClients(final SubnetConnector subnet, final CEMILData f, final long eventId)
throws InterruptedException {
private void dispatchLdataToClients(final SubnetConnector subnet, final CEMILData f, final long eventId,
final FrameEvent recordFrameEvent) throws InterruptedException {
logger.log(DEBUG, "dispatch {0}->{1} to all server-side connections", f.getSource(), f.getDestination());
final ServiceContainer sc = subnet.getServiceContainer();
for (final var conn : serverConnections) {
Expand All @@ -1594,7 +1587,7 @@ private void dispatchLdataToClients(final SubnetConnector subnet, final CEMILDat
if (de.type() == ConnectionType.Monitor)
send = convertToBusmon(f, eventId, subnet);
}
send(sc, conn, send);
asyncSend(sc, conn, send, recordFrameEvent);
}
}

Expand Down Expand Up @@ -1651,17 +1644,34 @@ private Optional<KNXnetIPConnection> findRoutingConnection()
return serverConnections.stream().filter(KNXnetIPRouting.class::isInstance).findAny();
}

private void asyncSend(final ServiceContainer svcContainer, final KNXnetIPConnection c, final CEMI f) {
asyncSend(svcContainer, c, f, null);
}

private void asyncSend(final ServiceContainer svcContainer, final KNXnetIPConnection c, final CEMI f,
final FrameEvent recordFrameEvent) {
Executor.execute(() -> {
try {
send(svcContainer, c, f);
if (recordFrameEvent != null) {
final ReplayBuffer<FrameEvent> buffer = subnetEventBuffers.get(svcContainer);
if (buffer != null)
buffer.completeEvent(c, recordFrameEvent);
}
}
catch (final InterruptedException e) {
e.printStackTrace();
}
}, c + " sender");
}

private void send(final ServiceContainer svcContainer, final KNXnetIPConnection c, final CEMI f)
throws InterruptedException {
final int oi = objectInstance(svcContainer.getName());
try {
c.send(f, WaitForAck);
setNetworkState(oi, false, false);
incMsgTransmitted(oi, false);

final ReplayBuffer<FrameEvent> buffer = subnetEventBuffers.get(svcContainer);
if (buffer != null)
buffer.completeEvent(c, recordFrameEvent);
}
catch (final KNXTimeoutException e) {
logger.log(WARNING, "sending on {0} failed: {1} ({2})", c, e.getMessage(), f.toString());
Expand Down Expand Up @@ -2145,7 +2155,7 @@ private void verifySubnetInterfaceAddress(final ServiceContainer svcCont) throws
}
}

private void incMsgTransmitted(final int objinst, final boolean toKnxNetwork)
private synchronized void incMsgTransmitted(final int objinst, final boolean toKnxNetwork)
{
final int pid = toKnxNetwork ? PID.MSG_TRANSMIT_TO_KNX : PID.MSG_TRANSMIT_TO_IP;
// must be 4 byte unsigned
Expand Down Expand Up @@ -2208,7 +2218,7 @@ private CEMILData adjustHopCount(final CEMILData msg)
}

// if we can not transmit for 5 seconds, we assume some network fault
private void setNetworkState(final int objectInstance, final boolean knxNetwork, final boolean faulty)
private synchronized void setNetworkState(final int objectInstance, final boolean knxNetwork, final boolean faulty)
{
// 1 byte bit field
int state = getPropertyOrDefault(KNXNETIP_PARAMETER_OBJECT, objectInstance, PID.KNXNETIP_DEVICE_STATE, 0);
Expand Down

0 comments on commit 10eab60

Please sign in to comment.