Skip to content

Commit

Permalink
Add read timeout (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
w3stling authored Aug 9, 2024
1 parent 98489af commit 67f9f21
Show file tree
Hide file tree
Showing 3 changed files with 457 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@
*/
public abstract class AbstractRssReader<C extends Channel, I extends Item> {
private static final String LOG_GROUP = "com.apptasticsoftware.rssreader";
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1);
private final HttpClient httpClient;
private DateTimeParser dateTimeParser = new DateTime();
private String userAgent = "";
private Duration connectionTimeout = Duration.ofSeconds(25);
private Duration requestTimeout = Duration.ofSeconds(25);
private Duration readTimeout = Duration.ofSeconds(25);
private final Map<String, String> headers = new HashMap<>();
private final HashMap<String, BiConsumer<C, String>> channelTags = new HashMap<>();
private final HashMap<String, Map<String, BiConsumer<C, String>>> channelAttributes = new HashMap<>();
Expand Down Expand Up @@ -240,7 +244,7 @@ protected void registerItemAttributes() {
}

/**
* Date and Time parser for parsing timestamps.
* Date and time parser for parsing timestamps.
* @param dateTimeParser the date time parser to use.
* @return updated RSSReader.
*/
Expand All @@ -252,8 +256,8 @@ public AbstractRssReader<C, I> setDateTimeParser(DateTimeParser dateTimeParser)
}

/**
* Sets the user-agent of the HttpClient.
* This is completely optional and if not set then it will not send a user-agent header.
* Sets the user-agent of the http client.
* Optional parameter if not set the default value for {@code java.net.http.HttpClient} will be used.
* @param userAgent the user-agent to use.
* @return updated RSSReader.
*/
Expand All @@ -265,8 +269,7 @@ public AbstractRssReader<C, I> setUserAgent(String userAgent) {
}

/**
* Adds a http header to the HttpClient.
* This is completely optional and if no headers are set then it will not add anything.
* Adds a http header to the http client.
* @param key the key name of the header.
* @param value the value of the header.
* @return updated RSSReader.
Expand All @@ -279,6 +282,58 @@ public AbstractRssReader<C, I> addHeader(String key, String value) {
return this;
}

/**
* Sets the connection timeout for the http client.
* The connection timeout is the time it takes to establish a connection to the server.
* If set to zero the default value for {@link java.net.http.HttpClient.Builder#connectTimeout(Duration)} will be used.
* Default: 25 seconds.
*
* @param connectionTimeout the timeout duration.
* @return updated RSSReader.
*/
public AbstractRssReader<C, I> setConnectionTimeout(Duration connectionTimeout) {
validate(connectionTimeout, "Connection timeout");
this.connectionTimeout = connectionTimeout;
return this;
}

/**
* Sets the request timeout for the http client.
* The request timeout is the time between the request is sent and the first byte of the response is received.
* If set to zero the default value for {@link java.net.http.HttpRequest.Builder#timeout(Duration)} will be used.
* Default: 25 seconds.
*
* @param requestTimeout the timeout duration.
* @return updated RSSReader.
*/
public AbstractRssReader<C, I> setRequestTimeout(Duration requestTimeout) {
validate(requestTimeout, "Request timeout");
this.requestTimeout = requestTimeout;
return this;
}

/**
* Sets the read timeout.
* The read timeout it the time for reading all data in the response body.
* The effect of setting the timeout to zero is the same as setting an infinite Duration, ie. block forever.
* Default: 25 seconds.
*
* @param readTimeout the timeout duration.
* @return updated RSSReader.
*/
public AbstractRssReader<C, I> setReadTimeout(Duration readTimeout) {
validate(readTimeout, "Read timeout");
this.readTimeout = readTimeout;
return this;
}

private void validate(Duration duration, String name) {
Objects.requireNonNull(duration, name + " must not be null");
if (duration.isNegative()) {
throw new IllegalArgumentException(name + " must not be negative");
}
}

