Chanomic Blog

Purescriptメモ - 配列のシャッフルを様々な方法で実装する

(last modified:
)

PureScriptで配列のシャッフルをしたい。型はこんな感じ。乱数は副作用を伴うため、返り値の型はEffectで包まれる。

shuffle :: forall a. Array a -> Effect (Array a)

アルゴリズムはFisher-Yates ShuffleのModern Algorithmの項の2つ目を利用する。これをさまざまな方法で作成したところ、Functor, Applicative, Monadなどに関連する事項だったり、STモナドの使い方、FFIの使い方だったりが学べたので、備忘のために書く。

準備

適当なディレクトリでプロジェクトを作成する。今回使うパッケージをインストールする。

$ spago init
$ spago install arrays
$ spago install random
$ spago install foldable-traversable

方法1: 素直(?)な書き方

ここでは、src/Shuffle.pursに記述する。

天下り的ではあるが、これから使う関数、型をimportしておく。

module Shuffle where

import Prelude

import Effect (Effect)
import Data.Array (range, (!!), updateAt, length)
import Data.Traversable (for)
import Effect.Random (randomInt)
import Data.Maybe (maybe)
import Data.Foldable (foldr)

まずは、「どの添字ととの添字の値を交換するか」という情報をもったデータExchangeIndexと、それを作成する関数exchangeIndiciesを作成する。

type ExchangeIndex =
  { i :: Int
  , j :: Int
  }

exchangeIndicies :: Int -> Effect (Array ExchangeIndex)
exchangeIndicies n =
  for (range 0 (n - 2)) \i -> do
     j <- randomInt i (n - 1)
     pure { i, j }

次に、ExchangeIndexの情報を元に配列を交換する関数exchangeを作成。配列の添字が不正だった場合(配列外参照を起こしそうなとき)は!!演算子がNothingを返すため、一連の計算はMaybeに包まれる。今回は簡単のため、交換に失敗したら元の配列をそのまま返すような実装にする。

exchange :: forall a. ExchangeIndex -> Array a -> Array a
exchange {i, j} xs = maybe xs identity do
  xi <- xs !! i
  xj <- xs !! j
  updateAt j xi =<< updateAt i xj xs

最後に、shuffle関数を作成。「どことどこを交換すべきか」という複数の情報の配列をexchangeIndicies (length xs)で作成する。それらを元にxsの要素を交換したい。このような、あるデータxsに対してデータ列exchangeIndicies (length xs)を順々に適用していきたい場合は、foldrが有効である。

shuffle :: forall a. Array a -> Effect (Array a)
shuffle xs = foldr exchange xs <$> exchangeIndicies (length xs)

<$>のところだけ補足しておく。もしexchangeIndicies (length xs)Array ExchangeIndexを返すなら、単に

shuffle :: forall a. Array a -> Effect (Array a)
shuffle xs = foldr exchange xs (exchangeIndicies (length xs))

とすれば良い。ところが、exchangeIndicies (length xs)Effectに包まれた型なので、間に<$>を挟む必要がある。

方法2: StateTの利用

Wikipediaに記載されているアルゴリズムでは手続き的に書かれている。関数型言語でも似たように書けないだろうか。「配列のある要素とある要素を交換する」という処理は、ある種配列の中身を変更しているように取れる。このような、配列の状態を変えるような計算にはStateモナドが利用できる。ただし今回のケースでは乱数の利用においてEffectモナドが伴う。EffectStateを同じdo構文の中で利用するためには、Stateの代わりにStateTを利用する。

StateTを使うので、関連パッケージを入れる。

$ spago install transformers

ここではsrc/Shuffle/State.pursに書く。 天下り的ではあるが、これから使う関数をimportしておく。

module Shuffle.State where

import Prelude

import Effect (Effect)
import Effect.Random (randomInt)
import Data.Maybe (maybe)
import Control.Monad.State (StateT, get, modify_, execStateT, lift)
import Data.Array (range, (!!), length, updateAt)
import Data.Traversable (for_)

方法1ではボトムアップに考えたが、ここではトップダウンに考える。 Wikipediaに記載されているアルゴリズムを引用すると次の通り。

for i from 0 to n−2 do
     j ← random integer such that i ≤ j < n
     exchange a[i] and a[j]

これを真似すると、PureScriptでは次のように書ける.一般にStateTモナドはStateT s m bの形で書けて、sは状態、mStateTと組み合わせたいモナド、bは計算結果の型である。状態として扱いたいのは配列なので、sにはArray aが入る。乱数の処理で副作用を扱いたいので,mにはEffectが入る。今回は特に計算結果がないため、bにはUnitを指定する。

StateTdo構文の中でEffectを伴う処理を書きたい場合は、以下のようにlift関数を噛ませる。

