In this use case we want to store the contents of our webshop table in the data lake (nightly full extract).

Goal

Ingesting structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job using the popular big data formats Avro and Parquet.
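The output format is controlled by a flag on the sqoop import command. As a rough sketch only (the <...> placeholders correspond to the connection details from Moodle used later in this lab; the authoritative command is the one given further below), a Parquet extract of the sales table would look like this:

# Sketch only: full extract of the "sales" table into the HDFS directory "sales" as Parquet.
# All <...> values are placeholders from Moodle; use --as-avrodatafile instead for Avro.
sqoop import \
  --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> \
  --username <see Moodle> --password <see Moodle> \
  --table sales \
  --as-parquetfile \
  --target-dir sales \
  -m 1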

What you'll implement

Please set up the cluster as described in the lab "RDBMS to GCS (Avro/Parquet) with sqoop".
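If you prefer the command line over the web UI, the cluster setup from that lab roughly corresponds to a gcloud call along the following lines (a sketch only; cluster name, region, and the proxy metadata are placeholders, and the authoritative flags are those in the referenced lab):

# Sketch only: Dataproc cluster with the "cloud-sql-proxy" initialization action enabled.
# All <...> values are placeholders; take the exact settings from the referenced lab.
gcloud dataproc clusters create <cluster-name> \
  --region=<region> \
  --scopes=sql-admin \
  --initialization-actions=gs://goog-dataproc-initialization-actions-<region>/cloud-sql-proxy/cloud-sql-proxy.sh \
  --metadata=<cloud-sql-proxy settings, e.g. the Cloud SQL instance to connect to>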

Cluster creation will take 2-3 minutes.

Connecting to the Hadoop master node

When cluster creation is finished, you can select the cluster in the web UI:

Open an SSH connection to the master node, from which you can start sqoop jobs:

Results

  1. You created a Hadoop cluster (Dataproc) which provides functionality for accessing Cloud SQL (via the initialization action "cloud-sql-proxy") as well as the sqoop component for batch ingestion of relational data into our data lake.
  2. Next, we'll use the connection to the master node and execute our sqoop data ingestion job.

Preparation

First, we'll delete the output from our first sqoop run, since sqoop writes into the HDFS directory "sales" and will fail if this directory already exists:

hadoop fs -rm sales/*
hadoop fs -rmdir sales
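
Equivalently, and also covering the case where the directory contains subdirectories, you can remove it recursively in one step:

# Remove the "sales" directory and everything below it in one go.
hadoop fs -rm -r sales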

Executing the job

Let's now start the sqoop job with the additional parameter "--hive-import":

sqoop import --connect jdbc:mysql://<db-IP see Moodle>/<dbname, see Moodle> --username <see Moodle> --password <see Moodle> --table sales --m 1 --hive-import

WORKAROUND: Manual Interaction with Hive and Data Ingestion

Connecting to beeline

Due to the mentioned issue we will now start interacting with Hive directly in order to ingest data via "standard SQL" (like in the lab "Simple Analytics Pipeline"). As stated in the lecture, there exists a command line interface (CLI) for Hive called "beeline".

Please open the SSH connection to the master node and execute the following command in order to start Hive's CLI beeline and connect to HiveServer2 on our cluster (running on the master node, i.e. "localhost"):

beeline -u "jdbc:hive2://localhost:10000"

This should be the output:

Creating a Parquet-based Hive table

Using Hive, we can interact with the structured data in the data lake via standard SQL. Hive will create the respective files in the data lake (just like the sqoop command above would have). Executing the following command will create a folder called "sales" in our data lake on cloud storage (under "hive-warehouse") and will add a table to the Hive metastore. All data we ingest will be stored as Parquet files.

create table sales (
  sales_date date,
  weekday varchar(20),
  region varchar(20),
  age_group varchar(20),
  product_name varchar(30),
  product_group int,
  sales_value float
) STORED AS PARQUET;

The output should be "No rows affected".
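
If you want to double-check the table definition, including its storage format (optional, not part of the original lab), you can either type DESCRIBE FORMATTED sales; in the open beeline session or run it non-interactively from the master node's shell:

# Optional check: the output should list Parquet as the table's input/output format.
beeline -u "jdbc:hive2://localhost:10000" -e "DESCRIBE FORMATTED sales;"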

Ingesting data (manually) into the Hive warehouse

After having created the table, we can ingest data; in this workaround, we'll do it manually. Let's use the same data as in the first lab (i.e. the same as in the Cloud SQL database):

insert into sales values
('2020-04-03', 'Friday', 'North', '50-', 'whiskey', '3', 21.5),
('2020-04-01', 'Wednesday', 'North', '50-', 'white wine', '1', 4.4),
('2020-04-01', 'Wednesday', 'South', '18-29', 'red wine', '1', 12.8),
('2020-05-28', 'Thursday', 'Middle', '30-49', 'lager', '2', 47.1),
('2020-06-10', 'Wednesday', 'North', '50-', 'grappa', '3', 14.4),
('2020-06-11', 'Thursday', 'Middle', '18-29', 'red wine', '1', 13.5),
('2020-03-11', 'Wednesday', 'North', '18-29', 'red wine', '1', 4.0),
('2020-05-07', 'Thursday', 'North', '50-', 'grappa', '3', 17.3),
('2020-04-10', 'Friday', 'Middle', '30-49', 'red wine', '1', 10.6),
('2020-01-24', 'Friday', 'North', '30-49', 'pils', '2', 29.4),
('2020-04-13', 'Monday', 'North', '18-29', 'pils', '2', 54.0),
('2020-02-26', 'Wednesday', 'North', '50-', 'red wine', '1', 4.5),
('2020-04-11', 'Saturday', 'Middle', '18-29', 'whiskey', '3', 45.3),
('2020-03-15', 'Sunday', 'South', '50-', 'white wine', '1', 16.7),
('2020-05-15', 'Friday', 'South', '30-49', 'white wine', '1', 32.7),
('2020-03-04', 'Wednesday', 'North', '50-', 'pils', '2', 33.2),
('2020-05-10', 'Sunday', 'Middle', '50-', 'rosé wine', '1', 5.6),
('2020-02-19', 'Wednesday', 'North', '50-', 'white wine', '1', 5.4),
('2020-03-04', 'Wednesday', 'Middle', '30-49', 'rosé wine', '1', 24.2),
('2020-05-28', 'Thursday', 'Middle', '50-', 'white wine', '1', 13.3),
('2020-01-24', 'Friday', 'Middle', '50-', 'white wine', '1', 14.9),
('2020-02-19', 'Wednesday', 'Middle', '50-', 'whiskey', '3', 34.9),
('2020-05-27', 'Wednesday', 'Middle', '50-', 'whiskey', '3', 45.6),
('2020-01-23', 'Thursday', 'South', '50-', 'white wine', '1', 73.0),
('2020-03-06', 'Friday', 'South', '30-49', 'rosé wine', '1', 5.1),
('2020-04-16', 'Thursday', 'North', '50-', 'white wine', '1', 4.4),
('2020-01-17', 'Friday', 'South', '50-', 'white wine', '1', 23.2),
('2020-01-12', 'Sunday', 'Middle', '18-29', 'red wine', '1', 4.0),
('2020-01-27', 'Monday', 'Middle', '50-', 'red wine', '1', 30.2),
('2020-01-01', 'Wednesday', 'Middle', '30-49', 'red wine', '1', 33.2);

Ingestion will take approximately 30s (remember, Hive is based on MapReduce).

Checking the results

When selecting all data from the sales table (select * from sales;), you should see the following output in beeline:

Please hit Ctrl-D to exit beeline. Within HDFS, we should now see a respective folder:

hadoop fs -ls /user/hive/warehouse

There, you'll be able to take a look at the data file, which also carries the table's schema information (embedded in the Parquet file):

hadoop fs -cat /user/hive/warehouse/sales/000000_0
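
Note that the exact file name (here 000000_0) may differ depending on how the data was written; if the cat command does not find the file, list the table's directory to see which file(s) Hive created:

# List the files Hive wrote for the "sales" table.
hadoop fs -ls /user/hive/warehouse/sales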

→ Hive acts as a useful SQL-based interface to our data lake: it allows the ingestion of big data (with sqoop, in a horizontally scaling manner) and keeps the corresponding metadata in the Hive metastore.

Results

  1. Instead of executing a sqoop (MapReduce) job with an additional configuration (ingestion into the Hive warehouse), you learned how to use the Hive CLI beeline to create a table and ingest data using "standard" SQL.
  2. The sqoop setup would be ready for heavy big data workloads, and Hive provides a good metadata layer on top of our data lake.

PySpark is directly integrated with Hive. Start the PySpark shell on the master node (by executing pyspark) and you can then query the respective tables in the data lake from Spark with the following commands:

df = spark.sql("select * from sales")
df.head()

This should be the output:

We will learn how to use PySpark in detail later. Please close the PySpark shell using exit().

Please make sure to delete your cluster:
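
Deletion is possible via the web UI; alternatively, the cluster can be removed with gcloud (a sketch, with placeholders for your cluster's name and region):

# Delete the Dataproc cluster to avoid further charges; <...> are placeholders.
gcloud dataproc clusters delete <cluster-name> --region=<region>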

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations! You set up a "big data-traditional", horizontally scaling ingestion pipeline using a Hadoop cluster with sqoop, stored your data in a popular big data format (Parquet), and used Hive to manage the metadata of your data lake. You also got a first impression of how to access the data lake via Hive from PySpark (which you'll get to know in more detail later).