コンテンツにスキップ

システム設計

設計原則

1. データ完全性優先

アーカイブシステムは、データの完全性を最優先に設計されています。

graph LR
    A[データ取得] --> B{書き込み成功?}
    B -->|Yes| C[Parquet保存]
    B -->|No| D[JSONフォールバック]
    D --> E[後で再変換]
    C --> F[状態更新]
  • フォールバック機構: Parquet書き込み失敗時はJSONで保存
  • 状態追跡: 最終成功時刻を記録し、ギャップを検出可能に
  • べき等性: 同一データの重複書き込みを許容(重複は後処理で除去)

2. タイムゾーン一貫性

すべてのタイムスタンプはUTCで保存し、ETオフセットを付与。

# 正しい例
{
    "timestamp": "2025-11-28T14:30:00Z",  # UTC
    "et_offset": -5  # EST (冬時間)
}

# 変換時
et_time = timestamp.astimezone(ZoneInfo("America/New_York"))

3. パーティション戦略

ET日付でパーティションすることで、以下のメリットを実現:

  • クエリ効率: 日付範囲でのフィルタリングが高速
  • ファイル管理: 日次でファイルが分離され、削除・移動が容易
  • 並列処理: 日付ごとに独立して処理可能

コンポーネント設計

LiveDataArchive

class LiveDataArchive:
    """ライブデータアーカイブの中心クラス"""

    def __init__(self, base_dir: Path = None):
        self.base_dir = base_dir or Path("data/archive")
        self._ensure_directories()

    def write_uw_flow(self, record: dict) -> bool:
        """UWフローを書き込み"""
        return self._write_record(ArchiveType.UW_FLOW, record)

    def write_option_snapshot(self, records: list[dict]) -> bool:
        """オプションスナップショットを書き込み"""
        return self._write_records(ArchiveType.OPTIONS, records)

    def _write_record(self, archive_type: ArchiveType, record: dict) -> bool:
        """単一レコードを書き込み"""
        # 1. スキーマ検証
        # 2. Parquet追記
        # 3. 失敗時JSONフォールバック
        pass

ArchiveState

class ArchiveState:
    """アーカイブ状態の追跡"""

    def __init__(self, state_file: Path = None):
        self.state_file = state_file or Path("data/archive/_state.json")
        self._load_state()

    def update_last_archive(self, archive_type: ArchiveType) -> None:
        """最終アーカイブ時刻を更新"""
        self.state[archive_type.value] = {
            "last_archive": datetime.now(timezone.utc).isoformat(),
            "last_date": get_market_date().isoformat()
        }
        self._save_state()

    def detect_gaps(self, archive_type: ArchiveType) -> list[dict]:
        """ギャップを検出"""
        # 最終アーカイブ時刻から現在までの欠損期間を計算
        pass

PyArrowスキーマ定義

UW Flow Schema

UW_FLOW_SCHEMA = pa.schema([
    ("timestamp", pa.timestamp("us", tz="UTC")),
    ("symbol", pa.string()),
    ("strike", pa.float64()),
    ("expiration", pa.date32()),
    ("option_type", pa.string()),
    ("sentiment", pa.string()),
    ("premium", pa.float64()),
    ("volume", pa.int64()),
    ("size", pa.int64()),
    ("iv", pa.float64()),
    ("delta", pa.float64()),
    ("is_unusual", pa.bool_()),
    ("et_offset", pa.int8()),
])

Options Schema

OPTIONS_SCHEMA = pa.schema([
    ("timestamp", pa.timestamp("us", tz="UTC")),
    ("symbol", pa.string()),
    ("underlying_price", pa.float64()),
    ("strike", pa.float64()),
    ("expiration", pa.date32()),
    ("option_type", pa.string()),
    ("bid", pa.float64()),
    ("ask", pa.float64()),
    ("last", pa.float64()),
    ("volume", pa.int64()),
    ("open_interest", pa.int64()),
    ("iv", pa.float64()),
    ("delta", pa.float64()),
    ("gamma", pa.float64()),
    ("theta", pa.float64()),
    ("vega", pa.float64()),
    ("et_offset", pa.int8()),
])

GEX Profile Schema

GEX_PROFILE_SCHEMA = pa.schema([
    ("timestamp", pa.timestamp("us", tz="UTC")),
    ("symbol", pa.string()),
    ("gex_value", pa.float64()),
    ("flip_point", pa.float64()),
    ("call_wall", pa.float64()),
    ("put_wall", pa.float64()),
    ("regime", pa.string()),
    ("ratio", pa.float64()),
    ("ratio_98d", pa.float64()),
    ("et_offset", pa.int8()),
])

Stock 1m Schema

STOCK_1M_SCHEMA = pa.schema([
    ("timestamp", pa.timestamp("us", tz="UTC")),
    ("symbol", pa.string()),
    ("open", pa.float64()),
    ("high", pa.float64()),
    ("low", pa.float64()),
    ("close", pa.float64()),
    ("volume", pa.int64()),
    ("vwap", pa.float64()),
    ("et_offset", pa.int8()),
])

エラーハンドリング

書き込み失敗時のフォールバック

def _write_record(self, archive_type: ArchiveType, record: dict) -> bool:
    try:
        # Parquet書き込み試行
        self._append_to_parquet(archive_type, record)
        return True
    except Exception as e:
        logger.warning(f"Parquet write failed: {e}, falling back to JSON")
        # JSONフォールバック
        self._write_json_fallback(archive_type, record)
        return False

ギャップ検出とアラート

def check_archive_health(self) -> dict:
    """アーカイブ健全性チェック"""
    health = {}
    for archive_type in ArchiveType:
        gaps = self.state.detect_gaps(archive_type)
        health[archive_type.value] = {
            "status": "healthy" if not gaps else "gap_detected",
            "gaps": gaps,
            "last_archive": self.state.get_last_archive(archive_type)
        }
    return health

パフォーマンス最適化

バッチ書き込み

class BatchWriter:
    """バッチ書き込みでI/Oを最適化"""

    def __init__(self, batch_size: int = 100, flush_interval: int = 60):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = defaultdict(list)
        self.last_flush = time.time()

    def add(self, archive_type: ArchiveType, record: dict) -> None:
        self.buffer[archive_type].append(record)
        if self._should_flush(archive_type):
            self.flush(archive_type)

    def _should_flush(self, archive_type: ArchiveType) -> bool:
        return (
            len(self.buffer[archive_type]) >= self.batch_size or
            time.time() - self.last_flush > self.flush_interval
        )

圧縮設定

# Parquet書き込み時の圧縮設定
pq.write_table(
    table,
    file_path,
    compression='zstd',
    compression_level=3,  # バランス重視
    write_statistics=True,
    use_dictionary=True,
)

監視とアラート

ダッシュボード統合

def get_archive_status() -> dict:
    """ダッシュボード用ステータス取得"""
    archive = LiveDataArchive()
    state = ArchiveState()

    return {
        "uw_flow": {
            "last_archive": state.get_last_archive(ArchiveType.UW_FLOW),
            "records_today": archive.count_records(ArchiveType.UW_FLOW),
            "file_size_mb": archive.get_file_size(ArchiveType.UW_FLOW) / 1024 / 1024
        },
        # ... 他のタイプも同様
    }

Slack通知

def alert_archive_gap(archive_type: ArchiveType, gap: dict) -> None:
    """ギャップ検出時のSlack通知"""
    message = f"""
    ⚠️ Archive Gap Detected

    Type: {archive_type.value}
    Start: {gap['start']}
    End: {gap['end']}
    Duration: {gap['duration_hours']:.1f} hours
    """
    send_slack_notification(message)