Monday, 19 May 2014

amazon web services - Consuming a Kinesis stream in Python - Stack Overflow


I can't seem to find a decent example that shows how I can consume an AWS Kinesis stream via Python. Can someone please provide some examples I could look into?


Best




You should use boto.kinesis:


import boto.kinesis

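Once you have created a stream:


Step 1: connect to AWS Kinesis (connect_to_region takes the credentials as keyword arguments):


connection = boto.kinesis.connect_to_region(
    'us-east-1',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key)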
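If you would rather not hard-code keys, here is a minimal sketch of a hypothetical helper, credentials_from_env, that builds the keyword arguments for connect_to_region from the conventional AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables. The helper is illustrative and not part of boto; it only assumes connect_to_region accepts these two keyword arguments, as in the call above.

```python
import os


def credentials_from_env(environ=None):
    """Hypothetical helper: build connect_to_region keyword arguments
    from the standard AWS environment variables."""
    environ = os.environ if environ is None else environ
    try:
        return {
            'aws_access_key_id': environ['AWS_ACCESS_KEY_ID'],
            'aws_secret_access_key': environ['AWS_SECRET_ACCESS_KEY'],
        }
    except KeyError as missing:
        # Fail loudly instead of connecting with partial credentials.
        raise RuntimeError('missing AWS credential variable: %s' % missing)


# Usage (requires boto and valid credentials):
# connection = boto.kinesis.connect_to_region('us-east-1', **credentials_from_env())
```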

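Step 2: get the stream info (for example, how many shards it has and whether it is active), retrying until its status is ACTIVE:


import logging
import time

logger = logging.getLogger(__name__)

stream_name = 'stream_name'  # replace with the name of your stream

info = None
tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    try:
        info = connection.describe_stream(stream_name)
        # Check the response we just received, not a stale variable.
        if info['StreamDescription']['StreamStatus'] == 'ACTIVE':
            break
    except Exception as e:
        logger.error('error while trying to describe kinesis stream: %s', e)
else:
    # Runs only if the loop finished without hitting "break".
    raise TimeoutError('Stream is still not active, aborting...')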
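The polling loop above can also be packaged as a reusable function. This is a minimal sketch assuming a boto 2 style connection whose describe_stream(stream_name) returns the StreamDescription dictionary used above; wait_for_active and its max_tries/delay parameters are hypothetical names. Unlike the loop above, it lets exceptions from describe_stream propagate instead of only logging them, and it sleeps only between attempts rather than before the first one.

```python
import time


def wait_for_active(connection, stream_name, max_tries=10, delay=1.0):
    """Poll describe_stream until the stream reports ACTIVE.

    Returns the describe_stream response, or raises TimeoutError
    after max_tries attempts.
    """
    for _ in range(max_tries):
        info = connection.describe_stream(stream_name)
        if info['StreamDescription']['StreamStatus'] == 'ACTIVE':
            return info
        # Still CREATING or UPDATING: wait before asking again.
        time.sleep(delay)
    raise TimeoutError('stream %r is still not active after %d tries'
                       % (stream_name, max_tries))
```

Usage: info = wait_for_active(connection, 'stream_name').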

Step 3: get all shard IDs and, for each shard ID, get a shard iterator:


# Where to start reading: 'TRIM_HORIZON' (oldest available record), 'LATEST'
# (only new records), 'AT_SEQUENCE_NUMBER' or 'AFTER_SEQUENCE_NUMBER'.
shard_iterator_type = 'TRIM_HORIZON'

shard_ids = []
if info and 'StreamDescription' in info:
    stream_name = info['StreamDescription']['StreamName']
    for shard in info['StreamDescription']['Shards']:
        shard_id = shard['ShardId']
        shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
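describe_stream returns shards in pages: when a stream has many shards, the response sets HasMoreShards and you request the next page by passing the last shard ID you saw as exclusive_start_shard_id. The sketch below assumes boto 2's describe_stream(stream_name, limit=None, exclusive_start_shard_id=None) signature (check it against your boto version); list_shards and shard_iterators are hypothetical helper names.

```python
def list_shards(connection, stream_name):
    """Collect every shard of a stream, following HasMoreShards pagination."""
    shards = []
    start_shard_id = None
    while True:
        info = connection.describe_stream(
            stream_name, exclusive_start_shard_id=start_shard_id)
        description = info['StreamDescription']
        shards.extend(description['Shards'])
        # Stop when there are no more pages (or an empty page comes back).
        if not description.get('HasMoreShards') or not description['Shards']:
            return shards
        start_shard_id = description['Shards'][-1]['ShardId']


def shard_iterators(connection, stream_name, iterator_type='TRIM_HORIZON'):
    """Map each shard ID to a fresh shard iterator of the given type."""
    iterators = {}
    for shard in list_shards(connection, stream_name):
        shard_id = shard['ShardId']
        response = connection.get_shard_iterator(stream_name, shard_id, iterator_type)
        iterators[shard_id] = response['ShardIterator']
    return iterators
```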

Step 4: read the data from each shard.


limit is the maximum number of records you want to receive per call (at most 10,000; a single call returns at most 10 MB of data), and shard_iterator is the iterator obtained in the previous step.


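import time


def read_shard(connection, shard_iterator, limit=None):
    tries = 0
    result = []
    while tries < 100:
        tries += 1
        response = connection.get_records(shard_iterator=shard_iterator, limit=limit)
        shard_iterator = response.get('NextShardIterator')
        for res in response['Records']:
            result.append(res['Data'])
        if shard_iterator is None:  # the shard has been closed
            break
        time.sleep(0.2)  # stay within 5 get_records calls per second per shard
    return result, shard_iterator


records, shard_iterator = read_shard(connection, shard_ids[0]['shard_iterator'], limit=100)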

In your next call to get_records, pass the shard iterator returned by the previous get_records call (response['NextShardIterator']) rather than reusing the old one.
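Chaining iterators by hand is easy to get wrong, so one option is a generator that always continues from the iterator the previous call returned. This is a sketch under the same boto 2 assumptions as the answer; iter_records, poll_interval and max_calls are hypothetical names, and the 0.2-second default pause is chosen to stay under Kinesis's limit of five read calls per second per shard.

```python
import time


def iter_records(connection, shard_iterator, limit=100, poll_interval=0.2,
                 max_calls=None):
    """Yield record payloads from one shard, chaining NextShardIterator.

    Stops when the shard is closed (no next iterator) or after
    max_calls get_records calls, if max_calls is given.
    """
    calls = 0
    while shard_iterator is not None:
        if max_calls is not None and calls >= max_calls:
            return
        calls += 1
        response = connection.get_records(shard_iterator=shard_iterator,
                                          limit=limit)
        for record in response['Records']:
            yield record['Data']
        # Always continue from the iterator returned by this call.
        shard_iterator = response.get('NextShardIterator')
        if shard_iterator is not None:
            time.sleep(poll_interval)
```

On an open shard the generator never ends on its own, so a loop such as for data in iter_records(connection, shard_ids[0]['shard_iterator']) acts as a long-running consumer; pass max_calls to bound it.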


Note: a single call to get_records (limit = None) can return an empty list of records even though more data is available further along the shard, so keep polling with the new iterator. Records are assigned to shards by partition key, so all records put with the same partition key end up in the same shard. When you put data into the stream, you have to supply a partition key:


connection.put_record(stream_name, data, partition_key)
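To see which shard a partition key lands in: per the AWS documentation, Kinesis maps the MD5 hash of the partition key to a 128-bit integer and routes the record to the shard whose HashKeyRange contains that value. The sketch below mirrors that rule using the Shards list from describe_stream. shard_for_partition_key is a hypothetical helper; it assumes the key is hashed as UTF-8 bytes, and that you pass only open shards (after resharding, closed parent shards still appear in describe_stream with overlapping ranges).

```python
import hashlib


def shard_for_partition_key(partition_key, shards):
    """Return the ShardId whose hash key range contains MD5(partition_key).

    `shards` is the 'Shards' list from a describe_stream response; each
    entry's 'HashKeyRange' holds decimal-string start and end keys.
    """
    hash_key = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    for shard in shards:
        key_range = shard['HashKeyRange']
        start = int(key_range['StartingHashKey'])
        end = int(key_range['EndingHashKey'])
        if start <= hash_key <= end:
            return shard['ShardId']
    raise ValueError('no shard covers hash key %d' % hash_key)
```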

