生产环境_Spark解析JSON字符串并插入到MySQL数据库

2023-12-16 16:34:26

业务背景:? ? ??

??最近开发有一个需求,是这样的

????????我需要将一段从前端传过来的JSON字符串进行解析,并从中提取出所需的数据,然后将这些数据插入到MySQL数据库中。

json格式样例如下

{ \"区域编号\": \"001\", \"区域名称\": \"测试区域\", \"速度\": \"50\", \"速度分数\": \"80\", \"gj\": \"中国\", \"区域顶点集\": \"[{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]\" }"}

????????spark代码会使用JsonPath库解析JSON数据将上面的json串解析,使用JsonPath.parsejsonStr解析为parsedJson对象。

????????随后使用SparkSQL将这条数据写入mysql中特定的库表,其实还可以做一个优化的,比如入库前先判断有没有这条数据,如果有则不插入,如果没有就插入,优化这部分我没做,有时间在改吧。

? ? 代码如下

??注意的是,只要配置好mysql的配置即可

import com.jayway.jsonpath.JsonPath
import java.sql.{Connection, DriverManager}
object area_sd_insert_v2 {
  def main(args: Array[String]): Unit = {


    // 初始化参数
    val jsonparam = "{\"jsonStr\": \"{ \\\"区域编号\\\": \\\"001\\\", \\\"区域名称\\\": \\\"测试区域\\\", \\\"速度\\\": \\\"50\\\", \\\"速度分数\\\": \\\"80\\\", \\\"gj\\\": \\\"中国\\\", \\\"区域顶点集\\\": \\\"[{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]\\\" }\"}"
    println(jsonparam)

    val jsonStr = JsonPath.read[String](jsonparam, "$.jsonStr")
    val parsedJson = JsonPath.parse(jsonStr)

    val areaid = parsedJson.read[String]("$.区域编号")
    val areaName = parsedJson.read[String]("$.区域名称")
    val sd = parsedJson.read[String]("$.速度")
    val score = parsedJson.read[String]("$.速度分数")
    val gj = parsedJson.read[String]("$.gj")
    val areaPts = parsedJson.read[String]("$.区域顶点集")
    
    //by_matrix70,防止抄袭_20231216
    //博客主页   https://blog.csdn.net/qq_52128187?type=blog

    val host = "192.168.11.11"
    val port = "3306"
    val defaultCharset = "utf-8"
    val user = "root"
    val password = "123456789"
    val base = "test_1" // 数据库名
    val table = "table_20231216"
    val driver ="com.mysql.jdbc.Driver"
    val url = s"jdbc:mysql://$host:$port/$base?useUnicode=true&characterEncoding=$defaultCharset&useSSL=false"

    Class.forName(driver)
    val connection = DriverManager.getConnection(url, user, password)
    try {
      val statement = connection.createStatement
      val rowAffected = statement.executeUpdate(s"""INSERT INTO $table ('区域编号', '区域名称', '速度', '速度分数', 'gj', '区域顶点集')
                                              VALUES ('$areaid', '$areaName', '$sd', '$score', '$gj', '$areaPts')""")
    } finally {
      connection.close()
    }
  }
}

文章来源:https://blog.csdn.net/qq_52128187/article/details/135021720
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。