In this use case we want to store the regularly updated image of a webcam in our data lake (based on GCS) so that, e.g., a machine learning algorithm can later identify whether it's cloudy or not. We want to achieve this in a scalable manner, i.e. be able to add potentially thousands of webcams and still guarantee performance. Take a look at the webcam (its URL is used in the code below) to get a feeling for the data we'll ingest.

Goal

Ingesting unstructured image data into the data lake for batch processing via a (horizontally) scalable cloud function.

What you'll implement

Creating a cloud-based Jupyter Notebook environment

If you don't have Anaconda and Jupyter Notebook installed locally on your computer, please create a notebook in GCP's AI Platform:

Create a new instance:

Choose a cheap machine type and "Python 3" for your notebook instance:

Wait until the instance is created. It should take less than a minute.

Open JupyterLab:

Creating the Notebook and Implementing the Logic

Create a Python 3 notebook:

Create a cell and insert the following imports:

import requests  # will be used to retrieve the image via http / a URL
from IPython.display import Image  # will be used to show one exemplary image in the notebook
from datetime import datetime  # will be used to format the filename

Show the current image of this webcam using the following code in another cell:

url = 'https://www.kite-connection.at/weatherstation/webcam/rohrspitz.jpg' # image url
Image(url=url, width=300)  # show image in notebook

Now, let's download the current image to the local disk (of our JupyterLab machine):

filename = f"webcam_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
response = requests.get(url)
file = open(filename, "wb")
file.write(response.content)
file.close()
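
Optionally, you can make the download a bit more robust. A minimal sketch (an addition to the lab code, not required for the exercise) that fails early if the webcam cannot be reached:

response = requests.get(url, timeout=10)  # give up after 10 seconds instead of waiting indefinitely
response.raise_for_status()  # raise an HTTPError for 4xx/5xx responses before writing the file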

After refreshing the file browser, you should see the image (which you can open in JupyterLab):

Uploading the image to GCS (only possible in the cloud JupyterLab)

If you are working in the cloud JupyterLab, you can access Cloud Storage easily with the following code:

from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.get_bucket("pk-gcs")  # replace the bucket name with yours
blob = bucket.blob(filename)  # create an object handle with the timestamped filename
blob.upload_from_string(response.content, content_type='image/jpeg')  # the webcam image is a JPEG

You should now be able to see this image in the cloud console (under Cloud Storage).
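
If you prefer to verify the upload from within the notebook instead of the console, a short sketch (assuming the bucket name "pk-gcs") lists the objects in the bucket:

for blob in storage_client.list_blobs("pk-gcs"):  # iterate over all objects in the bucket
    print(blob.name, blob.size)  # object name and size in bytes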

The notebook could now be executed, e.g. hourly, in order to create a history of webcam images. However, the notebook is only used for demo purposes; we now want to see how to deploy this ingestion logic to a cloud function.

Downloading the Source Code from GitHub (optional)

In the JupyterLab overview page (or via File → New → Terminal), you can open a terminal:

Within the terminal, enter the following command to clone the course's GitHub repository:

git clone https://github.com/pkuep/pk-bigdata-code.git

After refreshing the file browser, you should see the folder "pk-bigdata-code" in which you'll find the source code of this lab (Jupyter Notebooks → batch_ingestion → BigData - Batch Ingestion - Sample Image Retrieval.ipynb):

Results

  1. You created a Jupyter Notebook in order to understand the requirements of our ingestion logic: pulling an image via HTTP into the data lake.
  2. You set up a Notebook environment in the cloud which is capable of accessing different resources in your GCP project, especially cloud storage.
  3. You pulled a sample image from the webcam and (1) showed it in the notebook, (2) stored it locally on the JupyterLab virtual machine, and (3) stored it on GCS, i.e. the data lake.
  4. Next, we want to transfer the logic to a cloud function.

Creating a cloud function for image retrieval and storage upload

Please go to cloud functions in the console:

Create a function. Set the function name to "image_ingestion" and select "Allow unauthenticated invocations" under "Authentication". The other parameters can be left at their defaults. You'll need to hit "Save" in the "Trigger" section before being able to continue:

Hit "Next". Change the runtime to Python 3.7, change the entry point (e.g. to ingest_image), and rename the function in the code editor accordingly:

You are working in the file "main.py". This file holds the logic we want to deploy to our cloud function. Let's transfer the logic of the Jupyter Notebook to our method "ingest_image":

from google.cloud import storage
import requests
from datetime import datetime

def ingest_image(request):
    """Pull an image from the webcam and store it in GCS """

    # pull the image
    url = 'https://www.kite-connection.at/weatherstation/webcam/rohrspitz.jpg'
    filename = f"webcam_fromfunction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"  # add a timestamp
    response = requests.get(url)

    # store it in GCS
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("pk-gcs")  # replace the bucket name with yours
    blob = bucket.blob(filename)
    blob.upload_from_string(response.content, content_type='image/jpeg')

    return 'Success'
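
Before deploying, you can sanity-check this logic by pasting the function into a notebook cell and calling it directly. Since the request argument is not used inside ingest_image, passing None is enough for this quick test (a sketch, not an official testing workflow):

print(ingest_image(None))  # should print "Success" and upload a new image to the bucket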

Next, we'll need to specify the Python packages our cloud function requires. Click on "requirements.txt" and add the following lines at the end of the file (datetime is part of the Python standard library and doesn't need to be listed):

google-cloud-storage
requests

Hit "Deploy" (either from requirements.txt or main.py):

Wait 1-2 minutes until the function is deployed and then click on its name:

Go to the "Testing" section and hit "Test the function":

The output should be "Success" (the return value of our function):

You can take a look at the log output as well, e.g. to check the function's performance:

You should now see the uploaded image in your bucket:

Results

  1. You transferred the sample logic from the notebook to a cloud function (main.py).
  2. You specified requirements on packages of your function (requirements.txt).
  3. You deployed the function and tested it.
  4. The ingestion logic (i.e. the cloud function) is now ready to be scheduled regularly.

Triggering the cloud function manually

On your cloud function's GCP page, you'll find the HTTP trigger URL for this function:

When clicking on the URL, you'll get a simple "Success" message (the function's return value):

This should have triggered another run of your cloud function.
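
You can also trigger the function from any Python environment, e.g. the notebook. A minimal sketch (the URL below is a placeholder; paste your function's trigger URL):

import requests  # only needed if not already imported

trigger_url = "https://REGION-PROJECT_ID.cloudfunctions.net/image_ingestion"  # placeholder; copy your own trigger URL
print(requests.get(trigger_url).text)  # should print "Success"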

Scheduling cloud function calls

Please navigate to the cloud scheduler (under "Tools") and create a job:

Name the job "scheduled_webcam_ingest" and set the frequency to once per minute (* * * * *). Under "Timezone" you can search for "Germany". Set the target to "HTTP" and paste your function's trigger URL. All other settings can stay at their defaults.
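
The frequency field uses standard unix-cron syntax (minute, hour, day of month, month, day of week), for example:

* * * * *     # every minute (used here only for testing)
0 * * * *     # at minute 0 of every hour (a more realistic webcam schedule)
0 6-20 * * *  # hourly, but only between 06:00 and 20:00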

You can immediately run the job to test it:

You should see the result in your GCS bucket. From now on, a new image should be added to the bucket every minute:

Results

  1. You now scheduled your cloud function regularly.
  2. You set up a whole ingestion pipeline using cloud functions and cloud storage as a data lake. This setup is a production-ready architecture and could be used in production environments.

Please make sure to delete your scheduled job in the cloud scheduler:

Next, you may want to delete the files in the bucket:
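
If you prefer cleaning up from the notebook, a short sketch (assuming the bucket name "pk-gcs" and the filename prefixes used above) deletes all webcam images:

from google.cloud import storage

storage_client = storage.Client()
for blob in storage_client.list_blobs("pk-gcs", prefix="webcam"):  # matches webcam_... and webcam_fromfunction_...
    print(f"deleting {blob.name}")
    blob.delete()  # remove the object from the bucket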

Next, you should delete the cloud function:

You may want to rename and download the JupyterLab Notebook we created (however, it is also available via GitHub):

You should also delete or at least shut down your notebook (under "AI Platform"):

Results

  1. You finished the lab and performed all necessary clean-up tasks.

Congratulations, you set up a state-of-the-art cloud ingestion pipeline using cloud functions and scheduled its execution.