システム設計¶
設計原則¶
1. データ完全性優先¶
アーカイブシステムは、データの完全性を最優先に設計されています。
graph LR
A[データ取得] --> B{書き込み成功?}
B -->|Yes| C[Parquet保存]
B -->|No| D[JSONフォールバック]
D --> E[後で再変換]
C --> F[状態更新]
- フォールバック機構: Parquet書き込み失敗時はJSONで保存
- 状態追跡: 最終成功時刻を記録し、ギャップを検出可能に
- べき等性: 同一データの重複書き込みを許容(重複は後処理で除去)
2. タイムゾーン一貫性¶
すべてのタイムスタンプはUTCで保存し、ETオフセットを付与。
# 正しい例
{
"timestamp": "2025-11-28T14:30:00Z", # UTC
"et_offset": -5 # EST (冬時間)
}
# 変換時
et_time = timestamp.astimezone(ZoneInfo("America/New_York"))
3. パーティション戦略¶
ET日付でパーティションすることで、以下のメリットを実現:
- クエリ効率: 日付範囲でのフィルタリングが高速
- ファイル管理: 日次でファイルが分離され、削除・移動が容易
- 並列処理: 日付ごとに独立して処理可能
コンポーネント設計¶
LiveDataArchive¶
class LiveDataArchive:
"""ライブデータアーカイブの中心クラス"""
def __init__(self, base_dir: Path = None):
self.base_dir = base_dir or Path("data/archive")
self._ensure_directories()
def write_uw_flow(self, record: dict) -> bool:
"""UWフローを書き込み"""
return self._write_record(ArchiveType.UW_FLOW, record)
def write_option_snapshot(self, records: list[dict]) -> bool:
"""オプションスナップショットを書き込み"""
return self._write_records(ArchiveType.OPTIONS, records)
def _write_record(self, archive_type: ArchiveType, record: dict) -> bool:
"""単一レコードを書き込み"""
# 1. スキーマ検証
# 2. Parquet追記
# 3. 失敗時JSONフォールバック
pass
ArchiveState¶
class ArchiveState:
"""アーカイブ状態の追跡"""
def __init__(self, state_file: Path = None):
self.state_file = state_file or Path("data/archive/_state.json")
self._load_state()
def update_last_archive(self, archive_type: ArchiveType) -> None:
"""最終アーカイブ時刻を更新"""
self.state[archive_type.value] = {
"last_archive": datetime.now(timezone.utc).isoformat(),
"last_date": get_market_date().isoformat()
}
self._save_state()
def detect_gaps(self, archive_type: ArchiveType) -> list[dict]:
"""ギャップを検出"""
# 最終アーカイブ時刻から現在までの欠損期間を計算
pass
PyArrowスキーマ定義¶
UW Flow Schema¶
UW_FLOW_SCHEMA = pa.schema([
("timestamp", pa.timestamp("us", tz="UTC")),
("symbol", pa.string()),
("strike", pa.float64()),
("expiration", pa.date32()),
("option_type", pa.string()),
("sentiment", pa.string()),
("premium", pa.float64()),
("volume", pa.int64()),
("size", pa.int64()),
("iv", pa.float64()),
("delta", pa.float64()),
("is_unusual", pa.bool_()),
("et_offset", pa.int8()),
])
Options Schema¶
OPTIONS_SCHEMA = pa.schema([
("timestamp", pa.timestamp("us", tz="UTC")),
("symbol", pa.string()),
("underlying_price", pa.float64()),
("strike", pa.float64()),
("expiration", pa.date32()),
("option_type", pa.string()),
("bid", pa.float64()),
("ask", pa.float64()),
("last", pa.float64()),
("volume", pa.int64()),
("open_interest", pa.int64()),
("iv", pa.float64()),
("delta", pa.float64()),
("gamma", pa.float64()),
("theta", pa.float64()),
("vega", pa.float64()),
("et_offset", pa.int8()),
])
GEX Profile Schema¶
GEX_PROFILE_SCHEMA = pa.schema([
("timestamp", pa.timestamp("us", tz="UTC")),
("symbol", pa.string()),
("gex_value", pa.float64()),
("flip_point", pa.float64()),
("call_wall", pa.float64()),
("put_wall", pa.float64()),
("regime", pa.string()),
("ratio", pa.float64()),
("ratio_98d", pa.float64()),
("et_offset", pa.int8()),
])
Stock 1m Schema¶
STOCK_1M_SCHEMA = pa.schema([
("timestamp", pa.timestamp("us", tz="UTC")),
("symbol", pa.string()),
("open", pa.float64()),
("high", pa.float64()),
("low", pa.float64()),
("close", pa.float64()),
("volume", pa.int64()),
("vwap", pa.float64()),
("et_offset", pa.int8()),
])
エラーハンドリング¶
書き込み失敗時のフォールバック¶
def _write_record(self, archive_type: ArchiveType, record: dict) -> bool:
try:
# Parquet書き込み試行
self._append_to_parquet(archive_type, record)
return True
except Exception as e:
logger.warning(f"Parquet write failed: {e}, falling back to JSON")
# JSONフォールバック
self._write_json_fallback(archive_type, record)
return False
ギャップ検出とアラート¶
def check_archive_health(self) -> dict:
"""アーカイブ健全性チェック"""
health = {}
for archive_type in ArchiveType:
gaps = self.state.detect_gaps(archive_type)
health[archive_type.value] = {
"status": "healthy" if not gaps else "gap_detected",
"gaps": gaps,
"last_archive": self.state.get_last_archive(archive_type)
}
return health
パフォーマンス最適化¶
バッチ書き込み¶
class BatchWriter:
"""バッチ書き込みでI/Oを最適化"""
def __init__(self, batch_size: int = 100, flush_interval: int = 60):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = defaultdict(list)
self.last_flush = time.time()
def add(self, archive_type: ArchiveType, record: dict) -> None:
self.buffer[archive_type].append(record)
if self._should_flush(archive_type):
self.flush(archive_type)
def _should_flush(self, archive_type: ArchiveType) -> bool:
return (
len(self.buffer[archive_type]) >= self.batch_size or
time.time() - self.last_flush > self.flush_interval
)
圧縮設定¶
# Parquet書き込み時の圧縮設定
pq.write_table(
table,
file_path,
compression='zstd',
compression_level=3, # バランス重視
write_statistics=True,
use_dictionary=True,
)
監視とアラート¶
ダッシュボード統合¶
def get_archive_status() -> dict:
"""ダッシュボード用ステータス取得"""
archive = LiveDataArchive()
state = ArchiveState()
return {
"uw_flow": {
"last_archive": state.get_last_archive(ArchiveType.UW_FLOW),
"records_today": archive.count_records(ArchiveType.UW_FLOW),
"file_size_mb": archive.get_file_size(ArchiveType.UW_FLOW) / 1024 / 1024
},
# ... 他のタイプも同様
}