米国株を見るだけで日本株の明日が分かる

PCA_SUB

はじめに

株式投資において「未来を予測する」ことは永遠のテーマです。

しかし、効率的市場仮説(EMH)によれば、すべての情報はすでに価格に織り込まれており、継続的な超過リターンを得ることは困難とされています。

一方で、現実の市場では以下のような現象が観測されています。

  • モメンタム(勝ち銘柄は勝ち続ける)
  • リバーサル(長期では逆転する)
  • 情報の遅延伝播

特に注目されているのが、リード・ラグ効果(Lead-Lag Effect)です。

本記事では、以下の論文をベースに

「米国市場の動きから、日本市場の翌日の上昇セクターを予測する」

という戦略を徹底解説し、さらに実装・応用方法まで踏み込みます。


論文概要

今回解説する論文は以下です。

中川慧ほか「部分空間正則化付き主成分分析を用いた日米業種リードラグ投資戦略」  

論文の主張(超要約)

この論文の内容を一言でまとめると、

「アメリカで起きたことは、日本に遅れて反映される」

になります。米国株は世界で最も注目されている株式市場です。その市場の動向を見て日本株やETFの取引方法を決定すれば、利益を出せるという考え方です


なぜリードラグが起きるのか?

論文では、以下の理由が挙げられています。

① 取引時間のズレ

  • 米国:夜に終了
  • 日本:翌朝に開始

→ 情報が「時差」を持って伝わります。夜の結果を反映してその後に始まる日本株の投資方針を立てれば良いのです。


② 情報の拡散遅延

  • 投資家の認識遅れ(個人/機関)
  • 市場間の連動のズレ
  • サプライチェーン構造

③ 実際の現象

具体的な例は次の通りです。

  • 米国でハイテク株が上昇→ 翌日、日本の電機株が上昇
    これは、半導体需要が高いことやAI投資に過熱感があり、日本では電機株が関連しているためそちらが上昇していく。といったものです。

戦略の全体像

この論文では、以下の流れで予測を行います。

ステップ①

米国市場の当日リターンを取得

ステップ②

「共通因子(ファクター)」に圧縮

ステップ③

日本市場へ変換

ステップ④

翌日の日本市場でトレード


PCA (主成分分析)の役割

ここで重要になるのがPCA(Principal Component Analysis)です。

PCA とは?

PCA とは、簡単にいうと、

「複雑なデータを、少数の重要な軸に圧縮する手法」


のことを指します。

株価の動きは以下の要因で動きます。

  • 景気
  • 金利
  • リスクオン/オフ

これらの情報をPCAで抽出していきます。


通常の PCA には問題がある

論文では重要な指摘があります。

問題点

  • ノイズに弱い
  • サンプル依存
  • 不安定

解決策:部分空間正則化付きPCA

ここがこの論文のコアです。

コンセプト

「意味のある方向に寄せてPCAをする」


意味のある方向とは一体なんでしょうか?詳細に迫っていきましょう。

具体的には

以下の3つの方向を使います。

  1. グローバル因子(市場全体)
  2. 国差(米国 vs 日本)
  3. 景気敏感 vs ディフェンシブ

正則化の式

論文では以下のように定義されます。

Creg = (1 − λ)C + λC0
  • C:実データの相関行列
  • C0:事前知識
  • λ:重み(0.9)

シグナル生成の仕組み

最も重要な部分です。

数式の意味

日本予測 = B × 米国リターン

ここで:

  • B:伝播行列
  • rank(B) ≤ 3(低次元)

直感的な理解

ややこしい話が出てきましたが、端的に言いたいこととしては下記の3つになります。

  1. 米国の動きを要約
  2. 日本への影響を推定
  3. 翌日のリターンを予測

投資戦略

ロング・ショート戦略

  • 上位30% → 買い
  • 下位30% → 売り

評価方法

論文では以下を使用:

  • 年率リターン
  • シャープレシオ
  • 最大ドローダウン

実証結果

論文の結果を見ると衝撃のリターンが記載されてます。

成績

戦略年利シャープ
MOM約5%0.5
PCA約6%0.6
PCA_SUB23.8%2.22

