hadoop - tutorial - pig vs hive




Apache Pig: FLATTEN et l'exécution parallèle des réducteurs (3)

Il n'y a pas de garantie si pig utilise la valeur de configuration DEFAULT_PARALLEL pour chaque étape du script pig. Essayez PARALLEL avec votre étape de jointure / groupe spécifique qui vous semble prendre du temps (dans votre cas, l'étape GROUP).

 inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67;

Si cela ne fonctionne toujours pas alors vous devrez peut-être voir vos données pour le problème d'asymétrie.

J'ai implémenté un script Apache Pig. Quand j'exécute le script, il en résulte de nombreux mappeurs pour une étape spécifique, mais il n'y a qu'un seul réducteur pour cette étape. En raison de cette condition (de nombreux mappeurs, un réducteur), le cluster Hadoop est presque inactif pendant l'exécution du réducteur unique. Afin de mieux utiliser les ressources de la grappe, j'aimerais aussi avoir beaucoup de réducteurs en parallèle.

Même si je mets le parallélisme dans le script Pig en utilisant la commande SET DEFAULT_PARALLEL, je n'ai toujours qu'un seul réducteur.

La partie de code qui génère le problème est la suivante:

SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

Les alias 'inputData' et 'inputDataGrouped' sont calculés dans le mappeur.

Les 'paires' et les 'pairesFlat' dans le réducteur.

Si je change le script en supprimant la ligne avec la commande FLATTEN (pairesFlat = FOREACH paires GENERATE FLATTEN (paires_bag) AS (item1: int, item2: int);) alors l'exécution se traduit par 5 réducteurs (et donc dans une exécution parallèle) .

Il semble que la commande FLATTEN soit le problème et évite que de nombreux réducteurs soient créés.

Comment pourrais-je atteindre le même résultat de FLATTEN mais avoir le script en cours d'exécution en parallèle (avec beaucoup de réducteurs)?

Modifier:

EXPLAIN plan lorsque vous avez deux FOREACH (comme ci-dessus):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

EXPLAIN plan lorsque vous avez un seul FOREACH avec FLATTEN enveloppant l'UDF:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false

J'ai essayé "set default parallel" et "PARALLEL 100" mais pas de chance. Le cochon utilise encore 1 réducteur.

Il s'est avéré que je devais générer un nombre aléatoire de 1 à 100 pour chaque enregistrement et regrouper ces enregistrements par ce nombre aléatoire.

Nous perdons du temps sur le regroupement, mais c'est beaucoup plus rapide pour moi parce que maintenant je peux utiliser plus de réducteurs.

Voici le code (SUBMITTER est mon propre UDF):

tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data;
groupTmpRecord = GROUP tmpRecord BY rnd;
result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord));

Pour répondre à votre question, nous devons d'abord savoir combien de réducteurs de cochons sont nécessaires pour accomplir le processus - Réorganiser globalement. Parce que, selon ce que je comprends, la génération / projection ne devrait pas nécessiter un seul réducteur. Je ne peux pas dire la même chose à propos de Flatten. Cependant, nous savons par le bon sens que pendant l'aplatissement, le but est de dé-ndifier les tuples des sacs et vice versa. Et pour ce faire, tous les tuples appartenant à un sac devraient certainement être disponibles dans le même réducteur. J'ai peut être tort. Mais quelqu'un peut-il ajouter quelque chose ici pour obtenir une réponse à cet utilisateur?







apache-pig