import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.local import LocalIndex, LocalDatasetEntry, S3DataStore
from atdata.atmosphere import AtmosphereClient
from atdata.promote import promote_to_atmosphere
import webdataset as wds

Promotion Workflow
This tutorial demonstrates the workflow for migrating datasets from local Redis/S3 storage to the federated ATProto atmosphere network. Promotion is the bridge between Layer 2 (team storage) and Layer 3 (federation).
Why Promotion?
A common pattern in data science:
- Start private: Develop and validate datasets within your team
- Go public: Share successful datasets with the broader community
Promotion handles this transition without re-processing your data. Instead of creating a new dataset from scratch, you’re lifting an existing local dataset entry into the federated atmosphere.
The workflow handles several complexities automatically:
- Schema deduplication: If you’ve already published the same schema type and version, promotion reuses it
- URL preservation: Data stays in place (unless you explicitly want to copy it)
- CID consistency: Content identifiers remain valid across the transition
Overview
The promotion workflow moves datasets from local storage to the atmosphere:
LOCAL                            ATMOSPHERE
-----                            ----------
Redis Index                      ATProto PDS
S3 Storage              -->      (same S3 or new location)
local://schemas/...              at://did:plc:.../schema/...
Key features:
- Schema deduplication: Won’t republish identical schemas
- Flexible data handling: Keep existing URLs or copy to new storage
- Metadata preservation: Local metadata carries over to atmosphere
Setup
Prepare a Local Dataset
First, set up a dataset in local storage:
# 1. Define sample type
@atdata.packable
class ExperimentSample:
    """A sample from a scientific experiment."""

    # Raw sensor reading; produced as float32[64] in this tutorial —
    # assumes downstream code tolerates other shapes, TODO confirm.
    measurement: NDArray
    # Acquisition time; here the sample index cast to float.
    timestamp: float
    # Identifier of the originating sensor (e.g. "sensor_0").
    sensor_id: str
# 2. Create samples
# Build 1000 synthetic readings, cycling through four sensor ids.
samples = []
for i in range(1000):
    sample = ExperimentSample(
        measurement=np.random.randn(64).astype(np.float32),
        timestamp=float(i),
        sensor_id=f"sensor_{i % 4}",
    )
    samples.append(sample)
# 3. Write to tar
# Serialize every sample into a WebDataset tar archive, one record per sample.
with wds.writer.TarWriter("experiment.tar") as sink:
    for i, s in enumerate(samples):
        # as_wds supplies the packed fields; __key__ is the zero-padded record index.
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})
# 4. Set up local index with S3 storage
# Demo MinIO credentials for the local S3-compatible backend.
minio_credentials = {
    "AWS_ENDPOINT": "http://localhost:9000",
    "AWS_ACCESS_KEY_ID": "minioadmin",
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
}
store = S3DataStore(credentials=minio_credentials, bucket="datasets-bucket")
# Bind the index to the S3 data store: dataset metadata lives in the index,
# shard bytes in the store.
local_index = LocalIndex(data_store=store)
# 5. Insert dataset into index
dataset = atdata.Dataset[ExperimentSample]("experiment.tar")
# NOTE(review): insert_dataset presumably uploads the tar under `prefix` in the
# store and records the entry — confirm against the LocalIndex reference.
local_entry = local_index.insert_dataset(dataset, name="experiment-2024-001", prefix="experiments")
# 6. Publish schema to local index
# Promotion later requires this schema to exist locally (it raises KeyError otherwise).
local_index.publish_schema(ExperimentSample, version="1.0.0")
print(f"Local entry name: {local_entry.name}")
print(f"Local entry CID: {local_entry.cid}")
print(f"Data URLs: {local_entry.data_urls}")

Basic Promotion
Promote the dataset to ATProto:
# Connect to atmosphere
# NOTE(review): use an ATProto app password here, not the account password.
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")
# Promote to atmosphere
# Lifts the local entry into the federated network and returns the at:// URI
# of the published record; data URLs are preserved unless a data_store is given.
at_uri = promote_to_atmosphere(local_entry, local_index, client)
print(f"Published: {at_uri}")

Promotion with Metadata
Add description, tags, and license:
# Optional metadata is attached to the atmosphere record at promotion time.
at_uri = promote_to_atmosphere(
    local_entry,
    local_index,
    client,
    name="experiment-2024-001-v2",  # Override name
    description="Sensor measurements from Lab 302",
    tags=["experiment", "physics", "2024"],
    license="CC-BY-4.0",
)
print(f"Published with metadata: {at_uri}")

Schema Deduplication
The promotion workflow automatically checks for existing schemas:
from atdata.promote import _find_existing_schema

# Check if schema already exists
# NOTE(review): _find_existing_schema is a private helper (leading underscore),
# shown for illustration — the same check runs automatically during promotion.
existing = _find_existing_schema(client, "ExperimentSample", "1.0.0")
if existing:
    print(f"Found existing schema: {existing}")
    print("Will reuse instead of republishing")
else:
    print("No existing schema found, will publish new one")