重要なポイント

  • リスクは増えていない
  • ドローダウンが小さい
  • ファクターでは説明できない

なぜ強いのか?

理由①:情報の時間差を利用

市場は投資家の行動遅れや、市場参加者の違い、セクター変換の必要性などから、「完全に同時に情報を織り込むわけではない」です。この時間差でリターンを上げていくことを狙っています。


理由②:低次元化

市場の動きは「少数の要因」で説明できることを念頭に置いているため、余分な変数を可能な限り省いています。数学的な処理で言うと、分散が大きい変数に注目していく手法をとっているため、低次元化できています。


理由③:正則化

安定性向上


Pythonで再現する

本記事では、実際にこの戦略を再現しました。

使用データ

  • 米国:SPDR Sector ETF
  • 日本:TOPIX-17 ETF

実装の流れ

  1. データ取得(yfinance)
  2. リターン計算
  3. PCA
  4. シグナル生成
  5. ポートフォリオ構築
  6. 累積リターン

# lead_lag_reproduce.py
# -*- coding: utf-8 -*-

from __future__ import annotations

import warnings
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")

# =========================
# 設定
# =========================
START_DATE = "2010-01-01"
END_DATE = "2025-12-31"

WINDOW = 60
N_FACTORS = 3
LAMBDA_REG = 0.9
Q = 0.3

US_TICKERS = [
    "XLB", "XLE", "XLF", "XLI", "XLK", "XLP",
    "XLU", "XLV", "XLY", "XLC", "XLRE"
]

JP_TICKERS = [
    "1617.T", "1618.T", "1619.T", "1620.T", "1621.T", "1622.T",
    "1623.T", "1624.T", "1625.T", "1626.T", "1627.T", "1628.T",
    "1629.T", "1630.T", "1631.T", "1632.T", "1633.T"
]

ALL_TICKERS = US_TICKERS + JP_TICKERS

US_CYCLICAL = {"XLB", "XLE", "XLF", "XLRE"}
US_DEFENSIVE = {"XLK", "XLP", "XLU", "XLV"}

JP_CYCLICAL = {"1618.T", "1625.T", "1629.T", "1631.T"}
JP_DEFENSIVE = {"1617.T", "1621.T", "1627.T", "1630.T"}


# =========================
# 基本関数
# =========================
def safe_corr(df: pd.DataFrame) -> pd.DataFrame:
    c = df.corr().fillna(0.0)
    np.fill_diagonal(c.values, 1.0)
    return c


def annual_metrics(returns: pd.Series) -> dict:
    r = returns.dropna()
    if len(r) < 2:
        return {"AR": np.nan, "RISK": np.nan, "R/R": np.nan, "MDD": np.nan}

    ar = r.mean() * 252
    risk = r.std(ddof=1) * np.sqrt(252)
    rr = ar / risk if risk > 0 else np.nan

    wealth = (1 + r).cumprod()
    peak = wealth.cummax()
    dd = wealth / peak - 1.0
    mdd = dd.min()

    return {
        "AR": ar * 100,
        "RISK": risk * 100,
        "R/R": rr,
        "MDD": mdd * 100,
    }


def long_short_return(signal: pd.Series, realized: pd.Series, q: float = 0.3) -> float:
    df = pd.concat([signal.rename("signal"), realized.rename("ret")], axis=1).dropna()
    n = len(df)
    if n < 4:
        return np.nan

    k = max(1, int(np.floor(n * q)))
    if 2 * k >= n:
        return np.nan

    df = df.sort_values("signal", ascending=False)
    long_ret = df.head(k)["ret"].mean()
    short_ret = df.tail(k)["ret"].mean()
    return long_ret - short_ret


# =========================
# データ取得
# =========================
def download_ohlc(tickers, start, end):
    raw = yf.download(
        tickers=tickers,
        start=start,
        end=end,
        auto_adjust=True,
        progress=False,
        group_by="ticker",
        threads=True,
    )

    open_dict = {}
    close_dict = {}

    for t in tickers:
        if t not in raw.columns.get_level_values(0):
            continue
        df_t = raw[t].copy()
        if "Open" in df_t.columns and "Close" in df_t.columns:
            open_dict[t] = df_t["Open"]
            close_dict[t] = df_t["Close"]

    open_df = pd.DataFrame(open_dict).sort_index()
    close_df = pd.DataFrame(close_dict).sort_index()

    valid = [t for t in tickers if t in open_df.columns and t in close_df.columns]
    open_df = open_df[valid]
    close_df = close_df[valid]

    return open_df, close_df


