【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
本文主要介绍Flink 的redis作为数据源的异步读取使用示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本文依赖redis环境好用。
本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版
一、maven依赖
本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。
如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。
二、redis环境及相关内容说明
1、本示例是需要redis环境的,至于redis是集群或单击和本示例关系不大。
关于redis的更多信息请参考其官网。
2、本示例是以redis作为外部数据进行异步交互的例子,也是实际中应用中常见的例子。关于异步数据交互参考文章:55、Flink之用于外部数据访问的异步 I/O
三、redis作为Flink的source异步数据交互示例
本示例是模拟根据外部数据用户姓名查询redis中用户的个人信息。
本示例外部数据就以flink的集合作为示例,redis数据中存储的为hash表,下面验证中会有具体展示。
1、maven依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.12</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.12</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
2、redis异步交互数据实现
1)、读取redis数据时以string进行输出
package org.datastreamapi.source.custom.redis;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import com.sun.jdi.IntegerValue;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @author alanchan
*
*/
public class CustomRedisSource extends RichAsyncFunction<String, String> {
private JedisPoolConfig config = null;
private static String ADDR = "192.168.10.41";
private static int PORT = 6379;
// 等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时
private static int TIMEOUT = 10000;
private JedisPool jedisPool = null;
private Jedis jedis = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
config = new JedisPoolConfig();
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
jedis = jedisPool.getResource();
}
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
// 文件中读取的内容
System.out.println("输入参数input----:" + input);
// 发起一个异步请求,返回结果
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
String[] arrayData = input.split(",");
String name = arrayData[1];
String value = jedis.hget("AsyncReadUser_Redis", name);
System.out.println("查询结果output----:" + value);
return value;
}
}).thenAccept((String dbResult) -> {
// 设置请求完成时的回调,将结果返回
resultFuture.complete(Collections.singleton(dbResult));
});
}
// 连接超时的时候调用的方法
@Override
public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("redis connect timeout!");
}
@Override
public void close() throws Exception {
super.close();
if (jedis.isConnected()) {
jedis.close();
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class User {
private int id;
private String name;
private int age;
private double balance;
User(String value) {
String[] str = value.split(",");
this.setId(Integer.valueOf(str[0]));
this.setName(str[1]);
this.setAge(Integer.valueOf(str[2]));
this.setBalance(Double.valueOf(str[0]));
}
}
}
2)、读取redis数据时以pojo进行输出
package org.datastreamapi.source.custom.redis;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @author alanchan
*
*/
public class CustomRedisSource2 extends RichAsyncFunction<String, User> {
private JedisPoolConfig config = null;
private static String ADDR = "192.168.10.41";
private static int PORT = 6379;
// 等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时
private static int TIMEOUT = 10000;
private JedisPool jedisPool = null;
private Jedis jedis = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
config = new JedisPoolConfig();
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
jedis = jedisPool.getResource();
}
@Override
public void asyncInvoke(String input, ResultFuture<User> resultFuture) throws Exception {
System.out.println("输入查询条件:" + input);
CompletableFuture.supplyAsync(new Supplier<User>() {
@Override
public User get() {
String[] arrayData = input.split(",");
String name = arrayData[1];
String value = jedis.hget("AsyncReadUser_Redis", name);
System.out.println("查询redis结果:" + value);
return new User(value);
}
}).thenAccept((User dbResult) -> {
// 设置请求完成时的回调,将结果返回
resultFuture.complete(Collections.singleton(dbResult));
});
}
// 连接超时的时候调用的方法
@Override
public void timeout(String input, ResultFuture<User> resultFuture) throws Exception {
System.out.println("redis connect timeout!");
}
@Override
public void close() throws Exception {
super.close();
if (jedis.isConnected()) {
jedis.close();
}
}
}
3、使用示例
package org.datastreamapi.source.custom.redis;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;
/**
* @author alanchan
*
*/
public class TestCustomRedisSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// id,name
DataStreamSource<String> lines = env.fromElements("1,alan", "2,alanchan", "3,alanchanchn", "4,alan_chan", "5,alan_chan_chn");
SingleOutputStreamOperator<String> result = AsyncDataStream.orderedWait(lines, new CustomRedisSource(), 10, TimeUnit.SECONDS, 1);
SingleOutputStreamOperator<User> result2 = AsyncDataStream.orderedWait(lines, new CustomRedisSource2(), 10, TimeUnit.SECONDS, 1);
result.print("result-->").setParallelism(1);
result2.print("result2-->").setParallelism(1);
env.execute();
}
}
4、验证
1)、准备redis环境数据
hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'
127.0.0.1:6379> hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hgetall AsyncReadUser_Redis
1) "alan"
2) "1,alan,18,20,alan.chan.chn@163.com"
3) "alanchan"
4) "2,alanchan,19,25,alan.chan.chn@163.com"
5) "alanchanchn"
6) "3,alanchanchn,20,30,alan.chan.chn@163.com"
7) "alan_chan"
8) "4,alan_chan,27,20,alan.chan.chn@163.com"
9) "alan_chan_chn"
10) "5,alan_chan_chn,36,10,alan.chan.chn@163.com"
2)、启动应用程序,并观察控制台输出
输入查询条件:5,alan_chan_chn
输入参数input----:2,alanchan
输入参数input----:5,alan_chan_chn
输入查询条件:3,alanchanchn
输入查询条件:1,alan
输入参数input----:1,alan
输入查询条件:2,alanchan
输入查询条件:4,alan_chan
输入参数input----:4,alan_chan
输入参数input----:3,alanchanchn
查询结果output----:3,alanchanchn,20,30,alan.chan.chn@163.com
查询redis结果:1,alan,18,20,alan.chan.chn@163.com
查询结果output----:1,alan,18,20,alan.chan.chn@163.com
查询redis结果:4,alan_chan,27,20,alan.chan.chn@163.com
查询redis结果:2,alanchan,19,25,alan.chan.chn@163.com
查询结果output----:2,alanchan,19,25,alan.chan.chn@163.com
查询redis结果:3,alanchanchn,20,30,alan.chan.chn@163.com
查询结果output----:4,alan_chan,27,20,alan.chan.chn@163.com
查询结果output----:5,alan_chan_chn,36,10,alan.chan.chn@163.com
查询redis结果:5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 4,alan_chan,27,20,alan.chan.chn@163.com
result-->> 5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 3,alanchanchn,20,30,alan.chan.chn@163.com
result-->> 2,alanchan,19,25,alan.chan.chn@163.com
result-->> 1,alan,18,20,alan.chan.chn@163.com
result2-->> CustomRedisSource.User(id=4, name=alan_chan, age=27, balance=4.0)
result2-->> CustomRedisSource.User(id=1, name=alan, age=18, balance=1.0)
result2-->> CustomRedisSource.User(id=3, name=alanchanchn, age=20, balance=3.0)
result2-->> CustomRedisSource.User(id=5, name=alan_chan_chn, age=36, balance=5.0)
result2-->> CustomRedisSource.User(id=2, name=alanchan, age=19, balance=2.0)
以上,本文主要介绍Flink 的redis作为数据源的异步读取使用示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!