mercredi 9 avril 2014

Hadoop - Apache cochon : aplatir et parallèle de l'exécution des réducteurs - Stack Overflow


I did implement a Apache Pig script and when I execute the script it results in many mappers for a specific step, but has only one reducer for that step and so the Hadoop cluster is almost idle while this reducer is executed. In order to better use the resources of the cluster I would like to also have many reducers running in parallel.


Even if I set the parallelism in the Pig script using the SET DEFAULT_PARALLEL command I still result in having only 1 reducer.


The code part issuing the problem is the following:


SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

The 'inputData' and 'inputDataGrouped' aliases are computed in the mapper.


The 'pairs' and 'pairsFlat' in the reducer.


If I change the script by removing the line with the FLATTEN command (pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);) then the execution results in 5 reducers (and thus in a parallel execution).


It seems that the FLATTEN command is the problem and avoids that many reducers are created.


Does anybody know how I could reach the same result of FLATTEN but having the script being executed in parallel (with many reducers)?


Many thanks in advance for your support!


Best regards, Christian


Edit:


EXPLAIN plan when having two FOREACH (as above):


Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
| |
| Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[int] - scope-27
| |
| |---Project[bytearray][1] - scope-26
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
| |
| Project[bag][0] - scope-39
|
|---pairs: New For Each(false)[bag] - scope-38
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-36
| |
| |---Project[bag][1] - scope-35
| |
| |---Project[bag][1] - scope-34
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

EXPLAIN plan when having only one FOREACH with FLATTEN wrapping the UDF:


Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
| |
| Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
| |
| Cast[chararray] - scope-21
| |
| |---Project[bytearray][0] - scope-20
| |
| Cast[int] - scope-24
| |
| |---Project[bytearray][1] - scope-23
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-33
| |
| |---Project[bag][1] - scope-32
| |
| |---Project[bag][1] - scope-31
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false


I did implement a Apache Pig script and when I execute the script it results in many mappers for a specific step, but has only one reducer for that step and so the Hadoop cluster is almost idle while this reducer is executed. In order to better use the resources of the cluster I would like to also have many reducers running in parallel.


Even if I set the parallelism in the Pig script using the SET DEFAULT_PARALLEL command I still result in having only 1 reducer.


The code part issuing the problem is the following:


SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

The 'inputData' and 'inputDataGrouped' aliases are computed in the mapper.


The 'pairs' and 'pairsFlat' in the reducer.


If I change the script by removing the line with the FLATTEN command (pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);) then the execution results in 5 reducers (and thus in a parallel execution).


It seems that the FLATTEN command is the problem and avoids that many reducers are created.


Does anybody know how I could reach the same result of FLATTEN but having the script being executed in parallel (with many reducers)?


Many thanks in advance for your support!


Best regards, Christian


Edit:


EXPLAIN plan when having two FOREACH (as above):


Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
| |
| Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[int] - scope-27
| |
| |---Project[bytearray][1] - scope-26
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
| |
| Project[bag][0] - scope-39
|
|---pairs: New For Each(false)[bag] - scope-38
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-36
| |
| |---Project[bag][1] - scope-35
| |
| |---Project[bag][1] - scope-34
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

EXPLAIN plan when having only one FOREACH with FLATTEN wrapping the UDF:


Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
| |
| Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
| |
| Cast[chararray] - scope-21
| |
| |---Project[bytearray][0] - scope-20
| |
| Cast[int] - scope-24
| |
| |---Project[bytearray][1] - scope-23
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-33
| |
| |---Project[bag][1] - scope-32
| |
| |---Project[bag][1] - scope-31
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false

0 commentaires:

Enregistrer un commentaire