AWS Project 1
Watch my YouTube video for an explanation:
Please refer to the class notes here for your reference:
AWS Kinesis, Lambda, and Redshift - covered in the video named "AWS Project 1"
https://aws.amazon.com/blogs/big-data/stream-transform-and-analyze-xml-data-in-real-time-with-amazon-kinesis-aws-lambda-and-amazon-redshift/
Lambda Function
======================================================
from __future__ import print_function
import base64
import json
import boto3
import os
import time
import csv
import sys
from xml.etree.ElementTree import XML, fromstring
import xml.etree.ElementTree as ET
print('Loading function')
def lambda_handler(event, context):
    """Convert the XML payload of every Kinesis record into pipe-delimited text.

    For each entry in ``event['Records']`` the base64-encoded
    ``record['kinesis']['data']`` is decoded, passed through ``parseXML``
    and re-encoded as base64 text.

    Args:
        event: Invocation event; must contain a ``'Records'`` list whose
            items carry ``['kinesis']['data']`` and
            ``['kinesis']['sequenceNumber']``.
        context: Lambda context object (unused).

    Returns:
        ``{'records': [...]}`` where each item is a dict with the keys
        ``'recordId'`` (the record's sequence number), ``'result'``
        (always ``'Ok'``) and ``'data'`` (base64 text of the transformed
        payload).

    Raises:
        KeyError: If an expected key is missing from the event.
        xml.etree.ElementTree.ParseError: If a payload is not well-formed XML.
    """
    print(event)
    records = event['Records']
    output = []
    for record in records:
        kinesis = record["kinesis"]
        payload = base64.b64decode(kinesis["data"])
        parsedRecords = parseXML(payload)
        # Log the transformed rows so they can be inspected in the function logs.
        print(parsedRecords.decode("utf-8"))
        output.append({
            'recordId': kinesis['sequenceNumber'],
            'result': 'Ok',
            # b64encode returns bytes; decode so the value is a plain string.
            'data': base64.b64encode(parsedRecords).decode("utf-8"),
        })
    print('Successfully processed {} records.'.format(len(records)))
    print(output)
    return {'records': output}
# Child tags read from each <employee> and each <address>, in output order.
_EMPLOYEE_TAGS = ('first_name', 'last_name', 'phone')
_ADDRESS_TAGS = ('street_address', 'state', 'zip')


def _pipe_join(element, tags):
    """Return the text of each child named in *tags*, joined with ``|``.

    A missing or empty child contributes an empty field.
    """
    return "|".join(element.findtext(tag, default="") for tag in tags)


def parseXML(inputXML):
    """Flatten an ``<employees>`` XML document into pipe-delimited rows.

    Every ``<employee>`` child of the root becomes one line of nine fields::

        first_name|last_name|phone|
        primary street|primary state|primary zip|
        secondary street|secondary state|secondary zip

    (shown wrapped; the real output is a single line terminated by ``\\n``).

    Args:
        inputXML: The XML document as UTF-8 encoded bytes (a ``str`` is
            also accepted).

    Returns:
        The concatenated rows as UTF-8 encoded bytes; ``b""`` when the
        document contains no ``<employee>`` elements.

    Raises:
        xml.etree.ElementTree.ParseError: If the input is not well-formed XML.
    """
    if isinstance(inputXML, (bytes, bytearray)):
        inputXML = inputXML.decode('utf-8')

    # NOTE(review): xml.etree.ElementTree is not hardened against maliciously
    # constructed XML; consider a hardened parser if the stream is untrusted.
    root = ET.fromstring(inputXML)

    # An absent address must still occupy three fields, otherwise the
    # columns that follow shift left and no longer line up with the table.
    blank_address = "|".join("" for _ in _ADDRESS_TAGS)

    rows = []
    for employee in root.findall('employee'):
        addresses = {"primary": blank_address, "secondary": blank_address}
        container = employee.find('all_address')
        address_nodes = container.findall('address') if container is not None else []
        for address in address_nodes:
            kind = address.findtext('type', default="")
            # Address types other than primary/secondary are ignored.
            if kind in addresses:
                addresses[kind] = _pipe_join(address, _ADDRESS_TAGS)
        fields = (
            _pipe_join(employee, _EMPLOYEE_TAGS),
            addresses["primary"],
            addresses["secondary"],
        )
        rows.append("|".join(fields) + "\n")

    return "".join(rows).encode('utf-8')
=============================================================================================================================
aws kinesis put-record --stream-name <Stream-Name> --data
"<employees>
<employee>
<first_name>FName 1</first_name>
<last_name>LName 1</last_name>
<all_address><address>
<type>primary</type>
<street_address>Street Address 1</street_address>
<state>State 1</state>
<zip>11111</zip>
</address>
<address>
<type>secondary</type>
<street_address>Street Address 2</street_address>
<state>State 2</state>
<zip>11112</zip>
</address>
</all_address>
<phone>111-111-1111</phone>
</employee>
<employee>
<first_name>FName 2</first_name>
<last_name>LName 2</last_name>
<all_address><address><type>primary</type><street_address>Street Address 3</street_address><state>State 3</state><zip>11113</zip></address><address><type>secondary</type><street_address>Street Address 4</street_address><state>State 4</state><zip>11114</zip></address></all_address><phone>111-111-1112</phone></employee></employees>" --partition-key <partition-key-name>
-- Target table for the pipe-delimited employee rows.
-- Column order matches the field order of each row produced by parseXML:
-- name and phone first, then the primary address, then the secondary address.
create table employee
(
    first_name varchar(20),
    last_name varchar(20),
    phone varchar(20),
    primary_address_street varchar(60),
    primary_address_state varchar(20),
    primary_address_zip varchar(20),
    secondary_address_street varchar(60),
    secondary_address_state varchar(20),
    secondary_address_zip varchar(20)
);
For more Ab Initio, AWS, and data engineering videos, please subscribe to, view, like, and share my YouTube channel.
Click DataPundit
Comments
Post a Comment