from atdata.atmosphere import AtmosphereClient
client = AtmosphereClient()
# Login with app-specific password (not your main password!)
client.login("alice.bsky.social", "app-password")
print(client.did) # 'did:plc:...'
print(client.handle) # 'alice.bsky.social'

Atmosphere (ATProto Integration)
The atmosphere module enables publishing and discovering datasets on the ATProto network, creating a federated ecosystem for typed datasets.
Installation
pip install atdata[atmosphere]
# or
pip install atproto

Overview
ATProto integration publishes datasets, schemas, and lenses as records in the ac.foundation.dataset.* namespace. This enables:
- Discovery through the ATProto network
- Federation across different hosts
- Verifiability through content-addressable records
AtmosphereClient
The client handles authentication and record operations:
Always use an app-specific password, not your main Bluesky password. Create app passwords at bsky.app/settings/app-passwords.
Session Management
Save and restore sessions to avoid re-authentication:
# Export session for later
session_string = client.export_session()
# Later: restore session
new_client = AtmosphereClient()
new_client.login_with_session(session_string)

Custom PDS
Connect to a custom PDS instead of bsky.social:
client = AtmosphereClient(base_url="https://pds.example.com")

PDSBlobStore
Store dataset shards as ATProto blobs for fully decentralized storage:
from atdata.atmosphere import AtmosphereClient, PDSBlobStore
client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")
store = PDSBlobStore(client)
# Write shards as blobs
urls = store.write_shards(dataset, prefix="my-data/v1")
# Returns: ['at://did:plc:.../blob/bafyrei...', ...]
# Transform AT URIs to HTTP URLs for reading
http_url = store.read_url(urls[0])
# Returns: 'https://pds.example.com/xrpc/com.atproto.sync.getBlob?...'
# Create a BlobSource for streaming
source = store.create_source(urls)
ds = atdata.Dataset[MySample](source)

Size Limits
PDS blobs typically have size limits (often 50MB-5GB depending on the PDS). Use maxcount and maxsize parameters to control shard sizes:
urls = store.write_shards(
dataset,
prefix="large-data/v1",
maxcount=5000, # Max 5000 samples per shard
maxsize=50e6, # Max 50MB per shard
)

BlobSource
Read datasets stored as PDS blobs:
from atdata import BlobSource
# From blob references
source = BlobSource.from_refs([
{"did": "did:plc:abc123", "cid": "bafyrei111"},
{"did": "did:plc:abc123", "cid": "bafyrei222"},
])
# Or from PDSBlobStore
source = store.create_source(urls)
# Use with Dataset
ds = atdata.Dataset[MySample](source)
for batch in ds.ordered(batch_size=32):
    process(batch)

AtmosphereIndex
The unified interface for ATProto operations, implementing the AbstractIndex protocol:
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex, PDSBlobStore
client = AtmosphereClient()
client.login("handle.bsky.social", "app-password")
# Without blob storage (use external URLs)
index = AtmosphereIndex(client)
# With PDS blob storage (recommended for full decentralization)
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)

Publishing Schemas
import atdata
from numpy.typing import NDArray
@atdata.packable
class ImageSample:
image: NDArray
label: str
confidence: float
# Publish schema
schema_uri = index.publish_schema(
ImageSample,
version="1.0.0",
description="Image classification sample",
)
# Returns: "at://did:plc:.../ac.foundation.dataset.sampleSchema/..."

Publishing Datasets
dataset = atdata.Dataset[ImageSample]("data-{000000..000009}.tar")
entry = index.insert_dataset(
dataset,
name="imagenet-subset",
schema_ref=schema_uri, # Optional - auto-publishes if omitted
description="ImageNet subset",
tags=["images", "classification"],
license="MIT",
)
print(entry.uri) # AT URI of the record
print(entry.data_urls) # WebDataset URLs

Listing and Retrieving
# List your datasets
for entry in index.list_datasets():
print(f"{entry.name}: {entry.schema_ref}")
# List from another user
for entry in index.list_datasets(repo="did:plc:other-user"):
print(entry.name)
# Get specific dataset
entry = index.get_dataset("at://did:plc:.../ac.foundation.dataset.record/...")
# List schemas
for schema in index.list_schemas():
print(f"{schema['name']} v{schema['version']}")
# Decode schema to Python type
SampleType = index.decode_schema(schema_uri)

