Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make success sequence return each result as an element in the stream #161

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
16 changes: 10 additions & 6 deletions src/rethinkdb/net.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
[rethinkdb.query-builder :as qb]
[rethinkdb.response :refer [parse-response]]
[rethinkdb.types :as types])
(:import [java.io Closeable]))
(:import [java.io Closeable]
[clojure.lang Keyword]))

(declare send-continue-query send-stop-query)

Expand Down Expand Up @@ -54,7 +55,7 @@
(s/stream->seq stream))
java.lang.Iterable
(iterator [this]
(.iterator (seq this)))
(.iterator ^Iterable (seq this)))
java.util.Collection
(toArray [this]
(into-array Object this))
Expand All @@ -78,7 +79,7 @@
(do (swap! (:conn conn) update-in [:pending token] #(dissoc % :cursor))
(s/put-all! cursor (conj resp ::done)))
(do (swap! (:conn conn) update :pending #(dissoc % token))
(s/put! result resp)
(s/put-all! result resp)
(s/close! result)))))

(defn append-result [conn token resp]
Expand All @@ -93,9 +94,10 @@

(defn handle-response [conn token resp]
(let [{type :t resp :r etype :e notes :n :as json-resp} resp]
(log/debug "Handling response" token type resp)
(case (int type)
(1 5) ;; Success atom, server info
(deliver-result conn token (first resp))
(deliver-result conn token resp)

2 ;; Success sequence
(deliver-result conn token resp)
Expand Down Expand Up @@ -137,7 +139,7 @@
(-> json
(json/parse-string-strict true)
parse-response)))
(io/decode-channel (:client @conn) query-protocol)))
(io/decode-stream (:client @conn) query-protocol)))


(defn add-global-optargs [{:keys [db]} query]
Expand All @@ -149,6 +151,7 @@
(defn add-token [conn query]
(let [token (:token (swap! (:conn conn) update-in [:token] inc))
query (assoc query :token token)]
(assert (nil? (get-in @conn [:pending token])))
(swap! (:conn conn) assoc-in [:pending token] query)
query))

Expand All @@ -157,12 +160,13 @@
(async/pipeline 1 query-chan (map (partial add-token conn)) initial-query-chan)
(async/go-loop []
(when-let [{:keys [term query-type token]} (async/<! query-chan)]
(log/debug "Sending query" query-type token term)
(let [term (add-global-optargs @conn
(if term
(qb/parse-query query-type term)
(qb/parse-query query-type)))
json (json/generate-string term {:key-fn #(subs (str %) 1)})]
(when-not (and (= query-type :CONTINUE)
(when-not (and (= ^Keyword query-type :CONTINUE)
(not (get-in @conn [:pending token])))
(s/put! client (io/encode query-protocol [token json])))
(recur))))))
Expand Down
2 changes: 1 addition & 1 deletion test-resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
<root level="ERROR">
<appender-ref ref="STDOUT"/>
</root>
<logger name="rethinkdb.net" level="INFO"/>
<logger name="rethinkdb.net" level="DEBUG"/>
</configuration>
69 changes: 69 additions & 0 deletions test/rethinkdb/async_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
(ns rethinkdb.async-test
(:require [clojure.test :refer :all]
[clojure.core.async :as async]
[rethinkdb.query :as r]
[rethinkdb.test-utils :as utils])
(:import (clojure.core.async.impl.protocols ReadPort)
(org.slf4j LoggerFactory)
(ch.qos.logback.classic Logger Level)))

(use-fixtures :each utils/setup-each)
(use-fixtures :once utils/setup-once)

(def pokemon [{:national_no 25 :name "Pikachu"}
{:national_no 26 :name "Raichu"}
{:national_no 27 :name "Sandshrew"}
{:national_no 28 :name "Nidoran"}])
(def changefeed-pokemon (map #(hash-map :new_val %) pokemon))


(deftest always-return-async
(let [root-logger ^Logger (LoggerFactory/getLogger "rethinkdb.net")
level (.getLevel root-logger)]
(.setLevel root-logger Level/OFF)
(with-open [conn (r/connect :async? true)]
(are [query] (instance? ReadPort (r/run query conn))
(r/db-list)
(-> (r/db :non-existent) (r/table :nope))
(-> (r/db utils/test-db) (r/table utils/test-table) (r/insert {:a 1}))))
(.setLevel root-logger level)))

(deftest async-results
(with-open [conn (r/connect :async? true :db utils/test-db)]
(testing "Shape of async results"
(are [expected query] (= (->> (r/run query conn)
(async/into [])
(async/<!!))
expected)
;; Insert (success atom)
[{:deleted 0
:errors 0
:inserted 4
:replaced 0
:skipped 0
:unchanged 0}]
(-> (r/table utils/test-table)
(r/insert pokemon))

;; Success sequence
pokemon (-> (r/table utils/test-table))

;; Success atom
[pokemon] (-> (r/table utils/test-table) (r/order-by :name))

;; Changefeed
changefeed-pokemon
(-> (r/table utils/test-table)
(r/changes {:include-initial true})
(r/limit 2))))

#_(testing "Closing a changefeed with results"
(let [changefeed (-> (r/table utils/test-table)
(r/changes {:include-initial true})
(r/run conn))]
(async/go (async/<! (async/timeout 1000))
(async/close! changefeed))
(is (= changefeed-pokemon
(async/alt!!
(async/into [] changefeed) ([v _] v)
(async/timeout 1000) ([val _] :timed-out))))))))
6 changes: 4 additions & 2 deletions test/rethinkdb/test_utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
(r/run (r/db-create db-name) conn))

(defn setup-each [test-fn]
(with-open [conn (r/connect :db test-db)]
;; Add token to help distinguish between setup and test code when tracing responses
(with-open [conn (r/connect :db test-db :token 5000)]
(-> (r/table test-table)
(r/delete {:durability :soft :return-changes false})
(r/run conn))
(test-fn)))

(defn setup-once [test-fn]
(with-open [conn (r/connect)]
;; Add token to help distinguish between setup and test code when tracing responses
(with-open [conn (r/connect :token 4000)]
(ensure-db test-db conn)
(ensure-table test-table {:primary-key :national_no} conn)
(test-fn)
Expand Down