In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).

Goal

Ingesting structured relational data into the data lake for batch processing via a (vertically) scalable cloud function.

What you'll implement

Creating a cloud-based Jupyter Notebook environment

If you don't have Anaconda and Jupyter Notebook installed locally on your computer, please create a notebook in GCP via Vertex AI / Colab Enterprise:

Creating the Notebook and Implementing the Logic

Create a cell and insert the following code to install the MySQL driver pymysql:

!pip install pymysql

Import the newly installed package pymysql in another cell:

import pymysql

Copy the following code into a new cell in your JupyterLab (replace <see Moodle> with the respective information provided in the section "Organization"):

conn = pymysql.connect(host='<see Moodle>', user='<see Moodle>', password='<see Moodle>', db='<see Moodle>')
conn

This should be the output:
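
Optionally, you can run a quick sanity check on the connection. A minimal sketch, assuming the table "sales" already exists in the database:

with conn.cursor() as cur:
    cur.execute('SELECT COUNT(*) FROM sales')  # a cheap query to verify the connection works
    print(cur.fetchone()[0], 'rows in table "sales"')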

Retrieve all rows from the table "sales"

Next, we want to get all rows from the table for visualization purposes:

with conn.cursor() as cur:
    cur.execute('SELECT * FROM sales')
    rows = cur.fetchall()
    sales = "date, weekday, region, age_group, product, product_group, sales_value\n"
    for row in rows:
        sales += (f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]}\n')
        
print(sales)

This should be the output:
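
Note that plain string concatenation does not escape values that contain commas or quotes. If you want a more robust variant, here is a minimal sketch that builds the same sales string with Python's standard csv module:

import csv
import io

with conn.cursor() as cur:
    cur.execute('SELECT * FROM sales')
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    # write the header and all rows; csv.writer takes care of quoting and escaping
    writer.writerow(['date', 'weekday', 'region', 'age_group', 'product', 'product_group', 'sales_value'])
    writer.writerows(cur.fetchall())
    sales = buffer.getvalue()

print(sales)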

Store table "sales" in cloud storage

If you are working in the cloud-based JupyterLab, you can access Cloud Storage easily with the following code:

from datetime import datetime
from google.cloud import storage

# create a filename with a timestamp and upload the CSV string to the bucket
filename = f"webshop_sales_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
storage_client = storage.Client()
bucket = storage_client.get_bucket("hdm-kueppers")
blob = bucket.blob(filename)

blob.upload_from_string(sales, content_type='text/csv')

You should now be able to see the CSV file in the cloud console (under Cloud Storage).
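
You can also verify this directly from the notebook. The following short sketch lists the objects in the bucket, reusing the storage_client and bucket name from above:

for blob in storage_client.list_blobs("hdm-kueppers"):
    print(blob.name)  # the freshly uploaded webshop_sales_*.csv should appear here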

Downloading and opening the file should show all contents of the table "sales":
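
Instead of downloading the file via the console, you can also pull it back into the notebook. A sketch, assuming a reasonably recent version of the google-cloud-storage library:

content = blob.download_as_text()  # blob still references the uploaded object
print(content[:500])  # preview the first few lines of the CSV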

Results

  1. You created a Jupyter Notebook in order to understand the requirements of our ingestion logic: pulling all data from a table via the pymysql connector and pushing it to the data lake.
  2. You set up a Notebook environment in the cloud which is capable of accessing different resources in your GCP project, especially the cloud SQL server and cloud storage.
  3. You pulled all data from a sample table and (1) showed it in the notebook and (2) stored it on GCS, i.e. the data lake.
  4. Next, we want to transfer the logic to a cloud function.

Creating a cloud function for RDBMS ingestion and storage upload

Please go to cloud functions in the console:

Create a function:

We'll use the inline editor:

Set the function (= service) name to "rdbms-ingestion-yourinitials". Choose us-central1 as region and a modern Python runtime. Under "Authentication", select "Allow public access". The other parameters can be left at their defaults. Hit "Create".

Next, let's transfer the logic of the Jupyter Notebook to our cloud function "rdbms-ingestion-yourinitials". Replace the contents of main.py with the following code and make sure the entry point is set to "ingest_webshop":

from google.cloud import storage
from datetime import datetime
import pymysql
 
def ingest_webshop(request):
    """ connects to the webshop database, retrieves all rows from the
        table "sales" and uploads these into a file on GCS
    """
    # connect to the database - replace <see Moodle> with the information provided in the section "Organization"
    conn = pymysql.connect(host='<see Moodle>', user='<see Moodle>', password='<see Moodle>', db='<see Moodle>')

 
    # create a string that holds all rows of the table "sales" in a csv-ready format
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM sales')
        rows = cur.fetchall()
        sales = "date, weekday, region, age_group, product, product_group, sales_value\n"
        for row in rows:
            sales += (f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]}\n')
 
    # create a filename with a timestamp and store the data in a csv file in the datalake
    filename = f"webshop_sales_fromfunction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("pk-gcs")
    blob = bucket.blob(filename)
 
    blob.upload_from_string(sales, content_type='text/csv')

    return 'Success'
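
Before deploying, you can smoke-test the logic by pasting the function into a notebook cell and calling it directly. The request argument is not used, so passing None is sufficient (this assumes the notebook can reach the Cloud SQL instance and the bucket):

print(ingest_webshop(None))  # should print 'Success' and create a new CSV file in the bucket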


Next, we'll need to specify the Python packages our cloud function requires. Click on "requirements.txt" and add the following lines at the end of the file (datetime is part of the Python standard library and does not need to be listed):

google-cloud-storage
pymysql

Hit "Deploy" (either from requirements.txt or main.py):

Wait 1-2 minutes until the function is deployed and then click on its URL.

You should now see the newly created CSV file in your bucket:
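
Alternatively, you can trigger the function from your notebook via HTTP. A minimal sketch, where the URL is a placeholder you need to copy from the Cloud Functions console (this assumes the requests package is available in your notebook environment):

import requests

function_url = '<your function URL>'  # placeholder - copy it from the Cloud Functions console
response = requests.get(function_url)
print(response.status_code, response.text)  # expect 200 and 'Success'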

Results

  1. You transferred the sample logic from the notebook to a cloud function (main.py).
  2. You specified requirements on packages of your function (requirements.txt).
  3. You deployed the function and tested it.
  4. The ingestion logic (i.e. the cloud function) is now ready to be scheduled regularly.

We set up a cloud scheduler job in the lab "Image to GCS with Cloud Functions" (see https://pkuep.github.io/pk-bigdata/batch_ingestion_cloudfunctionimage). The steps to automate this function are the same, so we won't repeat them here.

Please make sure to delete your scheduled job in the cloud scheduler (if applicable):

You may also want to delete the files in the bucket.
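
If you prefer to do this programmatically, here is a short sketch that removes the CSV files created in this lab (it assumes the bucket names used above; adjust them if yours differ):

from google.cloud import storage

client = storage.Client()
for bucket_name in ["hdm-kueppers", "pk-gcs"]:  # buckets used in the notebook and in the function
    for blob in client.list_blobs(bucket_name, prefix="webshop_sales"):
        print(f"deleting {blob.name} from {bucket_name}")
        blob.delete()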

Finally, you should delete the cloud function.

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a state-of-the-art cloud ingestion pipeline using cloud functions that can now be scheduled for regular execution.