Easticsearch 数据迁移至influxdb【python】
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Easticsearch 数据迁移至influxdb【python】,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3321字,纯文字阅读大概需要5分钟。
内容图文
Easticsearch 数据迁移至influxdb python
需求:将Easticsearch部分数据迁移至influxdb中。
见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。
解决方案:大批量从Easticsearch取数据,两种方案。1.from...size 2.scroll (类似于数据库的游标) 脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。
#!/usr/bin/env python #coding=utf-8 import sys import json import datetime import elasticsearch from influxdb import InfluxDBClient #连接Easticsearch class ES(object): @classmethod def connect_host(cls): url = "http://192.168.121.33:9202/" es = elasticsearch.Elasticsearch(url,timeout=120) return es es = ES.connect_host() #连接influxdb client = InfluxDBClient(host="192.168.121.33", port="8086", username=‘admin‘, password=‘admin‘, database=‘esl‘) client.create_database(‘esl‘) #DSL查询语法 data = { "query": { "match_all" : {}}, "size": 100 } # 设置要过滤返回的字段值,要什么字段。 ‘hits.hits._source.resource_id‘, ‘hits.hits._source.timestamp‘, ‘hits.hits._source.counter_volume‘, ‘hits.hits._source.@timestamp‘, ] # 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用 res = es.search( index=‘pipefilter_meters*‘, doc_type =‘canaledge.flow.bytes‘, body=data, search_type="scan", scroll="10m" ) scroll_id = res[‘_scroll_id‘] response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,) scroll_id = response[‘_scroll_id‘] #获取第二次scroll_id hits = response[‘hits‘][‘hits‘] in_data = [] while len(hits) > 0: for i in hits: res_id = i[‘_source‘][‘resource_id‘] r_id, r_type = res_id.split(‘:‘) datas = { "measurement": "es_net", "tags": { "resource_id": r_id, "type": r_type }, "time": i[‘_source‘][‘timestamp‘], "fields": { "counter_volume": i[‘_source‘][‘counter_volume‘] } } in_data.append(datas) #循环写入influxdb client.write_points(in_data) in_data = [] #每次循环完重新定义列表为空 data = { "query": { "match_all" : {}}, "size": 100 } ## 设置要过滤返回的字段值,要什么字段。 ‘_scroll_id‘, ‘hits.hits._source.resource_id‘, ‘hits.hits._source.timestamp‘, ‘hits.hits._source.counter_volume‘, ‘hits.hits._source.@timestamp‘, ] ## 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用 response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,) #调试 #if not response.get(‘hits‘): # print response # sys.exit(1) #else: hits = response[‘hits‘][‘hits‘] scroll_id = response["_scroll_id"] #获取第三次scroll_id
本文出自 “生锈的老枪_技术博客” 博客,转载请与作者联系!
Easticsearch 数据迁移至influxdb【python】
标签:python easticsearch inflxudb easticsearch迁移 easticsearch迁移influxdb
本文系统来源:http://ruilinux.blog.51cto.com/4265949/1892420
内容总结
以上是互联网集市为您收集整理的Easticsearch 数据迁移至influxdb【python】全部内容,希望文章能够帮你解决Easticsearch 数据迁移至influxdb【python】所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。