def prepare_data():
    open_px, close_px = download_ohlc(ALL_TICKERS, START_DATE, END_DATE)

    rcc = close_px.pct_change()
    roc_jp = close_px[JP_TICKERS] / open_px[JP_TICKERS] - 1.0

    # 共通日付に合わせる
    common_idx = rcc.index.intersection(roc_jp.index)
    rcc = rcc.loc[common_idx]
    roc_jp = roc_jp.loc[common_idx]

    # 必要な列が揃っている日だけ残す
    rcc = rcc.dropna(subset=ALL_TICKERS, how="any")
    roc_jp = roc_jp.loc[rcc.index].dropna(subset=JP_TICKERS, how="any")
    rcc = rcc.loc[roc_jp.index]

    return rcc, roc_jp


# =========================
# 事前部分空間
# =========================
def orthonormalize(mat: np.ndarray) -> np.ndarray:
    q, _ = np.linalg.qr(mat)
    return q


def make_prior_subspace():
    n_us = len(US_TICKERS)
    n_jp = len(JP_TICKERS)
    n_all = n_us + n_jp

    v1 = np.ones(n_all)
    v2 = np.concatenate([np.ones(n_us), -np.ones(n_jp)])

    v3 = np.zeros(n_all)
    for i, t in enumerate(ALL_TICKERS):
        if t in US_CYCLICAL or t in JP_CYCLICAL:
            v3[i] = 1.0
        elif t in US_DEFENSIVE or t in JP_DEFENSIVE:
            v3[i] = -1.0
        else:
            v3[i] = 0.0

    V0 = np.column_stack([v1, v2, v3])
    return orthonormalize(V0)[:, :3]


def build_C0(rcc: pd.DataFrame):
    V0 = make_prior_subspace()

    cfull_period = rcc.loc["2010-01-01":"2014-12-31", ALL_TICKERS].dropna()
    z = (cfull_period - cfull_period.mean()) / cfull_period.std(ddof=0)
    z = z.replace([np.inf, -np.inf], np.nan).dropna()

    Cfull = safe_corr(z).values

    D0 = np.diag(np.diag(V0.T @ Cfull @ V0))
    Craw0 = V0 @ D0 @ V0.T

    delta = np.diag(Craw0).copy()
    delta[delta <= 0] = 1.0
    Dinv = np.diag(1.0 / np.sqrt(delta))

    C0 = Dinv @ Craw0 @ Dinv
    np.fill_diagonal(C0, 1.0)

    return pd.DataFrame(C0, index=ALL_TICKERS, columns=ALL_TICKERS)


# =========================
# シグナル計算
# =========================
def compute_window_stats(window_df: pd.DataFrame):
    mu = window_df.mean(axis=0)
    sigma = window_df.std(axis=0, ddof=0).replace(0.0, np.nan)
    return mu, sigma


def compute_pca_signal(
    rcc: pd.DataFrame,
    current_date: pd.Timestamp,
    c0: pd.DataFrame | None,
    use_regularization: bool,
):
    idx = rcc.index.get_loc(current_date)
    if idx < WINDOW:
        return None

    hist_idx = rcc.index[idx - WINDOW: idx]
    window_df = rcc.loc[hist_idx, ALL_TICKERS].copy()

    if window_df.isna().any().any():
        return None

    mu, sigma = compute_window_stats(window_df)
    z_window = (window_df - mu) / sigma
    z_window = z_window.replace([np.inf, -np.inf], np.nan).dropna()

    if len(z_window) < int(WINDOW * 0.8):
        return None

    Ct = safe_corr(z_window).values

    if use_regularization:
        Creg = (1.0 - LAMBDA_REG) * Ct + LAMBDA_REG * c0.loc[ALL_TICKERS, ALL_TICKERS].values
    else:
        Creg = Ct.copy()

    eigvals, eigvecs = np.linalg.eigh(Creg)
    order = np.argsort(eigvals)[::-1]
    eigvecs = eigvecs[:, order]
    Vk = eigvecs[:, :N_FACTORS]

    n_us = len(US_TICKERS)
    VU = Vk[:n_us, :]
    VJ = Vk[n_us:, :]

    rcc_us = rcc.loc[current_date, US_TICKERS]
    zU = (rcc_us - mu[US_TICKERS]) / sigma[US_TICKERS]
    zU = zU.replace([np.inf, -np.inf], np.nan)

    if zU.isna().any():
        return None

    ft = VU.T @ zU.values
    zhat_j = VJ @ ft

    return pd.Series(zhat_j, index=JP_TICKERS, name=current_date)


