In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).
The goal is to ingest structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job, using the popular big data formats Avro and Parquet.
Please set up the cluster according to the lab "RDBMS to GCS (Avro/Parquet) with sqoop".
Let's now start the sqoop job with the additional parameter "--as-parquetfile":
sqoop import --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> --username <see Moodle> --password <see Moodle> --table sales --m 1 --target-dir gs://hdm-kueppers/sales_from_sqoop_parquet --as-parquetfile
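As an aside, the parameter --m 1 limits the import to a single mapper. The job scales horizontally by raising the mapper count and telling sqoop which column to split the table on via --split-by; the following sketch assumes a numeric key column sales_id and a separate, purely illustrative target folder. For this lab, we stay with a single mapper.
sqoop import --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> --username <see Moodle> --password <see Moodle> --table sales --split-by sales_id --m 4 --target-dir gs://hdm-kueppers/sales_from_sqoop_parquet_parallel --as-parquetfile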
The sqoop job should create the new folder within your bucket.
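If you want to double-check, listing the target folder from the shell should reveal the generated Parquet part file(s); the exact file names may vary:
gsutil ls gs://hdm-kueppers/sales_from_sqoop_parquet/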
Let's take a quick look at the Parquet file in a notebook:
import pandas as pd

# Read the Parquet file sqoop wrote to the bucket (pandas uses gcsfs for gs:// paths)
df = pd.read_parquet("gs://hdm-kueppers/sales_from_sqoop_parquet/part-m-00000.parquet")

# Inspect the column data types and the first five rows
print(df.dtypes)
df.head(5)
This should be the output:
Do you see any issues?
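A typical observation at this point is that sqoop's Parquet export does not always preserve logical types, so date or timestamp columns may arrive as plain int64 values (epoch milliseconds). If that is what you see, a possible fix in Pandas looks like the sketch below; the column name order_date is purely illustrative and must be replaced by a column from your sales table:
# Convert an epoch-milliseconds column back to a proper datetime (column name is illustrative)
df["order_date"] = pd.to_datetime(df["order_date"], unit="ms")
print(df.dtypes)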
Let's now start the sqoop job with the additional parameter "--as-avrodatafile":
sqoop import --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> --username <see Moodle> --password <see Moodle> --table sales --m 1 --target-dir gs://hdm-kueppers/sales_from_sqoop_avro --as-avrodatafile
Open the notebook you used previously and install fastavro first:
!pip install fastavro
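Depending on the notebook environment, gcsfs (used below to access the bucket) may not be pre-installed; if the import fails, install it the same way:
!pip install gcsfs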
Next, we can read the Avro file:
import pandas as pd
from fastavro import reader
import gcsfs
# Define the GCS path to the Avro file
gcs_path = 'path_to_your_avro_file'
# Create GCS file system object
fs = gcsfs.GCSFileSystem(project='your-gcp-project-id')
# Open and read the Avro file
with fs.open(gcs_path, 'rb') as f:
    avro_reader = reader(f)
    records = list(avro_reader)
# Convert to DataFrame
df = pd.DataFrame(records)
print(df.dtypes)
df.head()
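If you are unsure which value to use for gcs_path, you can list the Avro target folder written by the sqoop job above and pick one of the generated part files:
# List the files sqoop wrote to the Avro target folder (reuses the fs object from above)
print(fs.ls('gs://hdm-kueppers/sales_from_sqoop_avro/'))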
Although we have not learned much about Spark yet, we want to take a quick glance at how a Parquet file can be loaded into our cluster's worker nodes for distributed (big data) processing.
In the shell you used for executing the sqoop jobs (i.e. the cluster master node), please enter the following command to start an interactive PySpark session:
pyspark
After a few seconds, you should see the interactive PySpark shell:
With the following command, we can tell the Spark cluster to load the Parquet file from our GCS bucket (this also works with HDFS) and inspect the schema:
df = spark.read.load("gs://hdm-kueppers/sales_from_sqoop_parquet/*.parquet")
df
This should be the output:
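For a more explicit view, PySpark's printSchema() prints the column names and types as a tree; feel free to try it in the same session (output not shown here):
df.printSchema()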
With the following command, we can inspect the first three rows:
df.head(3)
This should be the output:
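As an optional extra before leaving the shell, a simple action such as counting all rows is executed as a distributed job across the worker nodes:
df.count()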
Please close the interactive PySpark shell with this command:
exit()
This should be the output:
Please make sure to delete your cluster (in the live lecture, please leave your cluster running):
Congratulations, you have set up a "traditional big data", horizontally scaling ingestion pipeline using a Hadoop cluster with sqoop and stored your data in the popular big data formats Avro and Parquet. You also got a first impression of how to access these file formats from Pandas and PySpark (which you will get to know in more detail later).