shuffleStでは計算の手順を定義しただけであり、実際に計算を走らせるのはshuffle関数のexecStateTである。execStateTは、一連の計算を行った後の状態を返す関数である。

shuffleSt :: forall a. StateT (Array a) Effect Unit
shuffleSt = do
  n <- length <$> get
  for_ (range 0 (n-2)) \i -> do
    j <- lift $ randomInt i (n - 1)
    exchange i j -- これから実装する


shuffle :: forall a. Array a -> Effect (Array a)
shuffle xs = execStateT shuffleSt xs

exchange関数はmodify_関数が組み合わさっているだけで、やってることは方法1とほとんど変わらない。

exchange :: forall a m. Monad m => Int -> Int -> StateT (Array a) m Unit
exchange i j = do
  modify_ \xs ->
    maybe xs identity do
      xi <- xs !! i
      xj <- xs !! j
      updateAt i xj =<< updateAt j xi xs

補足: 大きすぎる配列で実行時エラーを起こす

REPLでshuffle (range 0 10000)を実行したところ、RangeError: Maximum call stack size exceededを引き起こした。これはshuffleSt関数で使っているfor_周りで起こっているらしく、ちゃんと確かめていないが恐らく以下の2点に問題がありそう。

これは、Control.Safelyのfor_関数を利用することで解決できる。詳細はpurescript-safelyを参照。

方法3: STとSTArrayの利用

STモナドとSTArrayを使ってシャッフルを実装してみる。STモナドはStateモナドと似ているが、STはmutableな計算が行えるという点で異なる。しかしStateTのようなモナド変換子の仕組みはないため、Effectと混ぜて書くことは(自分がやってみた限りだと)難しそうだ。そこで、STの計算のなかで乱数の計算を行わないよう工夫する必要がある。具体的には、方法1と似た方法をとる。

余談。実はSTモナドについてあまりよくわかっていない状態である。ST周りを勉強したいなら、PureScriptではなくHaskellの文献を調べてみると良さそう。

ここではsrc/Shuffle/ST.pursに書く。 天下り的ではあるが、これから使う関数をimportしておく。

module Shuffle.ST where

import Prelude

import Control.Monad.ST (ST, foreach, run)
import Data.Array (range, length)
import Data.Array.ST (STArray, thaw, freeze, peek, poke)
import Data.Maybe (maybe)
import Data.Traversable (for)
import Effect (Effect)
import Effect.Random (randomInt)

方法1と同様に、ExchangeIndexexchangeIndiciesを作成。

type ExchangeIndex =
  { i :: Int
  , j :: Int
  }

exchangeIndicies :: Int -> Effect (Array ExchangeIndex)
exchangeIndicies n =
  for (range 0 (n - 2)) \i -> do
     j <- randomInt i (n - 1)
     pure { i, j }

続いて、STArrayの要素を交換する関数exchangeSTを書く。ここの処理はSTだけでなくMaybeも出てくる。Applicativeスタイルを使うと多少綺麗に書ける。

exchangeArray ExchangeInfo版の関数exchangeManyも作る。

exchange :: forall h a. ExchangeIndex -> STArray h a -> ST h Unit
exchange {i, j} stArr = do
  xiMay <- peek i stArr
  xjMay <- peek j stArr
  maybe
    (pure unit)
    identity
    (pokeVals <$> xiMay <*> xjMay)
  where
    pokeVals :: a -> a -> ST h Unit
    pokeVals xi xj = do
       void $ poke j xi stArr
       void $ poke i xj stArr


exchangeMany :: forall h a. Array ExchangeIndex -> STArray h a -> ST h Unit
exchangeMany indicies stArr =
  foreach indicies
    \idx -> exchange idx stArr

これを元にshuffleを作成する。thawArraySTArrayに変換し、freezeで逆の変換を行う。実際にSTの計算を走らせるにはrun関数を用いる。

shuffle' :: forall a. Array a -> Effect (Array a)
shuffle' xs = do
  indicies <- exchangeIndicies (length xs)
  pure $ run do
     stArr <- thaw xs
     exchangeMany indicies stArr
     freeze stArr

withArray関数を用いて次のようにも書ける。

-- import Data.Array.ST (withArray)

shuffle :: forall a. Array a -> Effect (Array a)
shuffle xs = do
  indicies <- exchangeIndicies (length xs)
  pure $ run (withArray (exchangeMany indicies) xs)

補足

run (..)のところをrun $ ..に変えても同じかと思い、次のように変えてみる。

shuffle :: forall a. Array a -> Effect (Array a)
shuffle xs = do
  indicies <- exchangeIndicies (length xs)
  pure $ run $ withArray (exchangeMany indicies) xs

ところが、これはコンパイルエラーになる。

Error found:
in module Shuffle.ST
at src/Shuffle/ST.purs:51:3 - 51:52 (line 51, column 3 - line 51, column 52)

  The type variable r has escaped its scope, appearing in the type

    ST r5 (Array a4) -> Array a4


