Sunday, May 11, 2014

MapReduce - COLLECT_SET() in Hive (Hadoop) - Stack Overflow


I just learned about the collect_set() function in Hive, and I started a job on a development 3-node cluster.


I only have about 10 GB to process. The job, however, is taking forever. I think there's either a bug in the implementation of collect_set(), a bug in my code, or collect_set() is just really resource-intensive.


Here's my SQL for Hive (no pun intended):


INSERT OVERWRITE TABLE sequence_result_1
SELECT sess.session_key as session_key,
sess.remote_address as remote_address,
sess.hit_count as hit_count,
COLLECT_SET(evt.event_id) as event_set,
hit.rsp_timestamp as hit_timestamp,
sess.site_link as site_link
FROM site_session sess
JOIN (SELECT * FROM site_event
WHERE event_id = 274 OR event_id = 284 OR event_id = 55 OR event_id = 151) evt
ON (sess.session_key = evt.session_key)
JOIN site_hit hit ON (sess.session_key = evt.session_key)
GROUP BY sess.session_key, sess.remote_address, sess.hit_count, hit.rsp_timestamp, sess.site_link
ORDER BY hit_timestamp;

There are 4 MR passes. The first took about 30 seconds. The second map took about 1 minute, and most of the second reduce took about 2 minutes. But for the last two hours it's been inching from 97.71% to 97.73%. Is this right? I think there must be some issue. I took a look at the log, and I can't tell whether it's normal.


[Sample of log]


2011-06-21 16:32:22,715 WARN org.apache.hadoop.hive.ql.exec.GroupByOperator: Hash Tbl flush: #hash table = 120894
2011-06-21 16:32:22,758 WARN org.apache.hadoop.hive.ql.exec.GroupByOperator: Hash Table flushed: new size = 108804
2011-06-21 16:32:23,003 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 5142000000 rows
2011-06-21 16:32:23,003 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 5142000000 rows
2011-06-21 16:32:24,138 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 5143000000 rows
2011-06-21 16:32:24,138 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 5143000000 rows
2011-06-21 16:32:24,725 WARN org.apache.hadoop.hive.ql.exec.GroupByOperator: Hash Tbl flush: #hash table = 120894
2011-06-21 16:32:24,768 INFO org.apache.hadoop.hive.ql.exec.GroupByOperator: 6 forwarding 42000000 rows
2011-06-21 16:32:24,771 WARN org.apache.hadoop.hive.ql.exec.GroupByOperator: Hash Table flushed: new size = 108804
2011-06-21 16:32:25,338 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 5144000000 rows
2011-06-21 16:32:25,338 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 5144000000 rows
2011-06-21 16:32:26,467 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 5145000000 rows
2011-06-21 16:32:26,468 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 5145000000 rows

I'm pretty new at this, and trying to work with collect_set() and Hive Array is driving me off the deep end.


Thanks in advance :)




Major fail. My solution is below. There was no issue with COLLECT_SET after all; it was simply trying to collect all of the items, of which there were effectively infinitely many.


Why? Because I joined on something that wasn't even part of the set. The second join's ON clause used to repeat the same condition as the first (sess.session_key = evt.session_key), so site_hit was never actually constrained; it now correctly joins on sess.session_key = hit.session_key.
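That cross-join effect is easy to reproduce in miniature. The sketch below (toy data and a naive nested-loop join, purely illustrative, not the real tables) shows that an ON clause which never references the table being joined matches every row of it:

```python
# Toy model of the bug: the second join's condition compared sess to evt
# and never mentioned hit, so it behaved like a cross join against site_hit.

def join(left, right, on):
    """Nested-loop inner join: keep every (l, r) pair where on(l, r) holds."""
    return [(l, r) for l in left for r in right if on(l, r)]

sessions = [{"session_key": k} for k in (1, 2, 3)]
events   = [{"session_key": k, "event_id": 274} for k in (1, 2, 3)]
hits     = [{"session_key": k, "rsp_timestamp": k * 10} for k in (1, 2, 3)]

# First join, as in the query: sessions matched to events on session_key.
sess_evt = join(sessions, events,
                lambda s, e: s["session_key"] == e["session_key"])

# Buggy second join: the predicate ignores `h`, so it is true for every hit.
buggy = join(sess_evt, hits,
             lambda se, h: se[0]["session_key"] == se[1]["session_key"])

# Fixed second join: hits are constrained by session_key.
fixed = join(sess_evt, hits,
             lambda se, h: se[0]["session_key"] == h["session_key"])

print(len(buggy), len(fixed))  # 9 vs 3: the buggy join is a full cross product
```

With real table sizes, that multiplication is what produced the billions of rows the JoinOperator was forwarding in the log.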


INSERT OVERWRITE TABLE sequence_result_1
SELECT sess.session_key as session_key,
sess.remote_address as remote_address,
sess.hit_count as hit_count,
COLLECT_SET(evt.event_id) as event_set,
hit.rsp_timestamp as hit_timestamp,
sess.site_link as site_link
FROM tealeaf_session sess
JOIN site_event evt ON (sess.session_key = evt.session_key)
JOIN site_hit hit ON (sess.session_key = hit.session_key)
WHERE evt.event_id IN(274,284,55,151)
GROUP BY sess.session_key, sess.remote_address, sess.hit_count, hit.rsp_timestamp, sess.site_link
ORDER BY hit_timestamp;
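As a rough model of what the corrected query's GROUP BY plus COLLECT_SET(evt.event_id) produces: collect_set() accumulates the distinct values per group into an array (Hive makes no ordering guarantee). The rows below are invented for illustration:

```python
from collections import defaultdict

# Hypothetical joined rows after the corrected joins: (session_key, event_id).
rows = [
    (101, 274), (101, 274), (101, 55),   # duplicate event within a session
    (102, 284),
    (103, 151), (103, 55),
]

# Model GROUP BY session_key with COLLECT_SET(event_id):
event_set = defaultdict(set)
for session_key, event_id in rows:
    event_set[session_key].add(event_id)  # set semantics drop duplicates

print({k: sorted(v) for k, v in event_set.items()})
# {101: [55, 274], 102: [284], 103: [55, 151]}
```

The key property is deduplication per group; once the join no longer explodes, each session's set stays small and the aggregation is cheap.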



The first thing I would try is getting rid of the sub-select and joining directly to site_event, then moving the event_id filter into the outer WHERE clause and changing it to an IN(). So something like:


SELECT sess.session_key as session_key,
sess.remote_address as remote_address,
sess.hit_count as hit_count,
COLLECT_SET(evt.event_id) as event_set,
hit.rsp_timestamp as hit_timestamp,
sess.site_link as site_link
FROM site_session sess
JOIN site_event evt ON (sess.session_key = evt.session_key)
JOIN site_hit hit ON (sess.session_key = hit.session_key)
WHERE evt.event_id IN (274, 284, 55, 151)
GROUP BY sess.session_key, sess.remote_address, sess.hit_count, hit.rsp_timestamp, sess.site_link
ORDER BY hit_timestamp;

Additionally, I don't know the sizes of each table, but in general in Hive you want to keep your largest table (usually your fact table) on the right-hand side of a join to reduce memory usage. The reason is that Hive attempts to hold the left-hand side of a join in memory and streams the right-hand side to perform the join.
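That build-versus-stream behavior is the classic hash-join shape: one side is materialized in a hash table, the other is scanned a row at a time. A toy sketch (not Hive's actual operator) of why only the in-memory side's size drives memory use:

```python
from collections import defaultdict

def hash_join(build_rows, probe_rows, key):
    # Build phase: the entire build side is held in memory as a hash table.
    table = defaultdict(list)
    for row in build_rows:
        table[key(row)].append(row)
    # Probe phase: the probe side is streamed one row at a time.
    for row in probe_rows:
        for match in table.get(key(row), []):
            yield (match, row)

small = [{"session_key": 1, "site_link": "/a"},
         {"session_key": 2, "site_link": "/b"}]
# A generator: the "big" side is never materialized, only streamed.
big = ({"session_key": i % 2 + 1, "event_id": 274} for i in range(1_000_000))

n = sum(1 for _ in hash_join(small, big, lambda r: r["session_key"]))
print(n)  # 1000000 joined rows, but only `small` ever lived in memory
```

Swapping the sides would mean buffering a million rows to stream two, which is the mistake the table-ordering advice guards against.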




I'd guess what's happening is that it's producing a COLLECT_SET() for EACH row that would have been returned. So for every row you return, it returns the entire array produced by COLLECT_SET. That could be taxing and take a long time.


Check the performance with COLLECT_SET removed from the query. If that is speedy enough, push the COLLECT_SET calculation into a sub-query and then select that column instead of computing it where you are now.


I haven't used COLLECT_SET or run any tests, but from your post, that's what I'd suspect first.


