第一个FLink程序之wordCount

2023-12-23 06:17:01

前言

前几篇内容讲解的都是环境的部署安装,下面就关于安装好的环境,开始着手程序的编写和实现。


一、Flink批处理

批处理在flink中来说操作是有界的,比如对一个文件的单词进行统计,首选的话需要创建执行环境,此处使用的ExecutionEnviroment,下面是具体的执行代码
提示:先导入maven依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>

提示:创建java运行类

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountBatchDemo {
    public static void main(String[] args) throws Exception {
        //TODO 1.创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //TODO 2.读取文件:从文件中读取
        DataSource<String> lineDs = env.readTextFile("input/word.txt");
        //TODO 3.切分、转换(word,1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //TODO 3.1按照空格切分单词
                String[] words = value.split(" ");
                //TODO 3.2将单词转换为(word,1)格式
                for (String word:words){
                    Tuple2<String,Integer> wordTuple2 = Tuple2.of(word,1);
                    out.collect(wordTuple2);
                }
            }
        });
        //TODO 4.按照word分组
        UnsortedGrouping<Tuple2<String,Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);
        //ToDO 5.各分组内聚合
        AggregateOperator<Tuple2<String,Integer>> sum = wordAndOneGroupBy.sum(1);//1是位置,表示第二个袁术
       //TODO 6.输出
        sum.print();
    }
}

读取的文件目录可以在项目文件中创建,也可以更改为hdfs路径

二、Flink流式处理

Flink流式处理首选要创建流式执行环境,此时用StreamExecutionEnvironment
提示:flink高版本maven依赖和低版本有很大区别

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>

提示:类名需导入正确路径,防止程序出错

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class WordCountStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        DataStreamSource<String> lineDS = env.socketTextStream("cdp1", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> singleOutputStreamOperator = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for(String word:words){
                    out.collect(Tuple2.of(word,1));
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum1 = tuple2StringKeyedStream.sum(1);
        sum1.print();
        env.execute();
    }
}

该程序是监听cdp1这台机器的7777端口,可以在cdp1这台机器上执行命令nc -lk 7777开启7777端口(确保机器已安装netcat,如果未安装,则执行yum install -y netcat命令安装),在窗口内,输入单词,flink程序这边即可统计单词数量。


总结

以上两个例子分别是批处理文件单词统计和流式处理统计7777端口输入的单词统计,也是开始接触flink的第一个demo,flink还有很多强大的功能,后续会结合具体的业务场景讲解具体的实现代码,好了,今天就讲到这里,后续会继续持续更新。

文章来源:https://blog.csdn.net/weixin_43153588/article/details/135117008
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。