This is again our use case: a simple webshop for which we want to calculate the average sales value per region as well as train a simple machine learning model in a scalable (big data) manner.
We will again use the data in the data lake for batch processing from a Jupyter notebook, this time relying on Dask.
This time, we will create our cluster from the cloud shell rather than the web console. Still, please navigate to the DataProc page already:
Run the following command in the cloud shell (but first, please change the pk prefix in the cluster name to your initials, e.g. in an editor):
gcloud dataproc clusters create pk-dask-cluster --region us-central1 --master-machine-type n1-standard-2 --master-boot-disk-size 50 --worker-machine-type n1-standard-2 --worker-boot-disk-size 50 --initialization-actions gs://peer-bigdata-material/dask.sh --metadata dask-runtime=yarn --optional-components JUPYTER --enable-component-gateway
The cluster is now created via the CLI (command line interface) to GCP rather than via the web console. The initialization action takes care of installing Dask on the master and worker nodes (see https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/dask for further information).
Setup
First, open the DataProc cluster's web interface and create a new "pure Python" Jupyter Notebook.
Call this notebook "Dask Batch Processing" and save it.
Next, import the following two packages (copy & paste into the first cell):
import gcsfs
import dask.dataframe as dd
As another setup step, we need to connect our notebook to the Dask scheduler as the entry point to the cluster. In our case, we will use a YARN-based cluster:
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.array as da
import numpy as np

cluster = YarnCluster()  # deploy the Dask scheduler and workers as a YARN application
client = Client(cluster)  # connect this notebook to the scheduler
You can inspect and configure your Dask cluster by simply putting the cluster object on the last line of a cell; the resulting widget lets you, for example, change the number of worker nodes:
cluster
By default, the cluster will use two workers with one core each (you can ignore the warnings!):
Under "Manual Scaling", you can edit the number of workers (you'll have three workers available in the default configuration since the master is also treated as a worker in Dask):
Now it's time to inform your workers about the location of the source data you want to analyze:
ddf = dd.read_csv('gcs://pk-gcs/webshop_datalake/webshop_history.csv')
ddf
This should be the output:
Working with Dask usually feels like working with pandas; the main difference is lazy evaluation. The following command calculates the mean sales value per region, just like in pandas. However, to actually execute it and let all your workers work on it concurrently, you need to append, for example, compute():
ddf.groupby("region")['sales_value'].mean().compute() # compute triggers the workers to start computation
This should be the output:
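To make the lazy evaluation visible, you can also build the expression first and trigger the computation separately; a small sketch:

# Building the expression returns immediately: Dask only records a task graph.
lazy_mean = ddf.groupby("region")['sales_value'].mean()
type(lazy_mean)  # a lazy dask Series, not a result yet

# Only compute() ships the graph to the scheduler and runs it on the workers.
lazy_mean.compute()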
We will now use Dask in a slightly different way, relying on its "standalone" scheduler, i.e. the Dask scheduler/worker processes and their resource allocation will not be handled by YARN but by Dask itself. This allows us to use the insightful Dask dashboard. Furthermore, we will install some machine learning packages on our master/worker nodes in order to be able to train more complex models.
Please delete the currently running cluster now.
Run the following command in the cloud shell (but first, please change the pk prefix in the cluster name to your initials, e.g. in an editor):
gcloud dataproc clusters create pk-dask-cluster2 --region us-central1 --master-machine-type n1-standard-2 --master-boot-disk-size 50 --worker-machine-type n1-standard-2 --worker-boot-disk-size 50 --initialization-actions gs://peer-bigdata-material/dask.sh --metadata dask-runtime=standalone --optional-components JUPYTER --enable-component-gateway
In this case, we use a slightly adapted version of the initialization action used before: the metadata value dask-runtime=standalone makes it set up Dask's own scheduler instead of running Dask on YARN.
The "standalone" version of the dask scheduler provides a dashboard that shows the distributed computations of our cluster nodes. However, we cannot access this dashboard directly, since it is protected by GCP's / your project's firewall. Thus, we'll now establish a SSH connection from our cloud shell to the DataProc master node, set up port forwarding and connect to the web interface via this secure connection.
Please open the cloud shell. The following command will establish the SSH tunnel from this machine to the cluster's master node and forward the relevant port (8787) via this connection.
gcloud compute ssh pk-dask-cluster2-m --zone us-central1-a -- -NL 8787:localhost:8787 -4
This should result in a "blocking" shell:
Now that our cloud shell is forwarding the relevant port (8787) from its virtual machine to our master node, we can make use of the "web preview" feature. First, you'll need to change the port from 8080 to 8787:
Afterwards, select "Preview on port 8787":
You should see a new tab with the Dask dashboard (which is empty so far):
Please detach the tab, go back to the DataProc overview, and open JupyterLab.
Create a new notebook "machine_learning_with_dask".
The following code imports all necessary packages and connects to the Dask scheduler on the master node so that we can start distributed computations:
import gcsfs
import dask.dataframe as dd
from dask.distributed import Client
import dask.array as da
import numpy as np

client = Client("localhost:8786")  # the standalone scheduler listens on port 8786 on the master node
ddf = dd.read_csv('gcs://pk-gcs/webshop_datalake/webshop_history.csv')
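To double-check that the notebook is really connected to the standalone scheduler, you can inspect the client object; a quick sanity check:

# The client repr includes the scheduler address and the dashboard link (port 8787).
print(client)
# Worker addresses as seen by the scheduler.
print(list(client.scheduler_info()["workers"]))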
Dask provides implementations of popular machine learning algorithms. Let's run a very simple training of an xgboost model that predicts the sales value based on a customer's age. Copy & paste this into a new cell:
from dask_ml.xgboost import XGBRegressor
reg = XGBRegressor()
X = ddf[['age']]
y = ddf['sales_value']
reg.fit(X, y)
Please note: we are not using the newly recommended way of accessing xgboost from Dask; our architecture does not allow us to install the most recent xgboost version, which is why we rely on the "old" way of xgboost/dask training.
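For reference only, here is a sketch of what the newer API roughly looks like; it ships inside the xgboost package itself (xgboost >= 1.0) and is therefore not runnable on our cluster, and the parameters below are purely illustrative:

# NOT runnable here -- sketch of the xgboost-native Dask API (xgboost >= 1.0).
import xgboost as xgb

dtrain = xgb.dask.DaskDMatrix(client, X, y)  # distributed data handle
result = xgb.dask.train(
    client,
    {"objective": "reg:squarederror"},  # illustrative parameters
    dtrain,
    num_boost_round=50,
)
booster = result["booster"]  # the trained model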
After executing the training cell, you can check your workers' progress in the Dask dashboard (your output may vary):
Afterwards, you can use the model for predictions:
import pandas as pd
pred_df = pd.DataFrame([[10],[20],[30],[40],[50],[60]], columns=['age'])
pred_ddf = dd.from_pandas(pred_df, npartitions=1)
reg.predict(pred_ddf).compute()
The benefits of parallelization become more visible with a larger dataset than our simple one. The following code block generates random training data:
num_obs = int(1e5)  # array dimensions must be integers
num_features = 10
X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))
reg = XGBRegressor()
reg.fit(X, y)
Take a look at the Dask dashboard. You should now see the training process being parallelized (in this case across three workers):
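To relate the dashboard's task stream to the data layout, you can inspect the chunk structure of the training data; each chunk becomes a task that the workers can process in parallel:

# X was created with 1000-row chunks, so it consists of 100 blocks.
print(X.chunks)     # ((1000, 1000, ...), (10,))
print(X.numblocks)  # (100, 1)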
Save your notebook such that you can access it after cluster deletion.
Now we want to add two further worker nodes (parameter "--num-workers 4") and observe the cluster's performance. Please delete your cluster and create a new one using the following command (again, please adapt the pk prefix to your initials!):
gcloud dataproc clusters create pk-dask-cluster --region us-central1 --master-machine-type n1-standard-2 --worker-machine-type n1-standard-2 --image-version preview --initialization-actions gs://peer-bigdata-material/dask.sh --metadata dask-runtime=standalone --optional-components JUPYTER --enable-component-gateway --num-workers 4 --worker-boot-disk-size 50 --master-boot-disk-size 50
Go to JupyterLab and open your saved notebook "machine_learning_with_dask".
Again, create an SSH connection from the cloud shell to the master node.
gcloud compute ssh pk-dask-cluster-m --zone us-central1-a -- -NL 8787:localhost:8787 -4
Open the dask dashboard via web preview.
Please make sure to shut down your cluster, since this setup incurs relatively high costs.
Congratulations! You used fairly complex technologies and a sophisticated setup to deploy a state-of-the-art Python big data processing environment, which you accessed via Jupyter notebooks created and edited in the JupyterLab environment.
Dask is one of the most rapidly growing frameworks for big data processing and is thus quite interesting for big data architectures and data engineering.