Atmosphere (ATProto Integration)

Publishing and discovering datasets on the AT Protocol network

The atmosphere module lets you publish and discover datasets on the AT Protocol (ATProto) network, creating a federated ecosystem for typed datasets.

Installation

pip install "atdata[atmosphere]"
# or, if atdata is already installed, add the ATProto SDK directly
pip install atproto

Overview

ATProto integration publishes datasets, schemas, and lenses as records in the ac.foundation.dataset.* namespace. This enables:

  • Discovery through the ATProto network
  • Federation across different hosts
  • Verifiability through content-addressable records

AtmosphereClient

The client handles authentication and record operations:

from atdata.atmosphere import AtmosphereClient

client = AtmosphereClient()

# Login with app-specific password (not your main password!)
client.login("alice.bsky.social", "app-password")

print(client.did)     # 'did:plc:...'
print(client.handle)  # 'alice.bsky.social'
Warning

Always use an app-specific password, never your main Bluesky password. Create app passwords at https://bsky.app/settings/app-passwords.

Session Management

Save and restore sessions to avoid re-authentication:

# Export session for later
session_string = client.export_session()

# Later: restore session
new_client = AtmosphereClient()
new_client.login_with_session(session_string)

Custom PDS

Connect to a custom PDS (Personal Data Server) instead of bsky.social:

client = AtmosphereClient(base_url="https://pds.example.com")

PDSBlobStore

Store dataset shards as ATProto blobs for fully decentralized storage:

from atdata.atmosphere import AtmosphereClient, PDSBlobStore

client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")

store = PDSBlobStore(client)

# Write shards as blobs
urls = store.write_shards(dataset, prefix="my-data/v1")
# Returns: ['at://did:plc:.../blob/bafyrei...', ...]

# Transform AT URIs to HTTP URLs for reading
http_url = store.read_url(urls[0])
# Returns: 'https://pds.example.com/xrpc/com.atproto.sync.getBlob?...'

# Create a BlobSource for streaming
source = store.create_source(urls)
ds = atdata.Dataset[MySample](source)

Size Limits

PDS blobs typically have size limits (often 50 MB to 5 GB, depending on the PDS). Use the maxcount and maxsize parameters of write_shards() to control shard sizes:

urls = store.write_shards(
    dataset,
    prefix="large-data/v1",
    maxcount=5000,    # Max 5000 samples per shard
    maxsize=50e6,     # Max 50MB per shard
)

BlobSource

Read datasets stored as PDS blobs:

from atdata import BlobSource

# From blob references
source = BlobSource.from_refs([
    {"did": "did:plc:abc123", "cid": "bafyrei111"},
    {"did": "did:plc:abc123", "cid": "bafyrei222"},
])

# Or from PDSBlobStore
source = store.create_source(urls)

# Use with Dataset
ds = atdata.Dataset[MySample](source)
for batch in ds.ordered(batch_size=32):
    process(batch)

AtmosphereIndex

AtmosphereIndex is the unified interface for ATProto operations; it implements the AbstractIndex protocol:

from atdata.atmosphere import AtmosphereClient, AtmosphereIndex, PDSBlobStore

client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")

# Without blob storage (use external URLs)
index = AtmosphereIndex(client)

# With PDS blob storage (recommended for full decentralization)
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)

Publishing Schemas

import atdata
from numpy.typing import NDArray

@atdata.packable
class ImageSample:
    image: NDArray
    label: str
    confidence: float

# Publish schema
schema_uri = index.publish_schema(
    ImageSample,
    version="1.0.0",
    description="Image classification sample",
)
# Returns: "at://did:plc:.../ac.foundation.dataset.sampleSchema/..."

Publishing Datasets

dataset = atdata.Dataset[ImageSample]("data-{000000..000009}.tar")

entry = index.insert_dataset(
    dataset,
    name="imagenet-subset",
    schema_ref=schema_uri,           # Optional - auto-publishes if omitted
    description="ImageNet subset",
    tags=["images", "classification"],
    license="MIT",
)

print(entry.uri)        # AT URI of the record
print(entry.data_urls)  # WebDataset URLs

Listing and Retrieving

