I was recently given a data Challenge to complete, I was tasked to use python to install and configure multiple databases and perform an ETL pipeline between both.
To Begin I launched T.2 Micro Linux AMI Instance and installed Mysql along with MongoDB, the CLI commands can be seen below
Installation of MySQL
$ sudo yum update
## Found Mysql version as seen below, mysql has forked to open source mariadb for Linux AMI EC2
$ sudo yum search mysql
Found: mariadb.x86_64 : A community developed branch of MySQL
#Run the following commands to start the installation
$ sudo yum install mariadb.x86_64
$ sudo yum install -y mariadb-server
$ sudo systemctl enable mariadb
$ sudo mysql_secure_installation
#credentials are here
username: root
Password: test
Installation of MongoDB
##MongoDB installation
##Create a repo file
$ Sudo nano /etc/yum.repos.d/mongodb-org-4.4.repo
##Copy and paste the following into the file: !! copy and paste doesn't preserve line breaks in Nano GUI
[mongodb-org-4.4]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/4.4/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.4.asc
##to see what method to launch MongoDB run the following
$ ps --no-headers -o comm 1
## I received systemmd so we commence with the following lines of code to start the database
$ sudo systemctl start mongod
##Check the database using this command
$ sudo systemctl status mongod
#bind proper IP
$ Sudo nano /etc/mongod.conf
##find bindIp; change localhost IP 127.0.0.1 to 0.0.0.0 to open to world
##Lastly, Make sure both; MongoDB and MariaDB, start when the system reboots; use the following
$ sudo systemctl restart mongod
$ sudo systemctl enable mariadb
Connection to MySQL and MongoDB Remote: Configuring Security Groups/ Remote Applications
I needed to add TCP port 22 to allow MySql remote connections from MySQL workbench, I also added TCP port 27017 to be exposed to my Local IP address alone. MongoDB doesn’t need a password, but to secure it I would only allow connections from one source. To access the Databases remotely I used MySql Workbench and MongoDB Compass
Installation of Python 3.7 on Linux AMI and Checking MySQL connection
#install Python 3.7
$ Sudo install python3.7
#check version
$ python3 --version
# I have Python 3.7.9
##INSTALL PIP
$ curl -O https://bootstrap.pypa.io/get-pip.py
$ python3 get-pip.py --user
#Install Mysql Connector for Sql server
$ sudo pip3 install mysql-connector
#run script to test connection
{
import mysql.connector
cnx = mysql.connector.connect(user='root', password='test', host='127.0.0.1', database='test')
cnx.close()
}
Preparation of ETL and Connection Testing
#Getting ready for ETL, Install Pymongo, install pymysql
$ sudo pip3 install pymongo
$ sudo pip3 install pymsql
$ sudo pip3 install pandas
#create simple python script to test connection MONGODB TEST
{
from pymongo import MongoClient
# pprint library is used to make the output look more pretty
from pprint import pprint
# connect to MongoDB,
client = MongoClient('mongodb://localhost:27017')
db=client.admin
# Issue the serverStatus command and print the results
serverStatusResult=db.command("serverStatus")
pprint(serverStatusResult)
}
#create simple python script to test connection MYSQL TEST
{
import pymysql
# Connect to the database.
connection = pymysql.connect(host='localhost',
user='root',
password='test',
db='test',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
print ("connect successful!!")
}
Loading Data into MySQL Database
Creating Table and loading data
#Create Table
{
import mysql.connector
cnx = mysql.connector.connect(user='root', password='test', host='127.0.0.1', database='test')
mycursor= cnx.cursor()
mycursor.execute("CREATE TABLE users (ID VARCHAR(255), Name VARCHAR(255), Department VARCHAR(255), Location VARCHAR(255), Pay VARCHAR(255))")
cnx.close()
}
#Add Entries into SQL Table
{
import mysql.connector
cnx = mysql.connector.connect(user='root', password='test', host='127.0.0.1', database='test')
mycursor = cnx.cursor()
sql = "INSERT INTO users (ID, Name, Department, Location, Pay) VALUES (%s, %s, %s, %s, %d)"
val = [
('0001', 'George Michaels', 'Maintenance', 'Warehouse 5', '50000', ),
('0002', 'Michael Jackson', 'Sales', 'Warehouse 5', '80000'),
('0003', 'Phil Collins', 'IT', 'Warehouse 2', '75000'),
('0004', 'Matthew Bellamy', 'IT', 'Warehouse 3', '75000'),
('0005', 'Till Lindermann', 'Security', 'Warehouse 1', '70000'),
('0006', 'Weird Al Yankovich', 'Sales', 'Warehouse 4', '80000'),
('0008', 'Joe Shmo', 'Security', 'Warehouse 3', '70000')
('0009', 'John Doe', 'Sales', 'Warehouse 2', '80000')
('0010', 'Max Payne', 'Maintenance', 'Warehouse 4', '50000')
('0011', 'Dean Martin', 'Sales', 'Warehouse 5', '80000')
('0012', 'Jayne Doe', 'Security', 'Warehouse 4', '70000')
('0013', 'meghan markle', 'Sales', 'Warehouse 1', '80000')
('0014', 'William Anderson', 'Maintenance', 'Warehouse 4', '50000')
('0015', 'Jack Sparrow', 'IT', 'Warehouse 3', '75000')
('0016', 'Thomas Anderson', 'Security', 'Warehouse 4', '70000')
('0017', 'Dean Richards', 'Maintenance', 'Warehouse 2', '50000')
]
mycursor.executemany(sql, val)
cnx.commit()
print(mycursor.rowcount, "was inserted.")
}
ETL Python Script
This ETL function will Extract all data from the MySQL database and perform two functions
- Extract all Data from MySQL database and load it into a Dataframe. The Dataframe output is seen below
ID Name Department Location Pay
0 0001 George Michaels Maintenance Warehouse 5 52000
1 0002 Michael Jackson Sales Warehouse 5 89000
2 0003 Phil Collins IT Warehouse 2 77000
3 0004 Matthew Bellamy IT Warehouse 3 79000
4 0005 Till Lindermann Security Warehouse 1 78000
5 0006 Weird Al Yankovich Sales Warehouse 4 81000
6 0008 Joe Shmo Security Warehouse 3 72000
7 0009 John Doe Sales Warehouse 2 83000
8 0010 Max Payne Maintenance Warehouse 4 53000
9 0011 Dean Martin Sales Warehouse 5 84000
10 0012 Jayne Doe Security Warehouse 4 72000
11 0013 meghan markle Sales Warehouse 1 83000
12 0014 William Anderson Maintenance Warehouse 4 52000
13 0015 Jack Sparrow IT Warehouse 3 74000
14 0016 Thomas Anderson Security Warehouse 4 71000
15 0017 Dean Richards Maintenance Warehouse 2 51000
2. Take the Dataframe and transform the table above to find the mean Pay for each department.
3. Load the Data into the MongoDB database, both the original data and transformed Data.
Script for both ETL functions are seen below:
import mysql.connector as sql
import pymongo
from pymongo import MongoClient
import pandas as pd
#EXTRACT FROM MYSQL
db_connection = sql.connect(host='localhost', database='test', user='root', password='test')
db_cursor = db_connection.cursor()
db_cursor.execute('SELECT * FROM users')
table_rows = db_cursor.fetchall()
df = pd.DataFrame(table_rows, columns = ["ID", "Name", "Department", "Location", "Pay"])
df['Pay'] = df['Pay'].astype(int)
#print(df)
#TRANSFORM DATA INTO AVERAGE PAY PER DEPARTMENT
tdf = df.groupby('Department')['Pay'].mean().reset_index(name ='Average Pay')
print(tdf)
#Connect to MONGODB
client = MongoClient()
client = MongoClient('mongodb://localhost:27017/Test')
#select database
db = client['Test']
#select the collection within the database
testcollection = db.testcollection
#convert entire collection to Pandas dataframe
#LOAD DATA, two sets, one is original datavalues, other is transformed values
testcollection.insert_many(df.to_dict('records'))
testcollection.insert_many(tdf.to_dict('records'))
Transformed Data: MongoDB
#This shows the original entries being converted to JSON and imported into MongoDB, I added one sample below
{
"ID": "0001",
"Name": "George Michaels",
"Department": "Maintenance",
"Location": "Warehouse 5",
"Pay": 52000
}
#These Show the transformed data points; which display the mean pay for each department
{
"Department": "IT",
"Average Pay": 76666.66666666667
}
{
"Department": "Maintenance",
"Average Pay": 52000
}
{
"Department": "Sales",
"Average Pay": 84000
}
{
"Department": "Security",
"Average Pay": 73250
}