Connector System

The connector system provides a pluggable interface for communicating with external services. There are two connector families:

  • Agent connectors handle request formatting, HTTP transport, response parsing, and error handling for agent frameworks (Langflow, LangGraph, etc.). Used by AgentBlock and AgentResponseExtractorBlock.
  • Code interpreter connectors execute Python code in sandboxed environments and return structured results. Used by PythonInterpreterBlock.

Architecture Overview

The connector system branches into two families under BaseConnector:

ConnectorRegistry                    # Discovery and lookup by name
    |
BaseConnector                        # Abstract base with execute() / aexecute()
    |
    +-- BaseAgentConnector           # Agent-specific: build_request(), parse_response(), send()
    |       |
    |       +-- LangflowConnector    # Framework-specific implementation
    |       +-- LangGraphConnector
    |
    +-- BaseCodeInterpreterConnector # Code execution: execute_code() / aexecute_code()
            |
            +-- MontyConnector       # Rust-based sandbox (pydantic-monty)

All classes are importable from the top-level connectors package:

from sdg_hub.core.connectors import (
    BaseConnector,
    ConnectorConfig,
    BaseAgentConnector,
    LangflowConnector,
    LangGraphConnector,
    ConnectorRegistry,
    HttpClient,
    ConnectorError,
    ConnectorHTTPError,
)

# Code interpreter connectors (requires the [code] extra)
from sdg_hub.core.connectors.code_interpreter import (
    BaseCodeInterpreterConnector,
    CodeExecutionResult,
    MontyConnector,
)

ConnectorConfig

ConnectorConfig is a Pydantic BaseModel that holds connection parameters shared by all connectors. It is defined in src/sdg_hub/core/connectors/base.py.

from sdg_hub.core.connectors import ConnectorConfig

config = ConnectorConfig(
    url="http://localhost:7860/api/v1/run/my-flow",
    api_key="your-api-key",
    timeout=60.0,
    max_retries=5,
)

Fields

ParameterTypeDefaultDescription
urlOptional[str]NoneBase URL for the external service.
api_keyOptional[str]NoneAPI key for authentication.
timeoutfloat120.0Request timeout in seconds. Must be greater than 0.
max_retriesint3Maximum number of retry attempts. Must be >= 0.

ConnectorConfig uses model_config = ConfigDict(extra="allow"), so additional framework-specific fields can be passed without raising validation errors.


BaseConnector

BaseConnector is the abstract base class for all connectors. It is defined in src/sdg_hub/core/connectors/base.py.

It inherits from both pydantic.BaseModel and abc.ABC.

Attributes

AttributeTypeDescription
configConnectorConfigRequired. The connector configuration.

Abstract Methods

@abstractmethod
def execute(self, request: Any) -> Any:
    """Execute a synchronous request.

    Parameters
    ----------
    request : Any
        The request to execute (format depends on connector type).

    Returns
    -------
    Any
        The response from the external service.
    """

Concrete Methods

async def aexecute(self, request: Any) -> Any:
    """Execute an asynchronous request.

    Default implementation wraps sync execute in a thread via asyncio.to_thread().
    Subclasses should override for true async support.

    Parameters
    ----------
    request : Any
        The request to execute.

    Returns
    -------
    Any
        The response from the external service.
    """

ConnectorRegistry

ConnectorRegistry is a class-level registry for connector classes. It is defined in src/sdg_hub/core/connectors/registry.py.

All methods are @classmethod.

API

register(name: str) -- Decorator

Registers a connector class under the given name. The decorated class must inherit from BaseConnector.

from sdg_hub.core.connectors import ConnectorRegistry, BaseConnector

@ConnectorRegistry.register("my_connector")
class MyConnector(BaseConnector):
    def execute(self, request):
        return {"result": request.get("input")}

Raises ConnectorError if:

  • The argument is not a class.
  • The class does not inherit from BaseConnector.

get(name: str) -> type

Returns the connector class registered under name.

