Skip to content

Commit

Permalink
Merge pull request #39 from AppsFlyer/listeners
Browse files Browse the repository at this point in the history
refactor listeners
  • Loading branch information
barkanido authored Aug 18, 2020
2 parents 2aaa560 + 758cad6 commit a399128
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 105 deletions.
22 changes: 22 additions & 0 deletions src/main/clojure/aerospike_clj/aerospike_record.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
(ns aerospike-clj.aerospike-record
(:require [aerospike-clj.utils :as utils])
(:import [com.aerospike.client Record]))

(defrecord AerospikeRecord [payload ^Integer gen ^Integer ttl])

(defn record->map [^Record record]
(and record
(let [bins (into {} (.bins record)) ;; converting from java.util.HashMap to a Clojure map
bin-names (keys bins)
payload (if (utils/single-bin? bin-names)
;; single bin record
(utils/desanitize-bin-value (get bins ""))
;; multiple-bin record
(reduce-kv (fn [m k v]
(assoc m k (utils/desanitize-bin-value v)))
{}
bins))]
(->AerospikeRecord
payload
^Integer (.generation ^Record record)
^Integer (.expiration ^Record record)))))
127 changes: 22 additions & 105 deletions src/main/clojure/aerospike_clj/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
[aerospike-clj.utils :as utils]
[aerospike-clj.metrics :as metrics]
[aerospike-clj.key :as as-key]
[aerospike-clj.listeners]
[aerospike-clj.aerospike-record :as record]
[promesa.core :as p])
(:import [com.aerospike.client AerospikeClient Host Key Bin Record AerospikeException Operation BatchRead AerospikeException$QueryTerminated]
(:import [com.aerospike.client AerospikeClient Host Key Bin Operation BatchRead]
[com.aerospike.client.async EventLoop NioEventLoops EventLoops]
[com.aerospike.client.listener RecordListener WriteListener DeleteListener ExistsListener BatchListListener RecordSequenceListener
InfoListener ExistsArrayListener]
[com.aerospike.client.policy Policy BatchPolicy ClientPolicy RecordExistsAction WritePolicy ScanPolicy InfoPolicy]
[com.aerospike.client.cluster Node]
[aerospike_clj.listeners AsyncExistsListener AsyncDeleteListener AsyncWriteListener
AsyncInfoListener AsyncRecordListener AsyncRecordSequenceListener AsyncBatchListListener
AsyncExistsArrayListener]
[clojure.lang IPersistentMap IPersistentVector]
[java.util List Collection ArrayList Map]
[java.util List Collection ArrayList]
[java.time Instant]))

(declare record->map)

(def EPOCH
^{:doc "The 0 date reference for returned record TTL"}
(.getEpochSecond (Instant/parse "2010-01-01T00:00:00Z")))
Expand Down Expand Up @@ -119,71 +120,6 @@
(p/catch (fn [op-exception]
(on-failure ce op-name op-exception index op-start-time db))))))]
(reduce reducer op-future (:client-events db))))

(defn- ^ExistsListener reify-exists-listener [op-future]
(reify ExistsListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^Key _k ^boolean exists]
(p/resolve! op-future exists))))

(defn- ^DeleteListener reify-delete-listener [op-future]
(reify
DeleteListener
(^void onSuccess [_this ^Key _k ^boolean existed]
(p/resolve! op-future existed))
(^void onFailure [_ ^AerospikeException ex]
(p/reject! op-future ex))))

(defn- ^WriteListener reify-write-listener [op-future]
(reify
WriteListener
(^void onSuccess [_this ^Key _]
(p/resolve! op-future true))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))))

(defn- ^InfoListener reify-info-listener [op-future]
(reify
InfoListener
(^void onSuccess [_this ^Map result-map]
(p/resolve! op-future (into {} result-map)))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))))

(defn- ^RecordListener reify-record-listener [op-future]
(reify RecordListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^Key _k ^Record record]
(p/resolve! op-future record))))

(defn- ^RecordSequenceListener reify-record-sequence-listener [op-future callback]
(reify RecordSequenceListener
(^void onRecord [_this ^Key k ^Record record]
(when (= :abort-scan (callback (.userKey k) (record->map record)))
(throw (AerospikeException$QueryTerminated.))))
(^void onSuccess [_this]
(p/resolve! op-future true))
(^void onFailure [_this ^AerospikeException exception]
(if (instance? AerospikeException$QueryTerminated exception)
(p/resolve! op-future false)
(p/reject! op-future exception)))))

