scala写入MongoDB
2023-12-22 14:56:56
1.6 写MongoDB
写MongoDB的依赖
<dependency>
? ?<groupId>org.apache.spark</groupId>
? ?<artifactId>spark-sql_2.12</artifactId>
? ?<version>3.1.2</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<dependency>
? ?<groupId>org.mongodb</groupId>
? ?<artifactId>casbah-core_2.12</artifactId>
? ?<version>3.1.1</version>
</dependency>
?
<!-- 加入Spark读写MongoDB的驱动 -->
<dependency>
? ?<groupId>org.mongodb.spark</groupId>
? ?<artifactId>mongo-spark-connector_2.12</artifactId>
? ?<version>2.4.3</version>
</dependency>
代码
package com.qianfeng.sparksql
?
import com.mongodb.{MongoClientURI, casbah}
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
?
/**
* 读写MongoDB的数据
*/
//mongoDB的连接配置封装
case class MongoConfig(url:String,db:String,col:String)
//封装学生考试信息
case class Stu(classID:Int,stuName:String,age:Int,sex:String,subject:String,score:Double)
object Demo09_Mongo_RW_Data {
?def main(args: Array[String]): Unit = {
? ?val configMap = Map(
? ? ?"url"->"mongodb://qianfeng01:27017/mydb",
? ? ?"db"->"mydb",
? ? ?"collection"->"student"
? )
? ?//获取SparkSQL的上下文
? ?val spark = SparkSession.builder()
? ? .appName("sparksql")
? ? .master("local[*]")
? ? .getOrCreate()
?
? ?import spark.implicits._
? ?val stuDF = spark.sparkContext.textFile("/Users/liyadong/data/sparkdata/stu.txt")
? ? .filter(_.length > 0)
? ? .map(x => {
? ? ? ?val fields = x.split(" ")
? ? ? ?//提取字段
? ? ? ?Stu(fields(0).trim.toInt, fields(1).trim, fields(2).trim.toInt, fields(3).trim, fields(4).trim, fields(5).trim.toDouble)
? ? }).toDF()
? ?//stuDF.show()
?
? ?//写入数据
? ?val config = MongoConfig(configMap.getOrElse("url",""),configMap.getOrElse("db",""),configMap.getOrElse("collection",""))
? ?writeDataToMongoDB(config,stuDF)
?
? ?//读取mongoDB的数据
? ?readDataFromMongoDB(config, spark)
?
? ?/*
? ?match case
? ? */
? ?//值匹配
? ?val a = 666
? ?val a1 = a match {
? ? ?case 666 => print("666-666")
? ? ?case 999 => println("999")
? ? ?case _ => println("null")
? }
? ?println(a1)
?
? ?//元组个数匹配
? ?val t = (1,3,5)
? ?val t1 = t match {
? ? ?case (a, b, 3) => a + b + 3
? ? ?case (a, b, 5) => a + b
? ? ?case _ => 0
? }
? ?println(t1)
?
? ?//数组个数匹配
? ?val l = Array(1,2)
? ?val l1 = l match {
? ? ?case Array(a) => a
? ? ?case Array(a, b) => a + b
? }
? ?println(l1)
?
? ?"hello spark".foreach(ch=>println(ch match {
? ? ?case ' ' => "space"
? ? ?case _ => "char:" + ch
? }))
?
?
? ?//5、关闭spark对象
? ?spark.stop()
}
?
?/**
? * 数据写入Mongo的逻辑
? * @param config
? * @param stuDF
? */
?def writeDataToMongoDB(config:MongoConfig,stuDF:DataFrame): Unit ={
? ?/*//获取Mongo的client
? ?val client = MongoClient(casbah.MongoClientURI(config.url))
? ?//获取操作集合
? ?val collection = client(config.db)(config.col)
?
? ?//删除集合
? ?collection.dropCollection()*/
? ?//将df写入collection中
? ?stuDF
? ? .write
? ? .option("uri",config.url)
? ? .option("collection",config.col)
? ? .mode(SaveMode.Overwrite)
? ? .format("com.mongodb.spark.sql")
? ? .save()
?
? ?//对数据创建索引
? ?//collection.createIndex("name")
}
?
?/**
? * 读取数据
? * @param config
? * @param stuDF
? */
?def readDataFromMongoDB(config:MongoConfig,spark:SparkSession): Unit ={
? ?val frame = spark
? ? .read
? ? .option("uri", config.url)
? ? .option("collection", config.col)
? ? .format("com.mongodb.spark.sql")
? ? .load()
? ?//打印
? ?frame match {
? ? ?case f => f.show()
? ? ?case _ => ""
? }
}
}
1、环境
2、idea中不能运行scala
3、winutil.exe ?null???? ?,,hadoop解压某个目录,将wingtil.exe和hadoop.dll放到hadoop的解压目录下的bin目录中,配置环境变量放到path中。关闭idea重新打开
Config("Hadoop_home_dir","")
文章来源:https://blog.csdn.net/HYSliuliuliu/article/details/134983673
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!