/**
* Add item extension for tags
* @param tag - tag name
Expand Down Expand Up @@ -450,8 +505,10 @@ public CompletableFuture<Stream<I>> readAsync(String url) {
*/
protected CompletableFuture<HttpResponse<InputStream>> sendAsyncRequest(String url) {
var builder = HttpRequest.newBuilder(URI.create(url))
.timeout(Duration.ofSeconds(25))
.header("Accept-Encoding", "gzip");
if (requestTimeout.toMillis() > 0) {
builder.timeout(requestTimeout);
}

if (!userAgent.isBlank())
builder.header("User-Agent", userAgent);
Expand Down Expand Up @@ -510,6 +567,7 @@ class RssItemIterator implements Iterator<I> {
private I nextItem;
private boolean isChannelPart = false;
private boolean isItemPart = false;
private ScheduledFuture<?> parseWatchdog;

public RssItemIterator(InputStream is) {
this.is = is;
Expand All @@ -528,6 +586,9 @@ public RssItemIterator(InputStream is) {
xmlInFact.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, Boolean.FALSE);

reader = xmlInFact.createXMLStreamReader(is);
if (!readTimeout.isZero()) {
parseWatchdog = EXECUTOR.schedule(this::close, readTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
}
catch (XMLStreamException e) {
var logger = Logger.getLogger(LOG_GROUP);
Expand All @@ -539,6 +600,9 @@ public RssItemIterator(InputStream is) {

public void close() {
try {
if (parseWatchdog != null) {
parseWatchdog.cancel(false);
}
reader.close();
is.close();
} catch (XMLStreamException | IOException e) {
Expand Down Expand Up @@ -783,16 +847,20 @@ private HttpClient createHttpClient() {
var context = SSLContext.getInstance("TLSv1.3");
context.init(null, null, null);

client = HttpClient.newBuilder()
var builder = HttpClient.newBuilder()
.sslContext(context)
.connectTimeout(Duration.ofSeconds(25))
.followRedirects(HttpClient.Redirect.ALWAYS)
.build();
.followRedirects(HttpClient.Redirect.ALWAYS);
if (connectionTimeout.toMillis() > 0) {
builder.connectTimeout(connectionTimeout);
}
client = builder.build();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(25))
.followRedirects(HttpClient.Redirect.ALWAYS)
.build();
var builder = HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.ALWAYS);
if (connectionTimeout.toMillis() > 0) {
builder.connectTimeout(connectionTimeout);
}
client = builder.build();
}

return client;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package com.apptasticsoftware.integrationtest;

import com.apptasticsoftware.rssreader.Item;
import com.apptasticsoftware.rssreader.RssReader;
import com.apptasticsoftware.rssreader.util.RssServer;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.*;

class ConnectionTest {
private static final int PORT = 8008;
private static final Duration NEGATIVE_DURATION = Duration.ofSeconds(-30);

@Test
void testConnectionTimeoutWithNullValue() {
var rssReader = new RssReader();
var exception = assertThrows(NullPointerException.class, () -> rssReader.setConnectionTimeout(null));
assertEquals("Connection timeout must not be null", exception.getMessage());
}

@Test
void testRequestTimeoutWithNullValue() {
var rssReader = new RssReader();
var exception = assertThrows(NullPointerException.class, () -> rssReader.setRequestTimeout(null));
assertEquals("Request timeout must not be null", exception.getMessage());
}

@Test
void testReadTimeoutWithNullValue() {
var rssReader = new RssReader();
var exception = assertThrows(NullPointerException.class, () -> rssReader.setReadTimeout(null));
assertEquals("Read timeout must not be null", exception.getMessage());
}

@Test
void testConnectionTimeoutWithNegativeValue() {
var rssReader = new RssReader();
var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setConnectionTimeout(NEGATIVE_DURATION));
assertEquals("Connection timeout must not be negative", exception.getMessage());
}

@Test
void testRequestTimeoutWithNegativeValue() {
var rssReader = new RssReader();
var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setRequestTimeout(NEGATIVE_DURATION));
assertEquals("Request timeout must not be negative", exception.getMessage());
}

@Test
void testReadTimeoutWithNegativeValue() {
var rssReader = new RssReader();
var exception = assertThrows(IllegalArgumentException.class, () -> rssReader.setReadTimeout(NEGATIVE_DURATION));
assertEquals("Read timeout must not be negative", exception.getMessage());
}

@Test
void testReadFromLocalRssServerNoTimeout() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setConnectionTimeout(Duration.ZERO)
.setRequestTimeout(Duration.ZERO)
.setReadTimeout(Duration.ZERO)
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}

@Test
void testReadFromLocalRssServer10SecondTimeout() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setConnectionTimeout(Duration.ofSeconds(10))
.setRequestTimeout(Duration.ofSeconds(10))
.setReadTimeout(Duration.ofSeconds(10))
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}


@Test
void testReadFromLocalRssServer() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setReadTimeout(Duration.ofSeconds(2))
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}

@Test
void testNoReadTimeout() throws IOException {
var server = RssServer.with(getFile("atom-feed.xml"))
.port(PORT)
.endpointPath("/rss")
.build();
server.start();

var items = new RssReader()
.setReadTimeout(Duration.ZERO)
.read("http://localhost:8008/rss")
.collect(Collectors.toList());

server.stop();
verify(3, items);
}

@Test
void testReadTimeout() throws IOException {
var server = RssServer.withWritePause(getFile("atom-feed.xml"), Duration.ofSeconds(4))
.port(PORT)
.endpointPath("/slow-server")
.build();
server.start();

var items = new RssReader()
.setReadTimeout(Duration.ofSeconds(2))
.read("http://localhost:8008/slow-server")
.collect(Collectors.toList());

server.stop();
verify(2, items);
}

private static void verify(int expectedSize, List<Item> items) {
assertEquals(expectedSize, items.size());

if (!items.isEmpty()) {
assertEquals("dive into mark", items.get(0).getChannel().getTitle());
assertEquals(65, items.get(0).getChannel().getDescription().length());
assertEquals("http://example.org/feed.atom", items.get(0).getChannel().getLink());
assertEquals("Copyright (c) 2003, Mark Pilgrim", items.get(0).getChannel().getCopyright().orElse(null));
assertEquals("Example Toolkit", items.get(0).getChannel().getGenerator().orElse(null));
assertEquals("2005-07-31T12:29:29Z", items.get(0).getChannel().getLastBuildDate().orElse(null));

assertEquals("Atom draft-07 snapshot", items.get(0).getTitle().orElse(null));
assertNull(items.get(1).getAuthor().orElse(null));
assertEquals("http://example.org/audio/ph34r_my_podcast.mp3", items.get(0).getLink().orElse(null));
assertEquals("tag:example.org,2003:3.2397", items.get(0).getGuid().orElse(null));
assertEquals("2003-12-13T08:29:29-04:00", items.get(0).getPubDate().orElse(null));
assertEquals("2005-07-31T12:29:29Z", items.get(0).getUpdated().orElse(null));
assertEquals(211, items.get(1).getDescription().orElse("").length());
}
if (items.size() >= 2) {
assertEquals("Atom-Powered Robots Run Amok", items.get(1).getTitle().orElse(null));
assertNull(items.get(1).getAuthor().orElse(null));
assertEquals("http://example.org/2003/12/13/atom03", items.get(1).getLink().orElse(null));
assertEquals("urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a", items.get(1).getGuid().orElse(null));
assertEquals("2003-12-13T18:30:02Z", items.get(1).getPubDate().orElse(null));
assertEquals("2003-12-13T18:30:02Z", items.get(1).getUpdated().orElse(null));
assertEquals(211, items.get(1).getDescription().orElse("").length());
}
if (items.size() >= 3) {
assertEquals("Atom-Powered Robots Run Amok 2", items.get(2).getTitle().orElse(null));
assertNull(items.get(2).getAuthor().orElse(null));
assertEquals("http://example.org/2003/12/13/atom04", items.get(2).getLink().orElse(null));
assertEquals("urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6b", items.get(2).getGuid().orElse(null));
assertEquals("2003-12-13T09:28:28-04:00", items.get(2).getPubDate().orElse(null));
assertEquals(1071322108, items.get(2).getPubDateZonedDateTime().map(ZonedDateTime::toEpochSecond).orElse(null));
assertEquals("2003-12-13T18:30:01Z", items.get(2).getUpdated().orElse(null));
assertEquals(1071340201, items.get(2).getUpdatedZonedDateTime().map(ZonedDateTime::toEpochSecond).orElse(null));
assertEquals(47, items.get(2).getDescription().orElse("").length());
}
}

private File getFile(String filename) {
var url = getClass().getClassLoader().getResource(filename);
return new File(url.getFile());
}
}
Loading

0 comments on commit 67f9f21

Please sign in to comment.