加具留矢流余

かぐるやるよ

移転しました。

約3秒後に自動的にリダイレクトします。

過去に計算したDataFrameと内容が同じか高速に判定したい

同じ前処理を実行するたびに数十分取られるのが非常に辛く感じる。 特に同じ入力を入れているのに実行するたびに計算をしなおすのが馬鹿らしく感じる。 かといってDataFrameを丸々キャッシュして、関数呼び出すたびに前回のDataFrameと同じか検証するものアホらしい。

もやもやしつつfeatherとhashでうまいことやれば高速に検証できね?って思ったので頑張ってみた。

コンセプトは以下の通り 1. featherを使ってDataFrameをbytesに変換 2. 高速なハッシュアルゴリズムxxhashを使ってbytesのハッシュ値を計算。 3. 初回はハッシュ値とDataFrameを何かしらの形でファイルに保存する。 4. 2回目以降はハッシュ値を計算して、前回のハッシュ値と同じだったら保存されている計算結果を読み込み。

pyarrowを使うとfeatherの出力先にfile-like objectが使えるので、シリアライズしたDataFrameをメモリ上(io.BytesIO)に吐き出せて良い。

以下実装

import io
import pyarrow.feather
import xxhash
import os
import gc
import pandas as pd

def get_hash(df):
    fp = io.BytesIO()
    pyarrow.feather.write_feather(df, fp)
    b = fp.getvalue()
    fp.close()
    h = xxhash.xxh64(b).hexdigest()
    return h

def dfcacher(func, df, name, path="./", verbose=False):
    h = get_hash(df)
    h_path = os.path.join(path, name+".xxhash")
    f_path = os.path.join(path, name+".feather")
    
    # 前回のハッシュ値を読み込み
    try:
        with open(h_path, "r") as fp:
            saved_h = fp.read().rstrip('\n')
    except FileNotFoundError:
        saved_h = ""
        
    if h == saved_h:
        # 前回とハッシュ値が変わらないのであれば計算結果をキャッシュから読み込み
        if verbose: print(f"Hash value of  {name} is not changed. Load dataframe from cache.")
        del(df)
        gc.collect()
        df = pyarrow.feather.read_feather(f_path)
    else:
        if verbose: print(f"Hash value of  {name} is changed.")
        # ハッシュ値が変わっていれば計算して計算結果を保存
        df = func(df)
        with open(h_path, "w") as fp:
            fp.write(h)
        df.to_feather(f_path)
    return df

# テストケース1
df1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]])
df2 = pd.DataFrame([[1, 2, 3], [4, 5, 6]])
assert get_hash(df1) == get_hash(df2), "ハッシュ値が異なります"

# テストケース2
df1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]])
df2 = pd.DataFrame([[1, 2, 4], [4, 5, 6]])
assert get_hash(df1) != get_hash(df2), "ハッシュ値が等しいです"

# テストケース3
def test_func1(df):
    df = df.apply(lambda x: x+2)
    return df

df1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["c1", "c2", "c3"])
df2 = pd.DataFrame([[1, 2, 4], [4, 5, 6]], columns=["c1", "c2", "c3"])
df = dfcacher(test_func1, df1, "test_func1", verbose=True)
df = dfcacher(test_func1, df1, "test_func1", verbose=True)
df = dfcacher(test_func1, df2, "test_func1", verbose=True)

問題点としては高速にシリアライズするためにfeatherを使ってるので、DataFrameがfeatherフォーマットの条件を満たしてないとうまく動かない。 あと余りしっかり検証していないこと。まだ実際に使ってないので使ってる途中になにか問題が出てきそうな気がする。 ただ体感としてhash値の計算はかなり高速に動作している。今度なにか機会があったら速度の検証をやろうと思っている。