In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).

Goal

Ingesting structured relational data into the data lake for batch processing via a (vertically) scalable cloud function.

What you'll implement

Creating a cloud-based Jupyter Notebook environment

If you don't have Anaconda and Jupyter Notebook installed locally on your computer, please create a notebook in GCP's AI Platform:

Create a new instance:

Choose "Python 2 and 3" and choose a name for your notebook instance:

Wait until the instance is created. It should take less than a minute.

Open the JupyterLab:

Creating the Notebook and Implementing the Logic

Notebook Creation

Create a Python 3 notebook:

Create a cell and insert the following code to install the mysql driver:

!pip install pymysql

Import the newly installed package pymysql in another cell:

import pymysql

Finding the Database Server's IP address

Now, let's connect to our webshop cloud SQL database. First, we need to find out the database's IP address. Please open another browser tab and navigate to the cloud SQL service. You'll find the IP address of your SQL server on the overview page of cloud SQL:

Adding the IP address to your JupyterLab cell

Hit "Copy" and add the following new cell to your JupyterLab (replace <Copied IP Address> with your server's address):

conn = pymysql.connect(host='<Copied IP Address>', user='root', password='', db='webshop')
conn

Allowing database connections from your JupyterLab

Since GCP has strict security policies, it is not possible to simply connect to the database. One could set up a secured Cloud SQL Proxy connection; however, for testing purposes we'll just allow the JupyterLab machine to connect to the MySQL server. Our cloud function will later be allowed to connect automatically (at least when it is running in the same project).

Please navigate to the "Notebooks" page in the "AI Platform" and click on the instance name:

You will be forwarded to "Compute Engine", GCP's infrastructure-as-a-service product which underlies most of the services we are using. When scrolling down a bit, you'll find the IP address of your JupyterLab machine:

Copy the "External IP" to the clipboard.

Navigate back to the cloud SQL page in the cloud console. Select "Connections":

Click on "Add Network" and call it "JupyterLab" with the IP address being pasted into "Network":

Hit "Done" and then "Save". JupyterLab should now be listed with its IP address under "Authorized networks":

Connecting to the database from the JupyterLab cell

Now navigate back to JupyterLab and execute the cell you created above:

conn = pymysql.connect(host='<Copied IP Address>', user='root', password='', db='webshop')
conn

This should be the output:

Retrieve all rows from the table "sales"

Next, we want to get all rows from the table for visualization purposes:

with conn.cursor() as cur:
    cur.execute('SELECT * FROM sales')
    rows = cur.fetchall()
    sales = "date, weekday, region, age_group, product, product_group, sales_value\n"
    for row in rows:
        sales += (f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]}\n')
        
print(sales)

This should be the output:
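
If you prefer to inspect the result as a table, you can optionally wrap the same query in a pandas DataFrame. This is just a sketch (pandas ships with the notebook image but is not required for the rest of the lab):

import pandas as pd

with conn.cursor() as cur:
    cur.execute('SELECT * FROM sales')
    # Build a DataFrame from the result set, taking the column names
    # from the cursor's description.
    df = pd.DataFrame(cur.fetchall(), columns=[col[0] for col in cur.description])

df.head()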

Store table "sales" in cloud storage

If you are working in the cloud JupyterLab, you can access cloud storage easily with the following code:

from datetime import datetime
from google.cloud import storage
filename = f"webshop_sales_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
storage_client = storage.Client()
bucket = storage_client.get_bucket("pk-gcs")
blob = bucket.blob(filename)

blob.upload_from_string(sales, content_type='text/csv')

You should now be able to see the CSV file in the cloud console (under Cloud Storage).

Downloading and opening the file should show all contents of the table "sales":
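
If you are working in a local Anaconda/Jupyter installation instead of the cloud JupyterLab, the storage client cannot pick up credentials automatically. A minimal sketch, assuming you have created a service account with storage permissions and downloaded its key as key.json (a hypothetical filename); it reuses the sales string from the previous cells:

from datetime import datetime
from google.cloud import storage

# Authenticate explicitly with a downloaded service-account key file;
# "key.json" is a placeholder for your own key file's name.
storage_client = storage.Client.from_service_account_json("key.json")

filename = f"webshop_sales_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
bucket = storage_client.get_bucket("pk-gcs")
bucket.blob(filename).upload_from_string(sales, content_type='text/csv')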

Downloading the Source Code from GitHub (optional)

In the JupyterLab overview page (or via File → New → Terminal), you can open a console:

Within the terminal, you can enter the following code to clone the course's GitHub repository:

git clone https://github.com/pkuep/pk-bigdata-code.git

After refreshing the file browser, you should see the folder "pk-bigdata-code" in which you'll find the source code of this lab (Jupyter Notebooks → batch_ingestion → BigData - Batch Ingestion - Sample Cloud SQL Database Extraction.ipynb):

Results

  1. You created a Jupyter Notebook in order to understand the requirements of our ingestion logic: pulling all data from a table via the pymysql connector and pushing it to the data lake.
  2. You set up a Notebook environment in the cloud which is capable of accessing different resources in your GCP project, especially the cloud SQL server and cloud storage.
  3. You pulled all data from a sample table and (1) showed it in the notebook and (2) stored it on GCS, i.e. the data lake.
  4. Next, we want to transfer the logic to a cloud function.

Creating a cloud function for SQL extraction and storage upload

Please go to cloud functions in the console:

Create a function. Set the function name to "ingest_webshop" and select "Allow unauthenticated invocations" under "Authentication". The other parameters can be kept at their defaults. You'll first need to hit "Save" in the "Trigger" section before being able to continue:

Hit "Next". Change the runtime to Python 3.7, change the entry point to "ingest_webshop", and rename the function in the code editor accordingly:

You are working in the file "main.py". This file holds the logic we want to deploy to our cloud function. Before transferring the code, however, we need the connection name of our cloud SQL database so that we don't have to enter the IP address. Please navigate to the overview page of cloud SQL and copy the connection name to the clipboard:

Next, let's transfer the logic of the Jupyter Notebook to our method "ingest_webshop".

from google.cloud import storage
from datetime import datetime
import pymysql
 
def ingest_webshop(request):
    """ connects to the webshop database, retrieves all rows from the
        table "sales" and uploads these into a file on GCS
    """
    # connect to the database via the cloud SQL connection name (unix socket) -
    # replace "pk-bigdata:us-central1:pk-sql" with your own connection name
    conn = pymysql.connect(unix_socket='/cloudsql/pk-bigdata:us-central1:pk-sql', user='root', password='', db='webshop')
 
    # create a string that holds all rows of the table "sales" in a csv-ready format
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM sales')
        rows = cur.fetchall()
        sales = "date, weekday, region, age_group, product, product_group, sales_value\n"
        for row in rows:
            sales += (f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]}\n')
 
    # create a filename with a timestamp and store the data in a csv file in the datalake
    filename = f"webshop_sales_fromfunction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("pk-gcs")
    blob = bucket.blob(filename)
 
    blob.upload_from_string(sales, content_type='text/csv')

    return 'Success'
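
Note that this version connects via the unix socket that is only available inside the cloud function runtime, so it will not run unchanged in your notebook. If you want to keep the code testable in both environments, one option is to switch on an environment variable, as in the following sketch (get_connection and DB_HOST are assumptions for illustration, not part of the lab code):

import os
import pymysql

def get_connection():
    """Open a connection to the webshop database.

    Hypothetical helper (not part of the lab): if the environment variable
    DB_HOST is set, e.g. while experimenting from a notebook, connect via
    TCP/IP; otherwise use the cloud SQL unix socket that is available
    inside the deployed cloud function.
    """
    db_host = os.environ.get('DB_HOST')
    if db_host:
        return pymysql.connect(host=db_host, user='root', password='', db='webshop')
    return pymysql.connect(unix_socket='/cloudsql/pk-bigdata:us-central1:pk-sql',
                           user='root', password='', db='webshop')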


Next, we'll need to specify the Python packages our cloud function requires. Click on "requirements.txt" and add these lines at the end of the file (datetime is part of the Python standard library and does not need to be listed):

google-cloud-storage
pymysql

Hit "Deploy" (either from requirements.txt or main.py):

Wait 1-2 minutes until the function is deployed and then click on its name:

Go to the "Testing" section and hit "Test the function". The output should be "OK".

You should now see the output in your bucket:

Results

  1. You transferred the sample logic from the notebook to a cloud function (main.py).
  2. You specified requirements on packages of your function (requirements.txt).
  3. You deployed the function and tested it.
  4. The ingestion logic (i.e. the cloud function) is now ready to be scheduled regularly.

We set up a cloud scheduler in the lab "Image to GCS with Cloud Functions" (see https://pkuep.github.io/pk-bigdata/batch_ingestion_cloudfunctionimage). The steps for automating this function are the same, so we'll skip them here.

Please make sure to delete your scheduled job in the cloud scheduler (if applicable):

Next, you may want to delete the files in the bucket.

Next, you should delete the cloud function:

You may want to rename and download the JupyterLab Notebook we created (however, it is also available via GitHub):

You should also delete or at least shut down your cloud SQL instance (in the live-lecture, please leave your SQL server running!):

You should also delete or at least shut down your notebook (under "AI Platform") - (in the live lecture, please leave your notebook running!):

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a state-of-the-art cloud ingestion pipeline using cloud functions and scheduled its execution.