// BasicTCPEchoAIO.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * TCP Echo Server using Asynchronous I/O
 *
 * The main() creates a TCP server socket channel, sets up the socket including
 * binding and setting the accept completion handler, and non-busily waits
 * (forever). Each accepted client is then driven by a chain of completion
 * handlers: read -> write (repeated until the buffer is drained) -> read ...
 *
 * @author Michael Donahoo
 * @version 0.2
 */
public class BasicTCPEchoAIO {
    /**
     * Buffer size (bytes)
     */
    private static final int BUFSIZE = 256;

    /**
     * Global logger
     */
    private static final Logger logger = Logger.getLogger("Basic");

    /**
     * Starts the echo server on the given port and blocks until interrupted.
     *
     * @param args exactly one element: the local port to listen on
     * @throws IOException if the listening channel cannot be opened or bound
     * @throws IllegalArgumentException if the argument count is wrong or the
     *         port is not a valid number
     */
    public static void main(String[] args) throws IOException {
        if (args.length != 1) { // Test for correct # of args
            throw new IllegalArgumentException("Parameter(s): <Port>");
        }
        // Parse before creating the channel group so that a malformed port
        // fails fast without having started any pool threads.
        final int port = Integer.parseInt(args[0]);

        final AsynchronousChannelGroup channelGroup =
                AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);
        try (AsynchronousServerSocketChannel listenChannel =
                AsynchronousServerSocketChannel.open(channelGroup)) {
            // Bind local port
            listenChannel.bind(new InetSocketAddress(port));
            // Create accept handler
            listenChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel clntChan, Void attachment) {
                    // Re-arm the accept first so further clients are not kept waiting
                    listenChannel.accept(null, this);
                    handleAccept(clntChan);
                }

                @Override
                public void failed(Throwable e, Void attachment) {
                    logger.log(Level.WARNING, "Accept failed", e);
                }
            });
            // Block until current thread dies
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            logger.log(Level.WARNING, "Server Interrupted", e);
            // Restore the interrupt status for whoever called main()
            Thread.currentThread().interrupt();
        } finally {
            // Without this the group's pool threads may outlive main() (e.g.
            // after a failed bind) and prevent the JVM from exiting.
            try {
                channelGroup.shutdownNow();
            } catch (IOException e) {
                // Logged rather than rethrown so it cannot mask the primary failure
                logger.log(Level.WARNING, "Channel group shutdown failed", e);
            }
        }
    }

    /**
     * Called after each accept completion
     *
     * @param clntChan channel of new client
     */
    public static void handleAccept(AsynchronousSocketChannel clntChan) {
        // After accepting, the echo server reads from the client
        initRead(clntChan, ByteBuffer.allocateDirect(BUFSIZE));
    }

    /**
     * Called after each read completion
     *
     * @param clntChan channel of new client
     * @param buf byte buffer that was passed to the read
     * @param bytesRead number of bytes read, or -1 if the peer closed the connection
     */
    public static void handleRead(AsynchronousSocketChannel clntChan, ByteBuffer buf, int bytesRead) {
        if (bytesRead == -1) { // Did the other end close?
            try {
                clntChan.close();
            } catch (IOException e) {
                die(clntChan, "Unable to close", e);
            }
        } else if (bytesRead > 0) {
            // After reading, the echo server echos all bytes
            buf.flip(); // prepare to write
            initWrite(clntChan, buf);
        }
        logger.info(() -> "Handled read of " + bytesRead + " bytes");
    }

    /**
     * Called after each write
     *
     * @param clntChan channel of new client
     * @param buf byte buffer used in write
     */
    public static void handleWrite(AsynchronousSocketChannel clntChan, ByteBuffer buf) {
        if (buf.hasRemaining()) { // More to write
            initWrite(clntChan, buf);
        } else { // Back to reading
            // After writing all bytes, the server again reads
            buf.clear();
            initRead(clntChan, buf);
        }
    }

    /**
     * Starts an asynchronous read; on completion {@link #handleRead} is
     * invoked, on failure the client channel is closed via {@link #die}.
     *
     * @param clntChan channel to read from
     * @param buf buffer to read into (also passed along as the attachment)
     */
    public static void initRead(AsynchronousSocketChannel clntChan, ByteBuffer buf) {
        clntChan.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachedBuf) {
                handleRead(clntChan, attachedBuf, bytesRead);
            }

            @Override
            public void failed(Throwable ex, ByteBuffer attachedBuf) {
                die(clntChan, "Read failed", ex);
            }
        });
    }

    /**
     * Starts an asynchronous write; on completion {@link #handleWrite} is
     * invoked, on failure the client channel is closed via {@link #die}.
     *
     * @param clntChan channel to write to
     * @param buf buffer holding the bytes to write (also passed along as the attachment)
     */
    public static void initWrite(AsynchronousSocketChannel clntChan, ByteBuffer buf) {
        clntChan.write(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesWritten, ByteBuffer attachedBuf) {
                handleWrite(clntChan, attachedBuf);
            }

            @Override
            public void failed(Throwable ex, ByteBuffer attachedBuf) {
                die(clntChan, "Write failed", ex);
            }
        });
    }

    /**
     * Logs a fatal per-client error and closes that client's channel.
     *
     * @param clntChan channel of the failing client
     * @param msg message to log at SEVERE level
     * @param ex cause of the failure
     */
    public static void die(AsynchronousSocketChannel clntChan, String msg, Throwable ex) {
        logger.log(Level.SEVERE, msg, ex);
        try {
            clntChan.close();
        } catch (IOException e) {
            logger.log(Level.WARNING, "Close Failed", e);
        }
    }
}