scala写入MongoDB
2023-12-22 14:56:56
1.6 写MongoDB
写MongoDB的依赖
<dependency>
? ?<groupId>org.apache.spark</groupId>
? ?<artifactId>spark-sql_2.12</artifactId>
? ?<version>3.1.2</version>
</dependency>
<!-- 加入MongoDB的驱动 -->
<dependency>
? ?<groupId>org.mongodb</groupId>
? ?<artifactId>casbah-core_2.12</artifactId>
? ?<version>3.1.1</version>
</dependency>
?
<!-- 加入Spark读写MongoDB的驱动 -->
<dependency>
? ?<groupId>org.mongodb.spark</groupId>
? ?<artifactId>mongo-spark-connector_2.12</artifactId>
? ?<version>2.4.3</version>
</dependency>
代码
package com.qianfeng.sparksql ? import com.mongodb.{MongoClientURI, casbah} import com.mongodb.casbah.{MongoClient, MongoClientURI} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} ? /** * 读写MongoDB的数据 */ //mongoDB的连接配置封装 case class MongoConfig(url:String,db:String,col:String) //封装学生考试信息 case class Stu(classID:Int,stuName:String,age:Int,sex:String,subject:String,score:Double) object Demo09_Mongo_RW_Data { ?def main(args: Array[String]): Unit = { ? ?val configMap = Map( ? ? ?"url"->"mongodb://qianfeng01:27017/mydb", ? ? ?"db"->"mydb", ? ? ?"collection"->"student" ? ) ? ?//获取SparkSQL的上下文 ? ?val spark = SparkSession.builder() ? ? .appName("sparksql") ? ? .master("local[*]") ? ? .getOrCreate() ? ? ?import spark.implicits._ ? ?val stuDF = spark.sparkContext.textFile("/Users/liyadong/data/sparkdata/stu.txt") ? ? .filter(_.length > 0) ? ? .map(x => { ? ? ? ?val fields = x.split(" ") ? ? ? ?//提取字段 ? ? ? ?Stu(fields(0).trim.toInt, fields(1).trim, fields(2).trim.toInt, fields(3).trim, fields(4).trim, fields(5).trim.toDouble) ? ? }).toDF() ? ?//stuDF.show() ? ? ?//写入数据 ? ?val config = MongoConfig(configMap.getOrElse("url",""),configMap.getOrElse("db",""),configMap.getOrElse("collection","")) ? ?writeDataToMongoDB(config,stuDF) ? ? ?//读取mongoDB的数据 ? ?readDataFromMongoDB(config, spark) ? ? ?/* ? ?match case ? ? */ ? ?//值匹配 ? ?val a = 666 ? ?val a1 = a match { ? ? ?case 666 => print("666-666") ? ? ?case 999 => println("999") ? ? ?case _ => println("null") ? } ? ?println(a1) ? ? ?//元组个数匹配 ? ?val t = (1,3,5) ? ?val t1 = t match { ? ? ?case (a, b, 3) => a + b + 3 ? ? ?case (a, b, 5) => a + b ? ? ?case _ => 0 ? } ? ?println(t1) ? ? ?//数组个数匹配 ? ?val l = Array(1,2) ? ?val l1 = l match { ? ? ?case Array(a) => a ? ? ?case Array(a, b) => a + b ? } ? ?println(l1) ? ? ?"hello spark".foreach(ch=>println(ch match { ? ? ?case ' ' => "space" ? ? ?case _ => "char:" + ch ? })) ? ? ? ?//5、关闭spark对象 ? ?spark.stop() } ? ?/** ? * 数据写入Mongo的逻辑 ? * @param config ? * @param stuDF ? */ ?def writeDataToMongoDB(config:MongoConfig,stuDF:DataFrame): Unit ={ ? ?/*//获取Mongo的client ? ?val client = MongoClient(casbah.MongoClientURI(config.url)) ? ?//获取操作集合 ? ?val collection = client(config.db)(config.col) ? ? ?//删除集合 ? ?collection.dropCollection()*/ ? ?//将df写入collection中 ? ?stuDF ? ? .write ? ? .option("uri",config.url) ? ? .option("collection",config.col) ? ? .mode(SaveMode.Overwrite) ? ? .format("com.mongodb.spark.sql") ? ? .save() ? ? ?//对数据创建索引 ? ?//collection.createIndex("name") } ? ?/** ? * 读取数据 ? * @param config ? * @param stuDF ? */ ?def readDataFromMongoDB(config:MongoConfig,spark:SparkSession): Unit ={ ? ?val frame = spark ? ? .read ? ? .option("uri", config.url) ? ? .option("collection", config.col) ? ? .format("com.mongodb.spark.sql") ? ? .load() ? ?//打印 ? ?frame match { ? ? ?case f => f.show() ? ? ?case _ => "" ? } } }
1、环境
2、idea中不能运行scala
3、winutil.exe ?null???? ?,,hadoop解压某个目录,将wingtil.exe和hadoop.dll放到hadoop的解压目录下的bin目录中,配置环境变量放到path中。关闭idea重新打开
Config("Hadoop_home_dir","")
文章来源:https://blog.csdn.net/HYSliuliuliu/article/details/134983673
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!