Efficient Shopify Data Transfer with dlthub

Goal of the Project

The goal of this project is to create a streamlined pipeline that loads data from the Shopify REST API into a Google Cloud Storage Bucket. The pipeline should be managed by Prefect, enabling automated and incremental data loads into the Google Cloud Storage Bucket.

Installing dlt in a Virtual Environment

First, let's create a virtual environment to install dlt. Navigate to the VM folder and execute the following commands:

python -m venv vm_dlt_shopify 
cd vm_dlt_shopify 
source bin/activate
pip install --upgrade pip
pip install dlt
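
Because the pipeline later writes Parquet files to a Google Cloud Storage bucket through dlt's filesystem destination, the matching extras are needed as well. The extra names below follow the dlt documentation; verify them against your dlt version:

pip install "dlt[gs,parquet]"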

The Adapter Folder

The adapter folder contains the code for both the Shopify client and the Redis client.

  • The Redis client connects to a Redis Cloud database and provides methods to save data, load data, and close the connection.

  • The Shopify client wraps the Shopify REST API, builds the correct endpoints, and makes paginated requests. At the moment it supports only the customers and orders endpoints. A minimal sketch of both clients follows this list.

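Neither client's code is reproduced here, so below is a minimal sketch of how the two adapters could look. All names (RedisClient, ShopifyClient, get_pages, the constructor arguments) and the Link-header pagination detail are illustrative assumptions, not the project's actual implementation.

import json
from typing import Optional

import redis
import requests


class RedisClient:
    """Illustrative Redis adapter: persists pipeline state as JSON strings."""

    def __init__(self, host: str, port: int, password: str):
        self.conn = redis.Redis(host=host, port=port, password=password, decode_responses=True)

    def save(self, key: str, value: dict) -> None:
        self.conn.set(key, json.dumps(value))

    def load(self, key: str) -> Optional[dict]:
        raw = self.conn.get(key)
        return json.loads(raw) if raw else None

    def close(self) -> None:
        self.conn.close()


class ShopifyClient:
    """Illustrative Shopify REST adapter with cursor-based (Link header) pagination."""

    def __init__(self, shop: str, access_token: str, api_version: str = "2023-10"):
        self.base_url = f"https://{shop}.myshopify.com/admin/api/{api_version}"
        self.session = requests.Session()
        self.session.headers.update({"X-Shopify-Access-Token": access_token})

    def get_pages(self, endpoint: str, params: dict):
        """Yield one page of 'customers' or 'orders' records at a time."""
        url = f"{self.base_url}/{endpoint}.json"
        while url:
            response = self.session.get(url, params=params)
            response.raise_for_status()
            yield response.json()[endpoint]
            # Shopify returns the cursor for the next page in the Link header;
            # requests exposes it via response.links.
            url = response.links.get("next", {}).get("url")
            params = None  # the page_info URL must not be combined with other filters
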
The dlthub Part

The __init__.py file contains the dlt source definition for the Shopify API. Inside the shopify_source function, two resources (endpoints) are defined: customers and orders. The code checks the Redis database to determine whether a previous run exists. The create_params method constructs the parameters for the API call. Finally, the current resource state is saved to the Redis database and the connection is closed.
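
The source file itself is not reproduced in this article, but a minimal sketch of such a source, building on the adapter sketch above, could look like the following. The module paths, Redis keys, the create_params signature, and the updated_at_min filter are assumptions for illustration:

from datetime import datetime, timezone
from typing import Optional

import dlt

import settings  # assumed module that loads the credentials (see the next section)
from adapter.redis_client import RedisClient    # assumed module paths
from adapter.shopify_client import ShopifyClient


def create_params(last_run: Optional[dict], start_date: str) -> dict:
    """Build the API query parameters, resuming from the last run if one exists."""
    since = last_run["last_run"] if last_run else start_date
    return {"updated_at_min": since, "limit": 250}


@dlt.source
def shopify_source(start_date: str = None):
    redis_client = RedisClient(settings.REDIS_HOST, settings.REDIS_PORT, settings.REDIS_PASSWORD)
    shopify_client = ShopifyClient(settings.SHOPIFY_SHOP, settings.SHOPIFY_ACCESS_TOKEN)

    def extract(endpoint: str):
        last_run = redis_client.load(f"{endpoint}_last_run")
        params = create_params(last_run, start_date)
        for page in shopify_client.get_pages(endpoint, params):
            yield page
        # Persist the state for the next run, then close the connection
        # (each pipeline run below selects a single resource).
        redis_client.save(f"{endpoint}_last_run", {"last_run": datetime.now(timezone.utc).isoformat()})
        redis_client.close()

    @dlt.resource(name="customers", write_disposition="append")
    def customers():
        yield from extract("customers")

    @dlt.resource(name="orders", write_disposition="append")
    def orders():
        yield from extract("orders")

    return customers, orders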

The secrets for the two clients are stored in the settings.py file. They are managed as blocks in Prefect, and settings.py loads them from there.
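
A minimal sketch of such a settings.py, assuming Prefect Secret blocks with hypothetical block names, could look like this:

from prefect.blocks.system import Secret

# The block names are illustrative; use the names of the blocks you created in Prefect.
SHOPIFY_SHOP = Secret.load("shopify-shop-name").get()
SHOPIFY_ACCESS_TOKEN = Secret.load("shopify-access-token").get()

REDIS_HOST = Secret.load("redis-host").get()
REDIS_PORT = int(Secret.load("redis-port").get())
REDIS_PASSWORD = Secret.load("redis-password").get()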

The Pipeline

The pipeline with source and destination definitions is defined in shopify_pipeline.py.

import dlt
from shopify import shopify_source
from prefect import task, flow, get_run_logger


@task(name="incremental_load_customers")
def incremental_load_customers(logger=None, resource: str = 'customers', start_date: str = None):
    # Point dlt's filesystem destination at the GCS bucket for the raw layer
    dlt.secrets["destination.filesystem.bucket_url"] = "gs://deltaload-dlt"

    pipeline = dlt.pipeline(
        pipeline_name='shopify', destination='filesystem', dataset_name='shopify_raw_layer'
    )

    load_info = pipeline.run(shopify_source(start_date).with_resources(resource), loader_file_format="parquet")
    logger.info(f"Incremental Load Customers Information: {load_info}")


@task(name="incremental_load_orders")
def incremental_load_orders(logger=None, resource: str = 'orders', start_date: str = None):
    dlt.secrets["destination.filesystem.bucket_url"] = "gs://deltaload-dlt"

    pipeline = dlt.pipeline(
        pipeline_name='shopify', destination='filesystem', dataset_name='shopify_raw_layer' 
    )
    load_info = pipeline.run(shopify_source(start_date).with_resources(resource), loader_file_format="parquet")
    logger.info(f"Incremental Load Orders Information: {load_info}")


@flow(name="shopify_pipeline")
def shopify_pipeline(): 
    logger = get_run_logger()
    start_date = '2023-01-01'
    logger.info(f"Starting Shopify Pipeline with Initial Start Date: {start_date}")

    logger.info("Starting Task: incremental_load_customers")
    incremental_load_customers(logger=logger, resource='customers', start_date=start_date)


    logger.info("Starting Task: incremental_load_orders")
    incremental_load_orders(logger=logger, resource='orders', start_date=start_date)


if __name__ == "__main__":    
    shopify_pipeline.deploy(
        name="tutorial_number_3", 
        work_pool_name="cloud-run-push-test2", 
        image="us-central1-docker.pkg.dev/deltaload/tutorials/tutorial3:latest"
    )

The functions incremental_load_orders(...) and incremental_load_customers(...) are both wrapped as Prefect tasks. The shopify_pipeline() function is decorated as a Prefect flow and triggers the two loading tasks. The main block contains the deployment information for a Prefect Cloud push deployment. Make sure a push work pool with the specified name exists. If you are unsure how to create one, refer to this article.
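
If the work pool does not exist yet, it can be created from the CLI. The command below assumes a Google Cloud Run push work pool; check the available work pool types against your Prefect version:

prefect work-pool create "cloud-run-push-test2" --type cloud-run:push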

The complete project can be found on GitHub.