想用多线程把数据库的数据写入 elasticsearch
如果发生异常,要立即终止整个循环,所以用了 Future
代码如下
private Result loadDataFromDbIntoEs(Long maxId) {
ExecutorService pool = Executors.newWorkStealingPool();
LinkedTransferQueue<Future<Result>> futureQueue = new LinkedTransferQueue<>();
try {
//遍历数据库的表
for (long i = 0; i <= maxId; i += getDbPageSize()) {
//创建任务
Callable<Result> task = createTask(i);
//任务入队
futureQueue.put(pool.submit(task));
//队列超过一定长度后,先执行掉一部分再继续
if (futureQueue.size() > QUEUE_SIZE * 8) {
if ((i % 100000) == 0) {
log.debug("{} - Iterating future list, {} of {}", esEntityClassName, i, maxId);
}
//执行一部分任务
checkFuture(futureQueue);
}
}
pool.shutdown();
//处理队列中剩余的任务
while (!futureQueue.isEmpty()) {
checkFuture(futureQueue);
}
log.info("{} - sync complete", esEntityClassName);
} catch (SyncException | InterruptedException | ExecutionException e) {
//throw...
}
return Result.ok();
}
/**
* 创建任务
*/
private Callable<Result> createTask(Long currentId) {
return () -> {
List<D> dbList = dbRepository.findByIdBetween(currentId, currentId + getDbPageSize() - 1);
if (dbList.isEmpty()) {
//忽略没有数据的 id 区间
return Result.ok();
}
//写入 es
return bulkCreate(dbListToEsList(dbList));
};
}
/**
* 消费任务
*/
private void checkFuture(LinkedTransferQueue<Future<Result>> futureQueue) throws ExecutionException, InterruptedException {
for (int i = 0; i < QUEUE_SIZE; i++) {
Future<Result> future = futureQueue.poll();
if (future != null) {
Result result = future.get();
if (!Result.REQUEST_SUCCESS.equals(result.getStatus())) {
throw new SyncException(result.getMessage());
}
}
}
}
现在的问题是,服务器 16 核,cpu 占用率并不高,大多数时候只有 es 的进程占了 20% 不知道是哪里有问题导致效率太低?
1
gosansam 2019-07-12 11:49:43 +08:00
Result result = future.get();
这个获取结果是阻塞的 |
2
zhady009 2019-07-12 13:30:03 +08:00
guava 的 ListeningExecutorService submit 返回 ListenableFuture 应该可以解决
|
3
softtwilight 2019-07-12 13:49:47 +08:00
可以试试用 completableFuture,checkFuture 可以改 join,如果 io 耗时多,Executors.newWorkStealingPool() 的线程可能有点少,completableFuture 也可以自定义线程池
|
4
rykinia OP 谢谢各位的解答。
研究了几天,感觉主要问题还是 es 的阻塞比较久,不过把这里改成了主线程消费 queue 里面的,线程池里异步往 queue 添加,稍微好了点。 |