Flink之DataStream API的转换算子

2023-12-13 14:49:19

简单转换算子

函数的实现方式

  1. 自定义类,实现函数接口:编码麻烦,使用灵活
  2. 匿名内部类:编码简单
  3. Lambda:编码简洁
public class Flink02_FunctionImplement {
    public static void main(String[] args) {



        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        env.socketTextStream("hadoop102",8888)
                .flatMap((String line, Collector<Tuple2<String, Integer>> out)->{
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word,1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(0)
                .sum(1)
                .print();


        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {

        private String Operator;



        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word,1));
            }
        }
    }
}

Reduce规约聚合

  1. reduce:规约聚合
    • 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
    • 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
    • 方法: T reduce(T value1, T value2) throws Exception
      • value1:上一次的聚合值
      • value2:本次新到的值
public class Flink04_ReduceAggOpterator {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);
        ds.print("input================");
        //reduce:每个用户的点击次数

        ds.map(event->new WordCount(event.getUser(),1))
                .keyBy(WordCount::getWord)
                .reduce(
                        new ReduceFunction<WordCount>() {
                            /**
                             *
                             * @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出
                             * @param value2 新来的值
                             * @return
                             * @throws Exception
                             */
                            @Override
                            public WordCount reduce(WordCount value1, WordCount value2) throws Exception {
                                System.out.println("测试");
                                return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());
                            }
                        }
                ).print("reduce");

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

富函数

调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能

  • 函数:
    • 普通函数
      Map
      filter
      flatMap
      reduce
      富函数:基本上每个普通函数都有对应的富函数
      统一接口interface RichFunction extends Function
      具体使用的富函数类:
      - RichMapFunction
      - RichFilterFunction
      - RichFlatMapFunction
      - RichReduceFunction
      - …
      富函数功能:
      • 生命周期方法:
        • open(): 当前算子的每个并行子任务的实例创建时会调用一次
        • close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
      • 获取运行时上下文对象 getRuntimeContext
        • 可以获取当前作业,当前task的相关信息
        • 获取不同类型的状态,进行状态编程*
          getState | getListState | getReducingState | getMapState
public class Flink05_RichFunction {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

//        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);
        FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(
                new TextLineInputFormat(),
                new Path("input/enents.txt")
        ).build();

        DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        SingleOutputStreamOperator<Event> ds = fileDs.map(
                new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] valueArr = value.split("");
                        return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));
                    }
                }
        );


        ds.print("input================");

        ds.map(
                new RichMapFunction<Event, WordCount>() {

                    /**
                     * 生命周期open方法,当前算子实例创建时执行一次,只执行一次
                     * @param parameters The configuration containing the parameters attached to the contract.
                     * @throws Exception
                     */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println("创建Redis的连接对象");
                    }

                    @Override
                    public WordCount map(Event value) throws Exception {

                        System.out.println("每条数据执行一次");

                        return new WordCount(value.getUser(),1);
                    }

                    /**
                     * 生命周期的close方法
                     * 当前算子实例销毁时执行一次
                     * @throws Exception
                     */
                    @Override
                    public void close() throws Exception {
                        System.out.println("关闭连接对象");
                    }
                }
        ).print("map");

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

文章来源:https://blog.csdn.net/qq_44273739/article/details/134841620
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。