from atdata._protocols import IndexEntry
def process_entry(entry: IndexEntry) -> None:
    """Print the fields common to every IndexEntry implementation."""
    print(f"Name: {entry.name}")
    print(f"Schema: {entry.schema_ref}")
    print(f"URLs: {entry.data_urls}")
    print(f"Metadata: {entry.metadata}")

Protocols
The protocols module defines abstract interfaces that enable interchangeable index backends (local Redis vs ATProto), data stores (S3 vs PDS blobs), and data sources (URL, S3, etc.).
Overview
Both local and atmosphere implementations solve the same problem: indexed dataset storage with external data URLs. These protocols formalize that common interface:
- IndexEntry: Common interface for dataset index entries
- AbstractIndex: Protocol for index operations
- AbstractDataStore: Protocol for data storage operations
- DataSource: Protocol for streaming data from various backends
IndexEntry Protocol
Represents a dataset entry in any index:
Properties
| Property | Type | Description |
|---|---|---|
| `name` | `str` | Human-readable dataset name |
| `schema_ref` | `str` | Schema reference (`local://` or `at://`) |
| `data_urls` | `list[str]` | WebDataset URLs for the data |
| `metadata` | `dict \| None` | Arbitrary metadata dictionary |
Implementations
- `LocalDatasetEntry` (from `atdata.local`)
- `AtmosphereIndexEntry` (from `atdata.atmosphere`)
AbstractIndex Protocol
Defines operations for managing schemas and datasets:
from atdata._protocols import AbstractIndex
def list_all_datasets(index: AbstractIndex) -> None:
    """Works with LocalIndex or AtmosphereIndex."""
    # Every entry satisfies the IndexEntry protocol regardless of backend.
    for entry in index.list_datasets():
        print(f"{entry.name}: {entry.schema_ref}")

Dataset Operations
# Insert a dataset
entry = index.insert_dataset(
dataset,
name="my-dataset",
schema_ref="local://schemas/MySample@1.0.0", # optional
)
# Get by name/reference
entry = index.get_dataset("my-dataset")
# List all datasets
for entry in index.list_datasets():
    print(entry.name)

Schema Operations
# Publish a schema
schema_ref = index.publish_schema(
MySample,
version="1.0.0",
)
# Get schema record
schema = index.get_schema(schema_ref)
print(schema["name"], schema["version"])
# List all schemas
for schema in index.list_schemas():
print(f"{schema['name']}@{schema['version']}")
# Decode schema to Python type
SampleType = index.decode_schema(schema_ref)
dataset = atdata.Dataset[SampleType](entry.data_urls[0])

Implementations
- `LocalIndex` / `Index` (from `atdata.local`)
- `AtmosphereIndex` (from `atdata.atmosphere`)
AbstractDataStore Protocol
Abstracts over different storage backends:
from atdata._protocols import AbstractDataStore
def write_dataset(store: AbstractDataStore, dataset) -> list[str]:
    """Works with S3DataStore or future PDS blob store."""
    # write_shards persists the dataset and returns the shard URLs it wrote.
    urls = store.write_shards(dataset, prefix="datasets/v1")
    return urls

Methods
# Write dataset shards
urls = store.write_shards(
dataset,
prefix="datasets/mnist/v1",
maxcount=10000, # samples per shard
)
# Resolve URL for reading
readable_url = store.read_url("s3://bucket/path.tar")
# Check streaming support
if store.supports_streaming():
# Can stream directly
        pass

Implementations
- `S3DataStore` (from `atdata.local`)
DataSource Protocol
Abstracts over different data source backends for streaming dataset shards:
from atdata._protocols import DataSource
def load_from_source(source: DataSource) -> None:
    """Works with URLSource, S3Source, or custom implementations."""
    print(f"Shards: {source.shard_list}")
    # shards() yields (shard_id, stream) pairs in listing order.
    for shard_id, stream in source.shards():
        print(f"Reading {shard_id}")
        # stream is a file-like object

Methods
# Get list of shard identifiers
shard_ids = source.shard_list # ['data-000000.tar', 'data-000001.tar', ...]
# Iterate over all shards with streams
for shard_id, stream in source.shards():
# stream is IO[bytes], can be passed to tar reader
process_shard(stream)
# Open a specific shard
stream = source.open_shard("data-000001.tar")

