Our use case is again the simple webshop: we want to calculate the average sales value per product and train a simple machine learning model in a scalable (big data) manner, this time using Dask.

Goal

Use data in the data lake for batch processing from a Jupyter Notebook, this time relying on Dask.

What you'll implement

Cluster Creation

Please navigate to Kubernetes:

Enable the API if necessary. We will not deploy our Kubernetes cluster via "Create cluster" in the console web UI, but via the cloud shell. First, please open a cloud shell:

Make sure the right project is set in the cloud shell by executing this command first:

gcloud config set project pk-bigdata

Please copy and paste the following command into the cloud shell. This command creates a kubernetes cluster with specific settings, e.g. regarding virtual machine type or the cluster zone. You may want to change the cluster name (pk-k8s) to "your initials"-k8s (k8s is an abbreviation for kubernetes).

gcloud container clusters create \
  --machine-type n1-standard-4 \
  --num-nodes 2 \
  --zone us-central1-a \
  --cluster-version latest \
  pk-k8s

You can check this in the cluster overview when the cluster is created:

Results

  1. You created your first Kubernetes cluster via the cloud shell. The Kubernetes platform allows you to deploy modern containerized applications. We won't go into detail on Kubernetes, but rather use a Kubernetes management tool (helm) to easily add applications, which we'll install in the next steps.

Grant cluster admin permissions to your account

Execute the following command (copy & paste) in the cloud shell. Replace <GOOGLE-EMAIL-ACCOUNT> with the email account you are logged into the GCP cloud console with.

kubectl create clusterrolebinding cluster-admin-binding \
    --clusterrole=cluster-admin \
    --user=<GOOGLE-EMAIL-ACCOUNT>

This should be the shell output:

Install Helm in the Cloud Shell

Install helm in the cloud shell as follows:

curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash

This should be the output:

Configure Helm on the Kubernetes Cluster

Please execute the following three commands (copy & paste) such that we can use helm on our cluster:

kubectl --namespace kube-system create serviceaccount tiller

kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller

helm init --service-account tiller --history-max 100 --wait

This should be the output:

Finalize Helm Configuration

Unfortunately, we need to apply a security patch to the current helm/tiller version (it restricts tiller to listening on localhost only). Please execute the following command:

kubectl patch deployment tiller-deploy \
--namespace=kube-system --type=json \
--patch='[{"op": "add", "path": "/spec/template/spec/containers/0/command", "value": ["/tiller", "--listen=localhost:44134"]}]'

Wait a few seconds and execute the following command:

helm version

This should show the following output indicating that the helm client and server are running:

Results

  1. You added helm, a Kubernetes package management system, to your cluster.
  2. You now have the fundamental setup that allows you to install various applications via helm (e.g. a scalable web server, database systems, but also big data systems like Dask).

Add the Dask Helm Chart and Update the Repository

Please execute the following two commands in the cloud shell to make the dask chart available:

helm repo add dask https://helm.dask.org/
helm repo update

Configure Dask Scheduler and Worker Nodes

Background

We could already deploy a dask cluster with default parameters. However, in order to configure the cluster in more detail (e.g. the number of workers, additional Python packages we want to use, etc.), we will now learn how to override the default configuration.

Downloading the sample configuration

The configuration can be overridden by providing a file "values.yaml". You can download a suggested configuration file in the cloud shell with the following command:

git clone https://github.com/pkuep/pk-bigdata-code.git

Checking the configuration file

After cloning the GitHub repository, you should be able to access the respective directory and take a look at the configuration file:

cd pk-bigdata-code
ls

This should be the output:

Let's take a look at the file with the editor. Click on "Open Editor", browse into the directory pk-bigdata-code and select the file "values.yaml":

The most important part for us is the worker configuration:

Here you'll be able to change your cluster setup when you need more performance for your big data computations (e.g. by adding CPUs or worker nodes).

Please switch back to the cloud shell:

Make sure that you stay in the directory "pk-bigdata-code", i.e. where the file values.yaml is located.

Deploy Dask to the Cluster

Next, we want to finalize our preparations and deploy the dask cluster, i.e. the dask master (scheduler), a Jupyter notebook server, and the workers as configured in the values.yaml file. You may want to rename pk-dask to "your initials"-dask in the following command, which is to be executed in the cloud shell:

helm install --name pk-dask dask/dask --version 4.4.2 \
--set scheduler.serviceType=LoadBalancer \
--set jupyter.serviceType=LoadBalancer \
--values values.yaml

The output should look like this:

Results

  1. You created a customized dask configuration.
  2. You deployed dask with this configuration to your running kubernetes cluster.
  3. You are now ready to access the web interfaces (when your dask deployment is finished, see next task).

Checking the External Load Balancers for Web Access

The deployment of dask will take a few minutes. Take a look at the "Services & Ingress" page and wait until the endpoints are filled with IP addresses and the Pods column shows "1/1" per row:

Dask Status Page

First, we'll take a look at the dask status page. Click on the IP address in the row "dask-pk-scheduler" with port 80 (see screenshot above). This will open the cluster overview page and should look like this:

We'll come back to this page later to see our cluster working.

Jupyter Notebook

Next, open the URL of the Jupyter notebook page (row dask-pk-jupyter). You should be prompted to enter a password, which is "dask":

Afterwards, you should see a Jupyter Lab page like this:

Results

  1. You opened the web interfaces that we need for interacting with and monitoring the dask cluster.

Reading Data from GCS into a Dask DataFrame

First, open a new Jupyter Notebook:

Call this notebook "Dask Batch Processing" and save it.

Package Imports

Next, import the following two packages (copy & paste into the first cell):

import gcsfs  # enables reading gcs:// paths from Google Cloud Storage
import dask.dataframe as dd  # pandas-like, partitioned DataFrames

Connecting the Notebook to the Dask Cluster

As another setup step we'll need to connect our notebook to the dask scheduler as an entry point to the cluster:

from dask.distributed import Client, progress
c = Client()
c

This should be the output (we'll ignore the warning):
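If you want to verify that the workers from your deployment have registered with the scheduler, you can additionally query the client. This is an optional sanity check; the number of workers depends on your values.yaml:

info = c.scheduler_info()  # dictionary describing the connected cluster
print(len(info["workers"]), "workers connected")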

DataFrame Definition

Now it's time to point dask to the location of the source data you want to analyze:

ddf = dd.read_csv('gcs://pk-gcs/webshop_datalake/webshop_history.csv')
ddf
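Note that this command only defines the DataFrame lazily; no data is read from the data lake yet. If you want to peek at a few rows right away (which triggers a small read of the first partition only), you can use head(), just like in pandas:

ddf.head()  # reads only the first partition, not the whole file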

Analytics on a Dask DataFrame

Working with dask usually feels like working with pandas. The main difference is lazy evaluation: the following command calculates the mean sales value per region, just like in pandas; however, to actually execute it and let all your workers work on it concurrently, you need to append, for example, compute():

ddf.groupby("region")['sales_value'].mean().compute() # compute triggers the workers to start computation

Your notebook should look like this now:
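As stated in the use case, we are also interested in the average sales value per product. The query is analogous; please note that the product column name used below is only a placeholder, so check the actual column names in your dataset first:

print(ddf.columns)  # check the actual column names in your dataset
ddf.groupby("product_name")['sales_value'].mean().compute()  # 'product_name' is a placeholder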

Checking the Dask Dashboard

When switching over to the dask dashboard, you'll notice that the cluster worked. In the dashboard, we can observe the parallel computation (in this case it's not too heavily parallelized; your output may vary depending on the cluster configuration):

Machine Learning with Dask

Dask provides implementations of some popular machine learning algorithms. Let's train a very simple model that predicts the sales_value based on the age of a customer. Copy & paste this into a new cell:

import dask_lightgbm.core as dlgbm  # distributed LightGBM training on the dask cluster
reg = dlgbm.LGBMRegressor()

X = ddf[['age']]        # feature: customer age
y = ddf['sales_value']  # target: sales value
reg.fit(X, y)           # training is distributed across the dask workers

After executing the model training, you can check your workers' progress (your output may vary):

Afterwards, you can use the model for predictions:

import pandas as pd
pred_df = pd.DataFrame([[10],[20],[30],[40],[50],[60]])  # ages we want predictions for
pred_ddf = dd.from_pandas(pred_df, npartitions=1)        # convert to a (small) dask DataFrame
reg.predict(pred_ddf).compute()                          # run the prediction on the cluster
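If you'd like to see the predictions next to the corresponding ages, you can, for example, wrap the result in a small pandas DataFrame. This is just a minimal sketch reusing pred_ddf from above:

predictions = reg.predict(pred_ddf).compute()  # NumPy array with one prediction per input row
pd.DataFrame({"age": [10, 20, 30, 40, 50, 60], "predicted_sales_value": predictions})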

Results

  1. You successfully performed Python-based big data analytics with a dask cluster. Congratulations!
  2. In the next step, we'll vary the cluster configuration slightly.

Now we want to add a further dask worker and observe the cluster's performance. Open the file "values.yaml" in the cloud shell editor (see above) and change the number of workers from two to three (please note: more workers are not possible with our current kubernetes cluster configuration).

Go back to the terminal and enter the following command to update the dask deployment, i.e. add another worker:

helm upgrade pk-dask dask/dask -f values.yaml

You should see output similar to this:

You can check whether your additional worker is ready by clicking on the "Workloads" page in the cloud console:

When all three workers are running (kubernetes runs each of them in a so-called pod), you should see this output:

When clicking on "Workers" in the dask dashboard, you should see that one further worker is available for computation:

This example shows one of the advantages of kubernetes: we can change our infrastructure very easily, and container management (e.g. spinning up a new worker) is handled completely in the background.

We could run a short experiment and compare the calculation time of different configurations (one way to measure this is shown in the sketch below):

One worker

Two workers
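One simple way to measure the calculation time in the notebook is to time the groupby computation from above, once per cluster configuration. This is a minimal sketch using Python's time module; the measured values will of course depend on your setup:

import time

start = time.time()
ddf.groupby("region")['sales_value'].mean().compute()
print(f"Computation took {time.time() - start:.1f} seconds")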

Results

  1. You are now able to change cluster configuration "on the fly" and make use of kubernetes features in the context of big data processing with dask.
  2. You are now finished with this lab. Please make sure to shut down your cluster, since it is pretty costly.

Please make sure to shut down your kubernetes cluster due to the high costs of our setup, e.g. by deleting it on the Kubernetes Engine clusters page in the cloud console.

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you used fairly complex technologies and a sophisticated setup (kubernetes with helm and dask) to deploy a state-of-the-art Python big data processing environment, which you accessed via a Jupyter notebook created and edited in the Jupyter Lab environment.

Dask is one of the rapidly growing frameworks for big data processing and thus pretty interesting for big data architectures and data engineering.