In this use case we want to store the contents of our webshop's sales table in the data lake (nightly full extract).

Goal

Ingesting structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job using the popular big data formats Avro and Parquet.

What you'll implement

Enabling the Cloud SQL Admin API (for the Cloud SQL Proxy)

For this use case, we need to enable a cloud API that allows different services to connect easily to Cloud SQL databases in GCP. Please enter "cloud sql admin api" in the search bar:

Please enable this API! We will use it to establish connections from our DataProc cluster's worker nodes (running the sqoop MapReduce job) to the cloud SQL database (webshop).
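
Alternatively, the API can also be enabled from the cloud shell (assuming your project is already set):

gcloud services enable sqladmin.googleapis.com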

Retrieving the cloud SQL connection name

Please navigate to your cloud SQL instance (which needs to be running) and copy the connection name from the overview page to the clipboard:

In my case it is:

pk-bigdata:us-central1:pk-sql
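
If you prefer the command line, you can also retrieve the connection name from the cloud shell (assuming your instance is named pk-sql):

gcloud sql instances describe pk-sql --format="value(connectionName)"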

Creating the DataProc cluster with initialization actions and further settings

Opening DataProc

Please navigate to the DataProc service in the cloud console.

This time we will NOT (!) create the cluster via the web interface but via the cloud shell since we need some extra options that are not available in the web UI.

Opening the cloud shell and creating the cluster

Open a cloud shell:

Make sure that your project is set correctly. If not, execute the following command with your initials:

gcloud config set project pk-bigdata

We will now create the cluster from the command line in order to:

  1. add the cloud SQL proxy and sqoop initialization actions,
  2. connect the cluster to our cloud SQL instance as Hive metastore (using the connection name copied before), and
  3. set the Hive warehouse directory to our GCS bucket.

Please execute this command in the cloud shell (you may want to replace pk with your initials, make sure that all paths and the connection name to the SQL database copied before are correctly set):

gcloud dataproc clusters create pk-sqoop --region us-central1  --optional-components JUPYTER --enable-component-gateway --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/cloud-sql-proxy/cloud-sql-proxy.sh,gs://goog-dataproc-initialization-actions-us-central1/sqoop/sqoop.sh --metadata "hive-metastore-instance=pk-bigdata:us-central1:pk-sql" --scopes sql-admin --properties=hive:hive.metastore.warehouse.dir=gs://pk-gcs/hive-warehouse 

Cluster creation will take 2-3 minutes.
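
While you wait, you can check the provisioning status from the cloud shell:

gcloud dataproc clusters list --region us-central1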

Connecting to the Hadoop master node

When cluster creation is finished, you can select the cluster in the web UI:

Open a connection to the master node, from which sqoop jobs can be started:
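
If you prefer the command line over the SSH button in the web UI, you can also connect from the cloud shell. The master node is named after the cluster with an "-m" suffix; the zone below is an assumption, so replace it with the one shown for your cluster's VM instances:

gcloud compute ssh pk-sqoop-m --zone=us-central1-a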

Results

  1. You created a Hadoop cluster (DataProc) which provides functionality for accessing cloud SQL (initialization action "cloud-sql-proxy") as well as the sqoop component for batch ingestion of relational data into our data lake.
  2. Next, we'll use the connection to the master node and execute our sqoop data ingestion job.

Executing the job

Let's now start the sqoop job with a further parameter "--as-parquetfile":

sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1 --target-dir sales_from_sqoop_parquet --as-parquetfile

Checking the results

The sqoop job puts its data into HDFS by default (it is a "traditional" tool). You can check this by executing the following command on the master node:

hadoop fs -ls

This will show two folders if you executed the previous lab and did not delete the HDFS folder ("sales" and "sales_from_sqoop_parquet").

We can now compare the file sizes of both directories:

hadoop fs -ls sales/
hadoop fs -ls sales_from_sqoop_parquet/

You'll notice that the Parquet file is larger than the plaintext output. This is due to the schema that is "shipped" within the file. With larger datasets being ingested into the data lake, Parquet's compression pays off and the files become considerably smaller than their plaintext counterparts.
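
For an aggregated, human-readable size comparison you can also use the du command:

hadoop fs -du -s -h sales sales_from_sqoop_parquet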

Inspecting the Parquet File

Let's take a quick look at the Parquet file:

hadoop fs -cat sales_from_sqoop_parquet/*.parquet

You'll notice the compression (the file contents are not human-readable), but especially the schema, which is embedded in the file and was derived from the RDBMS schema:

Results

  1. You executed a sqoop (MapReduce) job with a further configuration (HDFS output as a Parquet file).
  2. This setup is ready for heavy big data workloads, since MapReduce scales out horizontally: if necessary, you can add worker nodes to your cluster (although the SQL database might then become the bottleneck).

Executing the job

Let's now start the sqoop job with a further parameter "--as-avrodatafile":

sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1 --target-dir sales_from_sqoop_avro --as-avrodatafile

Checking the results

The sqoop job puts its data into HDFS by default (it is a "traditional" tool). You can check this by executing the following command on the master node:

hadoop fs -ls

This will show three folders if you executed the previous lab and did not delete the HDFS folder ("sales", "sales_from_sqoop_parquet", and "sales_from_sqoop_avro").
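
Optionally, you can take a quick peek at the Avro output. In contrast to Parquet, an Avro data file embeds its schema as JSON in the file header, so the first few hundred bytes are (mostly) human-readable:

hadoop fs -cat sales_from_sqoop_avro/*.avro | head -c 500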

Instead of comparing the files again, we will now move them to GCS.

Moving data from HDFS to GCS

Since we don't want to store data in HDFS over a long period of time (so that we are able to delete the DataProc cluster), we'll now copy the sqoop results to GCS. We can achieve this with the following commands:

hadoop fs -cp sales_from_sqoop_avro gs://pk-gcs/
hadoop fs -cp sales_from_sqoop_parquet gs://pk-gcs/
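
For larger datasets, a plain cp can become slow since it copies sequentially through the master node. In that case, distcp, which runs the copy as a distributed MapReduce job, is the better choice. A minimal sketch, assuming the sqoop output resides in your HDFS home directory (/user/<your user>):

hadoop distcp /user/$USER/sales_from_sqoop_parquet gs://pk-gcs/sales_from_sqoop_parquet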

Your bucket should now contain all three directories with our sqoop output data in them:
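
You can also verify the bucket contents from the command line (cloud shell or master node):

gsutil ls gs://pk-gcs/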

Results

  1. You executed a sqoop (MapReduce) job with a further configuration (HDFS output as an Avro file).
  2. You learned how to move files from HDFS to cloud storage via the command line.

"Non-big data" load in Pandas

Creating a Notebook

Parquet files show the advantage of the included schema and are especially useful for analytical workloads. However, for testing purposes one often just wants to load a whole file into a Pandas dataframe to check if everything is fine. Let's do this now.

Open "AI Platform" → "Notebooks" and create a new instance (e.g. "pk-jupyter").

Open JupyterLab and create a new Python 3 notebook.

Loading the plaintext sqoop output

Insert the following code in the first cell:

import pandas as pd
df_plain = pd.read_csv("gs://pk-gcs/sales_from_sqoop/part-m-00000", header=None)
print(df_plain.dtypes)
df_plain.head(3)

This will load the plaintext output of sqoop and show the first 3 rows. Please note that we don't have any schema information (all non-numeric columns are of type "object").
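
Optionally, you can assign column names yourself, since the plaintext output has no header. The names below are only placeholders; replace them with the actual columns of your sales table:

# placeholder column names - adjust to your webshop sales schema
df_plain.columns = ["sales_id", "sales_date", "product_name", "price"]
df_plain.head(3)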

Loading the Parquet sqoop output

First, you need to find out the filename of your Parquet file in cloud storage:

Copy the filename and insert the following code into another cell (replacing FILENAME):

df_parquet = pd.read_parquet("gs://pk-gcs/sales_from_sqoop_parquet/FILENAME.parquet")
df_parquet.head(3)
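
Depending on the installed pandas and pyarrow versions, you may also be able to point read_parquet at the whole directory and skip the filename lookup:

df_parquet = pd.read_parquet("gs://pk-gcs/sales_from_sqoop_parquet/")
df_parquet.head(3)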

The Parquet file "shipped" the schema. However, the date column is not represented as a date but as a Unix epoch timestamp (in milliseconds):

We can convert this column to a date value using the following command:

df_parquet['sales_date'] = pd.to_datetime(df_parquet.sales_date, unit='ms')

This way we'll have the desired column types:
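
You can verify the conversion in a further cell:

print(df_parquet.dtypes)
df_parquet.head(3)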

"Big data" load in PySpark

Opening a PySpark session

Although we have not learned much about Spark yet, we want to take a quick glance at how a Parquet file can be loaded into our cluster's worker nodes for distributed (big data) processing.

In the shell you used for executing the sqoop jobs (i.e. the cluster master node), please enter the following command to start an interactive PySpark session:

pyspark

After a few seconds, you should see the interactive PySpark shell:

Directing the cluster towards the Parquet files

With the following command, we can point the Spark cluster to the Parquet files in our GCS bucket (this would also work with HDFS) and inspect the schema:

df = spark.read.load("gs://pk-gcs/sales_from_sqoop_parquet/*.parquet")
df

This should be the output:

With the following command, we can inspect the first three rows:

df.head(3)

This should be the output:
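
Before closing the session, you can also print the schema as a tree and count the rows (both are standard Spark DataFrame operations):

df.printSchema()
df.count()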

Please close the interactive PySpark shell with this command:

exit()

This should be the output:

Results

  1. You loaded Parquet files from cloud storage into the memory of a JupyterLab (AI Platform) virtual machine using Pandas.
  2. You loaded Parquet files from cloud storage into a Spark cluster using the interactive PySpark shell on the DataProc cluster's master node.

Please make sure to delete your cluster (in the live lecture, please leave your cluster running):

You should also delete or at least shut down your cloud SQL instance (in the live lecture, please leave your SQL server running!):

Furthermore, please delete the notebook instance (in the live lecture, please leave that running!):
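
Alternatively, the clean-up can be done from the cloud shell. A sketch, assuming the resource names used above; note that the patch command only stops (does not delete) the SQL instance, that your notebook instance's zone may differ, and that older gcloud versions may require the beta component for the notebooks command:

gcloud dataproc clusters delete pk-sqoop --region=us-central1
gcloud sql instances patch pk-sql --activation-policy=NEVER
gcloud notebooks instances delete pk-jupyter --location=us-central1-a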

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a "traditional" horizontally scaling big data ingestion pipeline using a Hadoop cluster with sqoop and stored your data in popular big data formats (Avro and Parquet). You also got a first impression of how to access these file formats from Pandas and PySpark (which you'll get to know in more detail later).