When you promote multiple datasets with the same sample type:
# First promotion: publishes schema
# (entry1 and entry2 are presumed to be local entries sharing one sample type.)
uri1 = promote_to_atmosphere(entry1, local_index, client)
# Second promotion with same schema type + version: reuses existing schema
uri2 = promote_to_atmosphere(entry2, local_index, client)

Data Migration Options
By default, promotion keeps the original data URLs:
# Data stays in original S3 location
at_uri = promote_to_atmosphere(local_entry, local_index, client)

Benefits:
- Fastest option, no data copying
- Dataset record points to existing URLs
- Requires original storage to remain accessible
To copy data to a different storage location:
from atdata.local import S3DataStore

# Create new data store
new_store = S3DataStore(
    credentials="new-s3-creds.env",  # presumably a path to an env-style credentials file — confirm
    bucket="public-datasets",
)
# Promote with data copy
# Supplying data_store switches promotion from URL reuse to copying the
# shards into the new bucket.
at_uri = promote_to_atmosphere(
    local_entry,
    local_index,
    client,
    data_store=new_store,  # Copy data to new storage
)

Benefits:
- Data is copied to new bucket
- Good for moving from private to public storage
- Original storage can be retired
Verify on Atmosphere
After promotion, verify the dataset is accessible:
from atdata.atmosphere import AtmosphereIndex

# Read the promoted record back through the atmosphere index.
atm_index = AtmosphereIndex(client)
entry = atm_index.get_dataset(at_uri)
print(f"Name: {entry.name}")
print(f"Schema: {entry.schema_ref}")
print(f"URLs: {entry.data_urls}")
# Load and iterate
# decode_schema rebuilds the packable sample type from the published schema record.
SampleType = atm_index.decode_schema(entry.schema_ref)
ds = atdata.Dataset[SampleType](entry.data_urls[0])
for batch in ds.ordered(batch_size=32):
    print(f"Measurement shape: {batch.measurement.shape}")
    break

Error Handling
# Promotion failures surface as targeted exceptions rather than a generic error.
try:
    at_uri = promote_to_atmosphere(local_entry, local_index, client)
except KeyError as e:
    # Schema not found in local index
    print(f"Missing schema: {e}")
    print("Publish schema first: local_index.publish_schema(SampleType)")
except ValueError as e:
    # Entry has no data URLs
    print(f"Invalid entry: {e}")

Requirements Checklist
Before promotion: the dataset entry must exist in the local index, its schema must be published locally with publish_schema, and you must be logged in to an atmosphere client.
Complete Workflow
# Complete local-to-atmosphere workflow
import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.local import LocalIndex, S3DataStore
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex
from atdata.promote import promote_to_atmosphere
import webdataset as wds
# 1. Define sample type
@atdata.packable
class FeatureSample:
    """A labeled feature vector for classification."""

    # Feature vector; produced as float32[128] in this tutorial.
    features: NDArray
    # Integer class label (0-9 in this example).
    label: int
# 2. Create dataset tar
samples = [
    FeatureSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10,
    )
    for i in range(1000)
]
# Write one record per sample; __key__ is the zero-padded record index.
with wds.writer.TarWriter("features.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})
# 3. Store in local index with S3 backend
store = S3DataStore(credentials="creds.env", bucket="bucket")
local_index = LocalIndex(data_store=store)
dataset = atdata.Dataset[FeatureSample]("features.tar")
local_entry = local_index.insert_dataset(dataset, name="feature-vectors-v1", prefix="features")
# 4. Publish schema locally
# Required before promotion: promote_to_atmosphere raises KeyError without it.
local_index.publish_schema(FeatureSample, version="1.0.0")
# 5. Promote to atmosphere
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")
at_uri = promote_to_atmosphere(
    local_entry,
    local_index,
    client,
    description="Feature vectors for classification",
    tags=["features", "embeddings"],
    license="MIT",
)
print(f"Dataset published: {at_uri}")
# 6. Others can now discover and load
# ds = atdata.load_dataset("@myhandle.bsky.social/feature-vectors-v1")

What You've Learned
You now understand the promotion workflow:
| Concept | Purpose |
|---|---|
| promote_to_atmosphere() | Lift local entries to federated network |
| Schema deduplication | Avoid publishing duplicate schemas |
| Data URL preservation | Keep data in place or copy to new storage |
| Metadata enrichment | Add description, tags, license during promotion |
Promotion completes atdata’s three-layer story: you can now move seamlessly from local experimentation to team collaboration to public sharing, all with the same typed sample definitions.
The Complete Journey
┌──────────────────┐ insert ┌──────────────────┐ promote ┌──────────────────┐
│ Local Files │ ────────────→ │ Team Storage │ ────────────→ │ Federation │
│ │ │ │ │ │
│ tar files │ │ Redis + S3 │ │ ATProto PDS │
│ Dataset[T] │ │ LocalIndex │ │ AtmosphereIndex │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Next Steps
- Atmosphere Reference - Complete atmosphere API
- Protocols - Abstract interfaces
- Local Storage - Local storage reference