In this simplified use case we want to start an interactive PySpark session in a Jupyter notebook and perform the classic word count example.

Goal

Setup of a Dataproc cluster for further PySpark labs and execution of the MapReduce logic with Spark.

What you'll implement

Preparation

Please make sure that the following settings are set (if not done yet):

  1. Please navigate to the "APIs" (API Gateway) page within the GCP web console and activate all recommended APIs (Management, Service Control, etc.).
  2. Please navigate to "VPC network" and activate the recommended APIs (especially Networking, but also the DNS API).
  3. Please navigate to "VPC network":
     1. Select the "default" VPC network by clicking on it.
     2. Navigate to "Subnets" and select the first one ("default" in "us-central1").
     3. Click on "Edit" and activate "Private Google Access".

Cluster Creation

Please navigate to DataProc, which is GCP's Hadoop product:

Hit "create cluster":

Select "Cluster on Compute Engine":

You may want to use the following parameters:

Enable the checkbox "Component gateway" and select the "Jupyter Notebook" component:

Leave the rest as default and click on "Create".

GCP now spins up three virtual machines which together form a so-called Hadoop cluster. One machine is the master (the coordinating unit) and two machines are workers. If you need more computational power, you'd just add further worker nodes. We will learn how to use this kind of parallel big data processing later. For now, we just want to get to the Jupyter Notebook and access the data lake - later we'll use the notebook to run big data computations with Hadoop (and Spark).

Results

  1. You configured and started your first big data system, a Hadoop cluster, called "DataProc" in GCP. DataProc is the basis for many big data ingestion, transformation, and processing tasks in GCP.

Opening the Jupyter Notebook

Click onto the link to your cluster:

When you click on "VM instances", you can see that the three virtual machines are running. We will later use this view to connect to the master node.

For now, we only want to use the web interface of the Jupyter notebook. Click on "Web Interfaces" and "Jupyter":

You should see a Jupyter Notebook environment:

Let's create a notebook that is stored in Cloud Storage, i.e. navigate to the GCS folder and click "New" → "PySpark" (although we won't process any big data with Spark yet, let's check that the PySpark kernel works):

Connecting to the PySpark session and reading the data lake

Although we don't know yet what Spark or PySpark is in general, we can already use it with our Python knowledge. The "entry point" to our big data cluster in PySpark is the so-called SparkContext. You can check that everything is fine and accessible by simply entering "sc" in a cell. It may take a few seconds until the cell shows the output.

sc

This should be the output:
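If you want an additional sanity check, you can also print a few properties of the context (a minimal sketch; these are standard SparkContext attributes):

# optional sanity checks on the SparkContext
print(sc.version)  # Spark version running on the cluster
print(sc.master)   # resource manager, typically "yarn" on Dataproc
print(sc.appName)  # name of this Spark application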

Renaming the Notebook

You may also now want to rename the notebook (click on "Untitled" and change the name to, e.g., "BigData - Batch Processing - PySpark Wordcount").

Results

  1. You used a Jupyter Notebook as an interactive big data tool for batch processing.
  2. The code you ran is big data-ready and scalable. If you need more performance, just add more worker nodes during cluster creation and you are ready to process terabytes of data in a batch manner.

Uploading a sample text file to GCS

Please upload the lorem_ipsum.txt file from the course folder (datasets) to your GCS bucket (e.g. pk-gcs).
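Alternatively, you can upload the file programmatically from any environment that has both the file and Google Cloud credentials available (e.g. Cloud Shell). Here is a minimal sketch using the google-cloud-storage client; the bucket name pk-gcs and the local file path are assumptions from this lab and need to be adjusted to your setup:

# hypothetical programmatic upload - adjust bucket name and local path to your setup
from google.cloud import storage

client = storage.Client()                      # uses the environment's default GCP credentials
bucket = client.bucket("pk-gcs")               # your GCS bucket
blob = bucket.blob("lorem_ipsum.txt")          # target object name in the bucket
blob.upload_from_filename("lorem_ipsum.txt")   # local copy of the file from the course folder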

Connecting to the text file in GCS

The variable "sc" allows you to interact with the Spark cluster. "sc.textFile" lets you tell Spark that there is a file (on GCS) that you want to work with:

# link to text file on GCS
text = sc.textFile("gs://pk-gcs/lorem_ipsum.txt")
text

For demo purposes, you can show the file contents in the notebook by "collecting" the RDD, i.e. transferring it to the programming environment (NEVER do this with big data).

text.collect() # only for demo purposes - remove for job submission
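If the file were larger, a safer way to peek at the data is take(), which only transfers a small sample to the notebook instead of the whole RDD:

# safer alternative for larger files: only fetch the first few lines
text.take(5)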

Applying the Wordcount logic on RDDs

Word split

The first step of the word count is to split each line into words based on the blank spaces between them:

# split each line into words
words = text.flatMap(lambda line: line.split(" "))
words.collect()  # only for demo purposes - remove for job submission

Mapping and reducing to count words

Next, we want to apply the classical word count logic of MapReduce:

# count the occurrence of each word (classical MapReduce logic)
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
wordCounts.collect()

This should be the result:
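As a small optional extension (a sketch, not part of the original lab instructions), you could sort the counts to inspect the most frequent words first:

# optional: show the ten most frequent words (descending by count)
wordCounts.sortBy(lambda pair: pair[1], ascending=False).take(10)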

Job deployment (outlook)

The logic we implemented could be transferred to a "real" PySpark job and scheduled regularly:
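To give an idea of what such a job could look like, here is a hedged sketch of a standalone PySpark script (the bucket and input file names are taken from this lab; the output path is an assumption):

# wordcount_job.py - sketch of a standalone PySpark job (not executed in this lab)
from pyspark import SparkContext

sc = SparkContext(appName="WordCount")

# read the input file from the data lake
text = sc.textFile("gs://pk-gcs/lorem_ipsum.txt")

# classical MapReduce word count logic
wordCounts = (text.flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))

# write the result back to GCS instead of collecting it into the driver
wordCounts.saveAsTextFile("gs://pk-gcs/wordcount_output")

sc.stop()

Such a script could then be submitted to the cluster as a Dataproc job (e.g. via the "Jobs" page in the console or the gcloud CLI) and scheduled regularly.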

We won't execute this step here.

Results

  1. You used a Jupyter Notebook as an interactive big data tool for batch processing to perform the classical word count example using a MapReduce logic.
  2. The code you ran is big data-ready and scalable.

Shutting down the DataProc Cluster

Go to the cluster details page and hit "Delete" (in the live lecture, please leave it running):

Congratulations, you learned how to work with PySpark and the traditional "RDD-based" programming style. We won't learn how to program with RDDs in detail since there are more sophisticated libraries available for Spark (and PySpark).