in the expression apply run
in value declaration shuffle

See https://github.com/purescript/documentation/blob/master/errors/EscapedSkolem.md for more information,
or to contribute content related to this error.

$を使うとまずい、という話は、EscapedSkolem ErrorのNotesの項目に載っている(ページの内容が古いらしく、2021年1月現時点ではrunrunSTと記載されていることに注意)。エラーがなぜ起こるのかについては、勉強不足のためよく分からない。

こちらで知ったのだが、どうやらGHC(Haskell)では$演算子だけ例外扱いしており、上のようなコードは動くようだ。同じことはPureScriptのドキュメントでも言及している。

ちなみに、PureScriptではGHCでいうBlockArguments拡張と同じ機能が有効であるため(ドキュメント参照)、dorunの間に$を入れなくても動く。むしろ入れてしまうと、上のエラーを引き起こす。

run do
  ...  -- 何かの計算

方法4: FFIの利用

JavaScriptのコードをPureScriptで呼び出す方法。もはやPureScriptではないが、Fisher-Yates Shuffleの書きやすさでは1番だと思われる。

src/Shuffle/FFI.pursの内容は以下の通り。

module Shuffle.FFI where

import Prelude
import Effect (Effect)

foreign import shuffle :: forall a. Array a -> Effect (Array a)

src/Shuffle/FFI.jsの内容は以下の通り。Effect eの値は、() => { .. } のような、引数無しの関数であることに注意(purescript-effectの"Using Effects via the Foreign Function Interface"を参照)。

exports.shuffle = (xs) => () => {
  const n = xs.length;
  for (let i = 0; i < n-1; i++) {
    const j = i + Math.floor( Math.random() * (n - i) );
    [xs[i], xs[j]] = [xs[j], xs[i]];
  }
  return xs;
}

上はMath.randomで直接乱数を発生させたが、もしEffect.RandomrandomInt関数を使いたいなら次のようにする。randomIntEffectで包まれた値を返すため、randomInt(a)(b)ではなくrandomInt(a)(b)()としないと値が得られないことに注意(Effect eの値は引数無しの関数であるため)。

const { randomInt } = require('../Effect.Random');

exports.shuffle = (xs) => () => {
  const n = xs.length;
  for (let i = 0; i < n-1; i++) {
    const j = randomInt(i)(n-1)();
    [xs[i], xs[j]] = [xs[j], xs[i]];
  }
  return xs;
}

補足。1行目について、requireの部分をrequire('Effect.Random');に変えると、REPLでは動くがspago runではMODULE NOT FOUNDのエラーが発生する(参考)。

計測

せっかくなので4つの方法を計測してみる。時刻を取得するためにpurescript-nowパッケージを導入。

$ spago install now

src/Main.pursに記述していく。必要なものをimportしておく。

module Main where
import Prelude

import Effect (Effect)
import Effect.Console (log)

import Shuffle as S
import Shuffle.State as SS
import Shuffle.ST as SST
import Shuffle.FFI as SF

import Effect.Now (nowTime)
import Data.Time (diff)
import Data.Time.Duration (Milliseconds(..))
import Data.Array (range, replicate)
import Data.Traversable (sequence)
import Data.Foldable (fold)
import Data.Int (toNumber)
import Data.Newtype (wrap, unwrap)

配列xsのシャッフル時間を計測する関数measureと、n回の平均を出す関数measureNを作成。

measure :: forall a. Array a -> (Array a -> Effect (Array a)) -> Effect Milliseconds
measure xs shuffle = do
  ts <- nowTime
  _ <- shuffle xs
  te <- nowTime
  pure $ diff te ts


measureN :: forall a. Int -> Array a -> (Array a -> Effect (Array a)) -> Effect Milliseconds
measureN n xs shuffle = do
  ts <- sequence $ replicate n $ measure xs shuffle
  let total = fold ts
      ave = wrap $ unwrap total / toNumber n
  pure ave

計測結果を綺麗に出力するヘルパーを作成。

logTime :: String -> Milliseconds -> Effect Unit
logTime label (Milliseconds t) =
  log $ label <> ": " <> show t <> " msec"

main関数はこんな感じにする。

main :: Effect Unit
main = do
  let xs = range 0 1000
      n = 100
  logTime "1" =<< (measureN n xs S.shuffle)
  logTime "2" =<< (measureN n xs SS.shuffle)
  logTime "3" =<< (measureN n xs SST.shuffle)
  logTime "4" =<< (measureN n xs SF.shuffle)

REPLで実行してみる。

> import Main
> main
1: 15.0 msec
2: 53.0 msec
3: 9.0 msec
4: 1.0 msec
unit

方法4(直接JSを書く)方法が一番早いのは予想通り。方法2(StateTの利用)が思ったより遅くて驚いた。