java - spark - thèse doctorat big data pdf




La conversion de la table mysql en un ensemble de données spark est très lente comparée à celle du fichier csv (2)

S'il vous plaît suivez les étapes ci-dessous

1. téléchargez une copie du connecteur JDBC pour mysql. Je crois que vous en avez déjà un.

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2.créer un fichier db-properties.flat dans le format ci-dessous

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>

3. Créez d'abord une table vide où vous voulez charger les données.

appelez le shell spark avec la classe de pilote

spark-shell --driver-class-path  <your path to mysql jar>

puis importer tout le paquet requis

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

initier un contexte de ruche ou un contexte sql

val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql

définir certaines propriétés

sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

Charger les propriétés de mysql db à partir du fichier

val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db-        properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")

créez une requête pour lire les données de votre table et passez-la à la méthode read de #sqlcontext. c'est là que vous pouvez gérer votre clause where

val df1 = "(SELECT  * FROM your_table_name) as s1" 

passer le jdbcurl, sélectionner la requête et les propriétés db à lire la méthode

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

écris-le à ta table

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")

J'ai un fichier csv dans Amazon s3 avec une taille de 62 Mo (114 000 lignes). Je le convertis en jeu de données spark, et en prenant les 500 premières lignes de celui-ci. Le code est comme suit;

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+"");

 set.take(500)

L'opération entière prend 20 à 30 secondes.

Maintenant j'essaie la même chose mais en utilisant plutôt csv j'utilise la table mySQL avec 119 000 lignes. Le serveur MySQL est en amazon ec2. Le code est comme suit;

String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;

SparkSession spark=StartSpark.getSparkSession();

SQLContext sc = spark.sqlContext();

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = sc
            .read()
            .option("url", url)
            .option("dbtable", this.tableName)
            .option("driver","com.mysql.jdbc.Driver")
            .format("jdbc")
            .load();
set.take(500);

Cela prend de 5 à 10 minutes. Je cours une étincelle à l'intérieur de jvm. En utilisant la même configuration dans les deux cas.

Je peux utiliser partitionColumn, numParttition etc. mais je n'ai aucune colonne numérique et une autre question est le schéma de la table qui m'est inconnu.

Mon problème n'est pas comment diminuer le temps requis comme je le sais dans le cas idéal étincelle fonctionnera en cluster mais ce que je ne peux pas comprendre est pourquoi cette grande différence de temps dans les deux cas ci-dessus?


Ce problème a été couvert plusieurs fois sur :

  • Comment améliorer les performances des travaux Spark lents à l'aide de la connexion DataFrame et JDBC?
  • spark jdbc df limit ... que fait-il?
  • Comment utiliser la source JDBC pour écrire et lire des données dans (Py) Spark?

et dans des sources externes:

donc juste pour réitérer - par défaut DataFrameReader.jdbc ne distribue pas de données ou ne lit pas. Il utilise un fil unique, un seul exectuor.

Pour distribuer les lectures:

  • utiliser des plages avec lowerBound / upperBound :

    Properties properties;
    Lower
    
    Dataset<Row> set = sc
        .read()
        .option("partitionColumn", "foo")
        .option("numPartitions", "3")
        .option("lowerBound", 0)
        .option("upperBound", 30)
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver","com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
  • predicates

    Properties properties;
    Dataset<Row> set = sc
        .read()
        .jdbc(
            url, this.tableName,
            {"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"},
            properties
        )




amazon-s3