Data Module¶
The data module provides pipeline and transformation utilities for data processing.
Pipeline Classes¶
- class xflow.data.BasePipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]¶
Bases: ABC
Base class for data pipelines in scientific machine learning.
Provides a simple interface for data sources with preprocessing pipelines, yielding preprocessed items for ML training.
- Parameters:
data_provider – DataProvider instance that yields raw data items.
transforms – List of functions (Transform-wrapped or named) applied sequentially.
logger – Optional logger for debugging and error tracking.
skip_errors – Whether to skip items that fail preprocessing vs. raise errors.
Example
>>> # Using Transform wrapper for clear metadata
>>> transforms = [
...     Transform(lambda path: np.loadtxt(path, delimiter=","), "load_csv"),
...     Transform(lambda data: (data[:-1], data[-1]), "split_features_target"),
...     Transform(lambda x: (normalize(x[0]), x[1]), "normalize_features")
... ]
>>>
>>> files = ListProvider(["data1.csv", "data2.csv"])
>>> pipeline = MyPipeline(files, transforms)
>>>
>>> # Clear, meaningful metadata
>>> print(pipeline.get_metadata())
>>> # {"pipeline_type": "MyPipeline", "dataset_size": 2,
>>> #  "preprocessing_functions": ["load_csv", "split_features_target", "normalize_features"]}
- __init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None[source]¶
- shuffle(buffer_size: int) BasePipeline[source]¶
Return a new pipeline that shuffles items with a reservoir buffer.
- batch(batch_size: int) BasePipeline[source]¶
Return a new pipeline that batches items into lists.
- prefetch() BasePipeline[source]¶
Return a new pipeline that prefetches items in background.
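The chaining methods above each return a new pipeline, so they compose fluently. A minimal sketch, assuming pipelines are iterable and that ListProvider and Transform are available from xflow.data (as in the example above); the file names and transform are placeholders:

>>> from xflow.data import DataPipeline, ListProvider, Transform
>>> files = ListProvider(["a.csv", "b.csv", "c.csv"])
>>> transforms = [Transform(lambda p: p.upper(), "to_upper")]
>>> pipeline = (
...     DataPipeline(files, transforms)
...     .shuffle(buffer_size=128)   # reservoir-buffer shuffle
...     .batch(batch_size=2)        # group items into lists of 2
...     .prefetch()                 # fill a background buffer while the consumer iterates
... )
>>> for batch in pipeline:          # assumes pipelines are iterable
...     print(batch)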
- class xflow.data.InMemoryPipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]¶
Bases: BasePipeline
In-memory pipeline that processes all data upfront.
- __init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None[source]¶
- to_framework_dataset(framework: str = 'tensorflow', dataset_ops: List[Dict] = None) Any[source]¶
Convert to framework-native dataset using already processed data.
- batch(batch_size: int) BasePipeline¶
Return a new pipeline that batches items into lists.
- prefetch() BasePipeline¶
Return a new pipeline that prefetches items in background.
- reset_error_count() None¶
Reset the error count to zero.
- sample(n: int = 5) List[Any]¶
Return up to n preprocessed items for inspection.
- shuffle(buffer_size: int) BasePipeline¶
Return a new pipeline that shuffles items with a reservoir buffer.
- to_numpy()¶
Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.
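A minimal sketch of InMemoryPipeline usage; the file names and transforms are hypothetical, and the TensorFlow conversion assumes tensorflow is installed:

>>> import numpy as np
>>> from xflow.data import InMemoryPipeline, ListProvider, Transform
>>> transforms = [
...     Transform(lambda path: np.loadtxt(path, delimiter=","), "load_csv"),
...     Transform(lambda row: (row[:-1], row[-1]), "split_features_target"),
... ]
>>> pipeline = InMemoryPipeline(ListProvider(["data1.csv", "data2.csv"]), transforms)
>>> # Items are (features, target) tuples, so to_numpy() returns a tuple of arrays.
>>> X, y = pipeline.to_numpy()
>>> # Hand the already-processed data to TensorFlow.
>>> ds = pipeline.to_framework_dataset(framework="tensorflow")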
- class xflow.data.DataPipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]¶
Bases: BasePipeline
Simple pipeline that processes data lazily without storing it in memory.
- __init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None¶
- batch(batch_size: int) BasePipeline¶
Return a new pipeline that batches items into lists.
- prefetch() BasePipeline¶
Return a new pipeline that prefetches items in background.
- reset_error_count() None¶
Reset the error count to zero.
- sample(n: int = 5) List[Any]¶
Return up to n preprocessed items for inspection.
- shuffle(buffer_size: int) BasePipeline¶
Return a new pipeline that shuffles items with a reservoir buffer.
- to_numpy()¶
Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.
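A minimal sketch of lazy processing with DataPipeline; the directory, extension, and transform are placeholders:

>>> from xflow.data import DataPipeline, FileProvider, Transform
>>> provider = FileProvider("/data/images", extensions=".png", path_type="string")
>>> pipeline = DataPipeline(provider, [Transform(lambda p: p, "identity")], skip_errors=True)
>>> # Items are processed one at a time as the pipeline is iterated; nothing is cached.
>>> preview = pipeline.sample(n=3)   # peek at up to 3 preprocessed items
>>> pipeline.reset_error_count()     # clear the count of items skipped so far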
- class xflow.data.TensorFlowPipeline(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True)[source]¶
Bases: BasePipeline
Pipeline that uses TensorFlow-native transforms without preprocessing.
- to_framework_dataset(framework: str = 'tensorflow', dataset_ops: List[Dict] = None)[source]¶
Convert to TensorFlow dataset.
- __init__(data_provider: DataProvider, transforms: List[Callable[[Any], Any] | Transform] | None = None, *, logger: Logger | None = None, skip_errors: bool = True) None¶
- batch(batch_size: int) BasePipeline¶
Return a new pipeline that batches items into lists.
- prefetch() BasePipeline¶
Return a new pipeline that prefetches items in background.
- reset_error_count() None¶
Reset the error count to zero.
- sample(n: int = 5) List[Any]¶
Return up to n preprocessed items for inspection.
- shuffle(buffer_size: int) BasePipeline¶
Return a new pipeline that shuffles items with a reservoir buffer.
- to_numpy()¶
Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.
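A minimal sketch of converting a TensorFlowPipeline to a TensorFlow dataset; the provider paths are hypothetical, and dataset_ops is omitted because its entry format is not specified here:

>>> from xflow.data import TensorFlowPipeline, FileProvider
>>> # path_type="string" keeps paths TensorFlow-compatible (see FileProvider below).
>>> provider = FileProvider("/data/records", extensions=".tfrecord", path_type="string")
>>> pipeline = TensorFlowPipeline(provider)
>>> ds = pipeline.to_framework_dataset(framework="tensorflow")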
Transform Classes¶
- class xflow.data.ShufflePipeline(base: BasePipeline, buffer_size: int)[source]¶
Bases: BasePipeline
Memory-efficient shuffle using reservoir sampling.
- __init__(base: BasePipeline, buffer_size: int) None[source]¶
- batch(batch_size: int) BasePipeline¶
Return a new pipeline that batches items into lists.
- prefetch() BasePipeline¶
Return a new pipeline that prefetches items in background.
- shuffle(buffer_size: int) BasePipeline¶
Return a new pipeline that shuffles items with a reservoir buffer.
- to_numpy()¶
Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.
- class xflow.data.BatchPipeline(base: BasePipeline, batch_size: int)[source]¶
Bases: BasePipeline
Groups items into fixed-size batches.
- __init__(base: BasePipeline, batch_size: int) None[source]¶
- unbatch() BasePipeline[source]¶
Return the underlying pipeline yielding individual items (no batch dimension).
- batch(batch_size: int) BatchPipeline[source]¶
Return a new BatchPipeline with the specified batch size.
- prefetch() BasePipeline¶
Return a new pipeline that prefetches items in background.
- shuffle(buffer_size: int) BasePipeline¶
Return a new pipeline that shuffles items with a reservoir buffer.
- to_numpy()¶
Convert the pipeline to NumPy arrays. If each item is a tuple, returns a tuple of arrays (one per component). If each item is a single array, returns a single array.
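A minimal sketch of batching and unbatching; ListProvider and the integer items are placeholders:

>>> from xflow.data import DataPipeline, ListProvider
>>> pipeline = DataPipeline(ListProvider(list(range(10))))
>>> batched = pipeline.batch(batch_size=4)    # yields lists of up to 4 items
>>> rebatched = batched.batch(batch_size=2)   # BatchPipeline.batch swaps in a new batch size
>>> single = batched.unbatch()                # back to individual items, no batch dimension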
Provider Classes¶
- class xflow.data.FileProvider(root_paths: str | PathLike[str] | List[str | PathLike[str]], extensions: str | List[str] | None = None, path_type: Literal['string', 'str', 'path', 'Path'] = 'path')[source]¶
Bases: DataProvider
Data provider that scans directories for files with specified extensions.
- __init__(root_paths: str | PathLike[str] | List[str | PathLike[str]], extensions: str | List[str] | None = None, path_type: Literal['string', 'str', 'path', 'Path'] = 'path')[source]¶
- Parameters:
root_paths – Single path (string or Path) or list of paths
extensions – Single extension string or list of extensions (e.g., ‘.jpg’ or [‘.jpg’, ‘.png’]). If None, returns all files.
path_type – Whether to return paths as “string” or “path” objects. Use “string” for TensorFlow compatibility, “path” for rich Path API.
- split(ratio: float = 0.8, seed: int = 42) Tuple[FileProvider, FileProvider][source]¶
Split files into two providers.
- Parameters:
ratio – Portion for first provider (0.0 to 1.0)
seed – Random seed for reproducible splits
- Returns:
Tuple of (provider_1, provider_2)
- subsample(n_samples: int | None = None, fraction: float | None = None, seed: int = None, strategy: str = 'random') FileProvider[source]¶
Create a subsampled version of this provider.
- Parameters:
n_samples – Exact number of samples to take
fraction – Fraction of total samples (0.0 to 1.0)
seed – Random seed for reproducible subsampling
strategy – “random”, “first”, “last”, “stride”, or “reservoir”.
- Returns:
New provider with subsampled data
- merge(other: FileProvider) FileProvider[source]¶
Merge with another FileProvider.
- Parameters:
other – Another FileProvider to merge with
- Returns:
New FileProvider containing files from both providers
- Raises:
TypeError – If other is not a FileProvider
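A minimal sketch of FileProvider usage; the directories and extensions are hypothetical:

>>> from xflow.data import FileProvider
>>> images = FileProvider(
...     ["/data/train_a", "/data/train_b"],
...     extensions=[".jpg", ".png"],
...     path_type="string",
... )
>>> train, val = images.split(ratio=0.8, seed=42)     # reproducible 80/20 split
>>> subset = train.subsample(fraction=0.1, seed=0)    # 10% random subsample
>>> combined = train.merge(val)                       # recombine into one provider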
- class xflow.data.SqlProvider(sources: List[Dict[str, Any]] | Dict[str, Any] | None = None, output_config: Dict[str, Any] | None = None)[source]¶
Bases: DataProvider
Data provider that unifies data from SQL database sources into a DataFrame.
- __init__(sources: List[Dict[str, Any]] | Dict[str, Any] | None = None, output_config: Dict[str, Any] | None = None)[source]¶
- Parameters:
sources – Source configuration(s). Can be:
- List of dicts: [{"connection": "path.db", "sql": "SELECT …"}]
- Single dict: {"connection": "path.db", "sql": "SELECT …"}
- None: Creates an empty provider
output_config – Optional dict controlling output behavior:
- {"list": "<column_name>"} returns that column as a Python list
- {} or None returns the full unified DataFrame
- subsample(n_samples: int | None = None, fraction: float | None = None, seed: int = None, strategy: str = 'random') SqlProvider[source]¶
Create subsampled provider.
- split(ratio: float = None, seed: int = 42, filters: List[str] = None) Tuple[SqlProvider, SqlProvider] | List[SqlProvider][source]¶
Split unified DataFrame into multiple providers.
- Parameters:
ratio – For ratio-based split (0.0 to 1.0). If provided, returns (provider_1, provider_2).
seed – Random seed for ratio-based splits
filters – List of pandas query strings for filter-based split. Returns list of providers. Example: [“age > 30”, “category == ‘A’”, “score >= 0.8”]
- Returns:
Tuple of (provider_1, provider_2) for a ratio split; list of providers for a filter split.
- merge(other: SqlProvider) SqlProvider[source]¶
Merge with another SqlProvider.
- Parameters:
other – Another SqlProvider to merge with
- Returns:
New SqlProvider containing combined DataFrame from both providers
- Raises:
TypeError – If other is not a SqlProvider
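A minimal sketch of SqlProvider usage; the database files, table, and column names are hypothetical:

>>> from xflow.data import SqlProvider
>>> provider = SqlProvider(
...     sources=[
...         {"connection": "runs_2023.db", "sql": "SELECT * FROM runs"},
...         {"connection": "runs_2024.db", "sql": "SELECT * FROM runs"},
...     ],
...     output_config=None,   # None or {} returns the full unified DataFrame
... )
>>> train, test = provider.split(ratio=0.8, seed=42)                  # ratio-based split
>>> high, low = provider.split(filters=["score >= 0.8", "score < 0.8"])  # filter-based split
>>> small = provider.subsample(n_samples=100, strategy="random")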