Skip to content

Commit

Permalink
添加线程池配置逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
AlionSSS committed May 9, 2019
1 parent 7971ffe commit 6b5b791
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 24 deletions.
36 changes: 27 additions & 9 deletions src/com/skey/evehbase/client/EveHBase.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.skey.evehbase.client;

import com.skey.evehbase.pool.PoolConf;
import com.skey.evehbase.pool.PoolEngine;
import com.skey.evehbase.request.*;
import com.skey.evehbase.security.SecurityConf;
Expand All @@ -25,6 +26,7 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
* HBase客户端
Expand Down Expand Up @@ -163,7 +165,7 @@ public void createIndex(@Nonnull String tableName, @Nonnull String familyName,
TableName tn = TableName.valueOf(tableName);

// 创建索引实例
if (indexName == null || indexName.trim().equals("")){
if (indexName == null || indexName.trim().equals("")) {
indexName = tableName + "_" + familyName + "_" + qualifier + "_idx";
}
IndexSpecification iSpec = new IndexSpecification(indexName);
Expand Down Expand Up @@ -201,7 +203,7 @@ public void createIndex(@Nonnull String tableName, @Nonnull String familyName,

@Override
public void putAsync(@Nonnull String tableName, @Nonnull List<Put> putList, PutCallback callback) {
PoolEngine.INSTANCE.execute(() -> {
PoolEngine.getInstance().execute(() -> {
try {
put(tableName, putList);
if (callback != null) callback.onSuccessful();
Expand Down Expand Up @@ -248,7 +250,7 @@ public <T> List<T> scan(@Nonnull EveScan eveScan, @Nonnull Class<T> clazz) throw
scanner = table.getScanner(eveScan.getScan());
int i = 0;
long max = eveScan.getMax();
Result r ;
Result r;
while ((r = scanner.next()) != null && i <= max) {
results.add(r);
i++;
Expand All @@ -268,7 +270,7 @@ public <T> List<T> scan(@Nonnull EveScan eveScan, @Nonnull Class<T> clazz) throw

@Override
public <T> void scanAsync(@Nonnull EveScan eveScan, @Nonnull ResultCallback<T> callback) {
PoolEngine.INSTANCE.execute(() -> {
PoolEngine.getInstance().execute(() -> {
try {
Class<T> clazz = (Class<T>) GenericUtils.getInterfaceT(callback)[0];
List<T> result = scan(eveScan, clazz);
Expand Down Expand Up @@ -307,7 +309,7 @@ public <T> List<T> get(@Nonnull EveGet eveGet, @Nonnull Class<T> clazz) throws I

@Override
public <T> void getAsync(@Nonnull EveGet eveGet, @Nonnull ResultCallback<T> callback) {
PoolEngine.INSTANCE.execute(() -> {
PoolEngine.getInstance().execute(() -> {
try {
Class<T> clazz = (Class<T>) GenericUtils.getInterfaceT(callback)[0];
List<T> results = get(eveGet, clazz);
Expand Down Expand Up @@ -342,11 +344,13 @@ public static class Builder {

private HashMap<String, String> confMap = new HashMap<>();

private PoolConf poolConf;

/**
* 直接使用HBase的Configuration对象
*
* @param conf org.apache.hadoop.conf.Configuration
* @return EveHBase.Builder()
* @return {@link EveHBase.Builder}
*/
public Builder config(Configuration conf) {
this.conf = conf;
Expand All @@ -358,7 +362,7 @@ public Builder config(Configuration conf) {
*
* @param name key
* @param value value
* @return EveHBase.Builder()
* @return {@link EveHBase.Builder}
*/
public Builder config(String name, String value) {
this.confMap.put(name, value);
Expand All @@ -369,15 +373,29 @@ public Builder config(String name, String value) {
* 启用安全认证支持
*
* @param securityConf 安全认证配置
* @return EveHBase.Builder()
* @return {@link EveHBase.Builder}
*/
public Builder enableSafeSupport(SecurityConf securityConf) {
this.securityConf = securityConf;
return this;
}

/**
* 设置线程池配置
*
* @param core 核心数
* @param max 最大线程数
* @param keepAlive 多少时间后关闭多余线程, 单位:毫秒
* @param queueCapacity 队列容量
* @return {@link EveHBase.Builder}
*/
public Builder pool(int core, int max, int keepAlive, int queueCapacity) {
poolConf = new PoolConf(core, max, keepAlive, queueCapacity);
return this;
}

public EveHBase build() {
return HBaseClientDirector.create(conf, confMap, securityConf);
return HBaseClientDirector.create(conf, confMap, securityConf, poolConf);
}

}
Expand Down
24 changes: 23 additions & 1 deletion src/com/skey/evehbase/client/HBaseClientDirector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.skey.evehbase.client;

import com.skey.evehbase.pool.PoolConf;
import com.skey.evehbase.pool.PoolEngine;
import com.skey.evehbase.security.LoginUtil;
import com.skey.evehbase.security.SecurityConf;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -33,12 +35,15 @@ public class HBaseClientDirector {
* @param securityConf 安全配置
* @return HBase客户端
*/
static EveHBase create(Configuration conf, HashMap<String, String> confMap, SecurityConf securityConf) {
static EveHBase create(Configuration conf, HashMap<String, String> confMap,
SecurityConf securityConf, PoolConf poolConf) {
Connection conn = null;
try {
if (poolConf != null) confThreadPool(poolConf);
Configuration fixedConf = fixConf(conf, confMap);
if (securityConf != null) login(fixedConf, securityConf);
conn = ConnectionFactory.createConnection(fixedConf);

} catch (IOException e) {
if (LOG.isErrorEnabled()) LOG.error("HBase连接异常", e);
}
Expand Down Expand Up @@ -99,4 +104,21 @@ private static void login(Configuration conf, SecurityConf securityConf) throws
}
}

/**
* 配置线程池
*
* @param poolConf 线程池配置
*/
private static void confThreadPool(PoolConf poolConf) {
requireGTZero(poolConf.core, poolConf.maximum, poolConf.keepAlive, poolConf.queueCapacity);

PoolEngine.setConf(poolConf.core, poolConf.maximum, poolConf.keepAlive, poolConf.queueCapacity);
}

private static void requireGTZero(int... nums) {
for (int num : nums) {
if (num <= 0 ) throw new IllegalArgumentException("线程池参数必须大于0!");
}
}

}
26 changes: 26 additions & 0 deletions src/com/skey/evehbase/pool/PoolConf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.skey.evehbase.pool;

/**
* 线程池配置
*
* @author ALion
* @version 2019/5/9 20:09
*/
public class PoolConf {

public int core;

public int maximum;

public int keepAlive;

public int queueCapacity;

public PoolConf(int core, int maximum, int keepAlive, int queueCapacity) {
this.core = core;
this.maximum = maximum;
this.keepAlive = keepAlive;
this.queueCapacity = queueCapacity;
}

}
41 changes: 29 additions & 12 deletions src/com/skey/evehbase/pool/PoolEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,47 @@
*
* @author A Lion~
*/
public enum PoolEngine {
public class PoolEngine {

INSTANCE;
private PoolEngine() {

public static final String NAME_FORMAT = "eve-pool-%d";
}

public static PoolEngine getInstance() {
return InnerClass.instance;
}

private static class InnerClass {
private static final PoolEngine instance = new PoolEngine();
}

private static final String NAME_FORMAT = "eve-pool-%d";

public static final int CORE_POOL_SIZE = 4;
private static int corePoolSize = 4;

public static final int MAXIMUM_POOL_SIZE = 8;
private static int maximumPoolSize = 8;

public static final long KEEP_ALIVE_TIME = 100;
private static long keepAliveTime = 100;

public static final int QUEUE_CAPACITY = 1024;
private static int queueCapacity = 1024;

public static void setConf(int core, int maximum, long keepAlive, int capacity) {
corePoolSize = core;
maximumPoolSize = maximum;
keepAliveTime = keepAlive;
queueCapacity = capacity;
}

private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(NAME_FORMAT)
.build();

private ExecutorService pool = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY),
new LinkedBlockingQueue<>(queueCapacity),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

Expand All @@ -47,7 +64,7 @@ public <T> Future<T> submit(Callable<T> callable) {
}

public void shutdown() {
pool.shutdown();
if (pool != null) pool.shutdown();
}

}
1 change: 0 additions & 1 deletion src/com/skey/evehbase/request/EveScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public Builder filter(Filter filter) {
* @return {@link EveScan.Builder}
*/
public Builder limit(long max) {
if (max < 0) throw new IllegalArgumentException("最大返回条数max不能小于0!");
this.max = max;
return this;
}
Expand Down
1 change: 1 addition & 0 deletions src/com/skey/evehbase/request/EveScanDirector.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private EveScanDirector() {
*/
static EveScan create(String table, String startRow, String endRow,
Map<String, Set<String>> columnMap, Filter filter, long max) {
if (max < 0) throw new IllegalArgumentException("最大返回条数max不能小于0!");

TableName tableName = TableName.valueOf(table);

Expand Down
2 changes: 1 addition & 1 deletion test/com/skey/evehbase/GetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void onFailed(Exception e) {
}
});

PoolEngine.INSTANCE.shutdown();
PoolEngine.getInstance().shutdown();

// client.close();

Expand Down

0 comments on commit 6b5b791

Please sign in to comment.