Data Module

The data module provides pipeline, transform, and provider utilities for building data-processing workflows.

Pipeline Classes

class xflow.data.BasePipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]

Bases: ABC

Base class for data pipelines in scientific machine learning.

Provides a simple interface for data sources with preprocessing pipelines, yielding preprocessed items for ML training.

Parameters:
  • data_provider – DataProvider instance that yields raw data items.

  • transforms – List of callables, plain or wrapped in Transform for readable metadata, applied sequentially.

  • logger – Optional logger for debugging and error tracking.

  • skip_errors – Whether to skip items that fail preprocessing instead of raising an error.

Example

>>> # Using Transform wrapper for clear metadata
>>> transforms = [
...     Transform(lambda path: np.loadtxt(path, delimiter=","), "load_csv"),
...     Transform(lambda data: (data[:-1], data[-1]), "split_features_target"),
...     Transform(lambda x: (normalize(x[0]), x[1]), "normalize_features")
... ]
>>>
>>> files = ListProvider(["data1.csv", "data2.csv"])
>>> pipeline = MyPipeline(files, transforms)
>>>
>>> # Clear, meaningful metadata
>>> print(pipeline.get_metadata())
>>> # {"pipeline_type": "MyPipeline", "dataset_size": 2,
>>> #  "preprocessing_functions": ["load_csv", "split_features_target", "normalize_features"]}
__init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None[source]
sample(n: int = 5) List[Any][source]

Return up to n preprocessed items for inspection.

shuffle(buffer_size: int) BasePipeline[source]

Return a new pipeline that shuffles items with a reservoir buffer.

batch(batch_size: int) BasePipeline[source]

Return a new pipeline that batches items into lists.

prefetch() BasePipeline[source]

Return a new pipeline that prefetches items in the background.

reset_error_count() None[source]

Reset the error count to zero.

abstractmethod to_framework_dataset() Any[source]

Convert pipeline to framework-native dataset.

to_numpy()[source]

Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.
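
Example

The chainable methods above compose by returning new pipelines. A minimal sketch continuing the example above (files, transforms, and pipeline as defined there); the buffer and batch sizes are illustrative:

>>> ready = pipeline.shuffle(buffer_size=100).batch(batch_size=32).prefetch()
>>> ready.sample(2)              # up to two preprocessed batches for inspection
>>> X, y = pipeline.to_numpy()   # tuple of arrays, since each item is a (features, target) pair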

class xflow.data.InMemoryPipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]

Bases: BasePipeline

In-memory pipeline that processes all data upfront.

__init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None[source]
to_framework_dataset(framework: str = 'tensorflow', dataset_ops: List[Dict] = None) Any[source]

Convert to framework-native dataset using already processed data.
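
Example

A minimal sketch, reusing files and transforms from the BasePipeline example and assuming that framework="tensorflow" returns a tf.data.Dataset (the dataset_ops schema is not documented here and is left at its default):

>>> pipeline = InMemoryPipeline(files, transforms)
>>> ds = pipeline.to_framework_dataset(framework="tensorflow")
>>> for item in ds.take(1):
...     print(item)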

batch(batch_size: int) BasePipeline

Return a new pipeline that batches items into lists.

prefetch() BasePipeline

Return a new pipeline that prefetches items in the background.

reset_error_count() None

Reset the error count to zero.

sample(n: int = 5) List[Any]

Return up to n preprocessed items for inspection.

shuffle(buffer_size: int) BasePipeline

Return a new pipeline that shuffles items with a reservoir buffer.

to_numpy()

Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.

class xflow.data.DataPipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]

Bases: BasePipeline

Simple pipeline that processes data lazily without storing in memory.

to_framework_dataset() Any[source]

Not supported for lazy processing.
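
Example

Because conversion is not supported, a DataPipeline is typically consumed through the inherited helpers; a minimal sketch, with the directory path and transforms purely illustrative:

>>> lazy = DataPipeline(FileProvider("data/", extensions=".csv"), transforms)
>>> lazy.sample(3)          # processes only the first few items on demand
>>> X, y = lazy.to_numpy()  # materializes everything, assuming (features, target) items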

__init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None
batch(batch_size: int) BasePipeline

Return a new pipeline that batches items into lists.

prefetch() BasePipeline

Return a new pipeline that prefetches items in the background.

reset_error_count() None

Reset the error count to zero.

sample(n: int = 5) List[Any]

Return up to n preprocessed items for inspection.

shuffle(buffer_size: int) BasePipeline

Return a new pipeline that shuffles items with a reservoir buffer.

to_numpy()

Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.

class xflow.data.TensorFlowPipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]

Bases: BasePipeline

Pipeline that uses TensorFlow-native transforms without preprocessing.

to_framework_dataset(framework: str = 'tensorflow', dataset_ops: List[Dict] = None)[source]

Convert to TensorFlow dataset.
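
Example

A minimal sketch, assuming string paths (path_type="string" on FileProvider, as noted below) and Transform-wrapped TensorFlow ops; the image-loading steps are an assumption, not a prescribed recipe:

>>> import tensorflow as tf
>>> files = FileProvider("images/", extensions=".png", path_type="string")
>>> tf_transforms = [
...     Transform(lambda p: tf.io.read_file(p), "read_file"),
...     Transform(lambda raw: tf.image.decode_png(raw, channels=3), "decode_png"),
... ]
>>> pipeline = TensorFlowPipeline(files, tf_transforms)
>>> ds = pipeline.to_framework_dataset(framework="tensorflow")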

__init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None
batch(batch_size: int) BasePipeline

Return a new pipeline that batches items into lists.

prefetch() BasePipeline

Return a new pipeline that prefetches items in the background.

reset_error_count() None

Reset the error count to zero.

sample(n: int = 5) List[Any]

Return up to n preprocessed items for inspection.

shuffle(buffer_size: int) BasePipeline

Return a new pipeline that shuffles items with a reservoir buffer.

to_numpy()

Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.

Transform Classes

class xflow.data.ShufflePipeline(base: BasePipeline, buffer_size: int)[source]

Bases: BasePipeline

Memory-efficient shuffle using reservoir sampling.
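
Example

A ShufflePipeline is usually obtained by calling shuffle() on an existing pipeline rather than constructed directly; a short sketch with an illustrative buffer size:

>>> shuffled = ShufflePipeline(pipeline, buffer_size=256)   # comparable to pipeline.shuffle(256)
>>> shuffled.sample(5)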

__init__(base: BasePipeline, buffer_size: int) None[source]
sample(n: int = 5) List[Any][source]

Return up to n preprocessed items for inspection.

reset_error_count() None[source]

Reset the error count to zero.

to_framework_dataset() Any[source]

Convert pipeline to framework-native dataset.

batch(batch_size: int) BasePipeline

Return a new pipeline that batches items into lists.

prefetch() BasePipeline

Return a new pipeline that prefetches items in the background.

shuffle(buffer_size: int) BasePipeline

Return a new pipeline that shuffles items with a reservoir buffer.

to_numpy()

Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.

class xflow.data.BatchPipeline(base: BasePipeline, batch_size: int)[source]

Bases: BasePipeline

Groups items into fixed-size batches.

__init__(base: BasePipeline, batch_size: int) None[source]
sample(n: int = 5) List[Any][source]

Return up to n preprocessed items for inspection.

reset_error_count() None[source]

Reset the error count to zero.

unbatch() BasePipeline[source]

Return the underlying pipeline yielding individual items (no batch dimension).

batch(batch_size: int) BatchPipeline[source]

Return a new BatchPipeline with the specified batch size.
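
Example

A short sketch of batching and then recovering per-item iteration; the batch size is illustrative:

>>> batched = BatchPipeline(pipeline, batch_size=32)   # or pipeline.batch(32)
>>> batched.sample(1)          # one batch: a list of up to 32 items
>>> items = batched.unbatch()  # underlying per-item pipeline
>>> items.sample(3)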

to_framework_dataset() Any[source]

Convert pipeline to framework-native dataset.

prefetch() BasePipeline

Return a new pipeline that prefetches items in the background.

shuffle(buffer_size: int) BasePipeline

Return a new pipeline that shuffles items with a reservoir buffer.

to_numpy()

Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.

Provider Classes

class xflow.data.FileProvider(root_paths: str | PathLike[str] | List[str | PathLike[str]], extensions: str | List[str] | None = None, path_type: Literal['string', 'str', 'path', 'Path'] = 'path')[source]

Bases: DataProvider

Data provider that scans directories for files with specified extensions.

__init__(root_paths: str | PathLike[str] | List[str | PathLike[str]], extensions: str | List[str] | None = None, path_type: Literal['string', 'str', 'path', 'Path'] = 'path')[source]
Parameters:
  • root_paths – Single path (string or Path) or list of paths

  • extensions – Single extension string or list of extensions (e.g., '.jpg' or ['.jpg', '.png']). If None, returns all files.

  • path_type – Whether to return paths as "string" or "path" objects. Use "string" for TensorFlow compatibility, "path" for the rich Path API.
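
Example

A minimal construction sketch; the directory names and extensions are illustrative:

>>> provider = FileProvider(["data/train", "data/val"], extensions=[".csv", ".txt"])
>>> tf_ready = FileProvider("data/images", extensions=".png", path_type="string")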

split(ratio: float = 0.8, seed: int = 42) Tuple[FileProvider, FileProvider][source]

Split files into two providers.

Parameters:
  • ratio – Portion for first provider (0.0 to 1.0)

  • seed – Random seed for reproducible splits

Returns:

Tuple of (provider_1, provider_2)
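
Example

A sketch of a reproducible 80/20 split feeding two pipelines; transforms is assumed from the pipeline examples above:

>>> train_files, val_files = provider.split(ratio=0.8, seed=42)
>>> train_pipeline = DataPipeline(train_files, transforms)
>>> val_pipeline = DataPipeline(val_files, transforms)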

subsample(n_samples: int | None = None, fraction: float | None = None, seed: int = None, strategy: str = 'random') FileProvider[source]

Create a subsampled version of this provider.

Parameters:
  • n_samples – Exact number of samples to take

  • fraction – Fraction of total samples (0.0 to 1.0)

  • seed – Random seed for reproducible subsampling

  • strategy – "random", "first", "last", "stride", or "reservoir".

Returns:

New provider with subsampled data
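
Example

A sketch of carving out a quick development subset; the sample counts are illustrative:

>>> dev_files = provider.subsample(n_samples=100, seed=0, strategy="random")
>>> head = provider.subsample(fraction=0.1, strategy="first")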

merge(other: FileProvider) FileProvider[source]

Merge with another FileProvider.

Parameters:

other – Another FileProvider to merge with

Returns:

New FileProvider containing files from both providers

Raises:

TypeError – If other is not a FileProvider

class xflow.data.SqlProvider(sources: List[Dict[str, Any]] | Dict[str, Any] | None = None, output_config: Dict[str, Any] | None = None)[source]

Bases: DataProvider

Data provider that unifies data from SQL database sources into a DataFrame.

__init__(sources: List[Dict[str, Any]] | Dict[str, Any] | None = None, output_config: Dict[str, Any] | None = None)[source]
Parameters:
  • sources – Source configuration(s). Can be:
      - List of dicts: [{"connection": "path.db", "sql": "SELECT …"}]
      - Single dict: {"connection": "path.db", "sql": "SELECT …"}
      - None: Creates empty provider

  • output_config – Optional dict controlling output behavior:
      - {"list": "<column_name>"} returns that column as a Python list
      - {} or None returns the full unified DataFrame
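
Example

A minimal construction sketch; the database path, SQL, and column name are illustrative:

>>> provider = SqlProvider(
...     sources={"connection": "experiments.db", "sql": "SELECT path, label FROM runs"},
...     output_config={"list": "path"},   # yield the 'path' column as a Python list
... )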

set_output_config(config: Dict[str, Any]) None[source]

Set the output configuration.

subsample(n_samples: int | None = None, fraction: float | None = None, seed: int = None, strategy: str = 'random') SqlProvider[source]

Create subsampled provider.

split(ratio: float = None, seed: int = 42, filters: List[str] = None) Tuple[SqlProvider, SqlProvider] | List[SqlProvider][source]

Split unified DataFrame into multiple providers.

Parameters:
  • ratio – For ratio-based split (0.0 to 1.0). If provided, returns (provider_1, provider_2).

  • seed – Random seed for ratio-based splits

  • filters – List of pandas query strings for filter-based split. Returns list of providers. Example: ["age > 30", "category == 'A'", "score >= 0.8"]

Returns:

Tuple of (provider_1, provider_2) for a ratio split; a list of providers for a filter split.
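
Example

A sketch of both split modes; the column names in the filters are illustrative:

>>> train_db, val_db = provider.split(ratio=0.8, seed=42)
>>> groups = provider.split(filters=["score >= 0.8", "score < 0.8"])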

merge(other: SqlProvider) SqlProvider[source]

Merge with another SqlProvider.

Parameters:

other – Another SqlProvider to merge with

Returns:

New SqlProvider containing combined DataFrame from both providers

Raises:

TypeError – If other is not a SqlProvider

classmethod get_supported_db_types() List[str][source]

Get list of supported database types.

Functions

xflow.data.build_transforms_from_config(config: List[Dict[str, Any]], name_key: str = 'name', params_key: str = 'params') List[Callable][source]

Build transform pipeline from configuration.
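
Example

A sketch of the expected configuration shape, assuming the transform names ("load_csv", "normalize") are registered with the library; how names resolve to callables is not shown here:

>>> config = [
...     {"name": "load_csv", "params": {"delimiter": ","}},
...     {"name": "normalize", "params": {}},
... ]
>>> transforms = build_transforms_from_config(config)
>>> pipeline = DataPipeline(files, transforms)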