Populate Streaming Data into AWS Redshift from a Kinesis Data Stream

 

Real-Time Data Analytics Using AWS Redshift



See my YouTube video for a detailed explanation of the following steps:





 

1. Create an Amazon Kinesis Data Stream (a boto3 sketch of this follows the list)

2. Create an IAM role for Amazon Redshift

3. Launch Redshift and associate the cluster with the IAM role created in step 2

4. Create an external schema in Redshift

5. Create a materialized view

6. Schedule the materialized view refresh
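
For step 1, a minimal boto3 sketch of creating the stream used in this post. The ON_DEMAND capacity mode is an assumption (any capacity mode works), and credentials are taken from your default AWS configuration:

import boto3

kinesis = boto3.client('kinesis', region_name='ap-south-1')

# Create the stream used later in this post; ON_DEMAND mode avoids choosing a shard count (assumption).
kinesis.create_stream(StreamName='teststreamdp',
                      StreamModeDetails={'StreamMode': 'ON_DEMAND'})

# Wait until the stream is ACTIVE before producing records to it.
kinesis.get_waiter('stream_exists').wait(StreamName='teststreamdp')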

   

 


My YouTube channel: DataPundit

 

Project 1 - Putting randomly generated payloads into a Kinesis stream and reading them from the AWS Redshift cluster for analytics

 

--External Python program to produce streaming data

import boto3
import random

# Do not hard-code credentials in real code; prefer an AWS profile or environment variables.
client = boto3.client('kinesis',
                      aws_access_key_id='YOUR_ACCESS_KEY_ID',
                      aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                      region_name='ap-south-1')

# Send five random sensor readings (vibration, temperature, pressure) to the stream.
for x in range(1, 6):
    v = x * random.randint(1, 4)
    t = x * random.randint(1, 3)
    p = x * random.randint(4, 7)
    mydata = '{ "vibration": ' + str(v) + ', "temperature": ' + str(t) + ', "pressure": ' + str(p) + '}'
    partitionkey = random.randint(10, 100)
    response = client.put_record(StreamName='teststreamdp', Data=mydata, PartitionKey=str(partitionkey))

print("Ingestion Done")

 

 

--External schema for Kinesis

 

CREATE EXTERNAL SCHEMA testschema
FROM KINESIS
IAM_ROLE 'arn:aws:iam::598245567716:role/kinnesisredshiftrole';
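
The IAM role named in the IAM_ROLE clause must exist and be associated with the cluster before this statement runs (steps 2 and 3 of the list). A hedged boto3 sketch of creating such a role follows; the role name matches the ARN used in this post, and attaching the AWS managed AmazonKinesisReadOnlyAccess policy is an assumption (any policy granting read access to the stream will do):

import json
import boto3

iam = boto3.client('iam')

# Trust policy that lets Amazon Redshift assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "redshift.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

iam.create_role(RoleName='kinnesisredshiftrole',
                AssumeRolePolicyDocument=json.dumps(trust_policy))

# Assumption: grant read access via the AWS managed Kinesis read-only policy.
iam.attach_role_policy(RoleName='kinnesisredshiftrole',
                       PolicyArn='arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess')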

 

--Create Materialized View

CREATE MATERIALIZED VIEW mytestview AS
    SELECT approximatearrivaltimestamp,
           partitionkey,
           shardid,
           sequencenumber,
           json_parse(from_varbyte(data, 'utf-8')) AS payload
    FROM testschema.teststreamdp;

              

              

REFRESH MATERIALIZED VIEW mytestview;

 

select * from mytestview;

 

You can schedule this materialized view refresh within AWS Redshift itself.
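
If you would rather drive the refresh and the query from Python instead of the console, here is a sketch using the Redshift Data API. The cluster identifier, database, and database user below are placeholders for your own cluster:

import time
import boto3

rsd = boto3.client('redshift-data', region_name='ap-south-1')

# Refresh the materialized view and then query it in one batch.
batch = rsd.batch_execute_statement(
    ClusterIdentifier='my-redshift-cluster',   # placeholder
    Database='dev',                            # placeholder
    DbUser='awsuser',                          # placeholder
    Sqls=['REFRESH MATERIALIZED VIEW mytestview;',
          'SELECT * FROM mytestview LIMIT 10;'])

# Wait for the batch to finish, then print the SELECT result.
while rsd.describe_statement(Id=batch['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)

sub_statements = rsd.describe_statement(Id=batch['Id'])['SubStatements']
for row in rsd.get_statement_result(Id=sub_statements[1]['Id'])['Records']:
    print(row)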

 

===============================================================

 

Project 2 - Sending randomly generated JSON records to Kinesis

# Generate random JSON records and send them to the Kinesis data stream

 

Open a Jupyter notebook on your local machine and use the program below.

=================================Python Program===================

import boto3
import json
from datetime import datetime
import calendar
import random
import time

 


 

my_stream_name = 'simulatorstream'

# Do not hard-code credentials in real code; prefer an AWS profile or environment variables.
kinesis_client = boto3.client('kinesis',
                              aws_access_key_id='YOUR_ACCESS_KEY_ID',
                              aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                              region_name='ap-south-1')


def put_to_stream(thing_id, property_value, property_timestamp):
    # Build one JSON record and write it to the stream, keyed by thing_id.
    payload = {'prop': str(property_value), 'timestamp': str(property_timestamp), 'thing_id': thing_id}
    print(payload)
    put_response = kinesis_client.put_record(StreamName=my_stream_name,
                                             Data=json.dumps(payload),
                                             PartitionKey=thing_id)


# Send one random reading every 5 seconds until the notebook cell is interrupted.
while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'dd-dd'
    put_to_stream(thing_id, property_value, property_timestamp)
    # wait for 5 seconds
    time.sleep(5)
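
A hedged variant of the producer above: instead of one put_record call per reading, you can batch several readings into a single put_records call. The batch size of 10 is arbitrary, and the sketch reuses kinesis_client, my_stream_name, and the imports from the program above:

def put_batch(n=10):
    # Build n random readings and send them to the stream in one API call.
    records = []
    for _ in range(n):
        payload = {'prop': str(random.randint(40, 120)),
                   'timestamp': str(calendar.timegm(datetime.utcnow().timetuple())),
                   'thing_id': 'dd-dd'}
        records.append({'Data': json.dumps(payload), 'PartitionKey': payload['thing_id']})
    response = kinesis_client.put_records(StreamName=my_stream_name, Records=records)
    print('Failed records:', response['FailedRecordCount'])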

              

--Create External Schema (skip this if you already created testschema in Project 1)

CREATE EXTERNAL SCHEMA testschema
FROM KINESIS
IAM_ROLE 'arn:aws:iam::598245567716:role/kinnesisredshiftrole';

 

--Create Materialized View         

CREATE MATERIALIZED VIEW mv_flightsimulator AS
    SELECT approximatearrivaltimestamp,
           partitionkey,
           shardid,
           sequencenumber,
           json_parse(from_varbyte(data, 'utf-8')) AS payload
           -- Alternatively, extract individual JSON fields into columns
           -- (add a comma after payload if you uncomment these):
           --json_extract_path_text(from_varbyte(data,'utf-8'),'prop') as PROP,
           --json_extract_path_text(from_varbyte(data,'utf-8'),'timestamp') as TIMES,
           --json_extract_path_text(from_varbyte(data,'utf-8'),'thing_id') as THING_ID
    FROM testschema.simulatorstream;

              

              

REFRESH MATERIALIZED VIEW mv_flightsimulator;

 

select * from mv_flightsimulator;

 

 

You can schedule this materialized view refresh within AWS Redshift itself; a sketch of setting up the schedule programmatically follows.
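
Besides the query editor's built-in scheduler, you can create the schedule yourself as an EventBridge rule that targets the cluster through the Redshift Data API. A hedged boto3 sketch; the cluster ARN, role ARN, database, user, and the 5-minute rate are placeholders:

import boto3

events = boto3.client('events', region_name='ap-south-1')

# Run the refresh every 5 minutes (schedule expression is an example).
events.put_rule(Name='refresh-mv-flightsimulator', ScheduleExpression='rate(5 minutes)')

events.put_targets(
    Rule='refresh-mv-flightsimulator',
    Targets=[{
        'Id': 'refresh-mv',
        'Arn': 'arn:aws:redshift:ap-south-1:123456789012:cluster:my-redshift-cluster',  # placeholder
        'RoleArn': 'arn:aws:iam::123456789012:role/eventbridge-redshift-role',          # placeholder
        'RedshiftDataParameters': {
            'Database': 'dev',    # placeholder
            'DbUser': 'awsuser',  # placeholder
            'Sql': 'REFRESH MATERIALIZED VIEW mv_flightsimulator;'
        }
    }])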

 

 

By Datapunditeducation@gmail.com

 

Check my YouTube channel for data engineering videos on AWS, ETL (Ab Initio), Big Data, SQL, UNIX, and Linux.

