Flink 输出至 Elasticsearch
2023-12-29 23:09:26
【1】引入pom.xml
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.0</version>
</dependency>
【2】ES6 Scala
代码,自动导入的scala
包需要修改为scala._
否则会出现错误。
package com.zzx.flink
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
object EsSinkTest {
def main(args: Array[String]): Unit = {
// 创建一个流处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据并转换为 类
val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
//转换
val dataStream: DataStream[SensorReading] = inputStreamFromFile
.map( data => {
var dataArray = data.split(",")
SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
})
//定义一个 HttpHosts
val httpHost = new util.ArrayList[HttpHost]()
//默认 9200 我的修改为了 9201
httpHost.add(new HttpHost("192.168.1.12",9200,"http"))
httpHost.add(new HttpHost("127.0.0.1",9200,"http"))
//定义一个 ElasticSearchFuntion 操作 es的function
val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
//element 每一条数据 通过 index 发送
override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
//包装写入 es 的数据
val dataSource = new util.HashMap[String,String]()
dataSource.put("sensor_id",element.id)
dataSource.put("temp",element.temperature.toString)
dataSource.put("ts",element.timestamp.toString)
//index
val indexRequest = Requests.indexRequest()
.index("sensor_temp")
.`type`("readingdata")
.source(dataSource)
index.add(indexRequest)
println("saved successfully " + element.toString)
}
}
//输出值 es
dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())
env.execute("es")
}
}
【3】ES6
输出展示
文章来源:https://blog.csdn.net/zhengzhaoyang122/article/details/135299055
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!