Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ classifiers = [
license = "MIT"
license-files = ["LICENSE.md"]
dependencies = [
"httpx",
"aioboto3"
"aiohttp",
"aioboto3",
"ipld_car",
"multiformats",
]

[project.optional-dependencies]
Expand Down
87 changes: 87 additions & 0 deletions src/py_s3_storacha/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,90 @@
"""A tool for migrating objects from AWS S3 to Storacha"""

from dataclasses import dataclass
from typing import NamedTuple
import io

import ipld_car
from multiformats import multihash, CID

from py_s3_storacha.managers.conn import ConnectionManager
from py_s3_storacha.storacha import StorachaClient

__version__ = "0.0.1"


class S3Config(NamedTuple):
    """Immutable settings describing the source AWS S3 bucket."""

    # Name of the bucket objects are read from.
    bucket_name: str
    # AWS region the bucket lives in.
    region: str
    # Credential pair handed to the S3 client.
    access_key_id: str
    secret_access_key: str


class StorachaConfig(NamedTuple):
    """Immutable settings for talking to Storacha."""

    # DID of the destination space.
    space_did: str
    # Value sent as the ``X-Auth-Secret`` header.
    auth_secret: str
    # Value sent as the ``Authorization`` header.
    authorization_key: str


class MigratorConfig(NamedTuple):
    """Top-level configuration: where to read from and where to write to."""

    # Source bucket settings.
    s3: S3Config
    # Destination Storacha settings.
    storacha: StorachaConfig


@dataclass
class Migrator:
    """Copy objects from an S3 bucket into Storacha.

    Attributes:
        config: S3 and Storacha settings.
        _conn: Connection manager; created lazily through ``conn`` when not
            supplied by the caller.
        _bridge: Optional pre-built Storacha client. When left as ``None``
            the client is obtained from the connection manager on each use.
    """

    config: MigratorConfig
    _conn: ConnectionManager = None  # pyright: ignore[reportAssignmentType]
    _bridge: StorachaClient = None  # pyright: ignore[reportAssignmentType]

    def __post_init__(self):
        # register useful callbacks
        ...

    @property
    def conn(self) -> ConnectionManager:
        """Return the connection manager, creating it on first access."""
        if self._conn is None:
            self._conn = ConnectionManager(config=self.config)
        return self._conn

    @property
    def bridge(self) -> StorachaClient:
        """Return the Storacha client used for bridge calls.

        An explicitly supplied ``_bridge`` wins; otherwise the client is
        taken from the connection manager, which raises if the HTTP session
        has not been initialized yet.
        """
        if self._bridge is not None:
            return self._bridge
        return self.conn.storacha

    async def initialize(self):
        """
        Initialize S3 and Storacha (HTTP bridge) sessions
        """
        # Go through the lazy property: ``_conn`` is still None on a freshly
        # constructed Migrator.
        await self.conn.initialize_conns()

    async def _prepare_car(self, key: str) -> tuple[CID, io.BytesIO, int]:
        """
        Fetches the S3 object, packages it into a CAR in-memory, and computes
        the CID of that CAR.

        Args:
            key: Key of the object inside the configured bucket.

        Returns:
            (car_cid, car_stream, size_bytes) where ``size_bytes`` is the
            length of the encoded CAR, not of the original object.
        """
        # Download from S3; the whole object is held in memory.
        response = await self.conn.s3.get_object(
            Bucket=self.config.s3.bucket_name, Key=key
        )
        body = await response["Body"].read()

        # TODO: make sure len(body) is less than 4gb

        # The object becomes a single raw block that is also the CAR root.
        file_digest = multihash.digest(data=body, hashfun="sha2-256")
        file_cid = CID("base32", 1, "raw", file_digest)
        car = ipld_car.encode([file_cid], [(file_cid, body)])

        # A CID wraps a multihash, so the CAR bytes must be hashed first;
        # the "car" multicodec marks the link as pointing at a CAR file.
        car_digest = multihash.digest(data=car, hashfun="sha2-256")
        car_cid = CID("base32", 1, "car", car_digest)
        return car_cid, io.BytesIO(car), len(car)

    async def migrate_file(self, key: str):
        """
        Copies one S3 object into Storacha via the HTTP bridge.

        1. Fetches the object from S3 and packs it into an in-memory CAR.
        2. Creates a UCAN task to obtain status, signed PUT URL, and headers.
        3. If status is "upload", sends the CAR to the signed URL.
           If status is "done", skips upload.

        Args:
            key: Key of the object inside the configured bucket.
        """
        car_cid, car_stream, size = await self._prepare_car(key)
        bridge = self.bridge
        # The bridge payload is JSON, so the CID travels in its string form.
        status, upload_url, upload_headers = await bridge.create_store_task(
            str(car_cid), size
        )
        if status == "upload":
            await bridge.upload_car(upload_url, upload_headers, car_stream)
Empty file.
63 changes: 63 additions & 0 deletions src/py_s3_storacha/managers/conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import TYPE_CHECKING

import aioboto3
import aiohttp

from py_s3_storacha.storacha import StorachaClient

if TYPE_CHECKING:
from py_s3_storacha import MigratorConfig


class AsyncConnectionError(Exception):
    """Raised when a connection is requested before it has been initialized."""


class ConnectionManager:
    """Owns the async S3 client and the HTTP session used for Storacha.

    Call ``initialize_conns`` before using ``s3`` or ``storacha`` and
    ``close_connections`` when done.
    """

    # The annotation is quoted because MigratorConfig is imported only under
    # TYPE_CHECKING; an unquoted parameter annotation would be evaluated when
    # the function is defined and fail with NameError.
    def __init__(self, config: "MigratorConfig") -> None:
        self._config: "MigratorConfig" = config
        self._s3_client = None
        self._http_session: aiohttp.ClientSession | None = None

    async def initialize_conns(self) -> None:
        """Open the S3 client and the HTTP session.

        Connections left over from an earlier call are closed first so they
        are not orphaned.
        """
        await self.close_connections()

        s3_session = aioboto3.Session()
        self._s3_client = await s3_session.client(
            service_name="s3",
            region_name=self._config.s3.region,
            aws_access_key_id=self._config.s3.access_key_id,
            aws_secret_access_key=self._config.s3.secret_access_key,
        ).__aenter__()  # enter the client's async context

        # http session for storacha http-bridge.
        # No session-wide auth headers: StorachaClient attaches them to each
        # bridge request itself, and defaults set here would also be sent to
        # the signed PUT URL, exposing the credentials to that host.
        self._http_session = aiohttp.ClientSession()

    async def close_connections(self):
        """Close whatever is open; safe to call repeatedly."""
        # Detach both handles first so the manager never keeps a reference
        # to a half-closed resource.
        http_session, self._http_session = self._http_session, None
        s3_client, self._s3_client = self._s3_client, None

        try:
            if http_session is not None:
                await http_session.close()
        finally:
            # Runs even if closing the HTTP session raised.
            if s3_client is not None:
                await s3_client.__aexit__(None, None, None)

    @property
    def s3(self):
        """The entered S3 client.

        Raises:
            AsyncConnectionError: if ``initialize_conns`` has not run.
        """
        if self._s3_client is None:
            raise AsyncConnectionError("S3 client not initialized")
        return self._s3_client

    @property
    def storacha(self) -> StorachaClient:
        """A new StorachaClient bound to the current HTTP session.

        Raises:
            AsyncConnectionError: if ``initialize_conns`` has not run.
        """
        if self._http_session is None:
            raise AsyncConnectionError("Storacha HTTP session not initialized")
        return StorachaClient(
            session=self._http_session,
            auth_secret=self._config.storacha.auth_secret,
            auth_key=self._config.storacha.authorization_key,
        )
55 changes: 55 additions & 0 deletions src/py_s3_storacha/storacha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from aiohttp import ClientSession


class StorachaBridgeError(Exception):
    """Signals an unexpected response from the Storacha HTTP bridge."""


class StorachaClient:
    """Thin client for the Storacha HTTP bridge.

    Attributes:
        session: aiohttp session used for all requests.
        auth_secret: Value sent as the ``X-Auth-Secret`` header.
        auth_key: Value sent as the ``Authorization`` header.
    """

    # Bridge endpoint; a class attribute so it can be overridden if needed.
    BRIDGE_URL = "https://up.storacha.network/bridge"

    def __init__(self, session: ClientSession, auth_secret: str, auth_key: str) -> None:
        self.session = session
        self.auth_secret = auth_secret
        self.auth_key = auth_key

    async def create_store_task(self, root_cid: str, size: int):
        """
        Initiates the UCAN "store/add" task via the HTTP bridge to obtain a signed PUT URL.

        Args:
            root_cid: CID to store; converted with ``str()`` so a CID object
                is accepted as well as its string form.
            size: Size in bytes reported for the upload.

        Returns:
            ``(status, url, headers)`` taken from the bridge response; ``url``
            is ``None`` and ``headers`` is ``{}`` when the response omits them.

        Raises:
            StorachaBridgeError: if the response lacks the expected structure.
            aiohttp.ClientResponseError: on a non-success HTTP status.
        """
        payload = {
            "tasks": [
                [
                    "store/add",
                    None,
                    {"link": {"/": str(root_cid)}, "size": size},
                ]
            ]
        }
        request_headers = {
            "X-Auth-Secret": self.auth_secret,
            "Authorization": self.auth_key,
            "Content-Type": "application/json",
        }

        # The context manager releases the connection once the body is read.
        async with self.session.post(
            self.BRIDGE_URL,
            json=payload,
            headers=request_headers,
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()

        # The whole traversal is guarded: a response without an "ok" branch
        # (or with a different shape) must surface as StorachaBridgeError.
        try:
            out = data[0]["p"]["out"]["ok"]
            status = out["status"]
            url = out.get("url")
            upload_headers = out.get("headers", {})
        except (KeyError, IndexError, TypeError, AttributeError) as e:
            raise StorachaBridgeError(
                f"Missing expected field in bridge response: {e}"
            ) from e
        return status, url, upload_headers

    async def upload_car(self, url: str, headers: dict, stream):
        """
        Sends CAR bytes to the signed PUT url

        Args:
            url: Signed upload URL returned by ``create_store_task``.
            headers: Headers that must accompany the PUT.
            stream: Request body, passed to aiohttp as ``data``.

        Raises:
            aiohttp.ClientResponseError: on a non-success HTTP status.
        """
        async with self.session.put(url, data=stream, headers=headers) as resp:
            resp.raise_for_status()