Architecture Overview
atdata is designed around a simple but powerful idea: typed, serializable samples that can flow seamlessly between local development, team storage, and a federated network. This page explains the architectural decisions and how the components work together.
Design Philosophy
The Problem
Machine learning workflows involve datasets at every stage—training data, validation sets, embeddings, features, and model outputs. These datasets are often:
- Untyped: Raw files with implicit schemas, leading to runtime errors
- Siloed: Stuck in one location (local disk, team bucket, or cloud storage)
- Undiscoverable: No standard way to find and share datasets across teams or organizations
The Solution
atdata provides a three-layer architecture that addresses each problem:
┌─────────────────────────────────────────────────────────────┐
│ Layer 3: Federation (ATProto Atmosphere) │
│ - Decentralized discovery and sharing │
│ - Content-addressable identifiers │
│ - Cross-organization dataset federation │
└─────────────────────────────────────────────────────────────┘
↑
Promotion
↑
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: Team Storage (Redis + S3) │
│ - Shared index for team discovery │
│ - Scalable object storage for data │
│ - Schema registry for type consistency │
└─────────────────────────────────────────────────────────────┘
↑
Insert
↑
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Local Development │
│ - Typed samples with automatic serialization │
│ - WebDataset tar files for efficient storage │
│ - Lens transformations for schema flexibility │
└─────────────────────────────────────────────────────────────┘
Core Components
PackableSample: The Foundation
Everything in atdata starts with PackableSample—a base class that makes Python dataclasses serializable with msgpack:

@atdata.packable
class ImageSample:
    image: NDArray      # Automatically converted to/from bytes
    label: str          # Standard msgpack serialization
    confidence: float

Key features:
- Automatic NDArray handling: Numpy arrays are serialized efficiently
- Type safety: Field types are preserved and validated
- Round-trip fidelity: Serialize → deserialize always produces identical data
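To make the round-trip guarantee concrete, here is a minimal sketch of the idea using only the standard library. atdata's real wire format is msgpack; the stdlib `json` module stands in for it here, and `LabelSample`, `pack`, and `unpack` are made-up names for illustration:

```python
import json
from dataclasses import asdict, dataclass

# LabelSample is a hypothetical sample type; json stands in for msgpack.
@dataclass
class LabelSample:
    label: str
    confidence: float

def pack(sample: LabelSample) -> bytes:
    # Dataclass -> field dict -> bytes
    return json.dumps(asdict(sample)).encode()

def unpack(data: bytes) -> LabelSample:
    # Bytes -> field dict -> dataclass
    return LabelSample(**json.loads(data.decode()))

original = LabelSample(label="cat", confidence=0.9)
assert unpack(pack(original)) == original  # round-trip fidelity
```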
The @packable decorator is syntactic sugar that:
- Converts your class to a dataclass
- Adds PackableSample as a base class
- Registers a lens from DictSample for flexible loading
Dataset: Typed Iteration
The Dataset[T] class wraps WebDataset tar archives with type information:
dataset = atdata.Dataset[ImageSample]("data-{000000..000009}.tar")
for batch in dataset.shuffled(batch_size=32):
images = batch.image # Stacked NDArray: (32, H, W, C)
    labels = batch.label  # List of 32 strings

Why WebDataset?
WebDataset is a battle-tested format for large-scale ML training:
- Streaming: No need to download entire datasets
- Sharding: Data split across multiple tar files for parallelism
- Shuffling: Two-level shuffling (shard + sample) for training
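The two-level shuffle can be sketched in a few lines of plain Python: shuffle the order of shards first, then pass the sample stream through a small in-memory buffer that emits elements in random order. This is a simplified stand-in for what WebDataset does, not its actual implementation; `two_level_shuffle` and its parameters are illustrative names.

```python
import random
from itertools import chain
from typing import Iterator, Sequence

def two_level_shuffle(
    shards: Sequence[Sequence[str]],
    buffer_size: int = 4,
    seed: int = 0,
) -> Iterator[str]:
    """Sketch of WebDataset-style shuffling: randomize shard order,
    then shuffle samples through a bounded in-memory buffer."""
    rng = random.Random(seed)
    order = list(shards)
    rng.shuffle(order)                   # level 1: shard order
    stream = chain.from_iterable(order)  # samples still arrive shard by shard
    buffer = []
    for sample in stream:
        buffer.append(sample)
        if len(buffer) >= buffer_size:
            # level 2: emit a random element from the buffer
            yield buffer.pop(rng.randrange(len(buffer)))
    while buffer:
        yield buffer.pop(rng.randrange(len(buffer)))
```

The buffer keeps memory bounded, so streaming still works: you never need the full dataset in RAM to get a usefully shuffled order.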
atdata adds:
- Type safety: Know the schema at compile time
- Batch aggregation: NDArrays are automatically stacked
- Lens transformations: View data through different schemas
SampleBatch: Automatic Aggregation
When iterating with batch_size, atdata returns SampleBatch[T] objects that aggregate sample attributes:
batch = SampleBatch[ImageSample](samples)
# NDArray fields → stacked numpy array with batch dimension
batch.image.shape # (batch_size, H, W, C)
# Other fields → list
batch.label # ["cat", "dog", "bird", ...]This eliminates boilerplate collation code and works automatically for any PackableSample type.
Lens: Schema Transformations
Lenses enable viewing datasets through different schemas without duplicating data:
@atdata.packable
class SimplifiedSample:
label: str
@atdata.lens
def simplify(src: ImageSample) -> SimplifiedSample:
return SimplifiedSample(label=src.label)
# View dataset through simplified schema
simple_ds = dataset.as_type(SimplifiedSample)

When to use lenses:
- Reducing fields: Drop unnecessary data for specific tasks
- Transforming data: Compute derived fields on-the-fly
- Schema migration: Handle version differences between datasets
Lenses are registered globally in a LensNetwork, enabling automatic discovery of transformation paths.
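The path-discovery idea can be sketched as a breadth-first search over a graph whose nodes are sample types and whose edges are registered lenses. This is an illustrative toy, not atdata's actual `LensNetwork` implementation; the class and its methods here are assumptions:

```python
from collections import deque

class ToyLensNetwork:
    """Sketch: lenses are edges in a type graph; a transformation
    path is found by breadth-first search and composed on the fly."""

    def __init__(self):
        self.edges = {}  # source type -> {target type: lens function}

    def register(self, src, dst, fn):
        self.edges.setdefault(src, {})[dst] = fn

    def path(self, src, dst):
        """Return a composed function src -> dst, chaining lenses if needed."""
        queue = deque([(src, lambda x: x)])
        seen = {src}
        while queue:
            node, fn = queue.popleft()
            if node == dst:
                return fn
            for nxt, edge in self.edges.get(node, {}).items():
                if nxt not in seen:
                    seen.add(nxt)
                    # Compose the edge onto the path found so far.
                    queue.append((nxt, lambda x, f=fn, e=edge: e(f(x))))
        raise LookupError(f"no lens path from {src} to {dst}")

net = ToyLensNetwork()
net.register("ImageSample", "DictSample", lambda s: {"label": s})
net.register("DictSample", "SimplifiedSample", lambda d: d["label"])
to_simple = net.path("ImageSample", "SimplifiedSample")
assert to_simple("cat") == "cat"  # composed through DictSample
```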
Storage Backends
Local Index (Redis + S3)
For team-scale usage, atdata provides a two-component storage system:
Redis Index: Stores metadata and enables fast lookups
- Dataset entries (name, schema, URLs, metadata)
- Schema registry (type definitions)
- CID-based content addressing
S3 DataStore: Stores actual data files
- WebDataset tar shards
- Any S3-compatible storage (AWS, MinIO, Cloudflare R2)
store = S3DataStore(credentials=creds, bucket="datasets")
index = LocalIndex(data_store=store)
# Insert dataset: writes to S3, indexes in Redis
entry = index.insert_dataset(dataset, name="training-v1")

Why this split?
- Separation of concerns: Metadata queries don’t touch data storage
- Flexibility: Use any S3-compatible storage
- Scalability: Redis handles high-throughput lookups; S3 handles large files
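A toy version of the index/store split makes the separation of concerns concrete: the index holds only small metadata records that point at the store, and fetching data is a separate step the index never performs itself. `ToyStore`, `ToyIndex`, and the `s3://datasets/...` URL scheme are assumptions for illustration:

```python
class ToyStore:
    """Stand-in for S3: holds bulk bytes, addressed by key."""
    def __init__(self):
        self._objects = {}

    def write(self, key: str, data: bytes) -> str:
        self._objects[key] = data
        return f"s3://datasets/{key}"  # assumed URL scheme

    def read(self, key: str) -> bytes:
        return self._objects[key]

class ToyIndex:
    """Stand-in for Redis: holds only small metadata records."""
    def __init__(self, store: ToyStore):
        self._store = store
        self._entries = {}

    def insert_dataset(self, name: str, key: str, data: bytes) -> dict:
        url = self._store.write(key, data)  # bulk bytes go to the store
        entry = {"name": name, "url": url}  # small record goes to the index
        self._entries[name] = entry
        return entry

    def get_dataset(self, name: str) -> dict:
        # Metadata lookup only; no data is transferred.
        return self._entries[name]
```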
Atmosphere Index (ATProto)
For public or cross-organization sharing, atdata integrates with the AT Protocol:
ATProto PDS: Your Personal Data Server stores records
- Schema definitions
- Dataset index records
- Lens transformation records
PDSBlobStore: Optional blob storage on your PDS
- Store actual data shards as ATProto blobs
- Fully decentralized—no external dependencies
client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)
# Publish: creates ATProto records, uploads blobs
entry = index.insert_dataset(dataset, name="public-features")

Protocol Abstractions
atdata uses protocols (structural typing) to enable backend interoperability:
AbstractIndex
Common interface for both LocalIndex and AtmosphereIndex:
def process_dataset(index: AbstractIndex, name: str):
entry = index.get_dataset(name)
schema = index.decode_schema(entry.schema_ref)
    # Works with either LocalIndex or AtmosphereIndex

Key methods:
- insert_dataset() / get_dataset(): Dataset CRUD
- publish_schema() / decode_schema(): Schema management
- list_datasets() / list_schemas(): Discovery
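Structural typing means any backend with the right methods satisfies the protocol, with no inheritance required. Here is a minimal self-contained sketch of the pattern using `typing.Protocol`; the `Index` protocol and `InMemoryIndex` backend are made-up stand-ins, not atdata's actual classes:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Index(Protocol):
    """Structural interface: anything with get_dataset() qualifies."""
    def get_dataset(self, name: str) -> dict: ...

class InMemoryIndex:
    """Toy backend; note it never subclasses Index."""
    def __init__(self):
        self._entries = {}

    def get_dataset(self, name: str) -> dict:
        return self._entries[name]

def process_dataset(index: Index, name: str) -> dict:
    # Works with any object that structurally matches Index.
    return index.get_dataset(name)

idx = InMemoryIndex()
idx._entries["training-v1"] = {"schema": "MySample@1.0.0"}
assert isinstance(idx, Index)  # structural check, no subclassing needed
```

This is why LocalIndex and AtmosphereIndex are interchangeable in code written against AbstractIndex: the caller depends on the method shapes, not on a shared base class.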
AbstractDataStore
Common interface for S3DataStore and PDSBlobStore:
def write_to_store(store: AbstractDataStore, dataset: Dataset):
urls = store.write_shards(dataset, prefix="data/v1")
    # Works with S3 or PDS blob storage

DataSource
Common interface for data streaming:
- URLSource: WebDataset-compatible URLs
- S3Source: S3 with explicit credentials
- BlobSource: ATProto PDS blobs
Data Flow: Local to Federation
A typical workflow progresses through three stages:
Stage 1: Local Development
# Define type and create samples
@atdata.packable
class MySample:
features: NDArray
label: str
# Write to local tar
with wds.writer.TarWriter("data.tar") as sink:
for sample in samples:
sink.write(sample.as_wds)
# Iterate locally
dataset = atdata.Dataset[MySample]("data.tar")

Stage 2: Team Storage
# Set up team storage
store = S3DataStore(credentials=team_creds, bucket="team-datasets")
index = LocalIndex(data_store=store)
# Publish schema and insert
index.publish_schema(MySample, version="1.0.0")
entry = index.insert_dataset(dataset, name="my-features")
# Team members can now load via index
ds = load_dataset("@local/my-features", index=index)

Stage 3: Federation
# Promote to atmosphere
client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")
at_uri = promote_to_atmosphere(entry, index, client)
# Anyone can now discover and load
# ds = load_dataset("@handle.bsky.social/my-features")

Content Addressing
atdata uses CIDs (Content Identifiers) for content-addressable storage:
- Schema CIDs: Hash of schema definition
- Entry CIDs: Hash of (schema_ref, data_urls)
- Blob CIDs: Hash of data content
Benefits:
- Deduplication: Identical content has identical CID
- Integrity: Verify data matches expected hash
- ATProto compatibility: CIDs are native to the AT Protocol
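The deduplication and integrity benefits fall directly out of keying storage by a hash of the content. The sketch below shows the mechanism with a bare sha256 hex digest; real CIDs add multihash/multicodec prefixes, and `ContentStore` is an illustrative name, not an atdata class:

```python
import hashlib

class ContentStore:
    """Sketch of content addressing: the key IS the hash of the bytes."""

    def __init__(self):
        self._blobs = {}

    def put(self, data: bytes) -> str:
        cid = hashlib.sha256(data).hexdigest()
        self._blobs[cid] = data  # same content -> same key -> stored once
        return cid

    def get(self, cid: str) -> bytes:
        data = self._blobs[cid]
        # Integrity: re-hash on read and verify it matches the key.
        assert hashlib.sha256(data).hexdigest() == cid
        return data

store = ContentStore()
cid1 = store.put(b"shard-000000")
cid2 = store.put(b"shard-000000")  # duplicate insert
assert cid1 == cid2 and len(store._blobs) == 1  # deduplicated
```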
Extension Points
atdata is designed for extensibility:
Custom DataSources
Implement the DataSource protocol to add new storage backends:
class MyCustomSource:
def list_shards(self) -> list[str]: ...
def open_shard(self, shard_id: str) -> IO[bytes]: ...
@property
    def shards(self) -> Iterator[tuple[str, IO[bytes]]]: ...

Custom Lenses
Register transformations between any PackableSample types:
@atdata.lens
def my_transform(src: SourceType) -> TargetType:
return TargetType(...)
@my_transform.putter
def my_transform_put(view: TargetType, src: SourceType) -> SourceType:
    return SourceType(...)

Schema Extensions
The schema format supports custom metadata for domain-specific needs:
index.publish_schema(
MySample,
version="1.0.0",
metadata={"domain": "chemistry", "units": "mol/L"},
)

Summary
| Component | Purpose | Key Classes |
|---|---|---|
| Samples | Typed, serializable data | PackableSample, @packable |
| Datasets | Typed iteration over WebDataset | Dataset[T], SampleBatch[T] |
| Lenses | Schema transformations | Lens, @lens, LensNetwork |
| Local Storage | Team-scale index + data | LocalIndex, S3DataStore |
| Atmosphere | Federated sharing | AtmosphereIndex, PDSBlobStore |
| Protocols | Backend abstraction | AbstractIndex, AbstractDataStore, DataSource |
The architecture enables a smooth progression from local experimentation to team collaboration to public federation, all while maintaining type safety and efficient data handling.