# List your datasets
for entry in index.list_datasets():
    print(f"{entry.name}: {entry.schema_ref}")

# List from another user
for entry in index.list_datasets(repo="did:plc:other-user"):
    print(entry.name)

# Get specific dataset
entry = index.get_dataset("at://did:plc:.../ac.foundation.dataset.record/...")

# List schemas
for schema in index.list_schemas():
    print(f"{schema['name']} v{schema['version']}")

# Decode schema to Python type
SampleType = index.decode_schema(schema_uri)

Lower-Level Publishers

For more control, use the individual publisher classes:

SchemaPublisher

from atdata.atmosphere import SchemaPublisher

publisher = SchemaPublisher(client)

uri = publisher.publish(
    ImageSample,
    name="ImageSample",
    version="1.0.0",
    description="Image with label",
    metadata={"source": "training"},
)

DatasetPublisher

from atdata.atmosphere import DatasetPublisher

publisher = DatasetPublisher(client)

uri = publisher.publish(
    dataset,
    name="training-images",
    schema_uri=schema_uri,           # Required if auto_publish_schema=False
    auto_publish_schema=True,        # Publish schema automatically
    description="Training images",
    tags=["training", "images"],
    license="MIT",
)

Blob Storage

There are two approaches to storing data as ATProto blobs:

Approach 1: PDSBlobStore (Recommended)

Use PDSBlobStore with AtmosphereIndex for automatic shard management:

from atdata.atmosphere import PDSBlobStore, AtmosphereIndex

store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)

# Dataset shards are automatically uploaded as blobs
entry = index.insert_dataset(
    dataset,
    name="my-dataset",
    schema_ref=schema_uri,
)

# Later: load using BlobSource
source = store.create_source(entry.data_urls)
ds = atdata.Dataset[MySample](source)

Approach 2: Manual Blob Publishing

For more control, call publish_with_blobs() directly on a DatasetPublisher instance (publisher in the example below):

import io
import webdataset as wds

# Create tar data in memory
tar_buffer = io.BytesIO()
with wds.writer.TarWriter(tar_buffer) as sink:
    for i, sample in enumerate(samples):
        sink.write({**sample.as_wds, "__key__": f"{i:06d}"})

# Publish with blob storage
uri = publisher.publish_with_blobs(
    blobs=[tar_buffer.getvalue()],
    schema_uri=schema_uri,
    name="small-dataset",
    description="Dataset stored in ATProto blobs",
    tags=["small", "demo"],
)

Loading Blob-Stored Datasets

from atdata.atmosphere import DatasetLoader
from atdata import BlobSource

loader = DatasetLoader(client)

# Check storage type
storage_type = loader.get_storage_type(uri)  # "external" or "blobs"

if storage_type == "blobs":
    # Get blob URLs and create BlobSource
    blob_urls = loader.get_blob_urls(uri)
    # Parse to blob refs for BlobSource
    # Or use loader.to_dataset() which handles this automatically

# to_dataset() handles both storage types automatically
dataset = loader.to_dataset(uri, MySample)
for batch in dataset.ordered(batch_size=32):
    process(batch)

LensPublisher

from atdata.atmosphere import LensPublisher

publisher = LensPublisher(client)

# With code references
uri = publisher.publish(
    name="simplify",
    source_schema=full_schema_uri,
    target_schema=simple_schema_uri,
    description="Extract label only",
    getter_code={
        "repository": "https://github.com/org/repo",
        "commit": "abc123def...",
        "path": "transforms/simplify.py:simplify_getter",
    },
    putter_code={
        "repository": "https://github.com/org/repo",
        "commit": "abc123def...",
        "path": "transforms/simplify.py:simplify_putter",
    },
)

# Or publish from a Lens object
from atdata.lens import lens

@lens
def simplify(src: FullSample) -> SimpleSample:
    return SimpleSample(label=src.label)

uri = publisher.publish_from_lens(
    simplify,
    source_schema=full_schema_uri,
    target_schema=simple_schema_uri,
)

Lower-Level Loaders

For direct access to records, use the loader classes:

SchemaLoader

from atdata.atmosphere import SchemaLoader

loader = SchemaLoader(client)

