From 21893fb3d7fc17a07e8716e8ef4d611446b0e00f Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sat, 22 Apr 2023 21:48:26 +0800 Subject: [PATCH 1/8] :bookmark: InterceptorAfter --- .../com/luna/common/net/AsyncQuickStart.java | 147 ------------------ .../java/com/luna/common/net/HttpUtils.java | 11 ++ 2 files changed, 11 insertions(+), 147 deletions(-) delete mode 100644 src/main/java/com/luna/common/net/AsyncQuickStart.java diff --git a/src/main/java/com/luna/common/net/AsyncQuickStart.java b/src/main/java/com/luna/common/net/AsyncQuickStart.java deleted file mode 100644 index e2c4f0d1..00000000 --- a/src/main/java/com/luna/common/net/AsyncQuickStart.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ - -package com.luna.common.net; - -import org.apache.hc.client5.http.async.methods.AbstractCharResponseConsumer; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.HttpAsyncClients; -import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpException; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.nio.AsyncRequestProducer; -import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; - -import java.io.IOException; -import java.nio.CharBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; - -public class AsyncQuickStart { - - public static void main (final String[] args) throws Exception { - try (final CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault()) { - // Start the client - httpclient.start(); - - // Execute request - final SimpleHttpRequest request1 = SimpleRequestBuilder.get("http://httpbin.org/get").build(); - final Future future = httpclient.execute(request1, null); - // and wait until response is received - final SimpleHttpResponse response1 = future.get(); - System.out.println(request1.getRequestUri() + "->" + response1.getCode()); - - // One most likely would want to use a callback for operation result - final CountDownLatch latch1 = new CountDownLatch(1); - final SimpleHttpRequest request2 = SimpleRequestBuilder.get("http://httpbin.org/get").build(); - httpclient.execute(request2, new FutureCallback() { - - @Override - public void completed(final SimpleHttpResponse response2) { - latch1.countDown(); - System.out.println(request2.getRequestUri() + "->" + response2.getCode()); - } - - @Override - public void failed(final Exception ex) { - latch1.countDown(); - System.out.println(request2.getRequestUri() + "->" + ex); - } - - @Override - public void cancelled() { - latch1.countDown(); - System.out.println(request2.getRequestUri() + " cancelled"); - } - - }); - latch1.await(); - - // In real world one most likely would want also want to stream - // request and response body content - final CountDownLatch latch2 = new CountDownLatch(1); - final AsyncRequestProducer producer3 = AsyncRequestBuilder.get("http://httpbin.org/get").build(); - final AbstractCharResponseConsumer consumer3 = new AbstractCharResponseConsumer() { - - HttpResponse response; - - @Override - protected void start(final HttpResponse response, final ContentType contentType) throws HttpException, IOException { - this.response = response; - } - - @Override - protected int capacityIncrement() { - return Integer.MAX_VALUE; - } - - @Override - protected void data(final CharBuffer data, final boolean endOfStream) throws IOException { - // Do something useful - } - - @Override - protected HttpResponse buildResult() throws IOException { - return response; - } - - @Override - public void releaseResources() { - } - - }; - httpclient.execute(producer3, consumer3, new FutureCallback() { - - @Override - public void completed(final HttpResponse response3) { - latch2.countDown(); - System.out.println(request2.getRequestUri() + "->" + response3.getCode()); - } - - @Override - public void failed(final Exception ex) { - latch2.countDown(); - System.out.println(request2.getRequestUri() + "->" + ex); - } - - @Override - public void cancelled() { - latch2.countDown(); - System.out.println(request2.getRequestUri() + " cancelled"); - } - - }); - latch2.await(); - - } - } - -} \ No newline at end of file diff --git a/src/main/java/com/luna/common/net/HttpUtils.java b/src/main/java/com/luna/common/net/HttpUtils.java index 1b94a8ba..3452e4f6 100644 --- a/src/main/java/com/luna/common/net/HttpUtils.java +++ b/src/main/java/com/luna/common/net/HttpUtils.java @@ -11,6 +11,7 @@ import org.apache.hc.client5.http.DnsResolver; import org.apache.hc.client5.http.SystemDefaultDnsResolver; import org.apache.hc.client5.http.auth.*; +import org.apache.hc.client5.http.classic.ExecChainHandler; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.classic.methods.HttpPut; @@ -163,6 +164,16 @@ public static void refresh() { httpClient = httpClientBuilder.build(); } + public void addRequestInterceptorFirst(HttpRequestInterceptor httpRequestInterceptor) { + httpClientBuilder.addRequestInterceptorFirst(httpRequestInterceptor); + refresh(); + } + + public void addExecInterceptorAfter(final String existing, final String name, final ExecChainHandler interceptor) { + httpClientBuilder.addExecInterceptorAfter(existing, name, interceptor); + refresh(); + } + public static void basicAuth(String userName, String password, String host) { authContext(userName, password, host, StandardAuthScheme.BASIC); } From 6b9b0aeedda4a3512aa50ba34980af2c91b943bc Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sat, 22 Apr 2023 22:46:32 +0800 Subject: [PATCH 2/8] :bookmark: method name --- .../java/com/luna/common/net/HttpUtils.java | 38 +++++++++---------- .../luna/common/net/high/AsyncHttpUtils.java | 15 ++++---- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/luna/common/net/HttpUtils.java b/src/main/java/com/luna/common/net/HttpUtils.java index 3452e4f6..9b7d52e8 100644 --- a/src/main/java/com/luna/common/net/HttpUtils.java +++ b/src/main/java/com/luna/common/net/HttpUtils.java @@ -410,25 +410,23 @@ public static String doPutHandler(String host, String path, Map */ public static T doPost(String host, String path, Map headers, Map queries, Map bodies, HttpClientResponseHandler responseHandler) { - HttpPost request = new HttpPost(buildUrl(host, path, queries)); - builderHeader(headers, request); + + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + // 设置浏览器兼容模式 + builder.setMode(HttpMultipartMode.LEGACY); + // 设置请求的编码格式 + builder.setCharset(CharsetUtil.defaultCharset()); + builder.setContentType(ContentType.MULTIPART_FORM_DATA); if (MapUtils.isNotEmpty(bodies)) { bodies.forEach((k, v) -> { // 传入参数可以为file或者filePath,在此处做转换 File file = new File(v); - MultipartEntityBuilder builder = MultipartEntityBuilder.create(); - // 设置浏览器兼容模式 - builder.setMode(HttpMultipartMode.LEGACY); - // 设置请求的编码格式 - builder.setCharset(CharsetUtil.defaultCharset()); - builder.setContentType(ContentType.MULTIPART_FORM_DATA); // 添加文件 builder.addBinaryBody(k, file); - HttpEntity reqEntity = builder.build(); - request.setEntity(reqEntity); }); } - return doRequest(responseHandler, request); + HttpEntity reqEntity = builder.build(); + return doPost(host, path, headers, queries, reqEntity, responseHandler); } public static ClassicHttpResponse doPost(String host, String path, Map headers, @@ -449,10 +447,15 @@ public static ClassicHttpResponse doPost(String host, String path, Map T doPost(String host, String path, Map headers, Map queries, String body, HttpClientResponseHandler responseHandler) { + return doPost(host, path, headers, queries, new StringEntity(body, Charset.defaultCharset()), responseHandler); + } + + public static T doPost(String host, String path, Map headers, + Map queries, HttpEntity httpEntity, HttpClientResponseHandler responseHandler) { HttpPost request = new HttpPost(buildUrl(host, path, queries)); builderHeader(headers, request); - if (StringUtils.isNotBlank(body)) { - request.setEntity(new StringEntity(body, Charset.defaultCharset())); + if (httpEntity != null) { + request.setEntity(httpEntity); } return doRequest(responseHandler, request); } @@ -480,12 +483,7 @@ public static String doPostHander(String host, String path, Map */ public static T doPost(String host, String path, Map headers, Map queries, byte[] body, HttpClientResponseHandler responseHandler) { - HttpPost request = new HttpPost(buildUrl(host, path, queries)); - builderHeader(headers, request); - if (ObjectUtils.isNotEmpty(body)) { - request.setEntity(new ByteArrayEntity(body, ContentType.APPLICATION_OCTET_STREAM)); - } - return doRequest(responseHandler, request); + return doPost(host, path, headers, queries, new ByteArrayEntity(body, ContentType.APPLICATION_OCTET_STREAM), responseHandler); } public static HttpResponse doPost(String host, String path, Map headers, @@ -721,6 +719,6 @@ public static String checkResponseAndGetResult(HttpResponse httpResponse) { } public static String checkResponseAndGetResult(HttpResponse httpResponse, Boolean isEnsure) { - return checkResponseAndGetResultV2((ClassicHttpResponse) httpResponse, isEnsure); + return checkResponseAndGetResultV2((ClassicHttpResponse)httpResponse, isEnsure); } } diff --git a/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java b/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java index 09f3ba37..3f35bd92 100644 --- a/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java +++ b/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java @@ -48,6 +48,7 @@ import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.Method; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.config.Http1Config; import org.apache.hc.core5.http.nio.AsyncEntityProducer; @@ -153,32 +154,32 @@ public static void setAuth(String host, Integer port, String user, String passwo public static CustomAsyncHttpResponse doPost(String host, String path, Map headers, Map queries, Path file, AsyncHttpClientResponseHandler responseHandler) throws IOException { AsyncRequestProducer producer = - getProducer(host, path, headers, queries, new PathEntityProducer(file, StandardOpenOption.READ), AsyncRequestBuilder.post()); + getProducer(host, path, headers, queries, new PathEntityProducer(file, StandardOpenOption.READ), Method.POST.toString()); return doAsyncRequest(responseHandler, producer); } public static CustomAsyncHttpResponse doPost(String host, String path, Map headers, Map queries, File file, AsyncHttpClientResponseHandler responseHandler) { - AsyncRequestProducer producer = getProducer(host, path, headers, queries, new FileEntityProducer(file), AsyncRequestBuilder.post()); + AsyncRequestProducer producer = getProducer(host, path, headers, queries, new FileEntityProducer(file), Method.POST.toString()); return doAsyncRequest(responseHandler, producer); } public static CustomAsyncHttpResponse doPost(String host, String path, Map headers, Map queries, String body, AsyncHttpClientResponseHandler responseHandler) { - AsyncRequestProducer producer = getProducer(host, path, headers, queries, new StringAsyncEntityProducer(body), AsyncRequestBuilder.post()); + AsyncRequestProducer producer = getProducer(host, path, headers, queries, new StringAsyncEntityProducer(body), Method.POST.toString()); return doAsyncRequest(responseHandler, producer); } public static CustomAsyncHttpResponse doGet(String host, String path, Map headers, Map queries, AsyncHttpClientResponseHandler responseHandler) { - AsyncRequestBuilder get = AsyncRequestBuilder.get(); - AsyncRequestProducer producer = getProducer(host, path, headers, queries, null, get); + AsyncRequestProducer producer = getProducer(host, path, headers, queries, null, Method.GET.toString()); return doAsyncRequest(responseHandler, producer); } private static AsyncRequestProducer getProducer(String host, String path, Map headers, Map queries, - AsyncEntityProducer entityProducer, - AsyncRequestBuilder builder) { + AsyncEntityProducer entityProducer, + String method) { + AsyncRequestBuilder builder = AsyncRequestBuilder.create(method); builder.setHttpHost(HttpHighLevelUtil.getHost(host)); builder.setUri(HttpUtils.buildUrl(host, path, queries)); HttpUtils.builderHeader(headers, builder); From a83364a4b1935b25a9e560fc0dd76fa2022bf292 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sun, 23 Apr 2023 00:22:26 +0800 Subject: [PATCH 3/8] :bookmark: fix --- pom.xml | 5 ++- .../java/com/luna/common/net/HttpUtils.java | 31 ++++++++++++++++--- .../com/luna/common/utils/HttpUtilsTest.java | 28 +++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index 0ab5c207..6910c011 100644 --- a/pom.xml +++ b/pom.xml @@ -23,8 +23,7 @@ 3.12.0 1.15 2.11.0 - 5.2.1 - 4.5.13 + 5.2.1 31.1-jre 2.10.6 2.0.26 @@ -74,7 +73,7 @@ org.apache.httpcomponents.client5 httpclient5 - ${httpclient.version} + ${httpclient5.version} diff --git a/src/main/java/com/luna/common/net/HttpUtils.java b/src/main/java/com/luna/common/net/HttpUtils.java index 9b7d52e8..d542d75e 100644 --- a/src/main/java/com/luna/common/net/HttpUtils.java +++ b/src/main/java/com/luna/common/net/HttpUtils.java @@ -86,13 +86,29 @@ public class HttpUtils { /** * 设置连接建立的超时时间为10s */ - public static final int CONNECT_TIMEOUT = 10; + public static int CONNECT_TIMEOUT = 10; - public static final int RESPONSE_TIMEOUT = 30; + public static int RESPONSE_TIMEOUT = 30; - public static final int MAX_ROUTE = 200; + public static int MAX_ROUTE = 200; - public static final int SOCKET_TIME_OUT = 10; + public static int SOCKET_TIME_OUT = 10; + + public static void setConnectTimeout(int connectTimeout) { + CONNECT_TIMEOUT = connectTimeout; + } + + public static void setResponseTimeout(int responseTimeout) { + RESPONSE_TIMEOUT = responseTimeout; + } + + public static void setMaxRoute(int maxRoute) { + MAX_ROUTE = maxRoute; + } + + public static void setSocketTimeOut(int socketTimeOut) { + SOCKET_TIME_OUT = socketTimeOut; + } static { init(); @@ -241,6 +257,13 @@ public static void setProxy(String host, Integer port) { refresh(); } + public static void setProxy(String host, Integer port, String username, String password) { + if (StringUtils.isNotBlank(username)){ + authContext(username, password, host, StandardAuthScheme.BASIC); + } + setProxy(host, port); + } + /** * 请求头构建 * diff --git a/src/test/java/com/luna/common/utils/HttpUtilsTest.java b/src/test/java/com/luna/common/utils/HttpUtilsTest.java index 85a8704f..3ef62744 100644 --- a/src/test/java/com/luna/common/utils/HttpUtilsTest.java +++ b/src/test/java/com/luna/common/utils/HttpUtilsTest.java @@ -1,7 +1,9 @@ package com.luna.common.utils; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.luna.common.net.HttpUtils; +import com.luna.common.net.HttpUtilsConstant; import org.apache.commons.lang3.StringUtils; import org.apache.hc.client5.http.utils.Base64; import org.apache.hc.core5.http.ClassicHttpResponse; @@ -9,11 +11,14 @@ import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.io.HttpClientResponseHandler; import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.entity.StringEntity; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Map; import static com.luna.common.net.HttpUtils.*; @@ -53,6 +58,29 @@ public void get_post() { Assert.assertNotNull(responseString); } + @Test + public void get_post_2() { + HttpClientResponseHandler responseHandler = response -> { + return EntityUtils.toString(response.getEntity()); + }; + HttpUtils.setProxy(7890); + Map header = Maps.newHashMap(); + header.put(HttpHeaders.AUTHORIZATION, "Bearer sk-xxxxx"); + header.put(HttpHeaders.CONTENT_TYPE, HttpUtilsConstant.JSON); + + StringEntity stringEntity = new StringEntity("{\n" + + " \"input\": [\n" + + " \"十们代存府出治对提流感形织务文。\"\n" + + " ],\n" + + " \"model\": \"text-moderation-latest\"\n" + + "}", Charset.defaultCharset()); + String responseString = + HttpUtils.doPost("https://api.openai.com", "/v1/moderations", header, + null,stringEntity, responseHandler); + System.out.println(responseString); + Assert.assertNotNull(responseString); + } + @Test public void proxy_test() { HttpUtils.setProxy("127.0.0.1", 7890); From a8f126b53d9472b8a8ad38e98f4555d8504a83e4 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Fri, 28 Apr 2023 20:15:46 +0800 Subject: [PATCH 4/8] :bookmark: sse async request --- pom.xml | 13 ++- .../java/com/luna/common/net/HttpUtils.java | 56 +++++----- .../luna/common/net/HttpUtilsConstant.java | 1 + .../com/luna/common/net/IPAddressUtil.java | 3 + .../async/CustomAbstacktFutureCallback.java | 2 + .../AsyncHttpClientResponseHandler.java | 3 +- .../AsyncValidatingResponseHandler.java | 4 +- .../luna/common/net/high/AsyncHttpUtils.java | 101 ++++++++++++++---- .../luna/common/utils/AsyncHttpUtilsTest.java | 73 +++++++++++-- .../com/luna/common/utils/HttpUtilsTest.java | 10 ++ 10 files changed, 197 insertions(+), 69 deletions(-) diff --git a/pom.xml b/pom.xml index 6910c011..4e164f13 100644 --- a/pom.xml +++ b/pom.xml @@ -23,13 +23,14 @@ 3.12.0 1.15 2.11.0 - 5.2.1 + 5.2.1 31.1-jre 2.10.6 2.0.26 6.1.3 4.13.2 1.18.26 + 2.0.1.Final @@ -73,7 +74,7 @@ org.apache.httpcomponents.client5 httpclient5 - ${httpclient5.version} + ${httpclient-5.version} @@ -108,13 +109,19 @@ javax.validation validation-api - 2.0.1.Final + ${validation-api.version} org.projectlombok lombok ${lombok.version} + + org.slf4j + slf4j-simple + 1.7.36 + compile + diff --git a/src/main/java/com/luna/common/net/HttpUtils.java b/src/main/java/com/luna/common/net/HttpUtils.java index d542d75e..aa7711c6 100644 --- a/src/main/java/com/luna/common/net/HttpUtils.java +++ b/src/main/java/com/luna/common/net/HttpUtils.java @@ -2,12 +2,12 @@ import com.google.common.collect.ImmutableList; import com.luna.common.constant.StrPoolConstant; +import com.luna.common.net.high.AsyncHttpUtils; import com.luna.common.net.method.HttpDelete; import com.luna.common.text.CharsetUtil; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hc.client5.http.ContextBuilder; import org.apache.hc.client5.http.DnsResolver; import org.apache.hc.client5.http.SystemDefaultDnsResolver; import org.apache.hc.client5.http.auth.*; @@ -40,12 +40,10 @@ import org.apache.hc.client5.http.cookie.Cookie; import org.apache.hc.client5.http.entity.mime.HttpMultipartMode; import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; -import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory; import org.apache.hc.core5.http.io.HttpClientResponseHandler; import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.hc.core5.http.io.entity.EntityUtils; -import org.apache.hc.core5.http.io.entity.FileEntity; import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.http.message.BasicClassicHttpRequest; import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; @@ -53,7 +51,6 @@ import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.TimeValue; -import org.apache.hc.core5.util.Timeout; import javax.net.ssl.SSLContext; import java.io.File; @@ -71,28 +68,28 @@ */ public class HttpUtils { - public static final HttpClientContext CLIENT_CONTEXT = HttpClientContext.create(); - private static final int MAX_REDIRECTS = 10; - private static CloseableHttpClient httpClient; + public static final HttpClientContext CLIENT_CONTEXT = HttpClientContext.create(); + private static CloseableHttpClient httpClient; - private static volatile HttpClientBuilder httpClientBuilder; + private static final HttpClientBuilder HTTP_CLIENT_BUILDER = HttpClients.custom(); - private static final BasicCookieStore cookieStore = new BasicCookieStore(); + private static final BasicCookieStore COOKIE_STORE = new BasicCookieStore(); + private static final int MAX_REDIRECTS = 10; /** * 最大连接数 */ - public static final int MAX_CONN = 200; + public static final int MAX_CONN = 200; /** * 设置连接建立的超时时间为10s */ - public static int CONNECT_TIMEOUT = 10; + public static int CONNECT_TIMEOUT = 10; - public static int RESPONSE_TIMEOUT = 30; + public static int RESPONSE_TIMEOUT = 30; - public static int MAX_ROUTE = 200; + public static int MAX_ROUTE = 200; - public static int SOCKET_TIME_OUT = 10; + public static int SOCKET_TIME_OUT = 10; public static void setConnectTimeout(int connectTimeout) { CONNECT_TIMEOUT = connectTimeout; @@ -167,26 +164,24 @@ public InetAddress[] resolve(final String host) throws UnknownHostException { .setSoTimeout(SOCKET_TIME_OUT, TimeUnit.SECONDS) .build()); - if (httpClientBuilder == null) { - httpClientBuilder = HttpClients.custom(); - } - httpClientBuilder.setConnectionManager(cm) - .setDefaultRequestConfig(defaultRequestConfig).setDefaultCookieStore(cookieStore); + HTTP_CLIENT_BUILDER.setConnectionManager(cm) + .setDefaultRequestConfig(defaultRequestConfig); - httpClient = httpClientBuilder.build(); + httpClient = HTTP_CLIENT_BUILDER.build(); } public static void refresh() { - httpClient = httpClientBuilder.build(); + HTTP_CLIENT_BUILDER.setDefaultCookieStore(COOKIE_STORE); + httpClient = HTTP_CLIENT_BUILDER.build(); } public void addRequestInterceptorFirst(HttpRequestInterceptor httpRequestInterceptor) { - httpClientBuilder.addRequestInterceptorFirst(httpRequestInterceptor); + HTTP_CLIENT_BUILDER.addRequestInterceptorFirst(httpRequestInterceptor); refresh(); } public void addExecInterceptorAfter(final String existing, final String name, final ExecChainHandler interceptor) { - httpClientBuilder.addExecInterceptorAfter(existing, name, interceptor); + HTTP_CLIENT_BUILDER.addExecInterceptorAfter(existing, name, interceptor); refresh(); } @@ -237,7 +232,7 @@ public static void authContext(String userName, String password, String host, St } public static void setProxy(Integer port) { - setProxy("127.0.0.1", port); + setProxy(IPAddressUtil.LOCAL_HOST, port); } /** @@ -251,14 +246,13 @@ public static void setProxy(String host, Integer port) { // for proxy debug HttpHost proxy = new HttpHost(host, port); DefaultProxyRoutePlanner defaultProxyRoutePlanner = new DefaultProxyRoutePlanner(proxy); - httpClientBuilder.setRoutePlanner(defaultProxyRoutePlanner) - .setDefaultCookieStore(cookieStore); + HTTP_CLIENT_BUILDER.setRoutePlanner(defaultProxyRoutePlanner); refresh(); } public static void setProxy(String host, Integer port, String username, String password) { - if (StringUtils.isNotBlank(username)){ + if (StringUtils.isNotBlank(username)) { authContext(username, password, host, StandardAuthScheme.BASIC); } setProxy(host, port); @@ -285,19 +279,19 @@ public static void builderHeader(Map headers, AsyncRequestBuilde } public static List getCookie() { - return cookieStore.getCookies(); + return COOKIE_STORE.getCookies(); } public static void addCookie(Cookie cookie) { - cookieStore.addCookie(cookie); + COOKIE_STORE.addCookie(cookie); } public static void addCookie(List cookies) { - cookies.forEach(cookieStore::addCookie); + cookies.forEach(COOKIE_STORE::addCookie); } public static void addCookie(Cookie... cookies) { - Arrays.stream(cookies).forEach(cookieStore::addCookie); + Arrays.stream(cookies).forEach(COOKIE_STORE::addCookie); } private static T doRequest(HttpClientResponseHandler responseHandler, HttpUriRequestBase request) { diff --git a/src/main/java/com/luna/common/net/HttpUtilsConstant.java b/src/main/java/com/luna/common/net/HttpUtilsConstant.java index d134586a..d5f7c3c1 100644 --- a/src/main/java/com/luna/common/net/HttpUtilsConstant.java +++ b/src/main/java/com/luna/common/net/HttpUtilsConstant.java @@ -21,4 +21,5 @@ public interface HttpUtilsConstant { String TEXT = "text/html; charset=UTF-8"; String OCTET_STREAM = "application/octet-stream"; + } diff --git a/src/main/java/com/luna/common/net/IPAddressUtil.java b/src/main/java/com/luna/common/net/IPAddressUtil.java index 97bda317..eb8ca19e 100644 --- a/src/main/java/com/luna/common/net/IPAddressUtil.java +++ b/src/main/java/com/luna/common/net/IPAddressUtil.java @@ -9,6 +9,9 @@ import java.util.Arrays; public class IPAddressUtil { + + public static final String LOCAL_HOST = "127.0.0.1"; + private static final int INADDR4SZ = 4; private static final int INADDR16SZ = 16; private static final int INT16SZ = 2; diff --git a/src/main/java/com/luna/common/net/async/CustomAbstacktFutureCallback.java b/src/main/java/com/luna/common/net/async/CustomAbstacktFutureCallback.java index 999b1cc1..d993e360 100644 --- a/src/main/java/com/luna/common/net/async/CustomAbstacktFutureCallback.java +++ b/src/main/java/com/luna/common/net/async/CustomAbstacktFutureCallback.java @@ -1,5 +1,6 @@ package com.luna.common.net.async; +import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.hc.core5.concurrent.FutureCallback; @@ -13,6 +14,7 @@ public abstract class CustomAbstacktFutureCallback implements FutureCallback< @Override public void completed(T result) { + log.info("completed::result = {}", JSON.toJSONString(result)); } @Override diff --git a/src/main/java/com/luna/common/net/hander/AsyncHttpClientResponseHandler.java b/src/main/java/com/luna/common/net/hander/AsyncHttpClientResponseHandler.java index 9a28dede..7c0fa1c8 100644 --- a/src/main/java/com/luna/common/net/hander/AsyncHttpClientResponseHandler.java +++ b/src/main/java/com/luna/common/net/hander/AsyncHttpClientResponseHandler.java @@ -31,6 +31,7 @@ import com.luna.common.net.async.CustomAsyncHttpResponse; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; /** * Handler that encapsulates the process of generating a response object @@ -51,6 +52,6 @@ public interface AsyncHttpClientResponseHandler { * * @throws IOException in case of a problem or the connection was aborted */ - void handleResponse(CustomAsyncHttpResponse response) ; + void handleResponse(R response) ; } diff --git a/src/main/java/com/luna/common/net/hander/AsyncValidatingResponseHandler.java b/src/main/java/com/luna/common/net/hander/AsyncValidatingResponseHandler.java index 20f80554..9f201438 100644 --- a/src/main/java/com/luna/common/net/hander/AsyncValidatingResponseHandler.java +++ b/src/main/java/com/luna/common/net/hander/AsyncValidatingResponseHandler.java @@ -36,7 +36,7 @@ * @author luna */ @Slf4j -public abstract class AsyncValidatingResponseHandler implements AsyncHttpClientResponseHandler { +public abstract class AsyncValidatingResponseHandler implements AsyncHttpClientResponseHandler { protected void validateResponse(HttpResponse response) { String reasonPhrase = response.getReasonPhrase(); @@ -49,7 +49,7 @@ protected void validateResponse(HttpResponse response) { } @Override - public void handleResponse(CustomAsyncHttpResponse response) { + public void handleResponse(R response) { validateResponse(response); } } \ No newline at end of file diff --git a/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java b/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java index 3f35bd92..4e4c3f7e 100644 --- a/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java +++ b/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java @@ -34,10 +34,11 @@ import java.util.Map; import java.util.concurrent.Future; import com.luna.common.net.HttpUtils; -import com.luna.common.net.async.CustomAbstacktFutureCallback; +import com.luna.common.net.IPAddressUtil; import com.luna.common.net.async.CustomAsyncHttpResponse; import com.luna.common.net.async.CustomResponseConsumer; import com.luna.common.net.hander.AsyncHttpClientResponseHandler; +import lombok.SneakyThrows; import org.apache.hc.client5.http.auth.CredentialsProvider; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.TlsConfig; @@ -47,18 +48,21 @@ import org.apache.hc.client5.http.impl.auth.CredentialsProviderBuilder; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.Method; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.config.Http1Config; import org.apache.hc.core5.http.nio.AsyncEntityProducer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; import org.apache.hc.core5.http.nio.entity.FileEntityProducer; import org.apache.hc.core5.http.nio.entity.PathEntityProducer; import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; import org.apache.hc.core5.http.ssl.TLS; import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; @@ -79,9 +83,34 @@ public class AsyncHttpUtils { asyncClient.start(); } + @SneakyThrows + public static void refresh() { + asyncClient = HTTP_ASYNC_CLIENT_BUILDER.build(); + asyncClient.start(); + } + + + public static void setProxy(Integer port) { + setProxy(IPAddressUtil.LOCAL_HOST, port); + } + + /** + * 使用代理访问 + * + * @param host 代理地址 + * @param port 代理端口 + * @return + */ + public static void setProxy(String host, Integer port) { + // for proxy debug + HttpHost proxy = new HttpHost(host, port); + AsyncHttpUtils.HTTP_ASYNC_CLIENT_BUILDER.setProxy(proxy); + refresh(); + } + @PreDestroy - public void destroy() throws IOException { - asyncClient.close(); + public static void destroy() throws IOException { + asyncClient.close(CloseMode.GRACEFUL); } public static void init() { @@ -155,49 +184,77 @@ public static CustomAsyncHttpResponse doPost(String host, String path, Map queries, Path file, AsyncHttpClientResponseHandler responseHandler) throws IOException { AsyncRequestProducer producer = getProducer(host, path, headers, queries, new PathEntityProducer(file, StandardOpenOption.READ), Method.POST.toString()); - return doAsyncRequest(responseHandler, producer); + return doAsyncRequest(producer, responseHandler); } public static CustomAsyncHttpResponse doPost(String host, String path, Map headers, Map queries, File file, AsyncHttpClientResponseHandler responseHandler) { AsyncRequestProducer producer = getProducer(host, path, headers, queries, new FileEntityProducer(file), Method.POST.toString()); - return doAsyncRequest(responseHandler, producer); + return doAsyncRequest(producer, responseHandler); } public static CustomAsyncHttpResponse doPost(String host, String path, Map headers, Map queries, String body, AsyncHttpClientResponseHandler responseHandler) { AsyncRequestProducer producer = getProducer(host, path, headers, queries, new StringAsyncEntityProducer(body), Method.POST.toString()); - return doAsyncRequest(responseHandler, producer); + return doAsyncRequest(producer, responseHandler); } public static CustomAsyncHttpResponse doGet(String host, String path, Map headers, Map queries, AsyncHttpClientResponseHandler responseHandler) { - AsyncRequestProducer producer = getProducer(host, path, headers, queries, null, Method.GET.toString()); - return doAsyncRequest(responseHandler, producer); + AsyncRequestProducer producer = getProducer(host, path, headers, queries, Method.GET.toString()); + return doAsyncRequest(producer, responseHandler); + } + + public static CustomAsyncHttpResponse doPost(String host, String path, Map headers, + Map queries, AsyncHttpClientResponseHandler responseHandler) { + AsyncRequestProducer producer = getProducer(host, path, headers, queries, Method.POST.toString()); + return doAsyncRequest(producer, responseHandler); + } + + public static AsyncRequestProducer getProducer(String host, String path, Map headers, Map queries, + String method) { + return getProducer(host, path, headers, queries, null, method); } - private static AsyncRequestProducer getProducer(String host, String path, Map headers, Map queries, - AsyncEntityProducer entityProducer, - String method) { + public static AsyncRequestProducer getProducer(String host, String path, Map headers, Map queries, + AsyncEntityProducer entityProducer, String method) { AsyncRequestBuilder builder = AsyncRequestBuilder.create(method); builder.setHttpHost(HttpHighLevelUtil.getHost(host)); builder.setUri(HttpUtils.buildUrl(host, path, queries)); HttpUtils.builderHeader(headers, builder); - builder.setEntity(entityProducer); + if (entityProducer != null) { + builder.setEntity(entityProducer); + } return builder.build(); } - private static CustomAsyncHttpResponse doAsyncRequest(AsyncHttpClientResponseHandler responseHandler, AsyncRequestProducer producer) { - final Future future; + public static CustomAsyncHttpResponse doAsyncRequest(AsyncRequestProducer producer, AsyncHttpClientResponseHandler responseHandler) { + return doAsyncRequest(producer, new FutureCallback() { + @Override + public void completed(CustomAsyncHttpResponse result) { + responseHandler.handleResponse(result); + } + + @Override + public void failed(Exception ex) { + throw new RuntimeException(ex); + } + + @Override + public void cancelled() { + throw new RuntimeException("cancelled"); + } + }); + } + + public static CustomAsyncHttpResponse doAsyncRequest(AsyncRequestProducer producer, FutureCallback callback) { + return doAsyncRequest(producer, CustomResponseConsumer.create(), callback); + } + + public static T doAsyncRequest(AsyncRequestProducer producer, AsyncResponseConsumer consumer, FutureCallback callback) { + final Future future; try { - future = asyncClient.execute( - producer, - CustomResponseConsumer.create(), HttpUtils.CLIENT_CONTEXT, new CustomAbstacktFutureCallback() { - @Override - public void completed(CustomAsyncHttpResponse result) { - responseHandler.handleResponse(result); - } - }); + future = asyncClient.execute(producer, consumer, HttpUtils.CLIENT_CONTEXT, callback); return future.get(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java b/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java index 675159fa..9323e40c 100644 --- a/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java +++ b/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java @@ -1,19 +1,48 @@ package com.luna.common.utils; import com.alibaba.fastjson2.JSON; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.luna.common.net.HttpUtils; +import com.luna.common.net.HttpUtilsConstant; import com.luna.common.net.async.CustomAbstacktFutureCallback; import com.luna.common.net.async.CustomAsyncHttpResponse; +import com.luna.common.net.async.CustomResponseConsumer; +import com.luna.common.net.async.CustomSseAsyncConsumer; import com.luna.common.net.hander.AsyncValidatingResponseHandler; +import com.luna.common.net.hander.LoggingEventSourceListener; import com.luna.common.net.high.AsyncHttpUtils; +import com.luna.common.net.method.SseRequest; +import com.luna.common.net.sse.Event; +import com.luna.common.net.sse.SseEntity; +import com.luna.common.net.sse.SseResponse; +import lombok.extern.slf4j.Slf4j; import org.apache.hc.client5.http.async.methods.*; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.impl.auth.CredentialsProviderBuilder; -import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.*; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.message.BasicClassicHttpResponse; import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.io.CloseMode; +import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.net.URI; +import java.nio.CharBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -21,6 +50,7 @@ * @description * @date 2023/4/22 */ +@Slf4j public class AsyncHttpUtilsTest { @Test @@ -40,9 +70,9 @@ public void async_test() throws Exception { CustomAsyncHttpResponse httpResponse = AsyncHttpUtils.doGet("http://httpbin.org", requestUri, null, null, new AsyncValidatingResponseHandler() { @Override - public void handleResponse(CustomAsyncHttpResponse response) { - // call back do it - System.out.println(response.getBodyText()); + public void handleResponse(R response) { + CustomAsyncHttpResponse response1 = (CustomAsyncHttpResponse) response; + System.out.println(response1.getBodyText()); } }); @@ -54,18 +84,41 @@ public void handleResponse(CustomAsyncHttpResponse response) { @Test public void auth_test() { - AsyncHttpUtils.setAuth("httpbin.org", "user", "passwd"); CustomAsyncHttpResponse httpResponse = AsyncHttpUtils.doGet("http://httpbin.org", "/basic-auth/user/passwd", null, null, new AsyncValidatingResponseHandler() {}); System.out.println(JSON.toJSONString(httpResponse.getBodyText())); } - public static class CustomFutureCallback extends CustomAbstacktFutureCallback { - @Override - public void completed(String result) { - System.out.println(result); - } + @Test + public void atest() { + AsyncHttpUtils.setProxy(7890); + Map header = Maps.newHashMap(); + header.put(HttpHeaders.AUTHORIZATION, "Bearer sk-xxxx"); + header.put(HttpHeaders.CONTENT_TYPE, HttpUtilsConstant.JSON); + + StringAsyncEntityProducer stringAsyncEntityProducer = new StringAsyncEntityProducer("{\"temperature\":0,\"model\":\"text-davinci-003\",\"prompt\":\"Say this is a test\",\"stream\":true,\"max_tokens\":7}"); + AsyncRequestProducer producer = AsyncHttpUtils.getProducer("https://api.openai.com", "/v1/completions", header, new HashMap<>(), stringAsyncEntityProducer, Method.POST.toString()); + + CustomSseAsyncConsumer customSseAsyncConsumer = new CustomSseAsyncConsumer(new LoggingEventSourceListener<>()); + + AsyncHttpUtils.doAsyncRequest(producer, customSseAsyncConsumer, new CustomAbstacktFutureCallback(){}); + } + + @Test + public void test_sse() { + CustomSseAsyncConsumer customSseAsyncConsumer = new CustomSseAsyncConsumer(new LoggingEventSourceListener<>()); + + ImmutableMap map = ImmutableMap.of(); + AsyncRequestProducer producer = AsyncHttpUtils.getProducer("http://localhost:6060", "/stream-sse-mvc", map, new HashMap<>(), Method.GET.toString()); + + SseResponse sseResponse = AsyncHttpUtils.doAsyncRequest(producer, customSseAsyncConsumer, new CustomAbstacktFutureCallback() { + @Override + public void completed(SseResponse result) { + super.completed(result); + } + }); + System.out.println(JSON.toJSONString(sseResponse)); } public static void main(final String[] args) throws Exception { diff --git a/src/test/java/com/luna/common/utils/HttpUtilsTest.java b/src/test/java/com/luna/common/utils/HttpUtilsTest.java index 3ef62744..898b9a7e 100644 --- a/src/test/java/com/luna/common/utils/HttpUtilsTest.java +++ b/src/test/java/com/luna/common/utils/HttpUtilsTest.java @@ -5,6 +5,7 @@ import com.luna.common.net.HttpUtils; import com.luna.common.net.HttpUtilsConstant; import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.impl.classic.BasicHttpClientResponseHandler; import org.apache.hc.client5.http.utils.Base64; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.HttpEntity; @@ -38,6 +39,15 @@ public void test1Get() throws Exception { Assert.assertNotNull(responseString); } + @Test + public void sse_test() { + + String host = "https://www.w3schools.com"; + String path = "/html/demo_sse.php"; + String s = doGet(host, path, null, null, new BasicHttpClientResponseHandler()); + System.out.println(s); + } + @Test public void get_test() { HttpClientResponseHandler responseHandler = response -> { From 15a51f00cb8292d01838e9fd7f4be8092cca874b Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Fri, 28 Apr 2023 20:16:11 +0800 Subject: [PATCH 5/8] :bookmark: sse async request --- .../net/async/CustomSseAsyncConsumer.java | 94 +++++++++++++ .../hander/AbstactEventFutureCallback.java | 33 +++++ .../net/hander/EventSourceListener.java | 19 +++ .../hander/LoggingEventSourceListener.java | 27 ++++ .../luna/common/net/method/SseRequest.java | 23 +++ .../java/com/luna/common/net/sse/Event.java | 55 ++++++++ .../com/luna/common/net/sse/SseEntity.java | 132 ++++++++++++++++++ .../com/luna/common/net/sse/SseResponse.java | 25 ++++ 8 files changed, 408 insertions(+) create mode 100644 src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java create mode 100644 src/main/java/com/luna/common/net/hander/AbstactEventFutureCallback.java create mode 100644 src/main/java/com/luna/common/net/hander/EventSourceListener.java create mode 100644 src/main/java/com/luna/common/net/hander/LoggingEventSourceListener.java create mode 100644 src/main/java/com/luna/common/net/method/SseRequest.java create mode 100644 src/main/java/com/luna/common/net/sse/Event.java create mode 100644 src/main/java/com/luna/common/net/sse/SseEntity.java create mode 100644 src/main/java/com/luna/common/net/sse/SseResponse.java diff --git a/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java b/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java new file mode 100644 index 00000000..9f4ae92f --- /dev/null +++ b/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java @@ -0,0 +1,94 @@ +package com.luna.common.net.async; + +import com.luna.common.net.hander.AbstactEventFutureCallback; +import com.luna.common.net.hander.EventSourceListener; +import com.luna.common.net.sse.Event; +import com.luna.common.net.sse.SseResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.hc.client5.http.async.methods.AbstractCharResponseConsumer; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; + +import java.io.IOException; +import java.nio.CharBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author luna + * @description + * @date 2023/4/28 + */ +@Slf4j +public class CustomSseAsyncConsumer extends AbstractCharResponseConsumer { + + private SseResponse response; + + private final BlockingQueue events; + + private final AbstactEventFutureCallback callback; + + private final Integer timeWait; + + public CustomSseAsyncConsumer(Integer capacity, AbstactEventFutureCallback callback, Integer timeWait) { + this.events = new ArrayBlockingQueue<>(capacity); + this.callback = callback; + this.timeWait = timeWait; + } + + public CustomSseAsyncConsumer(FutureCallback listener) { + this.events = new ArrayBlockingQueue<>(100); + this.callback = new AbstactEventFutureCallback() { + @Override + public void onEvent(Event result) { + listener.completed(result); + } + }; + this.timeWait = 1000; + } + + @Override + protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException { + // = onOpen + this.response = new SseResponse(response, contentType); + this.response.getEntity().setEvents(events); + } + + @Override + protected int capacityIncrement() { + return Integer.MAX_VALUE; + } + + @Override + protected void data(CharBuffer data, boolean endOfStream) throws IOException { + // 这是一条记录 + response.getEntity().pushBuffer(data, endOfStream); + // = onEvent + try { + Event poll = events.poll(timeWait, TimeUnit.MILLISECONDS); + if (poll != null) { + callback.onEvent(poll); + } + } catch (InterruptedException e) { + log.error("data::data = {}, endOfStream = {} ", data, endOfStream, e); + } + } + + @Override + protected SseResponse buildResult() throws IOException { + return response; + } + + @Override + public void releaseResources() { + callback.completed(response); + } + + @Override + public void failed(Exception cause) { + callback.failed(cause); + } +} diff --git a/src/main/java/com/luna/common/net/hander/AbstactEventFutureCallback.java b/src/main/java/com/luna/common/net/hander/AbstactEventFutureCallback.java new file mode 100644 index 00000000..944df1b9 --- /dev/null +++ b/src/main/java/com/luna/common/net/hander/AbstactEventFutureCallback.java @@ -0,0 +1,33 @@ +package com.luna.common.net.hander; + +import com.alibaba.fastjson2.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.hc.core5.concurrent.FutureCallback; + +/** + * @author luna + * @description + * @date 2023/4/28 + */ +@Slf4j +public abstract class AbstactEventFutureCallback implements FutureCallback { + + public void onEvent(E result) { + log.info("onEvent::result = {}", JSON.toJSONString(result)); + } + + @Override + public void completed(T result) { + log.info("completed::result = {}", JSON.toJSONString(result)); + } + + @Override + public void failed(Exception ex) { + + } + + @Override + public void cancelled() { + + } +} diff --git a/src/main/java/com/luna/common/net/hander/EventSourceListener.java b/src/main/java/com/luna/common/net/hander/EventSourceListener.java new file mode 100644 index 00000000..27cb9f34 --- /dev/null +++ b/src/main/java/com/luna/common/net/hander/EventSourceListener.java @@ -0,0 +1,19 @@ +package com.luna.common.net.hander; + +import com.luna.common.net.sse.Event; +import com.luna.common.net.sse.SseResponse; +import org.apache.hc.core5.concurrent.FutureCallback; + +public interface EventSourceListener extends FutureCallback { + + /** + * 单个事件 + * @param event + */ + void onEvent(T event); + + + void failed(Exception ex); + + void cancelled(); +} diff --git a/src/main/java/com/luna/common/net/hander/LoggingEventSourceListener.java b/src/main/java/com/luna/common/net/hander/LoggingEventSourceListener.java new file mode 100644 index 00000000..1f53c60b --- /dev/null +++ b/src/main/java/com/luna/common/net/hander/LoggingEventSourceListener.java @@ -0,0 +1,27 @@ +package com.luna.common.net.hander; + +import com.alibaba.fastjson2.JSON; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class LoggingEventSourceListener extends AbstactEventFutureCallback { + + @Override + public void onEvent(E result) { + log.info("onEvent::result = {}", JSON.toJSONString(result)); + } + + @Override + public void completed(T result) { + } + + @Override + public void failed(Exception ex) { + super.failed(ex); + } + + @Override + public void cancelled() { + super.cancelled(); + } +} diff --git a/src/main/java/com/luna/common/net/method/SseRequest.java b/src/main/java/com/luna/common/net/method/SseRequest.java new file mode 100644 index 00000000..ab4174cb --- /dev/null +++ b/src/main/java/com/luna/common/net/method/SseRequest.java @@ -0,0 +1,23 @@ +package com.luna.common.net.method; + + +import org.apache.hc.client5.http.classic.methods.HttpGet; + +import java.net.URI; + +/** + * Allows us to set the correct Accept header automatically and always use HTTP GET. + */ +public class SseRequest extends HttpGet { + + + public SseRequest(URI uri) { + super(uri); + addHeader("Accept", "text/event-stream"); + } + + public SseRequest(String uri) { + super(uri); + addHeader("Accept", "text/event-stream"); + } +} diff --git a/src/main/java/com/luna/common/net/sse/Event.java b/src/main/java/com/luna/common/net/sse/Event.java new file mode 100644 index 00000000..008bbd9f --- /dev/null +++ b/src/main/java/com/luna/common/net/sse/Event.java @@ -0,0 +1,55 @@ +package com.luna.common.net.sse; + +public class Event { + + private final String id; + private final String event; + private final String data; + private final int retry; + + public Event(String id, String event, String data, int retry) { + this.id = id; + this.event = event; + this.data = data; + this.retry = retry; + } + + public String getId() { + return id; + } + + public String getEvent() { + return event; + } + + public String getData() { + return data; + } + + public int getRetry() { + return retry; + } + + @Override + public String toString() { + StringBuilder eventString = new StringBuilder(); + if (id != null && id.length() > 0) { + eventString.append("id: "); + eventString.append(id); + } + if (event != null && event.length() > 0) { + eventString.append("\nevent: "); + eventString.append(event); + } + if (data != null && data.length() > 0) { + eventString.append("\ndata: "); + eventString.append(data); + } + if (retry != 0) { + eventString.append("\nretry: "); + eventString.append(retry); + } + return eventString.toString(); + } + +} diff --git a/src/main/java/com/luna/common/net/sse/SseEntity.java b/src/main/java/com/luna/common/net/sse/SseEntity.java new file mode 100644 index 00000000..2b1ed850 --- /dev/null +++ b/src/main/java/com/luna/common/net/sse/SseEntity.java @@ -0,0 +1,132 @@ +package com.luna.common.net.sse; + +import com.google.common.base.Splitter; +import com.luna.common.io.IoUtil; +import com.luna.common.text.CharsetUtil; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.io.entity.AbstractHttpEntity; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +public class SseEntity extends AbstractHttpEntity { + + private BlockingQueue events = new ArrayBlockingQueue<>(100); + private StringBuilder currentEvent = new StringBuilder(); + + private final StringBuilder allEvent = new StringBuilder(); + private int newLineCount = 0; + private String lastEventId; + + public StringBuilder getAllEvent() { + return allEvent; + } + + public void setEvents(BlockingQueue events) { + this.events = events; + } + + public SseEntity(ContentType contentType) { + super(contentType, CharsetUtil.defaultCharsetName()); + } + + public void pushBuffer(CharBuffer buf, boolean endOfStream) { + while (buf.hasRemaining()) { + processChar(buf.get()); + } + } + + private void processChar(char nextChar) { + if (nextChar == '\n') { + newLineCount++; + } else { + newLineCount = 0; + } + if (newLineCount > 1) { + processCurrentEvent(); + currentEvent = new StringBuilder(); + } else { + currentEvent.append(nextChar); + } + } + + // Parse raw data for each event to create processed event object + // Parsing specification - https://www.w3.org/TR/eventsource/#parsing-an-event-stream + private void processCurrentEvent() { + String rawEvent = currentEvent.toString(); + String id = ""; + String event = ""; + int retry = 0; + StringBuilder data = new StringBuilder(); + List list = Splitter.on("\n").splitToList(rawEvent); + for (String[] lineTokens : list.stream().map(s -> s.split(":", 2)).collect(Collectors.toList())) { + switch (lineTokens[0]) { + case "id": + id = lineTokens[1].trim(); + break; + case "event": + event = lineTokens[1].trim(); + break; + case "retry": + retry = Integer.parseInt(lineTokens[1].trim()); + break; + case "data": + data.append(lineTokens[1].trim()); + break; + } + } + events.offer(new Event(id, event, data.toString(), retry)); + currentEvent = new StringBuilder(); + allEvent.append(rawEvent); + newLineCount = 0; + lastEventId = id; + } + + public BlockingQueue getEvents() { + return events; + } + + public boolean hasMoreEvents() { + return events.size() > 0; + } + + public String getLastEventId() { + return lastEventId; + } + + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public long getContentLength() { + return allEvent.length(); + } + + @Override + public InputStream getContent() throws IOException, UnsupportedOperationException { + return IoUtil.toStream(allEvent.toString(), Charset.defaultCharset()); + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + IoUtil.writeObjects(outStream, true, allEvent); + } + + @Override + public boolean isStreaming() { + return true; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/src/main/java/com/luna/common/net/sse/SseResponse.java b/src/main/java/com/luna/common/net/sse/SseResponse.java new file mode 100644 index 00000000..5bcf2c8c --- /dev/null +++ b/src/main/java/com/luna/common/net/sse/SseResponse.java @@ -0,0 +1,25 @@ +package com.luna.common.net.sse; + +import org.apache.hc.core5.http.*; +import org.apache.hc.core5.http.message.BasicClassicHttpResponse; + +public class SseResponse extends BasicClassicHttpResponse { + + private final HttpResponse original; + private final SseEntity entity; + + public SseResponse(HttpResponse original, ContentType contentType) { + super(original.getCode()); + this.original = original; + this.entity = new SseEntity(contentType); + } + + @Override + public SseEntity getEntity() { + return entity; + } + + public HttpResponse getOriginal() { + return original; + } +} From f1175042730534692fe71c2c01a40b1d799a97fe Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Fri, 28 Apr 2023 21:48:33 +0800 Subject: [PATCH 6/8] :bookmark: sse async request --- .../net/async/CustomSseAsyncConsumer.java | 15 ++++++++++++++- .../net/hander/EventSourceListener.java | 19 ------------------- .../luna/common/utils/AsyncHttpUtilsTest.java | 4 ++-- 3 files changed, 16 insertions(+), 22 deletions(-) delete mode 100644 src/main/java/com/luna/common/net/hander/EventSourceListener.java diff --git a/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java b/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java index 9f4ae92f..e09d0365 100644 --- a/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java +++ b/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java @@ -1,7 +1,7 @@ package com.luna.common.net.async; import com.luna.common.net.hander.AbstactEventFutureCallback; -import com.luna.common.net.hander.EventSourceListener; +import com.luna.common.net.hander.LoggingEventSourceListener; import com.luna.common.net.sse.Event; import com.luna.common.net.sse.SseResponse; import lombok.extern.slf4j.Slf4j; @@ -33,12 +33,25 @@ public class CustomSseAsyncConsumer extends AbstractCharResponseConsumer callback) { + this.events = new ArrayBlockingQueue<>(100); + this.callback = callback; + this.timeWait = 1000; + } + + public CustomSseAsyncConsumer(Integer capacity, AbstactEventFutureCallback callback, Integer timeWait) { this.events = new ArrayBlockingQueue<>(capacity); this.callback = callback; this.timeWait = timeWait; } + public CustomSseAsyncConsumer() { + this.events = new ArrayBlockingQueue<>(100); + this.callback = new LoggingEventSourceListener<>(); + this.timeWait = 1000; + } + public CustomSseAsyncConsumer(FutureCallback listener) { this.events = new ArrayBlockingQueue<>(100); this.callback = new AbstactEventFutureCallback() { diff --git a/src/main/java/com/luna/common/net/hander/EventSourceListener.java b/src/main/java/com/luna/common/net/hander/EventSourceListener.java deleted file mode 100644 index 27cb9f34..00000000 --- a/src/main/java/com/luna/common/net/hander/EventSourceListener.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.luna.common.net.hander; - -import com.luna.common.net.sse.Event; -import com.luna.common.net.sse.SseResponse; -import org.apache.hc.core5.concurrent.FutureCallback; - -public interface EventSourceListener extends FutureCallback { - - /** - * 单个事件 - * @param event - */ - void onEvent(T event); - - - void failed(Exception ex); - - void cancelled(); -} diff --git a/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java b/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java index 9323e40c..a0691806 100644 --- a/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java +++ b/src/test/java/com/luna/common/utils/AsyncHttpUtilsTest.java @@ -100,14 +100,14 @@ public void atest() { StringAsyncEntityProducer stringAsyncEntityProducer = new StringAsyncEntityProducer("{\"temperature\":0,\"model\":\"text-davinci-003\",\"prompt\":\"Say this is a test\",\"stream\":true,\"max_tokens\":7}"); AsyncRequestProducer producer = AsyncHttpUtils.getProducer("https://api.openai.com", "/v1/completions", header, new HashMap<>(), stringAsyncEntityProducer, Method.POST.toString()); - CustomSseAsyncConsumer customSseAsyncConsumer = new CustomSseAsyncConsumer(new LoggingEventSourceListener<>()); + CustomSseAsyncConsumer customSseAsyncConsumer = new CustomSseAsyncConsumer(); AsyncHttpUtils.doAsyncRequest(producer, customSseAsyncConsumer, new CustomAbstacktFutureCallback(){}); } @Test public void test_sse() { - CustomSseAsyncConsumer customSseAsyncConsumer = new CustomSseAsyncConsumer(new LoggingEventSourceListener<>()); + CustomSseAsyncConsumer customSseAsyncConsumer = new CustomSseAsyncConsumer(); ImmutableMap map = ImmutableMap.of(); AsyncRequestProducer producer = AsyncHttpUtils.getProducer("http://localhost:6060", "/stream-sse-mvc", map, new HashMap<>(), Method.GET.toString()); From c431349827b9d4f6b779fc86819d023c5dc6cce4 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sat, 29 Apr 2023 01:35:30 +0800 Subject: [PATCH 7/8] :bookmark: sse async request --- .../luna/common/net/async/CustomSseAsyncConsumer.java | 1 + .../java/com/luna/common/net/high/AsyncHttpUtils.java | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java b/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java index e09d0365..e38d5caa 100644 --- a/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java +++ b/src/main/java/com/luna/common/net/async/CustomSseAsyncConsumer.java @@ -83,6 +83,7 @@ protected void data(CharBuffer data, boolean endOfStream) throws IOException { try { Event poll = events.poll(timeWait, TimeUnit.MILLISECONDS); if (poll != null) { + log.info("data::data = {}, endOfStream = {}", poll, endOfStream); callback.onEvent(poll); } } catch (InterruptedException e) { diff --git a/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java b/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java index 4e4c3f7e..2815e8b1 100644 --- a/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java +++ b/src/main/java/com/luna/common/net/high/AsyncHttpUtils.java @@ -39,7 +39,9 @@ import com.luna.common.net.async.CustomResponseConsumer; import com.luna.common.net.hander.AsyncHttpClientResponseHandler; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.auth.StandardAuthScheme; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; @@ -89,6 +91,13 @@ public static void refresh() { asyncClient.start(); } + public static void setProxy(String host, Integer port, String username, String password) { + if (StringUtils.isNotBlank(username)) { + setAuth(host,username, password); + } + setProxy(host, port); + } + public static void setProxy(Integer port) { setProxy(IPAddressUtil.LOCAL_HOST, port); From f1e40f978463f9fd6dc7c63cac35c60d5a166231 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sat, 29 Apr 2023 01:35:56 +0800 Subject: [PATCH 8/8] :bookmark: 2.4.4 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4e164f13..5e2e335e 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.github.lunasaw luna-common luna-common - 2.4.3 + 2.4.4 common is project which contains common utils https://github.com/lunasaw/luna-common