python - read - Comment utiliser une source JDBC pour écrire et lire des données dans(Py) Spark?




spark read format (2)

Le but de cette question est de documenter:

  • étapes nécessaires pour lire et écrire des données à l'aide de connexions JDBC dans PySpark

  • problèmes possibles avec les sources JDBC et solutions connues

Avec de petites modifications, ces méthodes devraient fonctionner avec d'autres langues prises en charge, notamment Scala et R.


Écriture de données

  1. Incluez le pilote JDBC applicable lorsque vous soumettez l'application ou démarrez le shell. Vous pouvez utiliser par exemple --packages :

    bin/pyspark --packages group:name:version  

    ou en combinant driver-class-path et jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

    Ces propriétés peuvent également être définies à l'aide de la variable d'environnement PYSPARK_SUBMIT_ARGS avant le PYSPARK_SUBMIT_ARGS l'instance JVM ou de conf/spark-defaults.conf pour définir spark.jars.packages ou spark.jars / spark.driver.extraClassPath .

  2. Choisissez le mode souhaité. Spark JDBC writer prend en charge les modes suivants:

    • append : Ajoute le contenu de ceci: class: DataFrame aux données existantes.
    • overwrite : overwrite les données existantes.
    • ignore : ignore cette opération si les données existent déjà.
    • error (cas par défaut): Lance une exception si des données existent déjà.

    Les Upserts ou autres modifications fines ne sont pas pris en charge.

    mode = ...
  3. Préparez l'URI JDBC, par exemple:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
  4. (Facultatif) Créez un dictionnaire d'arguments JDBC.

    properties = {
        "user": "foo",
        "password": "bar"
    }

    properties / options peuvent également être utilisées pour définir les propriétés de connexion JDBC prises en charge .

  5. Utilisez DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

    pour enregistrer les données (voir pyspark.sql.DataFrameWriter pour plus de détails).

Problèmes connus :

  • Le pilote approprié est introuvable lorsque le pilote a été inclus à l'aide de --packages ( java.sql.SQLException: No suitable driver found for jdbc: ... )

    En supposant qu'il n'y ait pas d'incompatibilité de version de pilote pour résoudre ce problème, vous pouvez ajouter une classe de driver aux properties . Par exemple:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
  • L'utilisation de df.write.format("jdbc").options(...).save() peut entraîner:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource n'autorise pas la création de table en tant que sélection.

    Solution inconnue.

  • dans Pyspark 1.3, vous pouvez essayer d’appeler directement une méthode Java:

    df._jdf.insertIntoJDBC(url, "baz", True)

Lecture de données

  1. Suivez les étapes 1 à 4 de Écriture de données
  2. Utilisez sqlContext.read.jdbc :

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)

    ou sqlContext.read.format("jdbc") :

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())

Problèmes connus et problèmes rencontrés :

  • Le pilote approprié est introuvable - voir: Écriture de données
  • Spark SQL prend en charge le rabattement des prédicats avec les sources JDBC bien que tous les prédicats ne puissent pas être abaissés. De plus, il ne délègue pas de limites ni d'agrégations. La solution possible consiste à remplacer l'argument dbtable / table par une sous-requête valide. Voir par exemple:

    • Est-ce que les prédicats d'étincelle fonctionnent avec JDBC?
    • Plus d'une heure pour exécuter pyspark.sql.DataFrame.take (4)
    • Comment utiliser une requête SQL pour définir une table dans dbtable?
  • Par défaut, les sources de données JDBC chargent les données de manière séquentielle à l'aide d'un seul thread exécuteur. Pour assurer le chargement des données distribuées, vous pouvez:

    • Fournissez la column partitionnement (doit être IntegeType ), lowerBound , upperBound , numPartitions .
    • Fournissez une liste de prédicats de predicates s'excluant mutuellement, un pour chaque partition souhaitée.

    Voir:

    • Partitionner en étincelle lors de la lecture du SGBDR via JDBC ,
    • Comment optimiser le partitionnement lors de la migration de données depuis une source JDBC? ,
    • Comment améliorer les performances pour les travaux Spark lents utilisant DataFrame et la connexion JDBC?
    • Comment partitionner Spark RDD lors de l'importation de Postgres à l'aide de JDBC?
  • En mode distribué (avec colonne ou prédicats de partitionnement), chaque exécuteur fonctionne dans sa propre transaction. Si la base de données source est modifiée en même temps, rien ne garantit que la vue finale sera cohérente.

Où trouver les pilotes appropriés:

Autres options

En fonction de la base de données, des sources spécialisées peuvent exister et être préférées dans certains cas:


Reportez-vous à ce lien pour télécharger le fichier jdbc pour postgres et suivez les étapes pour télécharger le fichier jar.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html fichier jar sera téléchargé dans le chemin comme ceci. "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

Si votre version d'étincelle est 2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "[email protected]") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

enregistrez le fichier en tant que python et exécutez "python respectfilename.py"





pyspark