Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closure sequences #22

Merged
merged 12 commits into from
Feb 16, 2024
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Changelog

## Unreleased
- Better error message for sync lazy engine when env is missing a key
## 1.16.0 / 2024-02-16
- Allow sequences to refer to any node in the environment for the mapped fn

## 1.15.0 / 2023-12-07
- Allow running the lazy synchronous engine with a channel return.
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject dev.nu/nodely "1.15.0"
(defproject dev.nu/nodely "1.16.0"
:description "Decoupling data fetching from data dependency declaration"
:url "https://github.com/nubank/nodely"
:license {:name "MIT"}
Expand Down
52 changes: 35 additions & 17 deletions src/nodely/data.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns nodely.data
(:refer-clojure :exclude [map sequence])
(:require
[clojure.set :as set]
[schema.core :as s]))

;;
Expand All @@ -24,10 +25,12 @@
:truthy (s/recursive #'Node)
:falsey (s/recursive #'Node)})

(s/defschema Sequence #::{:type (s/eq :sequence)
:input s/Keyword
:fn (s/pred ifn?)
:tags #{node-tag}})
(s/defschema Sequence #::{:type (s/eq :sequence)
:input s/Keyword
:process-node (s/conditional
#(= (get % ::type) :value) Value
#(= (get % ::type) :leaf) Leaf)
:tags #{node-tag}})

(s/defschema Node (s/conditional
#(= (get % ::type) :value) Value
Expand Down Expand Up @@ -69,15 +72,23 @@

(s/defn sequence
([input :- s/Keyword
fn]
(sequence input fn #{}))
f]
(sequence input f #{}))
([input :- s/Keyword
fn
f
tags :- #{node-tag}]
#::{:type :sequence
:input input
:process-node (value f)
:tags tags})
([input :- s/Keyword
closure-inputs
f
tags :- #{node-tag}]
#::{:type :sequence
:input input
:fn fn
:tags tags}))
#::{:type :sequence
:input input
:process-node (leaf closure-inputs f)
:tags tags}))

;;
;; Node Utilities
Expand All @@ -90,13 +101,20 @@
[node]
(= :value (::type node)))

(defn node-inputs
(defn leaf?
[node]
(case (::type node)
:value #{}
:leaf (::inputs node)
:branch (recur (::condition node))
:sequence #{(::input node)}))
(= :leaf (::type node)))

(defn node-inputs
([node]
(node-inputs node #{}))
([node inputs]
(case (::type node)
:value inputs
:leaf (set/union inputs (::inputs node))
:branch (recur (::condition node) inputs)
:sequence (recur (::process-node node)
(conj inputs (::input node))))))

;;
;; Env Utils
Expand Down
18 changes: 12 additions & 6 deletions src/nodely/engine/applicative.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@

(defn eval-sequence
[node lazy-env opts]
(let [f (::data/fn node)
in-key (::data/input node)]
(m/alet [input-seq (get lazy-env in-key)
result (sequence (map (fn [x] (m/fmap f (m/pure x)))
(::data/value input-seq)))]
(data/value result))))
(let [in-key (::data/input node)
mf (m/fmap ::data/value
(eval-node (::data/process-node node) lazy-env opts))
mseq (get lazy-env in-key)]
(->> mseq
(m/fmap (comp m/sequence
(partial map
(comp (partial m/fapply mf)
m/pure))
::data/value))
m/join
(m/fmap data/value))))

(defn eval-branch
[{::data/keys [condition truthy falsey]}
Expand Down
6 changes: 3 additions & 3 deletions src/nodely/engine/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@

(defn prepare-inputs
[input-keys env]
(->> (select-keys env input-keys)
(map (juxt key (comp ::data/value val)))
(->> input-keys
(map (juxt identity (comp ::data/value (partial get env))))
(into {})))

(defn eval-leaf
Expand Down Expand Up @@ -126,7 +126,7 @@
(defn- sequence->value
[node env]
(let [in-key (::data/input node)
f (::data/fn node)
f (::data/value (first (node->value (::data/process-node node) env)))
new-env (resolve-inputs [in-key] env)
in (data/get-value new-env in-key)]
[(data/value (mapv f in)) new-env]))
Expand Down
2 changes: 1 addition & 1 deletion src/nodely/engine/core_async/iterative_scheduling.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
[node resolved-env {::keys [max-sequence-parallelism]
:or {max-sequence-parallelism 4}}]
(let [in-key (::data/input node)
f (::data/fn node)
f (::data/value (first (core/node->value (::data/process-node node) resolved-env)))
sequence (map data/value (get (core/prepare-inputs [in-key] resolved-env) in-key))
in-chan (async/to-chan! sequence)
pipeline-result (async/chan)]
Expand Down
19 changes: 12 additions & 7 deletions src/nodely/engine/core_async/lazy_scheduling.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@

(defn eval-sequence
[node
sequence-input-promise-chan
lazy-env
{::keys [max-sequence-parallelism out-ch exception-ch]
:or {max-sequence-parallelism 4}}]
:or {max-sequence-parallelism 4}
:as opts}]
(nodely.async/jog
(let [f (::data/fn node)
sequence (map data/value (::data/value (async/<! sequence-input-promise-chan)))
(let [dep (::data/input node)
input-chan (get lazy-env dep)
f-ch (async/chan 1)
f (do (eval-async (::data/process-node node)
lazy-env
(assoc opts ::out-ch f-ch))
(::data/value (async/<! f-ch)))
sequence (map data/value (::data/value (async/<! input-chan)))
in-chan (async/to-chan! sequence)
pipeline-result (async/chan)]
(async/pipeline-async max-sequence-parallelism
Expand Down Expand Up @@ -74,9 +81,7 @@
::data/tags
::data/blocking)})
exception-ch))))
:sequence (let [dep (::data/input node)
chan (get lazy-env dep)]
(eval-sequence node chan opts))
:sequence (eval-sequence node lazy-env opts)
:branch (eval-branch node lazy-env opts)))
out-ch)

Expand Down
4 changes: 3 additions & 1 deletion src/nodely/engine/manifold.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
[nodely.data :as data]
[nodely.engine.core :as core]))

(declare eval-async)

(defn- prepare-inputs
[input-keys future-env]
(->> (select-keys future-env input-keys)
Expand All @@ -20,7 +22,7 @@
(defn eval-sequence
[node future-env]
(let [in-key (::data/input node)
f (::data/fn node)
f (::data/value (eval-async (::data/process-node node) future-env))
in (prepare-inputs [in-key] future-env)]
(data/value (->> (get in in-key)
(mapv #(deferred/future (f %)))
Expand Down
10 changes: 8 additions & 2 deletions src/nodely/syntax.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@
(data/value n)))

(defmacro >sequence
[fn input]
(list `data/sequence (question-mark->keyword input) fn))
[f input]
(let [symbols-to-be-replaced (question-mark-symbols f)
closure-inputs (mapv question-mark->keyword symbols-to-be-replaced)
fn-fn (fn-with-arg-map symbols-to-be-replaced f)]
(assert-not-shadowing! symbols-to-be-replaced)
(if (seq symbols-to-be-replaced)
`(data/sequence ~(question-mark->keyword input) ~closure-inputs ~fn-fn #{})
`(data/sequence ~(question-mark->keyword input) ~f #{}))))

(defmacro >leaf
[expr]
Expand Down
11 changes: 9 additions & 2 deletions test/nodely/engine/applicative_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[nodely.engine.core-async.core :as nodely.async]
[nodely.engine.schema :as schema]
[nodely.fixtures :as fixtures]
[nodely.syntax :as syntax :refer [>leaf >value]]
[nodely.syntax :as syntax :refer [>leaf >sequence >value]]
[nodely.syntax.schema :refer [yielding-schema]]
[promesa.core :as p]
[schema.core :as s]))
Expand Down Expand Up @@ -74,6 +74,10 @@
(def env-with-sequence {:a (>leaf [1 2 3])
:b (syntax/>sequence inc ?a)})

(def env-with-closure-sequence {:x (data/value [1 2 3])
:z (data/value 2)
:y (>sequence #(* % ?z) ?x)})

(def env+sequence-with-nil-values
{:a (>leaf [1 2 nil 4])
:b (syntax/>sequence #(when % (inc %)) ?a)})
Expand Down Expand Up @@ -175,7 +179,10 @@
(is (match? (matchers/within-delta 8000000 2000000000)
(- nanosec-sync nanosec-async)))))
(testing "Actually computes the correct answers"
(is (match? [2 3 4] (applicative/eval-key env-with-sequence+delay :c)))))
(is (match? [2 3 4] (applicative/eval-key env-with-sequence+delay :c))))
(testing "resolve closure sequence"
(is (= [2 4 6]
(applicative/eval-key env-with-closure-sequence :y)))))

(deftest schema-test
(let [env-with-schema {:a (>value 2)
Expand Down
12 changes: 9 additions & 3 deletions test/nodely/engine/core_async/lazy_scheduling_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
[nodely.engine.core-async.lazy-scheduling :as nasync]
[nodely.engine.lazy :as engine.lazy]
[nodely.fixtures :as fixtures]
[nodely.syntax :as syntax :refer [>cond >leaf >value blocking]]))
[nodely.syntax :as syntax :refer [>cond >leaf >sequence >value blocking]]))

(def test-env {:a (>leaf (+ 1 2))
:b (>leaf (* ?a 2))
Expand Down Expand Up @@ -57,7 +57,11 @@
:c ?c})})

(def env-with-sequence {:a (>leaf [1 2 3])
:b (syntax/>sequence inc ?a)})
:b (>sequence inc ?a)})

(def env-with-closure-sequence {:a (>value [1 2 3])
:c (>value 2)
:b (>sequence (fn [e] (* e ?c)) ?a)})

(def env+sequence-with-nil-values
{:a (>leaf [1 2 nil 4])
Expand Down Expand Up @@ -142,7 +146,9 @@
(is (match? (matchers/within-delta 8000000 2000000000)
(- nanosec-sync nanosec-async)))))
(testing "Actually computes the correct answers"
(is (= [2 3 4] (nasync/eval-key env-with-sequence+delay :b)))))
(is (match? [2 3 4] (nasync/eval-key env-with-sequence+delay :b))))
(testing "When there's a closure in the sequence expr"
(is (match? [2 4 6] (nasync/eval-key env-with-closure-sequence :b)))))

(deftest eval-test
(testing "eval node async"
Expand Down
13 changes: 11 additions & 2 deletions test/nodely/engine/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[nodely.data :as data]
[nodely.engine.core :as core]
[nodely.fixtures :as fixtures]
[nodely.syntax :as syntax :refer [>cond >if >leaf]]
[nodely.syntax :as syntax :refer [>cond >if >leaf >sequence]]
[schema.test]))

(use-fixtures :once schema.test/validate-schemas)
Expand Down Expand Up @@ -69,6 +69,10 @@
(def env-with-sequence {:x (data/value [1 2 3])
:y (data/sequence :x inc)})

(def env-with-closure-sequence {:x (data/value [1 2 3])
:z (data/value 2)
:y (>sequence (fn [e] (* e ?z)) ?x)})

(def branch-with-sequence {:x (data/value [1 2 3])
:y (data/value [4 5 6])
:a (data/value true)
Expand Down Expand Up @@ -165,6 +169,11 @@
(is (= {:x (data/value [1 2 3])
:y (data/value [2 3 4])}
(core/resolve :y env-with-sequence))))
(testing "resolve closure sequence"
(is (= {:x (data/value [1 2 3])
:z (data/value 2)
:y (data/value [2 4 6])}
(core/resolve :y env-with-closure-sequence))))
(testing "resolve with nested branches"
(is (= {:x (data/value 1)
:y (data/value 2)
Expand Down Expand Up @@ -245,4 +254,4 @@
(testing "there is a cycle in the env and checked-env detects it"
(is (thrown? clojure.lang.ExceptionInfo (core/checked-env env-with-cycle))))
(testing "there is no cycle but checked-env reports there is (corner case mutually exclusive)"
(is (thrown? clojure.lang.ExceptionInfo (core/checked-env mutually-exclusive-env-without-cycles)))))
(is (thrown? clojure.lang.ExceptionInfo (core/checked-env mutually-exclusive-env-without-cycles)))))
9 changes: 8 additions & 1 deletion test/nodely/engine/manifold_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
(def env-with-sequence {:a (>leaf [1 2 3])
:b (syntax/>sequence inc ?a)})

(def env-with-sequence-with-closure
{:a (syntax/>value [1 2 3])
:c (syntax/>value 2)
:b (syntax/>sequence #(* % ?c) ?a)})

(def env-with-sequence+delay {:a (>leaf [1 2 3])
:b (syntax/>sequence
#(do (Thread/sleep 1000) (inc %))
Expand Down Expand Up @@ -54,7 +59,9 @@
(is (match? (matchers/within-delta 8000000 2000000000)
(- nanosec-sync nanosec-async)))))
(testing "Actually computes the correct answers"
(is (= [2 3 4] (manifold/eval-key env-with-sequence+delay :b)))))
(is (= [2 3 4] (manifold/eval-key env-with-sequence+delay :b))))
(testing "Supports closure of nodes in the iterated fn"
(is (= [2 4 6] (manifold/eval-key env-with-sequence-with-closure :b)))))

(deftest eval-test
(testing "eval node async"
Expand Down
3 changes: 2 additions & 1 deletion test/nodely/syntax_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@
:fn fn?}
:b #::data {:type :sequence
:input :a
:fn fn?}}
:process-node #::data {:type :value
:value fn?}}}
{:a (>leaf [1 2 3])
:b (>sequence inc ?a)}))))

Expand Down