Create a new Cloud Push Worker in Google Cloud
Read the tutorial on creating a Cloud Push Worker in Google Cloud in the Prefect docs. Make sure that Docker is installed as well.
Create a virtual environment for the project
python3.10 -m venv vm_dlt_cloud_push
cd vm_dlt_cloud_push
source bin/activate
Install Prefect inside the virtual environment
pip install -U prefect
Set up the push worker with the name dlt-cloud-push
First, you will be asked which GCP project to use.
Second, you will be asked whether you want to customize the resources. This tutorial uses the default resource names.
prefect work-pool create --type cloud-run:push --provision-infra dlt-cloud-push
Install dlt inside the virtual environment
pip install -U dlt
pip install "dlt[duckdb]"
Follow the installation guide for DuckDB
Go to the root of your project folder and initialize the dlt pipeline for the project. This tutorial uses the chess pipeline with DuckDB as the destination. You can find a more detailed tutorial about the pipeline on the dlt GitHub page.
dlt init chess duckdb
Add the following code to chess_pipeline.py. It uses Prefect's @flow decorator to turn the pipeline run into a Prefect flow. The main block deploys the flow to the Prefect push work pool created in step 4.

import dlt
from prefect import flow
from chess import source


@flow
def run_chess_pipeline():
    # create dlt pipeline
    pipeline = dlt.pipeline(
        pipeline_name="chess_pipeline",
        destination="duckdb",
        dataset_name="chess_data",
    )
    # get data for a few famous players
    data = source(
        players=["MagnusCarlsen", "Hikaru", "GarryKasparov", "Vachier-Lagrave", "FabianoCaruana"],
        start_month="2022/11",
        end_month="2022/11",
    )
    load_info = pipeline.run(data)
    print(load_info)


if __name__ == "__main__":
    # build the image and register the deployment on the push work pool
    run_chess_pipeline.deploy(
        name="chess_pipeline",
        work_pool_name="dlt-cloud-push",
        image="<region>-docker.pkg.dev/<project>/<repository-name>/dlt-cloud-push-image:latest",
    )
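The image argument above has to be filled in with your own region, project, and repository. A minimal sketch of how that string is put together, assuming a hypothetical helper named artifact_registry_image (it is not part of Prefect or dlt) and made-up example values:

```python
def artifact_registry_image(region, project, repository,
                            image="dlt-cloud-push-image", tag="latest"):
    """Build an Artifact Registry image reference in the form used by the tutorial.

    Hypothetical helper for illustration only; it just formats a string.
    """
    parts = {
        "region": region,
        "project": project,
        "repository": repository,
        "image": image,
        "tag": tag,
    }
    for key, value in parts.items():
        # Reject empty values and placeholders such as "<region>" left unfilled.
        if not value or "<" in value or ">" in value:
            raise ValueError(f"{key} must be set to a real value, got {value!r}")
    return f"{region}-docker.pkg.dev/{project}/{repository}/{image}:{tag}"


# Example values are assumptions; replace them with your own GCP settings.
print(artifact_registry_image("europe-west1", "my-project", "prefect-images"))
# europe-west1-docker.pkg.dev/my-project/prefect-images/dlt-cloud-push-image:latest
```

The returned string can be passed as the image argument of run_chess_pipeline.deploy(). The check for angle brackets is only there to catch a placeholder that was copied without being replaced.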
Update the requirements.txt file by running the following command
pip freeze > requirements.txt
Run the pipeline script. Prefect should build a Docker image containing the source code and push it automatically to the GCP Artifact Registry.
If you encounter a Docker error related to the version of the requests library, try installing requests==2.31.0.
python chess_pipeline.py
Run the Prefect deployment with the following command. You should see a similar command printed in your terminal:
prefect deployment run 'run-chess-pipeline/chess_pipeline'
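The name in the command above follows the pattern flow name, slash, deployment name. A small sketch of where each part comes from, assuming Prefect's default behaviour of deriving the flow name from the function name with underscores replaced by hyphens (the helper deployment_identifier is hypothetical, not a Prefect function):

```python
def deployment_identifier(flow_function_name: str, deployment_name: str) -> str:
    """Return the '<flow-name>/<deployment-name>' string used by the Prefect CLI.

    Assumes the flow was declared with a bare @flow decorator, so its name
    defaults to the function name with underscores turned into hyphens.
    The deployment name is used exactly as passed to .deploy(name=...).
    """
    flow_name = flow_function_name.replace("_", "-")
    return f"{flow_name}/{deployment_name}"


print(deployment_identifier("run_chess_pipeline", "chess_pipeline"))
# run-chess-pipeline/chess_pipeline
```

If the flow is given an explicit name, for example @flow(name="..."), that name would be used instead of the derived one.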
Log in to your Prefect Cloud account and check whether the pipeline ran and what its final state is.
Note: If you encounter a Docker error in step 10, make sure that you are not using the latest version of the requests library (e.g., version 2.32.2). Use an earlier version instead. The commands below uninstall requests, install the earlier version, and recreate the requirements.txt file:
pip uninstall requests
pip install requests==2.31.0
pip freeze > requirements.txt
python chess_pipeline.py
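Before reinstalling anything, it can help to see which requests version is active in the virtual environment. The sketch below uses only the standard library. The helper names and the idea of treating anything newer than 2.31.0 as suspect are assumptions drawn from the version numbers in the note, not an official compatibility rule.

```python
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Turn a string like '2.32.2' into a tuple (2, 32, 2); suffixes such as 'rc1' are ignored."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece:
            if ch.isdigit():
                digits += ch
            else:
                break
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def is_newer_than_pin(installed, pinned="2.31.0"):
    """True if the installed version is newer than the version pinned in the note."""
    return parse_version(installed) > parse_version(pinned)


def check_requests():
    """Report the installed requests version and whether it is newer than the pin."""
    try:
        installed = version("requests")
    except PackageNotFoundError:
        return "requests is not installed in this environment"
    if is_newer_than_pin(installed):
        return f"requests {installed} is newer than 2.31.0; consider pinning requests==2.31.0"
    return f"requests {installed} is not newer than 2.31.0"


if __name__ == "__main__":
    print(check_requests())
```

Run it inside the activated virtual environment so that it inspects the same packages that pip freeze writes to requirements.txt.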