In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).
We will ingest this structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job, using the popular big data formats Avro and Parquet.
For this use case, we need to enable a cloud API that allows the simple connection of different services to cloud SQL databases in GCP. Please enter "cloud sql admin api" in the search bar:
Please enable this API! We will use it to establish connections from our DataProc cluster's worker nodes (running the sqoop MapReduce job) to the cloud SQL database (webshop).
Please navigate to your cloud SQL instance (it needs to be running) and copy the connection name to the clipboard (from the overview page):
In my case it is:
pk-bigdata:us-central1:pk-sql
Please navigate to the DataProc service in the cloud console.
This time we will NOT (!) create the cluster via the web interface but via the cloud shell since we need some extra options that are not available in the web UI.
Open a cloud shell:
Make sure that your project is set correctly. If not, execute the following command with your initials:
gcloud config set project pk-bigdata
We will now create the cluster from the command line in order to set the options that are not exposed in the web UI: the initialization actions that install the Cloud SQL proxy and sqoop, the metadata pointing to our cloud SQL instance, the sql-admin scope, and the Hive warehouse directory on GCS.
Please execute this command in the cloud shell (you may want to replace pk with your initials; make sure that all paths and the connection name of the SQL database you copied before are set correctly):
gcloud dataproc clusters create pk-sqoop --region us-central1 --optional-components JUPYTER --enable-component-gateway --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/cloud-sql-proxy/cloud-sql-proxy.sh,gs://goog-dataproc-initialization-actions-us-central1/sqoop/sqoop.sh --metadata "hive-metastore-instance=pk-bigdata:us-central1:pk-sql" --scopes sql-admin --properties=hive:hive.metastore.warehouse.dir=gs://pk-gcs/hive-warehouse
Cluster creation will take 2-3 minutes.
When cluster creation is finished, you can select the cluster in the web UI:
Open a connection to the master node, from which sqoop jobs can be started:
Let's now start the sqoop job with a further parameter "--as-parquetfile":
sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1 --target-dir sales_from_sqoop_parquet --as-parquetfile
The sqoop job puts its data into HDFS by default (it is a "traditional" tool). You can check this by executing the following command on the master node:
hadoop fs -ls
If you executed the previous lab and did not delete the HDFS folder, this will show two folders: "sales" and "sales_from_sqoop_parquet".
We can now compare the file sizes of both directories:
hadoop fs -ls sales/
hadoop fs -ls sales_from_sqoop_parquet/
You'll notice that the Parquet file is larger than the plain-text output. This is due to the schema that is "shipped" within the file. When larger tables are ingested into the data lake, the columnar compression outweighs this overhead and the Parquet files become considerably smaller.
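If you'd like to see this effect without ingesting a larger database, the following optional local sketch (not part of the lab) writes a larger synthetic table once as plain text and once as Parquet and compares the file sizes; the column names are made up, and pandas plus pyarrow are assumed to be installed:

import os
import numpy as np
import pandas as pd

# synthetic "sales"-like table; the column names are invented for this demo
n = 1_000_000
df_demo = pd.DataFrame({
    "sales_id": np.arange(n),
    "product": np.random.choice(["laptop", "phone", "tablet"], n),
    "price": np.random.uniform(10, 2000, n).round(2),
})

df_demo.to_csv("sales_demo.csv", index=False)  # plain text, no compression
df_demo.to_parquet("sales_demo.parquet")       # columnar, compressed (snappy by default)

print("csv    :", os.path.getsize("sales_demo.csv"), "bytes")
print("parquet:", os.path.getsize("sales_demo.parquet"), "bytes")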
Let's take a quick look at the Parquet file:
hadoop fs -cat sales_from_sqoop_parquet/*.parquet
You'll notice the compression (the file contents are not human-readable), but especially the schema that is embedded in the file and was derived from the RDBMS schema:
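If you want to see the embedded schema in a more structured way, the following optional sketch reads it with pyarrow, assuming the part file was first copied to the local filesystem (e.g. via "hadoop fs -get") and that pyarrow is installed; the filename is just a placeholder:

import pyarrow.parquet as pq

# "sales.parquet" is a placeholder for the actual part file name
schema = pq.read_schema("sales.parquet")
print(schema)  # column names and types that sqoop derived from the MySQL table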
Let's now start the sqoop job with a further parameter "--as-avrodatafile":
sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1 --target-dir sales_from_sqoop_avro --as-avrodatafile
Again, the sqoop job puts its data into HDFS by default. You can check this by executing the following command on the master node:
hadoop fs -ls
If you executed the previous lab and did not delete the HDFS folder, this will now show three folders: "sales", "sales_from_sqoop_parquet", and "sales_from_sqoop_avro".
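Avro files also embed their schema (as JSON in the file header). If you are curious, the following optional sketch shows how it could be inspected with the fastavro package, assuming the part file was first copied to the local filesystem (e.g. via "hadoop fs -get") and that fastavro is installed; the filename is just a placeholder:

from fastavro import reader

# "sales.avro" is a placeholder for the actual part file name
with open("sales.avro", "rb") as fo:
    avro_reader = reader(fo)
    print(avro_reader.writer_schema)  # the JSON schema embedded in the file header
    print(next(avro_reader))          # first record, deserialized to a Python dict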
Instead of comparing the file sizes again, we will now transfer the results to GCS.
Since we don't want to store data in HDFS over a long period of time (so that we can delete the DataProc cluster), we'll copy the sqoop results to GCS using the following commands:
hadoop fs -cp sales_from_sqoop_avro gs://pk-gcs/
hadoop fs -cp sales_from_sqoop_parquet gs://pk-gcs/
Your bucket should now contain all three directories with our sqoop export data in them:
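If you prefer to verify the bucket contents from code rather than in the cloud console, here is a small optional sketch using the google-cloud-storage client library (it assumes the library is installed and your credentials are set up; adjust the bucket name to yours):

from google.cloud import storage

client = storage.Client()  # uses your active GCP credentials and project
for blob in client.list_blobs("pk-gcs", prefix="sales_from_sqoop"):
    print(blob.name, blob.size)  # lists the objects in the sqoop export directories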
Parquet files show the advantage of the included schema and are especially useful for analytical workloads. However, for testing purposes one often just wants to load a whole file into a Pandas dataframe to check if everything is fine. Let's do this now.
Open "AI Platform" → "Notebooks" and create a new instance (e.g. "pk-jupyter").
Open the JupyterLab and create a new Python3 notebook.
Insert the following code in the first cell:
import pandas as pd
df_plain = pd.read_csv("gs://pk-gcs/sales_from_sqoop/part-m-00000", header=None)
print(df_plain.dtypes)
df_plain.head(3)
This will load the plaintext output of sqoop and show the first 3 rows. Please notice that we don't have any schema information (all non-numeric columns are of type "object").
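Since the plain-text output carries no schema, you would have to supply column names and types yourself when reading it. The following sketch illustrates this; the column names are assumptions made for this example, so replace them with the actual columns of your sales table:

df_plain = pd.read_csv(
    "gs://pk-gcs/sales_from_sqoop/part-m-00000",
    header=None,
    names=["sales_id", "sales_date", "product", "price"],  # assumed column names
    parse_dates=["sales_date"],
)
print(df_plain.dtypes)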
First, you need to find out the filename of your Parquet file in cloud storage:
Copy the filename and insert the following code into another cell (replacing FILENAME):
df_parquet = pd.read_parquet("gs://pk-gcs/sales_from_sqoop_parquet/FILENAME.parquet")
df_parquet.head(3)
The Parquet file "shipped" the schema. Unfortunately, however, the date column is not represented as a date but as a Unix epoch timestamp:
We can convert this column to a date value using the following command:
df_parquet['sales_date'] = pd.to_datetime(df_parquet.sales_date, unit='ms')
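To double-check the conversion, you can print the column types again:

print(df_parquet.dtypes)  # sales_date should now be of type datetime64[ns]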
This way we'll have the desired column types:
Although we have not learned much about Spark yet, we want to take a quick glance at how a Parquet file can be loaded into our cluster's worker nodes for distributed (big data) processing.
In the shell you used for executing the sqoop jobs (i.e. the cluster master node), please enter the following command to start an interactive PySpark session:
pyspark
After a few seconds, you should see the interactive PySpark shell:
With the following command, we can tell the Spark cluster that there is a Parquet file in our GCS bucket (this would also work with HDFS) and inspect the schema:
df = spark.read.load("gs://pk-gcs/sales_from_sqoop_parquet/*.parquet")
df
This should be the output:
With the following command, we can inspect the first three rows:
df.head(3)
This should be the output:
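If you'd like a more readable, tree-like view of the schema Spark derived from the Parquet file, you can additionally run:

df.printSchema()  # prints column names, types, and nullability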
Please close the interactive PySpark shell with this command:
exit()
This should be the output:
Please make sure to delete your cluster (in the live-lecture, please leave your cluster running):
You should also delete or at least shut down your cloud SQL instance (in the live-lecture, please leave your SQL server running!):
Furthermore, please delete the notebook instance (in the live-lecture, please leave that running!):
Congratulations, you set up a "traditional" big data, horizontally scalable ingestion pipeline using a Hadoop cluster with sqoop and stored your data in the popular big data formats Avro and Parquet. You also got a first impression of how to access these file formats from Pandas and PySpark (which you'll get to know in more detail later).