connector_class = ConnectorRegistry.get("langflow")
config = ConnectorConfig(url="http://localhost:7860/api/v1/run/flow")
connector = connector_class(config=config)

Raises ConnectorError if the name is not found. The error message includes a list of available connectors.

list_all() -> list[str]

Returns a sorted list of all registered connector names.

available = ConnectorRegistry.list_all()
# ['langflow', 'langgraph']

clear() -> None

Clears all registered connectors. Intended for testing only.

ConnectorRegistry.clear()

BaseAgentConnector

BaseAgentConnector extends BaseConnector with an agent-specific interface for communicating with agent frameworks. It is defined in src/sdg_hub/core/connectors/agent/base.py.

It uses an async-first pattern: the core logic lives in _send_async(), and the synchronous send() method delegates to it.

Abstract Methods

Subclasses must implement these two methods:

@abstractmethod
def build_request(
    self,
    messages: list[dict[str, Any]],
    session_id: str,
) -> dict[str, Any]:
    """Build framework-specific request payload.

    Parameters
    ----------
    messages : list[dict]
        List of messages in standard format:
        [{"role": "user", "content": "Hello"}, ...]
    session_id : str
        Session identifier for conversation tracking.

    Returns
    -------
    dict
        Framework-specific request payload.
    """
@abstractmethod
def parse_response(self, response: dict[str, Any]) -> dict[str, Any]:
    """Parse and validate framework response.

    Parameters
    ----------
    response : dict
        Raw response from the framework.

    Returns
    -------
    dict
        Validated response dict.

    Raises
    ------
    ConnectorError
        If the response is invalid or cannot be parsed.
    """

Concrete Methods

send(messages, session_id, async_mode=False)

Send messages to the agent. This is the primary entry point.

def send(
    self,
    messages: list[dict[str, Any]],
    session_id: str,
    async_mode: bool = False,
) -> dict | Coroutine:
    """Send messages to the agent.

    Parameters
    ----------
    messages : list[dict]
        Messages to send, in format:
        [{"role": "user", "content": "Hello"}, ...]
    session_id : str
        Session identifier for conversation tracking.
    async_mode : bool, optional
        If True, returns a coroutine. If False (default), runs synchronously.

    Returns
    -------
    dict or Coroutine[dict]
        Response dict, or coroutine if async_mode=True.
    """

When async_mode=False (default), this method handles event loop detection automatically:

  • If already in an async context, it uses a ThreadPoolExecutor to avoid blocking.
  • If no event loop is running, it creates one with asyncio.run().

asend(messages, session_id)

Async convenience wrapper that directly awaits the internal _send_async().

async def asend(
    self,
    messages: list[dict[str, Any]],
    session_id: str,
) -> dict[str, Any]:
    """Async send - convenience wrapper.

    Parameters
    ----------
    messages : list[dict]
        Messages to send.
    session_id : str
        Session identifier.

    Returns
    -------
    dict
        Response from the agent.
    """

execute(request)

Implements the BaseConnector.execute() interface by delegating to send().

def execute(self, request: dict[str, Any]) -> dict[str, Any]:
    """Execute a request (BaseConnector interface).

    Parameters
    ----------
    request : dict
        Request containing 'messages' and 'session_id' keys.

    Returns
    -------
    dict
        Response from the agent.
    """

Response Field Extraction Class Methods

These class methods allow extracting fields from framework-specific responses without instantiating a connector. They are used by AgentResponseExtractorBlock.

The base class returns None for all extractors. Subclasses override to provide framework-specific parsing.

@classmethod
def extract_text(cls, response: dict[str, Any]) -> str | None:
    """Extract text content from a framework response."""

@classmethod
def extract_session_id(cls, response: dict[str, Any]) -> str | None:
    """Extract session ID from a framework response."""

@classmethod
def extract_tool_trace(cls, response: dict[str, Any]) -> list[dict[str, Any]] | None:
    """Extract tool call trace from a framework response."""

Internal Methods

_build_headers() -> dict[str, str]

Builds HTTP headers. The base implementation sets Content-Type: application/json and adds Authorization: Bearer {api_key} if an API key is configured. Subclasses override this for framework-specific authentication headers.

