
Commit b3d3b4e

Offset management and multiprocessing embryo (#173)

* Update doc
* Add method to return Kafka offset. Doc missing.
* WIP multiprocessing
* Update method documentation
* PEP8
* Merge multiprocessing
* Update requirements
* Reset offset properly
* Manage negative offsets
* Better handling of the verbosity
* Update README
* PEP8
* PEP8
* Bump version to 5.0
* Update README
1 parent 3222036 commit b3d3b4e

File tree: 7 files changed, +362 -124 lines changed

README.md (+16 -13)

````diff
@@ -13,26 +13,28 @@
 
 `fink_client` requires a version of Python 3.9+.
 
-### install with pip
+### Install with pip
 
 ```bash
 pip install fink-client --upgrade
 ```
 
-### install within a conda environment
+### Use or develop in a controlled environment
 
 ```bash
 git clone https://github.com/astrolabsoftware/fink-client.git
 cd fink-client
 conda env create -f environment.yml
-# install the latest release
-pip install fink-client
+conda activate fink-client
+```
+
+and then install it locally:
+
+```bash
 # install for development
 pip install -e .
 ```
 
-Learn how to connect and use it by checking the [documentation](docs/).
-
 ## Registration
 
 In order to connect and poll alerts from Fink, you need to get your credentials:
@@ -83,16 +85,14 @@ optional arguments:
   -filename FILENAME    Path to an alert data file (avro format)
 ```
 
-More information at [docs/livestream](docs/livestream_manual.md).
+More information at [docs/livestream](https://fink-broker.readthedocs.io/en/latest/services/livestream).
 
 ## Data Transfer usage
 
 If you requested data using the [Data Transfer service](https://fink-portal.org/download), you can easily poll your stream using:
 
 ```bash
-fink_datatransfer -h
-usage: fink_datatransfer [-h] [-topic TOPIC] [-limit LIMIT] [-outdir OUTDIR] [-partitionby PARTITIONBY] [-batchsize BATCHSIZE] [--restart_from_beginning]
-                         [--verbose]
+usage: fink_datatransfer [-h] [-topic TOPIC] [-limit LIMIT] [-outdir OUTDIR] [-partitionby PARTITIONBY] [-batchsize BATCHSIZE] [-nconsumers NCONSUMERS] [-maxtimeout MAXTIMEOUT] [--restart_from_beginning] [--verbose]
 
 Kafka consumer to listen and archive Fink streams from the data transfer service
 
@@ -102,13 +102,16 @@ optional arguments:
   -limit LIMIT          If specified, download only `limit` alerts from the stream. Default is None, that is download all alerts.
   -outdir OUTDIR        Folder to store incoming alerts. It will be created if it does not exist.
   -partitionby PARTITIONBY
-                        Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). Default is
-                        time.
+                        Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). `classId` is also available for ELASTiCC data. Default is time.
   -batchsize BATCHSIZE  Maximum number of alert within the `maxtimeout` (see conf). Default is 1000 alerts.
+  -nconsumers NCONSUMERS
+                        Number of parallel consumers to use. Default is 1.
+  -maxtimeout MAXTIMEOUT
+                        Overwrite the default timeout (in seconds) from user configuration. Default is None.
   --restart_from_beginning
                         If specified, restart downloading from the 1st alert in the stream. Default is False.
   --verbose             If specified, print on screen information about the consuming.
 
 ```
 
-More information at [docs/datatransfer](docs/datatransfer.md).
+More information at [docs/datatransfer](https://fink-broker.readthedocs.io/en/latest/services/data_transfer/).
````
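The new `-nconsumers` and `-maxtimeout` options compose with the existing flags. A minimal sketch of a parallel download; the topic name and output folder below are placeholders, not values taken from this commit:

```bash
# Sketch only: replace the topic name with the one you received
# from the Data Transfer service, and pick your own output folder.
fink_datatransfer \
    -topic ftransfer_ztf_2023-01-25_000000 \
    -outdir ztf_alerts \
    -nconsumers 4 \
    -maxtimeout 30 \
    --verbose
```

In a Kafka consumer group, parallelism is bounded by the number of topic partitions, so values of `-nconsumers` above the partition count would not add throughput.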

docs/datatransfer.md (+8 -2)

````diff
@@ -2,7 +2,7 @@
 
 _date 26/01/2023_
 
-This manual has been tested for `fink-client` version 4.1. In case of trouble, send us an email ([email protected]) or open an issue (https://github.com/astrolabsoftware/fink-client).
+This manual has been tested for `fink-client` version 4.4. In case of trouble, send us an email ([email protected]) or open an issue (https://github.com/astrolabsoftware/fink-client).
 
 ## Installation of fink-client
 
@@ -90,6 +90,10 @@ fink_datatransfer \
     --restart_from_beginning
 ```
 
+## Re-using the same queue
+
+In case you want to poll again alerts from a queue that has already been consumed, you can restart the download from the first alert using the `--restart_from_beginning` argument.
+
 ## Reading alerts
 
 Alerts are saved in the Apache Parquet format. Assuming you are using Python, you can easily read them using Pandas:
@@ -107,5 +111,7 @@ In case of trouble, send us an email ([email protected]) or open an issue
 
 ### Known bugs
 
-1. Data from 2019/2020/2021 and 2022/2023 are not compatible (different schemas). We will resolve the problem soon, but in the meantime, do not mix data from the two periods in a single query.
+1. Data from 2019/2020/2021 and 2022/2023 are not compatible (different schemas). We will resolve the problem soon, but in the meantime, do not mix data from the two periods in a single query.
 2. With version 4.0, you wouldn't have the partitioning column when reading in a dataframe. This has been corrected in 4.1.
+3. If you have recurrent timeouts, try to increase the timeout in your configuration file `~/.finkclient/credentials.yml`.
+4.
````
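The "Reading alerts" section referenced in the context lines above relies on Pandas' Parquet support. A minimal sketch, assuming alerts were downloaded into a placeholder folder `ztf_alerts` and that `pandas` plus a Parquet engine such as `pyarrow` are installed:

```python
import pandas as pd

# `ztf_alerts` is a placeholder for the -outdir folder used when polling.
# The Parquet engine also recovers the partitioning columns
# (e.g. year/month/day for `time`, or finkclass).
df = pd.read_parquet("ztf_alerts")

print(df.shape)
print(df.columns.tolist())
```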

environment.yml (+1)

````diff
@@ -15,4 +15,5 @@ dependencies:
   - pyyaml
   - tabulate
   - matplotlib
+  - tqdm
 
````

fink_client/__init__.py (+1 -1)

````diff
@@ -12,5 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-__version__ = "4.4"
+__version__ = "5.0"
 __schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
````

fink_client/consumer.py (+82)

````diff
@@ -15,6 +15,7 @@
 # limitations under the License.
 import io
 import json
+import time
 import fastavro
 import confluent_kafka
 
@@ -267,6 +268,87 @@ def close(self):
         self._consumer.close()
 
 
+def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
+    """Poll servers to get the total committed offsets, and remaining lag
+
+    Parameters
+    ----------
+    consumer: confluent_kafka.Consumer
+        Kafka consumer
+    topic: str
+        Topic name
+    waitfor: int, optional
+        Time in seconds to wait before polling. Default is 1 second.
+    timeout: int, optional
+        Timeout in seconds when polling the servers. Default is 10.
+    verbose: bool, optional
+        If True, prints a useful table. Default is False.
+
+    Returns
+    -------
+    total_offsets: int
+        Total number of messages committed across all partitions
+    total_lag: int
+        Remaining messages in the topic across all partitions.
+    """
+    time.sleep(waitfor)
+    # Get the topic's partitions
+    metadata = consumer.list_topics(topic, timeout=timeout)
+    if metadata.topics[topic].error is not None:
+        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
+
+    # Construct TopicPartition list of partitions to query
+    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
+
+    # Query committed offsets for this group and the given partitions
+    try:
+        committed = consumer.committed(partitions, timeout=timeout)
+    except confluent_kafka.KafkaException as exception:
+        kafka_error = exception.args[0]
+        if kafka_error.code() == confluent_kafka.KafkaError._TIMED_OUT:
+            return -1, -1
+        else:
+            return 0, 0
+
+    total_offsets = 0
+    total_lag = 0
+    if verbose:
+        print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag"))
+        print("=" * 72)
+    for partition in committed:
+        # Get the partition's low and high watermark offsets.
+        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=timeout, cached=False)
+
+        if partition.offset == confluent_kafka.OFFSET_INVALID:
+            offset = "-"
+        else:
+            offset = "%d" % (partition.offset)
+
+        if hi < 0:
+            lag = "no hwmark"  # Unlikely
+        elif partition.offset < 0:
+            # No committed offset, show total message count as lag.
+            # The actual message count may be lower due to compaction
+            # and record deletions.
+            lag = "%d" % (hi - lo)
+            partition.offset = 0
+        else:
+            lag = "%d" % (hi - partition.offset)
+
+        # Accumulate totals across partitions; skip a non-numeric lag
+        # (the "no hwmark" case above).
+        total_offsets += partition.offset
+        if lag.isdigit():
+            total_lag += int(lag)
+
+        if verbose:
+            print("%-50s %9s %9s" % (
+                "{} [{}]".format(partition.topic, partition.partition), offset, lag))
+    if verbose:
+        print("-" * 72)
+        print("%-50s %9s %9s" % (
+            "Total", total_offsets, total_lag))
+        print("-" * 72)
+
+    return total_offsets, total_lag
+
 def _get_kafka_config(config: dict) -> dict:
     """Returns configurations for a consumer instance
 
````
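The diff does not show a call site for `return_offsets`. A minimal usage sketch, assuming a reachable Kafka cluster; the server address, group id, and topic name below are placeholders, not values from this commit:

```python
import confluent_kafka

from fink_client.consumer import return_offsets

# Placeholder connection settings -- replace with the servers and
# group id from your ~/.finkclient/credentials.yml.
conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my_group",
}
consumer = confluent_kafka.Consumer(conf)

# Sum of committed offsets and remaining lag over all partitions
# of a (placeholder) topic.
total_offsets, total_lag = return_offsets(
    consumer, "my_topic", waitfor=1, timeout=10, verbose=True
)
print(total_offsets, total_lag)

consumer.close()
```

Per the function's contract above, `(-1, -1)` signals a polling timeout, and `(0, 0)` any other error while querying committed offsets.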
