diff --git a/CHANGELOG.md b/CHANGELOG.md index 68943c40..8789330b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,17 @@ - Fixes for JSON data in mysql 8.0.16+ https://github.com/shyiko/mysql-binlog-connector-java/pull/288 - more fixes for the bizarre azure platform https://github.com/shyiko/mysql-binlog-connector-java/pull/275 +## [0.21.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.20.1...0.21.0) - 2020-06-08 + +### Fixed +- Potential deadlock when keepAlive is on ([#321](https://github.com/shyiko/mysql-binlog-connector-java/issues/321)). + +### Changed +- `BinaryLogClient.LifecycleListener::onConnect()` order relative to keepAlive thread `start()`. +Calling `disconnect()` inside `onConnect()` is now guaranteed to terminate keepAlive thread ([#213](https://github.com/shyiko/mysql-binlog-connector-java/pull/213), +[260](https://github.com/shyiko/mysql-binlog-connector-java/pull/260)). +A side effect of this change is that throwing RuntimeException inside `onConnect()` will no longer prevent keepAlive thread from starting. + ## [0.20.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.20.0...0.20.1) - 2019-05-12 ### Added diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index e3f1857c..430bb5be 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -65,7 +65,6 @@ import java.net.Socket; import java.net.SocketException; import java.security.GeneralSecurityException; -import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Collections; @@ -100,12 +99,10 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { new X509TrustManager() { @Override - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) - throws CertificateException { } + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) { } @Override - public void checkServerTrusted(X509Certificate[] x509Certificates, String s) - throws CertificateException { } + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) { } @Override public X509Certificate[] getAcceptedIssuers() { @@ -167,7 +164,7 @@ public X509Certificate[] getAcceptedIssuers() { private volatile ExecutorService keepAliveThreadExecutor; private final Lock connectLock = new ReentrantLock(); - private final Lock keepAliveThreadExecutorLock = new ReentrantLock(); + private volatile CountDownLatch connectLatch; /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). @@ -498,119 +495,147 @@ public void setThreadFactory(ThreadFactory threadFactory) { * @throws IllegalStateException if binary log client is already connected */ public void connect() throws IOException, IllegalStateException { - if (!connectLock.tryLock()) { - throw new IllegalStateException("BinaryLogClient is already connected"); - } - boolean notifyWhenDisconnected = false; + connectWithTimeout(connectTimeout); + } + + private void connectWithTimeout(final long connectTimeout) throws IOException { + CountDownLatch latch = new CountDownLatch(1); + boolean connected = false; try { - Callable cancelDisconnect = null; + PacketChannel localChannel; + connectLock.lock(); try { - try { - long start = System.currentTimeMillis(); - channel = openChannel(); - if (connectTimeout > 0 && !isKeepAliveThreadRunning()) { - cancelDisconnect = scheduleDisconnectIn(connectTimeout - - (System.currentTimeMillis() - start)); - } - if (channel.getInputStream().peek() == -1) { - throw new EOFException(); - } - } catch (IOException e) { - throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + - ". Please make sure it's running.", e); + if (connectLatch != null) { + throw new IllegalStateException("BinaryLogClient is already connected"); } - GreetingPacket greetingPacket = receiveGreeting(); - - tryUpgradeToSSL(greetingPacket); - - new Authenticator(greetingPacket, channel, schema, username, password).authenticate(); - channel.authenticationComplete(); - - connectionId = greetingPacket.getThreadId(); - if ("".equals(binlogFilename)) { - synchronized (gtidSetAccessLock) { - if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { - gtidSet = new GtidSet(fetchGtidPurged()); - } - } + connectLatch = latch; + localChannel = openChannelToBinaryLogStream(connectTimeout); + channel = localChannel; + if (keepAlive && !isKeepAliveThreadRunning()) { + keepAliveThreadExecutor = spawnKeepAliveThread(connectTimeout); } - if (binlogFilename == null) { - fetchBinlogFilenameAndPosition(); + } finally { + connectLock.unlock(); + } + connected = true; + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onConnect(this); + } + ensureEventDeserializerHasRequiredEDDs(); + listenForEventPackets(localChannel); + } finally { + connectLock.lock(); + try { + latch.countDown(); + if (latch == connectLatch) { + connectLatch = null; } - if (binlogPosition < 4) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4); - } - binlogPosition = 4; + } finally { + connectLock.unlock(); + } + if (connected) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onDisconnect(this); } - ChecksumType checksumType = fetchBinlogChecksum(); - if (checksumType != ChecksumType.NONE) { - confirmSupportOfChecksum(checksumType); + } + } + } + + private PacketChannel openChannelToBinaryLogStream(final long connectTimeout) throws IOException { + PacketChannel channel = null; + Callable cancelCloseChannel = null; + try { + try { + long start = System.currentTimeMillis(); + channel = openChannel(connectTimeout); + if (connectTimeout > 0 && !isKeepAliveThreadRunning()) { + cancelCloseChannel = scheduleCloseChannel(channel, connectTimeout - + (System.currentTimeMillis() - start)); } - setMasterServerId(); - if (heartbeatInterval > 0) { - enableHeartbeat(); + if (channel.getInputStream().peek() == -1) { + throw new EOFException(); } - gtid = null; - tx = false; - requestBinaryLogStream(); } catch (IOException e) { - disconnectChannel(); - throw e; - } finally { - if (cancelDisconnect != null) { - try { - cancelDisconnect.call(); - } catch (Exception e) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("\"" + e.getMessage() + - "\" was thrown while canceling scheduled disconnect call"); - } + throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + + ". Please make sure it's running.", e); + } + GreetingPacket greetingPacket = receiveGreeting(channel); + tryUpgradeToSSL(greetingPacket); + new Authenticator(greetingPacket, channel, schema, username, password).authenticate(); + channel.authenticationComplete(); + //authenticate(channel, greetingPacket); + connectionId = greetingPacket.getThreadId(); + if ("".equals(binlogFilename)) { + synchronized (gtidSetAccessLock) { + if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { + gtidSet = new GtidSet(fetchGtidPurged(channel)); } } } - connected = true; - notifyWhenDisconnected = true; - if (logger.isLoggable(Level.INFO)) { - String position; - synchronized (gtidSetAccessLock) { - position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition; - } - logger.info("Connected to " + hostname + ":" + port + " at " + position + - " (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")"); + if (binlogFilename == null) { + fetchBinlogFilenameAndPosition(channel); } - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onConnect(this); + if (binlogPosition < 4) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4); + } + binlogPosition = 4; } - if (keepAlive && !isKeepAliveThreadRunning()) { - spawnKeepAliveThread(); + ChecksumType checksumType = fetchBinlogChecksum(channel); + if (checksumType != ChecksumType.NONE) { + confirmSupportOfChecksum(channel, checksumType); } - ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); - synchronized (gtidSetAccessLock) { - if (gtidSet != null) { - ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); - } + setMasterServerId(channel); + if (heartbeatInterval > 0) { + enableHeartbeat(channel); } - listenForEventPackets(); + gtid = null; + tx = false; + requestBinaryLogStream(channel); + } catch (IOException e) { + closeChannel(channel); + throw e; } finally { - connectLock.unlock(); - if (notifyWhenDisconnected) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onDisconnect(this); + if (cancelCloseChannel != null) { + try { + cancelCloseChannel.call(); + } catch (Exception e) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("\"" + e.getMessage() + + "\" was thrown while canceling scheduled disconnect call"); + } } } } + connected = true; + if (logger.isLoggable(Level.INFO)) { + String position; + synchronized (gtidSetAccessLock) { + position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition; + } + logger.info("Connected to " + hostname + ":" + port + " at " + position + + " (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")"); + } + return channel; + } + + private void ensureEventDeserializerHasRequiredEDDs() { + ensureEventDataDeserializerIfPresent(EventType.ROTATE, RotateEventDataDeserializer.class); + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + ensureEventDataDeserializerIfPresent(EventType.GTID, GtidEventDataDeserializer.class); + ensureEventDataDeserializerIfPresent(EventType.QUERY, QueryEventDataDeserializer.class); + } + } } - private PacketChannel openChannel() throws IOException { + private PacketChannel openChannel(final long connectTimeout) throws IOException { Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); socket.connect(new InetSocketAddress(hostname, port), (int) connectTimeout); return new PacketChannel(socket); } - private Callable scheduleDisconnectIn(final long timeout) { - final BinaryLogClient self = this; + private Callable scheduleCloseChannel(final PacketChannel channel, final long timeout) { final CountDownLatch connectLatch = new CountDownLatch(1); final Thread thread = newNamedThread(new Runnable() { @Override @@ -628,7 +653,7 @@ public void run() { "Forcing disconnect."); } try { - self.disconnectChannel(); + closeChannel(channel); } catch (IOException e) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, e.getMessage()); @@ -638,9 +663,9 @@ public void run() { } }, "blc-disconnect-" + hostname + ":" + port); thread.start(); - return new Callable() { + return new Callable() { - public Object call() throws Exception { + public Void call() throws Exception { connectLatch.countDown(); thread.join(); return null; @@ -657,7 +682,7 @@ private void checkError(byte[] packet) throws IOException { } } - private GreetingPacket receiveGreeting() throws IOException { + private GreetingPacket receiveGreeting(final PacketChannel channel) throws IOException { byte[] initialHandshakePacket = channel.read(); checkError(initialHandshakePacket); @@ -693,21 +718,21 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio } - private void enableHeartbeat() throws IOException { + private void enableHeartbeat(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000)); byte[] statementResult = channel.read(); checkError(statementResult); } - private void setMasterServerId() throws IOException { + private void setMasterServerId(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("select @@server_id")); - ResultSetRowPacket[] resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length >= 0) { this.masterServerId = Long.parseLong(resultSet[0].getValue(0)); } } - private void requestBinaryLogStream() throws IOException { + private void requestBinaryLogStream(final PacketChannel channel) throws IOException { long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178 Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { @@ -723,12 +748,12 @@ private void requestBinaryLogStream() throws IOException { channel.write(dumpBinaryLogCommand); } - private void ensureEventDataDeserializer(EventType eventType, - Class eventDataDeserializerClass) { - EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); + private void ensureEventDataDeserializerIfPresent(EventType eventType, + Class> eventDataDeserializerClass) { + EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); if (eventDataDeserializer.getClass() != eventDataDeserializerClass && eventDataDeserializer.getClass() != EventDataWrapper.Deserializer.class) { - EventDataDeserializer internalEventDataDeserializer; + EventDataDeserializer internalEventDataDeserializer; try { internalEventDataDeserializer = eventDataDeserializerClass.newInstance(); } catch (Exception e) { @@ -740,8 +765,7 @@ private void ensureEventDataDeserializer(EventType eventType, } } - - private void spawnKeepAliveThread() { + private ExecutorService spawnKeepAliveThread(final long connectTimeout) { final ExecutorService threadExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { @@ -750,51 +774,49 @@ public Thread newThread(Runnable runnable) { return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port); } }); - try { - keepAliveThreadExecutorLock.lock(); - threadExecutor.submit(new Runnable() { - @Override - public void run() { - while (!threadExecutor.isShutdown()) { + threadExecutor.submit(new Runnable() { + @Override + public void run() { + connectLock.lock(); // wait for connect() to finish initialization sequence + connectLock.unlock(); + while (!threadExecutor.isShutdown()) { + try { + //noinspection BusyWait + Thread.sleep(keepAliveInterval); + } catch (InterruptedException e) { + // expected in case of disconnect + } + if (threadExecutor.isShutdown()) { + return; + } + boolean connectionLost = false; + if (heartbeatInterval > 0) { + connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval; + } else { try { - Thread.sleep(keepAliveInterval); - } catch (InterruptedException e) { - // expected in case of disconnect - } - if (threadExecutor.isShutdown()) { - return; + channel.write(new PingCommand()); + } catch (IOException e) { + connectionLost = true; } - boolean connectionLost = false; - if (heartbeatInterval > 0) { - connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval; - } else { - try { - channel.write(new PingCommand()); - } catch (IOException e) { - connectionLost = true; - } + } + if (connectionLost) { + if (logger.isLoggable(Level.INFO)) { + logger.info("Trying to restore lost connection to " + hostname + ":" + port); } - if (connectionLost) { - if (logger.isLoggable(Level.INFO)) { - logger.info("Trying to restore lost connection to " + hostname + ":" + port); - } - try { - terminateConnect(); - connect(connectTimeout); - } catch (Exception ce) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("Failed to restore connection to " + hostname + ":" + port + - ". Next attempt in " + keepAliveInterval + "ms"); - } + try { + terminateConnect(); + connect(connectTimeout); + } catch (Exception ce) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Failed to restore connection to " + hostname + ":" + port + + ". Next attempt in " + keepAliveInterval + "ms"); } } } } - }); - keepAliveThreadExecutor = threadExecutor; - } finally { - keepAliveThreadExecutorLock.unlock(); - } + } + }); + return threadExecutor; } private Thread newNamedThread(Runnable runnable, String threadName) { @@ -804,12 +826,7 @@ private Thread newNamedThread(Runnable runnable, String threadName) { } boolean isKeepAliveThreadRunning() { - try { - keepAliveThreadExecutorLock.lock(); - return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); - } finally { - keepAliveThreadExecutorLock.unlock(); - } + return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); } /** @@ -835,8 +852,7 @@ public void onConnect(BinaryLogClient client) { @Override public void run() { try { - setConnectTimeout(timeout); - connect(); + connectWithTimeout(timeout); } catch (IOException e) { exceptionReference.set(e); countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout" @@ -861,10 +877,16 @@ public void run() { } if (!started) { try { + // NOTE: we don't call disconnect here and so if client is able to connect right after timeout expires - + // keep-alive thread may be left running. terminateConnect(); - } finally { - throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms"); + } catch (IOException e) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("\"" + e.getMessage() + + "\" was thrown while terminating connection due to timeout"); + } } + throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms"); } } @@ -872,22 +894,21 @@ public void run() { * @return true if client is connected, false otherwise */ public boolean isConnected() { - return connected; + return connectLatch != null; } - private String fetchGtidPurged() throws IOException { + private String fetchGtidPurged(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("show global variables like 'gtid_purged'")); - ResultSetRowPacket[] resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length != 0) { return resultSet[0].getValue(1).toUpperCase(); } return ""; } - private void fetchBinlogFilenameAndPosition() throws IOException { - ResultSetRowPacket[] resultSet; + private void fetchBinlogFilenameAndPosition(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("show master status")); - resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length == 0) { throw new IOException("Failed to determine binlog filename/position"); } @@ -896,28 +917,29 @@ private void fetchBinlogFilenameAndPosition() throws IOException { binlogPosition = Long.parseLong(resultSetRow.getValue(1)); } - private ChecksumType fetchBinlogChecksum() throws IOException { + private ChecksumType fetchBinlogChecksum(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); - ResultSetRowPacket[] resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length == 0) { return ChecksumType.NONE; } return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); } - private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { + private void confirmSupportOfChecksum(final PacketChannel channel, ChecksumType checksumType) throws IOException { channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); byte[] statementResult = channel.read(); checkError(statementResult); eventDeserializer.setChecksumType(checksumType); } - private void listenForEventPackets() throws IOException { + private void listenForEventPackets(final PacketChannel channel) throws IOException { ByteArrayInputStream inputStream = channel.getInputStream(); boolean completeShutdown = false; try { while (inputStream.peek() != -1) { int packetLength = inputStream.readInteger(3); + //noinspection ResultOfMethodCallIgnored inputStream.skip(1); // 1 byte for sequence int marker = inputStream.read(); if (marker == 0xFF) { @@ -942,14 +964,14 @@ private void listenForEventPackets() throws IOException { if (cause instanceof EOFException || cause instanceof SocketException) { throw e; } - if (isConnected()) { + if (connected) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onEventDeserializationFailure(this, e); } } continue; } - if (isConnected()) { + if (connected) { eventLastSeen = System.currentTimeMillis(); updateGtidSet(event); notifyEventListeners(event); @@ -957,17 +979,17 @@ private void listenForEventPackets() throws IOException { } } } catch (Exception e) { - if (isConnected()) { + if (connected) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onCommunicationFailure(this, e); } } } finally { - if (isConnected()) { + if (connected) { if (completeShutdown) { disconnect(); // initiate complete shutdown sequence (which includes keep alive thread) } else { - disconnectChannel(); + closeChannel(channel); } } } @@ -978,6 +1000,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac int chunkLength; do { chunkLength = inputStream.readInteger(3); + //noinspection ResultOfMethodCallIgnored inputStream.skip(1); // 1 byte for sequence result = Arrays.copyOf(result, result.length + chunkLength); inputStream.fill(result, result.length - chunkLength, chunkLength); @@ -1049,8 +1072,8 @@ private void commitGtid() { } } - private ResultSetRowPacket[] readResultSet() throws IOException { - List resultSet = new LinkedList<>(); + private ResultSetRowPacket[] readResultSet(final PacketChannel channel) throws IOException { + List resultSet = new LinkedList(); byte[] statementResult = channel.read(); checkError(statementResult); @@ -1059,7 +1082,7 @@ private ResultSetRowPacket[] readResultSet() throws IOException { checkError(bytes); resultSet.add(new ResultSetRowPacket(bytes)); } - return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); + return resultSet.toArray(new ResultSetRowPacket[0]); } /** @@ -1145,32 +1168,32 @@ public void unregisterLifecycleListener(LifecycleListener eventListener) { /** * Disconnect from the replication stream. - * Note that this does not cause binlogFilename/binlogPosition to be cleared out. - * As the result following {@link #connect()} resumes client from where it left off. + * Note that this does not reset binlogFilename/binlogPosition. Calling {@link #connect()} or + * {@link #connect(long)}} again resumes client from where it left off. */ public void disconnect() throws IOException { - terminateKeepAliveThread(); - terminateConnect(); + connectLock.lock(); + ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor; + PacketChannel channel = this.channel; + CountDownLatch connectLatch = this.connectLatch; + connectLock.unlock(); + + terminateKeepAliveThread(keepAliveThreadExecutor); + closeChannel(channel); + waitForConnectToTerminate(connectLatch); } - private void terminateKeepAliveThread() { - try { - keepAliveThreadExecutorLock.lock(); - ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor; - if (keepAliveThreadExecutor == null) { - return; - } - keepAliveThreadExecutor.shutdownNow(); - while (!awaitTerminationInterruptibly(keepAliveThreadExecutor, - Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - // ignore - } - } finally { - keepAliveThreadExecutorLock.unlock(); + private void terminateKeepAliveThread(final ExecutorService threadExecutor) { + if (threadExecutor == null) { + return; } + threadExecutor.shutdownNow(); + while (!awaitTerminationInterruptibly(threadExecutor, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { /* retry */ } } - private static boolean awaitTerminationInterruptibly(ExecutorService executorService, long timeout, TimeUnit unit) { + @SuppressWarnings("SameParameterValue") + private static boolean awaitTerminationInterruptibly(final ExecutorService executorService, + final long timeout, final TimeUnit unit) { try { return executorService.awaitTermination(timeout, unit); } catch (InterruptedException e) { @@ -1179,21 +1202,32 @@ private static boolean awaitTerminationInterruptibly(ExecutorService executorSer } private void terminateConnect() throws IOException { - do { - disconnectChannel(); - } while (!tryLockInterruptibly(connectLock, 1000, TimeUnit.MILLISECONDS)); + connectLock.lock(); + PacketChannel channel = this.channel; + CountDownLatch connectLatch = this.connectLatch; connectLock.unlock(); + + closeChannel(channel); + waitForConnectToTerminate(connectLatch); + } + + private void waitForConnectToTerminate(final CountDownLatch connectLatch) { + if (connectLatch != null) { + while (!awaitInterruptibly(connectLatch, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { /* retry */ } + } } - private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit) { + @SuppressWarnings("SameParameterValue") + private static boolean awaitInterruptibly(final CountDownLatch countDownLatch, + final long time, final TimeUnit unit) { try { - return lock.tryLock(time, unit); + return countDownLatch.await(time, unit); } catch (InterruptedException e) { return false; } } - private void disconnectChannel() throws IOException { + private void closeChannel(final PacketChannel channel) throws IOException { connected = false; if (channel != null && channel.isOpen()) { channel.close();