In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).

Goal

Ingesting structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job.

What you'll implement

Enabling the Cloud SQL Admin API (for the cloud SQL proxy)

For this use case, we need to enable the cloud API that allows other GCP services to connect to cloud SQL databases. Please enter "cloud sql admin api" in the search bar:

Please enable this API! We will use it to establish connections from our DataProc cluster's worker nodes (running the sqoop MapReduce job) to the cloud SQL database (webshop).
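
If you prefer the command line, the same API can also be enabled from the cloud shell (assuming gcloud already points to your project):

gcloud services enable sqladmin.googleapis.com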

Retrieving the cloud SQL connection name

Please navigate to your cloud SQL instance (it needs to be running) and copy the connection name from the overview page into the clipboard:

In my case it is:

pk-bigdata:us-central1:pk-sql
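
Alternatively, the connection name can be retrieved from the cloud shell (assuming your instance is named pk-sql):

gcloud sql instances describe pk-sql --format="value(connectionName)"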

Creating the DataProc cluster with initialization actions and further settings

Opening DataProc

Please navigate to the DataProc service in the cloud console.

This time we will NOT (!) create the cluster via the web interface but via the cloud shell since we need some extra options that are not available in the web UI.

Opening the cloud shell and creating the cluster

Open a cloud shell:

Make sure that your project is set correctly. If not, execute the following command with your initials:

gcloud config set project pk-bigdata
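
If you are unsure which project is currently active, you can check it first:

gcloud config get-value project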

We will now create a cluster from the command line in order to use initialization actions (installing the cloud SQL proxy and sqoop on the cluster nodes), pass the cloud SQL connection name as metadata, grant the cluster the sql-admin scope, and point the Hive warehouse directory to our GCS bucket.

Please execute this command in the cloud shell (you may want to replace pk with your initials, make sure that all paths and the connection name to the SQL database copied before are correctly set):

gcloud dataproc clusters create pk-sqoop --region us-central1 \
  --optional-components JUPYTER --enable-component-gateway \
  --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/cloud-sql-proxy/cloud-sql-proxy.sh,gs://goog-dataproc-initialization-actions-us-central1/sqoop/sqoop.sh \
  --metadata "hive-metastore-instance=pk-bigdata:us-central1:pk-sql" \
  --scopes sql-admin \
  --properties=hive:hive.metastore.warehouse.dir=gs://pk-gcs/hive-warehouse \
  --master-machine-type n1-standard-2 --master-boot-disk-size 50 \
  --num-workers 2 --worker-machine-type n1-standard-2 --worker-boot-disk-size 50

Cluster creation will take 2-3 minutes.
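
While waiting, you can check the cluster status from the cloud shell:

gcloud dataproc clusters describe pk-sqoop --region us-central1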

Connecting to the Hadoop master node

When cluster creation is finished, you can select the cluster in the web UI:

Open an SSH connection to the master node, from which sqoop jobs can be started:
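
Alternatively, you can open the SSH connection from the cloud shell (a sketch only: DataProc names the master node <cluster name>-m, and the zone is whichever one DataProc chose during auto zone placement, e.g. us-central1-a):

gcloud compute ssh pk-sqoop-m --zone us-central1-a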

Results

  1. You created a Hadoop cluster (DataProc) which provides access to cloud SQL (via the initialization action "cloud-sql-proxy") as well as the sqoop component for batch ingestion of relational data into our data lake.
  2. You learned how to set up a DataProc cluster via the cloud shell.
  3. Next, we'll use the connection to the master node and execute our sqoop data ingestion job.

Executing the job (with HDFS import)

Please open the SSH connection to the master node:

The following command starts the sqoop job (the "--m 1" option is necessary since there is no primary key defined on our table):

sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1

You should now see a lot of output since a MapReduce job (scaling horizontally!) is running and retrieving the data from the relational database.

When finished, the output should look like this:

We ingested 30 rows into our data lake.
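
As a side note: if the table had a numeric key column (here a hypothetical column named "id"), sqoop could parallelize the import across several mappers instead of a single one, for example:

sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --split-by id --m 4

For our small table of 30 rows, however, a single mapper is perfectly sufficient.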

Checking the results

The sqoop job puts its data into HDFS by default (it is a "traditional" tool). You can check this by executing the following command on the master node:

hadoop fs -ls

This will show one folder, "sales". We can take a look into this folder with the following command:

hadoop fs -ls sales/

With the following command, you can take a look at the data that was extracted from the RDBMS:

hadoop fs -cat sales/part-m-00000

This should be the output:

Executing the job (with GCS import)

In order to import the relational data into our GCS-based data lake instead of HDFS, we need to set the target directory explicitly. This command ingests the data from our database into the folder "sales_from_sqoop" within the GCS bucket (pk-gcs; make sure to change the initials if applicable):

sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1 --target-dir gs://pk-gcs/sales_from_sqoop

This should yield the same shell output; however, your GCS bucket should now contain a folder "sales_from_sqoop":

The folder contains the same files as the HDFS folder sqoop created in our first job run.
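
Instead of the web console, you can also verify the import from the master node or the cloud shell:

gsutil ls gs://pk-gcs/sales_from_sqoop/

You should see the same part-m-00000 file as in the HDFS folder.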

We could set an autoscaling policy to let our DataProc cluster scale up if necessary:

This is, however, not part of our lecture.
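
Just for reference, and only as a sketch (the policy name, file name, and thresholds below are made up), such a policy would be defined in a YAML file and attached to the cluster roughly like this:

workerConfig:
  minInstances: 2
  maxInstances: 10
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.5
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

gcloud dataproc autoscaling-policies import pk-sqoop-policy --source autoscaling.yaml --region us-central1
gcloud dataproc clusters update pk-sqoop --autoscaling-policy pk-sqoop-policy --region us-central1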

Results

  1. You executed a sqoop (MapReduce) job with different configurations (HDFS output first and GCS output second).
  2. This setup is ready for heavy big data workloads since MapReduce is capable of scaling out (horizontally); if necessary, you can add worker nodes to your cluster (however, the source SQL database might become a bottleneck).

Please make sure to delete your cluster (in the live lecture, please leave your cluster running):
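
If you prefer the cloud shell over the web UI, the cluster can be deleted like this:

gcloud dataproc clusters delete pk-sqoop --region us-central1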

You should also delete or at least shut down your cloud SQL instance (in the live lecture, please leave your SQL server running!):
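
From the cloud shell, the instance (assuming it is named pk-sql) can be stopped or deleted with one of these commands:

gcloud sql instances patch pk-sql --activation-policy NEVER
gcloud sql instances delete pk-sql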

You may also want to clean up your GCS bucket. However, especially in the live lecture please leave the files in place.
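
If you only want to remove the sqoop output from the bucket, the following command would do (assuming the bucket and folder names from above):

gsutil rm -r gs://pk-gcs/sales_from_sqoop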

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a "traditional" big data ingestion pipeline that scales horizontally, using a Hadoop cluster with sqoop. You also learned about initialization actions, which come in handy when working with DataProc.