In this use case we want to store the contents of our webshop table in the data lake as a nightly full extract. The goal is to ingest structured relational data into the data lake for batch processing via a (vertically) scalable cloud function.
If you don't have Anaconda and Jupyter Notebook installed locally on your computer, please create a notebook in GCP's AI Platform using Vertex AI / Colab Enterprise:
Create a cell and insert the following code to install the MySQL driver pymysql:
!pip install pymysql
Import the newly installed package pymysql in another cell:
import pymysql
Hit "Copy" and add the following new cell to your JupyterLab (replace <see Moodle> with the respective information provided in the section "Organization"):
conn = pymysql.connect(host='<see Moodle>', user='<see Moodle>', password='<see Moodle>', db='<see Moodle>')
conn
This should be the output:
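If you want to double-check that the connection really works beyond inspecting the connection object, a quick sanity query could look like this (a minimal sketch reusing the same connection):
with conn.cursor() as cur:
    cur.execute('SELECT VERSION()')
    print(cur.fetchone())  # prints the MySQL server version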
Next, we want to get all rows from the table for visualization purposes:
with conn.cursor() as cur:
    cur.execute('SELECT * FROM sales')
    rows = cur.fetchall()

sales = "date, weekday, region, age_group, product, product_group, sales_value\n"
for row in rows:
    sales += f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]}\n'
print(sales)
This should be the output:
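As a side note, the same CSV string could also be built without manual string concatenation by using pandas (a sketch, assuming pandas is available in your notebook environment):
import pandas as pd

# read the table into a DataFrame and convert it to a CSV string
df = pd.read_sql('SELECT * FROM sales', conn)
sales = df.to_csv(index=False)
print(sales)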
In case you are working in the cloud JupyterLab, you can access the cloud storage easily with the following code:
from datetime import datetime
from google.cloud import storage
filename = f"webshop_sales_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
storage_client = storage.Client()
bucket = storage_client.get_bucket("hdm-kueppers")
blob = bucket.blob(filename)
blob.upload_from_string(sales, content_type='text/csv')
You should now be able to see the csv file in the cloud console (under Cloud Storage)
Downloading and opening the file should show all contents of the table "sales":
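If you prefer to verify the upload programmatically instead of via the console, a short check could look like this (a sketch reusing the storage client from above):
# list all CSV files in the bucket to confirm the upload worked
for blob in storage_client.list_blobs("hdm-kueppers"):
    if blob.name.endswith(".csv"):
        print(blob.name, blob.size, "bytes")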
Please go to cloud functions in the console:
Create a function:
We'll use the inline editor:
Set the function (= service) name to "rdbms-ingestion-yourinitials". Choose us-central1 as the region and a modern Python runtime. Under "Authentication", select "Allow public access". The other parameters can be left at their defaults. Hit "Create".
Next, let's transfer the logic of the Jupyter Notebook to our function "rdbms-ingestion-yourinitials".
from google.cloud import storage
from datetime import datetime
import pymysql
def ingest_webshop(request):
    """ connects to the webshop database, retrieves all rows from the
        table "sales" and uploads these into a file on GCS
    """
    # connect to the database - replace the <see Moodle> placeholders with the connection details provided in Moodle
    conn = pymysql.connect(host='<see Moodle>', user='<see Moodle>', password='<see Moodle>', db='<see Moodle>')

    # create a string that holds all rows of the table "sales" in a csv-ready format
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM sales')
        rows = cur.fetchall()

    sales = "date, weekday, region, age_group, product, product_group, sales_value\n"
    for row in rows:
        sales += f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]}\n'

    # create a filename with a timestamp and store the data in a csv file in the datalake
    filename = f"webshop_sales_fromfunction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("pk-gcs")  # the target bucket - adjust if you use a different bucket name
    blob = bucket.blob(filename)
    blob.upload_from_string(sales, content_type='text/csv')

    return 'Success'
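Before deploying, you could paste the same function into a notebook cell and sanity-check it there, since the request argument isn't used (a quick sketch, assuming the database and bucket are reachable from your environment):
print(ingest_webshop(None))  # should print 'Success' and create a new file in the bucket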
Next, we'll need to specify that our cloud function requires some Python packages. Click on "requirements.txt" and add these two packages at the end of the file (datetime ships with the Python standard library and doesn't need to be listed):
google-cloud-storage
pymysql
Hit "Deploy" (either from requirements.txt or main.py):
Wait 1-2 minutes until the function is deployed and then click on its URL.
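If you'd rather trigger the function from a notebook than from the browser, something along these lines should work (the URL below is a placeholder; use the trigger URL shown on your function's details page):
import requests

# the URL is hypothetical - copy the real trigger URL of your cloud function
response = requests.get("https://rdbms-ingestion-yourinitials-xxxxxxxx-uc.a.run.app")
print(response.status_code, response.text)  # expect 200 and 'Success'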
You should now see the output in your bucket:
We set up a cloud scheduler in the lab "Image to GCS with Cloud Functions" (see https://pkuep.github.io/pk-bigdata/batch_ingestion_cloudfunctionimage). The steps to automate this function are the same, so we won't repeat them here.
Please make sure to delete your scheduled job in the cloud scheduler (if applicable):
Next, you may want to delete the files in the bucket.
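A possible way to clean up the generated files from a notebook (a sketch, assuming the bucket name used above):
from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.get_bucket("hdm-kueppers")  # adjust to the bucket you actually used
for blob in bucket.list_blobs(prefix="webshop_sales"):
    blob.delete()
    print(f"deleted {blob.name}")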
Next, you should delete the cloud function.
Congratulations, you set up a state-of-the-art cloud ingestion pipeline using cloud functions and know how to schedule its execution.