import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.atmosphere import (
AtmosphereClient,
AtmosphereIndex,
PDSBlobStore,
SchemaPublisher,
SchemaLoader,
DatasetPublisher,
DatasetLoader,
AtUri,
)
from atdata import BlobSource
import webdataset as wds

Atmosphere Publishing
This tutorial demonstrates how to use the atmosphere module to publish datasets to the AT Protocol network, enabling federated discovery and sharing. This is Layer 3 of atdata's architecture: decentralized federation for cross-organization dataset sharing.
Why Federation?
Team storage (Redis + S3) works well within an organization, but sharing across organizations introduces new challenges:
- Discovery: How do researchers find relevant datasets across institutions?
- Trust: How do you verify a dataset is what it claims to be?
- Durability: What happens if the original publisher goes offline?
The AT Protocol (ATProto), developed by Bluesky, provides a foundation for decentralized social applications. atdata leverages ATProto’s infrastructure for dataset federation:
| ATProto Feature | atdata Usage |
|---|---|
| DIDs (Decentralized Identifiers) | Publisher identity verification |
| Lexicons | Dataset/schema record schemas |
| PDSes (Personal Data Servers) | Storage for records and blobs |
| Relays & AppViews | Discovery and aggregation |
The key insight: your Bluesky identity (@handle.bsky.social) becomes your dataset publisher identity. Anyone can verify that a dataset was published by you, and can discover your datasets through the federated network.
Prerequisites
- pip install atdata[atmosphere]
- A Bluesky account with an app-specific password

Always use an app-specific password, not your main Bluesky password.
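One common way to honor this (not specific to atdata) is to keep the handle and app password out of source code entirely, for example in environment variables. The variable names below are just an illustrative convention, not anything atdata reads itself:

```python
import os

# Illustrative variable names; pick whatever convention your team uses.
handle = os.environ.get("BSKY_HANDLE", "your.handle.social")
app_password = os.environ.get("BSKY_APP_PASSWORD", "")

# Later: client.login(handle, app_password)
print(f"Will log in as: {handle}")
```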
Setup
Define Sample Types
@atdata.packable
class ImageSample:
    """A sample containing image data with metadata."""

    image: NDArray
    label: str
    confidence: float

@atdata.packable
class TextEmbeddingSample:
    """A sample containing text with embedding vectors."""

    text: str
    embedding: NDArray
    source: str

Type Introspection
See what information is available from a PackableSample type:
from dataclasses import fields, is_dataclass
print(f"Sample type: {ImageSample.__name__}")
print(f"Is dataclass: {is_dataclass(ImageSample)}")
print("\nFields:")
for field in fields(ImageSample):
    print(f"  - {field.name}: {field.type}")
# Create and serialize a sample
sample = ImageSample(
image=np.random.rand(224, 224, 3).astype(np.float32),
label="cat",
confidence=0.95,
)
packed = sample.packed
print(f"\nSerialized size: {len(packed):,} bytes")
# Round-trip
restored = ImageSample.from_bytes(packed)
print(f"Round-trip successful: {np.allclose(sample.image, restored.image)}")

AT URI Parsing
Every record in ATProto is identified by an AT URI, which encodes:
- Authority: The DID or handle of the record owner
- Collection: The Lexicon type (like a table name)
- Rkey: The record key (unique within the collection)
Understanding AT URIs is essential for working with atmosphere datasets, as they’re how you reference schemas, datasets, and lenses.
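As a mental model, the three components map directly onto the URI's authority and path segments. Here is a minimal stdlib-only sketch of that split, for illustration only; in practice use the AtUri helper shown below:

```python
from urllib.parse import urlparse

def parse_at_uri(uri: str) -> dict:
    """Split an AT URI into authority, collection, and rkey (illustrative)."""
    parsed = urlparse(uri)
    if parsed.scheme != "at":
        raise ValueError(f"not an AT URI: {uri}")
    # Path is "/<collection>/<rkey>"; authority is the netloc part.
    collection, _, rkey = parsed.path.lstrip("/").partition("/")
    return {"authority": parsed.netloc, "collection": collection, "rkey": rkey}

parts = parse_at_uri("at://alice.bsky.social/ac.foundation.dataset.record/my-dataset")
print(parts)  # {'authority': 'alice.bsky.social', 'collection': 'ac.foundation.dataset.record', 'rkey': 'my-dataset'}
```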
ATProto records are identified by AT URIs:
uris = [
"at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz789",
"at://alice.bsky.social/ac.foundation.dataset.record/my-dataset",
]
for uri_str in uris:
    print(f"\nParsing: {uri_str}")
    uri = AtUri.parse(uri_str)
    print(f"  Authority: {uri.authority}")
    print(f"  Collection: {uri.collection}")
    print(f"  Rkey: {uri.rkey}")

Authentication
The AtmosphereClient handles ATProto authentication. When you authenticate, you’re proving ownership of your decentralized identity (DID), which gives you permission to create and modify records in your Personal Data Server (PDS).
Connect to ATProto:
client = AtmosphereClient()
client.login("your.handle.social", "your-app-password")
print(f"Authenticated as: {client.handle}")
print(f"DID: {client.did}")

Publish a Schema
When you publish a schema to ATProto, it becomes a public, immutable record that others can reference. The schema CID ensures that anyone can verify they’re using exactly the same type definition you published.
schema_publisher = SchemaPublisher(client)
schema_uri = schema_publisher.publish(
ImageSample,
name="ImageSample",
version="1.0.0",
description="Demo: Image sample with label and confidence",
)
print(f"Schema URI: {schema_uri}")

List Your Schemas
schema_loader = SchemaLoader(client)
schemas = schema_loader.list_all(limit=10)
print(f"Found {len(schemas)} schema(s)")
for schema in schemas:
    print(f"  - {schema.get('name', 'Unknown')}: v{schema.get('version', '?')}")

Publish a Dataset
With PDS Blob Storage (Recommended)
The PDSBlobStore is the fully decentralized option: your dataset shards are stored as ATProto blobs directly in your PDS, alongside your other ATProto records. This means:
- No external dependencies: Data lives in the same infrastructure as your identity
- Content-addressed: Blobs are identified by their CID, ensuring integrity
- Federated replication: Relays can mirror your blobs for availability
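The integrity property of content addressing is easy to demonstrate. Real ATProto blob CIDs use multihash/CIDv1 encoding rather than a bare digest, but the guarantee is the same as with plain SHA-256, which this toy sketch uses:

```python
import hashlib

def content_address(data: bytes) -> str:
    # Toy content address: hex SHA-256 of the bytes. A real CID adds
    # multihash/CIDv1 encoding, but the identifier is still derived
    # from the content itself.
    return hashlib.sha256(data).hexdigest()

shard = b"example shard bytes"
addr = content_address(shard)
assert content_address(shard) == addr           # same bytes, same address
assert content_address(shard + b"!") != addr    # any change yields a new address
```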
For fully decentralized storage, use PDSBlobStore to store dataset shards directly as ATProto blobs in your PDS:
# Create store and index with blob storage
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)
# Define sample type
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int
# Create dataset in memory or from existing tar
samples = [FeatureSample(features=np.random.randn(64).astype(np.float32), label=i % 10) for i in range(100)]
# Write to temporary tar
with wds.writer.TarWriter("temp.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})
dataset = atdata.Dataset[FeatureSample]("temp.tar")
# Publish - shards are uploaded as blobs automatically
schema_uri = index.publish_schema(FeatureSample, version="1.0.0")
entry = index.insert_dataset(
dataset,
name="blob-stored-features",
schema_ref=schema_uri,
description="Features stored as PDS blobs",
)
print(f"Dataset URI: {entry.uri}")
print(f"Blob URLs: {entry.data_urls}")  # at://did/blob/cid format

Use BlobSource to stream directly from PDS blobs:
# Create source from the blob URLs
source = store.create_source(entry.data_urls)
# Or manually from blob references
source = BlobSource.from_refs([
{"did": client.did, "cid": "bafyrei..."},
])
# Load and iterate
ds = atdata.Dataset[FeatureSample](source)
for batch in ds.ordered(batch_size=32):
    print(batch.features.shape)

With External URLs
For larger datasets that exceed PDS blob limits, or when you already have data in object storage, you can publish a dataset record that references external URLs. The ATProto record serves as the index entry while the actual data lives elsewhere.
For larger datasets or when using existing object storage:
dataset_publisher = DatasetPublisher(client)
dataset_uri = dataset_publisher.publish_with_urls(
urls=["s3://example-bucket/demo-data-{000000..000009}.tar"],
schema_uri=str(schema_uri),
name="Demo Image Dataset",
description="Example dataset demonstrating atmosphere publishing",
tags=["demo", "images", "atdata"],
license="MIT",
)
print(f"Dataset URI: {dataset_uri}")

List and Load Datasets
dataset_loader = DatasetLoader(client)
datasets = dataset_loader.list_all(limit=10)
print(f"Found {len(datasets)} dataset(s)")
for ds in datasets:
    print(f"  - {ds.get('name', 'Unknown')}")
    print(f"    Schema: {ds.get('schemaRef', 'N/A')}")
    tags = ds.get('tags', [])
    if tags:
        print(f"    Tags: {', '.join(tags)}")

Load a Dataset
# Check storage type (entry.uri comes from the blob-stored dataset published above)
storage_type = dataset_loader.get_storage_type(str(entry.uri))
print(f"Storage type: {storage_type}")
if storage_type == "blobs":
    blob_urls = dataset_loader.get_blob_urls(str(entry.uri))
    print(f"Blob URLs: {len(blob_urls)} blob(s)")
# Load and iterate (works for both storage types)
ds = dataset_loader.to_dataset(str(entry.uri), FeatureSample)
for batch in ds.ordered():
    print(f"Labels: {batch.label}")

Complete Publishing Workflow
Here’s the end-to-end workflow for publishing a dataset to the atmosphere:
- Define your sample type using @packable
- Create samples and write to tar (same as local workflow)
- Authenticate with your ATProto identity
- Create index with blob storage (AtmosphereIndex + PDSBlobStore)
- Publish schema (creates ATProto record)
- Insert dataset (uploads blobs, creates dataset record)
Notice how similar this is to the local workflow—the same sample types and patterns, just with a different storage backend.
This example shows the recommended workflow using PDSBlobStore for fully decentralized storage:
# 1. Define and create samples
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int
    source: str
samples = [
FeatureSample(
features=np.random.randn(128).astype(np.float32),
label=i % 10,
source="synthetic",
)
for i in range(1000)
]
# 2. Write to tar
with wds.writer.TarWriter("features.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})
# 3. Authenticate and create index with blob storage
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)
# 4. Publish schema
schema_uri = index.publish_schema(
FeatureSample,
version="1.0.0",
description="Feature vectors with labels",
)
# 5. Publish dataset (shards uploaded as blobs automatically)
dataset = atdata.Dataset[FeatureSample]("features.tar")
entry = index.insert_dataset(
dataset,
name="synthetic-features-v1",
schema_ref=schema_uri,
tags=["features", "synthetic"],
)
print(f"Published: {entry.uri}")
print(f"Data stored at: {entry.data_urls}") # at://did/blob/cid URLs
# 6. Later: load from blobs
source = store.create_source(entry.data_urls)
ds = atdata.Dataset[FeatureSample](source)
for batch in ds.ordered(batch_size=32):
    print(f"Loaded batch with {len(batch.label)} samples")
    break

What You’ve Learned
You now understand federated dataset publishing in atdata:
| Concept | Purpose |
|---|---|
| AtmosphereClient | ATProto authentication and record management |
| AtmosphereIndex | Federated index implementing AbstractIndex |
| PDSBlobStore | PDS blob storage implementing AbstractDataStore |
| BlobSource | Stream datasets from PDS blobs |
| AT URIs | Universal identifiers for schemas and datasets |
The protocol abstractions (AbstractIndex, AbstractDataStore, DataSource) ensure your code works across all three layers of atdata—local files, team storage, and federated sharing.
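That backend-agnosticism is ordinary structural typing. The sketch below is not atdata's actual AbstractIndex definition; the method names simply mirror this tutorial's usage, with an in-memory fake standing in for a real index:

```python
from typing import Any, Protocol

class IndexLike(Protocol):
    # Mirrors the two calls used in this tutorial; the real AbstractIndex
    # is defined in atdata's Protocols reference.
    def publish_schema(self, sample_type: type, *, version: str) -> str: ...
    def insert_dataset(self, dataset: Any, *, name: str, schema_ref: str) -> Any: ...

def publish(index: IndexLike, dataset: Any, sample_type: type, name: str) -> Any:
    """Identical logic whether index is a LocalIndex or an AtmosphereIndex."""
    schema_ref = index.publish_schema(sample_type, version="1.0.0")
    return index.insert_dataset(dataset, name=name, schema_ref=schema_ref)

class FakeIndex:
    """In-memory stand-in used only to show the call pattern."""
    def publish_schema(self, sample_type: type, *, version: str) -> str:
        return f"at://did:plc:example/schema/{sample_type.__name__}@{version}"
    def insert_dataset(self, dataset: Any, *, name: str, schema_ref: str) -> Any:
        return {"name": name, "schema_ref": schema_ref}

entry = publish(FakeIndex(), dataset=[], sample_type=int, name="demo")
print(entry["schema_ref"])  # at://did:plc:example/schema/int@1.0.0
```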
The Full Picture
You’ve now seen atdata’s complete architecture:
Local Development Team Storage Federation
───────────────── ──────────── ──────────
tar files Redis + S3 ATProto PDS
Dataset[T] LocalIndex AtmosphereIndex
S3DataStore PDSBlobStore
The same @packable sample types, the same Dataset[T] iteration patterns, and the same lens transformations work at every layer. Only the storage backend changes.
Next Steps
The Promotion Workflow tutorial shows how to migrate existing datasets from local storage to the atmosphere without re-processing your data.
- Promotion Workflow - Migrate from local storage to atmosphere
- Atmosphere Reference - Complete API reference
- Protocols - Abstract interfaces