
Commit 03f62f8

Add new functionalities to reset offsets. Factorize code.
1 parent 47ec2d9 commit 03f62f8

4 files changed (+314, -232 lines)

fink_client/avro_utils.py (+6)
@@ -245,6 +245,12 @@ def write_alert(
     path: str
         Folder that will contain the alert. The filename will always be
         <objectID>.avro
+    overwrite: bool, optional
+        If True, overwrite existing alert. Default is False.
+    id1: str, optional
+        First prefix for alert name: {id1}_{id2}.avro
+    id2: str, optional
+        Second prefix for alert name: {id1}_{id2}.avro
 
     Examples
     --------
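
The added keywords control overwriting and file naming. A minimal usage sketch follows; only path, overwrite, id1 and id2 are confirmed by this hunk, and the leading alert/schema arguments are assumptions about the rest of the signature:

from fink_client.avro_utils import write_alert

# `alert` (dict) and `schema` (parsed Avro schema) are assumed arguments
# not shown in this diff; the keyword arguments are the ones added here.
write_alert(
    alert,
    schema,
    path="/tmp/alerts",
    overwrite=True,       # replace an existing file instead of keeping it
    id1="ZTF21abcdef",    # hypothetical prefixes: the file becomes
    id2="20240101",       #   /tmp/alerts/ZTF21abcdef_20240101.avro
)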

fink_client/consumer.py (+236, -9)
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import io
+import sys
 import json
 import time
 import fastavro
@@ -35,7 +36,14 @@ class AlertError(Exception):
 class AlertConsumer:
     """High level Kafka consumer to receive alerts from Fink broker"""
 
-    def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False):
+    def __init__(
+        self,
+        topics: list,
+        config: dict,
+        schema_path=None,
+        dump_schema=False,
+        on_assign=None,
+    ):
         """Creates an instance of `AlertConsumer`
 
         Parameters
@@ -52,12 +60,27 @@ def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False):
             group.id for Kafka consumer
         bootstrap.servers: str, optional
             Kafka servers to connect to
+        schema_path: str, optional
+            If specified, path to an alert schema (avsc).
+            Default is None.
+        dump_schema: bool, optional
+            If True, save incoming alert schema on disk.
+            Useful for schema inspection when getting `IndexError`.
+            Default is False.
+        on_assign: callable, optional
+            Callback to update the current assignment
+            and specify start offsets. Default is None.
+
         """
         self._topics = topics
         self._kafka_config = _get_kafka_config(config)
         self.schema_path = schema_path
         self._consumer = confluent_kafka.Consumer(self._kafka_config)
-        self._consumer.subscribe(self._topics)
+
+        if on_assign is not None:
+            self._consumer.subscribe(self._topics, on_assign=on_assign)
+        else:
+            self._consumer.subscribe(self._topics)
         self.dump_schema = dump_schema
 
     def __enter__(self):
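
The on_assign hook is the standard confluent-kafka mechanism for overriding start offsets at partition assignment, which is how this commit enables resetting offsets. A minimal sketch; the group, server and topic are placeholders, not values from this commit:

import confluent_kafka
from fink_client.consumer import AlertConsumer

def seek_to_beginning(consumer, partitions):
    # Called on partition assignment: rewind each assigned
    # partition to the earliest available offset.
    for p in partitions:
        p.offset = confluent_kafka.OFFSET_BEGINNING
    consumer.assign(partitions)

myconfig = {
    "group.id": "my_group",                 # placeholder group
    "bootstrap.servers": "localhost:9092",  # placeholder server
}
consumer = AlertConsumer(["my_topic"], myconfig, on_assign=seek_to_beginning)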
@@ -281,7 +304,9 @@ def close(self):
         self._consumer.close()
 
 
-def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
+def return_offsets(
+    consumer, topic, waitfor=1, timeout=10, hide_empty_partition=True, verbose=False
+):
     """Poll servers to get the total committed offsets, and remaining lag
 
     Parameters
@@ -294,6 +319,9 @@ def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
         Time in second to wait before polling. Default is 1 second.
     timeout: int, optional
         Timeout in second when polling the servers. Default is 10.
+    hide_empty_partition: bool, optional
+        If True, display only non-empty partitions.
+        Default is True
     verbose: bool, optional
         If True, prints useful table. Default is False.
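
For reference, a usage sketch of the extended return_offsets; it assumes an already-subscribed confluent_kafka.Consumer, and the topic name is a placeholder:

# `consumer` is assumed to be a confluent_kafka.Consumer subscribed to "my_topic".
from fink_client.consumer import return_offsets

total_offsets, total_lag = return_offsets(
    consumer,
    "my_topic",                  # placeholder topic name
    waitfor=1,                   # seconds to wait before polling
    timeout=10,                  # polling timeout in seconds
    hide_empty_partition=True,   # new: omit empty partitions from the table
    verbose=True,                # print the per-partition table
)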
@@ -357,18 +385,117 @@ def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
         total_lag = total_lag + int(lag)
 
         if verbose:
-            print(
-                "%-50s %9s %9s"
-                % ("{} [{}]".format(partition.topic, partition.partition), offset, lag)
-            )
+            if (hide_empty_partition and offset != "-") or (not hide_empty_partition):
+                print(
+                    "%-50s %9s %9s"
+                    % (
+                        "{} [{}]".format(partition.topic, partition.partition),
+                        offset,
+                        lag,
+                    )
+                )
     if verbose:
         print("-" * 72)
-        print("%-50s %9s %9s" % ("Total", total_offsets, total_lag))
+        print(
+            "%-50s %9s %9s" % ("Total for {}".format(topic), total_offsets, total_lag)
+        )
         print("-" * 72)
 
     return total_offsets, total_lag
 
 
+def return_last_offsets(kafka_config, topic):
+    """Return the last offsets
+
+    Parameters
+    ----------
+    kafka_config: dict
+        Kafka consumer config
+    topic: str
+        Topic name
+
+    Returns
+    -------
+    offsets: list
+        Last offsets of each partition
+    """
+    consumer = confluent_kafka.Consumer(kafka_config)
+    topics = ["{}".format(topic)]
+    consumer.subscribe(topics)
+
+    metadata = consumer.list_topics(topic)
+    if metadata.topics[topic].error is not None:
+        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
+
+    # List of partitions
+    partitions = [
+        confluent_kafka.TopicPartition(topic, p)
+        for p in metadata.topics[topic].partitions
+    ]
+    committed = consumer.committed(partitions)
+    offsets = []
+    for partition in committed:
+        if partition.offset != confluent_kafka.OFFSET_INVALID:
+            offsets.append(partition.offset)
+        else:
+            offsets.append(0)
+
+    consumer.close()
+    return offsets
+
+
+def print_offsets(
+    kafka_config, topic, maxtimeout=10, hide_empty_partition=True, verbose=True
+):
+    """Wrapper around `consumer.return_offsets`
+
+    If the server is rebalancing the offsets, it will exit the program.
+
+    Parameters
+    ----------
+    kafka_config: dic
+        Dictionary with consumer parameters
+    topic: str
+        Topic name
+    maxtimeout: int, optional
+        Timeout in second, when polling the servers
+    hide_empty_partition: bool, optional
+        If True, display only non-empty partitions.
+        Default is True
+    verbose: bool, optional
+        If True, prints useful table. Default is True.
+
+    Returns
+    -------
+    total_offsets: int
+        Total number of messages committed across all partitions
+    total_lag: int
+        Remaining messages in the topic across all partitions.
+    """
+    consumer = confluent_kafka.Consumer(kafka_config)
+
+    topics = ["{}".format(topic)]
+    consumer.subscribe(topics)
+    total_offset, total_lag = return_offsets(
+        consumer,
+        topic,
+        timeout=maxtimeout,
+        waitfor=0,
+        verbose=verbose,
+        hide_empty_partition=hide_empty_partition,
+    )
+    if (total_offset, total_lag) == (-1, -1):
+        print(
+            "Warning: Consumer group '{}' is rebalancing. Please wait.".format(
+                kafka_config["group.id"]
+            )
+        )
+        sys.exit()
+    consumer.close()
+
+    return total_lag, total_offset
+
+
 def _get_kafka_config(config: dict) -> dict:
     """Returns configurations for a consumer instance
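
A sketch of the two config-level entry points added above; broker address, group and topic are placeholders. Note that print_offsets, as written, returns (total_lag, total_offset), the reverse of its docstring order:

from fink_client.consumer import return_last_offsets, print_offsets

kafka_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "my_group",                 # placeholder group
}

# Last committed offset per partition (0 where nothing was committed yet).
offsets = return_last_offsets(kafka_config, "my_topic")

# Prints the per-partition table; exits if the group is rebalancing.
total_lag, total_offset = print_offsets(
    kafka_config, "my_topic", maxtimeout=10, hide_empty_partition=True
)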
@@ -392,7 +519,7 @@ def _get_kafka_config(config: dict) -> dict:
         kafka_config["sasl.username"] = config["username"]
         kafka_config["sasl.password"] = config["password"]
 
-    kafka_config["group.id"] = config["group_id"]
+    kafka_config["group.id"] = config["group.id"]
 
     kafka_config.update(default_config)
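
This one-line rename is user-facing: configuration dictionaries passed to the client must now use Kafka's dotted key. A before/after sketch with placeholder values:

# Before this commit the client read config["group_id"];
# after it, the key matches Kafka's own naming:
myconfig = {
    "username": "myuser",        # placeholder SASL credentials
    "password": "mypassword",
    "group.id": "my_group",      # previously "group_id"
    "bootstrap.servers": "host1:9092,host2:9092",  # placeholder servers
}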

@@ -405,3 +532,103 @@ def _get_kafka_config(config: dict) -> dict:
     kafka_config["bootstrap.servers"] = "{}".format(",".join(fink_servers))
 
     return kafka_config
+
+
+def return_npartitions(topic, kafka_config):
+    """Get the number of partitions
+
+    Parameters
+    ----------
+    kafka_config: dic
+        Dictionary with consumer parameters
+    topic: str
+        Topic name
+
+    Returns
+    -------
+    nbpartitions: int
+        Number of partitions in the topic
+
+    """
+    consumer = confluent_kafka.Consumer(kafka_config)
+
+    # Details to get
+    nbpartitions = 0
+    try:
+        # Topic metadata
+        metadata = consumer.list_topics(topic=topic)
+
+        if metadata.topics and topic in metadata.topics:
+            partitions = metadata.topics[topic].partitions
+            nbpartitions = len(partitions)
+        else:
+            print("The topic {} does not exist".format(topic))
+
+    except confluent_kafka.KafkaException as e:
+        print(f"Error while getting the number of partitions: {e}")
+
+    consumer.close()
+
+    return nbpartitions
+
+
+def return_partition_offset(consumer, topic, partition):
+    """Return the offset and the remaining lag of a partition
+
+    consumer: confluent_kafka.Consumer
+        Kafka consumer
+    topic: str
+        Topic name
+    partition: int
+        The partition number
+
+    Returns
+    -------
+    offset : int
+        Total number of offsets in the topic
+    """
+    topicPartition = confluent_kafka.TopicPartition(topic, partition)
+    low_offset, high_offset = consumer.get_watermark_offsets(topicPartition)
+    partition_size = high_offset - low_offset
+
+    return partition_size
+
+
+def get_schema_from_stream(kafka_config, topic, maxtimeout):
+    """Poll the schema data from the schema topic
+
+    Parameters
+    ----------
+    kafka_config: dic
+        Dictionary with consumer parameters
+    topic: str
+        Topic name
+    timeout: int, optional
+        Timeout in second, when polling the servers
+
+    Returns
+    -------
+    schema: None or dic
+        Schema data. None if the poll was not successful.
+        Reasons to get None:
+            1. timeout has been reached (increase timeout)
+            2. topic is empty (produce new data)
+            3. topic does not exist (create the topic)
+    """
+    # Instantiate a consumer
+    consumer_schema = confluent_kafka.Consumer(kafka_config)
+
+    # Subscribe to schema topic
+    topics = ["{}_schema".format(topic)]
+    consumer_schema.subscribe(topics)
+
+    # Poll
+    msg = consumer_schema.poll(maxtimeout)
+    if msg is not None:
+        schema = fastavro.schema.parse_schema(json.loads(msg.key()))
+    else:
+        schema = None
+
+    consumer_schema.close()
+
+    return schema
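
A combined sketch of the three new helpers; broker, group and topic are placeholders. get_schema_from_stream polls the companion "<topic>_schema" topic, so it returns None when that topic is empty or absent, or when the timeout is too short:

import confluent_kafka
from fink_client.consumer import (
    return_npartitions,
    return_partition_offset,
    get_schema_from_stream,
)

kafka_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "my_group",                 # placeholder group
}

# Number of partitions in the topic (0 if the topic does not exist).
nparts = return_npartitions("my_topic", kafka_config)

# Size of each partition, from its watermark offsets.
consumer = confluent_kafka.Consumer(kafka_config)
for p in range(nparts):
    print(p, return_partition_offset(consumer, "my_topic", p))
consumer.close()

# Parsed Avro schema from "my_topic_schema", or None on a failed poll.
schema = get_schema_from_stream(kafka_config, "my_topic", maxtimeout=10)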