Lower-Level Publishers
For more control, use the individual publisher classes:
SchemaPublisher
from atdata.atmosphere import SchemaPublisher
publisher = SchemaPublisher(client)
uri = publisher.publish(
ImageSample,
name="ImageSample",
version="1.0.0",
description="Image with label",
metadata={"source": "training"},
)

DatasetPublisher
from atdata.atmosphere import DatasetPublisher
publisher = DatasetPublisher(client)
uri = publisher.publish(
dataset,
name="training-images",
schema_uri=schema_uri, # Required if auto_publish_schema=False
auto_publish_schema=True, # Publish schema automatically
description="Training images",
tags=["training", "images"],
license="MIT",
)

Blob Storage
There are two approaches to storing data as ATProto blobs:
Approach 1: PDSBlobStore (Recommended)
Use PDSBlobStore with AtmosphereIndex for automatic shard management:
from atdata.atmosphere import PDSBlobStore, AtmosphereIndex
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)
# Dataset shards are automatically uploaded as blobs
entry = index.insert_dataset(
dataset,
name="my-dataset",
schema_ref=schema_uri,
)
# Later: load using BlobSource
source = store.create_source(entry.data_urls)
ds = atdata.Dataset[MySample](source)

Approach 2: Manual Blob Publishing
For more control, use DatasetPublisher.publish_with_blobs() directly:
import io
import webdataset as wds
# Create tar data in memory
tar_buffer = io.BytesIO()
with wds.writer.TarWriter(tar_buffer) as sink:
for i, sample in enumerate(samples):
sink.write({**sample.as_wds, "__key__": f"{i:06d}"})
# Publish with blob storage
uri = publisher.publish_with_blobs(
blobs=[tar_buffer.getvalue()],
schema_uri=schema_uri,
name="small-dataset",
description="Dataset stored in ATProto blobs",
tags=["small", "demo"],
)

Loading Blob-Stored Datasets
from atdata.atmosphere import DatasetLoader
from atdata import BlobSource
loader = DatasetLoader(client)
# Check storage type
storage_type = loader.get_storage_type(uri) # "external" or "blobs"
if storage_type == "blobs":
# Get blob URLs and create BlobSource
blob_urls = loader.get_blob_urls(uri)
# Parse to blob refs for BlobSource
# Or use loader.to_dataset() which handles this automatically
# to_dataset() handles both storage types automatically
dataset = loader.to_dataset(uri, MySample)
for batch in dataset.ordered(batch_size=32):
    process(batch)

LensPublisher
from atdata.atmosphere import LensPublisher
publisher = LensPublisher(client)
# With code references
uri = publisher.publish(
name="simplify",
source_schema=full_schema_uri,
target_schema=simple_schema_uri,
description="Extract label only",
getter_code={
"repository": "https://github.com/org/repo",
"commit": "abc123def...",
"path": "transforms/simplify.py:simplify_getter",
},
putter_code={
"repository": "https://github.com/org/repo",
"commit": "abc123def...",
"path": "transforms/simplify.py:simplify_putter",
},
)
# Or publish from a Lens object
from atdata.lens import lens
@lens
def simplify(src: FullSample) -> SimpleSample:
return SimpleSample(label=src.label)
uri = publisher.publish_from_lens(
simplify,
source_schema=full_schema_uri,
target_schema=simple_schema_uri,
)

Lower-Level Loaders
For direct access to records, use the loader classes:
SchemaLoader
from atdata.atmosphere import SchemaLoader
loader = SchemaLoader(client)
# Get a specific schema
schema = loader.get("at://did:plc:abc/ac.foundation.dataset.sampleSchema/xyz")
print(schema["name"], schema["version"])
# List all schemas from a repository
for schema in loader.list_all(repo="did:plc:other-user"):
    print(schema["name"])

