
Commit 1945b20

release: 2.6.0-1.5

2 parents: de61ac1 + 091f3fa

File tree

10 files changed: +130 -126 lines

.github/image.png (binary, 355 KB)

.github/workflows/clojure.yml (+40)

@@ -0,0 +1,40 @@
+name: Clojure CI
+
+on:
+  push:
+    branches: [ "master", "devel" ]
+  pull_request:
+    branches: [ "master", "devel" ]
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Install dependencies
+        run: lein -U deps
+
+      - name: Setup Test environnement
+        run: docker-compose up -d
+
+      - name: Run tests
+        run: lein test
+
+      - name: Deploy to Clojars
+        if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/devel'
+        run: lein deploy
+        env:
+          CLOJARS_USERNAME: ${{ secrets.CLOJARS_USERNAME }}
+          CLOJARS_PASSWORD: ${{ secrets.CLOJARS_PASSWORD }}
+
+      - name: Generate documentation
+        if: github.ref == 'refs/heads/master'
+        run: lein marg -d public -f index.html
+
+      - name: Deploy documentation to Github Pages
+        uses: JamesIves/[email protected]
+        if: github.ref == 'refs/heads/master'
+        with:
+          branch: gh-pages
+          folder: public

.gitignore (+1)

@@ -10,3 +10,4 @@ pom.xml.asc
 .hgignore
 .hg/
 public
+docs

.gitlab-ci.yml (-27)

This file was deleted.

Jenkinsfile (-48)

This file was deleted.

README.md (+4 -11)

@@ -1,23 +1,16 @@
 <p align="center">
-  <img src="https://assets.letemps.ch/sites/default/files/styles/article_detail_desktop/public/media/2021/11/08/file79vuqsh1bkg1fcq2s6i0.jpg?h=acd92167&itok=7inwu_KU"/>
+  <img src="https://raw.githubusercontent.com/oscaro/felice/devel/.github/image.png"/>
 </p>
 
-# Felice
+# Felice [![Clojars Project](https://img.shields.io/clojars/v/com.oscaro/felice.svg)](https://clojars.org/com.oscaro/felice) [![Clojure CI](https://github.com/oscaro/felice/actions/workflows/clojure.yml/badge.svg?branch=devel)](https://github.com/oscaro/felice/actions/workflows/clojure.yml)
+
 
 Felice is client library for [Apache Kafka](http://kafka.apache.org) in Clojure.
 
 Built with simplicity it mind, it support by default JSON, Transit & Nippy (Fast | LZ4) and
 provide also custom Serializer / Deserializer mechanism
 
-See [API docs](https://it-dev.pages.oscaroad.com/felice/)
-
-## Installation
-
-Add the latest release to your **project.clj**
-
-```
-[com.oscaro/felice "2.6.0-1.3"]
-```
+> Note: the version contains in a first time the [Kafka Client](https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients) then the felice version, separated by hyphen.
 
 ## De/Serializers
 

dev/user.clj (+5)

@@ -0,0 +1,5 @@
+(ns user
+  (:require [felice.consumer :as fc]
+            [felice.producer :as fp]))
+
+;;; Playground

project.clj (+14 -6)

@@ -1,18 +1,26 @@
-(defproject com.oscaro/felice "2.6.0-1.4"
+(defproject com.oscaro/felice "2.6.0-1.5"
   :description "Felice is client library for Apache Kafka in Clojure"
   :url "https://gitlab.oscaroad.com/it-dev/felice"
   :repositories [["oscaro-releases" {:url "https://artifactory.oscaroad.com/artifactory/libs-release-local"}]
                  ["oscaro-snapshots" {:url "https://artifactory.oscaroad.com/artifactory/libs-snapshot-local"}]
                  ["oscaro-remote" {:url "https://artifactory.oscaroad.com/artifactory/remote-repos"}]]
   :license {:name "Eclipse Public License"
             :url "http://www.eclipse.org/legal/epl-v10.html"}
-  :plugins [[lein-codox "0.10.6"]]
-  :codox {:output-path "public"
-          :source-uri "https://gitlab.oscaroad.com/it-dev/felice/blob/{git-commit}/{filepath}#L{line}"}
+  :plugins [[lein-marginalia "0.9.1"]]
+  :deploy-repositories [["snapshots" {:url "https://repo.clojars.org"
+                                      :username :env/clojars_username
+                                      :password :env/clojars_password
+                                      :sign-releases false}]
+                        ["releases" {:url "https://repo.clojars.org"
+                                     :username :env/clojars_username
+                                     :password :env/clojars_password
+                                     :sign-releases false}]]
   :dependencies [[org.clojure/clojure "1.10.1"]
-                 [org.apache.kafka/kafka-clients "2.6.0"]
+                 [org.apache.kafka/kafka-clients "3.2.0"]
                  [com.cognitect/transit-clj "0.8.319" :exclusions [com.fasterxml.jackson.core/jackson-core]]
                  [metosin/jsonista "0.2.5"]
                  [com.taoensso/nippy "3.1.1"]]
   :profiles {:dev {:dependencies [[org.slf4j/slf4j-jdk14 "1.7.29" :exclusions [org.slf4j/slf4j-api]]
-                                  [org.clojure/core.async "0.5.527"]]}})
+                                  [org.clojure/core.async "0.5.527"]]
+                   :source-paths ["dev"]}}
+  :repl-options {:init-ns user})

src/felice/consumer.clj (+39 -14)

@@ -36,10 +36,12 @@
          [k v*])))
        (into {})))
 
-;; ## COMMIT FUNCTIONS
+
+;;; Commit functions
 
 (defn commit-sync
-  "Commit offsets returned on the last `poll()` for all the subscribed list of topics and partitions.
+  "Commit offsets returned on the last `poll()` for all
+   the subscribed list of topics and partitions.
 
   consumer must be a KafkaConsumer object"
   [^KafkaConsumer consumer]
@@ -104,18 +106,31 @@
   (.position consumer (TopicPartition. topic partition)))
 
 (defn poll
-  "Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This method returns immediately if there are records available. Otherwise, it will await the timeout ms. If the timeout expires, an empty record set will be returned."
+  "Fetch data for the topics or partitions specified using
+   one of the subscribe/assign APIs.
+
+   This method returns immediately if there are records available.
+   Otherwise, it will await the timeout ms.
+
+   If the timeout expires, an empty record set will be returned."
   [^KafkaConsumer consumer timeout]
   (.poll consumer (Duration/ofMillis timeout)))
 
 (defn wakeup
-  "Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
-  The thread which is blocking in an operation will throw WakeupException. If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead."
+  "Wakeup the consumer. This method is thread-safe and is
+   useful in particular to abort a long poll.
+
+   The thread which is blocking in an operation will throw WakeupException.
+
+   If no thread is blocking in a method which can throw WakeupException,
+   the next call to such a method will raise it instead."
   [^KafkaConsumer consumer]
   (.wakeup consumer))
 
 (defn consumer-record->map
-  "transforms a ConsumerRecord to a clojure map containing :key :value :offset :topic :partition :timestamp :timestamp-type and :header "
+  "transforms a ConsumerRecord to a clojure map containing:
+   `:key` `:value` `:offset` `:topic` `:partition` `:timestamp`
+   `:timestamp-type` and `:header`"
   [^ConsumerRecord record]
   {:key (.key record)
    :offset (.offset record)
@@ -138,7 +153,8 @@
   [^ConsumerRecords records]
   (let [topics (map (comp :topic topic-partition->map) (.partitions records))]
     (->> topics
-         (map (fn [topic] [topic (map consumer-record->map (.records records ^String topic))]))
+         (map (fn [topic] [topic (map consumer-record->map
+                                      (.records records ^String topic))]))
          (into {}))))
 
 (defn ^:no-doc poll->records-by-partition
@@ -159,9 +175,10 @@
 (defn consumer
   "create a consumer
 
-  conf is a map {:keyword value} (for all possibilities see https://kafka.apache.org/documentation/#consumerconfigs)
+   conf is a map {:keyword value}
+   See: https://kafka.apache.org/documentation/#consumerconfigs for all possibilities
 
-  key and value deserializer can be one of :long :string :t+json :t+mpack
+   key and value serializer can be one of keys defined in `felice.serializer` namespace
   with the 1 argument arity, :key.deserializer and :value.deserializer must be provided in conf
 
   you can optionaly provide a list of topics to subscribe to"
@@ -185,10 +202,17 @@
   (consumer (assoc conf :topics topics) key-deserializer value-deserializer)))
 
 (defn close!
-  "Tries to close the consumer cleanly within the specified timeout in ms (defaults to 30 secs).
-  This method waits up to timeout for the consumer to complete pending commits and leave the group.
-  If auto-commit is enabled, this will commit the current offsets if possible within the timeout.
-  If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed."
+  "Tries to close the consumer cleanly within the specified timeout in ms
+   (defaults to 30 secs).
+
+   This method waits up to timeout for the consumer to complete
+   pending commits and leave the group.
+
+   If auto-commit is enabled, this will commit the current
+   offsets if possible within the timeout.
+
+   If the consumer is unable to complete offset commits and gracefully
+   leave the group before the timeout expires, the consumer is force closed."
  ([^KafkaConsumer consumer] (.close consumer))
  ([^KafkaConsumer consumer timeout] (.close consumer (Duration/ofMillis timeout))))
@@ -233,7 +257,8 @@
   * :poll : commit last read offset after processing all the items of a poll
   * :record : commit the offset of every processed record
 
-  if you want to commit messages yourself, set commit policy to `:never` and use `commit-message-offset` or `commit-sync`
+  if you want to commit messages yourself, set commit policy to `:never`
+  and use `commit-message-offset` or `commit-sync`
 
   ### Returns
   stop-fn: callback function to stop the loop"
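For orientation, a minimal usage sketch of the consumer functions touched by this diff (`consumer`, `poll`, `consumer-record->map`, `commit-sync`, `close!`). This is not part of the commit: the broker address, group id, and topic name are assumptions, and `:string` is assumed to be one of the deserializer keys defined in the `felice.serializer` namespace.

```clojure
;; Hypothetical REPL session; requires a running Kafka broker.
(require '[felice.consumer :as fc])

(def c (fc/consumer {:bootstrap.servers "localhost:9092"
                     :group.id          "demo-group"}
                    :string :string      ; key and value deserializers
                    ["demo-topic"]))     ; topics to subscribe to

;; Wait up to 1000 ms for records, print them, then commit the offsets
;; returned by the last poll.
(let [records (fc/poll c 1000)]
  (doseq [rec (map fc/consumer-record->map (iterator-seq (.iterator records)))]
    (println (:topic rec) (:offset rec) (:value rec)))
  (fc/commit-sync c))

;; Close cleanly, waiting at most 5 seconds.
(fc/close! c 5000)
```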

src/felice/producer.clj (+27 -20)

@@ -5,7 +5,9 @@
    [org.apache.kafka.clients.producer KafkaProducer ProducerRecord Callback]))
 
 (defn flush!
-  "Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records."
+  "Invoking this method makes all buffered records immediately available
+   to send (even if linger.ms is greater than 0) and blocks on the
+   completion of the requests associated with these records."
   [^KafkaProducer producer]
   (.flush producer))
 
@@ -20,12 +22,13 @@
   (.partitionsFor producer topic))
 
 (defn ^:no-doc producer-record
-  "transforms a clojure map to a ProducerRecord object"
+  "Transforms a clojure map to a ProducerRecord object"
   [{:keys [topic key value partition timestamp headers]}]
   (ProducerRecord. topic partition timestamp key value headers))
 
 (defn ^:no-doc send-record!
-  "sends a ProducerRecord with an optional callback when the send has been acknowledged."
+  "Sends a ProducerRecord with an optional callback when the
+   send has been acknowledged."
  ([^KafkaProducer producer ^ProducerRecord record]
   (.send producer (producer-record record)))
  ([^KafkaProducer producer ^ProducerRecord record callback]
@@ -47,25 +50,33 @@
  ([^KafkaProducer producer record-map] (send-record! producer record-map)))
 
 (defn send-with-callback!
-  "asynchronously send a record triggering the given callback when the send has been acknowledged
-  Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing."
+  "Asynchronously send a record triggering the given callback when the send has
+   been acknowledged.
+   Note that callbacks will generally execute in the I/O thread of the producer
+   and so should be reasonably fast or they will delay the sending of messages
+   from other threads.
+
+   If you want to execute blocking or computationally expensive callbacks it
+   is recommended to use your own Executor in the callback body to parallelize
+   processing."
  ([^KafkaProducer producer topic value cb] (send-record! producer (->record topic value) cb))
  ([^KafkaProducer producer topic key value cb] (send-record! producer (->record topic key value) cb))
  ([^KafkaProducer producer record-map cb] (send-record! producer record-map cb)))
 
 (defn send!!
-  "synchronously send a record - wait until acknowledged"
+  "Synchronously send a record - wait until acknowledged"
  ([^KafkaProducer producer topic value] (send!! producer (->record topic value)))
  ([^KafkaProducer producer topic key value] (send!! producer (->record topic key value)))
  ([^KafkaProducer producer record-map] (.get (send! producer record-map))))
 
 (defn producer
-  "create a producer
+  "Create a producer
 
-  conf is a map {:keyword value} (for all possibilities see https://kafka.apache.org/documentation/#producerconfigs)
+   `conf` is a map {:keyword value}
+   See https://kafka.apache.org/documentation/#producerconfigs for all possibilities
 
-  key and value serializer can be one of :long :string :t+json :t+mpack
-  with the 1 argument arity, :key.serializer and :value.serializer must be provided in conf"
+   key and value serializer can be one of keys defined in `felice.serializer` namespace
+   with the 1 argument arity, :key.serializer and :value.serializer must be provided in conf"
  ([conf]
   (KafkaProducer. (walk/stringify-keys (dissoc conf :key.serializer :value.serializer))
                   (serializer (:key.serializer conf))
@@ -75,16 +86,12 @@ with the 1 argument arity, :key.serializer and :value.serializer must be provide
    :value.serializer value-serializer))))
 
 (defn close!
-  "This method waits up to timeout ms for the producer to complete the sending of all incomplete requests.
-  If the producer is unable to complete all requests before the timeout expires, this method will fail any unsent and unacknowledged records immediately.
+  "This method waits up to timeout ms for the producer to complete the
+   sending of all incomplete requests.
+
+   If the producer is unable to complete all requests before the timeout
+   expires, this method will fail any unsent and unacknowledged records immediately.
+
   Calling close with no timeout is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)"
  ([^KafkaProducer producer] (.close producer))
  ([^KafkaProducer producer timeout] (.close producer timeout TimeUnit/MILLISECONDS)))
-
-(comment
-  ;; default partitionner
-  (import 'org.apache.kafka.common.utils.Utils)
-  (defn partition-from-bytes [partition-count bytes]
-    (mod (Utils/toPositive (Utils/murmur2 bytes)) partition-count))
-  (defn partition-from-string [partition-count string]
-    (partition-from-bytes partition-count (.getBytes string))))
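And a matching sketch of the producer side shown above (`producer`, `send!`, `send!!`, `flush!`, `close!`). Again, not part of the commit: the broker address and topic name are assumptions, and `:string` is assumed to be a serializer key from the `felice.serializer` namespace.

```clojure
;; Hypothetical REPL session; requires a running Kafka broker.
(require '[felice.producer :as fp])

(def p (fp/producer {:bootstrap.servers "localhost:9092"}
                    :string :string))   ; key and value serializers

;; Asynchronous send (returns a future) vs. blocking until acknowledged.
(fp/send!  p "demo-topic" "some-key" "hello")
(fp/send!! p "demo-topic" "some-key" "world")

;; Push any buffered records out, then close, waiting at most 5000 ms.
(fp/flush! p)
(fp/close! p 5000)
```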
