Flink vs. Spark:特点、区别和使用场景

2023-12-16 13:49:01

简介: Flink 和 Spark 是两个主流的大数据处理框架,但它们在数据处理模型、执行引擎和使用场景上有着不同的特点。本文将深入比较 Flink 和 Spark,以及它们的适用场景,并结合代码示例说明它们的用法和优劣势。

1. Flink 和 Spark 的特点比较

数据处理模型
  • Flink: 基于事件驱动的流式处理,支持精确事件时间处理和状态管理。底层使用自带的执行引擎处理数据流。
  • Spark: 基于批处理和微批处理模型,支持 RDD、DataFrame 和 Dataset。Structured Streaming 实现了流处理,但相对 Flink 在事件时间处理和状态管理上有所欠缺。
状态管理和容错性
  • Flink: 内置的状态管理机制支持流式任务的状态管理和容错性,保证数据的一致性。
  • Spark: 对状态管理相对较弱,依赖外部存储如 HDFS,容错性有一定限制。

2. Flink 和 Spark 的使用场景

Flink 的使用场景
  • 精确的事件处理: 适用于需要精确事件时间处理和严格状态管理的场景,如金融交易、实时监控等。
  • 实时流式应用: 对于对延迟敏感的实时流式应用,如实时推荐、网络监控等。
  • 迭代计算: 适用于图计算、机器学习等需要迭代计算的场景。
Spark 的使用场景
  • 大规模批处理: 对大规模数据的离线批处理,如数据清洗、ETL 等。
  • 交互式分析: 支持交互式查询和数据探索,适用于数据科学家和分析师。
  • 机器学习: Spark MLlib 提供了丰富的机器学习库,用于大规模数据的机器学习应用。

3. Flink 和 Spark 示例代码

Flink 示例代码(WordCount)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkWordCount {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
                "To be, or not to be, that is the question",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
                .groupBy(0)
                .sum(1);

        counts.print();
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

Spark 示例代码(WordCount)
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;

public class SparkWordCount {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkWordCount")
                .master("local[*]")
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<String> lines = jsc.parallelize(Arrays.asList(
                "To be, or not to be, that is the question",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        ));

        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.toLowerCase().split("\\W+")).iterator());

        JavaRDD<String> filteredWords = words.filter(word -> word.length() > 0);

        JavaRDD<String> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum)
                .map(tuple -> tuple._1 + ": " + tuple._2);

        wordCounts.foreach(System.out::println);

        spark.stop();
    }
}

当涉及到 FlinkSQL 和 SparkSQL 时,两者提供了一种以 SQL 语言进行数据查询和处理的方式。以下是分别在 Flink 和 Spark 中使用 SQL 进行简单数据处理的示例代码:

FlinkSQL 示例代码

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQLExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建一个DataStream(假设数据源为Kafka)
        DataStream<Order> orders = env.fromElements(
                new Order(1, "product-1", 100),
                new Order(2, "product-2", 150),
                new Order(3, "product-3", 200)
        );

        // 注册DataStream为表
        tableEnv.createTemporaryView("Orders", orders, "orderId, productName, amount");

        // 使用SQL进行查询
        Table result = tableEnv.sqlQuery("SELECT productName, SUM(amount) as totalAmount FROM Orders GROUP BY productName");

        // 将结果转换为DataStream并打印输出
        tableEnv.toRetractStream(result, Types.TUPLE(Types.STRING, Types.LONG))
                .print();

        env.execute("Flink SQL Example");
    }

    // 订单类
    public static class Order {
        public int orderId;
        public String productName;
        public int amount;

        public Order(int orderId, String productName, int amount) {
            this.orderId = orderId;
            this.productName = productName;
            this.amount = amount;
        }
    }
}

SparkSQL 示例代码

import org.apache.spark.sql.*;

public class SparkSQLExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkSQLExample")
                .master("local[*]")
                .getOrCreate();

        // 创建一个DataFrame
        Dataset<Row> df = spark.createDataFrame(
                spark.sparkContext().parallelize(
                        RowFactory.create(1, "product-1", 100),
                        RowFactory.create(2, "product-2", 150),
                        RowFactory.create(3, "product-3", 200)
                ),
                DataTypes.createStructType(new StructField[]{
                        DataTypes.createStructField("orderId", DataTypes.IntegerType, false),
                        DataTypes.createStructField("productName", DataTypes.StringType, false),
                        DataTypes.createStructField("amount", DataTypes.IntegerType, false)
                })
        );

        // 创建一个临时视图
        df.createOrReplaceTempView("Orders");

        // 使用SQL进行查询
        Dataset<Row> result = spark.sql("SELECT productName, SUM(amount) as totalAmount FROM Orders GROUP BY productName");

        // 展示结果
        result.show();

        spark.stop();
    }
}

在项目中我们会经常使用sparkSQL针对离线数据进行统计汇总,做一些概览数据的呈现功能。?

结语

Flink 和 Spark 都是强大的大数据处理框架,各自有着独特的特点和适用场景。通过本文的比较,可以更深入地了解它们,并根据自身需求选择适合的框架来处理数据。掌握两者的优劣势有助于更好地应用于大数据处理和实时计算场景。

文章来源:https://blog.csdn.net/EverythingAtOnce/article/details/135031293
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。