In this simplified use case, we want to start an interactive PySpark shell and perform the word count example.
This lab covers the setup of a Dataproc cluster for the further PySpark labs and the execution of the map-reduce logic with Spark.
Please use the following command (adjusted to your project and initials) to create a Dataproc cluster that has some connectors (especially BigQuery) readily installed:
gcloud dataproc clusters create pyspark-pk \
  --enable-component-gateway \
  --region us-central1 \
  --no-address \
  --master-machine-type n2-standard-2 \
  --master-boot-disk-type pd-balanced \
  --master-boot-disk-size 50 \
  --num-workers 2 \
  --worker-machine-type n2-standard-2 \
  --worker-boot-disk-type pd-balanced \
  --worker-boot-disk-size 50 \
  --image-version 2.2-debian12 \
  --optional-components JUPYTER
Please create a Jupyter Notebook (PySpark) on GCS and call it "BigData - Batch Processing - PySpark SQL, DataFrames, and ML".
Check if you are connected to the Spark master via the SparkContext variable sc:
sc
This should be the output:
The variable "sc" allows you to interact with the Spark cluster in an "RDD-manner". There is another variable "spark" that also allows interaction with the cluster (the so called SparkSession) "spark" in the "DataFrame-manner":
This variable gives access to the higher-level functions of Spark, especially Spark SQL and DataFrames. Let's create an entry point to our data lake for our Spark application:
# provide spark the link to the webshop history (datalake)
df = spark.read.csv("gs://pk-gcs/webshop_datalake/webshop_history.csv", inferSchema=True, header=True) # network communication necessary
We use the DataFrame API here, which you can check by executing a cell with just "df" in it:
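If you additionally want to inspect the schema that Spark inferred from the CSV header, a cell like the following can be used (the exact columns depend on the file):
# print the inferred schema of the DataFrame
df.printSchema()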
In order to be able to use SQL, we need to register a temporary view (if you access a table in the Hive metastore, this is not necessary; see the respective labs on Hive, Sqoop, and data ingestion).
# create a temporary view on the data that we can simply query by SQL
df.createOrReplaceTempView("sales")
You can now execute (more or less) standard SQL analyses using the sales table.
# calculate the mean sales_value per product_name
# we use "toPandas" for a better visualization (instead of collect)
spark.sql("SELECT product_name, mean(sales_value) FROM sales GROUP BY product_name").toPandas()
This should be the result:
Please write and execute a SQL statement that compares the average sales value between the age groups '18-29' and '30-49' for the products "red wine" and "pils" (order the output by product_name).
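A possible solution could look like the following sketch (assuming the columns age_group, product_name, and sales_value as used in the cells above):
# compare the average sales value per product and age group
spark.sql("""
    SELECT product_name, age_group, mean(sales_value) AS avg_sales_value
    FROM sales
    WHERE product_name IN ('red wine', 'pils')
      AND age_group IN ('18-29', '30-49')
    GROUP BY product_name, age_group
    ORDER BY product_name
""").toPandas()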
Please take a look at the DataFrame API documentation: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html
Your task is to execute the same logic as above using the DataFrame API: "compare the average sales value between the age groups '18-29' and '30-49' for the products 'red wine' and 'pils' (order the output by product_name)".
Hint: you can filter rows for example with this code:
# logical or to filter a DataFrame
df.filter((df.age_group=='18-29') | (df.age_group=='30-49')).toPandas().head()
For aggregation, the "agg" method is recommended; sorting can be achieved with "sort".
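One possible sketch using the DataFrame API (again assuming the column names product_name, age_group, and sales_value from the earlier cells):
# filter, aggregate, and sort with the DataFrame API
from pyspark.sql import functions as F
result = (df
    .filter(df.product_name.isin('red wine', 'pils'))
    .filter(df.age_group.isin('18-29', '30-49'))
    .groupBy('product_name', 'age_group')
    .agg(F.mean('sales_value').alias('avg_sales_value'))
    .sort('product_name'))
result.toPandas()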
Please take a look at the Spark ML API documentation:
https://spark.apache.org/docs/latest/ml-guide.html
We want to train a machine learning model (gradient-boosted tree regression) with the features "age" and "product" and the target "sales_value". We will skip advanced machine learning concepts here and only take a look at the core steps of training a model in Spark.
Spark requires a specific column "features", which is actually the feature matrix. With the following code, we transform the two columns "age" and "product" into one vector-assembled column "features" in a new DataFrame df_ml.
# generate the feature column
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['age','product'],outputCol='features')
df_ml = va.transform(df).select('features','sales_value')
df_ml.limit(10).toPandas()
This should be the output:
The train-test-split can be done as follows:
# split the data into training and test sets (20% held out for testing)
(train, test) = df_ml.randomSplit([0.8, 0.2])
# train the model
from pyspark.ml.regression import GBTRegressor
reg = GBTRegressor(featuresCol='features', labelCol='sales_value')
model = reg.fit(train)
For model evaluation and demonstration purposes, we can make predictions on the test data:
#make predictions using the test data
predictions = model.transform(test)
# select some example rows
predictions.sample(0.01).toPandas()
We can use the test data-predictions for model evaluation as follows:
# evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
# select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="sales_value", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
The following code shows some examples of how to handle string columns and perform a one-hot encoding before model training. We will not go into detail here.
# improve the model with a further (one-hot-encoded) feature
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
# convert string to numerical column
indexer = StringIndexer(inputCol="weekday", outputCol="weekday_idx")
df_indexed = indexer.fit(df).transform(df)
# encode weekday index (one-hot-encoding) and select only relevant columns
encoder = OneHotEncoder(inputCol='weekday_idx', outputCol='weekday_encoded')
encoded_df = encoder.fit(df_indexed).transform(df_indexed).select('weekday_encoded','age','product','sales_value')
# create feature column
va = VectorAssembler(inputCols=['age', 'product', 'weekday_encoded'],outputCol='features')
df_ml = va.transform(encoded_df).select('features','sales_value')
(train, test) = df_ml.randomSplit([0.8, 0.2])
reg = GBTRegressor(featuresCol='features', labelCol='sales_value')
model = reg.fit(train)
predictions = model.transform(test)
evaluator = RegressionEvaluator(labelCol="sales_value", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data (with weekday column) = %g" % rmse)
The RMSE should be slightly lower (i.e., better) with the additional feature.
Please create a BigQuery dataset called bq_from_pyspark:
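Alternatively, the dataset can be created from Cloud Shell with the bq command-line tool (assuming your default project is already set):
bq mk --dataset bq_from_pyspark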
Please perform the following simple analytical task and try to write the results to a BigQuery table called "webshop_analysis" using this code:
# read raw data
df = spark.read.csv("gs://dscb420-pk/webshop_history.csv", inferSchema=True, header=True)
# perform aggregation and rename the column (BigQuery can't handle the automatically generated column name "sum(sales_value)")
result_df = df.groupby("weekday").agg({"sales_value":"sum"}).withColumnRenamed("sum(sales_value)", "total_sales_value")
# set a bucket for temporary storage
gcs_bucket = 'dscb420-pk'
# set BigQuery dataset name
bq_dataset = 'bq_from_pyspark'
# set BigQuery table name
bq_table = 'webshop_analysis'
# write results to BigQuery
result_df.write.format("bigquery") \
.option("table", f"{bq_dataset}.{bq_table}") \
.option("temporaryGcsBucket", gcs_bucket) \
.mode('overwrite') \
.save()
You should see this result in BigQuery:
You can also access BigQuery datasets (here a public dataset) and perform Spark-based analytics (keep in mind that you could also formulate a simple BigQuery-SQL statement for this example):
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.groupby("wiki").sum("views").toPandas()
For further examples, see https://github.com/tfayyaz/cloud-dataproc/blob/master/notebooks/python/1.2.%20BigQuery%20Storage%20%26%20Spark%20SQL%20-%20Python.ipynb.
Go to the cluster details page and hit "Delete" (in the live lecture, please leave it running):
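Alternatively, the cluster can be deleted from Cloud Shell (adjust the cluster name and region to your setup):
gcloud dataproc clusters delete pyspark-pk --region us-central1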
Congratulations, you learned how to work with PySpark in the modern DataFrame-based (including SQL) programming style and how to train and evaluate a machine learning model.