_get_http_client() -> HttpClient

Lazily creates and caches an HttpClient instance using config.timeout and config.max_retries.


Built-in Connectors

LangflowConnector

Registered name: "langflow"

Source: src/sdg_hub/core/connectors/agent/langflow.py

Connector for Langflow, a visual framework for building LLM-powered applications.

How It Works

Langflow expects a single string input (not a message array). The connector extracts the last user message from the standard message list and sends it as input_value.

Request format (sent to Langflow API):

{
    "output_type": "chat",
    "input_type": "chat",
    "input_value": "the last user message content",
    "session_id": "session-123"
}

Authentication: Uses x-api-key header (not Authorization: Bearer). The _build_headers() method is overridden to set this.

Response parsing: Returns the raw response dict unchanged after validating it is a dict.

Configuration Example

from sdg_hub.core.connectors import ConnectorConfig, LangflowConnector

config = ConnectorConfig(
    url="http://localhost:7860/api/v1/run/my-flow",
    api_key="your-api-key",
)
connector = LangflowConnector(config=config)
response = connector.send(
    messages=[{"role": "user", "content": "Hello!"}],
    session_id="session-123",
)

Response Field Extraction

The LangflowConnector overrides all three extraction class methods:

  • extract_text(response): Navigates outputs[0].outputs[0].results.message.text. Returns "" if the text field is explicitly None.

  • extract_session_id(response): Reads response["session_id"]. Returns "" if explicitly None.

  • extract_tool_trace(response): Looks for content blocks at two paths:

    • outputs[0].outputs[0].results.message.data.content_blocks
    • outputs[0].outputs[0].results.message.content_blocks

    Returns the contents list from the first matching content block (the "Agent Steps" block with structured tool_use entries).


LangGraphConnector

Registered name: "langgraph"

Source: src/sdg_hub/core/connectors/agent/langgraph.py

Connector for LangGraph, a framework for building stateful, multi-actor applications with LLMs.

How It Works

LangGraphConnector uses a two-step thread-based flow by overriding _send_async():

  1. Create a thread: POST {base_url}/threads with session_id in metadata.
  2. Run the agent: POST {base_url}/threads/{thread_id}/runs/wait with the message payload.

Each call creates a new LangGraph thread. The session_id is stored as thread metadata for traceability but does not cause thread reuse.

Run request format (sent to /threads/{thread_id}/runs/wait):

{
    "assistant_id": "agent",
    "input": {
        "messages": [
            {"role": "user", "content": "Hello!"}
        ]
    },
    "config": {}
}

The config key is only included when run_config is non-empty.

Authentication: Uses x-api-key header, same as Langflow.

Response parsing: Validates the response is a non-empty dict. Logs a warning if no messages key is present.

Additional Fields

LangGraphConnector defines two extra Pydantic fields beyond those inherited from BaseAgentConnector:

ParameterTypeDefaultDescription
assistant_idstr"agent"The assistant ID or graph name to run. Must be non-empty.
run_configdict[str, Any]{}Optional configuration dict passed as the config key in the run payload. Use for runtime parameters via configurable, e.g., {"configurable": {"model": "gpt-4o"}}.

Configuration Example

from sdg_hub.core.connectors import ConnectorConfig, LangGraphConnector

config = ConnectorConfig(
    url="http://localhost:2024",
    api_key="your-api-key",
)
connector = LangGraphConnector(
    config=config,
    assistant_id="my-agent",
    run_config={"configurable": {"model": "gpt-4o"}},
)
response = connector.send(
    messages=[{"role": "user", "content": "Hello!"}],
    session_id="session-123",
)

Response Field Extraction

  • extract_text(response): Finds the last message with type or role equal to "ai" or "assistant" in response["messages"] and returns its content. Returns "" if content is explicitly None.
  • extract_session_id(response): Always returns None. LangGraph uses thread-based state with no top-level session ID in the run response.
  • extract_tool_trace(response): Iterates through response["messages"] and collects:
    • AI messages with tool_calls as {"type": "tool_use", "tool_calls": [...]}.
    • Tool result messages as {"type": "tool_result", "name": "...", "content": "...", "tool_call_id": "..."}.