(defn- ^BatchListListener reify-record-batch-list-listener [op-future]
(reify BatchListListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^List records]
(p/resolve! op-future records))))

(defn- ^ExistsArrayListener reify-exists-array-listener [op-future]
(reify ExistsArrayListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^"[Lcom.aerospike.client.Key;" _keys ^"[Z" exists]
(p/resolve! op-future exists))))

(defprotocol UserKey
"Use `create-key` directly to pass a premade custom key to the public API.
When passing a simple String/Integer/Long/ByteArray the key will be created
Expand All @@ -209,27 +145,8 @@
(throw (Exception. (format "%s is %s characters. Bin names have to be <= 14 characters..." bin-name (.length bin-name)))))
(Bin/asNull bin-name))

;; get
(defrecord AerospikeRecord [payload ^Integer gen ^Integer ttl])

(defn- record->map [^Record record]
(and record
(let [bins (into {} (.bins record)) ;; converting from java.util.HashMap to a Clojure map
bin-names (keys bins)]
(->AerospikeRecord
(if (utils/single-bin? bin-names)
;; single bin record
(utils/desanitize-bin-value (get bins ""))
;; multiple-bin record
(reduce-kv (fn [m k v]
(assoc m k (utils/desanitize-bin-value v)))
{}
bins))
^Integer (.generation ^Record record)
^Integer (.expiration ^Record record)))))

(defn- batch-read->map [^BatchRead batch-read]
(assoc (record->map (.record batch-read))
(assoc (record/record->map (.record batch-read))
:index
(.toString (.userKey (.key batch-read)))))

Expand Down Expand Up @@ -263,18 +180,18 @@
;; the `get` method does not require bin-names and the whole record is retrieved
(.get ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-record-listener op-future)
(AsyncRecordListener. op-future)
^Policy (:policy conf)
(create-key index (:dbns db) set-name))
;; For all other cases, bin-names are passed to a different `get` method
(.get ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-record-listener op-future)
(AsyncRecordListener. op-future)
^Policy (:policy conf)
(create-key index (:dbns db) set-name)
^"[Ljava.lang.String;" (utils/v->array String bin-names)))
(let [p (p/chain op-future
record->map
record/record->map
(:transcoder conf identity))]
(register-events p db "read" index start-time))))

Expand Down Expand Up @@ -307,7 +224,7 @@
batch-reads-arr (ArrayList. ^Collection (mapv #(map->batch-read % (:dbns db)) batch-reads))]
(.get ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-record-batch-list-listener op-future)
(AsyncBatchListListener. op-future)
^BatchPolicy (:policy conf)
^List batch-reads-arr)
(let [d (p/chain op-future
Expand All @@ -329,7 +246,7 @@
indices (utils/v->array Key (mapv #(create-key (:index %) aero-namespace (:set %)) indices))]
(.exists ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-exists-array-listener op-future)
(AsyncExistsArrayListener. op-future)
^BatchPolicy (:policy conf)
^"[Lcom.aerospike.client.Key;" indices)
(let [d (p/chain op-future
Expand Down Expand Up @@ -360,7 +277,7 @@
start-time (System/nanoTime)]
(.exists ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-exists-listener op-future)
(AsyncExistsListener. op-future)
^Policy (:policy conf)
(create-key index (:dbns db) set-name))
(register-events op-future db "exists" index start-time))))
Expand All @@ -379,7 +296,7 @@
start-time (System/nanoTime)]
(.put ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
^WriteListener (reify-write-listener op-future)
(AsyncWriteListener. op-future)
^WritePolicy policy
(create-key index (:dbns db) set-name)
^"[Lcom.aerospike.client.Bin;" bins)
Expand Down Expand Up @@ -480,7 +397,7 @@
start-time (System/nanoTime)]
(.touch ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
^WriteListener (reify-write-listener op-future)
(AsyncWriteListener. op-future)
^WritePolicy (policy/write-policy client expiration RecordExistsAction/UPDATE_ONLY)
(create-key index (:dbns db) set-name))
(register-events op-future db "touch" index start-time)))
Expand All @@ -498,7 +415,7 @@
start-time (System/nanoTime)]
(.delete ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
^DeleteListener (reify-delete-listener op-future)
(AsyncDeleteListener. op-future)
^WritePolicy (:policy conf)
(create-key index (:dbns db) set-name))
(register-events op-future db "delete" index start-time))))
Expand All @@ -509,7 +426,7 @@
start-time (System/nanoTime)]
(.put ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
^WriteListener (reify-write-listener op-future)
(AsyncWriteListener. op-future)
^WritePolicy policy
(create-key index (:dbns db) set-name)
^"[Lcom.aerospike.client.Bin;" (utils/v->array Bin (mapv set-bin-as-null bin-names)))
Expand Down Expand Up @@ -543,11 +460,11 @@
start-time (System/nanoTime)]
(.operate ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
^RecordListener (reify-record-listener op-future)
(AsyncRecordListener. op-future)
^WritePolicy (:policy conf (policy/write-policy client expiration RecordExistsAction/UPDATE))
(create-key index (:dbns db) set-name)
(utils/v->array Operation operations))
(register-events (p/then op-future record->map) db "operate" index start-time)))))
(register-events (p/then op-future record/record->map) db "operate" index start-time)))))

