In this lab, we want to get to know the messaging system PubSub as a representative of message brokers like Apache Kafka.

What you'll implement

Creating an example topic "pk-pubsub"

First, we need to create a topic. Please navigate to "PubSub" in the cloud console.

Select "Create topic". You might want to call it <initials>-pubsub. You may keep the selection to create "a default subscription" (see section below).

We cannot just "listen" to the topic (i.e. pull data from it directly) but need to create a so-called subscription from which we can pull the messages. This structure allows several consumers to attach to the same topic, each through its own subscription, so that each of them receives all messages.
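The fan-out idea can be illustrated with a small in-memory toy model. This is only a sketch of the concept, not the PubSub API: the class `ToyTopic` and the subscription names `analytics` and `archive` are made up for illustration.

```python
from collections import deque


class ToyTopic:
    """In-memory stand-in for a topic (hypothetical, NOT the PubSub API)."""

    def __init__(self):
        self._subscriptions = {}

    def create_subscription(self, name):
        # Each subscription gets its own queue of pending messages.
        self._subscriptions[name] = deque()

    def publish(self, data):
        # Fan-out: every existing subscription receives its own copy.
        for queue in self._subscriptions.values():
            queue.append(data)

    def pull(self, name):
        # Pulling drains only the named subscription.
        queue = self._subscriptions[name]
        messages = list(queue)
        queue.clear()
        return messages


topic = ToyTopic()
topic.create_subscription("analytics")
topic.create_subscription("archive")
topic.publish(b"hello")

analytics_first = topic.pull("analytics")   # [b"hello"]
analytics_second = topic.pull("analytics")  # [] (already consumed here)
archive_first = topic.pull("archive")       # [b"hello"] (independent copy)
```

Consuming a message through one subscription does not remove it from the other, which is why each consumer should get its own subscription if it needs to see every message.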

Navigate to the "subscription" page and create a subscription:

Call it, e.g., <initials>-subscription:

Leave all other fields as default.

Publishing sample messages

Please open another browser tab and navigate to the topics page. Select "Publish recurring message":

Let's publish this simple message every 10s, 20 times:

Pulling messages

Please switch back to the browser tab with the subscription page and open "View messages". (By the way, notice "Replay messages": this feature builds on message retention.)

After clicking "Pull", you should see this screen with all messages sent so far:

Results

  1. You learned how to create PubSub topics and subscriptions as well as send and receive sample messages via the cloud console.

Service accounts

One way to allow a software component to access specific services in the cloud is to create a dedicated account for that component, a so-called service account. Let's create a service account which has the rights to publish and pull messages within PubSub in our project.

Please navigate to "IAM & Admin" → "Service Accounts". Click on "Create service account":

First, we need to give it a name, e.g. "<initials>-pubsub":

Next, we need to grant this service account the rights to publish and consume messages by adding the following two roles to it:

The third screen can be left empty:

Creating a key for application access

Finally, we need a mechanism for our application / software component to authenticate against GCP (in particular the PubSub service). Please select "Manage keys" for our newly created service account:

Click on "Add key" in the following screen.

Select JSON:

A JSON file should be downloaded automatically.

Results

  1. You now have a rough understanding of the service account concept and know how to create a service account.
  2. You know how to create a JSON key file and have downloaded one for our newly created account.

Starting Colab as External Jupyter Notebook

Let's use another interesting service: Google Colab. Please navigate to https://colab.research.google.com/ and provide the necessary access rights.

If you don't want to use Colab, you can also rely on a local Jupyter / Anaconda installation or a notebook in GCP.

Creating a New Notebook

Please create a new notebook and call it "PubSub-Access".

Uploading the JSON File

We want to authenticate our notebook with the service account. Hence, it needs access to the JSON key file we just downloaded. Click on the "folder" icon on the left:

There you can either upload your JSON file for the currently running Colab session or connect your Drive folders to this notebook. We'll try the former option:
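To check that the upload worked and to see which account the key belongs to, the file can be inspected with a few lines of Python. This is a minimal sketch: the helper name `describe_key_file` is made up here, and it assumes the usual fields of a service account key file (`type`, `project_id`, `client_email`).

```python
import json


def describe_key_file(path):
    """Return a few non-secret fields of a service account key file."""
    with open(path, encoding="utf-8") as handle:
        key = json.load(handle)
    # Deliberately skip "private_key" so the secret is never printed.
    return {
        "type": key.get("type"),
        "project_id": key.get("project_id"),
        "client_email": key.get("client_email"),
    }


# Usage in the notebook (the filename is a placeholder):
# print(describe_key_file("<FILENAME>.json"))
```

Treat the key file like a password: do not print it in full and do not share the notebook session while it is uploaded.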

Installing the pubsub package

Unfortunately, Colab does not ship the PubSub Python package. We need to install it using:

!pip install --upgrade google-cloud-pubsub

You'll need to restart the session once.
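After the restart, it can be useful to confirm that the package is actually available. The following sketch uses only the standard library; the helper name `installed_version` is an illustrative choice.

```python
from importlib import metadata


def installed_version(distribution="google-cloud-pubsub"):
    """Return the installed version string, or None if it is not installed."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Prints a version string if the installation succeeded, otherwise None.
print(installed_version())
```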

Using the JSON File as Authentication in the Notebook

Now we can use the pubsub client and initiate the connection to PubSub using our JSON key file:

from google.cloud import pubsub_v1
from google.oauth2 import service_account

# replace with your own project ID and topic name (e.g. using your initials)
project_id = "pk-bigdata"
topic_id = "pk-pubsub"

# the downloaded key filename which you uploaded to this Colab session
key_path = "<FILENAME>.json"

# create credentials
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# create publisher and topic objects
publisher = pubsub_v1.PublisherClient(credentials=credentials)
topic_path = publisher.topic_path(project_id, topic_id)
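Note that `topic_path` is just a string: the helper only assembles the fully qualified resource name and does not contact the service. The sketch below reproduces that naming scheme in plain Python for illustration; the function names `format_topic_path` and `format_subscription_path` are made up and are not part of the client library.

```python
def format_topic_path(project_id, topic_id):
    """Build the fully qualified resource name of a topic."""
    return f"projects/{project_id}/topics/{topic_id}"


def format_subscription_path(project_id, subscription_id):
    """Build the fully qualified resource name of a subscription."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


print(format_topic_path("pk-bigdata", "pk-pubsub"))
print(format_subscription_path("pk-bigdata", "pk-subscription"))
```

A typo in the project ID or topic name therefore only shows up later, when the first message is published.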

Results

  1. You set up the connection from Python to PubSub via a service account.

Publishing a Message

You are now ready to publish messages to the PubSub service using these commands:

data = "Message from Python"
data = data.encode("utf-8") # data must be a bytestring
    
future = publisher.publish(topic_path, data)
print(f"Published message with ID {future.result()}")

If everything works fine, you should see a message ID as confirmation.
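To send several messages, or to attach metadata to them, the commands above can be wrapped in a small helper. This is a sketch under assumptions: the function name `publish_texts` and the attribute name `sender` are illustrative, and the helper relies on the client's `publish` accepting extra keyword arguments as message attributes (attribute values must be strings).

```python
def publish_texts(publisher, topic_path, texts, **attributes):
    """Publish each text as UTF-8 bytes and return the message IDs."""
    futures = [
        publisher.publish(topic_path, text.encode("utf-8"), **attributes)
        for text in texts
    ]
    # result() blocks until the service has confirmed the message.
    return [future.result() for future in futures]


# Usage in the notebook, with the publisher and topic_path created earlier:
# ids = publish_texts(publisher, topic_path, ["first", "second"], sender="pk")
# print(ids)
```

Collecting the futures first and waiting afterwards lets the client send the messages without waiting for each confirmation in turn.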

Results

  1. You published messages from Python to the serverless message broker service "PubSub".

Duplicating the Notebook

Please create a copy of your notebook and call it "PubSubAccess - Receiving Messages".

Unfortunately, you will need to install the pubsub client and upload the JSON file again:

!pip install --upgrade google-cloud-pubsub

Execute the first cell and remove the second cell with the "message sending" code.

Listening to Incoming Messages

Receiving messages is an asynchronous task, i.e. we need to define a callback function that is executed whenever a new message arrives. The following code implements this requirement (listening for messages times out after 50 seconds; you can adapt the variable timeout accordingly):

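# before Python 3.11, the built-in TimeoutError is a different class
from concurrent.futures import TimeoutError

subscription_id = "pk-subscription"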
timeout = 50.0  # number of seconds the subscriber should listen for messages

subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    """ This is the method which is called when a message arrives """
    print("Received message: {}".format(message))
    message.ack()  # we acknowledge the message

# start the subscription
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# wrap subscriber in a 'with' block to automatically call close() when done
with subscriber:
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # trigger the shutdown
        streaming_pull_future.result()  # block until the shutdown is complete

This should be the output, depending on your message data:
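Printing the whole message object is handy for a first look. To work with the content, the callback can decode the payload and collect the results instead. The following is a minimal sketch: the factory name `make_callback` is made up, and it assumes the message object exposes `data`, `message_id`, `attributes` and `ack()` as used below.

```python
def make_callback(received):
    """Create a callback that decodes messages and appends them to a list."""

    def callback(message):
        text = message.data.decode("utf-8")  # payload arrives as bytes
        received.append((message.message_id, text, dict(message.attributes)))
        message.ack()  # acknowledge so the message is not redelivered

    return callback


# Usage in the notebook, replacing the callback defined above:
# received = []
# streaming_pull_future = subscriber.subscribe(
#     subscription_path, callback=make_callback(received)
# )
```

After the timeout, `received` holds one tuple per message, including any attributes that were attached when publishing.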

Results

  1. You now know how to retrieve messages from PubSub via Python.
  2. Remember: the serverless PubSub service is big data-ready because GCP takes care of auto-scaling the PubSub infrastructure. You pay only for what you use (billing is based on the volume of data published and delivered). Scalability is provided!

PK: Allow public access to the topic!

Please add the following code to your message sending notebook "PubSub-Access":

# Public
project_id_public = "pk-bigdata"
topic_id_public = "pk-pubsub-public"

# create publisher and topic objects
publisher_public = pubsub_v1.PublisherClient(credentials=credentials)
topic_path_public = publisher_public.topic_path(project_id_public, topic_id_public)

data_public = "My message in public"
data_public = data_public.encode("utf-8") # data must be a bytestring
    
future_public = publisher_public.publish(topic_path_public, data_public)
print(f"Published message with ID {future_public.result()}")

Change the value of the variable data_public and run the cell. We'll use this to show the purpose of systems like PubSub.

Results

  1. You published further messages, this time to a public topic.

PK: Remove public access to the topic!

You should deactivate your service account to prevent unauthorized use. Go to "IAM & Admin" → "Service Accounts" and disable the account:

Results

  1. You finished this lab by disallowing access to PubSub via the service account.

You are now able to make use of a big data-ready realtime message broker system in the cloud. PubSub can be used in a multitude of ways in various GCP products. Understanding this system is very useful for creating realtime-ready big data architectures.