AWS Project 1





Watch my YouTube video for an explanation:




Please refer to the class notes here for reference:


AWS Kinesis and Redshift – covered in the video titled "AWS Project 1"
https://aws.amazon.com/blogs/big-data/stream-transform-and-analyze-xml-data-in-real-time-with-amazon-kinesis-aws-lambda-and-amazon-redshift/


Lambda Function
======================================================
from __future__ import print_function

import base64
import json
import boto3
import os
import time
import csv 
import sys

from xml.etree.ElementTree import XML, fromstring
import xml.etree.ElementTree as ET

print('Loading function')


def lambda_handler(event, context):
    """Transform every XML payload in a Kinesis event into pipe-delimited rows.

    For each entry in ``event['Records']`` the base64 ``kinesis.data`` payload
    is decoded, flattened by ``parseXML``, and re-encoded as base64 text.

    Args:
        event: Lambda event dict; each item of ``event['Records']`` must carry
            ``record['kinesis']['data']`` (base64) and
            ``record['kinesis']['sequenceNumber']``.
        context: Lambda context object (unused).

    Returns:
        ``{'records': [...]}`` with one ``{'recordId', 'result', 'data'}``
        entry per input record, in input order (an empty list when the event
        has no records).

    NOTE(review): the return shape resembles the Kinesis Data Firehose
    data-transformation response, while the input is read as a Kinesis Data
    Streams event (``Records``/``kinesis``). Confirm which trigger invokes
    this function; a Kinesis stream trigger ignores the returned value.
    """
    output = []

    print(event)

    records = event['Records']
    for record in records:
        payload = base64.b64decode(record["kinesis"]["data"])
        parsed_records = parseXML(payload)
        print(parsed_records.decode("utf-8"))

        output.append({
            'recordId': record["kinesis"]['sequenceNumber'],
            'result': 'Ok',
            'data': base64.b64encode(parsed_records).decode("utf-8"),
        })

    # Report and return only after the loop: returning inside it dropped
    # every record after the first one.
    print('Successfully processed {} records.'.format(len(records)))
    print(output)
    return {'records': output}
    
    
def parseXML(inputXML):
    """Flatten an ``<employees>`` XML document into pipe-delimited rows.

    Every ``<employee>`` child of the root becomes one newline-terminated row
    with exactly nine fields, in this order::

        first_name|last_name|phone|
        primary street_address|state|zip|
        secondary street_address|state|zip

    Missing or empty elements, including a missing primary or secondary
    address, become empty fields, so every row has the same column count.
    When several addresses share a type, the last one in document order wins.

    Args:
        inputXML: The XML document, either as UTF-8 encoded ``bytes`` (as
            decoded from a Kinesis record) or as an already-decoded ``str``.

    Returns:
        The rows concatenated and encoded as UTF-8 ``bytes``. Returns ``b""``
        when the root has no ``<employee>`` children.

    Raises:
        xml.etree.ElementTree.ParseError: if the input is not well-formed XML.
        UnicodeDecodeError: if ``bytes`` input is not valid UTF-8.
    """
    if isinstance(inputXML, (bytes, bytearray)):
        xmlstring = inputXML.decode('utf-8')
    else:
        xmlstring = str(inputXML)
    root = ET.fromstring(xmlstring)

    empty_address = ["", "", ""]
    rows = []
    for employee in root.findall('employee'):
        fields = [
            _element_text(employee, 'first_name'),
            _element_text(employee, 'last_name'),
            _element_text(employee, 'phone'),
        ]

        # Collect addresses by type so they are emitted in a fixed
        # primary-then-secondary order regardless of document order.
        addresses = {}
        all_address = employee.find('all_address')
        if all_address is not None:
            for address in all_address.findall('address'):
                address_type = _element_text(address, 'type')
                if address_type in ("primary", "secondary"):
                    addresses[address_type] = [
                        _element_text(address, 'street_address'),
                        _element_text(address, 'state'),
                        _element_text(address, 'zip'),
                    ]

        # Pad absent addresses with empty fields to keep nine columns per row.
        fields += addresses.get("primary", empty_address)
        fields += addresses.get("secondary", empty_address)

        # NOTE(review): values containing "|" or newlines are not escaped and
        # would shift columns downstream; confirm the producer never emits them.
        rows.append("|".join(fields) + "\n")

    return "".join(rows).encode('utf-8')


def _element_text(parent, tag):
    """Return the text of *parent*'s first *tag* child, or "" if it is missing or empty."""
    child = parent.find(tag)
    if child is None or child.text is None:
        return ""
    return child.text
=============================================================================================================================



# Replace <Stream-Name> and <partition-key-name> with real values; the angle
# brackets are placeholders, not shell syntax.
# NOTE: AWS CLI v2 treats --data as base64 by default; to send the raw XML
# below with v2, also pass --cli-binary-format raw-in-base64-out.
aws kinesis put-record --stream-name <Stream-Name> --data \
"<employees>
  <employee>
   <first_name>FName 1</first_name>
   <last_name>LName 1</last_name>
   <all_address><address>
<type>primary</type>
<street_address>Street Address 1</street_address>
<state>State 1</state>
<zip>11111</zip>
</address>
<address>
<type>secondary</type>
<street_address>Street Address 2</street_address>
<state>State 2</state>
<zip>11112</zip>
</address>
</all_address>
<phone>111-111-1111</phone>
  </employee>
  <employee>
  <first_name>FName 2</first_name>
  <last_name>LName 2</last_name>
  <all_address><address><type>primary</type><street_address>Street Address 3</street_address><state>State 3</state><zip>11113</zip></address><address><type>secondary</type><street_address>Street Address 4</street_address><state>State 4</state><zip>11114</zip></address></all_address><phone>111-111-1112</phone></employee></employees>" --partition-key <partition-key-name>






-- Target table for the pipe-delimited rows produced by the Lambda function.
-- Column order must match the field order of each row:
-- first_name|last_name|phone|primary street|state|zip|secondary street|state|zip
create table employee
(
first_name varchar(20),
last_name varchar(20),
phone varchar(20),
primary_address_street varchar(60),
primary_address_state varchar(20),
primary_address_zip varchar(20),
secondary_address_street varchar(60),
secondary_address_state varchar(20),
secondary_address_zip varchar(20)
);


For more Ab Initio, AWS, and data engineering videos, please subscribe to, view, like, and share my YouTube channel.

Click DataPundit

Comments

Popular posts from this blog

Abinitio Interview Question # 1 - Write Multiple Files in Abinitio

Next In Sequence in ABinitio | How next_in_sequence() works in MFS