# Get a specific schema
schema = loader.get("at://did:plc:abc/ac.foundation.dataset.sampleSchema/xyz")
print(schema["name"], schema["version"])

# List all schemas from a repository
for schema in loader.list_all(repo="did:plc:other-user"):
    print(schema["name"])

DatasetLoader

from atdata.atmosphere import DatasetLoader

loader = DatasetLoader(client)

# Get a specific dataset record
record = loader.get("at://did:plc:abc/ac.foundation.dataset.record/xyz")

# Check storage type
storage_type = loader.get_storage_type(uri)  # "external" or "blobs"

# Get URLs based on storage type
if storage_type == "external":
    urls = loader.get_urls(uri)
else:
    urls = loader.get_blob_urls(uri)

# Get metadata
metadata = loader.get_metadata(uri)

# Create a Dataset object directly
dataset = loader.to_dataset(uri, MySampleType)
for batch in dataset.ordered(batch_size=32):
    process(batch)

LensLoader

from atdata.atmosphere import LensLoader

loader = LensLoader(client)

# Get a specific lens record
lens = loader.get("at://did:plc:abc/ac.foundation.dataset.lens/xyz")
print(lens["name"])
print(lens["sourceSchema"], "->", lens["targetSchema"])

# List all lenses from a repository
for lens in loader.list_all():
    print(lens["name"])

# Find lenses by schema
lenses = loader.find_by_schemas(
    source_schema_uri="at://did:plc:abc/ac.foundation.dataset.sampleSchema/source",
    target_schema_uri="at://did:plc:abc/ac.foundation.dataset.sampleSchema/target",
)

AT URIs

ATProto records are identified by AT URIs of the form at://<authority>/<collection>/<rkey>:

from atdata.atmosphere import AtUri

# Parse an AT URI
uri = AtUri.parse("at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz")

print(uri.authority)   # 'did:plc:abc123'
print(uri.collection)  # 'ac.foundation.dataset.sampleSchema'
print(uri.rkey)        # 'xyz'

# Format back to string
print(str(uri))  # 'at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz'

Supported Field Types

Schemas support these field types:

Python Type ATProto Type
str primitive/str
int primitive/int
float primitive/float
bool primitive/bool
bytes primitive/bytes
NDArray ndarray (default dtype: float32)
NDArray[np.float64] ndarray (dtype: float64)
list[str] array with items
T | None Optional field

Complete Example

This example shows the full workflow using PDSBlobStore for decentralized storage:

import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex, PDSBlobStore
import webdataset as wds

# 1. Define and create samples
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int
    source: str

samples = [
    FeatureSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10,
        source="synthetic",
    )
    for i in range(1000)
]

# 2. Write to tar
with wds.writer.TarWriter("features.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})

# 3. Authenticate and set up blob storage
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")

store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)

# 4. Publish schema
schema_uri = index.publish_schema(
    FeatureSample,
    version="1.0.0",
    description="Feature vectors with labels",
)

# 5. Publish dataset (shards uploaded as blobs)
dataset = atdata.Dataset[FeatureSample]("features.tar")
entry = index.insert_dataset(
    dataset,
    name="synthetic-features-v1",
    schema_ref=schema_uri,
    tags=["features", "synthetic"],
)

print(f"Published: {entry.uri}")
print(f"Blob URLs: {entry.data_urls}")

# 6. Later: discover and load from blobs
for dataset_entry in index.list_datasets():
    print(f"Found: {dataset_entry.name}")

    # Reconstruct type from schema
    SampleType = index.decode_schema(dataset_entry.schema_ref)

    # Create source from blob URLs
    source = store.create_source(dataset_entry.data_urls)

    # Load dataset from blobs
    ds = atdata.Dataset[SampleType](source)
    for batch in ds.ordered(batch_size=32):
        print(batch.features.shape)
        break

To store external URLs instead (without PDSBlobStore), create the index without a data_store:

# Use AtmosphereIndex without data_store
index = AtmosphereIndex(client)

# Dataset URLs will be stored as-is (external references)
entry = index.insert_dataset(
    dataset,
    name="external-features",
    schema_ref=schema_uri,
)

# Load using standard URL source
ds = atdata.Dataset[FeatureSample](entry.data_urls[0])