Is there a better way to use Apache Hive to sessionize my log data? I'm not sure that my approach, shown below, is optimal:
The log data is stored in sequence files; a single log entry is a JSON string, e.g.:
{"source": {"api_key": "app_key_1", "user_id": "user0"}, "events": [{"timestamp": 1330988326, "event_type": "high_score", "event_params": {"score": "1123", "level": "9"}}, {"timestamp": 1330987183, "event_type": "some_event_0", "event_params": {"some_param_00": "val", "some_param_01": 100}}, {"timestamp": 1330987775, "event_type": "some_event_1", "event_params": {"some_param_11": 100, "some_param_10": "val"}}]}
Formatted, this looks like:
{'source': {'api_key': 'app_key_1', 'user_id': 'user0'},
 'events': [{'event_params': {'level': '9', 'score': '1123'},
             'event_type': 'high_score',
             'timestamp': 1330988326},
            {'event_params': {'some_param_00': 'val', 'some_param_01': 100},
             'event_type': 'some_event_0',
             'timestamp': 1330987183},
            {'event_params': {'some_param_10': 'val', 'some_param_11': 100},
             'event_type': 'some_event_1',
             'timestamp': 1330987775}]
}
'source' contains some info (user_id and api_key) about the source of the events contained in 'events'; 'events' contains a list of events generated by the source; each event has 'event_params', 'event_type', and 'timestamp' (a Unix timestamp in GMT). Note that timestamps may be out of order, both within a single log entry and across log entries.
Note that I'm constrained such that I cannot change the log format and cannot initially log the data into separate, partitioned files (though I could use Hive to do this after the data is logged), etc.
In the end, I'd like a table of sessions, where a session is associated with an app (api_k) and a user, and has a start time and session length (or end time); sessions are split where, for a given app and user, a gap of 30 or more minutes occurs between events.
My solution does the following (the Hive script and Python transform script are below; it doesn't seem like it would be useful to show the SerDe source, but let me know if it would be):
[1] load the data into log_entry_tmp, in a denormalized format
[2] explode the data into log_entry, so that, eg, the above single entry would now have multiple entries:
{"source_api_key":"app_key_1","source_user_id":"user0","event_type":"high_score","event_params":{"score":"1123","level":"9"},"event_timestamp":1330988326}
{"source_api_key":"app_key_1","source_user_id":"user0","event_type":"some_event_0","event_params":{"some_param_00":"val","some_param_01":"100"},"event_timestamp":1330987183}
{"source_api_key":"app_key_1","source_user_id":"user0","event_type":"some_event_1","event_params":{"some_param_11":"100","some_param_10":"val"},"event_timestamp":1330987775}
[3] transform and write data into session_info_0, where each entry contains an event's app_id, user_id, and timestamp
[4] transform and write data into session_info_1, where entries are ordered by app_id, user_id, event_timestamp, and each entry contains a session_id; the Python transform script finds the splits and groups the data into sessions
[5] transform and write the final session data to session_info_2: each session's app and user, start time, and length in seconds
[Hive script]
drop table if exists app_info;
create external table app_info ( app_id int, app_name string, api_k string )
location '${WORK}/hive_tables/app_info';
add jar ../build/our-serdes.jar;
-- [1] load the data into log_entry_tmp, in a denormalized format
drop table if exists log_entry_tmp;
create external table log_entry_tmp
row format serde 'com.company.TestLogSerde'
location '${WORK}/hive_tables/test_logs';
drop table if exists log_entry;
create table log_entry (
  entry struct<source_api_key:string,
               source_user_id:string,
               event_type:string,
               event_params:map<string,string>,
               event_timestamp:bigint>);
-- [2] explode the data into log_entry
insert overwrite table log_entry
select explode (trans0_list) t
from log_entry_tmp;
drop table if exists session_info_0;
create table session_info_0 (
  app_id string,
  user_id string,
  event_timestamp bigint
);
-- [3] transform and write data into session_info_0, where each entry contains events' app_id, user_id, and timestamp
insert overwrite table session_info_0
select ai.app_id, le.entry.source_user_id, le.entry.event_timestamp
from log_entry le
join app_info ai on (le.entry.source_api_key = ai.api_k);
add file ./TestLogTrans.py;
drop table if exists session_info_1;
create table session_info_1 (
  session_id string,
  app_id string,
  user_id string,
  event_timestamp bigint,
  session_start_datetime string,
  session_start_timestamp bigint,
  gap_secs int
);
-- [4] transform and write data into session_info_1, where entries are ordered by app_id, user_id, event_timestamp; each entry contains a session_id; the Python transform script finds the splits and groups the data into sessions
insert overwrite table session_info_1
select
  transform (t.app_id, t.user_id, t.event_timestamp)
  using './TestLogTrans.py'
  as (session_id, app_id, user_id, event_timestamp, session_start_datetime, session_start_timestamp, gap_secs)
from
  (select app_id as app_id, user_id as user_id, event_timestamp as event_timestamp
   from session_info_0
   order by app_id, user_id, event_timestamp) t;
drop table if exists session_info_2;
create table session_info_2 (
  session_id string,
  app_id string,
  user_id string,
  session_start_datetime string,
  session_start_timestamp bigint,
  len_secs int
);
-- [5] transform and write final session data to session_info_2 ; the sessions' app + user, start time, and length in seconds
insert overwrite table session_info_2
select session_id, app_id, user_id, session_start_datetime, session_start_timestamp, sum(gap_secs)
from session_info_1
group by session_id, app_id, user_id, session_start_datetime, session_start_timestamp;
[TestLogTrans.py]
#!/usr/bin/python
import sys, time

def buildDateTime(ts):
    # Format a Unix timestamp (GMT) as 'YYYY-MM-DD HH:MM:SS'.
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(ts))

curGroup = None
prevGroup = None
curSessionStartTimestamp = None
curSessionStartDatetime = None
prevTimestamp = None

# Input rows arrive sorted by (app_id, user_id, event_timestamp).
for line in sys.stdin.readlines():
    fields = line.split('\t')
    if len(fields) != 3:
        raise Exception('fields = %s' % fields)
    app_id = fields[0]
    user_id = fields[1]
    event_timestamp = int(fields[2].strip())

    curGroup = '%s-%s' % (app_id, user_id)
    curTimestamp = event_timestamp

    # First row: initialize the running state.
    if prevGroup is None:
        prevGroup = curGroup
        curSessionStartTimestamp = curTimestamp
        curSessionStartDatetime = buildDateTime(curSessionStartTimestamp)
        prevTimestamp = curTimestamp

    isNewGroup = (curGroup != prevGroup)
    gapSecs = 0 if isNewGroup else (curTimestamp - prevTimestamp)
    isSessionSplit = (gapSecs >= 1800)  # a 30-minute gap starts a new session

    if isNewGroup or isSessionSplit:
        curSessionStartTimestamp = curTimestamp
        curSessionStartDatetime = buildDateTime(curSessionStartTimestamp)

    session_id = '%s-%s-%d' % (app_id, user_id, curSessionStartTimestamp)
    print '%s\t%s\t%s\t%d\t%s\t%d\t%d' % (session_id, app_id, user_id, curTimestamp, curSessionStartDatetime, curSessionStartTimestamp, gapSecs)

    prevGroup = curGroup
    prevTimestamp = curTimestamp
I think you could easily drop step 3 and put the query you use there in as a subquery in the FROM clause of step 4; see the sketch below. Materialising that intermediate result doesn't appear to gain you anything.
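For example, something like this (an untested sketch, reusing the table and column names from your script) lets step 4 consume the join directly:
insert overwrite table session_info_1
select
  transform (t.app_id, t.user_id, t.event_timestamp)
  using './TestLogTrans.py'
  as (session_id, app_id, user_id, event_timestamp, session_start_datetime, session_start_timestamp, gap_secs)
from
  (select ai.app_id as app_id, le.entry.source_user_id as user_id, le.entry.event_timestamp as event_timestamp
   from log_entry le
   join app_info ai on (le.entry.source_api_key = ai.api_k)
   order by app_id, user_id, event_timestamp) t;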
Otherwise, for what you're trying to achieve here, this seems a reasonable approach.
Potentially you could achieve step 2 using a custom mapper, passing its output into step 4 as a custom reducer (with step 3 built in as a subquery). That would reduce your MapReduce jobs by one, and therefore could give you a significant saving in time. A rough sketch of the pattern follows.
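An untested sketch using Hive's FROM ... MAP ... USING / REDUCE ... USING syntax. Here 'explode_events.py' is a hypothetical mapper that emits one (source_api_key, source_user_id, event_timestamp) line per event, and I've assumed the trans0_list column from your step [2] is something that mapper can parse; you'd also still need to fold the app_info lookup in somewhere (for example by joining the mapped output with app_info before the reduce) so the reducer sees app_id rather than api_key:
from (
  from log_entry_tmp
  map log_entry_tmp.trans0_list              -- assumed input column; depends on what your SerDe exposes
  using './explode_events.py'                -- hypothetical mapper: one event per output line
  as (source_api_key, source_user_id, event_timestamp)
  distribute by source_api_key, source_user_id
  sort by source_api_key, source_user_id, event_timestamp
) m
insert overwrite table session_info_1
reduce m.source_api_key, m.source_user_id, m.event_timestamp
using './TestLogTrans.py'
as (session_id, app_id, user_id, event_timestamp, session_start_datetime, session_start_timestamp, gap_secs);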