Populating Streaming Data into AWS Redshift from a Kinesis Data Stream
Real-Time Data Analytics Using AWS Redshift
See my YouTube video for a detailed explanation.
1. Create an Amazon Kinesis Data Stream (steps 1 and 2 are sketched in boto3 after this list)
2. Create an IAM role for Amazon Redshift
3. Launch Redshift and associate the cluster with the IAM role created in Step 2
4. Create an external schema in Redshift
5. Create a materialized view
6. Schedule the materialized view refresh
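For reference, steps 1 and 2 can also be done programmatically. This is a minimal sketch, assuming your default AWS credentials are configured; the stream and role names simply mirror the ones used later in this post:

import boto3
import json

# Step 1: create the Kinesis data stream (one shard is enough for this demo)
kinesis = boto3.client('kinesis', region_name='ap-south-1')
kinesis.create_stream(StreamName='teststreamdp', ShardCount=1)

# Step 2: create an IAM role that Redshift can assume
iam = boto3.client('iam')
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "redshift.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}
iam.create_role(RoleName='kinnesisredshiftrole',
                AssumeRolePolicyDocument=json.dumps(trust_policy))
# Read-only access to Kinesis is sufficient for streaming ingestion
iam.attach_role_policy(RoleName='kinnesisredshiftrole',
                       PolicyArn='arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess')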
Project 1 - Putting randomly generated payloads onto a Kinesis stream and into the AWS Redshift cluster for analytics
--External Python program to produce streaming data
import boto3
import random

# Credentials below are placeholders - substitute your own keys, or better,
# rely on the default credential chain (IAM role, environment, or config file)
client = boto3.client('kinesis',
                      aws_access_key_id='YOUR_ACCESS_KEY_ID',
                      aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                      region_name='ap-south-1')

# Send five JSON records with random sensor-style values
for x in range(1, 6):
    v = x * random.randint(1, 4)
    t = x * random.randint(1, 3)
    p = x * random.randint(4, 7)
    mydata = '{"vibration": ' + str(v) + ', "temperature": ' + str(t) + ', "pressure": ' + str(p) + '}'
    partitionkey = random.randint(10, 100)
    response = client.put_record(StreamName='teststreamdp',
                                 Data=mydata,
                                 PartitionKey=str(partitionkey))

print("Ingestion Done")
--External schema for Kinesis--
CREATE EXTERNAL SCHEMA testschema
FROM KINESIS
IAM_ROLE 'arn:aws:iam::598245567716:role/kinnesisredshiftrole';
--Create Materialized View
CREATE MATERIALIZED VIEW mytestview AS
SELECT approximatearrivaltimestamp,
       partitionkey,
       shardid,
       sequencenumber,
       json_parse(from_varbyte(data, 'utf-8')) AS payload
FROM testschema.teststreamdp;

REFRESH MATERIALIZED VIEW mytestview;
SELECT * FROM mytestview;
You can schedule this view to refresh within AWS Redshift itself.
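Since the payload lands as a SUPER column, nested fields can be queried with dot notation. As a hedged sketch, a client-side refresh-and-query could look like this using the redshift_connector package (the endpoint, database, user, and password below are placeholders):

import redshift_connector

# Placeholder connection details - substitute your cluster endpoint and credentials
conn = redshift_connector.connect(host='mycluster.xxxxxxxx.ap-south-1.redshift.amazonaws.com',
                                  database='dev',
                                  user='awsuser',
                                  password='YOUR_PASSWORD')
conn.autocommit = True
cur = conn.cursor()
cur.execute("REFRESH MATERIALIZED VIEW mytestview;")
# payload is a SUPER column, so its nested fields can be selected directly
cur.execute("SELECT payload.vibration, payload.temperature, payload.pressure FROM mytestview;")
for row in cur.fetchall():
    print(row)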
===============================================================
Project 2 - Sending randomly generated JSON records to Kinesis
#Generating random JSON records and sending them to the Kinesis data stream
Open a Jupyter notebook on your local machine and use the program below.
================================= Python Program ===================
import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'simulatorstream'

# Credentials below are placeholders - substitute your own keys, or better,
# rely on the default credential chain
kinesis_client = boto3.client('kinesis',
                              aws_access_key_id='YOUR_ACCESS_KEY_ID',
                              aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                              region_name='ap-south-1')

def put_to_stream(thing_id, property_value, property_timestamp):
    payload = {'prop': str(property_value),
               'timestamp': str(property_timestamp),
               'thing_id': thing_id}
    print(payload)
    put_response = kinesis_client.put_record(StreamName=my_stream_name,
                                             Data=json.dumps(payload),
                                             PartitionKey=thing_id)

# Send a new random reading every five seconds until interrupted
while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'dd-dd'
    put_to_stream(thing_id, property_value, property_timestamp)
    # wait for 5 seconds
    time.sleep(5)
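One record every five seconds is fine for a demo. If you need higher throughput, Kinesis also has a batch API; here is a sketch of the same payload shape sent with put_records (stream name and region reused from above):

import boto3
import json
import calendar
import random
from datetime import datetime

kinesis_client = boto3.client('kinesis', region_name='ap-south-1')

# Build a batch of records (put_records accepts up to 500 per call) and send them in one request
records = []
for _ in range(25):
    payload = {'prop': str(random.randint(40, 120)),
               'timestamp': str(calendar.timegm(datetime.utcnow().timetuple())),
               'thing_id': 'dd-dd'}
    records.append({'Data': json.dumps(payload), 'PartitionKey': payload['thing_id']})

response = kinesis_client.put_records(StreamName='simulatorstream', Records=records)
print('Failed records:', response['FailedRecordCount'])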
--Create External Schema
CREATE EXTERNAL SCHEMA testschema
FROM KINESIS
IAM_ROLE 'arn:aws:iam::598245567716:role/kinnesisredshiftrole';
--Create Materialized View
CREATE MATERIALIZED VIEW mv_flightsimulator AS
SELECT approximatearrivaltimestamp,
       partitionkey,
       shardid,
       sequencenumber,
       json_parse(from_varbyte(data, 'utf-8')) AS payload
       --json_extract_path_text(from_varbyte(data, 'utf-8'), 'prop') AS PROP,
       --json_extract_path_text(from_varbyte(data, 'utf-8'), 'timestamp') AS TIMES,
       --json_extract_path_text(from_varbyte(data, 'utf-8'), 'thing_id') AS THING_ID
FROM testschema.simulatorstream;

REFRESH MATERIALIZED VIEW mv_flightsimulator;
SELECT * FROM mv_flightsimulator;
You can schedule this view to refresh within AWS Redshift itself.
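For example, an external scheduler (cron, Lambda, and so on) could trigger the refresh through the Redshift Data API. A minimal sketch with placeholder cluster details:

import boto3

client = boto3.client('redshift-data', region_name='ap-south-1')

# Placeholder cluster/database/user - replace with your own
response = client.execute_statement(ClusterIdentifier='my-redshift-cluster',
                                    Database='dev',
                                    DbUser='awsuser',
                                    Sql='REFRESH MATERIALIZED VIEW mv_flightsimulator;')
print(response['Id'])  # statement id; its progress can be polled with describe_statement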
By Datapunditeducation@gmail.com
Visit my YouTube channel for data engineering videos on AWS, ETL (Ab Initio), Big Data, SQL, UNIX, and Linux.