A Journey in Learning

Data Science Challenge: ETL from MySQL to MongoDB

I was recently given a data challenge to complete: using Python, install and configure two databases and build an ETL pipeline between them.

To begin, I launched a t2.micro Amazon Linux AMI instance and installed MySQL along with MongoDB; the CLI commands can be seen below.

Installation of MySQL

  $ sudo yum update

## Searching for MySQL shows that the Amazon Linux repositories provide MariaDB, a community-developed fork of MySQL
  $ sudo yum search mysql
  Found: mariadb.x86_64 : A community developed branch of MySQL

#Run the following commands to start the installation
  $ sudo yum install mariadb.x86_64
  $ sudo yum install -y mariadb-server
  $ sudo systemctl enable mariadb
  $ sudo mysql_secure_installation

# Credentials used throughout this walkthrough
  username: root
  password: test

Installation of MongoDB

##MongoDB installation

##Create a repo file
  $ sudo nano /etc/yum.repos.d/mongodb-org-4.4.repo

##Copy and paste the following into the file (note: pasting into nano may not preserve line breaks)
  [mongodb-org-4.4]
  name=MongoDB Repository
  baseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/4.4/x86_64/
  gpgcheck=1
  enabled=1
  gpgkey=https://www.mongodb.org/static/pgp/server-4.4.asc
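Since pasting multi-line text into nano can mangle line breaks, the same repo file can also be written in one command with a heredoc (a sketch using the same path and contents as above):

```shell
# Write the MongoDB repo definition in one shot, avoiding nano paste issues
sudo tee /etc/yum.repos.d/mongodb-org-4.4.repo > /dev/null <<'EOF'
[mongodb-org-4.4]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/4.4/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.4.asc
EOF
```

With the repo in place, the server itself is installed with `sudo yum install -y mongodb-org`.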

##To see which init system will launch MongoDB, run the following
  $ ps --no-headers -o comm 1

## I received systemd, so the following command starts the database
  $ sudo systemctl start mongod

##Check the database using this command
  $ sudo systemctl status mongod

#bind proper IP
  $ sudo nano /etc/mongod.conf
  ##find bindIp and change the localhost IP 127.0.0.1 to 0.0.0.0 to accept outside connections (access is then limited by the security group)

##Lastly, make sure both MongoDB and MariaDB start when the system reboots; use the following
  $ sudo systemctl enable mongod
  $ sudo systemctl enable mariadb

Connecting to MySQL and MongoDB Remotely: Configuring Security Groups / Remote Applications

I needed to open TCP port 22 so MySQL Workbench could connect remotely over SSH, and I exposed TCP port 27017 to my local IP address alone. MongoDB isn't configured with a password, so to secure it I only allow connections from that one source. To access the databases remotely I used MySQL Workbench and MongoDB Compass.

Installation of Python 3.7 on Linux AMI and Checking MySQL connection

#install Python 3.7
  $ sudo yum install -y python3

#check version
  $ python3 --version
  # I have Python 3.7.9

##INSTALL PIP
  $ curl -O https://bootstrap.pypa.io/get-pip.py
  $ python3 get-pip.py --user

#Install the MySQL connector for Python
  $ sudo pip3 install mysql-connector

#run script to test connection
  {
  import mysql.connector

  cnx = mysql.connector.connect(user='root', password='test', host='127.0.0.1', database='test')
  cnx.close()
  }

Preparation of ETL and Connection Testing

#Getting ready for ETL: install pymongo, pymysql, and pandas
  $ sudo pip3 install pymongo
  $ sudo pip3 install pymysql
  $ sudo pip3 install pandas


#Create a simple Python script to test the connection: MONGODB TEST
  {
  from pymongo import MongoClient

  # pprint library is used to make the output easier to read
  from pprint import pprint

  # connect to MongoDB,
  client = MongoClient('mongodb://localhost:27017')
  db=client.admin

  # Issue the serverStatus command and print the results
  serverStatusResult=db.command("serverStatus")
  pprint(serverStatusResult)
  }

#Create a simple Python script to test the connection: MYSQL TEST
  {
  import pymysql

  # Connect to the database.
  connection = pymysql.connect(host='localhost',
                              user='root',
                              password='test',
                              db='test',
                              charset='utf8mb4',
                              cursorclass=pymysql.cursors.DictCursor)

  print("Connection successful!")
  connection.close()
  }

Loading Data into MySQL Database

Creating Table and loading data

#Create Table
  {
  import mysql.connector

  cnx = mysql.connector.connect(user='root', password='test', host='127.0.0.1', database='test')

  mycursor= cnx.cursor()
  mycursor.execute("CREATE TABLE users (ID VARCHAR(255), Name VARCHAR(255), Department VARCHAR(255), Location VARCHAR(255), Pay VARCHAR(255))")

  cnx.close()
  }

#Add Entries into SQL Table
  {
  import mysql.connector

  cnx = mysql.connector.connect(user='root', password='test', host='127.0.0.1', database='test')
  mycursor = cnx.cursor()

  sql = "INSERT INTO users (ID, Name, Department, Location, Pay) VALUES (%s, %s, %s, %s, %s)"
  val = [
  ('0001', 'George Michaels', 'Maintenance', 'Warehouse 5', '52000'),
  ('0002', 'Michael Jackson', 'Sales', 'Warehouse 5', '89000'),
  ('0003', 'Phil Collins', 'IT', 'Warehouse 2', '77000'),
  ('0004', 'Matthew Bellamy', 'IT', 'Warehouse 3', '79000'),
  ('0005', 'Till Lindermann', 'Security', 'Warehouse 1', '78000'),
  ('0006', 'Weird Al Yankovich', 'Sales', 'Warehouse 4', '81000'),
  ('0008', 'Joe Shmo', 'Security', 'Warehouse 3', '72000'),
  ('0009', 'John Doe', 'Sales', 'Warehouse 2', '83000'),
  ('0010', 'Max Payne', 'Maintenance', 'Warehouse 4', '53000'),
  ('0011', 'Dean Martin', 'Sales', 'Warehouse 5', '84000'),
  ('0012', 'Jayne Doe', 'Security', 'Warehouse 4', '72000'),
  ('0013', 'meghan markle', 'Sales', 'Warehouse 1', '83000'),
  ('0014', 'William Anderson', 'Maintenance', 'Warehouse 4', '52000'),
  ('0015', 'Jack Sparrow', 'IT', 'Warehouse 3', '74000'),
  ('0016', 'Thomas Anderson', 'Security', 'Warehouse 4', '71000'),
  ('0017', 'Dean Richards', 'Maintenance', 'Warehouse 2', '51000')
  ]

  mycursor.executemany(sql, val)

  cnx.commit()

  print(mycursor.rowcount, "rows were inserted.")
  }
I used MySQL Workbench to check that the entries were added.

ETL Python Script

This ETL script extracts all data from the MySQL database and performs three steps:

  1. Extract all data from the MySQL database and load it into a DataFrame. The DataFrame output is seen below:

      ID                Name   Department     Location    Pay
0   0001     George Michaels  Maintenance  Warehouse 5  52000
1   0002     Michael Jackson        Sales  Warehouse 5  89000
2   0003        Phil Collins           IT  Warehouse 2  77000
3   0004     Matthew Bellamy           IT  Warehouse 3  79000
4   0005     Till Lindermann     Security  Warehouse 1  78000
5   0006  Weird Al Yankovich        Sales  Warehouse 4  81000
6   0008            Joe Shmo     Security  Warehouse 3  72000
7   0009            John Doe        Sales  Warehouse 2  83000
8   0010           Max Payne  Maintenance  Warehouse 4  53000
9   0011         Dean Martin        Sales  Warehouse 5  84000
10  0012           Jayne Doe     Security  Warehouse 4  72000
11  0013       meghan markle        Sales  Warehouse 1  83000
12  0014    William Anderson  Maintenance  Warehouse 4  52000
13  0015        Jack Sparrow           IT  Warehouse 3  74000
14  0016     Thomas Anderson     Security  Warehouse 4  71000
15  0017       Dean Richards  Maintenance  Warehouse 2  51000

2. Transform the DataFrame above to find the mean Pay for each department.

3. Load both the original data and the transformed data into the MongoDB database.
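Step 2, the groupby transform, can be sketched in isolation on a few sample rows (a sketch assuming only pandas; the sample pay values are taken from the IT and Maintenance rows of the DataFrame above):

```python
import pandas as pd

# A small sample with the same columns as the extracted DataFrame
df = pd.DataFrame({
    "Department": ["IT", "IT", "IT",
                   "Maintenance", "Maintenance", "Maintenance", "Maintenance"],
    "Pay": [77000, 79000, 74000, 52000, 53000, 52000, 51000],
})

# Group by department and take the mean Pay, mirroring the transform step
tdf = df.groupby('Department')['Pay'].mean().reset_index(name='Average Pay')
print(tdf)
```

`reset_index(name='Average Pay')` turns the grouped Series back into a two-column DataFrame, which is the shape loaded into MongoDB later.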

The script for all three steps is seen below:

import mysql.connector as sql
from pymongo import MongoClient
import pandas as pd

#EXTRACT FROM MYSQL
db_connection = sql.connect(host='localhost', database='test', user='root', password='test')
db_cursor = db_connection.cursor()
db_cursor.execute('SELECT * FROM users')
table_rows = db_cursor.fetchall()
db_connection.close()
df = pd.DataFrame(table_rows, columns=["ID", "Name", "Department", "Location", "Pay"])
df['Pay'] = df['Pay'].astype(int)
#print(df)

#TRANSFORM DATA INTO AVERAGE PAY PER DEPARTMENT
tdf = df.groupby('Department')['Pay'].mean().reset_index(name='Average Pay')
print(tdf)

#CONNECT TO MONGODB
client = MongoClient('mongodb://localhost:27017/')
#select the database
db = client['Test']
#select the collection within the database
testcollection = db.testcollection

#LOAD DATA, two sets: one is the original values, the other is the transformed values
testcollection.insert_many(df.to_dict('records'))
testcollection.insert_many(tdf.to_dict('records'))
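The `to_dict('records')` call is what turns each DataFrame row into a MongoDB-style document ready for `insert_many`; a minimal sketch of the conversion, assuming only pandas and a single sample row:

```python
import pandas as pd

# One row shaped like the users table
df = pd.DataFrame([{"ID": "0001", "Name": "George Michaels",
                    "Department": "Maintenance", "Location": "Warehouse 5",
                    "Pay": 52000}])

# Each row becomes one dict, keyed by column name
records = df.to_dict('records')
print(records)
```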

Transformed Data: MongoDB


#This shows an original entry after being converted to a JSON document and imported into MongoDB; one sample is below
{
    "ID": "0001",
    "Name": "George Michaels",
    "Department": "Maintenance",
    "Location": "Warehouse 5",
    "Pay": 52000
}

#These show the transformed documents, which display the mean pay for each department
{
    "Department": "IT",
    "Average Pay": 76666.66666666667
}
{
    "Department": "Maintenance",
    "Average Pay": 52000
}
{
    "Department": "Sales",
    "Average Pay": 84000
}
{
    "Department": "Security",
    "Average Pay": 73250
}
MongoDB Compass shows the data has been successfully loaded and transformed.