Protocols

Abstract interfaces for index backends, data stores, and data sources

The protocols module defines abstract interfaces that enable interchangeable index backends (local Redis vs ATProto), data stores (S3 vs PDS blobs), and data sources (URL, S3, etc.).

Overview

Both local and atmosphere implementations solve the same problem: indexed dataset storage with external data URLs. These protocols formalize that common interface:

  • IndexEntry: Common interface for dataset index entries
  • AbstractIndex: Protocol for index operations
  • AbstractDataStore: Protocol for data storage operations
  • DataSource: Protocol for streaming data from various backends

IndexEntry Protocol

Represents a dataset entry in any index:

from atdata._protocols import IndexEntry

def process_entry(entry: IndexEntry) -> None:
    print(f"Name: {entry.name}")
    print(f"Schema: {entry.schema_ref}")
    print(f"URLs: {entry.data_urls}")
    print(f"Metadata: {entry.metadata}")

Properties

Property     Type          Description
name         str           Human-readable dataset name
schema_ref   str           Schema reference (atdata://local/... or at://...)
data_urls    list[str]     WebDataset URLs for the data
metadata     dict | None   Arbitrary metadata dictionary

Implementations

  • LocalDatasetEntry (from atdata.local)
  • AtmosphereIndexEntry (from atdata.atmosphere)

AbstractIndex Protocol

Defines operations for managing schemas and datasets:

from atdata._protocols import AbstractIndex

def list_all_datasets(index: AbstractIndex) -> None:
    """Works with LocalIndex or AtmosphereIndex."""
    for entry in index.list_datasets():
        print(f"{entry.name}: {entry.schema_ref}")

Dataset Operations

# Insert a dataset
entry = index.insert_dataset(
    dataset,
    name="my-dataset",
    schema_ref="local://schemas/MySample@1.0.0",  # optional
)

# Get by name/reference
entry = index.get_dataset("my-dataset")

# List all datasets
for entry in index.list_datasets():
    print(entry.name)

Schema Operations

# Publish a schema
schema_ref = index.publish_schema(
    MySample,
    version="1.0.0",
)

# Get schema record
schema = index.get_schema(schema_ref)
print(schema["name"], schema["version"])

# List all schemas
for schema in index.list_schemas():
    print(f"{schema['name']}@{schema['version']}")

# Decode schema to Python type
SampleType = index.decode_schema(schema_ref)
dataset = atdata.Dataset[SampleType](entry.data_urls[0])

Implementations

  • LocalIndex / Index (from atdata.local)
  • AtmosphereIndex (from atdata.atmosphere)

AbstractDataStore Protocol

Abstracts over different storage backends:

from atdata._protocols import AbstractDataStore

def write_dataset(store: AbstractDataStore, dataset) -> list[str]:
    """Works with S3DataStore or future PDS blob store."""
    urls = store.write_shards(dataset, prefix="datasets/v1")
    return urls

Methods

# Write dataset shards
urls = store.write_shards(
    dataset,
    prefix="datasets/mnist/v1",
    maxcount=10000,  # samples per shard
)

# Resolve URL for reading
readable_url = store.read_url("s3://bucket/path.tar")

# Check streaming support
if store.supports_streaming():
    # Can stream directly
    pass

Implementations

  • S3DataStore (from atdata.local)
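To show the shape of the protocol without S3, here is a toy in-memory store with the three methods listed above. It chunks raw byte payloads instead of writing real WebDataset tar shards, and the mem:// scheme is invented for illustration:

```python
import itertools
from typing import Iterable

class InMemoryDataStore:
    """Toy data store keeping shards in a dict (illustration only;
    a real backend writes WebDataset tar shards to durable storage)."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def write_shards(
        self, samples: Iterable[bytes], *, prefix: str, maxcount: int = 10000
    ) -> list[str]:
        urls: list[str] = []
        it = iter(samples)
        shard_idx = 0
        while True:
            # Group samples into shards of at most `maxcount` entries
            batch = list(itertools.islice(it, maxcount))
            if not batch:
                break
            url = f"mem://{prefix}/data-{shard_idx:06d}.bin"
            self._blobs[url] = b"".join(batch)
            urls.append(url)
            shard_idx += 1
        return urls

    def read_url(self, url: str) -> str:
        # mem:// URLs are directly readable in this toy backend
        return url

    def supports_streaming(self) -> bool:
        return True

store = InMemoryDataStore()
urls = store.write_shards([b"a", b"b", b"c"], prefix="datasets/demo", maxcount=2)
print(urls)  # ['mem://datasets/demo/data-000000.bin', 'mem://datasets/demo/data-000001.bin']
```

Three samples with maxcount=2 produce two shards, mirroring how write_shards splits a dataset.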

DataSource Protocol

Abstracts over different data source backends for streaming dataset shards:

from atdata._protocols import DataSource

def load_from_source(source: DataSource) -> None:
    """Works with URLSource, S3Source, or custom implementations."""
    print(f"Shards: {source.shard_list}")

    for shard_id, stream in source.shards():
        print(f"Reading {shard_id}")
        # stream is a file-like object

Methods

# Get list of shard identifiers
shard_ids = source.shard_list  # ['data-000000.tar', 'data-000001.tar', ...]

# Iterate over all shards with streams
for shard_id, stream in source.shards():
    # stream is IO[bytes], can be passed to tar reader
    process_shard(stream)

# Open a specific shard
stream = source.open_shard("data-000001.tar")

Implementations

  • URLSource (from atdata) - WebDataset-compatible URLs (local, HTTP, etc.)
  • S3Source (from atdata) - S3 and S3-compatible storage with boto3

Creating Custom Data Sources

Implement the DataSource protocol for custom backends:

from typing import Iterator, IO
from atdata._protocols import DataSource

class MyCustomSource:
    """Custom data source for proprietary storage."""

    def __init__(self, config: dict):
        self._config = config
        self._shards = ["shard-001.tar", "shard-002.tar"]

    @property
    def shard_list(self) -> list[str]:
        return self._shards

    def shards(self) -> Iterator[tuple[str, IO[bytes]]]:
        for shard_id in self._shards:
            stream = self._open(shard_id)
            yield shard_id, stream

    def open_shard(self, shard_id: str) -> IO[bytes]:
        if shard_id not in self._shards:
            raise KeyError(f"Shard not found: {shard_id}")
        return self._open(shard_id)

    def _open(self, shard_id: str) -> IO[bytes]:
        # Implementation-specific logic
        ...

# Use with Dataset
source = MyCustomSource({"endpoint": "..."})
dataset = atdata.Dataset[MySample](source)
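The _open method above is left abstract. For testing, the same shape can be made fully runnable by backing it with in-memory tar archives built via the standard library; all names and payloads below are invented for illustration:

```python
import io
import tarfile
from typing import IO, Iterator

def _make_tar(files: dict[str, bytes]) -> bytes:
    """Build a tar archive in memory from {member name: payload}."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, payload in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()

class InMemorySource:
    """DataSource-shaped object backed by in-memory shards (illustration)."""

    def __init__(self, shards: dict[str, bytes]):
        self._data = shards

    @property
    def shard_list(self) -> list[str]:
        return list(self._data)

    def shards(self) -> Iterator[tuple[str, IO[bytes]]]:
        for shard_id in self._data:
            yield shard_id, self.open_shard(shard_id)

    def open_shard(self, shard_id: str) -> IO[bytes]:
        if shard_id not in self._data:
            raise KeyError(f"Shard not found: {shard_id}")
        return io.BytesIO(self._data[shard_id])

source = InMemorySource({"shard-000.tar": _make_tar({"0.txt": b"hello"})})
with tarfile.open(fileobj=source.open_shard("shard-000.tar")) as tar:
    names = tar.getnames()
print(names)  # ['0.txt']
```

A source like this is handy in unit tests, since it exercises the same code paths as a network-backed source without any I/O.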

Using Protocols for Polymorphism

Write code that works with any backend:

from atdata._protocols import AbstractIndex, IndexEntry
from atdata import Dataset

def backup_all_datasets(
    source: AbstractIndex,
    target: AbstractIndex,
) -> None:
    """Copy all datasets from source index to target."""
    for entry in source.list_datasets():
        # Decode schema from source
        SampleType = source.decode_schema(entry.schema_ref)

        # Publish schema to target
        target_schema = target.publish_schema(SampleType)

        # Load and re-insert dataset
        ds = Dataset[SampleType](entry.data_urls[0])
        target.insert_dataset(
            ds,
            name=entry.name,
            schema_ref=target_schema,
        )

Schema Reference Formats

Schema references vary by backend:

Backend      Format                                          Example
Local        atdata://local/sampleSchema/{Class}@{version}   atdata://local/sampleSchema/ImageSample@1.0.0
Atmosphere   at://{did}/{collection}/{rkey}                   at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz
Note

Legacy local://schemas/ URIs are still supported for backward compatibility.
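Since the backend is encoded in the URI scheme, a reference can be classified with simple prefix checks. The helper below is a hypothetical convenience (not part of the atdata API), based on the formats in the table above:

```python
def schema_backend(schema_ref: str) -> str:
    """Classify a schema reference by its URI scheme.
    Helper for illustration only; not part of atdata."""
    if schema_ref.startswith("at://"):
        return "atmosphere"
    # local:// is the legacy local prefix, still accepted
    if schema_ref.startswith(("atdata://local/", "local://")):
        return "local"
    raise ValueError(f"Unrecognized schema reference: {schema_ref}")

print(schema_backend("atdata://local/sampleSchema/ImageSample@1.0.0"))  # local
print(schema_backend("at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz"))  # atmosphere
print(schema_backend("local://schemas/ImageSample@1.0.0"))  # local (legacy)
```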

Type Checking

Protocols are runtime-checkable:

from atdata._protocols import IndexEntry, AbstractIndex

# Check if object implements protocol
entry = index.get_dataset("test")
assert isinstance(entry, IndexEntry)

# Type hints work with protocols
def process(index: AbstractIndex) -> None:
    ...  # IDE provides autocomplete
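One caveat worth knowing: isinstance against a @runtime_checkable protocol only verifies that the named members exist, not their signatures or return types. A self-contained sketch with a local protocol (the class names here are invented for illustration):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class HasListDatasets(Protocol):
    def list_datasets(self): ...

class GoodIndex:
    def list_datasets(self):
        return []

class NotAnIndex:
    pass

print(isinstance(GoodIndex(), HasListDatasets))  # True: method present
print(isinstance(NotAnIndex(), HasListDatasets))  # False: method missing
```

For signature-level guarantees, rely on static type checkers (mypy, pyright) rather than runtime isinstance checks.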

Complete Example

import atdata
from atdata.local import LocalIndex, S3DataStore
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex
from atdata._protocols import AbstractIndex
import numpy as np
from numpy.typing import NDArray

# Define sample type
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int

# Function works with any index
def count_datasets(index: AbstractIndex) -> int:
    return sum(1 for _ in index.list_datasets())

# Use with local index
local_index = LocalIndex()
print(f"Local datasets: {count_datasets(local_index)}")

# Use with atmosphere index
client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")
atm_index = AtmosphereIndex(client)
print(f"Atmosphere datasets: {count_datasets(atm_index)}")

# Migrate from local to atmosphere
def migrate_dataset(
    name: str,
    source: AbstractIndex,
    target: AbstractIndex,
) -> None:
    entry = source.get_dataset(name)
    SampleType = source.decode_schema(entry.schema_ref)

    # Publish schema
    schema_ref = target.publish_schema(SampleType)

    # Create dataset and insert
    ds = atdata.Dataset[SampleType](entry.data_urls[0])
    target.insert_dataset(ds, name=name, schema_ref=schema_ref)

migrate_dataset("my-features", local_index, atm_index)