python tutorial 如何使用JDBC源在(Py)Spark中寫入和讀取數據?




spark教學 (3)

請參閱此鏈接以下載postgres的jdbc,並按照步驟下載jar文件

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar文件將在這樣的路徑下載。 “/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar”

如果你的火花版本是2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "[email protected]") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

將文件保存為python並運行“python representfilename.py”

這個問題的目標是記錄:

  • 在PySpark中使用JDBC連接讀取和寫入數據所需的步驟

  • JDBC源和已知解決方案可能存在的問題

通過小的更改,這些方法應該與其他支持的語言一起使用,包括Scala和R.


寫數據

  1. 提交應用程序或啟動shell時包含適用的JDBC驅動程序。 你可以使用例如--packages

    bin/pyspark --packages group:name:version  
    

    或者結合使用driver-class-pathjars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    也可以在啟動JVM實例之前使用PYSPARK_SUBMIT_ARGS環境變量設置這些屬性,或使用conf/spark-defaults.conf設置spark.jars.packagesspark.jars / spark.driver.extraClassPath

  2. 選擇所需的模式。 Spark JDBC writer支持以下模式:

    • append :追加以下內容:class: DataFrame到現有數據。
    • overwrite :覆蓋現有數據。
    • ignore :如果數據已存在,則靜默忽略此操作。
    • error (默認情況):如果數據已存在則拋出異常。

    不支持Upserts或其他細粒度修改

    mode = ...
    
  3. 準備JDBC URI,例如:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
  4. (可選)創建JDBC參數的字典。

    properties = {
        "user": "foo",
        "password": "bar"
    }
    
  5. 使用DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

    保存數據(有關詳細信息,請參閱pyspark.sql.DataFrameWriter )。

已知問題

  • 使用--packages包含java.sql.SQLException: No suitable driver found for jdbc: ...java.sql.SQLException: No suitable driver found for jdbc: ...

    假設沒有驅動程序版本不匹配來解決此問題,您可以向properties添加driver類。 例如:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
  • 使用df.write.format("jdbc").options(...).save()可能導致:

    java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允許將create table作為select。

    解決方案未知

  • 在Pyspark 1.3中,您可以嘗試直接調用Java方法:

    df._jdf.insertIntoJDBC(url, "baz", True)
    

讀數據

  1. 按照寫入數據的步驟1-4 進行操作
  2. 使用sqlContext.read.jdbc

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    sqlContext.read.format("jdbc")

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
    

已知問題和陷阱

  • 找不到合適的驅動程序 - 請參閱:寫入數據
  • Spark SQL支持使用JDBC源進行謂詞下推,儘管並非所有謂詞都可以下推。 它也不會委託限製或聚合。 可能的解決方法是使用有效的子查詢替換dbtable / table參數。 參見例如:
    • spark謂詞下推是否適用於JDBC?
    • 一個多小時執行pyspark.sql.DataFrame.take(4)
  • 默認情況下,JDBC數據源使用單個執行程序線程按順序加載數據。 要確保分佈式數據加載,您可以:

    • 提供分區column (必須是IntegeType ), lowerBoundupperBoundnumPartitions
    • 提供互斥謂詞predicates列表,每個predicates分區一個。
  • 在分佈式模式(具有分區列或謂詞)中,每個執行程序在其自己的事務中操作。 如果同時修改源數據庫,則無法保證最終視圖將保持一致。

哪裡可以找到合適的司機:


下載mysql-connector-java驅動程序並保存在spark jar文件夾中,觀察bellow python代碼,這裡將數據寫入“acotr1”,我們要在mysql數據庫中創建acotr1表結構

spark = SparkSession.builder.appName(“prasadad”)。master('local')。config('spark.driver.extraClassPath','D:\ spark-2.1.0-bin-hadoop2.7 \ jars \ mysql-連接器的Java-5.1.41-bin.jar')。getOrCreate()

sc = spark.sparkContext

從pyspark.sql導入SQLContext

sqlContext = SQLContext(sc)

df = sqlContext.read.format(“jdbc”)。option(url =“jdbc:mysql:// localhost:3306 / sakila”,driver =“com.mysql.jdbc.Driver”,dbtable =“actor”,user =“根”,口令=“Ramyam01”)。負載()

mysql_url =“JDBC:MySQL的://本地主機:3306 / sakila的用戶=根&密碼= Ramyam01”

df.write.jdbc(mysql_url,表=“actor1”,模式=“追加”)





pyspark