# =========================
# 戦略
# =========================
def run_mom_strategy(rcc: pd.DataFrame, roc_jp: pd.DataFrame):
    out = []

    for i in range(WINDOW, len(rcc.index) - 1):
        t1 = rcc.index[i + 1]
        hist = rcc.iloc[i - WINDOW:i][JP_TICKERS]
        signal = hist.mean(axis=0)
        realized = roc_jp.loc[t1, JP_TICKERS]
        ret = long_short_return(signal, realized, q=Q)
        out.append((t1, ret))

    return pd.Series(dict(out), name="MOM").sort_index()


def run_pca_strategy(rcc: pd.DataFrame, roc_jp: pd.DataFrame, use_regularization: bool):
    c0 = build_C0(rcc) if use_regularization else None
    name = "PCA_SUB" if use_regularization else "PCA_PLAIN"
    out = []

    for i in range(WINDOW, len(rcc.index) - 1):
        t = rcc.index[i]
        t1 = rcc.index[i + 1]

        signal = compute_pca_signal(
            rcc=rcc,
            current_date=t,
            c0=c0,
            use_regularization=use_regularization,
        )
        if signal is None:
            out.append((t1, np.nan))
            continue

        realized = roc_jp.loc[t1, JP_TICKERS]
        ret = long_short_return(signal, realized, q=Q)
        out.append((t1, ret))

    return pd.Series(dict(out), name=name).sort_index()


# =========================
# 表示
# =========================
def print_summary(returns_df: pd.DataFrame):
    rows = []
    for col in returns_df.columns:
        m = annual_metrics(returns_df[col])
        rows.append([
            col,
            round(m["AR"], 2),
            round(m["RISK"], 2),
            round(m["R/R"], 2) if pd.notna(m["R/R"]) else np.nan,
            round(m["MDD"], 2),
        ])

    result = pd.DataFrame(rows, columns=["Strategy", "AR", "RISK", "R/R", "MDD"])
    print("\n=== Performance Summary ===")
    print(result.to_string(index=False))


def plot_cumulative_returns(returns_df: pd.DataFrame):
    plt.figure(figsize=(12, 7))
    for col in returns_df.columns:
        s = returns_df[col].dropna()
        wealth = (1 + s).cumprod()
        plt.plot(wealth.index, wealth.values, label=col)

    plt.title("Cumulative Returns")
    plt.xlabel("Date")
    plt.ylabel("Cumulative Wealth")
    plt.legend()
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()


# =========================
# main
# =========================
def main():
    print("Downloading data...")
    rcc, roc_jp = prepare_data()

    print("Running MOM...")
    mom = run_mom_strategy(rcc, roc_jp)

    print("Running PCA_PLAIN...")
    pca_plain = run_pca_strategy(rcc, roc_jp, use_regularization=False)

    print("Running PCA_SUB...")
    pca_sub = run_pca_strategy(rcc, roc_jp, use_regularization=True)

    returns_df = pd.concat([mom, pca_plain, pca_sub], axis=1).sort_index()

    print_summary(returns_df)
    plot_cumulative_returns(returns_df)


if __name__ == "__main__":
    main()

グラフ化すると下図のようになります。図に示したとおり、 部分空間正則化付きPCA(PCA_SUB)のリターンが飛び抜けて高いですね。資金の余裕がある方はこの投資手法で実践してみるといいかと思います。私も環境を整えて3ヶ月程度試してみようと思ってます。

実装してわかったこと

① 再現は可能

論文の傾向は再現できる


