In this use case, we want to store the contents of our webshop table in the data lake (as a nightly full extract).
We will ingest this structured relational data into the data lake for batch processing via a (horizontally) scalable sqoop (MapReduce) job, using the popular big data formats Avro and Parquet.
For this use case, we need to enable a cloud API that allows different services to connect easily to Cloud SQL databases in GCP. Please enter "cloud sql admin api" in the search bar:
Please enable this API! We will use it to establish connections from our Dataproc cluster's worker nodes (which run the sqoop MapReduce job) to the Cloud SQL database (webshop).
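Alternatively (assuming your project is already set in the cloud shell), you can enable the API from the command line:
gcloud services enable sqladmin.googleapis.com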
Please navigate to your Cloud SQL instance (it needs to be running) and copy the connection name from the overview page to the clipboard:
In my case it is:
pk-bigdata:us-central1:pk-sql
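If you prefer the command line, you can also read the connection name in the cloud shell (assuming the instance name pk-sql):
gcloud sql instances describe pk-sql --format="value(connectionName)"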
Please navigate to the Dataproc service in the cloud console.
This time we will NOT (!) create the cluster via the web interface but via the cloud shell since we need some extra options that are not available in the web UI.
Open a cloud shell:
Make sure that your project is set correctly. If not, execute the following command with your initials:
gcloud config set project pk-bigdata
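You can check which project is currently set with:
gcloud config get-value project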
We will now create a cluster from the command line in order to use initialization actions (the Cloud SQL proxy and sqoop) and Hive metastore settings that are not available in the web UI.
Please execute this command in the cloud shell (you may want to replace "pk" with your initials; make sure that all paths and the connection name of the SQL database copied before are set correctly):
gcloud dataproc clusters create pk-sqoop --region us-central1 \
  --optional-components JUPYTER --enable-component-gateway \
  --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/cloud-sql-proxy/cloud-sql-proxy.sh,gs://goog-dataproc-initialization-actions-us-central1/sqoop/sqoop.sh \
  --metadata "hive-metastore-instance=pk-bigdata:us-central1:pk-sql" \
  --scopes sql-admin \
  --properties=hive:hive.metastore.warehouse.dir=gs://pk-gcs/hive-warehouse \
  --master-machine-type n1-standard-2 --master-boot-disk-size 50 \
  --num-workers 2 --worker-machine-type n1-standard-2 --worker-boot-disk-size 50
Cluster creation will take 2-3 minutes.
When cluster creation is finished, you can select the cluster in the web UI:
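Alternatively, you can check the cluster status from the cloud shell:
gcloud dataproc clusters list --region us-central1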
Open an SSH connection to the master node, from which we can start sqoop jobs:
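Instead of the SSH button in the web UI, you can also connect from the cloud shell (the master node is named pk-sqoop-m following Dataproc's naming convention; the zone us-central1-a is only an example, use the zone your cluster was actually placed in):
gcloud compute ssh pk-sqoop-m --zone us-central1-a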
Let's now start the sqoop job with the additional parameter "--hive-import":
sqoop import --connect jdbc:mysql://localhost/webshop --username root --table sales --m 1 --as-parquetfile --hive-import
Due to the issue mentioned above, we will now interact with Hive directly in order to ingest data via "standard SQL" (as in the lab "Simple Analytics Pipeline"). As stated in the lecture, Hive provides a command line interface (CLI) called "beeline".
Please open the SSH connection to the master node and execute the following command in order to start Hive's CLI beeline and connect to HiveServer2 running on the master node ("localhost"):
beeline -u "jdbc:hive2://localhost:10000"
This should be the output:
Using Hive, we can interact with the (structured data in the) data lake via standard SQL. Hive will create the respective files in the data lake (just like the sqoop command above would have). Executing the following command will create a folder called "sales" in our data lake on cloud storage (under "hive-warehouse") and will add a table to the Hive metastore. All data we ingest will be stored as Parquet files.
create table sales (
sales_date date,
weekday varchar(20),
region varchar(20),
age_group varchar(20),
product_name varchar(30),
product_group int,
sales_value float
) STORED AS PARQUET;
The output should be "No rows affected", and when checking the folder "hive-warehouse" in your cloud storage bucket, you should see a "sales" folder:
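You can also verify this from the cloud shell (assuming the bucket name pk-gcs used in the cluster creation command):
gsutil ls gs://pk-gcs/hive-warehouse/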
After having created the table, we can ingest data (in this workaround: manually). Let's use the same data as in the first lab (i.e. the same data as in the cloud SQL database):
insert into sales values
('2020-04-03', 'Friday', 'North', '50-', 'whiskey', '3', 21.5),
('2020-04-01', 'Wednesday', 'North', '50-', 'white wine', '1', 4.4),
('2020-04-01', 'Wednesday', 'South', '18-29', 'red wine', '1', 12.8),
('2020-05-28', 'Thursday', 'Middle', '30-49', 'lager', '2', 47.1),
('2020-06-10', 'Wednesday', 'North', '50-', 'grappa', '3', 14.4),
('2020-06-11', 'Thursday', 'Middle', '18-29', 'red wine', '1', 13.5),
('2020-03-11', 'Wednesday', 'North', '18-29', 'red wine', '1', 4.0),
('2020-05-07', 'Thursday', 'North', '50-', 'grappa', '3', 17.3),
('2020-04-10', 'Friday', 'Middle', '30-49', 'red wine', '1', 10.6),
('2020-01-24', 'Friday', 'North', '30-49', 'pils', '2', 29.4),
('2020-04-13', 'Monday', 'North', '18-29', 'pils', '2', 54.0),
('2020-02-26', 'Wednesday', 'North', '50-', 'red wine', '1', 4.5),
('2020-04-11', 'Saturday', 'Middle', '18-29', 'whiskey', '3', 45.3),
('2020-03-15', 'Sunday', 'South', '50-', 'white wine', '1', 16.7),
('2020-05-15', 'Friday', 'South', '30-49', 'white wine', '1', 32.7),
('2020-03-04', 'Wednesday', 'North', '50-', 'pils', '2', 33.2),
('2020-05-10', 'Sunday', 'Middle', '50-', 'rosé wine', '1', 5.6),
('2020-02-19', 'Wednesday', 'North', '50-', 'white wine', '1', 5.4),
('2020-03-04', 'Wednesday', 'Middle', '30-49', 'rosé wine', '1', 24.2),
('2020-05-28', 'Thursday', 'Middle', '50-', 'white wine', '1', 13.3),
('2020-01-24', 'Friday', 'Middle', '50-', 'white wine', '1', 14.9),
('2020-02-19', 'Wednesday', 'Middle', '50-', 'whiskey', '3', 34.9),
('2020-05-27', 'Wednesday', 'Middle', '50-', 'whiskey', '3', 45.6),
('2020-01-23', 'Thursday', 'South', '50-', 'white wine', '1', 73.0),
('2020-03-06', 'Friday', 'South', '30-49', 'rosé wine', '1', 5.1),
('2020-04-16', 'Thursday', 'North', '50-', 'white wine', '1', 4.4),
('2020-01-17', 'Friday', 'South', '50-', 'white wine', '1', 23.2),
('2020-01-12', 'Sunday', 'Middle', '18-29', 'red wine', '1', 4.0),
('2020-01-27', 'Monday', 'Middle', '50-', 'red wine', '1', 30.2),
('2020-01-01', 'Wednesday', 'Middle', '30-49', 'red wine', '1', 33.2);
Ingestion will take approximately 30s (remember, Hive is based on MapReduce).
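To select all data from the sales table, run for example:
select * from sales;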
You should see the following output in beeline:
Checking cloud storage should also show (at least one) file:
→ Hive acts as a useful SQL-based interface to our data lake: it allows the ingestion of big data (with sqoop, in a horizontally scaling manner) and keeps the corresponding metadata in the Hive metastore.
Please hit Ctrl-d in the SSH shell of the master node to leave Hive's CLI and start a PySpark interactive shell by executing "pyspark".
PySpark is directly integrated with Hive. Thus, you can query the respective tables in the data lake from Spark with the following command:
df = spark.sql("select * from sales")
df.head()
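As a small additional example (not part of the original lab steps), you could aggregate the sales value per region directly on this DataFrame using the standard PySpark API:
df.groupBy("region").sum("sales_value").show()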
This should be the output:
We will learn how to use PySpark in detail later. Please close the PySpark shell using exit().
Please make sure to delete your cluster:
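For example, via the cloud shell:
gcloud dataproc clusters delete pk-sqoop --region us-central1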
You should also delete or at least shut down your Cloud SQL instance (in the live lecture, please leave your SQL server running!):
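From the cloud shell, you can, for example, stop the instance so that it can be restarted later (assuming the instance name pk-sql):
gcloud sql instances patch pk-sql --activation-policy NEVER
or delete it completely:
gcloud sql instances delete pk-sql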
Preparation: please set up the cloud data fusion instance now (see next lab https://pkuep.github.io/pk-bigdata/batch_ingestion_datafusion)!
Congratulations! You set up a "big data-traditional", horizontally scaling ingestion pipeline using a Hadoop cluster with sqoop, stored your data in a popular big data format (Parquet), and used Hive to manage the data lake's metadata. You also got a first impression of how to access the data lake via Hive from PySpark (which you'll get to know in more detail later).