Local Storage

Redis + S3 backend for dataset management

The local storage module provides a Redis + S3 backend for storing and managing datasets before publishing to the ATProto federation.

Overview

Local storage uses:

  • Redis for indexing and tracking dataset metadata
  • S3-compatible storage for dataset tar files

This enables development and small-scale deployment before promoting to the full ATProto infrastructure.

LocalIndex

The index tracks datasets in Redis:

from atdata.local import LocalIndex

# Default connection (localhost:6379)
index = LocalIndex()

# Custom Redis connection
import redis
r = redis.Redis(host='custom-host', port=6379)
index = LocalIndex(redis=r)

# With connection kwargs
index = LocalIndex(host='custom-host', port=6379, db=1)

Adding Entries

import atdata
from numpy.typing import NDArray

@atdata.packable
class ImageSample:
    image: NDArray
    label: str

dataset = atdata.Dataset[ImageSample]("data-{000000..000009}.tar")

entry = index.add_entry(
    dataset,
    name="my-dataset",
    schema_ref="atdata://local/sampleSchema/ImageSample@1.0.0",  # optional
    metadata={"description": "Training images"},              # optional
)

print(entry.cid)        # Content identifier
print(entry.name)       # "my-dataset"
print(entry.data_urls)  # ["data-{000000..000009}.tar"]

Listing and Retrieving

# Iterate all entries
for entry in index.entries:
    print(f"{entry.name}: {entry.cid}")

# Get as list
all_entries = index.all_entries

# Get by name
entry = index.get_entry_by_name("my-dataset")

# Get by CID
entry = index.get_entry("bafyrei...")

Repo (Deprecated)

Warning

Repo is deprecated. Use LocalIndex with S3DataStore instead for new code.

The Repo class combines S3 storage with Redis indexing:

from atdata.local import Repo

# From credentials file
repo = Repo(
    s3_credentials="path/to/.env",
    hive_path="my-bucket/datasets",
)

# From credentials dict
repo = Repo(
    s3_credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    hive_path="my-bucket/datasets",
)

Preferred approach — use LocalIndex with S3DataStore:

from atdata.local import LocalIndex, S3DataStore

store = S3DataStore(
    credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    bucket="my-bucket",
)
index = LocalIndex(data_store=store)

# Insert dataset
entry = index.insert_dataset(dataset, name="my-dataset", prefix="datasets/v1")

Credentials File Format

The .env file should contain:

AWS_ENDPOINT=http://localhost:9000
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key

Note

For AWS S3, omit AWS_ENDPOINT to use the default endpoint.

Inserting Datasets

import webdataset as wds
import numpy as np

# Create dataset from samples
samples = [ImageSample(
    image=np.random.rand(224, 224, 3).astype(np.float32),
    label=f"sample_{i}"
) for i in range(1000)]

with wds.writer.TarWriter("temp.tar") as sink:
    for i, s in enumerate(samples):
        sink.write({**s.as_wds, "__key__": f"{i:06d}"})

dataset = atdata.Dataset[ImageSample]("temp.tar")

# Insert into repo (writes to S3 + indexes in Redis)
entry, stored_dataset = repo.insert(
    dataset,
    name="training-images-v1",
    cache_local=False,  # Stream directly to S3
)

print(entry.cid)                # Content identifier
print(stored_dataset.url)       # S3 URL for the stored data
print(stored_dataset.shard_list)  # Individual shard URLs

Insert Options

entry, ds = repo.insert(
    dataset,
    name="my-dataset",
    cache_local=True,   # Write locally first, then copy (faster for some workloads)
    maxcount=10000,     # Samples per shard
    maxsize=100_000_000,  # Max shard size in bytes
)

LocalDatasetEntry

Index entries provide content-addressable identification:

entry = index.get_entry_by_name("my-dataset")

# Core properties (IndexEntry protocol)
entry.name        # Human-readable name
entry.schema_ref  # Schema reference
entry.data_urls   # WebDataset URLs
entry.metadata    # Arbitrary metadata dict or None

# Content addressing
entry.cid         # ATProto-compatible CID (content identifier)

# Legacy compatibility
entry.wds_url     # First data URL
entry.sample_kind # Same as schema_ref

Tip

The CID is generated from the entry’s content (schema_ref + data_urls), ensuring identical data produces identical CIDs whether stored locally or in the atmosphere.

Schema Storage

Schemas can be stored and retrieved from the index:

# Publish a schema
schema_ref = index.publish_schema(
    ImageSample,
    version="1.0.0",
    description="Image with label annotation",
)
# Returns: "atdata://local/sampleSchema/ImageSample@1.0.0"

# Retrieve schema record
schema = index.get_schema(schema_ref)
# {
#     "name": "ImageSample",
#     "version": "1.0.0",
#     "fields": [...],
#     "description": "...",
#     "createdAt": "...",
# }

# List all schemas
for schema in index.list_schemas():
    print(f"{schema['name']}@{schema['version']}")

# Reconstruct sample type from schema
SampleType = index.decode_schema(schema_ref)
dataset = atdata.Dataset[SampleType](entry.data_urls[0])

S3DataStore

For direct S3 operations without Redis indexing:

from atdata.local import S3DataStore

store = S3DataStore(
    credentials="path/to/.env",
    bucket="my-bucket",
)

# Write dataset shards
urls = store.write_shards(
    dataset,
    prefix="datasets/v1",
    maxcount=10000,
)
# Returns: ["s3://my-bucket/datasets/v1/data--uuid--000000.tar", ...]

# Check capabilities
store.supports_streaming()  # True

Complete Workflow Example

import numpy as np
from numpy.typing import NDArray
import atdata
from atdata.local import LocalIndex, S3DataStore
import webdataset as wds

# 1. Define sample type
@atdata.packable
class TrainingSample:
    features: NDArray
    label: int
    source: str

# 2. Create samples
samples = [
    TrainingSample(
        features=np.random.randn(128).astype(np.float32),
        label=i % 10,
        source="synthetic",
    )
    for i in range(10000)
]

# 3. Write to local tar
with wds.writer.TarWriter("local-data.tar") as sink:
    for i, sample in enumerate(samples):
        sink.write({**sample.as_wds, "__key__": f"{i:06d}"})

# 4. Set up index with S3 data store and insert
store = S3DataStore(
    credentials={
        "AWS_ENDPOINT": "http://localhost:9000",
        "AWS_ACCESS_KEY_ID": "minioadmin",
        "AWS_SECRET_ACCESS_KEY": "minioadmin",
    },
    bucket="datasets-bucket",
)
index = LocalIndex(data_store=store)

# Publish schema and insert dataset
index.publish_schema(TrainingSample, version="1.0.0")
local_ds = atdata.Dataset[TrainingSample]("local-data.tar")
entry = index.insert_dataset(local_ds, name="training-v1", prefix="training")

# 5. Retrieve later
entry = index.get_entry_by_name("training-v1")
dataset = atdata.Dataset[TrainingSample](entry.data_urls[0])

for batch in dataset.ordered(batch_size=32):
    print(batch.features.shape)  # (32, 128)