Skip to content

Commit 28acabe

Browse files
committedDec 23, 2021
Made kafka generic for output, now it can be in format of kafka/127.0.0.1:9092
1 parent f593053 commit 28acabe

File tree

3 files changed

+16
-10
lines changed

3 files changed

+16
-10
lines changed
 

‎blockchainetl/jobs/exporters/kafka_expoerter.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,18 @@
99

1010
class KafkaItemExporter:
1111

12-
def __init__(self, connection_url, item_type_to_topic_mapping, converters=()):
13-
self.connection_url = connection_url
12+
def __init__(self, output, item_type_to_topic_mapping, converters=()):
1413
self.item_type_to_topic_mapping = item_type_to_topic_mapping
1514
self.converter = CompositeItemConverter(converters)
16-
self.producer = KafkaProducer(bootstrap_servers=connection_url)
15+
self.connection_url = self.get_connection_url(output)
16+
print(self.connection_url)
17+
self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
18+
19+
def get_connection_url(self, output):
20+
try:
21+
return output.split('/')[1]
22+
except KeyError:
23+
raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
1724

1825
def open(self):
1926
pass
@@ -26,6 +33,7 @@ def export_item(self, item):
2633
item_type = item.get('type')
2734
if item_type is not None and item_type in self.item_type_to_topic_mapping:
2835
data = json.dumps(item).encode('utf-8')
36+
print(data)
2937
return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
3038
else:
3139
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

‎ethereumetl/cli/stream.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
@click.option('-o', '--output', type=str,
4141
help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
4242
'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum. '
43-
'or kafka, in case of kafka, specify connection_url '
43+
'or kafka, output name and connection ip:port e.g. kafka/127.0.0.1:9092 '
4444
'If not specified will print to console')
4545
@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
4646
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
@@ -51,10 +51,8 @@
5151
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
5252
@click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
5353
@click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
54-
@click.option('--connection-url', default=None, show_default=True, type=str, help='Connection url for file, required for kafka')
5554
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
56-
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None,
57-
connection_url=None):
55+
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
5856
"""Streams all data types to console or Google Pub/Sub."""
5957
configure_logging(log_file)
6058
configure_signals()
@@ -71,7 +69,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
7169

7270
streamer_adapter = EthStreamerAdapter(
7371
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
74-
item_exporter=create_item_exporter(output, connection_url),
72+
item_exporter=create_item_exporter(output),
7573
batch_size=batch_size,
7674
max_workers=max_workers,
7775
entity_types=entity_types

‎ethereumetl/streaming/item_exporter_creator.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def create_item_exporters(outputs):
3131
return MultiItemExporter(item_exporters)
3232

3333

34-
def create_item_exporter(output, connection_url):
34+
def create_item_exporter(output):
3535
item_exporter_type = determine_item_exporter_type(output)
3636
if item_exporter_type == ItemExporterType.PUBSUB:
3737
from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
@@ -77,7 +77,7 @@ def create_item_exporter(output, connection_url):
7777
item_exporter = ConsoleItemExporter()
7878
elif item_exporter_type == ItemExporterType.KAFKA:
7979
from blockchainetl.jobs.exporters.kafka_expoerter import KafkaItemExporter
80-
item_exporter = KafkaItemExporter(connection_url, item_type_to_topic_mapping={
80+
item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
8181
'block': 'blocks',
8282
'transaction': 'transactions',
8383
'log': 'logs',

0 commit comments

Comments
 (0)