
Working with relational databases in Spark

王走召 2022-09-30 20:42:50
MySQL read, method 1
    import java.util.Properties
    import org.apache.spark.storage.StorageLevel

    // Connection credentials are passed to the JDBC driver as properties
    val prop = new Properties()
    prop.setProperty("user", "xxx")
    prop.setProperty("password", "xxx")
    val url = "jdbc:mysql://ip:port/cdp?useUnicode=true&characterEncoding=utf-8"

    // Read the whole table, cache it in memory and expose it as a temp view
    spark.read.
      option("driver", "com.mysql.jdbc.Driver").
      jdbc(url, "client_group", prop).
      // jdbc(url, "client_group", Array("proc_status=1", "opt_type!=2", "expired_time < now()"), prop).
      persist(StorageLevel.MEMORY_ONLY).
      createOrReplaceTempView("tv_client_group")

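The overload below reads in parallel, with one partition per predicate. Check whether the conditions overlap: a row that matches more than one predicate is read more than once, and a row that matches none is not read at all.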

jdbc(url, "client_group", Array("proc_status=1", "opt_type!=2", "expired_time < now()"), prop)
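One way to avoid the overlap problem is to generate the predicates instead of writing them by hand. The sketch below is only an illustration: the helper name rangePredicates and the numeric column "id" are assumptions, not part of the original post. It splits a half-open range [lower, upper) into disjoint slices, so every value in that range matches exactly one predicate.

```scala
// Hypothetical helper (not from the original post): builds disjoint,
// half-open range predicates on a numeric column.
def rangePredicates(column: String, lower: Long, upper: Long, parts: Int): Array[String] = {
  require(parts > 0, "parts must be positive")
  require(lower < upper, "lower must be below upper")
  // Width of each slice, rounded up so the slices cover the whole range
  val step = math.ceil((upper - lower).toDouble / parts).toLong
  (0 until parts)
    .map(i => lower + i * step)   // start of each slice
    .filter(_ < upper)            // drop empty slices when parts exceeds the range
    .map { lo =>
      val hi = math.min(lo + step, upper)
      s"$column >= $lo AND $column < $hi"
    }
    .toArray
}

// Usage (needs a SparkSession named spark; "id" is an assumed column name):
// spark.read.jdbc(url, "client_group", rangePredicates("id", 0L, 1000000L, 8), prop)
```

Rows whose column value is NULL or falls outside [lower, upper) match none of these predicates, so they would need a predicate of their own.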
Method 2
    val dataframe = spark.read.
      format("jdbc").
      option("driver", "com.mysql.jdbc.Driver").
      option("dbtable", "dbTableName").
      option("url", url).
      option("user", "userName").
      option("password", "password").
      load()
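The format("jdbc") reader can also read in parallel through the standard Spark JDBC options partitionColumn, lowerBound, upperBound and numPartitions, which must be set together. The sketch below only collects those options into a Map; the helper name partitionedReadOptions and the column "id" are assumptions made for illustration.

```scala
// Hypothetical helper (not from the original post): collects the JDBC
// options for a range-partitioned read into one Map.
def partitionedReadOptions(
    url: String,
    table: String,
    partitionColumn: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int): Map[String, String] = {
  require(numPartitions > 0, "numPartitions must be positive")
  require(lowerBound < upperBound, "lowerBound must be below upperBound")
  Map(
    "url" -> url,
    "dbtable" -> table,
    "partitionColumn" -> partitionColumn,
    "lowerBound" -> lowerBound.toString,
    "upperBound" -> upperBound.toString,
    "numPartitions" -> numPartitions.toString
  )
}

// Usage (needs a SparkSession named spark; "id" is an assumed column name):
// val df = spark.read.format("jdbc").
//   option("driver", "com.mysql.jdbc.Driver").
//   option("user", "userName").
//   option("password", "password").
//   options(partitionedReadOptions(url, "dbTableName", "id", 0L, 1000000L, 8)).
//   load()
```

Unlike the predicate overload, lowerBound and upperBound only set the partition stride; they do not filter rows, so values outside the bounds still end up in the first or last partition.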

To write, call the write method

Oracle
    import org.apache.spark.sql.SaveMode

    // Run the query, then append the result to the Oracle table
    sql(
      s"""SELECT
         |  cast(MEMBER_ID as INT) MEMBER_ID,
         |  IF(sale_channel = '1',  CAST(NULL AS STRING), store_id) STORE_CODE,
         |  score INDI_VALUE,
         |  1 IS_LATEST,
         |  CAST('$quarterDate' AS TIMESTAMP) CREATE_TIME,
         |  bu_id BU_CODE,
         |  CAST(NULL AS INT) MEMBER_ID_TMP
         |FROM recommendation.member_attribute_store
         |WHERE create_date='$quarterDate'
         |""".stripMargin).
      write.option("driver", "oracle.jdbc.driver.OracleDriver").
      mode(SaveMode.Append).jdbc(url, tableName, prop)
 