Atmosphere Publishing

Publish and discover datasets on the ATProto network

This tutorial demonstrates how to use the atmosphere module to publish datasets to the AT Protocol network, enabling federated discovery and sharing. This is Layer 3 of atdata’s architecture—decentralized federation that enables cross-organization dataset sharing.

Why Federation?

Team storage (Redis + S3) works well within an organization, but sharing across organizations introduces new challenges:

  • Discovery: How do researchers find relevant datasets across institutions?
  • Trust: How do you verify a dataset is what it claims to be?
  • Durability: What happens if the original publisher goes offline?

The AT Protocol (ATProto), developed by Bluesky, provides a foundation for decentralized social applications. atdata leverages ATProto’s infrastructure for dataset federation:

ATProto Feature                    atdata Usage
───────────────                    ────────────
DIDs (Decentralized Identifiers)   Publisher identity verification
Lexicons                           Dataset/schema record schemas
PDSes (Personal Data Servers)      Storage for records and blobs
Relays & AppViews                  Discovery and aggregation

The key insight: your Bluesky identity (@handle.bsky.social) becomes your dataset publisher identity. Anyone can verify that a dataset was published by you, and can discover your datasets through the federated network.

Prerequisites

You'll need an ATProto account (for example, a Bluesky account) and an app password generated in your account settings.

Warning

Always use an app-specific password, not your main Bluesky password.
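Rather than hard-coding the password, a common pattern is to read credentials from the environment. The variable names below are illustrative, not something atdata requires:

```python
import os

# Illustrative variable names -- pick any convention; the point is that
# credentials never appear in source code or notebooks. Generate the app
# password under Bluesky Settings -> App Passwords, so it can be revoked
# independently of your main account password.
handle = os.environ.get("ATPROTO_HANDLE", "your.handle.bsky.social")
app_password = os.environ.get("ATPROTO_APP_PASSWORD", "xxxx-xxxx-xxxx-xxxx")
```

`client.login(handle, app_password)` then works exactly as shown in the Authentication section.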

Setup

import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.atmosphere import (
    AtmosphereClient,
    AtmosphereIndex,
    PDSBlobStore,
    SchemaPublisher,
    SchemaLoader,
    DatasetPublisher,
    DatasetLoader,
    AtUri,
)
from atdata import BlobSource
import webdataset as wds

Define Sample Types

@atdata.packable
class ImageSample:
    """A sample containing image data with metadata."""
    image: NDArray
    label: str
    confidence: float

@atdata.packable
class TextEmbeddingSample:
    """A sample containing text with embedding vectors."""
    text: str
    embedding: NDArray
    source: str

Type Introspection

See what information is available from a PackableSample type:

from dataclasses import fields, is_dataclass

print(f"Sample type: {ImageSample.__name__}")
print(f"Is dataclass: {is_dataclass(ImageSample)}")

print("\nFields:")
for field in fields(ImageSample):
    print(f"  - {field.name}: {field.type}")

# Create and serialize a sample
sample = ImageSample(
    image=np.random.rand(224, 224, 3).astype(np.float32),
    label="cat",
    confidence=0.95,
)

packed = sample.packed
print(f"\nSerialized size: {len(packed):,} bytes")

# Round-trip
restored = ImageSample.from_bytes(packed)
print(f"Round-trip successful: {np.allclose(sample.image, restored.image)}")

AT URI Parsing

Every record in ATProto is identified by an AT URI, which encodes:

  • Authority: The DID or handle of the record owner
  • Collection: The Lexicon type (like a table name)
  • Rkey: The record key (unique within the collection)

Understanding AT URIs is essential for working with atmosphere datasets, as they're how you reference schemas, datasets, and lenses. Let's parse a few:

uris = [
    "at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz789",
    "at://alice.bsky.social/ac.foundation.dataset.record/my-dataset",
]

for uri_str in uris:
    print(f"\nParsing: {uri_str}")
    uri = AtUri.parse(uri_str)
    print(f"  Authority:  {uri.authority}")
    print(f"  Collection: {uri.collection}")
    print(f"  Rkey:       {uri.rkey}")

Authentication

The AtmosphereClient handles ATProto authentication. When you authenticate, you’re proving ownership of your decentralized identity (DID), which gives you permission to create and modify records in your Personal Data Server (PDS).

Connect to ATProto:

client = AtmosphereClient()
client.login("your.handle.social", "your-app-password")

print(f"Authenticated as: {client.handle}")
print(f"DID: {client.did}")

Publish a Schema

When you publish a schema to ATProto, it becomes a public, immutable record that others can reference. The schema CID ensures that anyone can verify they’re using exactly the same type definition you published.

schema_publisher = SchemaPublisher(client)
schema_uri = schema_publisher.publish(
    ImageSample,
    name="ImageSample",
    version="1.0.0",
    description="Demo: Image sample with label and confidence",
)
print(f"Schema URI: {schema_uri}")

List Your Schemas

schema_loader = SchemaLoader(client)
schemas = schema_loader.list_all(limit=10)
print(f"Found {len(schemas)} schema(s)")

for schema in schemas:
    print(f"  - {schema.get('name', 'Unknown')}: v{schema.get('version', '?')}")

Publish a Dataset

With External URLs

For larger datasets that exceed PDS blob limits, or when you already have data in object storage, you can publish a dataset record that references external URLs. The ATProto record serves as the index entry while the actual data lives elsewhere:

dataset_publisher = DatasetPublisher(client)
dataset_uri = dataset_publisher.publish_with_urls(
    urls=["s3://example-bucket/demo-data-{000000..000009}.tar"],
    schema_uri=str(schema_uri),
    name="Demo Image Dataset",
    description="Example dataset demonstrating atmosphere publishing",
    tags=["demo", "images", "atdata"],
    license="MIT",
)
print(f"Dataset URI: {dataset_uri}")

List and Load Datasets

dataset_loader = DatasetLoader(client)
datasets = dataset_loader.list_all(limit=10)
print(f"Found {len(datasets)} dataset(s)")

for ds in datasets:
    print(f"  - {ds.get('name', 'Unknown')}")
    print(f"    Schema: {ds.get('schemaRef', 'N/A')}")
    tags = ds.get('tags', [])
    if tags:
        print(f"    Tags: {', '.join(tags)}")

Load a Dataset

# Check how the dataset's shards are stored
storage_type = dataset_loader.get_storage_type(str(dataset_uri))
print(f"Storage type: {storage_type}")

if storage_type == "blobs":
    blob_urls = dataset_loader.get_blob_urls(str(dataset_uri))
    print(f"Blob URLs: {len(blob_urls)} blob(s)")

# Load and iterate (works for both storage types)
ds = dataset_loader.to_dataset(str(dataset_uri), ImageSample)
for sample in ds.ordered():
    print(f"Sample label={sample.label}, confidence={sample.confidence}")

Complete Publishing Workflow

Here’s the end-to-end workflow for publishing a dataset to the atmosphere:

  1. Define your sample type using @packable
  2. Create samples and write to tar (same as local workflow)
  3. Authenticate with your ATProto identity
  4. Create index with blob storage (AtmosphereIndex + PDSBlobStore)
  5. Publish schema (creates ATProto record)
  6. Insert dataset (uploads blobs, creates dataset record)

Notice how similar this is to the local workflow—the same sample types and patterns, just with a different storage backend.

This example shows the recommended workflow using PDSBlobStore for fully decentralized storage:

# 1. Define and create samples
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int
    source: str

samples = [
    FeatureSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10,
        source="synthetic",
    )
    for i in range(1000)
]

# 2. Write to tar
with wds.writer.TarWriter("features.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})

# 3. Authenticate and create index with blob storage
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")

store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)

# 4. Publish schema
schema_uri = index.publish_schema(
    FeatureSample,
    version="1.0.0",
    description="Feature vectors with labels",
)

# 5. Publish dataset (shards uploaded as blobs automatically)
dataset = atdata.Dataset[FeatureSample]("features.tar")
entry = index.insert_dataset(
    dataset,
    name="synthetic-features-v1",
    schema_ref=schema_uri,
    tags=["features", "synthetic"],
)

print(f"Published: {entry.uri}")
print(f"Data stored at: {entry.data_urls}")  # at://did/blob/cid URLs

# 6. Later: load from blobs
source = store.create_source(entry.data_urls)
ds = atdata.Dataset[FeatureSample](source)
for batch in ds.ordered(batch_size=32):
    print(f"Loaded batch with {len(batch.label)} samples")
    break

What You’ve Learned

You now understand federated dataset publishing in atdata:

Concept             Purpose
───────             ───────
AtmosphereClient    ATProto authentication and record management
AtmosphereIndex     Federated index implementing AbstractIndex
PDSBlobStore        PDS blob storage implementing AbstractDataStore
BlobSource          Stream datasets from PDS blobs
AT URIs             Universal identifiers for schemas and datasets

The protocol abstractions (AbstractIndex, AbstractDataStore, DataSource) ensure your code works across all three layers of atdata—local files, team storage, and federated sharing.
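As an illustrative sketch of what that buys you (the IndexLike protocol and publish_features helper below are assumptions for this example, not part of atdata's API), code written against the index interface doesn't care which backend it talks to:

```python
from typing import Any, Protocol

# Sketch only: mirrors the subset of the index interface used in this
# tutorial (publish_schema / insert_dataset); the real AbstractIndex
# may differ in detail.
class IndexLike(Protocol):
    def publish_schema(self, sample_type: type, *, version: str,
                       description: str) -> Any: ...
    def insert_dataset(self, dataset: Any, *, name: str,
                       schema_ref: Any, tags: list) -> Any: ...

def publish_features(index: IndexLike, dataset: Any, sample_type: type) -> Any:
    """Publish a schema, then a dataset, against any index backend."""
    schema_uri = index.publish_schema(
        sample_type,
        version="1.0.0",
        description="Feature vectors with labels",
    )
    return index.insert_dataset(
        dataset,
        name="synthetic-features-v1",
        schema_ref=schema_uri,
        tags=["features", "synthetic"],
    )
```

Passing a LocalIndex or an AtmosphereIndex to the same helper is what makes promoting a dataset between layers straightforward.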

The Full Picture

You’ve now seen atdata’s complete architecture:

Local Development          Team Storage              Federation
─────────────────          ────────────              ──────────
tar files                  Redis + S3                ATProto PDS
Dataset[T]                 LocalIndex                AtmosphereIndex
                           S3DataStore               PDSBlobStore

The same @packable sample types, the same Dataset[T] iteration patterns, and the same lens transformations work at every layer. Only the storage backend changes.

Next Steps

Already Have Local Datasets?

The Promotion Workflow tutorial shows how to migrate existing datasets from local storage to the atmosphere without re-processing your data.