In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).

Goal

Ingesting structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job using the popular big data formats Avro and Parquet.

What you'll implement

Please set up the cluster as described in the lab "RDBMS to GCS (Avro/Parquet) with sqoop".

Executing the job

Let's now start the sqoop job with the additional parameter "--as-parquetfile":

sqoop import --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> --username <see Moodle> --password <see Moodle> --table sales --m 1 --target-dir gs://hdm-kueppers/sales_from_sqoop_parquet --as-parquetfile

Checking the results

The sqoop job should have created the new folder sales_from_sqoop_parquet within your bucket.
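If you want to verify this from the JupyterLab notebook used below, here is a minimal sketch using gcsfs (assuming the bucket and folder names from the sqoop command above; adjust the project ID and bucket to your own):

import gcsfs

# List the files sqoop wrote to the target directory (names taken from the command above)
fs = gcsfs.GCSFileSystem(project='your-gcp-project-id')
print(fs.ls('hdm-kueppers/sales_from_sqoop_parquet'))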

Inspecting the Parquet File

Let's take a quick look at the Parquet file in a notebook:

import pandas as pd

# Reading a gs:// path with pandas requires the gcsfs package
df = pd.read_parquet("gs://hdm-kueppers/sales_from_sqoop_parquet/part-m-00000.parquet")
print(df.dtypes)  # column names and data types
df.head(5)        # first five rows

You should see the column data types followed by the first five rows of the table.

Do you see any issues?
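A common observation with sqoop's Parquet export is that date/timestamp columns arrive as plain integers (epoch milliseconds) rather than datetime types. If that is what you see, the following minimal sketch converts such a column back, reusing the df from above and assuming a hypothetical column name sale_date (replace it with the name shown by df.dtypes):

import pandas as pd

# Hypothetical example: convert an epoch-millisecond integer column back to datetimes
df["sale_date"] = pd.to_datetime(df["sale_date"], unit="ms")
print(df.dtypes)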

Results

  1. You executed a sqoop (MapReduce) job with an additional configuration (Parquet output).
  2. This setup is big data ready for heavy workloads: MapReduce can scale out horizontally, and if necessary you can add worker nodes to your cluster (however, the SQL database might become a bottleneck).

Executing the job

Let's now run the sqoop job again, this time with the additional parameter "--as-avrodatafile":

sqoop import --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> --username <see Moodle> --password <see Moodle> --table sales --m 1 --target-dir gs://hdm-kueppers/sales_from_sqoop_avro --as-avrodatafile

Checking the results

Open the notebook you used previously and install fastavro first:

!pip install fastavro

Next, we can read the Avro file:

import pandas as pd
from fastavro import reader
import gcsfs

# Define the GCS path to the Avro file
gcs_path = 'path_to_your_avro_file'

# Create GCS file system object
fs = gcsfs.GCSFileSystem(project='your-gcp-project-id')

# Open and read the Avro file
with fs.open(gcs_path, 'rb') as f:
    avro_reader = reader(f)
    records = list(avro_reader)

# Convert to DataFrame
df = pd.DataFrame(records)

print(df.dtypes)
df.head()
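Every Avro data file embeds the schema it was written with. If you want to inspect that schema directly, here is a small sketch reusing fs and gcs_path from the snippet above (fastavro exposes the embedded schema as writer_schema):

from fastavro import reader

# Print the embedded (writer) schema: field names and Avro types
with fs.open(gcs_path, 'rb') as f:
    avro_reader = reader(f)
    print(avro_reader.writer_schema)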

Results

  1. You executed a sqoop (MapReduce) job with an additional configuration (Avro output).

"Big data" load in PySpark

Opening a PySpark session

Although we have not learned much about Spark yet, we want to take a quick glance at how a Parquet file can be loaded into our cluster's worker nodes for distributed (big data) processing.

In the shell you used for executing the sqoop jobs (i.e. the cluster master node), please enter the following command to start an interactive PySpark session:

pyspark

After a few seconds, you should see the interactive PySpark shell prompt.

Directing the cluster towards the Parquet files

With the following commands, we point the Spark cluster to the Parquet files in our GCS bucket (this would also work with HDFS) and inspect the schema:

df = spark.read.load("gs://hdm-kueppers/sales_from_sqoop_parquet/*.parquet")
df

You should see the DataFrame object with its inferred schema (column names and types).

With the following command, we can inspect the first three rows:

df.head(3)

You should see a list containing the first three rows as Row objects.
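Optionally, still in the same PySpark session, you can print the schema as a tree and count the rows; these are standard Spark DataFrame methods, and the count triggers an actual distributed job on the worker nodes:

# Optional: df is the DataFrame loaded above
df.printSchema()   # prints the schema as an indented tree
print(df.count())  # row count; triggers a distributed job across the workers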

Please close the interactive PySpark shell with this command:

exit()

You should be returned to the shell prompt of the cluster's master node.

Results

  1. You loaded Parquet files from cloud storage into the memory of a JupyterLab (AI Platform) virtual machine using Pandas.
  2. You loaded Parquet files from cloud storage into a Spark cluster using the interactive PySpark shell on the DataProc cluster's master node.

Please make sure to delete your cluster (in the live lecture, please leave your cluster running).

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a "big data-traditional" and horizontally scaling ingestion pipeline using a Hadoop cluster with sqoop and stored your data in popular big data formats (Avro and Parquet). You also got a first impression of how to access these file formats from Pandas and PySpark (which you'll get to know in more detail later).