Quick Start

By the end of this guide, you will have loaded a built-in flow, validated it with a dry run, and generated synthetic data.

The standard SDG Hub workflow has five stages: discover a flow, inspect its requirements, configure a model, validate with a dry run, and generate.

Prerequisites

Install SDG Hub:

uv pip install sdg-hub

Step 1: Discover Flows

SDG Hub ships with built-in flows. Use FlowRegistry to browse them, then load one by ID or name.

Note: discover_flows() must be called before any registry lookups. It scans registered search paths and populates the registry.

from sdg_hub import FlowRegistry, Flow
from datasets import Dataset

# Discover built-in flows (must be called before any registry lookups)
FlowRegistry.discover_flows()

# List flows programmatically
flows = FlowRegistry.list_flows()
for f in flows:
    print(f["id"], f["name"])

# Search by tag or author
qa_flows = FlowRegistry.search_flows(tag="question-generation")
author_flows = FlowRegistry.search_flows(author="SDG Hub Contributors")

# Browse by category
categories = FlowRegistry.get_flows_by_category()
for tag, flow_list in categories.items():
    print(f"{tag}: {len(flow_list)} flows")

Step 2: Load a Flow

Resolve a flow name or ID to a YAML path, then load it. This example uses the Extractive Summary Knowledge Tuning flow (epic-jade-656), which generates QA pairs from documents by first extracting key passages, then creating questions and answers grounded in those extracts.

# get_flow_path returns Optional[str] (None if the flow is not found)
flow_path = FlowRegistry.get_flow_path("epic-jade-656")

# Alternatively, get_flow_path_safe raises ValueError (with name suggestions)
# instead of returning None:
flow_path = FlowRegistry.get_flow_path_safe("epic-jade-656")

flow = Flow.from_yaml(flow_path)
print(flow)

Both get_flow_path and get_flow_path_safe accept either a flow ID or a flow name.
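
The "suggestions" in get_flow_path_safe's ValueError can be pictured with a close-match lookup. This is a hypothetical sketch (using difflib and illustrative flow IDs), not SDG Hub's actual implementation:

```python
import difflib

# Hypothetical helper: propose known flow IDs that closely match a typo'd query.
def suggest_flows(query: str, known: list[str], n: int = 3) -> list[str]:
    """Return up to n known flow identifiers that closely match the query."""
    return difflib.get_close_matches(query, known, n=n, cutoff=0.5)

known = ["epic-jade-656", "brave-coral-112", "quiet-amber-903"]
print(suggest_flows("epic-jade-65", known))  # ['epic-jade-656']
```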

Step 3: Inspect Requirements

Before running anything, check what the flow needs.

Default model and recommendations

default_model = flow.get_default_model()
# Returns: Optional[str], e.g. "openai/gpt-oss-120b"

recommendations = flow.get_model_recommendations()
# Returns: {"default": "...", "compatible": [...], "experimental": [...]}

Dataset schema and requirements

# Get an empty DataFrame with the correct columns
schema_df = flow.get_dataset_schema()
print(schema_df.columns.tolist())

# Get full requirements (DatasetRequirements or None)
requirements = flow.get_dataset_requirements()
if requirements:
    print("Required columns:", requirements.required_columns)
    print("Min samples:", requirements.min_samples)
    print("Description:", requirements.description)
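
A pre-flight check against required_columns can be done by hand. A minimal sketch, assuming you have the dataset's column names and the flow's required_columns list (hypothetical helper, not part of SDG Hub):

```python
# Hypothetical helper: compare dataset columns against a flow's requirements.
def missing_columns(dataset_columns: list[str],
                    required_columns: list[str]) -> list[str]:
    """Return required columns absent from the dataset, preserving order."""
    present = set(dataset_columns)
    return [col for col in required_columns if col not in present]

print(missing_columns(["document", "domain"],
                      ["document", "domain", "icl_document"]))  # ['icl_document']
```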

Check model configuration status

# True if the flow contains LLM blocks
print(flow.is_model_config_required())

# True if set_model_config() has been called (or no LLM blocks exist)
print(flow.is_model_config_set())

Step 4: Configure the Model

Flows with LLM blocks require model configuration before execution. Calling generate() without it raises FlowValidationError.

Tip: LiteLLM auto-reads standard provider environment variables (OPENAI_API_KEY, ANTHROPIC_API_KEY, etc.). If the appropriate env var is already set, you can skip the api_key parameter:

# If OPENAI_API_KEY is already set in the environment, LiteLLM picks it up
# and no api_key argument is needed:
flow.set_model_config(model="openai/gpt-4o")

# Or point at a locally hosted model and pass credentials explicitly:
flow.set_model_config(
    model="hosted_vllm/meta-llama/Llama-3.3-70B-Instruct",
    api_base="http://localhost:8000/v1",
    api_key="your_key",
)

set_model_config auto-detects LLM blocks and applies the configuration. You can target specific blocks with the blocks parameter:

flow.set_model_config(
    model="openai/gpt-4o",
    api_key="your-openai-key",
    blocks=["gen_extractive_summary", "gen_qa_pairs"],
)

Step 5: Dry Run

Always validate your pipeline before processing the full dataset. dry_run executes the flow on a small subset and returns diagnostic results.

# Prepare the dataset.
# This flow expects knowledge-tuning columns:
#   document         -- the source document to generate QA from
#   document_outline -- structural outline of the document
#   domain           -- subject area (e.g. "Computer Science")
#   icl_document     -- in-context learning example document
#   icl_query_1..3   -- in-context learning example questions
dataset = Dataset.from_dict({
    "document": ["Python is a high-level programming language..."],
    "document_outline": ["1. Introduction; 2. Features"],
    "domain": ["Computer Science"],
    "icl_document": ["Java is an object-oriented language..."],
    "icl_query_1": ["What type of language is Java?"],
    "icl_query_2": ["Where does Java run?"],
    "icl_query_3": ["What are the benefits of Java?"],
})

result = flow.dry_run(
    dataset,
    sample_size=2,
    enable_time_estimation=True,
    max_concurrency=10,
)

print(f"Tested {result['sample_size']} samples in {result['execution_time_seconds']:.2f}s")
print(f"Output columns: {result['final_dataset']['columns']}")

Signature reference:

flow.dry_run(
    dataset,                          # pd.DataFrame or datasets.Dataset
    sample_size=2,                    # number of rows to test
    runtime_params=None,              # Optional[dict[str, dict[str, Any]]]
    max_concurrency=None,             # Optional[int]
    enable_time_estimation=False,     # runs extra dry runs for estimation
)
# Returns: dict[str, Any]

When enable_time_estimation=True, a Rich table with projected execution time is printed automatically. The estimation data is displayed but not included in the return dict.
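
Conceptually, such a projection scales the measured per-sample cost from the dry run to the full dataset. A rough linear sketch (illustration only; SDG Hub's estimator may be more sophisticated):

```python
# Naive linear projection: per-sample cost from the dry run times total rows.
def estimate_total_seconds(sample_time_s: float, sample_size: int,
                           total_rows: int) -> float:
    per_sample = sample_time_s / sample_size
    return per_sample * total_rows

# A 2-sample dry run that took 12s projects to 3000s for 500 rows:
print(estimate_total_seconds(12.0, 2, 500))  # 3000.0
```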

Step 6: Generate

Once the dry run succeeds, run the full pipeline.

result = flow.generate(dataset)
print(f"Generated {len(result)} rows")
print(result.column_names)

Signature reference:

flow.generate(
    dataset,                          # pd.DataFrame or datasets.Dataset
    runtime_params=None,              # Optional[dict[str, dict[str, Any]]]
    checkpoint_dir=None,              # Optional[str] - enables checkpointing
    save_freq=None,                   # Optional[int] - checkpoint every N samples
    log_dir=None,                     # Optional[str] - write logs to files
    max_concurrency=None,             # Optional[int]
)
# Returns: same type as input (DataFrame in -> DataFrame out, Dataset in -> Dataset out)

Checkpointing

For large datasets, enable checkpointing to resume interrupted runs:

result = flow.generate(
    dataset,
    checkpoint_dir="./checkpoints",
    save_freq=50,  # save progress every 50 completed samples
)

Step 7: Runtime Parameters

Override block configuration at execution time without modifying flow YAML files. Runtime parameters are organized by block name.

result = flow.generate(
    dataset,
    runtime_params={
        "gen_qa_pairs": {"temperature": 0.9, "max_tokens": 200},
        "quality_filter": {"filter_value": 0.8, "operation": "ge"},
    },
)
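
The override semantics can be pictured as a per-block dict merge: runtime values shadow the block's base configuration, leaving untouched keys intact. A sketch (illustration only; not SDG Hub's internal merge logic):

```python
# Hypothetical sketch: overlay per-block runtime overrides onto base configs.
def apply_runtime_params(block_configs: dict, runtime_params: dict) -> dict:
    return {
        name: {**base, **runtime_params.get(name, {})}
        for name, base in block_configs.items()
    }

base = {"gen_qa_pairs": {"temperature": 0.7, "max_tokens": 512}}
print(apply_runtime_params(base, {"gen_qa_pairs": {"temperature": 0.9}}))
# {'gen_qa_pairs': {'temperature': 0.9, 'max_tokens': 512}}
```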

The same runtime_params argument works with dry_run:

result = flow.dry_run(
    dataset,
    sample_size=2,
    runtime_params={
        "gen_qa_pairs": {"temperature": 0.5},
    },
)

Step 8: Error Handling

SDG Hub defines specific exception types for different failure modes. All error classes live in sdg_hub.core.utils.error_handling.

Common error pattern

from sdg_hub import Flow
from sdg_hub.core.utils.error_handling import (
    FlowValidationError,
    MissingColumnError,
    EmptyDatasetError,
    OutputColumnCollisionError,
)

try:
    flow = Flow.from_yaml("path/to/flow.yaml")
    flow.set_model_config(model="openai/gpt-4o", api_key="your_key")
    result = flow.generate(dataset)
except FlowValidationError as e:
    # Invalid YAML, missing model config, empty dataset, or block execution failure
    print(f"Flow error: {e}")
except MissingColumnError as e:
    # Dataset is missing columns that a block requires
    print(f"Missing columns: {e.missing_columns}")
    print(f"Available columns: {e.available_columns}")
except EmptyDatasetError as e:
    # A block (often a filter) produced zero rows
    print(f"Block '{e.block_name}' received an empty dataset")
except OutputColumnCollisionError as e:
    # A block tried to create a column that already exists
    print(f"Collision columns: {e.collision_columns}")

Error class hierarchy

All exceptions inherit from SDGHubError:

FlowValidationError: invalid YAML, missing model config, empty input, or block execution failure
FlowExecutionError: flow execution failure
FlowConfigurationError: invalid flow configuration
MissingColumnError: a block's input_cols are not present in the dataset
EmptyDatasetError: a block receives a dataset with zero rows
OutputColumnCollisionError: a block's output_cols already exist in the dataset
BlockConfigurationError: invalid block configuration
BlockExecutionError: block execution failure
BlockValidationError: base class for column/dataset validation errors
TemplateValidationError: a prompt template references variables not in the dataset
APIConnectionError: API connection failure
DataGenerationError: data generation failure
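
The practical consequence of a shared base class: catching SDGHubError handles any of these failure modes. A simplified, standalone mirror of part of the hierarchy (the real classes live in sdg_hub.core.utils.error_handling and carry extra attributes):

```python
# Simplified sketch of the exception hierarchy; not the real SDG Hub classes.
class SDGHubError(Exception):
    """Base class for all SDG Hub errors."""

class FlowValidationError(SDGHubError): ...
class BlockValidationError(SDGHubError): ...
class MissingColumnError(BlockValidationError): ...
class EmptyDatasetError(BlockValidationError): ...

# Catching the base class handles any SDG Hub failure mode:
try:
    raise MissingColumnError("dataset lacks 'document'")
except SDGHubError as e:
    print(type(e).__name__, e)
```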

Prevention

Use dry_run to catch most errors before processing large datasets:

from sdg_hub import Flow

flow = Flow.from_yaml("path/to/flow.yaml")
flow.set_model_config(model="openai/gpt-4o", api_key="your_key")

# Catches MissingColumnError, EmptyDatasetError, and config issues early
result = flow.dry_run(dataset, sample_size=2)

Discover Blocks

In addition to flows, you can browse available blocks.

from sdg_hub import BlockRegistry

# Print a Rich table of all blocks
BlockRegistry.discover_blocks()

# List all block names
all_blocks = BlockRegistry.list_blocks()

# List blocks grouped by category
grouped = BlockRegistry.list_blocks(grouped=True)
# Returns: {"llm": ["LLMChatBlock", ...], "transform": [...], ...}

# List blocks in a specific category
llm_blocks = BlockRegistry.list_blocks(category="llm")

# Get available categories
categories = BlockRegistry.categories()
# Returns: ["agent", "filtering", "llm", "mcp", "parsing", "transform"]
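
The shape returned by grouped=True is a category-to-names mapping. A hypothetical helper illustrating that shape (block names here are illustrative; only the structure matters):

```python
from collections import defaultdict

# Hypothetical helper: collect (name, category) pairs into {category: [names]}.
def group_by_category(blocks: list[tuple[str, str]]) -> dict[str, list[str]]:
    grouped: dict[str, list[str]] = defaultdict(list)
    for name, category in blocks:
        grouped[category].append(name)
    return dict(grouped)

pairs = [("LLMChatBlock", "llm"), ("ColumnValueFilterBlock", "filtering"),
         ("PromptBuilderBlock", "llm")]
print(group_by_category(pairs))
# {'llm': ['LLMChatBlock', 'PromptBuilderBlock'], 'filtering': ['ColumnValueFilterBlock']}
```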

Next Steps