Local Workflow

Store and manage datasets with Redis + S3

This tutorial demonstrates how to use the local storage module to store and index datasets using Redis and S3-compatible storage. This is Layer 2 of atdata’s architecture—team-scale storage that bridges local development and federated sharing.

Why Team Storage?

Local tar files work well for individual experiments, but teams need:

  • Discovery: “What datasets do we have? What schema does this one use?”
  • Consistency: “Is everyone using the same version of this dataset?”
  • Durability: “Where’s the canonical copy of our training data?”

atdata’s local storage module addresses these needs with a two-component architecture:

  • Redis Index: fast metadata queries, schema registry, dataset discovery
  • S3 DataStore: scalable object storage for the actual data files

This separation means metadata operations (listing datasets, resolving schemas) are fast and don’t touch large data files, while the data itself lives in battle-tested object storage.

Prerequisites

  • Redis server running (default: localhost:6379)
  • S3-compatible storage (MinIO, AWS S3, etc.)
Tip

For local development, you can use MinIO:

docker run -p 9000:9000 minio/minio server /data

Setup

import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.local import LocalIndex, LocalDatasetEntry, S3DataStore
import webdataset as wds

Define Sample Types

@atdata.packable
class TrainingSample:
    """A sample containing features and label for training."""
    features: NDArray
    label: int

@atdata.packable
class TextSample:
    """A sample containing text data."""
    text: str
    category: str

LocalDatasetEntry

Every dataset in the index is represented by a LocalDatasetEntry. A key design decision: entries use content-addressable CIDs (Content Identifiers) as their identity. This means:

  • Identical content always has the same CID
  • You can verify data integrity by checking the CID
  • Deduplication happens automatically

CIDs are computed from the entry’s schema reference and data URLs, so the same entry always produces the same CID no matter which index records it.

Create entries with content-addressable CIDs:

# Create an entry manually
entry = LocalDatasetEntry(
    _name="my-dataset",
    _schema_ref="local://schemas/examples.TrainingSample@1.0.0",
    _data_urls=["s3://bucket/data-000000.tar", "s3://bucket/data-000001.tar"],
    _metadata={"source": "example", "samples": 10000},
)

print(f"Entry name: {entry.name}")
print(f"Schema ref: {entry.schema_ref}")
print(f"Data URLs: {entry.data_urls}")
print(f"Metadata: {entry.metadata}")
print(f"CID: {entry.cid}")
Note

CIDs are generated from content (schema_ref + data_urls), so identical data produces identical CIDs.
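To see why hashing identity-bearing fields gives these properties, here is a minimal, self-contained sketch of content addressing. This is not atdata’s actual CID algorithm (real CIDs use a multiformat encoding); the `illustrative_cid` helper exists only for this illustration.

```python
import hashlib
import json

def illustrative_cid(schema_ref: str, data_urls: list[str]) -> str:
    """Hash the identity-bearing fields into a stable digest.

    NOT atdata's real CID scheme; it only demonstrates that
    identical content always hashes to the same identifier.
    """
    payload = json.dumps(
        {"schema_ref": schema_ref, "data_urls": data_urls},
        sort_keys=True,  # canonical ordering => deterministic bytes
    ).encode()
    return hashlib.sha256(payload).hexdigest()

a = illustrative_cid(
    "local://schemas/examples.TrainingSample@1.0.0",
    ["s3://bucket/data-000000.tar"],
)
b = illustrative_cid(
    "local://schemas/examples.TrainingSample@1.0.0",
    ["s3://bucket/data-000000.tar"],
)
c = illustrative_cid(
    "local://schemas/examples.TrainingSample@1.0.0",
    ["s3://bucket/data-000001.tar"],
)
print(a == b)  # same schema ref and URLs: same digest
print(a == c)  # different data URLs: different digest
```

Because the digest depends only on content, two teammates who register the same dataset independently end up with the same identifier, which is what makes automatic deduplication possible.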

LocalIndex

The LocalIndex is your team’s dataset registry. It implements the AbstractIndex protocol, meaning code written against LocalIndex will also work with AtmosphereIndex when you’re ready for federated sharing.

The index tracks datasets in Redis:

from redis import Redis

# Connect to Redis
redis = Redis(host="localhost", port=6379)
index = LocalIndex(redis=redis)

print("LocalIndex connected")

Schema Management

Schema publishing is how you ensure type consistency across your team. When you publish a schema, atdata stores the complete type definition (field names, types, metadata) so anyone can reconstruct the Python class from just the schema reference.

This enables a powerful workflow: share a dataset by sharing its name, and consumers can dynamically reconstruct the sample type without having the original Python code.
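Conceptually, reconstruction is possible because the schema record carries everything needed to rebuild a class. The sketch below shows the idea using only the standard library; the shape of `schema_record` and the `TYPE_MAP` lookup are assumptions for illustration, not atdata’s actual record format (which `index.get_schema()` returns).

```python
from dataclasses import make_dataclass, fields

# A schema record like the one an index might store
# (exact record format is an assumption for illustration).
schema_record = {
    "name": "TrainingSample",
    "fields": [
        {"name": "features", "type": "ndarray"},
        {"name": "label", "type": "int"},
    ],
}

# Map stored type names back to Python types (simplified;
# `object` stands in for NDArray to keep this self-contained).
TYPE_MAP = {"ndarray": object, "int": int, "str": str}

# Rebuild a class from metadata alone -- no original source needed.
ReconstructedSample = make_dataclass(
    schema_record["name"],
    [(f["name"], TYPE_MAP[f["type"]]) for f in schema_record["fields"]],
)

sample = ReconstructedSample(features=[0.1, 0.2], label=3)
print(type(sample).__name__)          # TrainingSample
print([f.name for f in fields(sample)])  # ['features', 'label']
```

atdata’s `decode_schema` does the equivalent work for `@packable` types, which is what the next snippet exercises against a real index.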

# Publish a schema
schema_ref = index.publish_schema(TrainingSample, version="1.0.0")
print(f"Published schema: {schema_ref}")

# List all schemas
for schema in index.list_schemas():
    print(f"  - {schema.get('name', 'Unknown')} v{schema.get('version', '?')}")

# Get schema record
schema_record = index.get_schema(schema_ref)
print(f"Schema fields: {[f['name'] for f in schema_record.get('fields', [])]}")

# Decode schema back to a PackableSample class
decoded_type = index.decode_schema(schema_ref)
print(f"Decoded type: {decoded_type.__name__}")

S3DataStore

The S3DataStore implements the AbstractDataStore protocol for S3-compatible object storage. It works with:

  • AWS S3: Production-scale cloud storage
  • MinIO: Self-hosted S3-compatible storage (great for development)
  • Cloudflare R2: Cost-effective S3-compatible storage

The data store handles uploading tar shards and creating signed URLs for streaming access.

For direct S3 operations:

creds = {
    "AWS_ENDPOINT": "http://localhost:9000",
    "AWS_ACCESS_KEY_ID": "minioadmin",
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
}

store = S3DataStore(creds, bucket="my-bucket")

print(f"Bucket: {store.bucket}")
print(f"Supports streaming: {store.supports_streaming()}")

Complete Index Workflow

Here’s the typical workflow for publishing a dataset to your team:

  1. Create samples using your @packable type
  2. Write to local tar for staging
  3. Create a Dataset wrapper
  4. Connect to index with data store
  5. Publish schema for type consistency
  6. Insert dataset (uploads to S3, indexes in Redis)

The index composition pattern (LocalIndex(data_store=S3DataStore(...))) is deliberate—it separates the concern of “where is metadata?” from “where is data?”, making it easy to swap storage backends.
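The composition pattern can be sketched in a few lines of plain Python. The `DataStoreLike` protocol, `InMemoryStore`, and `ToyIndex` names below are invented for this sketch and deliberately much simpler than atdata’s real `AbstractDataStore`/`AbstractIndex` interfaces; the point is only that the index holds metadata while delegating bytes to whatever store it was composed with.

```python
from typing import Protocol

class DataStoreLike(Protocol):
    """Minimal stand-in for a data store protocol (illustrative)."""
    def put(self, key: str, data: bytes) -> str: ...

class InMemoryStore:
    """Toy backend: keeps shards in a dict instead of S3."""
    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> str:
        self.blobs[key] = data
        return f"memory://{key}"

class ToyIndex:
    """Metadata lives here; bytes go wherever the store puts them."""
    def __init__(self, data_store: DataStoreLike) -> None:
        self.data_store = data_store
        self.entries: dict[str, str] = {}

    def insert(self, name: str, data: bytes) -> str:
        url = self.data_store.put(f"datasets/{name}", data)
        self.entries[name] = url  # only the URL is indexed
        return url

# Swapping the backend changes storage, not the index code.
index = ToyIndex(data_store=InMemoryStore())
url = index.insert("training-v1", b"tar bytes...")
print(url)  # memory://datasets/training-v1
```

Replacing `InMemoryStore` with any other object satisfying the protocol leaves `ToyIndex` untouched, which is exactly the property that lets `LocalIndex` accept different data store backends.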

Use LocalIndex together with an S3DataStore so that data is uploaded to S3 while metadata is indexed in Redis:

# 1. Create sample data
samples = [
    TrainingSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10
    )
    for i in range(1000)
]
print(f"Created {len(samples)} training samples")

# 2. Write to local tar file
with wds.writer.TarWriter("local-data-000000.tar") as sink:
    for i, sample in enumerate(samples):
        sink.write({**sample.as_wds, "__key__": f"sample_{i:06d}"})
print("Wrote samples to local tar file")

# 3. Create Dataset
ds = atdata.Dataset[TrainingSample]("local-data-000000.tar")

# 4. Set up index with S3 data store and insert
store = S3DataStore(
    credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    bucket="my-bucket",
)
index = LocalIndex(redis=redis, data_store=store)

# Publish schema and insert dataset
index.publish_schema(TrainingSample, version="1.0.0")
entry = index.insert_dataset(ds, name="training-v1", prefix="datasets")
print(f"Stored at: {entry.data_urls}")
print(f"CID: {entry.cid}")

# 5. Retrieve later
retrieved_entry = index.get_entry_by_name("training-v1")
dataset = atdata.Dataset[TrainingSample](retrieved_entry.data_urls[0])

for batch in dataset.ordered(batch_size=32):
    print(f"Batch features shape: {batch.features.shape}")
    break

Using load_dataset with Index

The load_dataset() function provides a HuggingFace-style API that abstracts away the details of where data lives. When you pass an index, it can resolve @local/ prefixed paths to the actual data URLs and apply the correct credentials automatically.

The load_dataset() function supports index lookup:

from atdata import load_dataset

# Load from local index
ds = load_dataset("@local/my-dataset", index=index, split="train")

# The index resolves the dataset name to URLs and schema
for batch in ds.shuffled(batch_size=32):
    process(batch)  # placeholder for your processing logic
    break

What You’ve Learned

You now understand team-scale storage in atdata:

  • LocalIndex: Redis-backed dataset registry implementing AbstractIndex
  • S3DataStore: S3-compatible object storage implementing AbstractDataStore
  • LocalDatasetEntry: content-addressed dataset entries with CIDs
  • Schema publishing: shared type definitions for team consistency

The same sample types you defined in the Quick Start work seamlessly here—the only change is where the data lives.

Next Steps

Ready for Public Sharing?

The Atmosphere Publishing tutorial shows how to publish datasets to the ATProto network for decentralized, cross-organization discovery.