In this lab, we want to make use of the PubSub service and learn how to analyze data in real time using a Dataflow job specified in SQL. The average sales will be stored in BigQuery. This is the flow of data:
Please create a new topic called "<initials>-topic-sales". We'll use this topic for our "sales-value" messages.
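If you prefer scripting over the console, the topic can also be created with the PubSub client library. A minimal Python sketch, assuming the project ID pk-bigdata and the initials pk (both placeholders you would replace with your own values):

from google.cloud import pubsub_v1

# Assumed values - replace with your own project ID and topic name.
project_id = "pk-bigdata"
topic_id = "pk-topic-sales"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Creates the topic; raises AlreadyExists if it already exists.
topic = publisher.create_topic(name=topic_path)
print(f"Created topic: {topic.name}")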
You may already want to push some messages to the new topic (they will be in the pipeline when our job starts). Let's publish one simple message with the following content:
{"product":"water", "sales":20.3}
Please open "DataFlow" → "Jobs" in the cloud console. There you can switch to BigQuery by clicking on "Create Job from SQL":
Make sure that you are working with the "Cloud Dataflow engine" (see screenshot).
If this is not shown, please click on "More" and "Query settings":
Select the "Cloud Dataflow engine" and click on "Save":
Next, we need to add the PubSub topic as a source to our BigQuery/DataFlow environment. Click on "Add data" and then select "Cloud Dataflow sources":
Here we can select Cloud Storage as well as Cloud PubSub topics as input data. Since we want to build an analytical stream processing job, we'll select the PubSub topic we created before:
Next, we need to specify the message format so that Dataflow is able to parse the messages and interpret their fields correctly. Thus, we'll add a schema to the PubSub data source. First, select the topic under data sources:
Second, click on "Edit schema":
Add the two fields "product" as a string and "sales" as a floating-point number (FLOAT64); sample values such as 20.3 contain decimals, so an integer type would not parse them. Don't remove the timestamp field, since it is delivered with every message by default:
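The resulting schema should then look roughly like this (the field names must match the JSON keys exactly):

event_timestamp   TIMESTAMP   (added automatically for every PubSub message)
product           STRING
sales             FLOAT64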
Please click on "Query Topic". The following query allows you to group by product and start of the 10 second tumbling windows. The aggregation function is "average of sales":
SELECT
  product, -- we want to aggregate the sales per product (and tumbling window, see next line)
  TUMBLE_START("INTERVAL 10 SECOND") AS interval_start, -- show the start timestamp of the tumbling window
  AVG(sales) AS average_sales -- the aggregation: average of sales per group
FROM pubsub.topic.`pk-bigdata`.`pk-pubsub-sales` -- replace the project ID and topic name with your own (the "<initials>-topic-sales" topic created above)
GROUP BY product, TUMBLE(event_timestamp, "INTERVAL 10 SECOND") -- tumbling window grouping
It is important that your query is syntactically correct, indicated by the green arrow on the right side. Click on "Create Cloud Dataflow job" afterwards:
You should now be able to specify where the stream processing results should be written to. We can select either PubSub (which is very useful for productive environments!) or BigQuery, which we'll use since it also gives us a long-term history of our realtime analytics results. We'll select the same dataset we used throughout the course (example_dwh) and call the table "average_sales_10s":
Click on "Create".
Setting up the job takes a few minutes. Click on the job ID to open the monitoring page for the job:
When the job is running, the output should look like this:
In case something goes wrong in job setup, you can always check the logs of Dataflow:
The output will be helpful if you need to debug your query.
Please send some messages to the topic the job reads from (the "<initials>-topic-sales" topic created above) with the following format, varying the product and sales values:
{"product":"water", "sales":20.3}
When the job is running correctly and you have pushed some messages to the topic, you should see a new table "average_sales_10s" in our dataset. Please click on the table name:
Afterwards, we need to switch back to the BigQuery engine in order to query this new table. Click on "Query settings" under "More". Select "BigQuery engine" and leave all other parameters at their defaults:
Click Save and afterwards on "Query table":
When querying all rows (just add a * to the query), you should see some of the realtime calculation results stored in the table "average_sales_10s":
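The full query could look like this (assuming the project, dataset, and table names used above):

SELECT *
FROM `pk-bigdata.example_dwh.average_sales_10s`
ORDER BY interval_start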
Check if further results are calculated when you publish more messages.
In order to avoid unnecessary costs, you should stop jobs in Dataflow when they are not in use:
You can select "Cancel" In the popup window to make sure that the job is immediately stopped:
If you want to re-run the job with our parameterization (write-if-empty), you should empty or delete the table "average_sales_10s".
Unfortunately, you cannot just empty the table with this code:
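DELETE FROM example_dwh.average_sales_10s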
BigQuery requires a WHERE clause for DELETE statements. The following workaround (adding "WHERE 1=1") solves this issue:
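DELETE FROM example_dwh.average_sales_10s WHERE 1=1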
Alternatively, you can select "append" in the job output definition.
You are now able to make use of a big-data-ready realtime processing framework in the cloud. Dataflow can be used for various scenarios using this SQL syntax. Natively programming Dataflow (i.e. Apache Beam) is considerably harder, but it is also very powerful when it comes to building auto-scaling pipelines; a rough sketch of what such a pipeline could look like follows below.
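Just to give you an impression (this is not part of the lab), an Apache Beam pipeline in Python that roughly mirrors our SQL job might look like the following sketch. All names (project, topic, output table) are assumptions, and the window-start timestamp is omitted for brevity:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

# Streaming mode is required for unbounded PubSub sources.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic="projects/pk-bigdata/topics/pk-topic-sales")
        | "ParseJson" >> beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
        | "Window10s" >> beam.WindowInto(FixedWindows(10))  # 10-second tumbling windows
        | "KeyByProduct" >> beam.Map(lambda row: (row["product"], row["sales"]))
        | "AverageSales" >> beam.combiners.Mean.PerKey()
        | "ToRow" >> beam.Map(lambda kv: {"product": kv[0], "average_sales": kv[1]})
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            "pk-bigdata:example_dwh.average_sales_10s_beam",  # hypothetical output table
            schema="product:STRING,average_sales:FLOAT",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )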