|
46 | 46 | {}
|
47 | 47 | (.toArray headers))))
|
48 | 48 |
|
49 |
| -(defn- parse-type-fn-with-handlers [^String type-name type-handlers] |
| 49 | +(defn- parse-type-fn-with-handlers [^String type-name type-handlers v->clj?] |
50 | 50 | (if-let [handler (get type-handlers type-name)]
|
51 |
| - (partial th/parse-bytes handler) |
| 51 | + (if v->clj? |
| 52 | + (fn [topic v] |
| 53 | + (->> (th/parse-bytes handler topic v) |
| 54 | + (th/->clj handler))) |
| 55 | + (partial th/parse-bytes handler)) |
52 | 56 | (throw (ex-info (format "no type handler registered for %s" type-name) {::type-handler-keys (keys type-handlers)}))))
|
53 | 57 |
|
54 | 58 | (defn- make-default-record-handler-fn [^String value-type type-handlers]
|
|
60 | 64 |
|
61 | 65 | (defn- make-record-handler-fn [{:keys [::key-type ::key-parse-fn ::value-type ::value-parse-fn ::map-record-fn
|
62 | 66 | ::include-headers? ::include-topic? ::include-partition? ::include-offset?
|
63 |
| - ::include-timestamp?] :as record-handling} type-handlers] |
| 67 | + ::include-timestamp? ::key->clj? ::val->clj?] :as record-handling} type-handlers] |
64 | 68 | (let [k-parse-fn (cond (fn? key-parse-fn)
|
65 | 69 | key-parse-fn
|
66 | 70 |
|
67 | 71 | (some? key-type)
|
68 |
| - (parse-type-fn-with-handlers key-type type-handlers) |
| 72 | + (parse-type-fn-with-handlers key-type type-handlers key->clj?) |
69 | 73 |
|
70 | 74 | ;; by default, don't try to parse the key
|
71 | 75 | true
|
|
74 | 78 | value-parse-fn
|
75 | 79 |
|
76 | 80 | (some? value-type)
|
77 |
| - (parse-type-fn-with-handlers value-type type-handlers) |
| 81 | + (parse-type-fn-with-handlers value-type type-handlers val->clj?) |
78 | 82 |
|
79 | 83 | ;; by default, don't try to parse the value
|
80 | 84 | true
|
|
247 | 251 | (print beg))))
|
248 | 252 | (idle-poll! consumer)))
|
249 | 253 |
|
| 254 | +(defn- remove-consumer-assignment! |
| 255 | + "Removes the given `assignment` (a TopicPartition) from the given `consumer`. If the consumer did not already |
| 256 | + have it assigned, this is a no-op." |
| 257 | + [active-assignments-atom ^KafkaConsumer consumer assignments] |
| 258 | + (doseq [topic (keys assignments)] |
| 259 | + (let [[part] (get assignments topic) |
| 260 | + tp (TopicPartition. topic part) |
| 261 | + cur-assignments (set (.assignment consumer))] |
| 262 | + (swap! active-assignments-atom disj tp) |
| 263 | + (when (contains? cur-assignments tp) |
| 264 | + (let [new-assignments (disj cur-assignments tp)] |
| 265 | + (.assign consumer new-assignments))))) |
| 266 | + (idle-poll! consumer) |
| 267 | + ::ok) |
| 268 | + |
250 | 269 | (defn- assign-and-seek! [^KafkaConsumer consumer assignments active-assignments-atom]
|
251 | 270 | (doseq [topic (keys assignments)]
|
252 |
| - (let [[part offset?] (get assignments topic) |
| 271 | + (let [[part ^long offset?] (get assignments topic) |
253 | 272 | tp (TopicPartition. topic part)]
|
254 | 273 | (add-consumer-assignment! active-assignments-atom consumer tp true)
|
255 | 274 | (if (some? offset?)
|
|
260 | 279 | ::new-assignment]}]
|
261 | 280 | (assign-and-seek! consumer new-assignment active-assignments-atom))
|
262 | 281 |
|
| 282 | +(defmethod handle-repl->client-message ::unassign [^KafkaConsumer consumer {:keys [::active-assignments-atom |
| 283 | + ::remove-assignments]}] |
| 284 | + (remove-consumer-assignment! active-assignments-atom consumer remove-assignments)) |
| 285 | + |
263 | 286 | (defmethod handle-repl->client-message ::seek [^KafkaConsumer consumer
|
264 | 287 | {:keys [::active-assignments-atom
|
265 | 288 | ::to
|
|
428 | 451 | ::active-assignments-atom active-assignments-atom
|
429 | 452 | ::new-assignment {topic [partition offset?]}}))
|
430 | 453 |
|
| 454 | +(defn- unassign! |
| 455 | + "" |
| 456 | + [to-consumer-chan from-consumer-chan active-assignments-atom topic partition & [offset?]] |
| 457 | + (consumer-roundtrip to-consumer-chan from-consumer-chan {::message-type ::unassign |
| 458 | + ::active-assignments-atom active-assignments-atom |
| 459 | + ::remove-assignments {topic [partition]}})) |
| 460 | + |
431 | 461 |
|
432 | 462 | (s/def ::consumer-record-map-fn fn?)
|
433 | 463 | (s/def ::poll-xf-reducing-fn fn?)
|
|
491 | 521 | (s/def ::value-type string?)
|
492 | 522 | (s/def ::key-parse-fn fn?)
|
493 | 523 | (s/def ::value-parse-fn fn?)
|
| 524 | + |
| 525 | +(s/def ::key->clj? boolean?) |
| 526 | +(s/def ::val->clj? boolean?) |
494 | 527 | (s/def ::map-record-fn fn?)
|
| 528 | + |
| 529 | +(s/def ::include-headers? boolean?) |
| 530 | +(s/def ::include-topic? boolean?) |
| 531 | +(s/def ::include-partition? boolean?) |
| 532 | +(s/def ::include-offset? boolean?) |
| 533 | +(s/def ::include-timestamp? boolean?) |
495 | 534 | (s/def ::reducing-fn fn?)
|
496 | 535 |
|
497 | 536 | (s/def ::record-handling-opts (s/keys :opt [::key-type ::value-type ::key-parse-fn ::value-parse-fn ::map-record-fn
|
498 |
| - ::reducing-fn])) |
| 537 | + ::reducing-fn ::key->clj? ::val->clj?])) |
499 | 538 |
|
500 | 539 | (defn record-handling-opts->poll-xf-args
|
501 | 540 | "For the given record-handling-opts, and type-handlers, produce a poll-xf-args map, which can then be passed into a
|
|
563 | 602 | (read-from [this topic part offset num-msg record-handling-opts])
|
564 | 603 | (last-read [this record-handling-opts])
|
565 | 604 | (assign [this topic part offset])
|
| 605 | + (unassign [this topic part]) |
566 | 606 | (seek [this offset])
|
567 | 607 | (seek+ [this offset+])
|
568 | 608 | (seek- [this offset-])
|
|
594 | 634 | (last-read* last-read-records-atom record-handling-opts type-handlers))
|
595 | 635 | (assign [_ topic part offset]
|
596 | 636 | (assign! to-consumer-chan from-consumer-chan active-assignments-atom topic part offset))
|
| 637 | + (unassign [_ topic part] |
| 638 | + (unassign! to-consumer-chan from-consumer-chan active-assignments-atom topic part)) |
597 | 639 | (seek [_ offset]
|
598 | 640 | (seek! to-consumer-chan from-consumer-chan {::active-assignments-atom active-assignments-atom
|
599 | 641 | ::to offset}))
|
|
804 | 846 | ^long ^{:doc "The offset to start reading from", :default 0} offset
|
805 | 847 | ^long ^{:doc "The number of messages to read", :default 10} num-msg
|
806 | 848 | ;; TODO: figure out how to get by with something like msg-format instead of record-handling-opts in Java
|
807 |
| - ^{:doc "Record handling options (see documentation)"} record-handling-opts) |
| 849 | + ^{:doc "Record handling options (see documentation)", ::required? true} record-handling-opts) |
808 | 850 | (defop ^{:doc "Read from the current active assignments"} poll kcr-client true true
|
809 | 851 | ^long ^{:doc "The number of messages to read", :default 10} num-msg
|
810 | 852 | ^{:doc "Record handling options (see documentation)", ::required? true} record-handling-opts)
|
811 | 853 | (defop ^{:doc "Add a topic/partition to the active assignments"} assign kcr-client true true
|
812 | 854 | ^{:doc "The topic name to read from", ::required? true} topic
|
813 | 855 | ^long ^{:doc "The topic partition to read from", ::required? true} part
|
814 | 856 | ^long ^{:doc "The offset to start reading from", ::required? true} offset)
|
| 857 | + (defop ^{:doc "Remove a topic/partition from the active assignments"} unassign kcr-client true true |
| 858 | + ^{:doc "The topic name to read from", ::required? true} topic |
| 859 | + ^long ^{:doc "The topic partition to read from", ::required? true} part) |
815 | 860 | (defop ^{:doc "Change the offset to poll from next for the given topic/partition"} seek kcr-client true true
|
816 | 861 | ^long ^{:doc "The offset to seek to", ::required? true} offset)
|
817 | 862 | (defop ^{:doc "Increment the offset for the current assignment by the given amount"} seek+ kcr-client true true
|
|
820 | 865 | ^long ^{:doc "The number of offsets to recede by", ::required? true} by)
|
821 | 866 | (defop ^{:doc "Seek forward in the message stream while some condition is met"} seek-while kcr-client false false
|
822 | 867 | ^{:doc "Conditional (while) function; operates on the mapped record"} while-fn
|
823 |
| - ^{:doc "Record handling options (see documentation)"} record-handling-opts) |
| 868 | + ^{:doc "Record handling options (see documentation)", ::required? true} record-handling-opts) |
824 | 869 | (defop ^{:doc "Seek forward in the message stream while some condition is met"} seek-until kcr-client false false
|
825 | 870 | ^{:doc "Conditional (until) function; operates on the mapped record"} while-fn
|
826 |
| - ^{:doc "Record handling options (see documentation)"} record-handling-opts) |
| 871 | + ^{:doc "Record handling options (see documentation)", ::required? true} record-handling-opts) |
827 | 872 | ;; will revisit this once we have a better Java args parser
|
828 | 873 | (defop ^{:doc "Set a config option for a type handler arr", ::print-offsets? false} set-type-handler-config! kcr-client true true
|
829 | 874 | ^{:doc "The type on which to set the config option", ::required? true} type-name
|
830 | 875 | ^{:doc "The config option's key", ::required? true} k
|
831 | 876 | ^{:doc "The config option's arguments", ::required? true, ::varargs? true} args)
|
832 |
| - (defop ^{:doc "Print the last read results"} last-read kcr-client true true) |
| 877 | + (defop ^{:doc "Print the last read results"} last-read kcr-client true true |
| 878 | + ^{:doc "Record handling options (see documentation)", ::required? true} record-handling-opts) |
833 | 879 | (defop ^{:doc "Stop the client and disconnect the session"} stop kcr-client true false)
|
834 | 880 | (intern (the-ns 'user) 'help print-clj-help)
|
835 | 881 | (when (ifn? run-body)
|
|
0 commit comments