In this use case we want to store the contents of our webshop table in the data lake (as a nightly full extract) using Cloud Data Fusion.

Goal

Ingest structured relational data into the data lake for batch processing, using the horizontally scalable engine that underlies Cloud Data Fusion.

What you'll implement

Enabling the relevant APIs

Make sure to have enabled the "Cloud Data Fusion API", the "Cloud Dataproc API", and the "Cloud Storage API" (all should be enabled if you followed the previous labs). You can find API settings by searching for the respective API name:
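If you prefer the command line over the console, the same step can be sketched as a small Python helper that assembles a `gcloud services enable` call. The three service identifiers and the project ID `pk-bigdata` (following the `<initials>-bigdata` pattern used later in the connection string) are assumptions; please confirm them in your own project before running the command.

```python
import shlex

# Assumed service identifiers for the three APIs named above; confirm them
# in the API library of your project before relying on them.
REQUIRED_SERVICES = [
    "datafusion.googleapis.com",
    "dataproc.googleapis.com",
    "storage.googleapis.com",
]


def enable_apis_command(project_id: str) -> list:
    """Build the argv for enabling all required services in one gcloud call."""
    return [
        "gcloud", "services", "enable",
        *REQUIRED_SERVICES,
        f"--project={project_id}",
    ]


if __name__ == "__main__":
    # "pk-bigdata" is an assumed project ID; replace it with your own.
    # The command is only printed here, not executed.
    print(shlex.join(enable_apis_command("pk-bigdata")))
```

The helper only prints the command, so you can review it before pasting it into Cloud Shell.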

Instance generation

Please open "Data Fusion" in the cloud console:

Create a "Basic" instance and call it, e.g., <your initials>-datafusion:

Accessing the instance

When the instance is ready, please click on "View Instance" to open the Data Fusion web UI:

The UI should now look like this:

Before proceeding in Data Fusion, please execute the following steps.

Enabling Cloud SQL access

Please open another browser tab and load the GCP console. Open "IAM & Admin → IAM".

Enable the presentation of all roles, including automatically generated ones:

Find the Data Fusion user. This is the account that Data Fusion uses when accessing project resources such as GCS or our Cloud SQL database. Please select "edit":

We want to grant this account the rights it needs to read data from, e.g., Cloud SQL and to write data to, e.g., GCS. The simplest way is to grant this account "Editor" rights, which allow access to all resources in the project. However, in an operational setting you should grant more fine-grained rights. Click on "Add another role":

Select the "Project Editor" role:
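For an operational setting, the more fine-grained alternative mentioned above could be sketched as follows. The two roles (`roles/cloudsql.client` and `roles/storage.objectAdmin`) are only candidate choices and have not been verified as sufficient for this lab; the member e-mail is a placeholder that you would replace with the Data Fusion account shown on the IAM page.

```python
import shlex

# Candidate narrower roles (an assumption, not verified for this lab):
# read access via Cloud SQL connections and object access in GCS.
CANDIDATE_ROLES = ["roles/cloudsql.client", "roles/storage.objectAdmin"]


def grant_role_commands(project_id: str, member: str, roles: list) -> list:
    """Build one `gcloud projects add-iam-policy-binding` argv per role."""
    return [
        [
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            f"--member={member}",
            f"--role={role}",
        ]
        for role in roles
    ]


if __name__ == "__main__":
    # Placeholder member; copy the real account from "IAM & Admin → IAM".
    member = "serviceAccount:DATA_FUSION_ACCOUNT_EMAIL"
    for argv in grant_role_commands("pk-bigdata", member, CANDIDATE_ROLES):
        # Commands are printed for review, not executed.
        print(shlex.join(argv))
```

For the lab itself, the "Editor" role described above remains the path of least resistance.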

Results

  1. You created a Cloud Data Fusion instance and are now ready to set up ingestion pipelines in the graphical pipeline designer (Studio).

Adding the Cloud SQL Driver and Plugin

Data Fusion provides many connectors and many functions beyond simple data ingestion. For our purposes in this lab, we will only use "Source" and "Sink" elements, which you'll find in the menu on the left-hand side. Unfortunately, Data Fusion does not ship the Cloud SQL driver by default. Thus, we'll open the Data Fusion "Hub" and add this connector to our instance:

Searching for "CloudSQL" shows all available connectors for Cloud SQL instances:

Please select "CloudSQL MySQL JDBC Driver", since we chose MySQL as the RDBMS of our Cloud SQL server. First, you need to download the driver file:

Click on "Download" and follow the instructions to get the driver file from GitHub. When finished with this step, click on Deploy:

Drag and drop your downloaded file:

Click "Next". You can leave the default values in the following screen:

In the Hub, please also add the "CloudSQL MySQL Plugins" by clicking on "Deploy". This will add the Cloud MySQL source to the Data Fusion Studio:

Just click on "Finish" in the next screen:

Please also install the MySQL JDBC Driver from the Hub: download the zip file and deploy the file "mysql-connector-java-XYZ.jar" contained in it to your instance:

Please click on the "burger menu" and select "Studio" to open the graphical pipeline builder of Data Fusion:

Adding the Cloud SQL Source to the Ingestion Pipeline

Creating a Pipeline

After opening the "Studio", you should see an empty pipeline builder. First, please name the pipeline, e.g. "webshop_sales_to_gcs":

Next, please add a "Source" of type "Database":

Hover over the "Database" stage and select properties. Set the following properties:

The label could be "Webshop-Sales-Source" and the reference name "webshop_sales_source".

The connection string should be the following (replace pk with your initials):

jdbc:mysql://google/webshop?cloudSqlInstance=pk-bigdata:us-central1:pk-sql&socketFactory=com.google.cloud.sql.mysql.SocketFactory&useSSL=False
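To avoid typos when replacing the initials, the connection string above can be assembled programmatically. The following is a minimal sketch; it assumes that your project is named `<initials>-bigdata`, your Cloud SQL instance `<initials>-sql`, and that the instance lives in `us-central1`, as in the example string. The function names are illustrative only.

```python
from urllib.parse import urlencode


def cloudsql_mysql_jdbc_url(project: str, region: str, instance: str, database: str) -> str:
    """Assemble a JDBC URL in the same shape as the lab's connection string."""
    params = {
        # Instance connection name: <project>:<region>:<instance>
        "cloudSqlInstance": f"{project}:{region}:{instance}",
        "socketFactory": "com.google.cloud.sql.mysql.SocketFactory",
        "useSSL": "False",
    }
    # safe=":" keeps the colons of the instance connection name unescaped.
    return f"jdbc:mysql://google/{database}?{urlencode(params, safe=':')}"


def jdbc_url_for_initials(initials: str) -> str:
    """Apply the lab's assumed naming pattern (<initials>-bigdata, <initials>-sql)."""
    return cloudsql_mysql_jdbc_url(
        project=f"{initials}-bigdata",
        region="us-central1",
        instance=f"{initials}-sql",
        database="webshop",
    )


if __name__ == "__main__":
    print(jdbc_url_for_initials("pk"))
```

Calling `jdbc_url_for_initials("pk")` reproduces the example string shown above; pass your own initials to get the value for your environment.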

The import query should be the following in order to select all rows from the sales table:

select * from sales

Don't forget to scroll down and set the right credentials (in our case username is root without password):

Hit "Get schema" located under the connection string. On the right side, you should see the columns of our table:

Hit "Validate" on the top right and if no errors were found, you can close the properties using the "X".

Results

  1. You added a driver for your Cloud SQL server (MySQL-based) to your Data Fusion instance.
  2. You added the Cloud SQL plugin so that it can be used in the Data Fusion "Studio", i.e. the pipeline builder. (Please note: due to errors, we are using the "traditional" Database stage instead.)
  3. You created a source object in the pipeline builder and configured the connection.

Add a GCS sink

We want to store our data in the GCS-based data lake. Thus, we'll add a sink "GCS" and connect the source with the sink:

Hover over the GCS-box and click on "Properties". Set the following properties:

The reference name could be

webshop_sales_ingest

The cloud storage path could be (change to your initials)

gs://pk-gcs/sales_from_datafusion

We want to use the Parquet format since our data lake should be able to serve analytical workloads.

Hit "Validate", make sure that your configuration doesn't show any errors, and close the window.

Results

  1. You added a sink (a folder in a cloud storage bucket) to the pipeline, configured it, and connected it with the source.

Loading preview data

Next, we want to take a quick look at the data and validate that we are connecting the right source. Click on "Preview":

Next, we need to run the pipeline once (in preview mode):

This will take a few seconds. When hovering over any of the stages and selecting "Preview Data", you should see our source data:

Results

  1. The ingestion pipeline is now complete and ready to be deployed.

Export and save pipeline

In order to be able to reuse our specified pipeline, please export it to your local filesystem:

Results

  1. You saved your pipeline as a JSON file on your local machine.
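A quick way to check what the export contains is to load the JSON file and list its stages and connections. The sketch below assumes the export has a top-level `name` and a `config` object with `stages` (each with a `plugin` entry) and `connections` (with `from` and `to`); please compare this with your actual file. The sample document is hand-written for illustration and is not a real export.

```python
import json


def summarize_pipeline(pipeline: dict) -> dict:
    """Return the pipeline name, its stages, and its connections."""
    config = pipeline.get("config", {})
    stages = [
        (
            stage.get("name"),
            stage.get("plugin", {}).get("type"),
            stage.get("plugin", {}).get("name"),
        )
        for stage in config.get("stages", [])
    ]
    connections = [(c.get("from"), c.get("to")) for c in config.get("connections", [])]
    return {"name": pipeline.get("name"), "stages": stages, "connections": connections}


# Hand-written, heavily shortened sample (an assumption about the layout).
SAMPLE = json.loads("""
{
  "name": "webshop_sales_to_gcs",
  "config": {
    "stages": [
      {"name": "Webshop-Sales-Source", "plugin": {"name": "Database", "type": "batchsource"}},
      {"name": "GCS", "plugin": {"name": "GCS", "type": "batchsink"}}
    ],
    "connections": [{"from": "Webshop-Sales-Source", "to": "GCS"}]
  }
}
""")

if __name__ == "__main__":
    summary = summarize_pipeline(SAMPLE)
    print(summary["name"])
    for name, plugin_type, plugin_name in summary["stages"]:
        print(f"  stage {name}: {plugin_type} / {plugin_name}")
    for src, dst in summary["connections"]:
        print(f"  {src} -> {dst}")
```

To inspect your own export, replace `SAMPLE` with `json.load(open(path))`, where `path` points to the downloaded file.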

Save and Deploy the Pipeline

Please click "Save" and afterwards "Deploy" in the pipeline builder. The output should look like this (showing the pipeline in the lower part):

Please adapt the deployment options in order to avoid quota restrictions:

The master node also needs to be adapted:

Execute the Pipeline

Please click "Run". The pipeline is now executed on a Dataproc Hadoop cluster (i.e. it is horizontally scalable and big data-ready!). Our ingestion logic is translated into a Spark job. You can see that a cluster is being created by opening the Dataproc overview page in the cloud console:

The cluster is provisioned (i.e. set up) - in Data Fusion you can see the current status:

If everything is fine you should see the following output. In case of errors, you need to check the "Logs":

Checking the Data Lake

When checking GCS, you should see the respective Parquet files in a folder named after the job's execution time:
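To know which folder to look for, you can compute the expected output prefix from the run time. This sketch assumes that the GCS sink's path suffix was left at a `yyyy-MM-dd-HH-mm` pattern (believed to be the default, but please check the sink properties) and it ignores any time zone difference between your machine and the pipeline run.

```python
from datetime import datetime


def expected_output_prefix(
    base_path: str,
    run_time: datetime,
    suffix_format: str = "%Y-%m-%d-%H-%M",  # assumed equivalent of yyyy-MM-dd-HH-mm
) -> str:
    """Return the folder in which a run started at `run_time` should write."""
    return f"{base_path.rstrip('/')}/{run_time.strftime(suffix_format)}/"


if __name__ == "__main__":
    # Base path as configured in the sink (replace pk with your initials);
    # the run time below is an arbitrary example value.
    print(expected_output_prefix(
        "gs://pk-gcs/sales_from_datafusion",
        datetime(2024, 1, 15, 2, 30),
    ))
```

If the folder you find in GCS is named differently, the path suffix configured in the sink properties is the first thing to check.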

Scaling out

Please go back to the pipeline and hit the configure button. You can increase the performance of your cluster here:

Results

  1. You deployed a pipeline which is now production-ready and could be scheduled to run regularly.
  2. This pipeline is big data-ready; however, you still need to check the costs of (1) pipeline development and (2) pipeline execution on Dataproc. You also know, at least in part, how to configure the cluster.

Please make sure to delete your Data Fusion instance in order to avoid high costs (if you are attending the live lecture, please leave it running):

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a modern, cloud-based, horizontally scaling ingestion pipeline using GCP's Cloud Data Fusion, with an automatically provisioned Hadoop cluster underneath (using Spark for the extraction / ingestion logic). Data Fusion is very powerful and would also allow for "data wrangling". We skipped this part here since we are still concerned with data ingestion.