Integration with AgentBlock

AgentBlock is the primary consumer of connectors. The agent_framework parameter selects which connector to use via ConnectorRegistry.get().

Source: src/sdg_hub/core/blocks/agent/agent_block.py

How AgentBlock Uses Connectors

from sdg_hub.core.blocks import AgentBlock

block = AgentBlock(
    block_name="qa_agent",
    agent_framework="langflow",
    agent_url="http://localhost:7860/api/v1/run/qa-flow",
    agent_api_key="your-api-key",
    input_cols={"messages": "question"},
    output_cols=["response"],
)
result_df = block.generate(df)

Internally, AgentBlock._get_connector() does the following:

  1. Calls ConnectorRegistry.get(self.agent_framework) to get the connector class.
  2. Creates a ConnectorConfig from agent_url, agent_api_key, timeout, and max_retries.
  3. Instantiates the connector class with the config and any connector_kwargs.
  4. Caches the connector instance, invalidating on config changes.

The connector_kwargs field allows passing framework-specific parameters to the connector constructor. For example, to set assistant_id for LangGraph:

block = AgentBlock(
    block_name="graph_agent",
    agent_framework="langgraph",
    agent_url="http://localhost:2024",
    input_cols={"messages": "question"},
    output_cols=["response"],
    connector_kwargs={"assistant_id": "my-agent"},
)

YAML Configuration

- block_type: AgentBlock
  block_config:
    block_name: my_agent
    agent_framework: langflow
    agent_url: http://localhost:7860/api/v1/run/my-flow
    agent_api_key: ${LANGFLOW_API_KEY}
    input_cols:
      messages: messages_col
    output_cols:
      - agent_response

For LangGraph with extra connector parameters:

- block_type: AgentBlock
  block_config:
    block_name: graph_agent
    agent_framework: langgraph
    agent_url: http://localhost:2024
    agent_api_key: ${LANGGRAPH_API_KEY}
    connector_kwargs:
      assistant_id: my-agent
      run_config:
        configurable:
          model: gpt-4o
    input_cols:
      messages: question
    output_cols:
      - agent_response

Runtime Configuration with flow.set_agent_config()

When using flows, agent blocks can be configured at runtime using flow.set_agent_config(). This supports credential-free YAML definitions where URLs and API keys are injected at runtime rather than hardcoded.

Source: src/sdg_hub/core/flow/agent_config.py

def set_agent_config(
    flow: "Flow",
    agent_framework: Optional[str] = None,
    agent_url: Optional[str] = None,
    agent_api_key: Optional[str] = None,
    blocks: Optional[list[str]] = None,
    **kwargs: Any,
) -> None:

Parameters

ParameterTypeDefaultDescription
agent_frameworkOptional[str]NoneConnector name (e.g., "langflow").
agent_urlOptional[str]NoneAgent API endpoint URL.
agent_api_keyOptional[str]NoneAgent API key.
blocksOptional[list[str]]NoneSpecific block names to target. If None, auto-detects all agent blocks.
**kwargsAny--Additional parameters (e.g., timeout, max_retries).

Usage

from sdg_hub import Flow

flow = Flow.from_yaml("path/to/flow.yaml")

# Configure all agent blocks at once
flow.set_agent_config(
    agent_framework="langflow",
    agent_url="http://localhost:7860/api/v1/run/my-flow",
    agent_api_key="your_key",
)

# Or target specific blocks
flow.set_agent_config(
    agent_url="http://other-endpoint:7860/api/v1/run/other-flow",
    blocks=["my_agent_block"],
)

result = flow.generate(dataset)

The method auto-detects blocks with block_type == "agent" when no blocks list is provided. It sets attributes directly on the block instances, and the cached connector in AgentBlock._get_connector() is invalidated on the next call because the config key changes.

