Once again, our use case is a simple webshop for which we want to calculate the average sales value per product in a scalable (big data) manner.

Goal

Using data in the data lake for batch processing from a Jupyter Notebook.

What you'll implement

Cluster Creation

Please navigate to DataProc, which is GCP's Hadoop product:

Hit "create cluster":

If applicable, select "Cluster on Compute Engine":

You need to use the following parameters:

Important: Scroll down and select "Jupyter Notebook" in the section "Components":

Next, select "Configure Nodes (optional)" and switch to a smaller machine type for both the "Manager node" and the "Worker nodes".

You will first need to change the primary disk type to Standard Persistent Disk for the master and the workers:

Please also reduce the size of the attached hard disks for both node types:

Set a bucket in which the cluster is able to store its data to be persisted (please use your own bucket):

Please also allow access to the whole project for the cluster as follows:

Leave the remaining fields at their default values and click on "Create".
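For reference, the console steps above correspond roughly to a single "gcloud dataproc clusters create" call. The following sketch only assembles and prints such a command; it does not run it. The cluster name, region, machine type, disk size and bucket are placeholders and therefore assumptions, since the lab's actual parameter values are not reproduced here. The mapping of the "access to the whole project" setting to the cloud-platform scope is also an assumption.

```python
import shlex

# Placeholder values: all of these are assumptions for illustration only.
cluster_name = "my-cluster"
region = "YOUR_REGION"
machine_type = "n1-standard-2"   # example of a smaller machine type
disk_size = "50GB"               # example of a reduced disk size
bucket = "YOUR_BUCKET"           # bucket name without the gs:// prefix

command = [
    "gcloud", "dataproc", "clusters", "create", cluster_name,
    f"--region={region}",
    "--optional-components=JUPYTER",       # the "Jupyter Notebook" component
    "--enable-component-gateway",          # exposes the "Web Interfaces" links
    f"--master-machine-type={machine_type}",
    f"--worker-machine-type={machine_type}",
    "--master-boot-disk-type=pd-standard",  # Standard Persistent Disk
    "--worker-boot-disk-type=pd-standard",
    f"--master-boot-disk-size={disk_size}",
    f"--worker-boot-disk-size={disk_size}",
    "--num-workers=2",                     # one master plus two workers
    f"--bucket={bucket}",                  # bucket for the cluster's persisted data
    "--scopes=cloud-platform",             # assumed equivalent of project-wide access
]

# Print the command so it can be reviewed before running it in a shell.
print(shlex.join(command))
```

Printing the command instead of executing it keeps the sketch safe to run anywhere; paste the output into Cloud Shell only after replacing the placeholders.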

Results

  1. You configured and started a Hadoop cluster which is ready for our first batch processing job.

Opening the Jupyter Notebook

Click on the link to your cluster:

When you click on "VM instances", you can see that three virtual machines are running. We will later use this to connect to the master node.

For now, we only want to use the web interface of the Jupyter notebook. Click on "Web Interfaces" and "Jupyter":

You should see a Jupyter Notebook environment:

Let's create a notebook in Cloud Storage, i.e. navigate to GCS and click "New" → "PySpark":

Connecting to the PySpark session and reading the data lake

Although we don't know yet what Spark or PySpark is in general, we can use it quickly with our Python knowledge. The "entry point" to our big data cluster in PySpark is the so-called SparkContext. You can check that everything is working and accessible by entering "sc" in a cell. It may take a few seconds until the cell shows the output.

sc

This should be the output:
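If you want a little more detail than the default display of "sc", a small helper can collect a few of its attributes. This is a minimal sketch: the function name describe_context is hypothetical, while version, master, appName and defaultParallelism are attributes of PySpark's SparkContext.

```python
def describe_context(sc):
    """Collect a few basic facts about a SparkContext for a quick sanity check."""
    return {
        "spark_version": sc.version,                  # Spark version on the cluster
        "master": sc.master,                          # cluster manager the context talks to
        "app_name": sc.appName,                       # name of this Spark application
        "default_parallelism": sc.defaultParallelism, # default number of parallel tasks
    }

# In the notebook, where `sc` is already defined by the PySpark kernel:
# describe_context(sc)
```

The call is left commented out because "sc" only exists inside the notebook's PySpark kernel.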

Renaming the Notebook

You may also now want to rename the notebook (click on "Untitled" and change the name to, e.g., "Batch Processing 1 (Fundamentals)").

Sample Batch Task (this time calculated in "big data mode" by all nodes in the cluster)

A batch job could be to read all CSV files in the webshop_datalake (we currently have only one file stored there) and calculate the average sales value per product. Note that we assume that all files in the webshop_datalake have the same structure.
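To see what "average sales value per product" means on a small scale, the same kind of aggregation can be tried locally on a few rows. The sketch below uses Python's built-in sqlite3 module instead of Spark, and the three sample rows are made up for illustration; only the column names product_name and sales_value are taken from this lab.

```python
import sqlite3

# Made-up sample rows (product_name, sales_value); not taken from the lab's data.
rows = [("shirt", 10.0), ("shirt", 20.0), ("mug", 5.0)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (product_name TEXT, sales_value REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", rows)

# SQLite names the aggregate AVG; the grouping idea is the same:
# one output row per product, holding the mean of its sales values.
result = conn.execute(
    "SELECT product_name, AVG(sales_value) FROM sales "
    "GROUP BY product_name ORDER BY product_name"
).fetchall()
conn.close()

print(result)  # [('mug', 5.0), ('shirt', 15.0)]
```

On the cluster, the same logic is spread across all worker nodes, which is what makes it suitable for data that does not fit on one machine.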

Connecting to the data lake

Let's start by defining the connection to the data lake for PySpark. Put the following code in a cell and execute it (no output is expected):

# provide spark the link to the webshop history (datalake)
df = spark.read.csv("gs://hdm-kueppers/webshop_history.csv", inferSchema=True, header=True)

Batch Processing

Next, let's define our batch processing by making use of one of Spark's great features: querying big data in a SQL manner. The cell should have a Pandas DataFrame as output.

# create a temporary view on the data that we can simply query by SQL
df.createOrReplaceTempView("sales")

# this is our batch processing - calculate the mean sales_value per product_name
df_batch_result = spark.sql("SELECT product_name, mean(sales_value) FROM sales GROUP BY product_name")

# show results (interactive mode)
df_batch_result.toPandas()

Your notebook should now look like this:

Creating the Batch View

Finally, we don't want to stay in interactive mode, but write the batch processing results back to the cloud storage.

# write results back to Cloud Storage (GCS)
df_batch_result.write.csv("gs://hdm-kueppers/webshop_batch_results/average_sales_per_product")

No output is expected. However, in your bucket, you should see a folder "average_sales_per_product" containing multiple files which hold our batch processing results. We will discuss later why there is more than one result file (do you already have an idea? Think about parallel processing). This should be your bucket's content:

Please note: You might see fewer than three part-files.
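If you run the write cell a second time, Spark's default save mode refuses to write into a folder that already exists. A minimal sketch of a more forgiving variant follows; the helper name write_batch_view and its single_file flag are hypothetical, while coalesce, mode and csv are regular PySpark DataFrame and DataFrameWriter methods.

```python
def write_batch_view(df, path, single_file=False):
    """Write a Spark DataFrame as CSV, replacing any earlier result at `path`."""
    if single_file:
        # Collapse to one partition so that only one part-file is written.
        # Only sensible for small results, since a single task does the writing.
        df = df.coalesce(1)
    # mode("overwrite") replaces an existing folder instead of raising an error.
    df.write.mode("overwrite").csv(path)

# In the notebook (placeholder path, use your own bucket):
# write_batch_view(df_batch_result, "gs://YOUR_BUCKET/webshop_batch_results/average_sales_per_product")
```

Overwriting suits a batch view that is recomputed from scratch on every run; if earlier results must be kept, write to a new folder instead.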

Examining the result (downloading one file)

When downloading and opening one file (with more than 0 B size!), you'll see that it contains a partial result:

Thus, batch processing may require a further step to consolidate the results into a "batch view", e.g. by generating a BigQuery table. We will skip this step here, although it is straightforward to do.
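As a rough idea of what such a consolidation does, the sketch below merges part-files into one list of rows using only the Python standard library. It assumes the part-files were downloaded into a local folder and contain two columns without a header line (product name and mean sales value). The function name consolidate_parts and the demo file contents are made up for illustration; this is not the BigQuery approach mentioned above.

```python
import csv
import tempfile
from pathlib import Path

def consolidate_parts(folder):
    """Merge all non-empty part-* files in `folder` into one list of rows."""
    rows = []
    for part in sorted(Path(folder).glob("part-*")):
        if part.stat().st_size == 0:
            continue  # empty part-files carry no results
        with part.open(newline="") as handle:
            for product_name, mean_value in csv.reader(handle):
                rows.append((product_name, float(mean_value)))
    return rows

# Demo with made-up part-files in a temporary folder.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "part-00000.csv").write_text("mug,5.0\n")
    Path(tmp, "part-00001.csv").write_text("")       # an empty partition
    Path(tmp, "part-00002.csv").write_text("shirt,15.0\n")
    Path(tmp, "_SUCCESS").write_text("")             # marker file, ignored by the glob
    batch_view = consolidate_parts(tmp)

print(batch_view)  # [('mug', 5.0), ('shirt', 15.0)]
```

This local merge only works for small results; for large outputs the consolidation should itself run on a scalable service.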

Results

  1. You used a Jupyter Notebook as an interactive big data tool for batch processing.
  2. The code you ran is big data-ready and scalable. If you need more performance, just add more worker nodes in cluster creation and you are ready to process terabytes of data in a batch manner.
  3. We can now shut down our cluster in order to save money (see next step).

Important: In case you want to continue with the next lab, you can omit this step and delete the cluster later.

Reasoning

In the cloud environment, you pay for what you use. Since the DataProc cluster costs about $3 per day, we now want to tear it down. Unfortunately, DataProc does not allow "stopping" the cluster (as Cloud SQL did). Thus, we need to delete the cluster to avoid costs.

Saving and closing the notebooks

Save your Jupyter Notebook.

Shutting down the DataProc Cluster

Go to the cluster details page and hit "Delete":

Congratulations, you have completed another lab in our big data journey. This was the first lab in which you worked with big data tools and analytics. We will explore the applied technology in detail throughout this course.