In this lab, we want to make use of the Pub/Sub service (specifically Pub/Sub Lite, since this is the product that has a PySpark connector available) and learn how to analyze data in real time using Spark Streaming. We simply want to count orders of products using a tumbling window scheme. This is the flow of data:

What you'll implement

Creation of a new "lite reservation"

In order to create a Pub/Sub Lite topic, we first need a reservation. Please navigate to Pub/Sub → Lite Reservations:

Create a new lite reservation:

Please use us-central1 as the location. You can name your reservation, for example, "pk-lite-res". We'll leave the throughput capacity at the default value of 1 and hit "Create".
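
As an alternative to the console, the reservation can also be created programmatically. The following is only a sketch using the AdminClient of the google-cloud-pubsublite package (which we install further below); the project number and names are placeholders you would have to adapt:

# Optional sketch: create the lite reservation via the Python Admin API
# (assumes the google-cloud-pubsublite package is installed, see the pip command below)
from google.cloud.pubsublite import AdminClient, Reservation
from google.cloud.pubsublite.types import CloudRegion, ReservationPath

project_number = 1234567890  # placeholder: your project number
cloud_region = CloudRegion("us-central1")
reservation_id = "pk-lite-res"

reservation_path = ReservationPath(project_number, cloud_region, reservation_id)
reservation = Reservation(
    name=str(reservation_path),
    throughput_capacity=1,  # default capacity used in this lab
)

client = AdminClient(cloud_region)
response = client.create_reservation(reservation)
print(f"{response.name} created")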

Creation of a new topic "pk-streaming"

Please navigate to "Lite Topics" and create a new Pub/Sub lite topic called "<initials>-streaming". We'll use this topic for our simple "Product-Buy" string messages.

It should be a regional topic - please select "us-central1" again.

Hit continue and select your previously generated lite reservation. Do not alter the throughput parameters. "Message storage" will also be set to the default values.

This should be the output of the review:
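
If you prefer to script this step, a Lite topic attached to the reservation can also be created via the Admin API. Again, this is only a sketch; the partition count and retention value are assumptions, not values prescribed by this lab:

# Optional sketch: create the lite topic and attach it to the reservation
from google.cloud.pubsublite import AdminClient, Topic
from google.cloud.pubsublite.types import CloudRegion, ReservationPath, TopicPath

project_number = 1234567890  # placeholder: your project number
cloud_region = CloudRegion("us-central1")

topic_path = TopicPath(project_number, cloud_region, "pk-streaming")
reservation_path = ReservationPath(project_number, cloud_region, "pk-lite-res")

topic = Topic(
    name=str(topic_path),
    partition_config=Topic.PartitionConfig(count=1),  # assumption: a single partition
    retention_config=Topic.RetentionConfig(
        per_partition_bytes=30 * 1024 * 1024 * 1024,  # 30 GiB storage per partition
    ),
    reservation_config=Topic.ReservationConfig(
        throughput_reservation=str(reservation_path),  # use the reservation's throughput
    ),
)

client = AdminClient(cloud_region)
response = client.create_topic(topic)
print(f"{response.name} created")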

Creation of a new subscription "pk-streaming-sub"

Please navigate to "Lite Subscriptions" and create a Lite Subscription that is called "pk-streaming-sub" and link it to the previously generated topic:

Publishing sample messages

Pub/Sub Lite does not allow publishing messages manually via the console. Thus, we'll create a Python file in our Cloud Shell that can be executed in order to publish messages.

First, please open the cloud shell:

Once the Cloud Shell instance is running, we need to install the Pub/Sub Lite Python package using this command (copy & paste into the terminal):

pip install --upgrade google-cloud-pubsublite

Next, we switch to the Editor view in order to enter the code shown below:

Please create a new file:

and save it as send_messages.py:

Please add the following code snippet:

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)
import numpy as np

project_number = 1234567890  # placeholder: replace with your project number (see below)
cloud_region = "us-central1"
topic_id = "pk-streaming"  # replace with your topic name, e.g. "<initials>-streaming"

location = CloudRegion(cloud_region)
topic_path = TopicPath(project_number, location, topic_id)

products = ['Apple', 'Banana', 'Tomato']

def send_messages(n):
    # keep a single publisher client open while sending all n messages
    with PublisherClient() as publisher_client:
        for i in range(n):
            # pick a random product and publish it as a UTF-8 encoded message
            data = f"{np.random.choice(products)}"
            api_future = publisher_client.publish(
                topic_path, data.encode("utf-8")
            )
            message_id = api_future.result()
            print(f"{data} sent with message id {message_id}")

send_messages(10)

You'll need to adapt the values of two variables:

  1. Change the topic_id to your previously created topic name.
  2. Change the project number, which can be found as shown below.

In order to find your project number, please go to the project settings:

You will find your project number there:

Copy and paste the project number into the editor and save the file. The topic name should already have been changed as well:

Next, switch back to the terminal:

Run your Python program as follows:

python send_messages.py

You should see output similar to this (the products will differ and the message IDs will most likely be lower):

Results

  1. You are now ready on the data source-side, i.e. able to publish messages to a Pub/Sub Lite topic.
  2. Next, we want to set up the streaming analytics architecture.

Cluster Creation

This time, we will use the Cloud Shell to create our cluster. Nevertheless, please navigate to Dataproc already:

Run the following command in the Cloud Shell (but first, please replace "cluster-pk" with a name based on your initials, e.g. by editing the command in a text editor):

gcloud dataproc clusters create cluster-pk \
    --enable-component-gateway \
    --region us-central1 \
    --zone us-central1-a \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-size 50 \
    --num-workers 2 \
    --worker-machine-type n1-standard-2 \
    --worker-boot-disk-size 50 \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --optional-components JUPYTER

The cluster is now being created via the CLI (command line interface) to GCP rather than the web console:

You should see the cluster creation ("provisioning") also in the list:

Results

  1. You configured and started a Spark cluster which is capable of reading the Pub/Sub Lite topic's messages and analyzing these using Spark's structured streaming approach.

Opening JupyterHub

Please click on the cluster name when it is finished provisioning:

Click on "Web Interfaces":

Scroll down to JupyterLab and click on it. A new tab will open; you might want to detach it from your browser window so that you can work in JupyterLab and use the Cloud Shell terminal in parallel.

Please create a new "Python 3" notebook (not PySpark!):

Connecting to the Spark Cluster

Put the following code into the first cell:

# connecting to pyspark and providing the spark streaming to pubsublite connector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar pyspark-shell'

import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

Execute the cell. You should see output of the cell similar to this:

The variable "spark" is our "entry point" to the spark cluster.

Defining the Connection to our Pub/Sub-Lite Subscription and Converting the Data Field

Put the following code into the second cell:

# defining the subscription to be read from and converting "data" to a string
from pyspark.sql.functions import window

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/projectnumber/locations/us-central1/subscriptions/pk-streaming-sub",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

You will need to change two aspects (see below):

  1. Change the project number to yours.
  2. Change the name of the subscription to yours.

Execute the cell.
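
Optionally, you can inspect the schema of the streaming DataFrame before defining the analytics. The column list is defined by the Pub/Sub Lite connector; the fields mentioned in the comment are what we expect to see, but compare them with your own output:

# optional: inspect the streaming DataFrame's schema
# expected (connector-defined) columns include e.g. subscription, partition,
# offset, key, data, publish_timestamp, event_timestamp and attributes
sdf.printSchema()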

Defining the Streaming Analytics

Put the following code into a new cell:

# defining the streaming analytics:
# group all data based on a 10 second tumbling window (using the publish_timestamp)
# as well as the incoming data string (i.e. Apple, Banana, Tomato etc.)
# and count the occurrences of the respective messages
# --> assumption: one message = one occurrence of a product buy
windowedCounts = sdf.groupBy(
    window(sdf.publish_timestamp, "10 seconds"),
    sdf.data
).count()

Here, we'll define the counting using a 10-second tumbling window as well as the data field as the group-by criterion. Thus, per 10-second window and data string, we'll see the number of buys (= number of messages).

Please execute the cell. There's no output of the cell.
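
As a side note, the window() helper also supports sliding windows via a third argument (the slide duration). We do not need this in the lab, but a hypothetical variant that evaluates a 10-second window every 5 seconds would look like this:

# hypothetical sliding-window variant (not used in this lab):
# a 10-second window that advances every 5 seconds
slidingCounts = sdf.groupBy(
    window(sdf.publish_timestamp, "10 seconds", "5 seconds"),
    sdf.data
).count()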

Starting the Streaming Analytics

Put the following code into a new cell:

# starting the streaming analytics query and
# pushing the output to the console
query = (
    windowedCounts.writeStream.format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime="0 seconds")
    .start()
)

Execute the cell. This will start the real-time analytics, and output should become visible (meaningful results after approximately 30 seconds), especially when you start sending messages again:

In particular, you will see multiple "batches": every time Spark identifies new messages in the subscription, a new batch is printed (showing the complete result table, since we use the "complete" output mode).
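
While the query is running, you can monitor it from another cell. The StreamingQuery handle exposes, among other things, whether it is still active, its current status, and the progress of the last micro-batch:

# optional monitoring of the running streaming query
print(query.isActive)      # True while the query is running
print(query.status)        # current status, e.g. whether it is waiting for new data
print(query.lastProgress)  # metrics of the most recent micro-batch (None before the first batch)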

Stopping the Streaming Analytics

Put the following code into a new cell:

query.stop()

Execute the cell. This will stop the query.

Results

  1. You created a simple streaming analytics architecture using Spark Structured Streaming.
  2. The programming model is comparable to that of "normal" spark applications.

Conversion of the Code to a Python file

Please create a Python file called streaming_analytics_product_count.py in your Cloud Shell machine using the Editor view (see the section "Publishing sample messages" above).

Please adapt the following parts of the code:

  1. Remove "import os" and "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar pyspark-shell'" since we are going to provide the necessary jar-files as a parameter to the job.
  2. Add the following three lines at the end of the file
  1. # Wait 60 seconds to start receiving messages.
  2. query.awaitTermination(60)
  3. query.stop()

Your code should look similar to this:

# connecting to pyspark and providing the spark streaming to pubsublite connector
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

# defining the subscription to be read from and converting "data" to a string
from pyspark.sql.functions import window

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/projectnumber/locations/us-central1/subscriptions/pk-streaming-sub",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

# defining the streaming analytics:
# group all data based on a 10 second tumbling window (using the publish_timestamp)
# as well as the incoming data string (i.e. Apple, Banana, Tomato etc.)
# and count the occurrences of the respective messages
# --> assumption: one message = one occurrence of a product buy
windowedCounts = sdf.groupBy(
    window(sdf.publish_timestamp, "10 seconds"),
    sdf.data
).count()


# starting the streaming analytics query and
# pushing the output to the console
query = (
    windowedCounts.writeStream.format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime="0 seconds")
    .start()
)

# test-mode: stop after 5 minutes
query.awaitTermination(300)
query.stop()

Running the job

Go back to the cloud shell. Submit your Python file as a job using the following command (adapt the cluster name):

gcloud dataproc jobs submit pyspark streaming_analytics_product_count.py \
    --region=us-central1 \
    --cluster=cluster-pk \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --properties=spark.master=yarn

Wait some time until the job is up and running. Then send some example messages (e.g. in a second cloud shell tab) using our previously implemented send_messages.py tool:

The job output should show some changes (further batches) when you generate new messages:

The job should be finished after 5 minutes. You can monitor running jobs under "Jobs":

When selecting your job, you'll see some metrics of your job as well as the output we monitored in the shell before:

If you don't want to wait 5 minutes, you can stop the job here.

Results

  1. You created a streaming analytics application and submitted it as a job to your cluster. This operational mode is closer to "real-world" big data architectures.

Cluster deletion

Please delete your Dataproc cluster:

Topic, Subscription, and Capacity Reservation Deletion

The Pub/Sub Lite reservation incurs costs for the reserved throughput capacity. Please delete the subscription and the topic, and afterwards the reservation, in order to avoid unnecessary running costs.
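
If you prefer to script the cleanup, the corresponding Admin API calls are sketched below (same placeholder names as above); deleting via the console works just as well:

# Optional sketch: delete subscription, topic and reservation via the Admin API
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    ReservationPath,
    SubscriptionPath,
    TopicPath,
)

project_number = 1234567890  # placeholder: your project number
cloud_region = CloudRegion("us-central1")
client = AdminClient(cloud_region)

# delete in this order: subscription -> topic -> reservation
client.delete_subscription(SubscriptionPath(project_number, cloud_region, "pk-streaming-sub"))
client.delete_topic(TopicPath(project_number, cloud_region, "pk-streaming"))
client.delete_reservation(ReservationPath(project_number, cloud_region, "pk-lite-res"))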

Results

  1. You finished this lab by shutting down your Dataproc cluster and cleaning up the Pub/Sub Lite topic, subscription, and reservation.

You are now able to make use of a big data-ready real-time processing framework in the cloud. Spark Structured Streaming can be used for various scenarios using its DataFrame-based syntax, which is comparable to that of "normal" Spark applications, while Pub/Sub Lite serves as the message ingestion layer.