These are defined in src/sdg_hub/core/flow/agent_config.py:

FunctionSignatureDescription
detect_agent_blocks(flow: Flow) -> list[str]Returns block names where block_type == "agent".
is_agent_config_required(flow: Flow) -> boolReturns True if the flow contains any agent blocks.
is_agent_config_set(flow: Flow) -> boolReturns True if agent config has been set or is not required.
reset_agent_config(flow: Flow) -> NoneResets the agent config flag. Call set_agent_config() again before generate().

HttpClient

HttpClient provides HTTP transport with automatic retries using tenacity. It is defined in src/sdg_hub/core/connectors/http/client.py.

from sdg_hub.core.connectors import HttpClient

client = HttpClient(timeout=60.0, max_retries=3)

Constructor Parameters

ParameterTypeDefaultDescription
timeoutfloat120.0Request timeout in seconds.
max_retriesint3Maximum number of retry attempts.

Methods

async post(url, payload, headers=None) -> dict

Async POST request with exponential backoff retry on httpx.TimeoutException and httpx.ConnectError.

async def post(
    self,
    url: str,
    payload: dict[str, Any],
    headers: Optional[dict[str, str]] = None,
) -> dict[str, Any]:

post_sync(url, payload, headers=None) -> dict

Synchronous POST request with the same retry logic.

def post_sync(
    self,
    url: str,
    payload: dict[str, Any],
    headers: Optional[dict[str, str]] = None,
) -> dict[str, Any]:

Both methods raise:

  • ConnectorHTTPError for HTTP error status codes (includes the URL, status code, and first 500 characters of the response body).
  • ConnectorError for connection failures and timeouts.

Retry configuration: stop_after_attempt(max_retries + 1) with wait_exponential(multiplier=1, min=1, max=60).


Exceptions

Connector exceptions are defined in src/sdg_hub/core/connectors/exceptions.py. Both inherit from SDGHubError (the project-wide base exception).

ConnectorError

Base exception for all connector-related errors: configuration errors, connection failures, timeouts, and response parsing errors.

from sdg_hub.core.connectors import ConnectorError

try:
    connector = ConnectorRegistry.get("nonexistent")
except ConnectorError as e:
    print(e)  # "Connector 'nonexistent' not found. Available: langflow, langgraph"

ConnectorHTTPError

Raised when an HTTP request returns an error status code. Subclass of ConnectorError.

from sdg_hub.core.connectors import ConnectorHTTPError

class ConnectorHTTPError(ConnectorError):
    def __init__(self, url: str, status_code: int, message: Optional[str] = None):
        self.url = url
        self.status_code = status_code

The error message format is HTTP {status_code} error from '{url}': {message}.


Creating Custom Connectors

To add a new agent framework connector:

  1. Create a new file in src/sdg_hub/core/connectors/agent/.
  2. Inherit from BaseAgentConnector.
  3. Implement build_request() and parse_response().
  4. Register with @ConnectorRegistry.register("name").
  5. Optionally override _build_headers() for custom authentication.
  6. Optionally override the extract_* class methods for response field extraction.

Complete Example

# src/sdg_hub/core/connectors/agent/my_framework.py

from typing import Any

from sdg_hub.core.connectors import BaseAgentConnector, ConnectorError, ConnectorRegistry


@ConnectorRegistry.register("my_framework")
class MyFrameworkConnector(BaseAgentConnector):
    """Connector for MyFramework agent API."""

    def _build_headers(self) -> dict[str, str]:
        """MyFramework uses Bearer token authentication."""
        headers = {"Content-Type": "application/json"}
        if self.config.api_key:
            headers["Authorization"] = f"Bearer {self.config.api_key}"
        return headers

    def build_request(
        self,
        messages: list[dict[str, Any]],
        session_id: str,
    ) -> dict[str, Any]:
        """Convert standard messages to MyFramework format."""
        return {
            "conversation_id": session_id,
            "messages": [
                {"sender": msg["role"], "text": msg["content"]}
                for msg in messages
            ],
        }

    def parse_response(self, response: dict[str, Any]) -> dict[str, Any]:
        """Validate and return the response."""
        if not isinstance(response, dict):
            raise ConnectorError(
                f"Expected dict response, got {type(response).__name__}"
            )
        return response

    @classmethod
    def extract_text(cls, response: dict[str, Any]) -> str | None:
        """Extract text from MyFramework response."""
        return response.get("reply", {}).get("text")

    @classmethod
    def extract_session_id(cls, response: dict[str, Any]) -> str | None:
        """Extract session ID from MyFramework response."""
        return response.get("conversation_id")