Implementations
- `URLSource` (from `atdata`) - WebDataset-compatible URLs (local, HTTP, etc.)
- `S3Source` (from `atdata`) - S3 and S3-compatible storage with boto3
Creating Custom Data Sources
Implement the DataSource protocol for custom backends:
from typing import Iterator, IO
from atdata._protocols import DataSource
class MyCustomSource:
    """Example data source backed by proprietary storage."""

    def __init__(self, config: dict):
        # Backend connection settings, kept opaque to callers.
        self._config = config
        # Fixed demo shard listing.
        self._shards = ["shard-001.tar", "shard-002.tar"]

    @property
    def shard_list(self) -> list[str]:
        """Identifiers of every shard this source can serve."""
        return self._shards

    def shards(self) -> Iterator[tuple[str, IO[bytes]]]:
        """Yield each (shard_id, byte stream) pair in listing order."""
        yield from ((sid, self._open(sid)) for sid in self._shards)

    def open_shard(self, shard_id: str) -> IO[bytes]:
        """Open a single shard by id; unknown ids raise KeyError."""
        if shard_id in self._shards:
            return self._open(shard_id)
        raise KeyError(f"Shard not found: {shard_id}")

    def _open(self, shard_id: str) -> IO[bytes]:
        # Implementation-specific logic
        ...
# Use with Dataset
source = MyCustomSource({"endpoint": "..."})
dataset = atdata.Dataset[MySample](source)

Using Protocols for Polymorphism
Write code that works with any backend:
from atdata._protocols import AbstractIndex, IndexEntry
from atdata import Dataset
def backup_all_datasets(
    source: AbstractIndex,
    target: AbstractIndex,
) -> None:
    """Copy all datasets from source index to target.

    Args:
        source: Index to read schemas and dataset entries from.
        target: Index that receives re-published schemas and entries.
    """
    for entry in source.list_datasets():
        # Decode schema from source
        SampleType = source.decode_schema(entry.schema_ref)
        # Publish schema to target
        target_schema = target.publish_schema(SampleType)
        # Load and re-insert dataset
        # NOTE(review): only entry.data_urls[0] is loaded — confirm whether
        # multi-shard datasets should pass the full URL list instead.
        ds = Dataset[SampleType](entry.data_urls[0])
        target.insert_dataset(
            ds,
            name=entry.name,
            schema_ref=target_schema,
        )

Schema Reference Formats
Schema references vary by backend:
| Backend | Format | Example |
|---|---|---|
| Local | `atdata://local/sampleSchema/{Class}@{version}` | `atdata://local/sampleSchema/ImageSample@1.0.0` |
| Atmosphere | `at://{did}/{collection}/{rkey}` | `at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz` |
Legacy local://schemas/ URIs are still supported for backward compatibility.
Type Checking
Protocols are runtime-checkable:
from atdata._protocols import IndexEntry, AbstractIndex
# Check if object implements protocol
entry = index.get_dataset("test")
assert isinstance(entry, IndexEntry)
# Type hints work with protocols
def process(index: AbstractIndex) -> None:
    ...  # IDE provides autocomplete

Complete Example
import atdata
from atdata.local import LocalIndex, S3DataStore
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex
from atdata._protocols import AbstractIndex
import numpy as np
from numpy.typing import NDArray
# Define sample type
@atdata.packable
class FeatureSample:
    """Sample pairing a feature array with its integer class label."""
    features: NDArray  # array shape/dtype not constrained here — confirm with schema
    label: int
# Function works with any index
def count_datasets(index: AbstractIndex) -> int:
    """Return how many datasets the index currently lists."""
    return len(list(index.list_datasets()))
# Use with local index
local_index = LocalIndex()
print(f"Local datasets: {count_datasets(local_index)}")
# Use with atmosphere index
client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")
atm_index = AtmosphereIndex(client)
print(f"Atmosphere datasets: {count_datasets(atm_index)}")
# Migrate from local to atmosphere
def migrate_dataset(
    name: str,
    source: AbstractIndex,
    target: AbstractIndex,
) -> None:
    """Copy one named dataset from the source index to the target.

    Args:
        name: Dataset name to look up in source.
        source: Index the dataset currently lives in.
        target: Index to migrate it into.
    """
    entry = source.get_dataset(name)
    SampleType = source.decode_schema(entry.schema_ref)
    # Publish schema
    schema_ref = target.publish_schema(SampleType)
    # Create dataset and insert
    # NOTE(review): only entry.data_urls[0] is used — confirm single-shard assumption.
    ds = atdata.Dataset[SampleType](entry.data_urls[0])
    target.insert_dataset(ds, name=name, schema_ref=schema_ref)
migrate_dataset("my-features", local_index, atm_index)