diff --git a/src/com/skey/evehbase/client/EveHBase.java b/src/com/skey/evehbase/client/EveHBase.java index d5dd9c6..279dba8 100644 --- a/src/com/skey/evehbase/client/EveHBase.java +++ b/src/com/skey/evehbase/client/EveHBase.java @@ -1,5 +1,6 @@ package com.skey.evehbase.client; +import com.skey.evehbase.pool.PoolConf; import com.skey.evehbase.pool.PoolEngine; import com.skey.evehbase.request.*; import com.skey.evehbase.security.SecurityConf; @@ -25,6 +26,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.*; +import java.util.concurrent.TimeUnit; /** * HBase客户端 @@ -163,7 +165,7 @@ public void createIndex(@Nonnull String tableName, @Nonnull String familyName, TableName tn = TableName.valueOf(tableName); // 创建索引实例 - if (indexName == null || indexName.trim().equals("")){ + if (indexName == null || indexName.trim().equals("")) { indexName = tableName + "_" + familyName + "_" + qualifier + "_idx"; } IndexSpecification iSpec = new IndexSpecification(indexName); @@ -201,7 +203,7 @@ public void createIndex(@Nonnull String tableName, @Nonnull String familyName, @Override public void putAsync(@Nonnull String tableName, @Nonnull List putList, PutCallback callback) { - PoolEngine.INSTANCE.execute(() -> { + PoolEngine.getInstance().execute(() -> { try { put(tableName, putList); if (callback != null) callback.onSuccessful(); @@ -248,7 +250,7 @@ public List scan(@Nonnull EveScan eveScan, @Nonnull Class clazz) throw scanner = table.getScanner(eveScan.getScan()); int i = 0; long max = eveScan.getMax(); - Result r ; + Result r; while ((r = scanner.next()) != null && i <= max) { results.add(r); i++; @@ -268,7 +270,7 @@ public List scan(@Nonnull EveScan eveScan, @Nonnull Class clazz) throw @Override public void scanAsync(@Nonnull EveScan eveScan, @Nonnull ResultCallback callback) { - PoolEngine.INSTANCE.execute(() -> { + PoolEngine.getInstance().execute(() -> { try { Class clazz = (Class) GenericUtils.getInterfaceT(callback)[0]; List result = scan(eveScan, clazz); @@ -307,7 +309,7 @@ public List get(@Nonnull EveGet eveGet, @Nonnull Class clazz) throws I @Override public void getAsync(@Nonnull EveGet eveGet, @Nonnull ResultCallback callback) { - PoolEngine.INSTANCE.execute(() -> { + PoolEngine.getInstance().execute(() -> { try { Class clazz = (Class) GenericUtils.getInterfaceT(callback)[0]; List results = get(eveGet, clazz); @@ -342,11 +344,13 @@ public static class Builder { private HashMap confMap = new HashMap<>(); + private PoolConf poolConf; + /** * 直接使用HBase的Configuration对象 * * @param conf org.apache.hadoop.conf.Configuration - * @return EveHBase.Builder() + * @return {@link EveHBase.Builder} */ public Builder config(Configuration conf) { this.conf = conf; @@ -358,7 +362,7 @@ public Builder config(Configuration conf) { * * @param name key * @param value value - * @return EveHBase.Builder() + * @return {@link EveHBase.Builder} */ public Builder config(String name, String value) { this.confMap.put(name, value); @@ -369,15 +373,29 @@ public Builder config(String name, String value) { * 启用安全认证支持 * * @param securityConf 安全认证配置 - * @return EveHBase.Builder() + * @return {@link EveHBase.Builder} */ public Builder enableSafeSupport(SecurityConf securityConf) { this.securityConf = securityConf; return this; } + /** + * 设置线程池配置 + * + * @param core 核心数 + * @param max 最大线程数 + * @param keepAlive 多少时间后关闭多余线程, 单位:毫秒 + * @param queueCapacity 队列容量 + * @return {@link EveHBase.Builder} + */ + public Builder pool(int core, int max, int keepAlive, int queueCapacity) { + poolConf = new PoolConf(core, max, keepAlive, queueCapacity); + return this; + } + public EveHBase build() { - return HBaseClientDirector.create(conf, confMap, securityConf); + return HBaseClientDirector.create(conf, confMap, securityConf, poolConf); } } diff --git a/src/com/skey/evehbase/client/HBaseClientDirector.java b/src/com/skey/evehbase/client/HBaseClientDirector.java index b25ef0c..09a791b 100644 --- a/src/com/skey/evehbase/client/HBaseClientDirector.java +++ b/src/com/skey/evehbase/client/HBaseClientDirector.java @@ -1,5 +1,7 @@ package com.skey.evehbase.client; +import com.skey.evehbase.pool.PoolConf; +import com.skey.evehbase.pool.PoolEngine; import com.skey.evehbase.security.LoginUtil; import com.skey.evehbase.security.SecurityConf; import org.apache.hadoop.conf.Configuration; @@ -33,12 +35,15 @@ public class HBaseClientDirector { * @param securityConf 安全配置 * @return HBase客户端 */ - static EveHBase create(Configuration conf, HashMap confMap, SecurityConf securityConf) { + static EveHBase create(Configuration conf, HashMap confMap, + SecurityConf securityConf, PoolConf poolConf) { Connection conn = null; try { + if (poolConf != null) confThreadPool(poolConf); Configuration fixedConf = fixConf(conf, confMap); if (securityConf != null) login(fixedConf, securityConf); conn = ConnectionFactory.createConnection(fixedConf); + } catch (IOException e) { if (LOG.isErrorEnabled()) LOG.error("HBase连接异常", e); } @@ -99,4 +104,21 @@ private static void login(Configuration conf, SecurityConf securityConf) throws } } + /** + * 配置线程池 + * + * @param poolConf 线程池配置 + */ + private static void confThreadPool(PoolConf poolConf) { + requireGTZero(poolConf.core, poolConf.maximum, poolConf.keepAlive, poolConf.queueCapacity); + + PoolEngine.setConf(poolConf.core, poolConf.maximum, poolConf.keepAlive, poolConf.queueCapacity); + } + + private static void requireGTZero(int... nums) { + for (int num : nums) { + if (num <= 0 ) throw new IllegalArgumentException("线程池参数必须大于0!"); + } + } + } diff --git a/src/com/skey/evehbase/pool/PoolConf.java b/src/com/skey/evehbase/pool/PoolConf.java new file mode 100644 index 0000000..604c660 --- /dev/null +++ b/src/com/skey/evehbase/pool/PoolConf.java @@ -0,0 +1,26 @@ +package com.skey.evehbase.pool; + +/** + * 线程池配置 + * + * @author ALion + * @version 2019/5/9 20:09 + */ +public class PoolConf { + + public int core; + + public int maximum; + + public int keepAlive; + + public int queueCapacity; + + public PoolConf(int core, int maximum, int keepAlive, int queueCapacity) { + this.core = core; + this.maximum = maximum; + this.keepAlive = keepAlive; + this.queueCapacity = queueCapacity; + } + +} diff --git a/src/com/skey/evehbase/pool/PoolEngine.java b/src/com/skey/evehbase/pool/PoolEngine.java index 9de4413..d153298 100644 --- a/src/com/skey/evehbase/pool/PoolEngine.java +++ b/src/com/skey/evehbase/pool/PoolEngine.java @@ -11,30 +11,47 @@ * * @author A Lion~ */ -public enum PoolEngine { +public class PoolEngine { - INSTANCE; + private PoolEngine() { - public static final String NAME_FORMAT = "eve-pool-%d"; + } + + public static PoolEngine getInstance() { + return InnerClass.instance; + } + + private static class InnerClass { + private static final PoolEngine instance = new PoolEngine(); + } + + private static final String NAME_FORMAT = "eve-pool-%d"; - public static final int CORE_POOL_SIZE = 4; + private static int corePoolSize = 4; - public static final int MAXIMUM_POOL_SIZE = 8; + private static int maximumPoolSize = 8; - public static final long KEEP_ALIVE_TIME = 100; + private static long keepAliveTime = 100; - public static final int QUEUE_CAPACITY = 1024; + private static int queueCapacity = 1024; + + public static void setConf(int core, int maximum, long keepAlive, int capacity) { + corePoolSize = core; + maximumPoolSize = maximum; + keepAliveTime = keepAlive; + queueCapacity = capacity; + } private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(NAME_FORMAT) .build(); private ExecutorService pool = new ThreadPoolExecutor( - CORE_POOL_SIZE, - MAXIMUM_POOL_SIZE, - KEEP_ALIVE_TIME, + corePoolSize, + maximumPoolSize, + keepAliveTime, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(QUEUE_CAPACITY), + new LinkedBlockingQueue<>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); @@ -47,7 +64,7 @@ public Future submit(Callable callable) { } public void shutdown() { - pool.shutdown(); + if (pool != null) pool.shutdown(); } } diff --git a/src/com/skey/evehbase/request/EveScan.java b/src/com/skey/evehbase/request/EveScan.java index a143b62..fc14807 100644 --- a/src/com/skey/evehbase/request/EveScan.java +++ b/src/com/skey/evehbase/request/EveScan.java @@ -112,7 +112,6 @@ public Builder filter(Filter filter) { * @return {@link EveScan.Builder} */ public Builder limit(long max) { - if (max < 0) throw new IllegalArgumentException("最大返回条数max不能小于0!"); this.max = max; return this; } diff --git a/src/com/skey/evehbase/request/EveScanDirector.java b/src/com/skey/evehbase/request/EveScanDirector.java index b062699..447a3a4 100644 --- a/src/com/skey/evehbase/request/EveScanDirector.java +++ b/src/com/skey/evehbase/request/EveScanDirector.java @@ -33,6 +33,7 @@ private EveScanDirector() { */ static EveScan create(String table, String startRow, String endRow, Map> columnMap, Filter filter, long max) { + if (max < 0) throw new IllegalArgumentException("最大返回条数max不能小于0!"); TableName tableName = TableName.valueOf(table); diff --git a/test/com/skey/evehbase/GetTest.java b/test/com/skey/evehbase/GetTest.java index 1878688..a095862 100644 --- a/test/com/skey/evehbase/GetTest.java +++ b/test/com/skey/evehbase/GetTest.java @@ -104,7 +104,7 @@ public void onFailed(Exception e) { } }); - PoolEngine.INSTANCE.shutdown(); + PoolEngine.getInstance().shutdown(); // client.close();