After creating the file, add the import to src/sdg_hub/core/connectors/agent/__init__.py:

from .my_framework import MyFrameworkConnector

The connector is then available via the registry:

from sdg_hub.core.connectors import ConnectorRegistry

connector_class = ConnectorRegistry.get("my_framework")

And usable in AgentBlock:

from sdg_hub.core.blocks import AgentBlock

block = AgentBlock(
    block_name="my_agent",
    agent_framework="my_framework",
    agent_url="http://localhost:8080/api/chat",
    agent_api_key="your-api-key",
    input_cols={"messages": "question"},
    output_cols=["response"],
)

Code Interpreter Connectors

Code interpreter connectors execute Python code in sandboxed environments and return structured CodeExecutionResult objects. They are used by PythonInterpreterBlock to validate synthetic code datasets.

!!! note "Optional dependency" Code interpreter connectors require the code optional dependency group. Install with: uv pip install sdg-hub[code] or pip install sdg-hub[code]

Source: src/sdg_hub/core/connectors/code_interpreter/

CodeExecutionResult

CodeExecutionResult is a Pydantic BaseModel that standardizes execution output across all code interpreter backends. It is defined in src/sdg_hub/core/connectors/code_interpreter/base.py.

FieldTypeDescription
successboolWhether the code executed successfully without errors
outputOptional[str]Captured stdout/stderr output
errorOptional[str]Error message if execution failed
return_valueAnyReturn value from execution
execution_time_msOptional[float]Execution time in milliseconds
from sdg_hub.core.connectors.code_interpreter import CodeExecutionResult

result = CodeExecutionResult(
    success=True,
    output="Hello, world!\n",
    error=None,
    return_value=42,
    execution_time_ms=1.23,
)

BaseCodeInterpreterConnector

BaseCodeInterpreterConnector extends BaseConnector with a code execution interface. It is defined in src/sdg_hub/core/connectors/code_interpreter/base.py.

Abstract Methods

Subclasses must implement:

@abstractmethod
def execute_code(
    self,
    code: str,
    inputs: Optional[dict[str, Any]] = None,
    timeout: Optional[float] = None,
) -> CodeExecutionResult:
    """Execute code and return structured result.

    Parameters
    ----------
    code : str
        The Python code to execute.
    inputs : dict, optional
        Input variables to make available to the code.
    timeout : float, optional
        Maximum execution time in seconds.

    Returns
    -------
    CodeExecutionResult
        Structured result with success status, output, and errors.
    """

Concrete Methods

async def aexecute_code(
    self,
    code: str,
    inputs: Optional[dict[str, Any]] = None,
    timeout: Optional[float] = None,
) -> CodeExecutionResult:
    """Execute code asynchronously.

    Default implementation wraps execute_code() via asyncio.to_thread().
    Subclasses should override for native async support.
    """

The execute() and aexecute() methods from BaseConnector are implemented to delegate to execute_code() / aexecute_code(), accepting a dict with a code key and optional inputs and timeout keys.


MontyConnector

Registered name: "monty"

Source: src/sdg_hub/core/connectors/code_interpreter/monty.py

Connector for Monty, a secure Python interpreter from the Pydantic team, implemented in Rust. Provides sandboxed execution with configurable resource limits.

Security Model

Monty enforces strict isolation by default:

ResourceAccess
FilesystemBlocked (no file I/O)
NetworkBlocked (no network access)
Environment variablesBlocked
Standard libraryLimited subset (sys, typing, asyncio, json)
Third-party librariesNot available
External functionsNone registered (pure computation only)