DatasetLoader
from atdata.atmosphere import DatasetLoader
loader = DatasetLoader(client)
# Get a specific dataset record
record = loader.get("at://did:plc:abc/ac.foundation.dataset.record/xyz")
# Check storage type
storage_type = loader.get_storage_type(uri) # "external" or "blobs"
# Get URLs based on storage type
if storage_type == "external":
urls = loader.get_urls(uri)
else:
urls = loader.get_blob_urls(uri)
# Get metadata
metadata = loader.get_metadata(uri)
# Create a Dataset object directly
dataset = loader.to_dataset(uri, MySampleType)
for batch in dataset.ordered(batch_size=32):
    process(batch)

LensLoader
from atdata.atmosphere import LensLoader
loader = LensLoader(client)
# Get a specific lens record
lens = loader.get("at://did:plc:abc/ac.foundation.dataset.lens/xyz")
print(lens["name"])
print(lens["sourceSchema"], "->", lens["targetSchema"])
# List all lenses from a repository
for lens in loader.list_all():
print(lens["name"])
# Find lenses by schema
lenses = loader.find_by_schemas(
source_schema_uri="at://did:plc:abc/ac.foundation.dataset.sampleSchema/source",
target_schema_uri="at://did:plc:abc/ac.foundation.dataset.sampleSchema/target",
)

AT URIs
ATProto records are identified by AT URIs:
from atdata.atmosphere import AtUri
# Parse an AT URI
uri = AtUri.parse("at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz")
print(uri.authority) # 'did:plc:abc123'
print(uri.collection) # 'ac.foundation.dataset.sampleSchema'
print(uri.rkey) # 'xyz'
# Format back to string
print(str(uri)) # 'at://did:plc:abc123/ac.foundation.dataset.sampleSchema/xyz'

Supported Field Types
Schemas support these field types:
| Python Type | ATProto Type |
|---|---|
| str | primitive/str |
| int | primitive/int |
| float | primitive/float |
| bool | primitive/bool |
| bytes | primitive/bytes |
| NDArray | ndarray (default dtype: float32) |
| NDArray[np.float64] | ndarray (dtype: float64) |
| list[str] | array with items |
| T \| None | Optional field |
Complete Example
This example shows the full workflow using PDSBlobStore for decentralized storage:
import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex, PDSBlobStore
import webdataset as wds
# 1. Define and create samples
@atdata.packable
class FeatureSample:
features: NDArray
label: int
source: str
samples = [
FeatureSample(
features=np.random.randn(128).astype(np.float32),
label=i % 10,
source="synthetic",
)
for i in range(1000)
]
# 2. Write to tar
with wds.writer.TarWriter("features.tar") as sink:
for i, s in enumerate(samples):
sink.write({**s.as_wds, "__key__": f"{i:06d}"})
# 3. Authenticate and set up blob storage
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")
store = PDSBlobStore(client)
index = AtmosphereIndex(client, data_store=store)
# 4. Publish schema
schema_uri = index.publish_schema(
FeatureSample,
version="1.0.0",
description="Feature vectors with labels",
)
# 5. Publish dataset (shards uploaded as blobs)
dataset = atdata.Dataset[FeatureSample]("features.tar")
entry = index.insert_dataset(
dataset,
name="synthetic-features-v1",
schema_ref=schema_uri,
tags=["features", "synthetic"],
)
print(f"Published: {entry.uri}")
print(f"Blob URLs: {entry.data_urls}")
# 6. Later: discover and load from blobs
for dataset_entry in index.list_datasets():
print(f"Found: {dataset_entry.name}")
# Reconstruct type from schema
SampleType = index.decode_schema(dataset_entry.schema_ref)
# Create source from blob URLs
source = store.create_source(dataset_entry.data_urls)
# Load dataset from blobs
ds = atdata.Dataset[SampleType](source)
for batch in ds.ordered(batch_size=32):
print(batch.features.shape)
    break

For external URL storage (without PDSBlobStore):
# Use AtmosphereIndex without data_store
index = AtmosphereIndex(client)
# Dataset URLs will be stored as-is (external references)
entry = index.insert_dataset(
dataset,
name="external-features",
schema_ref=schema_uri,
)
# Load using standard URL source
ds = atdata.Dataset[FeatureSample](entry.data_urls[0])