From 833a904660ab284358f396805e75076962a8ad68 Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Sat, 14 May 2016 00:32:59 +1200 Subject: [PATCH 01/11] Add test for async collections --- test/rethinkdb/async_test.clj | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 test/rethinkdb/async_test.clj diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj new file mode 100644 index 0000000..f9b646b --- /dev/null +++ b/test/rethinkdb/async_test.clj @@ -0,0 +1,67 @@ +(ns rethinkdb.async-test + (:require [clojure.test :refer :all] + [rethinkdb.query :as r] + [clojure.core.async :as async]) + (:import (clojure.core.async.impl.protocols ReadPort))) + +(def test-db "cljrethinkdb_test") +(def test-table :pokedex) + +(defn ensure-table + "Ensures that an empty table \"table-name\" exists" + [table-name optargs conn] + (if (some #{table-name} (r/run (r/table-list) conn)) + (r/run (r/table-drop table-name) conn)) + (r/run (r/table-create (r/db test-db) table-name optargs) conn)) + +(defn ensure-db + "Ensures that an empty database \"db-name\" exists" + [db-name conn] + (if (some #{db-name} (r/run (r/db-list) conn)) + (r/run (r/db-drop db-name) conn)) + (r/run (r/db-create db-name) conn)) + +(defn setup-each [test-fn] + (with-open [conn (r/connect :db test-db)] + (-> (r/table test-table) + (r/delete {:durability :soft :return-changes false}) + (r/run conn)) + (test-fn))) + +(defn setup-once [test-fn] + (with-open [conn (r/connect)] + (ensure-db test-db conn) + (ensure-table test-table {:primary-key :national_no} conn) + (test-fn) + (r/run (r/db-drop test-db) conn))) + +(use-fixtures :each setup-each) +(use-fixtures :once setup-once) + +(deftest always-return-async + (with-open [conn (r/connect :async? true)] + (are [query] (instance? ReadPort (r/run query conn)) + (r/db-list) + (-> (r/db :non-existent) (r/table :nope)) + (-> (r/db test-db) (r/table test-table) (r/insert {:a 1}))))) + +(deftest async-results + (let [conn (r/connect :async? true :db test-db) + pokemon [{:national_no 25 :name "Pikachu"} + {:national_no 26 :name "Raichu"}]] + (are [expected query] (= (->> (r/run query conn) + (async/into []) + (async/ (r/table test-table) + (r/insert pokemon)) + pokemon (-> (r/table test-table)) + [pokemon] (-> (r/table test-table) (r/order-by :name)) + + ))) From 2730ff3c934b090516896d7a0385b2ee8cd4cd44 Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Sun, 15 May 2016 18:42:30 +1200 Subject: [PATCH 02/11] Extract fixtures into a test-utils namespace --- test/rethinkdb/async_test.clj | 48 +++++++---------------------------- 1 file changed, 9 insertions(+), 39 deletions(-) diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj index f9b646b..d81f0c4 100644 --- a/test/rethinkdb/async_test.clj +++ b/test/rethinkdb/async_test.clj @@ -1,52 +1,22 @@ (ns rethinkdb.async-test (:require [clojure.test :refer :all] + [clojure.core.async :as async] [rethinkdb.query :as r] - [clojure.core.async :as async]) + [rethinkdb.test-utils :as utils]) (:import (clojure.core.async.impl.protocols ReadPort))) -(def test-db "cljrethinkdb_test") -(def test-table :pokedex) - -(defn ensure-table - "Ensures that an empty table \"table-name\" exists" - [table-name optargs conn] - (if (some #{table-name} (r/run (r/table-list) conn)) - (r/run (r/table-drop table-name) conn)) - (r/run (r/table-create (r/db test-db) table-name optargs) conn)) - -(defn ensure-db - "Ensures that an empty database \"db-name\" exists" - [db-name conn] - (if (some #{db-name} (r/run (r/db-list) conn)) - (r/run (r/db-drop db-name) conn)) - (r/run (r/db-create db-name) conn)) - -(defn setup-each [test-fn] - (with-open [conn (r/connect :db test-db)] - (-> (r/table test-table) - (r/delete {:durability :soft :return-changes false}) - (r/run conn)) - (test-fn))) - -(defn setup-once [test-fn] - (with-open [conn (r/connect)] - (ensure-db test-db conn) - (ensure-table test-table {:primary-key :national_no} conn) - (test-fn) - (r/run (r/db-drop test-db) conn))) - -(use-fixtures :each setup-each) -(use-fixtures :once setup-once) +(use-fixtures :each utils/setup-each) +(use-fixtures :once utils/setup-once) (deftest always-return-async (with-open [conn (r/connect :async? true)] (are [query] (instance? ReadPort (r/run query conn)) (r/db-list) (-> (r/db :non-existent) (r/table :nope)) - (-> (r/db test-db) (r/table test-table) (r/insert {:a 1}))))) + (-> (r/db utils/test-db) (r/table utils/test-table) (r/insert {:a 1}))))) (deftest async-results - (let [conn (r/connect :async? true :db test-db) + (let [conn (r/connect :async? true :db utils/test-db) pokemon [{:national_no 25 :name "Pikachu"} {:national_no 26 :name "Raichu"}]] (are [expected query] (= (->> (r/run query conn) @@ -59,9 +29,9 @@ :replaced 0 :skipped 0 :unchanged 0}] - (-> (r/table test-table) + (-> (r/table utils/test-table) (r/insert pokemon)) - pokemon (-> (r/table test-table)) - [pokemon] (-> (r/table test-table) (r/order-by :name)) + pokemon (-> (r/table utils/test-table)) + [pokemon] (-> (r/table utils/test-table) (r/order-by :name)) ))) From 804dee0a1e6225d30bfb8fc695db113d36f2a663 Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 12:13:14 +1200 Subject: [PATCH 03/11] Silence expected errors --- test/rethinkdb/async_test.clj | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj index d81f0c4..181dfe7 100644 --- a/test/rethinkdb/async_test.clj +++ b/test/rethinkdb/async_test.clj @@ -3,17 +3,23 @@ [clojure.core.async :as async] [rethinkdb.query :as r] [rethinkdb.test-utils :as utils]) - (:import (clojure.core.async.impl.protocols ReadPort))) + (:import (clojure.core.async.impl.protocols ReadPort) + (org.slf4j LoggerFactory) + (ch.qos.logback.classic Logger Level))) (use-fixtures :each utils/setup-each) (use-fixtures :once utils/setup-once) (deftest always-return-async - (with-open [conn (r/connect :async? true)] - (are [query] (instance? ReadPort (r/run query conn)) - (r/db-list) - (-> (r/db :non-existent) (r/table :nope)) - (-> (r/db utils/test-db) (r/table utils/test-table) (r/insert {:a 1}))))) + (let [root-logger ^Logger (LoggerFactory/getLogger "rethinkdb.net") + level (.getLevel root-logger)] + (.setLevel root-logger Level/OFF) + (with-open [conn (r/connect :async? true)] + (are [query] (instance? ReadPort (r/run query conn)) + (r/db-list) + (-> (r/db :non-existent) (r/table :nope)) + (-> (r/db utils/test-db) (r/table utils/test-table) (r/insert {:a 1})))) + (.setLevel root-logger level))) (deftest async-results (let [conn (r/connect :async? true :db utils/test-db) From ab06b5663e72be357b987d7c9fceab2ab3194463 Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 14:56:35 +1200 Subject: [PATCH 04/11] Add type hints to remove reflection warnings --- src/rethinkdb/net.clj | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rethinkdb/net.clj b/src/rethinkdb/net.clj index 147c540..086d494 100644 --- a/src/rethinkdb/net.clj +++ b/src/rethinkdb/net.clj @@ -13,7 +13,8 @@ [rethinkdb.query-builder :as qb] [rethinkdb.response :refer [parse-response]] [rethinkdb.types :as types]) - (:import [java.io Closeable])) + (:import [java.io Closeable] + [clojure.lang Keyword])) (declare send-continue-query send-stop-query) @@ -54,7 +55,7 @@ (s/stream->seq stream)) java.lang.Iterable (iterator [this] - (.iterator (seq this))) + (.iterator ^Iterable (seq this))) java.util.Collection (toArray [this] (into-array Object this)) @@ -162,7 +163,7 @@ (qb/parse-query query-type term) (qb/parse-query query-type))) json (json/generate-string term {:key-fn #(subs (str %) 1)})] - (when-not (and (= query-type :CONTINUE) + (when-not (and (= ^Keyword query-type :CONTINUE) (not (get-in @conn [:pending token]))) (s/put! client (io/encode query-protocol [token json]))) (recur)))))) From 31420b49010aa6642219114b8ab98492654d418b Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 16:21:09 +1200 Subject: [PATCH 05/11] Use decode-stream instead of decode-channel for consumer They are the same function, but decode-stream is the 'master', and decode-channel has been def'd to equal decode-stream. --- src/rethinkdb/net.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rethinkdb/net.clj b/src/rethinkdb/net.clj index 086d494..2d591e6 100644 --- a/src/rethinkdb/net.clj +++ b/src/rethinkdb/net.clj @@ -138,7 +138,7 @@ (-> json (json/parse-string-strict true) parse-response))) - (io/decode-channel (:client @conn) query-protocol))) + (io/decode-stream (:client @conn) query-protocol))) (defn add-global-optargs [{:keys [db]} query] From fc238669941a7a03c2c0610070690c792e62e0ba Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 17:19:57 +1200 Subject: [PATCH 06/11] Temp --- src/rethinkdb/net.clj | 4 ++-- test/rethinkdb/async_test.clj | 11 ++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/rethinkdb/net.clj b/src/rethinkdb/net.clj index 2d591e6..f495cb2 100644 --- a/src/rethinkdb/net.clj +++ b/src/rethinkdb/net.clj @@ -79,7 +79,7 @@ (do (swap! (:conn conn) update-in [:pending token] #(dissoc % :cursor)) (s/put-all! cursor (conj resp ::done))) (do (swap! (:conn conn) update :pending #(dissoc % token)) - (s/put! result resp) + (s/put-all! result resp) (s/close! result))))) (defn append-result [conn token resp] @@ -96,7 +96,7 @@ (let [{type :t resp :r etype :e notes :n :as json-resp} resp] (case (int type) (1 5) ;; Success atom, server info - (deliver-result conn token (first resp)) + (deliver-result conn token resp) 2 ;; Success sequence (deliver-result conn token resp) diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj index 181dfe7..0ab13ef 100644 --- a/test/rethinkdb/async_test.clj +++ b/test/rethinkdb/async_test.clj @@ -29,6 +29,7 @@ (async/into []) (async/ (r/table utils/test-table) (r/insert pokemon)) + + ;; Success sequence pokemon (-> (r/table utils/test-table)) + + ;; Success atom [pokemon] (-> (r/table utils/test-table) (r/order-by :name)) - ))) + ;; Changefeed + (map #(hash-map :new_val %) pokemon) + (-> (r/table utils/test-table) + (r/changes {:include-initial true}) + (r/limit 2))))) From fa7b16c246a6c69134323bc3863c3533aae1d6ff Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 20:18:54 +1200 Subject: [PATCH 07/11] Return async results as a stream In an async query, success sequence and success partial rows are put on the channel row by row. This means that the consumer can be unaware of the batching that takes place between the driver and RethinkDB. --- src/rethinkdb/net.clj | 2 ++ test-resources/logback-test.xml | 2 +- test/rethinkdb/async_test.clj | 60 ++++++++++++++++++--------------- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/rethinkdb/net.clj b/src/rethinkdb/net.clj index f495cb2..9995ae5 100644 --- a/src/rethinkdb/net.clj +++ b/src/rethinkdb/net.clj @@ -150,6 +150,7 @@ (defn add-token [conn query] (let [token (:token (swap! (:conn conn) update-in [:token] inc)) query (assoc query :token token)] + (assert (nil? (get-in @conn [:pending token]))) (swap! (:conn conn) assoc-in [:pending token] query) query)) @@ -176,6 +177,7 @@ (let [{:keys [async?]} query {:keys [initial-query-chan]} @conn stream (s/stream)] + (log/debug "Init query" query) (async/go (async/>! initial-query-chan (assoc query :result stream))) (if async? diff --git a/test-resources/logback-test.xml b/test-resources/logback-test.xml index cc8e3de..5f7235e 100644 --- a/test-resources/logback-test.xml +++ b/test-resources/logback-test.xml @@ -8,5 +8,5 @@ - + diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj index 0ab13ef..7244b4c 100644 --- a/test/rethinkdb/async_test.clj +++ b/test/rethinkdb/async_test.clj @@ -10,6 +10,11 @@ (use-fixtures :each utils/setup-each) (use-fixtures :once utils/setup-once) +(def pokemon [{:national_no 25 :name "Pikachu"} + {:national_no 26 :name "Raichu"}]) +(def changefeed-pokemon (map #(hash-map :new_val %) pokemon)) + + (deftest always-return-async (let [root-logger ^Logger (LoggerFactory/getLogger "rethinkdb.net") level (.getLevel root-logger)] @@ -22,31 +27,30 @@ (.setLevel root-logger level))) (deftest async-results - (let [conn (r/connect :async? true :db utils/test-db) - pokemon [{:national_no 25 :name "Pikachu"} - {:national_no 26 :name "Raichu"}]] - (are [expected query] (= (->> (r/run query conn) - (async/into []) - (async/ (r/table utils/test-table) - (r/insert pokemon)) - - ;; Success sequence - pokemon (-> (r/table utils/test-table)) - - ;; Success atom - [pokemon] (-> (r/table utils/test-table) (r/order-by :name)) - - ;; Changefeed - (map #(hash-map :new_val %) pokemon) - (-> (r/table utils/test-table) - (r/changes {:include-initial true}) - (r/limit 2))))) + (with-open [conn (r/connect :async? true :db utils/test-db)] + (testing "Shape of async results" + (are [expected query] (= (->> (r/run query conn) + (async/into []) + (async/ (r/table utils/test-table) + (r/insert pokemon)) + + ;; Success sequence + pokemon (-> (r/table utils/test-table)) + + ;; Success atom + [pokemon] (-> (r/table utils/test-table) (r/order-by :name)) + + ;; Changefeed + changefeed-pokemon + (-> (r/table utils/test-table) + (r/changes {:include-initial true}) + (r/limit 2)))))) From e91e5226581d801f011ebb368dc375394f77f82a Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 21:49:04 +1200 Subject: [PATCH 08/11] Add debug logging statements for sending and handling responses --- src/rethinkdb/net.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rethinkdb/net.clj b/src/rethinkdb/net.clj index 9995ae5..93e7a05 100644 --- a/src/rethinkdb/net.clj +++ b/src/rethinkdb/net.clj @@ -94,6 +94,7 @@ (defn handle-response [conn token resp] (let [{type :t resp :r etype :e notes :n :as json-resp} resp] + (log/debug "Handling response" token type resp) (case (int type) (1 5) ;; Success atom, server info (deliver-result conn token resp) @@ -159,6 +160,7 @@ (async/pipeline 1 query-chan (map (partial add-token conn)) initial-query-chan) (async/go-loop [] (when-let [{:keys [term query-type token]} (async/! initial-query-chan (assoc query :result stream))) (if async? From 7f01af01316ac371f3ed5c0831e31795cc730abe Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Mon, 16 May 2016 21:49:46 +1200 Subject: [PATCH 09/11] Add failing test for closing async changefeed and getting results --- test/rethinkdb/async_test.clj | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj index 7244b4c..08273a2 100644 --- a/test/rethinkdb/async_test.clj +++ b/test/rethinkdb/async_test.clj @@ -53,4 +53,15 @@ changefeed-pokemon (-> (r/table utils/test-table) (r/changes {:include-initial true}) - (r/limit 2)))))) + (r/limit 2)))) + + #_(testing "Closing a changefeed with results" + (let [changefeed (-> (r/table utils/test-table) + (r/changes {:include-initial true}) + (r/run conn))] + (async/go (async/ Date: Mon, 16 May 2016 21:50:14 +1200 Subject: [PATCH 10/11] Add high tokens to test-util setup statements to help debug test queries --- test/rethinkdb/test_utils.clj | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/rethinkdb/test_utils.clj b/test/rethinkdb/test_utils.clj index 34aa93b..1f491e0 100644 --- a/test/rethinkdb/test_utils.clj +++ b/test/rethinkdb/test_utils.clj @@ -19,14 +19,16 @@ (r/run (r/db-create db-name) conn)) (defn setup-each [test-fn] - (with-open [conn (r/connect :db test-db)] + ;; Add token to help distinguish between setup and test code when tracing responses + (with-open [conn (r/connect :db test-db :token 5000)] (-> (r/table test-table) (r/delete {:durability :soft :return-changes false}) (r/run conn)) (test-fn))) (defn setup-once [test-fn] - (with-open [conn (r/connect)] + ;; Add token to help distinguish between setup and test code when tracing responses + (with-open [conn (r/connect :token 4000)] (ensure-db test-db conn) (ensure-table test-table {:primary-key :national_no} conn) (test-fn) From d1f36b9fe292444ad5996615737cb150c00461a4 Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Tue, 17 May 2016 07:35:08 +1200 Subject: [PATCH 11/11] Add more pokemon to insert test --- test/rethinkdb/async_test.clj | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj index 08273a2..e61379b 100644 --- a/test/rethinkdb/async_test.clj +++ b/test/rethinkdb/async_test.clj @@ -11,7 +11,9 @@ (use-fixtures :once utils/setup-once) (def pokemon [{:national_no 25 :name "Pikachu"} - {:national_no 26 :name "Raichu"}]) + {:national_no 26 :name "Raichu"} + {:national_no 27 :name "Sandshrew"} + {:national_no 28 :name "Nidoran"}]) (def changefeed-pokemon (map #(hash-map :new_val %) pokemon)) @@ -36,7 +38,7 @@ ;; Insert (success atom) [{:deleted 0 :errors 0 - :inserted 2 + :inserted 4 :replaced 0 :skipped 0 :unchanged 0}]