How It Works

MontyConnector captures stdout separately from return values using Monty's print_callback parameter. This means print() output goes to result.output, while the final expression value goes to result.return_value.

Resource limits (execution timeout) are configured via pydantic_monty.ResourceLimits. The timeout cascades: PythonInterpreterBlock.timeout > ConnectorConfig.timeout > default (120s).

Configuration Example

from sdg_hub.core.connectors.code_interpreter import MontyConnector
from sdg_hub.core.connectors import ConnectorConfig

connector = MontyConnector(config=ConnectorConfig())

# Simple execution
result = connector.execute_code("x = 1 + 1\nprint(x)")
print(result.success)  # True
print(result.output)   # "2\n"

# With input variables
result = connector.execute_code(
    "result = x + y\nprint(result)",
    inputs={"x": 10, "y": 20},
)
print(result.output)  # "30\n"

# Async execution (native, not thread-wrapped)
import asyncio

async def main():
    result = await connector.aexecute_code("print('async!')")
    print(result.output)  # "async!\n"

asyncio.run(main())

YAML Usage (via PythonInterpreterBlock)

MontyConnector is used indirectly through PythonInterpreterBlock:

- block_type: PythonInterpreterBlock
  block_config:
    block_name: validate_code
    interpreter_framework: monty
    input_cols:
      - generated_code
    output_cols:
      - execution_result
    timeout: 10.0

Creating Custom Code Interpreter Connectors

To add a new code interpreter backend:

  1. Create a new file in src/sdg_hub/core/connectors/code_interpreter/.
  2. Inherit from BaseCodeInterpreterConnector.
  3. Implement execute_code().
  4. Optionally override aexecute_code() for native async support.
  5. Register with @ConnectorRegistry.register("name").

Complete Example

# src/sdg_hub/core/connectors/code_interpreter/my_interpreter.py

import time
from typing import Any, Optional

from sdg_hub.core.connectors.code_interpreter import (
    BaseCodeInterpreterConnector,
    CodeExecutionResult,
)
from sdg_hub.core.connectors import ConnectorConfig, ConnectorRegistry

from pydantic import Field


@ConnectorRegistry.register("my_interpreter")
class MyInterpreterConnector(BaseCodeInterpreterConnector):
    """Connector for a custom Python interpreter."""

    config: ConnectorConfig = Field(
        default_factory=lambda: ConnectorConfig(),
        description="Connector configuration",
    )

    def execute_code(
        self,
        code: str,
        inputs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> CodeExecutionResult:
        """Execute code in the custom interpreter."""
        start = time.perf_counter()
        try:
            # Delegate to a sandboxed runtime (container, subprocess, etc.).
            # Do NOT use raw exec() for untrusted code.
            output = self._run_in_sandbox(code, inputs, timeout)
            elapsed = (time.perf_counter() - start) * 1000
            return CodeExecutionResult(
                success=True,
                output=output,
                error=None,
                return_value=None,
                execution_time_ms=elapsed,
            )
        except Exception as e:
            elapsed = (time.perf_counter() - start) * 1000
            return CodeExecutionResult(
                success=False,
                output=None,
                error=str(e),
                return_value=None,
                execution_time_ms=elapsed,
            )

    def _run_in_sandbox(
        self,
        code: str,
        inputs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> str:
        """Execute code in an isolated sandbox.

        Replace this with your sandbox backend (e.g. subprocess, container,
        or remote execution service).
        """
        raise NotImplementedError("Implement your sandbox backend here")

After creating the file, add the import to src/sdg_hub/core/connectors/code_interpreter/__init__.py:

from .my_interpreter import MyInterpreterConnector

The connector is then usable in PythonInterpreterBlock:

from sdg_hub.core.blocks import PythonInterpreterBlock

block = PythonInterpreterBlock(
    block_name="validate_code",
    interpreter_framework="my_interpreter",
    input_cols=["code"],
    output_cols=["result"],
)