② 完全一致は難しい

理由:

  • データ差
  • 祝日差
  • 分配金処理

③ PCA_SUBが優位

論文通りの結果


個別銘柄への応用

ここが重要です。

結論

そのままは使えないが、強力な特徴量になる


本論文はセクター単位(業種ETF)での戦略ですが、実際の運用では「個別銘柄でどう使うか」が最も重要になります。

結論から言うと、この手法はそのまま銘柄売買に使うのではなく、

「強力なマクロ特徴量」として組み込むのが最も有効

です。

ここでは、3つの応用例について記載します。

応用方法①:セクター予測を特徴量として使う(最も現実的)

最も簡単で効果が高い方法は、PCA_SUBで得られる「セクター予測スコア」を、個別銘柄のモデルに組み込むことです。

■ 考え方

個別銘柄のリターンは、以下の2つで決まります。

  • 個別要因(決算、ニュース)
  • セクター要因(業界全体の流れ)

このうち、論文は「セクター要因」を高精度で捉えています。


■ 実装イメージ

例えば、LightGBMの特徴量を以下のように拡張します。

特徴量 = [
  過去リターン,
  出来高,
  ボラティリティ,
  セクター予測スコア(←追加)
]

■ 具体例

  • トヨタ → 自動車セクターのスコア
  • ソニー → 電機セクターのスコア
  • 三菱UFJ → 銀行セクターのスコア

■ 効果

これにより、

  • 「個別は弱いがセクターが強い銘柄」
  • 「個別は強いがセクターが弱い銘柄」

を区別できるようになります。


■ 実務的なメリット

  • 実装が簡単
  • 既存モデルに組み込みやすい
  • 過学習しにくい

応用方法②:セクターでフィルタリング(スクリーニング)

次に有効なのが、銘柄選定の前段階でセクターをフィルタする方法です。

■ 考え方

すべての銘柄を対象にするのではなく、

「上がる可能性の高いセクターだけに絞る」


■ 実装例

  1. PCA_SUBでセクターランキングを作る
  2. 上位30%のセクターのみ選択
  3. その中で銘柄選定

■ フロー


全銘柄
↓
セクター予測で絞る
↓
個別モデルで選定

■ メリット

  • ノイズ削減
  • 計算量削減
  • 精度向上

■ 実際の効果

この方法だけでも、

  • 勝率が上がる
  • ドローダウンが減る

傾向があります。


応用方法③:スコア融合(最強パターン)

最も強力なのは、セクター予測と個別予測を融合する方法です。

■ コンセプト

「ミクロ(個別)+マクロ(セクター)」の融合


■ 数式イメージ

最終スコア =
0.7 × 個別予測 +
0.3 × セクター予測

 なぜ効くのか?

個別モデルは

  • 短期ノイズに強い
  • 局所最適に強い

一方、セクターモデルは

  • 大局的な流れを捉える
  • トレンドに強い

■ 組み合わせると

  • ダマシ減少
  • トレンド追随強化
  • 安定性向上

■ 実務的なポイント

  • 重み(0.7 / 0.3)はチューニング可能
  • セクターをゲートとして使うのも有効

実運用への落とし込み

毎日やること

  1. 米国リターン取得
  2. PCA
  3. 日本予測
  4. 銘柄スコア化

最終スコア

総合スコア =
  モデル予測 × 0.7 +
  セクター予測 × 0.3

注意点

① 短期戦略

基本は1日


② 米国依存

米国データ必須


③ 過学習リスク

PCAチューニング重要


今後の発展

  • LSTMとの融合
  • グラフニューラルネット
  • 個別銘柄伝播モデル

まとめ

本記事の結論です。

✔ ポイント

  • リードラグは実在する
  • PCAで抽出できる
  • 正則化で精度向上

ポイント②

  • 「市場は完全に効率的ではない」

最後に

この論文は、実用性の高いアルゴリズムかと思います。個人的にはこの考えを個別銘柄に活用したいと思っているので、いい方法を思いついたら追加で記事を書いてみますね

shota_py

メーカー勤務のエンジニアです。 自分の趣味である、「電気回路」、「ガジェット」「株式投資」、「Python」に関する記事をつらつらと書いています

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA