Promotion Workflow

Migrate datasets from local storage to ATProto

This tutorial demonstrates the workflow for migrating datasets from local Redis/S3 storage to the federated ATProto atmosphere network. Promotion is the bridge between Layer 2 (team storage) and Layer 3 (federation).

Why Promotion?

A common pattern in data science:

  1. Start private: Develop and validate datasets within your team
  2. Go public: Share successful datasets with the broader community

Promotion handles this transition without re-processing your data. Instead of creating a new dataset from scratch, you’re lifting an existing local dataset entry into the federated atmosphere.

The workflow handles several complexities automatically:

  • Schema deduplication: If you’ve already published the same schema type and version, promotion reuses it
  • URL preservation: Data stays in place (unless you explicitly want to copy it)
  • CID consistency: Content identifiers remain valid across the transition
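
CID consistency follows from content addressing: the identifier is derived purely from the bytes it names, not from where they live. A minimal illustration using a plain SHA-256 digest as a stand-in for a real CID (atdata's actual CID scheme may differ):

```python
import hashlib

def content_id(data: bytes) -> str:
    """Stand-in for a real CID: a digest computed only from the bytes."""
    return hashlib.sha256(data).hexdigest()

shard = b"...tar shard bytes..."

# The identifier depends only on the content, not on the storage location,
# so lifting an entry from local:// to at:// leaves its CID unchanged.
local_cid = content_id(shard)
promoted_cid = content_id(shard)
assert local_cid == promoted_cid
```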

Overview

The promotion workflow moves datasets from local storage to the atmosphere:

LOCAL                           ATMOSPHERE
-----                           ----------
Redis Index                     ATProto PDS
S3 Storage            -->       (same S3 or new location)
local://schemas/...             at://did:plc:.../schema/...
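
The address translation in the diagram amounts to a string rewrite. A sketch of that mapping (illustrative only: `rewrite_uri`, the example DID, and the exact path layout are assumptions, not atdata's actual resolver):

```python
def rewrite_uri(local_uri: str, did: str) -> str:
    """Illustrative rewrite of a local schema address into an at:// form."""
    prefix = "local://schemas/"
    if not local_uri.startswith(prefix):
        raise ValueError(f"not a local schema URI: {local_uri}")
    name = local_uri[len(prefix):]
    return f"at://{did}/schema/{name}"

print(rewrite_uri("local://schemas/ExperimentSample", "did:plc:abc123"))
# at://did:plc:abc123/schema/ExperimentSample
```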

Key features:

  • Schema deduplication: Won’t republish identical schemas
  • Flexible data handling: Keep existing URLs or copy to new storage
  • Metadata preservation: Local metadata carries over to atmosphere

Setup

import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.local import LocalIndex, LocalDatasetEntry, S3DataStore
from atdata.atmosphere import AtmosphereClient
from atdata.promote import promote_to_atmosphere
import webdataset as wds

Prepare a Local Dataset

First, set up a dataset in local storage:

# 1. Define sample type
@atdata.packable
class ExperimentSample:
    """A sample from a scientific experiment."""
    measurement: NDArray
    timestamp: float
    sensor_id: str

# 2. Create samples
samples = [
    ExperimentSample(
        measurement=np.random.randn(64).astype(np.float32),
        timestamp=float(i),
        sensor_id=f"sensor_{i % 4}",
    )
    for i in range(1000)
]

# 3. Write to tar
with wds.TarWriter("experiment.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})

# 4. Set up local index with S3 storage
store = S3DataStore(
    credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    bucket="datasets-bucket",
)
local_index = LocalIndex(data_store=store)

# 5. Insert dataset into index
dataset = atdata.Dataset[ExperimentSample]("experiment.tar")
local_entry = local_index.insert_dataset(dataset, name="experiment-2024-001", prefix="experiments")

# 6. Publish schema to local index
local_index.publish_schema(ExperimentSample, version="1.0.0")

print(f"Local entry name: {local_entry.name}")
print(f"Local entry CID: {local_entry.cid}")
print(f"Data URLs: {local_entry.data_urls}")

Basic Promotion

Promote the dataset to ATProto:

# Connect to atmosphere
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")

# Promote to atmosphere
at_uri = promote_to_atmosphere(local_entry, local_index, client)
print(f"Published: {at_uri}")

Promotion with Metadata

Add description, tags, and license:

at_uri = promote_to_atmosphere(
    local_entry,
    local_index,
    client,
    name="experiment-2024-001-v2",   # Override name
    description="Sensor measurements from Lab 302",
    tags=["experiment", "physics", "2024"],
    license="CC-BY-4.0",
)
print(f"Published with metadata: {at_uri}")

Schema Deduplication

The promotion workflow automatically checks for existing schemas. You can run the same check with the helper it uses internally (the leading underscore marks it as private, so it may change between versions):

from atdata.promote import _find_existing_schema

# Check if schema already exists
existing = _find_existing_schema(client, "ExperimentSample", "1.0.0")
if existing:
    print(f"Found existing schema: {existing}")
    print("Will reuse instead of republishing")
else:
    print("No existing schema found, will publish new one")

When you promote multiple datasets with the same sample type:

# First promotion: publishes schema
uri1 = promote_to_atmosphere(entry1, local_index, client)

# Second promotion with same schema type + version: reuses existing schema
uri2 = promote_to_atmosphere(entry2, local_index, client)
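
Conceptually, deduplication behaves like a cache keyed on (type name, version). A self-contained sketch of that behavior (the dict-backed registry and example DID here are illustrative, not the real PDS lookup):

```python
class SchemaRegistry:
    """Illustrative registry: reuse a schema URI if (name, version) was seen."""

    def __init__(self):
        self._published: dict[tuple[str, str], str] = {}

    def publish(self, name: str, version: str) -> str:
        key = (name, version)
        if key in self._published:
            return self._published[key]   # already published: reuse, don't republish
        uri = f"at://did:plc:example/schema/{name}@{version}"
        self._published[key] = uri
        return uri

reg = SchemaRegistry()
first = reg.publish("ExperimentSample", "1.0.0")
second = reg.publish("ExperimentSample", "1.0.0")
assert first == second   # the second promotion reused the existing schema
```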

Data Migration Options

By default, promotion keeps the original data URLs:

# Data stays in original S3 location
at_uri = promote_to_atmosphere(local_entry, local_index, client)

Benefits:

  • Fastest option: no data is copied
  • The dataset record points at the existing URLs

Caveat: the original storage must remain accessible to anyone loading the dataset.

To copy data to a different storage location:

from atdata.local import S3DataStore

# Create new data store
new_store = S3DataStore(
    credentials="new-s3-creds.env",
    bucket="public-datasets",
)

# Promote with data copy
at_uri = promote_to_atmosphere(
    local_entry,
    local_index,
    client,
    data_store=new_store,  # Copy data to new storage
)

Benefits:

  • Data is copied to new bucket
  • Good for moving from private to public storage
  • Original storage can be retired
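
Under the hood, copying amounts to streaming each shard from the old bucket to the new one and recording the new URLs in the published record. A toy sketch with dict-backed buckets (illustrative; the actual transfer goes through the S3 API):

```python
def copy_shards(src: dict, dst: dict, keys: list[str], new_prefix: str) -> list[str]:
    """Copy each shard's bytes to the destination and return the new URLs."""
    new_urls = []
    for key in keys:
        dst[key] = src[key]   # byte-for-byte copy, so CIDs stay valid
        new_urls.append(f"{new_prefix}/{key}")
    return new_urls

src_bucket = {"shard-000000.tar": b"..."}
dst_bucket: dict = {}
urls = copy_shards(src_bucket, dst_bucket, ["shard-000000.tar"],
                   "s3://public-datasets")
assert dst_bucket == src_bucket
```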

Verify on Atmosphere

After promotion, verify the dataset is accessible:

from atdata.atmosphere import AtmosphereIndex

atm_index = AtmosphereIndex(client)
entry = atm_index.get_dataset(at_uri)

print(f"Name: {entry.name}")
print(f"Schema: {entry.schema_ref}")
print(f"URLs: {entry.data_urls}")

# Load and iterate
SampleType = atm_index.decode_schema(entry.schema_ref)
ds = atdata.Dataset[SampleType](entry.data_urls[0])

for batch in ds.ordered(batch_size=32):
    print(f"Measurement shape: {batch.measurement.shape}")
    break

Error Handling

try:
    at_uri = promote_to_atmosphere(local_entry, local_index, client)
except KeyError as e:
    # Schema not found in local index
    print(f"Missing schema: {e}")
    print("Publish schema first: local_index.publish_schema(SampleType)")
except ValueError as e:
    # Entry has no data URLs
    print(f"Invalid entry: {e}")

Requirements Checklist

Before promotion, confirm that:

  • The sample type's schema is published to the local index (local_index.publish_schema)
  • The local entry has data URLs (insert_dataset uploaded the data to S3)
  • The atmosphere client is authenticated (client.login succeeded)

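These conditions mirror the errors from the previous section. A hypothetical pre-flight validator (`preflight` is not part of atdata; the entry is a plain dict here for illustration):

```python
def preflight(entry: dict, published_schemas: set[str]) -> None:
    """Raise the same error types promote_to_atmosphere reports on bad input."""
    if entry["schema"] not in published_schemas:
        raise KeyError(f"schema not published locally: {entry['schema']}")
    if not entry.get("data_urls"):
        raise ValueError("entry has no data URLs")

entry = {"schema": "ExperimentSample@1.0.0", "data_urls": []}
try:
    preflight(entry, {"ExperimentSample@1.0.0"})
except ValueError as e:
    print(f"Invalid entry: {e}")
# Invalid entry: entry has no data URLs
```
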
Complete Workflow

# Complete local-to-atmosphere workflow
import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.local import LocalIndex, S3DataStore
from atdata.atmosphere import AtmosphereClient, AtmosphereIndex
from atdata.promote import promote_to_atmosphere
import webdataset as wds

# 1. Define sample type
@atdata.packable
class FeatureSample:
    features: NDArray
    label: int

# 2. Create dataset tar
samples = [
    FeatureSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10,
    )
    for i in range(1000)
]

with wds.TarWriter("features.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})

# 3. Store in local index with S3 backend
store = S3DataStore(credentials="creds.env", bucket="bucket")
local_index = LocalIndex(data_store=store)

dataset = atdata.Dataset[FeatureSample]("features.tar")
local_entry = local_index.insert_dataset(dataset, name="feature-vectors-v1", prefix="features")

# 4. Publish schema locally
local_index.publish_schema(FeatureSample, version="1.0.0")

# 5. Promote to atmosphere
client = AtmosphereClient()
client.login("myhandle.bsky.social", "app-password")

at_uri = promote_to_atmosphere(
    local_entry,
    local_index,
    client,
    description="Feature vectors for classification",
    tags=["features", "embeddings"],
    license="MIT",
)

print(f"Dataset published: {at_uri}")

# 6. Others can now discover and load
# ds = atdata.load_dataset("@myhandle.bsky.social/feature-vectors-v1")

What You’ve Learned

You now understand the promotion workflow:

Concept                    Purpose
-------                    -------
promote_to_atmosphere()    Lift local entries to the federated network
Schema deduplication       Avoid publishing duplicate schemas
Data URL preservation      Keep data in place or copy to new storage
Metadata enrichment        Add description, tags, license during promotion

Promotion completes atdata’s three-layer story: you can now move seamlessly from local experimentation to team collaboration to public sharing, all with the same typed sample definitions.

The Complete Journey

┌──────────────────┐     insert     ┌──────────────────┐    promote    ┌──────────────────┐
│  Local Files     │ ────────────→  │  Team Storage    │ ────────────→ │  Federation      │
│                  │                │                  │               │                  │
│  tar files       │                │  Redis + S3      │               │  ATProto PDS     │
│  Dataset[T]      │                │  LocalIndex      │               │  AtmosphereIndex │
└──────────────────┘                └──────────────────┘               └──────────────────┘

Next Steps