python-如何使用elasticsearch.helpers.streaming_bulk
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python-如何使用elasticsearch.helpers.streaming_bulk,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2175字,纯文字阅读大概需要4分钟。
内容图文
![python-如何使用elasticsearch.helpers.streaming_bulk](/upload/InfoBanner/zyjiaocheng/694/c7391d2ed53842e9b145d13b9e2f023b.jpg)
有人可以建议如何使用函数elasticsearch.helpers.streaming_bulk代替elasticsearch.helpers.bulk将数据索引到elasticsearch中.
如果我只是简单地更改streaming_bulk而不是批量,则不会索引任何内容,因此我猜它需要以其他形式使用.
下面的代码从CSV文件中以500个元素的块创建索引,类型和索引数据,并进入elasticsearch.它工作正常,但是我在徘徊是否可以提高性能.这就是为什么我想尝试streaming_bulk函数的原因.
目前,我需要10分钟才能为200MB的CSV文档索引100万行.我使用两台机器,Centos 6.6,8个CPU,x86_64,CPU MHz:2499.902,内存:15.574G.
不确定它可以更快地进行.
es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file
es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)
with open(file_to_index, 'rb') as csvfile:
reader = csv.reader(csvfile) #read documents for indexing from CSV file, more than million rows
content = {"_index": index_name, "_type": type_name}
batch_chunks = []
iterator = 0
for row in reader:
var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
id_increment = id_increment + 1
#var = transform_row_for_indexing(row,fields, index_name, type_name)
batch_chunks.append(var)
if iterator % 500 == 0:
helpers.bulk(es,batch_chunks)
del batch_chunks[:]
print "ispucalo batch"
iterator = iterator + 1
# indexing of last batch_chunk
if len(batch_chunks) != 0:
helpers.bulk(es,batch_chunks)
解决方法:
因此,流式批量返回一个interator.这意味着除非您开始对其进行迭代,否则将不会发生任何事情. “批量”功能的代码如下所示:
success, failed = 0, 0
# list of errors to be collected is not stats_only
errors = []
for ok, item in streaming_bulk(client, actions, **kwargs):
# go through request-reponse pairs and detect failures
if not ok:
if not stats_only:
errors.append(item)
failed += 1
else:
success += 1
return success, failed if stats_only else errors
因此,基本上只调用streaming_bulk(client,actions,** kwargs)实际上不会做任何事情.直到像本for循环中那样对它进行迭代,索引才真正开始发生.
因此,在您的代码中.欢迎您将“批量”更改为“ streaming_bulk”,但是您需要遍历流批量的结果才能真正索引任何内容.
内容总结
以上是互联网集市为您收集整理的python-如何使用elasticsearch.helpers.streaming_bulk全部内容,希望文章能够帮你解决python-如何使用elasticsearch.helpers.streaming_bulk所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。