diff --git a/src/Data/AnnotatedQueue.hs b/src/Data/AnnotatedQueue.hs new file mode 100644 index 0000000..9a14761 --- /dev/null +++ b/src/Data/AnnotatedQueue.hs @@ -0,0 +1,226 @@ +{-# language FunctionalDependencies, ScopedTypeVariables, FlexibleInstances, + BangPatterns, UndecidableInstances #-} + +-- | An implementation of Okasaki's implicit queues holding elements of some +-- semigroup. We track the sum of them all. This structure is designed to +-- support efficient *sliding window* algorithms for streams. +-- +-- References: +-- +-- Hinze, Ralf & Paterson, Ross. (2006). Finger trees: A simple general-purpose +-- data structure. J. Funct. Program.. 16. 197-217. 10.1017/S0956796805005769. +-- +-- Okasaki, C. (1998). Purely Functional Data Structures. Cambridge: Cambridge +-- University Press. doi:10.1017/CBO9780511530104 + +module Data.AnnotatedQueue + ( Queue + , ViewL (..) + , empty + , viewl + , drop1 + , singleton + , snoc + , measure + ) where + +import Data.Semigroup (Semigroup (..)) + +data FDigit a = FOne !a | FTwo !a !a +data RDigit a = RZero | ROne !a +data Node s a = Node !s !a !a + +newtype Queue s = Queue (Tree s (Elem s)) +instance Semigroup s => Semigroup (Queue s) where + (!t) <> u = case viewl u of + EmptyL -> t + ViewL x xs -> (t `snoc` x) <> xs +instance Semigroup s => Monoid (Queue s) where + mempty = empty + mappend = (<>) + +newtype Elem a = Elem a + +-- Debit invariant (Okasaki): the middle tree of +-- a Deep node is allowed |pr| - |sf| debits, where +-- pr is the prefix and sf is the suffix. +data Tree s a + = Zero + | One !a + | Two !a !a + | Deep !s !(FDigit a) (Tree s (Node s a)) !(RDigit a) + +empty :: Queue s +empty = Queue Zero + +singleton :: s -> Queue s +singleton = Queue . One . Elem + +snoc :: Semigroup s => Queue s -> s -> Queue s +snoc (Queue t) s = Queue (snocTree t (Elem s)) +{-# INLINABLE snoc #-} + +measure :: Semigroup s => Queue s -> Maybe s +measure (Queue q) = case q of + Zero -> Nothing + One a -> Just (measure_ a) + Two a b -> Just (measure_ a <> measure_ b) + Deep s _ _ _ -> Just s +{-# INLINABLE measure #-} + +class Measurable s a | a -> s where + measure_ :: a -> s +instance Measurable s (Elem s) where + measure_ (Elem x) = x +instance Measurable s (Node s a) where + measure_ (Node s _ _) = s +instance (Semigroup s, Measurable s a) => Measurable s (FDigit a) where + measure_ (FOne a) = measure_ a + measure_ (FTwo a b) = measure_ a <> measure_ b + +class SemiMeasurable s a | a -> s where + semimeasure :: s -> a -> s +instance (Semigroup s, Measurable s a) => SemiMeasurable s (RDigit a) where + semimeasure s RZero = s + semimeasure s (ROne a) = s <> measure_ a +instance (Semigroup s, Measurable s a) + => SemiMeasurable s (Tree s a) where + semimeasure s Zero = s + semimeasure s (One a) = s <> measure_ a + semimeasure s (Two a b) = s <> measure_ a <> measure_ b + semimeasure s (Deep t _ _ _) = s <> t + +node + :: (Semigroup s, Measurable s a) + => a -> a -> Node s a +node a b = Node (measure_ a <> measure_ b) a b +{-# INLINABLE node #-} + +deep :: (Semigroup s, Measurable s a) => FDigit a -> Tree s (Node s a) -> RDigit a -> Tree s a +deep pr m sf = Deep (measure_ pr `semimeasure` m `semimeasure` sf) pr m sf +{-# INLINABLE deep #-} + +snocTree :: (Measurable s a, Semigroup s) => Tree s a -> a -> Tree s a +-- Note: in the last case we depart slightly from Okasaki. Following Hinze +-- and Paterson, we force the *old* middle immediately to prevent a chain of +-- thunks from accumulating in case of multiple sequential snocs. +snocTree Zero a = One a +snocTree (One a) b = Two a b +snocTree (Two a b) c = Deep (measure_ a <> measure_ b <> measure_ c) (FTwo a b) Zero (ROne c) +snocTree (Deep s pr m RZero) q = Deep (s <> measure_ q) pr m (ROne q) +snocTree (Deep s pr !m (ROne p)) !q + = Deep (s <> measure_ q) pr (snocTree m (node p q)) RZero +{-# INLINABLE snocTree #-} + +{- +Theorem: snocTree runs in O(1) amortized time. + +Proof: + +We show that snocTree costs at most 2 units of work. + +Reminder: The debit invariant allows the middle tree of a Deep +node |pr| - |sf| debits. + +The first three cases are trivial as they don't have any +debits in their inputs or outputs. + +In the fourth case (Deep s pr m RZero), the debit allowance on `m` drops by 1. +We do 1 unit of unshared work and pay off one debit on `m`, for a total of 2 +units of work. + +In the last case (Deep s pr m (ROne p)), we have two possibilities, depending +on the prefix: + +1. The prefix has one element. Then the debit allowance on `m` is 0. We force +`m` (for free). We do 1 unit of unshared work. We create a suspension for the +recursive call and place 2 debits on it to pay for that. Since the debit +allowance for the result middle only allows 1 debit, we pay one of them off +now. So the amortized cost is 2. + +2. The prefix has two elements. Then the debit allowance on `m` is 1. We pay +off that debit and force `m`. We do 1 unit of unshared work. We create a +suspension for the recursive call and place 2 debits on it. This is within the +debit allowance for the result middle. So the amortized cost is 2. +-} + +data ViewL s = EmptyL | ViewL !s (Queue s) + +-- Note: we need the ViewLTree constructor to be lazy in the +-- tail to maintain the right amortized bounds. We include +-- the measure of a nonempty tree in its view because we +-- need that in the recursive case of viewlTree. +data ViewLTree s a = EmptyLTree | ViewLTree !s !a (Tree s a) + +viewl :: Semigroup s => Queue s -> ViewL s +-- We could write a separate version for this top layer to avoid unnecessarily +-- calculating a sum in the Two case. +viewl (Queue q) = case viewlTree q of + EmptyLTree -> EmptyL + ViewLTree _ (Elem s) q' -> ViewL s (Queue q') +{-# INLINABLE viewl #-} + +viewlTree :: (Semigroup s, Measurable s a) => Tree s a -> ViewLTree s a +-- Important note: we produce the head before forcing the tail. This +-- is key to maintaining O(1) amortized time here. +viewlTree Zero = EmptyLTree +viewlTree (One a) = ViewLTree (measure_ a) a Zero +viewlTree (Two a b) = ViewLTree (measure_ a <> measure_ b) a (One b) +viewlTree (Deep s (FTwo a b) m sf) = ViewLTree s a (deep (FOne b) m sf) +viewlTree (Deep s (FOne a) m sf) = ViewLTree s a $ case viewlTree m of + EmptyLTree -> case sf of + RZero -> Zero + ROne b -> One b + ViewLTree sm (Node p b c) m' -> Deep (sm `semimeasure` sf) (FTwo b c) m' sf +{-# INLINABLE viewlTree #-} + +{- +Theorem: drop1 runs in O(1) amortized time. + +Proof. We follow the general outline of Okasaki Theorem 11.1, adjusting for the +need to measure (and therefore force) certain suspended middle trees in the +fourth case. + +The short version: everything is the same as in Okasaki, but if the recursive +viewing reaches an FOne digit, we need to discharge up to two debits on the +tree middle there, adding just a constant amount to the amortized cost of +the operation. + +The long version, in lots of detail: + +This particular proof doesn't make use of the "debit passing" concept, because +we seem to be able to get away without it. We will analyze `drop1` as taking 3 +units of work. When reading this proof, it may be helpful to mentally imagine +breaking down `viewlTree` into `headTree` and `drop1Tree`, much like Okasaki +does. + +The first three cases are trivial, with no debits on inputs or outputs, so we +can assign them each a cost of 1. + +In the fourth case (an FTwo digit), we may have up to 2 debits on `m` we must +discharge so we can measure it in `deep`, plus 1 unit of unshared work, for +a total of 3. + +In the fifth case (an FOne digit), we have two possibilities: + +The suffix is RZero: We may have up to 1 debit on `m`, which we discharge to +view it. We do 1 unit of unshared work. If `m` is nonempty, we create a +suspension to take its tail `m'`, and by the inductive hypothesis create 3 +debits to cover that. We place two of them on `m'` and discharge the third. So +the amortized cost is 3. + +The suffix is ROne: There are no debits on `m`, so we can view it immediately. +We do one unit of unshared work. If `m` is nonempty, we create a suspension to +take its tail `m'`, and create 3 debits to cover that. We place one debit on +`m'` and discharge the other two. The amortized cost is 3. +-} + +drop1 :: Semigroup s => Queue s -> Queue s +drop1 q = case viewl q of + EmptyL -> empty + ViewL _ q' -> q' +{- +-- We could expand out the upper layer to avoid an unnecessary view allocation. +-- Is that worth the extra code size? +-} +{-# INLINABLE drop1 #-} diff --git a/src/Streaming/Prelude.hs b/src/Streaming/Prelude.hs index 5ee7ac2..671f7bc 100644 --- a/src/Streaming/Prelude.hs +++ b/src/Streaming/Prelude.hs @@ -134,6 +134,7 @@ module Streaming.Prelude ( , show , cons , slidingWindow + , slidingWindowSum , slidingWindowMin , slidingWindowMinBy , slidingWindowMinOn @@ -272,8 +273,10 @@ import Data.Functor.Of import Data.Functor.Sum import Data.Monoid (Monoid (mappend, mempty)) import Data.Ord (Ordering (..), comparing) +import Data.Semigroup (Semigroup (..)) import Foreign.C.Error (Errno(Errno), ePIPE) import Text.Read (readMaybe) +import qualified Data.AnnotatedQueue as AQ import qualified Data.Foldable as Foldable import qualified Data.IntSet as IntSet import qualified Data.Sequence as Seq @@ -2846,7 +2849,7 @@ mapMaybe phi = loop where {-# INLINABLE mapMaybe #-} {-| 'slidingWindow' accumulates the first @n@ elements of a stream, - update thereafter to form a sliding window of length @n@. + updating thereafter to form a sliding window of length @n@. It follows the behavior of the slidingWindow function in . @@ -2880,6 +2883,33 @@ slidingWindow n = setup (max 1 n :: Int) mempty Right (x,rest) -> setup (m-1) (sequ Seq.|> x) rest {-# INLINABLE slidingWindow #-} +{-| 'slidingWindowSum' accumulates the first @n@ elements of a stream + with elements in some 'Semigroup', + updating thereafter to form a sliding window of length @n@. +-} +slidingWindowSum :: (Monad m, Semigroup a) + => Int + -> Stream (Of a) m b + -> Stream (Of a) m b +slidingWindowSum n = setup (max 1 n) AQ.empty + where + window !qu str = do + case AQ.measure qu of + Just s -> yield s + Nothing -> pure () + e <- lift (next str) + case e of + Left r -> return r + Right (a,rest) -> + window (AQ.drop1 $ qu `AQ.snoc` a) rest + setup 0 !qu str = window qu str + setup m !qu str = do + e <- lift $ next str + case e of + Left r -> window qu (return r) + Right (x,rest) -> setup (m-1) (qu `AQ.snoc` x) rest +{-# INLINABLE slidingWindowSum #-} + -- | 'slidingWindowMin' finds the minimum in every sliding window of @n@ -- elements of a stream. If within a window there are multiple elements that are -- the least, it prefers the first occurrence (if you prefer to have the last diff --git a/streaming.cabal b/streaming.cabal index f150888..36ca868 100644 --- a/streaming.cabal +++ b/streaming.cabal @@ -204,6 +204,7 @@ library , Streaming.Prelude , Streaming.Internal , Data.Functor.Of + , Data.AnnotatedQueue build-depends: base >=4.8 && <5 , mtl >=2.1 && <2.3 diff --git a/test/test.hs b/test/test.hs index cde0bd0..cf03c41 100644 --- a/test/test.hs +++ b/test/test.hs @@ -1,8 +1,10 @@ +{-# LANGUAGE ScopedTypeVariables #-} module Main where import qualified Data.Foldable as Foldable import Data.Functor.Identity import Data.Ord +import qualified Data.Semigroup as DS import qualified Streaming.Prelude as S import Test.Hspec import Test.QuickCheck @@ -10,25 +12,32 @@ import Test.QuickCheck toL :: S.Stream (S.Of a) Identity b -> [a] toL = runIdentity . S.toList_ -main :: IO () -main = - hspec $ do +slidingWindowMin_spec :: SpecWith () +slidingWindowMin_spec = describe "slidingWindowMin" $ do it "works with a few simple cases" $ do toL (S.slidingWindowMin 2 (S.each [1, 3, 9, 4, 6, 4])) `shouldBe` [1, 3, 4, 4, 4] toL (S.slidingWindowMin 3 (S.each [1, 3, 2, 6, 3, 7, 8, 9])) `shouldBe` [1, 2, 2, 3, 3, 7] it "produces no results with empty streams" $ property $ \k -> toL (S.slidingWindowMin k (mempty :: S.Stream (S.Of Int) Identity ())) `shouldBe` [] - it "behaves like a (S.map Foldable.minimum) (slidingWindow) for non-empty streams" $ + it "behaves like (S.map Foldable.minimum . slidingWindow k) for non-empty streams" $ property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.minimum crashes on empty lists -> toL (S.slidingWindowMin k (S.each xs)) === toL (S.map Foldable.minimum (S.slidingWindow k (S.each (xs :: [Int])))) + it "behaves like (S.map getMin . slidingWindowSum . S.map Min)" $ + property $ \(xs :: [Int]) k + -> + toL (S.slidingWindowMin k (S.each xs)) === + toL (S.map DS.getMin $ S.slidingWindowSum k $ S.map DS.Min $ S.each xs) it "behaves like identity when window size is 1" $ property $ \xs -> toL (S.slidingWindowMin 1 (S.each (xs :: [Int]))) === xs it "produces a prefix when the stream elements are sorted" $ property $ \(Sorted xs) k -> (length xs >= k) ==> (toL (S.slidingWindowMin k (S.each (xs :: [Int]))) === take (length xs - (k - 1)) xs) + +slidingWindowMinBy_spec :: SpecWith () +slidingWindowMinBy_spec = describe "slidingWindowMinBy" $ do it "prefers earlier elements when several elements compare equal" $ do toL (S.slidingWindowMinBy (comparing fst) 2 (S.each [(1, 1), (2, 2), (2, 3), (2, 4)])) `shouldBe` @@ -38,6 +47,9 @@ main = -> toL (S.slidingWindowMinBy (comparing fst) k (S.each xs)) === toL (S.map (Foldable.minimumBy (comparing fst)) (S.slidingWindow k (S.each (xs :: [(Int, Int)])))) + +slidingWindowMinOn_spec :: SpecWith () +slidingWindowMinOn_spec = describe "slidingWindowMinOn" $ do it "behaves like a (S.map (Foldable.minimumBy (comparing p))) (slidingWindow) for non-empty streams" $ do property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.minimumBy crashes on empty lists @@ -49,6 +61,9 @@ main = (length xs >= k) ==> (toL (S.slidingWindowMinOn (const (undefined :: UnitWithLazyEq)) k (S.each (xs :: [Int]))) === take (length xs - (k - 1)) xs) + +slidingWindowMax_spec :: SpecWith () +slidingWindowMax_spec = describe "slidingWindowMax" $ do it "produces no results with empty streams" $ property $ \k -> toL (S.slidingWindowMax k (mempty :: S.Stream (S.Of Int) Identity ())) `shouldBe` [] @@ -62,6 +77,9 @@ main = it "produces a suffix when the stream elements are sorted" $ property $ \(Sorted xs) k -> (length xs >= k) ==> (toL (S.slidingWindowMax k (S.each (xs :: [Int]))) === drop (k - 1) xs) + +slidingWindowMaxBy_spec :: SpecWith () +slidingWindowMaxBy_spec = describe "slidingWindowMaxBy" $ do it "prefers later elements when several elements compare equal" $ do toL (S.slidingWindowMaxBy (comparing fst) 2 (S.each [(1, 1), (2, 2), (2, 3), (2, -900)])) `shouldBe` @@ -71,6 +89,9 @@ main = -> toL (S.slidingWindowMaxBy (comparing fst) k (S.each xs)) === toL (S.map (Foldable.maximumBy (comparing fst)) (S.slidingWindow k (S.each (xs :: [(Int, Int)])))) + +slidingWindowMaxOn_spec :: SpecWith () +slidingWindowMaxOn_spec = describe "slidingWindowMaxOn" $ do it "behaves like a (S.map (Foldable.maximumBy (comparing p))) (slidingWindow) for non-empty streams" $ do property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.maximumBy crashes on empty lists @@ -82,6 +103,16 @@ main = (length xs >= k) ==> (toL (S.slidingWindowMaxOn (const (undefined :: UnitWithLazyEq)) k (S.each (xs :: [Int]))) === drop (k - 1) xs) +main :: IO () +main = + hspec $ do + slidingWindowMin_spec + slidingWindowMinBy_spec + slidingWindowMinOn_spec + slidingWindowMax_spec + slidingWindowMaxBy_spec + slidingWindowMaxOn_spec + data UnitWithLazyEq = UnitWithLazyEq instance Eq UnitWithLazyEq where