Spark Dataset的实用操作笔记

2023-12-14 11:35:27

最近的项目用spark做离线计算,所以有用到一些操作,简单笔记一下
1.Dataset纵向复制数据
当一个dataset中数据量太少,不能有效的随机联查别的dataset时,需要将数据纵向复制,把数据量撑起来。可以采用两种方式:
第一种:

userDataset.unionAll(userDataset).unionAll(userDataset)

这种方式在少量复制时可以用,若要大量复制,则会创建大量dataset对象,且代码写起来也费劲。
第二种:

int[] array = IntStream.range(0, 30).toArray();
userDataset= userDataset.withColumn("row_num", functions.explode(functions.lit(array))).drop(functions.col("row_num"));

这种方式由range(0, 30)这个来控制复制的次数,好使一点
示例:

// String env = "yarn";
String env = "local[*]";
SparkSession sparkSession = SparkSession.builder().appName(appName).master(env ).getOrCreate();
hdfsPath = "file:///C:\\\\\\\\recall.csv";
        Dataset<Row> dataset = createRealView(session, hdfsPath);
        dataset.show(200);
        String randStr = String.valueOf((int) ((Math.random() * 9 + 1) * Math.pow(10, 5)));
        dataset = dataset.withColumn("group_name", functions.lit(RecallTaskEnum.taskName(taskNum)));
        dataset = dataset.withColumn("group_id",  functions.lit(randStr));
        dataset = dataset.withColumn("creater",  functions.lit("sparkTask"));
        dataset = dataset.withColumn("updater",  functions.lit("sparkTask"));
        dataset = dataset.withColumn("del_flag",  functions.lit(1));
        dataset = dataset.withColumn("created_time",  functions.now());
        dataset = dataset.withColumn("update_time",  functions.now());
        dataset.show(200);
        int[] array = IntStream.range(0, 10).toArray();
        dataset = dataset.withColumn("dummy", functions.explode(functions.lit(array))).drop(functions.col("dummy"));
        dataset.show(200);

其中, recall.csv

client_id,score
7056,2
7057,12
7058,1200
212121,1100
212122,100
212123,100

那么三次的输出结果为:

+---------+-----+
|client_id|score|
+---------+-----+
|     7056|    2|
|     7057|   12|
|     7058| 1200|
|   212121| 1100|
|   212122|  100|
|   212123|  100|
+---------+-----+

+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|client_id|score|      group_name|group_id|  creater|  updater|del_flag|        created_time|         update_time|
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+

+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|client_id|score|      group_name|group_id|  creater|  updater|del_flag|        created_time|         update_time|
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7056|    2|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7057|   12|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|     7058| 1200|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212121| 1100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212122|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
|   212123|  100|同业存单人群圈选|  728804|sparkTask|sparkTask|       1|2023-12-07 15:02:...|2023-12-07 15:02:...|
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+

2.创建指定结构的空Dataset
在项目开发中,如果需要读取csv文件或者读取MySQL创建Dataset时,有可能出现异常,为避免服务雪崩,那么可以使用创建指定相同结构的空Dataset去参与计算。
例如:

public static Dataset<Row> getDataByPartition(SparkSession session, String table, String partColumn, String selectColumns, String condition) {
        Map<String, String> dbOptions = new HashMap<>();
        dbOptions.put("driver", envMap.get("DRIVER"));
        dbOptions.put("url", envMap.get("URL"));
        dbOptions.put("user", envMap.get("USER"));
        dbOptions.put("password", envMap.get("PASSWORD"));

        //查询需要的数据
        String dataSelect = String.format("select %s from %s ", selectColumns, table);
        dataSelect += condition;
        Map<String, String> options = new HashMap<>();
        options.put("query", dataSelect);
        options.put("numPartitions", String.valueOf(EnvConstant.PARTITION_NUM));
        options.putAll(dbOptions);
eturn session.read().format("jdbc").options(options).load();

这样分区读数据库来创建视图,有可能会产生异常,程序可能会崩溃,加入一下处理

// 逗号分隔格式:selectColumns = "client_id, score, group_name, group_id, creater, updater|del_flag, created_time, update_time";
    public static Dataset<Row> generateEmptyDataset(SparkSession session, String selectColumns) {
        StructType schema = new StructType();
        if (StringUtils.isEmpty(selectColumns)) {
            schema = schema.add(new StructField("emptyDataset", DataTypes.StringType, false, Metadata.empty()));
            return session.createDataset(Collections.emptyList(), Encoders.row(schema));
        }
        String[] split = selectColumns.split(",");
        for (String str : split) {
            String res = str.trim();
            schema = schema.add(new StructField(res, DataTypes.StringType, false, Metadata.empty()));
        }
        return session.createDataset(Collections.emptyList(), Encoders.row(schema));
    }

根据传入的结构,动态创建schema,从而动态生成Dataset
修改后:

public static Dataset<Row> getDataByPartition(SparkSession session, String table, String partColumn, String selectColumns, String condition) {
        Map<String, String> dbOptions = new HashMap<>();
        dbOptions.put("driver", envMap.get("DRIVER"));
        dbOptions.put("url", envMap.get("URL"));
        dbOptions.put("user", envMap.get("USER"));
        dbOptions.put("password", envMap.get("PASSWORD"));

        //查询需要的数据
        String dataSelect = String.format("select %s from %s ", selectColumns, table);
        dataSelect += condition;
        Map<String, String> options = new HashMap<>();
        options.put("query", dataSelect);
        options.put("numPartitions", String.valueOf(EnvConstant.PARTITION_NUM));
        options.putAll(dbOptions);
        try {
            int i = 1/0;
            return session.read().format("jdbc").options(options).load();
        } catch (Exception e) {
            System.out.println("分区查询数据异常:" + e.getMessage());
            Dataset<Row> emptyDataset = generateEmptyDataset(session, selectColumns);
            emptyDataset.show();
            return emptyDataset;
        }
    }

当我设置int i = 1/0产生异常后,创建空dataset

+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|client_id|score|      group_name|group_id|  creater|  updater|del_flag|        created_time|         update_time|
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+
|         |     |                |        |         |         |        |                    |                    |
+---------+-----+----------------+--------+---------+---------+--------+--------------------+--------------------+

文章来源:https://blog.csdn.net/qq_40623672/article/details/134855617
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。