val prop = new Properties() prop.setProperty("user", "xxx") prop.setProperty("password", "xxx") val url = "jdbc:mysql://ip:port/cdp?useUnicode=true&characterEncoding=utf-8" spark.read. option("driver", "com.mysql.jdbc.Driver"). jdbc(url, "client_group", prop). // jdbc(url, "client_group", Array("proc_status=1", "opt_type!=2", "expired_time < now()"), prop). persist(StorageLevel.MEMORY_ONLY). createOrReplaceTempView("tv_client_group")
以下方法并行处理,但要注意条件是否包含重合数据
jdbc(url, "client_group", Array("proc_status=1", "opt_type!=2", "expired_time < now()"), prop)方法2
val dataframe = spark.read. format("jdbc"). option("driver", "com.mysql.jdbc.Driver"). option("dbtable", "dbTableName"). option("url", url). option("user", "userName"). option("password", "password"). load()写
调用write方法
oraclesql( s"""SELECT | cast(MEMBER_ID as INT) MEMBER_ID, | IF(sale_channel = '1', CAST(NULL AS STRING), store_id) STORE_CODE, | score INDI_VALUE, | 1 IS_LATEST, | CAST('$quarterDate' AS TIMESTAMP) CREATE_TIME, | bu_id BU_CODE, | CAST(NULL AS INT) MEMBER_ID_TMP |FROM recommendation.member_attribute_store |WHERe create_date='$quarterDate' |""".stripMargin). write.option("driver", "oracle.jdbc.driver.OracleDriver"). mode(SaveMode.Append).jdbc(url, tableName, prop)