In this simplified use case we want to start an interactive PySpark shell and perform the word count example.

Goal

Set up a Dataproc cluster for further PySpark labs and execute the map-reduce logic with Spark.

What you'll implement

CLI Command

Please use the following command (adapted to your project and initials) to create a Dataproc cluster that has some connectors (esp. BigQuery) preinstalled:

gcloud dataproc clusters create pyspark-pk \
    --enable-component-gateway \
    --region us-central1 \
    --no-address \
    --master-machine-type n2-standard-2 \
    --master-boot-disk-type pd-balanced \
    --master-boot-disk-size 50 \
    --num-workers 2 \
    --worker-machine-type n2-standard-2 \
    --worker-boot-disk-type pd-balanced \
    --worker-boot-disk-size 50 \
    --image-version 2.2-debian12 \
    --optional-components JUPYTER

Results

  1. You configured and started a Hadoop cluster in GCP with some additional connectors.

See previous Lab

Please create a Jupyter Notebook (PySpark) on GCS and call it "BigData - Batch Processing - PySpark SQL, DataFrames, and ML".

Connecting to the PySpark session and reading the data lake

Check if you are connected to the Spark master via the "spark context" variable sc:

sc

This should be the output:

Results

  1. You created a Jupyter Notebook as an interactive big data tool for batch processing.

Connecting to a csv file in GCS (creating the DataFrame)

The variable "sc" lets you interact with the Spark cluster in an "RDD manner". A second variable, "spark" (the so-called SparkSession), lets you interact with the cluster in a "DataFrame manner":

This variable can be utilized to use the higher-level functions of Spark, especially Spark SQL and DataFrames. Let's create an entry point to our data lake for our Spark application:

# provide spark the link to the webshop history (datalake)
df = spark.read.csv("gs://pk-gcs/webshop_datalake/webshop_history.csv", inferSchema=True, header=True)  # network communication necessary

We use the DataFrame environment here, which you can check by executing a cell that contains just "df":

Registering a temporary table

To use SQL, we need to register a temporary table. This is not necessary if you access a table in the Hive metastore; see the respective labs on Hive, Sqoop, and data ingestion.

# create a temporary view on the data that we can simply query by SQL
df.createOrReplaceTempView("sales")

Querying the temporary table

You can now execute (more or less) standard SQL analyses using the sales table.

# calculate the mean sales_value per product_name
# we use "toPandas" for a better visualization (instead of collect)
spark.sql("SELECT product_name, mean(sales_value) FROM sales GROUP BY product_name").toPandas()

This should be the result:
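To see what such a GROUP BY aggregation computes without a cluster, the same statement shape can be tried on a few rows with Python's built-in sqlite3 module. This is only a sketch: the rows are invented, SQLite stands in for Spark SQL, and the alias mean_sales_value is an assumption chosen for readability.

```python
import sqlite3

# Invented sample rows (product_name, sales_value); not taken from the webshop data.
rows = [
    ("pils", 2.0),
    ("pils", 4.0),
    ("red wine", 9.0),
    ("red wine", 11.0),
    ("red wine", 10.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (product_name TEXT, sales_value REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", rows)

# SQLite spells the aggregate AVG; Spark SQL accepts both avg and mean.
query = """
    SELECT product_name, AVG(sales_value) AS mean_sales_value
    FROM sales
    GROUP BY product_name
    ORDER BY product_name
"""
result = conn.execute(query).fetchall()
conn.close()

print(result)
```

Giving the aggregate an alias with AS also works in Spark SQL and avoids auto-generated column names in the result.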

Your Analytical Task

Please write and execute a SQL statement that compares the average sales value between the age_groups '18-29' and '30-49' for the products "red wine" and "pils" (order the output by product_name).

Results

  1. You used a Jupyter Notebook as an interactive big data tool for batch processing to perform SQL-based queries.
  2. The code you ran is big data-ready and scalable.

The DataFrame API

Please take a look at the DataFrame API documentation: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

Your Analytical Task

Your task is to execute the same logic as above using the DataFrame API: compare the average sales value between the age_groups '18-29' and '30-49' for the products "red wine" and "pils" (order the output by product_name).

Hint: you can filter rows for example with this code:

# logical or to filter a DataFrame
df.filter((df.age_group=='18-29') | (df.age_group=='30-49')).toPandas().head()

For aggregation, the "agg"-method is recommended. Sorting can be achieved with "sort".

Results

  1. You used the PySpark DataFrame API to implement a query.

The Machine Learning API

Please take a look at the Spark ML API documentation:

https://spark.apache.org/docs/latest/ml-guide.html

Your Analytical Task

We want to train a machine learning model (Gradient Boosted Regression) with the features "age" and "product" and the target "sales_value". We will skip advanced machine learning concepts here but take a look at the core challenges in training a model in Spark.

Generating the Feature Vector

Spark ML estimators expect a single vector column (by default named "features") that holds all feature values of a row; taken over all rows, this column is the feature matrix. With the following code, we combine the two columns "age" and "product" into one "vector assembled" column "features" in a new DataFrame df_ml.

# generate the feature column
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['age','product'],outputCol='features')
df_ml = va.transform(df).select('features','sales_value')
df_ml.limit(10).toPandas()

This should be the output:
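Independently of Spark, the effect of the vector assembly step can be sketched in plain Python: the selected numeric columns of each row are concatenated, in the given order, into one vector. The rows and the helper name assemble are invented for illustration, and the sketch assumes that "product" is a numeric column, as the VectorAssembler call above implies.

```python
# Invented rows with the two numeric input columns and the target column.
rows = [
    {"age": 25, "product": 3, "sales_value": 12.5},
    {"age": 41, "product": 1, "sales_value": 7.0},
]
input_cols = ["age", "product"]


def assemble(row, cols):
    """Concatenate the chosen columns, in order, into one list of floats."""
    return [float(row[c]) for c in cols]


# Keep only the assembled vector and the target, like select('features', 'sales_value').
assembled = [
    {"features": assemble(r, input_cols), "sales_value": r["sales_value"]}
    for r in rows
]

for row in assembled:
    print(row)
```

The order of the input columns determines the position of each value in the vector, so it should stay the same between training and prediction.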

Train-test-split

The train-test-split can be done as follows:

# split the data into training and test sets (20% held out for testing)
(train, test) = df_ml.randomSplit([0.8, 0.2])
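A point worth knowing about this kind of split is that rows are assigned by independent random draws, so the 80/20 proportion is only approximate and the result changes between runs unless a seed is fixed (randomSplit accepts an optional seed argument). The following plain-Python sketch illustrates the idea under that assumption; random_split is a hypothetical helper, not Spark's implementation.

```python
import random


def random_split(rows, train_weight, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(1000))  # stand-in for the rows of df_ml
train, test = random_split(rows, 0.8, seed=42)

# Sizes are close to 800/200 but usually not exact.
print(len(train), len(test))
```

In the notebook, the analogous call would be df_ml.randomSplit([0.8, 0.2], seed=42), which keeps the RMSE values comparable across runs.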

Training the model

# train the model
from pyspark.ml.regression import GBTRegressor
reg = GBTRegressor(featuresCol='features', labelCol='sales_value')
model = reg.fit(train)

Making predictions

For model evaluation and demonstration purposes, we can make predictions on the test data:

# make predictions using the test data
predictions = model.transform(test)

# select some example rows 
predictions.sample(0.01).toPandas()

Evaluating the model

We can use the test data-predictions for model evaluation as follows:

# evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

# select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="sales_value", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
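For reference, the RMSE metric itself is simple: it is the square root of the mean squared difference between label and prediction. A minimal plain-Python sketch with invented numbers (the function name rmse is chosen for illustration):

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Invented labels and predictions: the errors are 1, 0 and -2.
labels = [3.0, 5.0, 2.0]
predictions = [2.0, 5.0, 4.0]

value = rmse(labels, predictions)
print("RMSE = %g" % value)
```

Because the errors are squared before averaging, a few large errors influence the RMSE more than many small ones. The value is in the same unit as the target, here sales_value.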

Improving the model with a further column

The following code shows some examples of how to handle string columns and perform a one-hot-encoding before model training. We don't go into details on this.

# improve the model with a further (one-hot-encoded) feature
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

# convert string to numerical column
indexer = StringIndexer(inputCol="weekday", outputCol="weekday_idx")
df_indexed = indexer.fit(df).transform(df)

# encode weekday index (one-hot-encoding) and select only relevant columns
encoder = OneHotEncoder(inputCol='weekday_idx', outputCol='weekday_encoded')
encoded_df = encoder.fit(df_indexed).transform(df_indexed).select('weekday_encoded','age','product','sales_value')

# create feature column
va = VectorAssembler(inputCols=['age', 'product', 'weekday_encoded'],outputCol='features')
df_ml = va.transform(encoded_df).select('features','sales_value')

(train, test) = df_ml.randomSplit([0.8, 0.2])

reg = GBTRegressor(featuresCol='features', labelCol='sales_value')
model = reg.fit(train)

predictions = model.transform(test)

evaluator = RegressionEvaluator(labelCol="sales_value", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data (with weekday column) = %g" % rmse)

The RMSE should be slightly lower (that is, better) with the additional feature.
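What the StringIndexer and OneHotEncoder steps above do to the weekday column can be sketched in plain Python. The sketch assumes Spark's default settings: labels are indexed by descending frequency (ties broken alphabetically), and the encoder drops the last category, so n categories yield vectors of length n-1. The weekday values are invented, and Spark stores the result as sparse vectors rather than lists.

```python
from collections import Counter

# Invented weekday column.
weekdays = ["Mon", "Tue", "Mon", "Wed", "Mon", "Tue"]

# Step 1 (string indexing): most frequent label gets index 0.
counts = Counter(weekdays)
ordered = sorted(counts, key=lambda label: (-counts[label], label))
index_of = {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_categories, drop_last=True):
    """Step 2 (one-hot encoding): a 1.0 at the label's index, 0.0 elsewhere."""
    size = n_categories - 1 if drop_last else n_categories
    vec = [0.0] * size
    if index < size:  # the dropped last category becomes the all-zero vector
        vec[index] = 1.0
    return vec


encoded = [one_hot(index_of[day], len(index_of)) for day in weekdays]

print(index_of)
for day, vec in zip(weekdays, encoded):
    print(day, vec)
```

Dropping the last category avoids a redundant column, because that category is already identified by all other entries being zero.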

Results

  1. You used PySpark with the ML package to train a model, make predictions, and evaluate the model on our Dataproc cluster.
  2. This setup is big data machine learning ready!

Creating a Dataset

Please create a BigQuery dataset called bq_from_pyspark:

Using the BigQuery-Connector (writing to a table)

Please perform the following simple analytical task and try to write the results to a BigQuery table called "webshop_analysis" using this code:

# read raw data
df = spark.read.csv("gs://dscb420-pk/webshop_history.csv", inferSchema=True, header=True)

# perform aggregation - rename column (BigQuery can't handle the automatically generated column name "sum(sales_value)")
result_df = df.groupby("weekday").agg({"sales_value":"sum"}).withColumnRenamed("sum(sales_value)", "total_sales_value")


# set a bucket for temporary storage
gcs_bucket = 'dscb420-pk'

# set BigQuery dataset name
bq_dataset = 'bq_from_pyspark'

# set BigQuery table name
bq_table = 'webshop_analysis'

# write results to BigQuery
result_df.write.format("bigquery") \
    .option("table", f"{bq_dataset}.{bq_table}") \
    .option("temporaryGcsBucket", gcs_bucket) \
    .mode('overwrite') \
    .save()

You should see this result in BigQuery:

Using the BigQuery-Connector (read)

You can also access BigQuery datasets (here a public dataset) and perform Spark-based analytics (keep in mind that you could also formulate a simple BigQuery-SQL statement for this example):

table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.groupby("wiki").sum("views").toPandas()

For further examples, see https://github.com/tfayyaz/cloud-dataproc/blob/master/notebooks/python/1.2.%20BigQuery%20Storage%20%26%20Spark%20SQL%20-%20Python.ipynb.

Shutting down the Dataproc Cluster

Go to the cluster details page and hit "Delete" (in the live lecture, please leave it running):

Congratulations, you learned how to work with PySpark in the modern DataFrame-based (including SQL) programming style and how to train and evaluate a machine learning model.