数据导入脚本如下
import time
import sys
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
reload(sys)
sys.setdefaultencoding('utf-8')
def set_mapping(es, index_name = "content_engine", doc_type_name = "en"):
my_mapping = {
"en": {
"properties": {
"a": {
"type": "string"
},
"b": {
"type": "string"
}
}
}
}
create_index = es.indices.create(index = index_name,body = my_mapping)
mapping_index = es.indices.put_mapping(index = index_name, doc_type = doc_type_name, body = my_mapping)
if create_index["acknowledged"] != True or mapping_index["acknowledged"] != True:
print "Index creation failed..."
def set_data(es, input_file, index_name = "content_engine", doc_type_name="en"):
i = 0
count = 0
ACTIONS = []
for line in open(input_file):
fields = line.replace("\r\n", "").replace("\n", "").split("----")
if len(fields) == 2:
a, b = fields
else:
continue
action = {
"_index": index_name,
"_type": doc_type_name,
"_source": {
"a": a,
"b": b,
}
}
i += 1
ACTIONS.append(action)
if (i == 500000):
success, _ = bulk(es, ACTIONS, index = index_name, raise_on_error = True)
count += success
i = 0
ACTIONS = []
success, _ = bulk(es, ACTIONS, index = index_name, raise_on_error=True)
count += success
print("insert %s lines" % count)
if __name__ == '__main__':
es = Elasticsearch(hosts=["127.0.0.1:9200"], timeout=5000)
set_mapping(es)
set_data(es,sys.argv[1])
数据大概 5 个 G 吧,机器配置虚拟机 24G 内存,刚开始无内存泄露现象,这个 Python 脚本的进程内存一直保持 1G 左右的占用,当插入 1600 w,内存开始持续飙升,最后达到 22G ,导致触发 OOM 机制, Python 进程被内核 kill ,差点怀疑人生。。大家在遇到 Python 内存泄露都是怎么定位的?
1
Zuckonit 2016-12-22 09:44:06 +08:00
1 、 gc
2 、 objgraph |
2
yzmm 2016-12-22 09:51:03 +08:00
5w bulk 一次,再不行重新建立下 es 对象试试
|
3
yuankui 2016-12-22 10:14:00 +08:00
没有人对你这么烂的代码感兴趣,这是事实,必须承认.
试试,找个同事或者同学,然后口述你代码逻辑,也许你会自己发现问题~ |
5
yuankui 2016-12-22 10:51:25 +08:00
@firebroo 其实我本意不是说你代码烂.
内存泄露一般出现在循环里面向循环外的容器塞数据,导致内存泄露. 你代码里的 ACTIONS 变量,在循环里面每次都塞一些数据,然后直到函数结束才释放. 也就是说, ACTIONS 里面包含整个文件的数据? 5G 的文件啊,哥. |
6
yuankui 2016-12-22 10:52:52 +08:00
忽略上面的,代码没仔细看..
|
7
p2p 2016-12-22 11:22:21 +08:00
如 2l 说的 减小 bulk 阀值, 直到没有内存问题
|
8
jimmyye 2016-12-22 11:24:33 +08:00
参考这里: https://github.com/elastic/elasticsearch-py/issues/297
1.试试用 generator 改写, 2.因为 bulk 调用 streaming_bulk ,试试调整 chunk_size 、 max_chunk_bytes : http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.streaming_bulk |
9
firebroo OP |
10
miraclinger 2016-12-22 18:37:23 +08:00
官网给的推荐是 1,000 to 5,000 条数据,文件大小是 5-15MB , https://www.elastic.co/guide/en/elasticsearch/guide/master/bulk.html
|
11
miraclinger 2016-12-22 19:23:33 +08:00
有个思路是用 linux 的切割命令: split -l 5000 input_file
再就是用多线程进行批量导入,线程数量最好是 200 个左右 |
12
miraclinger 2016-12-22 19:24:48 +08:00
有个思路是用 linux 的切割命令: split -l 5000 input_file
再就是用多线程对分割的文件 进行批量导入,线程数量最好是 200 个左右 |
13
WKPlus 2016-12-22 20:37:44 +08:00
没用过 python es 的库,但是看你的代码,如果 es 存了 ACTIONS 这个 list 的引用,有可能有内存泄露。把 ACTIONS = []改成 del ACTIONS[:]试下?
|
14
firebroo OP @miraclinger 嗯,我看了你的链接,官方的意思是推荐从一次导入 1000-5000 条开始测试直到找到最佳 performance 吧, 可能我的不是最佳,但是和这个应该没有关系,分割为小文件我导入我想过(现在我朋友推荐我使用 Java 的 API 用 9300 端口走 TCP 导入),但是我其实想找到内存泄露的原因呢。
@WKPlus 试过了,依然 oom ,我还试过 del 之后用 gc 库显示回收 gc ,也是炸裂。 |
15
firebroo OP 结帖了,在 github 提了[issue]( https://github.com/elastic/elasticsearch-py/issues/508),是我姿势不对。。
|
16
miraclinger 2016-12-23 09:56:52 +08:00
虽然已结贴,但是我还想问下,如果把值调成 5000 ,会出现内存泄露不?因为看了下 github 上的生成器,给我的感觉是一次性导入数据,不知道我有没有看错,如果这样的话,效率会比较低吧。
|
17
enenaaa 2016-12-23 10:44:50 +08:00
可以在内存飙升的时候看看具体是消耗在哪了。
貌似有 guppy 之类的工具可用? |
18
firebroo OP @miraclinger 晚上我测试完了给你结果,我觉得还是会泄露, github 那个它说 bluk 内部有 chunking ,默认好像是 chunking size 是 5000 吧,理解为 5000 个 documents 请求一次 es 的 API 就行。
@enenaaa 我取 stackoverflow 提问,有人推荐 pypi.python.org/pypi/memory_profiler ,但是我这个情况还是不适用。 |
20
firebroo OP |