This is again our use case: a simple webshop for which we want to calculate the average sales value per product as well as train a simple machine learning model in a scalable (big data) manner using Dask.
We will again use data in the data lake for batch processing from a Jupyter Notebook, this time, however, relying on Dask.
Please navigate to the Kubernetes Engine page:
Enable the API if necessary. We will not deploy our Kubernetes cluster via "create cluster" in the console WebUI, but via the cloud shell. First, please open a cloud shell:
Make sure to have the right project set in cloud shell by executing this command first:
gcloud config set project pk-bigdata
Please copy and paste the following command into the cloud shell. It creates a Kubernetes cluster with specific settings, e.g. the virtual machine type and the cluster zone. You may want to change the cluster name (pk-k8s) to "your initials"-k8s (k8s is an abbreviation for Kubernetes).
gcloud container clusters create \
--machine-type n1-standard-4 \
--num-nodes 2 \
--zone us-central1-a \
--cluster-version latest \
pk-k8s
Once the cluster is created, you can check it in the cluster overview:
Execute the following command (copy & paste) in the cloud shell. Replace <GOOGLE-EMAIL-ACCOUNT> with the email account you are logged in with in the GCP cloud console.
kubectl create clusterrolebinding cluster-admin-binding \
--clusterrole=cluster-admin \
--user=<GOOGLE-EMAIL-ACCOUNT>
This should be the shell output:
Install helm in the cloud shell as follows:
curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash
This should be the output:
Please execute the following three commands (copy & paste) so that we can use Helm on our cluster:
kubectl --namespace kube-system create serviceaccount tiller
kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller --history-max 100 --wait
This should be the output:
We (unfortunately) need to apply a security patch (in the current Helm version). Please execute the following command:
kubectl patch deployment tiller-deploy \
--namespace=kube-system --type=json \
--patch='[{"op": "add", "path": "/spec/template/spec/containers/0/command", "value": ["/tiller", "--listen=localhost:44134"]}]'
Wait a few seconds and execute the following command:
helm version
This should show the following output indicating that the helm client and server are running:
Next, add the Dask Helm chart repository and update the local repository index:
helm repo add dask https://helm.dask.org/
helm repo update
We could already deploy a Dask cluster with default parameters. However, in order to configure the cluster in more detail (e.g. the number of worker nodes or additional Python packages we want to use), we will now learn how to overwrite the default configuration.
The configuration can be overridden by providing a file "values.yaml". You can download a suggested configuration file in the cloud shell with the following command:
git clone https://github.com/pkuep/pk-bigdata-code.git
After cloning the GitHub repository, you should be able to access the respective directory and take a look at the configuration file:
cd pk-bigdata-code
ls
This should be the output:
Let's take a look at the file with the editor. Click on "Open Editor", browse into the directory pk-bigdata-code and select the file "values.yaml":
The most important part for us is the worker configuration:
Here you'll be able to change your cluster setup when you need more performance for your big data computations (e.g. by adding CPUs or worker nodes).
Please switch back to the cloud shell:
Make sure that you stay in the directory "pk-bigdata-code", i.e. where the file values.yaml is located.
Next, we want to finalize our preparations and deploy the Dask cluster, i.e. the Dask scheduler (master), a Jupyter notebook server, and the workers as configured in the values.yaml file. You may want to rename pk-dask to "your initials"-dask in the following code snippet, which is to be executed in the cloud shell:
helm install --name pk-dask dask/dask --version 4.4.2 \
--set scheduler.serviceType=LoadBalancer \
--set jupyter.serviceType=LoadBalancer \
--values values.yaml
The output should look like this:
The deployment of Dask will take a few minutes. Take a look at the "Services & ingress" page and wait until the endpoints are filled with IP addresses and the Pods column shows "1/1" per row:
First, we'll take a look at the dask status page. Click on the IP address in the row "dask-pk-scheduler" with port 80 (see screenshot above). This will open the cluster overview page and should look like this:
We'll come back to this page later to see our cluster working.
Next, open the URL of the Jupyter service (row dask-pk-jupyter). You should be prompted to enter a password, which is "dask":
Afterwards, you should see a JupyterLab page like this:
First, open a new Jupyter Notebook:
Call this notebook "Dask Batch Processing" and save it.
Next, import the following two packages (copy & paste into the first cell):
import gcsfs                 # required so that dask can read data from Google Cloud Storage (gcs:// paths)
import dask.dataframe as dd  # dask's parallel dataframe implementation
As another setup step, we'll need to connect our notebook to the Dask scheduler, which serves as the entry point to the cluster:
from dask.distributed import Client, progress
c = Client()
c
This should be the output (we'll ignore the warning):
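A short side note on why Client() works without any arguments: the Dask Helm chart typically provides the scheduler address to the Jupyter container via the DASK_SCHEDULER_ADDRESS environment variable (this is an assumption about the chart's defaults; check your deployment if the connection fails). A minimal sketch to inspect the connection:
import os
# the helm chart is assumed to set DASK_SCHEDULER_ADDRESS inside the Jupyter container
print(os.environ.get("DASK_SCHEDULER_ADDRESS"))   # e.g. tcp://<scheduler-service>:8786
print(list(c.scheduler_info()["workers"]))        # addresses of the currently connected workers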
Now it's time to inform your workers about the location of the source data you want to analyze:
ddf = dd.read_csv('gcs://pk-gcs/webshop_datalake/webshop_history.csv')
ddf
Working with dask usually feels like working with pandas. The main difference is lazy evaluation. The following command calculates the mean sales value per region, just like in pandas; however, to actually execute it and let all your workers work on it concurrently, you'll need to append, for example, ".compute()":
ddf.groupby("region")['sales_value'].mean().compute() # compute triggers the workers to start computation
Your notebook should look like this now:
When switching over to the Dask dashboard, you'll notice that the cluster did the work. In the dashboard, we can observe the parallel computation (in this case it is not too heavily parallelized; your output may vary depending on the cluster configuration):
The Dask ecosystem provides distributed implementations of popular machine learning algorithms; here we use dask-lightgbm. Let's execute a very simple training of a model that predicts sales_value based on a customer's age. Copy & paste this into a new cell:
import dask_lightgbm.core as dlgbm
reg = dlgbm.LGBMRegressor()  # distributed LightGBM regressor
X = ddf[['age']]             # feature: customer age
y = ddf['sales_value']       # target: sales value
reg.fit(X, y)                # training is distributed across the dask workers
After executing the model training, you can check your workers' progress (your output may vary):
Afterwards, you can use the model for predictions:
import pandas as pd
# small pandas dataframe with example ages; the column name should match the training feature ('age')
pred_df = pd.DataFrame([[10],[20],[30],[40],[50],[60]], columns=['age'])
pred_ddf = dd.from_pandas(pred_df, npartitions=1)  # convert it to a (single-partition) dask dataframe
reg.predict(pred_ddf).compute()                    # distributed prediction, collected into memory
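If you want to relate the predictions back to the input ages, a purely illustrative follow-up could look like this:
ages = [10, 20, 30, 40, 50, 60]
predictions = reg.predict(pred_ddf).compute()  # in-memory array of predicted sales values
# combine inputs and predictions into a small pandas dataframe for readability
pd.DataFrame({"age": ages, "predicted_sales_value": predictions})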
Now we want to add a further worker node and observe the cluster's performance. Open the file "values.yaml" in the cloud shell editor (see above) and change the number of workers from two to three (please note: more workers are not possible with our current cluster configuration).
Go back to the terminal and enter the following command to update the Dask deployment, i.e. add another worker:
helm upgrade pk-dask dask/dask -f values.yaml
You should see output similar to this:
You can check whether your additional worker is ready by clicking on the "Workloads" page in the cloud console:
When all three workers (each one a Kubernetes pod) are up and running, you should see this output:
When clicking on "Workers" in the dask dashboard, you should see that one further worker is available for computation:
This example shows one of the advantages of Kubernetes: we can modify our infrastructure very easily, and container management (e.g. spinning up a new worker) happens completely in the background.
We could do a short experiment and compare the calculation time of different configurations (see the timing sketch after this list):
One worker
Two workers
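One way to run this experiment is to time the same aggregation under each configuration and compare the average wall-clock times; this is only a sketch, and the absolute numbers will depend on your cluster and data:
import time
durations = []
for _ in range(3):  # repeat a few times to smooth out fluctuations
    start = time.time()
    ddf.groupby("region")['sales_value'].mean().compute()
    durations.append(time.time() - start)
print(f"average runtime: {sum(durations) / len(durations):.2f} s")
# note the value, change the number of workers via 'helm upgrade ... -f values.yaml', and repeat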
Please make sure to shut down your cluster afterwards, since this setup incurs substantial costs.
Congratulations! You used fairly complex technologies and a sophisticated setup (Kubernetes with Helm and Dask) to deploy a state-of-the-art Python big data processing environment, which you accessed via a Jupyter notebook created and edited in the JupyterLab environment.
Dask is one of the fastest-growing frameworks for big data processing and is therefore quite relevant for big data architectures and data engineering.