(defn scan-set
"Scans through the given set and calls a user defined callback for each record that was found.
Expand All @@ -569,7 +486,7 @@
bin-names (:bins conf)]
(.scanAll ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-record-sequence-listener op-future (:callback conf))
(AsyncRecordSequenceListener. op-future (:callback conf))
^Policy (:policy conf (ScanPolicy.))
aero-namespace
set-name
Expand All @@ -589,7 +506,7 @@
start-time (System/nanoTime)]
(.info ^AerospikeClient client
^EventLoop (.next ^EventLoops (:el db))
(reify-info-listener op-future)
(AsyncInfoListener. op-future)
^InfoPolicy (:policy conf (.infoPolicyDefault ^AerospikeClient client))
node
(into-array String info-commands))
Expand Down
69 changes: 69 additions & 0 deletions src/main/clojure/aerospike_clj/listeners.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
(ns aerospike-clj.listeners
(:require [promesa.core :as p]
[aerospike-clj.aerospike-record :as record])
(:import [com.aerospike.client Key Record AerospikeException AerospikeException$QueryTerminated]
[com.aerospike.client.listener RecordListener WriteListener DeleteListener
ExistsListener BatchListListener RecordSequenceListener InfoListener ExistsArrayListener]
[java.util List Map]))

(deftype AsyncExistsListener [op-future]
ExistsListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^Key _k ^boolean exists]
(p/resolve! op-future exists)))

(deftype AsyncDeleteListener [op-future]
DeleteListener
(^void onSuccess [_this ^Key _k ^boolean existed]
(p/resolve! op-future existed))
(^void onFailure [_ ^AerospikeException ex]
(p/reject! op-future ex)))

(deftype AsyncWriteListener [op-future]
WriteListener
(^void onSuccess [_this ^Key _]
(p/resolve! op-future true))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex)))

(deftype AsyncInfoListener [op-future]
InfoListener
(^void onSuccess [_this ^Map result-map]
(p/resolve! op-future (into {} result-map)))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex)))

(deftype AsyncRecordListener [op-future]
RecordListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^Key _k ^Record record]
(p/resolve! op-future record)))

(deftype AsyncRecordSequenceListener [op-future callback]
RecordSequenceListener
(^void onRecord [_this ^Key k ^Record record]
(when (= :abort-scan (callback (.userKey k) (record/record->map record)))
(throw (AerospikeException$QueryTerminated.))))
(^void onSuccess [_this]
(p/resolve! op-future true))
(^void onFailure [_this ^AerospikeException exception]
(if (instance? AerospikeException$QueryTerminated exception)
(p/resolve! op-future false)
(p/reject! op-future exception))))

(deftype AsyncBatchListListener [op-future]
BatchListListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^List records]
(p/resolve! op-future records)))

(deftype AsyncExistsArrayListener [op-future]
ExistsArrayListener
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex))
(^void onSuccess [_this ^"[Lcom.aerospike.client.Key;" _keys ^"[Z" exists]
(p/resolve! op-future exists)))

0 comments on commit a399128

Please sign in to comment.