In this lab, we want to make use of the Pub/Sub service (more specifically Pub/Sub Lite, since this is the product for which a PySpark connector is available) and learn how to analyze data in real time using Spark Streaming. We simply want to count product orders using a tumbling-window scheme. This is the flow of data:
In order to be able to generate a PubSub Lite topic, we first need a reservation. Please navigate to Pub/Sub → Lite Reservations:
Create a new lite reservation:
Please use us-central1 as the location. You can name your reservation, for example, "pk-lite-res". We'll leave the throughput capacity at the default value of 1 and hit "Create".
Please navigate to "Lite Topics" and create a new Pub/Sub lite topic called "<initials>-streaming". We'll use this topic for our simple "Product-Buy" string messages.
It should be a regional topic - please select "us-central1" again.
Hit continue and select your previously generated lite reservation. Do not alter the throughput parameters. "Message storage" will also be set to the default values.
This should be the output of the review:
Please navigate to "Lite Subscriptions" and create a Lite Subscription that is called "pk-streaming-sub" and link it to the previously generated topic:
Pub/Sub Lite does not allow publishing messages manually from the web console. Thus, we'll create a Python file in the cloud shell that can be executed in order to publish messages.
First, please open the cloud shell:
Once the cloud shell instance is running, we need to install the Pub/Sub Lite Python package on it using this command (copy & paste into the terminal):
pip install --upgrade google-cloud-pubsublite
Next, we switch to the Editor view in order to enter the code shown below:
Please create a new file:
and save it as send_messages.py:
Please add the following code snippet:
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)
import numpy as np

project_number = number  # replace "number" with your project number (see below)
cloud_region = "us-central1"
topic_id = "pk-streaming"  # replace with your topic name, e.g. "<initials>-streaming"

location = CloudRegion(cloud_region)
topic_path = TopicPath(project_number, location, topic_id)

products = ['Apple', 'Banana', 'Tomato']

def send_messages(n):
    for i in range(n):
        with PublisherClient() as publisher_client:
            data = f"{np.random.choice(products)}"
            api_future = publisher_client.publish(
                topic_path, data.encode("utf-8")
            )
            message_id = api_future.result()
            print(f"{data} sent with message id {message_id}")

send_messages(10)
You'll need to adapt the values of two variables (project_number and topic_id):
In order to find your project number, please go to the project settings:
You will find your project number there:
Copy and paste the project number into the editor and save the file. The topic name should also have been changed by now:
Next, switch back to the terminal:
Run your python program as follows:
python send_messages.py
You should see output similar to this (the products will differ and the numbers will be lower)
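As an optional aside (not required for the lab): the sample code opens a new PublisherClient for every message. You can also open the client once and reuse it for all messages; a minimal sketch of that variant, using only the objects already defined in send_messages.py:

# optional variant of send_messages: reuse one PublisherClient for all messages
# instead of opening a new one per iteration (slightly faster, same output)
def send_messages(n):
    with PublisherClient() as publisher_client:
        for i in range(n):
            data = f"{np.random.choice(products)}"
            api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
            print(f"{data} sent with message id {api_future.result()}")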
This time, we will be using the cloud shell to create our cluster. Nevertheless, please navigate to DataProc already:
Run the following command in the cloud shell (but first, please change cluster-pk so that it contains your initials, e.g. by editing the command in a text editor):
gcloud dataproc clusters create cluster-pk \
    --enable-component-gateway \
    --region us-central1 \
    --zone us-central1-a \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-size 50 \
    --num-workers 2 \
    --worker-machine-type n1-standard-2 \
    --worker-boot-disk-size 50 \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --optional-components JUPYTER
The cluster is now created via the CLI (command line interface) to GCP and not the web console:
You should see the cluster creation ("provisioning") also in the list:
Please click on the cluster name when it is finished provisioning:
Click on "Web Interfaces":
Scroll down to JupyterLab and click on it. A new tab will appear, which you might want to detach from your browser so that you can code in JupyterLab and use the cloud shell terminal in parallel.
Please create a new "Python 3" notebook (not PySpark!):
Put the following code into the first cell:
# connecting to pyspark and providing the spark streaming to pubsublite connector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar pyspark-shell'
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()
Execute the cell. You should see output of the cell similar to this:
The variable "spark" is our "entry point" to the spark cluster.
Put the following code into the second cell:
# defining the subscription to be read from and converting "data" to a string
from pyspark.sql.functions import window
sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/projectnumber/locations/us-central1/subscriptions/pk-streaming-sub",
    )
    .load()
)
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))
You will need to change two aspects of the subscription path (the project number and the subscription name; see below):
Execute the cell.
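Optionally, you can inspect the streaming DataFrame in another cell before defining the analytics; the exact field names come from the Pub/Sub Lite connector, but data and publish_timestamp are the ones we use below:

# optional inspection of the streaming DataFrame
sdf.printSchema()        # shows the fields delivered by the connector, e.g. data, publish_timestamp
print(sdf.isStreaming)   # True - this DataFrame is backed by a streaming source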
Put the following code into a new cell:
# defining the streaming analytics:
# group all data based on a 10 second tumbling window (using the publish_timestamp)
# as well as the incoming data string (e.g. Apple, Banana, Tomato)
# and count the occurrences of the respective messages
# --> assumption: one message = one occurrence of a product buy
windowedCounts = sdf.groupBy(
    window(sdf.publish_timestamp, "10 seconds"),
    sdf.data
).count()
Here, we define the count using a 10-second tumbling window together with the data field as group-by criteria. Thus, for each 10-second window and each data string we'll see the number of buys (= number of messages).
Please execute the cell. There's no output of the cell.
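As an optional aside, you can get a feeling for what such a tumbling-window count produces by running the same aggregation on a tiny static DataFrame (the timestamps below are made up for illustration; this cell is not part of the streaming pipeline):

# illustration only: a 10-second tumbling window count on a small static DataFrame
from pyspark.sql.functions import col
demo = spark.createDataFrame(
    [
        ("2023-01-01 12:00:01", "Apple"),
        ("2023-01-01 12:00:04", "Apple"),
        ("2023-01-01 12:00:08", "Banana"),
        ("2023-01-01 12:00:12", "Apple"),  # falls into the next 10-second window
    ],
    ["ts", "data"],
).withColumn("ts", col("ts").cast("timestamp"))
demo.groupBy(window(col("ts"), "10 seconds"), col("data")).count().show(truncate=False)

For comparison, a sliding window would take an additional slide duration, e.g. window(sdf.publish_timestamp, "10 seconds", "5 seconds"); in this lab we stick with the tumbling variant.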
Put the following code into a new cell:
# starting the streaming analytics query and
# pushing the output to the console
query = (
    windowedCounts.writeStream.format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime="0 seconds")
    .start()
)
Execute the cell. This will start the real-time analytics, and output should become visible (meaningful results after approximately 30 seconds), especially once you start sending messages again:
In particular, there will be multiple "batches": every time Spark identifies new messages in the subscription, a new batch will be shown (with all data received so far).
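While the query is running, you can optionally inspect it from another cell:

# optional: inspect the running streaming query
print(query.status)         # e.g. whether a trigger is currently active
print(query.lastProgress)   # metrics of the most recent micro-batch (None before the first one)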
Put the following code into a new cell:
query.stop()
Execute the cell. This will stop the query.
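Optionally, you can confirm that no streaming query is active anymore:

# after query.stop() the list of active streaming queries should be empty
print(spark.streams.active)   # expected: []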
Please create a Python file called streaming_analytics_product_count.py in your cloud shell instance using the Editor view (see the section "Publishing sample messages" above).
Please adapt the following parts of the code (instead of stopping the query interactively, we let it run for a fixed time and then stop it):
# Let the streaming query run for 60 seconds before stopping it.
query.awaitTermination(60)
query.stop()
Your code should look similar to this:
# connecting to pyspark (the Pub/Sub Lite connector jar is provided via --jars when submitting the job)
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()
# defining the subscription to be read from and converting "data" to a string
from pyspark.sql.functions import window
sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/projectnumber/locations/us-central1/subscriptions/pk-streaming-sub",
    )
    .load()
)
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))
# defining the streaming analytics:
# group all data based on a 10 second tumbling window (using the publish_timestamp)
# as well as the incoming data string (e.g. Apple, Banana, Tomato)
# and count the occurrences of the respective messages
# --> assumption: one message = one occurrence of a product buy
windowedCounts = sdf.groupBy(
    window(sdf.publish_timestamp, "10 seconds"),
    sdf.data
).count()
# starting the streaming analytics query and
# pushing the output to the console
query = (
    windowedCounts.writeStream.format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(processingTime="0 seconds")
    .start()
)
# test-mode: stop after 5 minutes
query.awaitTermination(300)
query.stop()
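Side note: in PySpark, awaitTermination(300) blocks for at most 300 seconds and returns whether the query terminated within that time. If you want to log this explicitly, a small variation of the last two lines would be:

# optional variation of the last two lines: log whether the query ended on its own
terminated = query.awaitTermination(300)   # True if terminated within 300 seconds, False on timeout
print(f"query terminated before timeout: {terminated}")
query.stop()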
Go back to the cloud shell. Submit your Python file as a job using the following command (adapt the cluster name):
gcloud dataproc jobs submit pyspark streaming_analytics_product_count.py \
--region=us-central1 \
--cluster=cluster-pk \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--properties=spark.master=yarn
Wait some time until the job is up and running. Then send some example messages (e.g. in a second cloud shell tab) using our previously implemented send_messages.py tool:
The job output should show some changes (further batches) when you generate new messages:
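If you would like a steadier trickle of messages during those 5 minutes, you can run a small variation of send_messages.py in the second tab; the pause length and the product weights below are arbitrary choices, not part of the lab:

# optional variation: publish one message per second for ~2 minutes, with
# unequal product probabilities so the per-window counts differ visibly
import time

def send_messages_slowly(n, pause_seconds=1.0):
    with PublisherClient() as publisher_client:
        for i in range(n):
            data = f"{np.random.choice(products, p=[0.5, 0.3, 0.2])}"
            api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
            print(f"{data} sent with message id {api_future.result()}")
            time.sleep(pause_seconds)

send_messages_slowly(120)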
The job should be finished after 5 minutes. You can monitor running jobs under "Jobs":
When selecting your job, you'll see some metrics of your job as well as the output we monitored in the shell before:
If you don't want to wait 5 minutes, you can stop the job here.
Please delete your DataProc cluster:
The Pub/Sub Lite reservation incurs costs for the reserved throughput capacity. Please delete the topic and afterwards the reservation in order to avoid high running costs.
You are now able to make use of a big-data-ready real-time processing framework in the cloud. DataFlow can also be used for various scenarios via its SQL-like syntax. However, natively programming DataFlow (i.e. Apache Beam) is rather challenging, but also very powerful in terms of creating auto-scaling pipelines.