logstash6.8.3 导入 CSV 文件到 ElasticSearch
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了logstash6.8.3 导入 CSV 文件到 ElasticSearch,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4520字,纯文字阅读大概需要7分钟。
内容图文
logstash6.8.3 导入 CSV 文件到 ElasticSearch
使用logstash 导入数据到ES时,由三个步骤组成:input、filter、output。整个导入过程可视为:unix 管道操作,而管道中的每一步操作都是由"插件"实现的。使用./bin/logstash-plugin list
查看 logstash 已安装的插件。
每个插件的选项都可以在官网查询,先明确是哪一步操作,然后去官方文档看是否有相应的插件是否支持这种操作。比如 output 配置选项:plugins-outputs-elasticsearch-options),其中的doc_id选项就支持 指定docid写入ES。在这里,简要说明一些常用的插件,要想了解它们实现的功能可参考官方文档。
- mutate 插件
- csv 插件
- convert 插件
- date 插件
- xxx
使用logstash导入时,会默认生成一些额外的字段,比如@version、host、@timestamp,如果用不着,这些字段可以去除掉 ,此外,要注意ES中的索引的格式(Mapping结构),最好是指定自定义的索引模板,保证索引最"精简"。
配置文件完成后,执行以下命令./bin/logstash -f csvfile_logstash.conf
即可启动 logstash 执行导入操作。
以下是各种错误解决:
错误一:
ConfigurationError”, :message=>”Expected one of #, input, filter, output at line 1, column 1
如果 配置文件内容是正确的,用Notepad检查一下文件的编码,确保是:UTF-8 无BOM格式编码
解决 SOH 分隔符问题
由于csv插件的separator选项不支持转义字符,因此无法用\u0001
来代表SOH。如果 csv 文件以 SOH 分隔符(\u0001
)分割,一种方案是使用mutate插件替换,将\u0001
替换成逗号。如下所示:
mutate{
# 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号
gsub => [ "message","\u0001","," ]
# @timestamp 字段是默认生成的, 名称修改成 created
rename => ["@timestamp", "created"]
}
但是实际上logstash6.8.3是支持按 SOH 分割的。在Linux shell 下,先按 ctrl+v,再按ctrl+a,输入的就是SOH。那么在vim中打开配置文件,在 vim的 insert 模式下,先按 ctrl+v,再按ctrl+a,将SOH作为 csv 插件的separator分割符。
csv {
# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
separator => ""
columns => ["topsid", "title"]
# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
remove_field => ["host", "@timestamp", "@version", "message","path"]
}
一个示例配置模板如下:(以SOH作为分割符)
input {
file {
path => "/data/pengshijin/test/*.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
separator => ""
columns => ["topsid", "title"]
# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
remove_field => ["host", "@timestamp", "@version", "message","path"]
}
mutate {
convert => {
# 类型转换
"topsid" => "integer"
"title" => "string"
}
}
}
output {
elasticsearch {
hosts => "http://http://127.0.0.1:9200"
index => "chantitletest"
# 指定 文档的 类型为 "_doc"
document_type => "_doc"
# 指定doc id 为topsid字段的值
document_id => "%{topsid}"
manage_template => true
# 使用自定义的模板写入,否则将会以logstash默认模板写入
template => "/data/services/logstash-6.8.3/config/chantitletpe.json"
template_overwrite => true
template_name => "chantitletpe"
}
stdout{
codec => json_lines
}
}
一个示例配置模板如下(将 SOH 转换成 逗号):
input {
file {
path => "/data/pengshijin/test/*.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
mutate{
# 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号
gsub => [ "message","\u0001","," ]
# @timestamp 字段是默认生成的, 名称修改成 created
rename => ["@timestamp", "created"]
}
csv {
# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
separator => ","
columns => ["topsid", "title"]
# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
remove_field => ["host", "@timestamp", "@version", "message","path"]
}
mutate {
convert => {
# 类型转换
"topsid" => "integer"
"title" => "string"
}
}
}
output {
elasticsearch {
hosts => "http://127.0.0.1:9200"
index => "chantitletest"
document_type => "_doc"
# 指定doc id 为topsid字段的值
document_id => "%{topsid}"
manage_template => true
# 使用自定义的模板写入,否则将会以logstash默认模板写入
template => "/data/services/logstash-6.8.3/config/chantitletpe.json"
template_overwrite => true
template_name => "chantitletpe"
}
stdout{
codec => json_lines
}
}
使用的自定义模板如下:
{
"index_patterns": [
"chantitle_v1",
"chantitletest"
],
"settings": {
"number_of_shards": 3,
"analysis": {
"analyzer": {
"my_hanlp_analyzer": {
"tokenizer": "my_hanlp"
},
"pinyin_analyzer": {
"tokenizer": "my_pinyin"
}
},
"tokenizer": {
"my_hanlp": {
"enable_normalization": "true",
"type": "hanlp_standard"
},
"my_pinyin": {
"keep_joined_full_pinyin": "true",
"lowercase": "true",
"keep_original": "true",
"remove_duplicated_term": "true",
"keep_first_letter": "false",
"keep_separate_first_letter": "false",
"type": "pinyin",
"limit_first_letter_length": "16",
"keep_full_pinyin": "true"
}
}
}
},
"mappings": {
"_doc": {
"properties": {
"created": {
"type": "date",
"doc_values": false,
"format": "yyyy-MM-dd HH:mm:ss"
},
"title": {
"type": "text",
"fields": {
"pinyin": {
"type": "text",
"boost": 10,
"analyzer": "pinyin_analyzer"
},
"raw": {
"type": "keyword",
"doc_values": false
}
},
"analyzer": "my_hanlp_analyzer"
},
"topsid": {
"type": "long",
"doc_values": false
}
}
}
}
}
原文:https://www.cnblogs.com/hapjin/p/12410408.html
原文:https://www.cnblogs.com/hapjin/p/12410408.html
内容总结
以上是互联网集市为您收集整理的logstash6.8.3 导入 CSV 文件到 ElasticSearch全部内容,希望文章能够帮你解决logstash6.8.3 导入 CSV 文件到 ElasticSearch所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。