Skip to content

Commit cda0e9e

Browse files
authored
Move create topic from AzureServiceBusTopicCreateOperator to AdminClientHook (#45297)
* Move create topic to hook in Azure Service Bus provider * Fix indenting of documentation
1 parent 7bbda16 commit cda0e9e

File tree

4 files changed

+151
-35
lines changed

4 files changed

+151
-35
lines changed

providers/src/airflow/providers/microsoft/azure/hooks/asb.py

+88
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
ServiceBusSender,
2929
)
3030
from azure.servicebus.management import (
31+
AuthorizationRule,
3132
CorrelationRuleFilter,
3233
QueueProperties,
3334
ServiceBusAdministrationClient,
@@ -194,6 +195,93 @@ def delete_queue(self, queue_name: str) -> None:
194195
with self.get_conn() as service_mgmt_conn:
195196
service_mgmt_conn.delete_queue(queue_name)
196197

198+
def create_topic(
199+
self,
200+
topic_name: str,
201+
azure_service_bus_conn_id: str = "azure_service_bus_default",
202+
default_message_time_to_live: datetime.timedelta | str | None = None,
203+
max_size_in_megabytes: int | None = None,
204+
requires_duplicate_detection: bool | None = None,
205+
duplicate_detection_history_time_window: datetime.timedelta | str | None = None,
206+
enable_batched_operations: bool | None = None,
207+
size_in_bytes: int | None = None,
208+
filtering_messages_before_publishing: bool | None = None,
209+
authorization_rules: list[AuthorizationRule] | None = None,
210+
support_ordering: bool | None = None,
211+
auto_delete_on_idle: datetime.timedelta | str | None = None,
212+
enable_partitioning: bool | None = None,
213+
enable_express: bool | None = None,
214+
user_metadata: str | None = None,
215+
max_message_size_in_kilobytes: int | None = None,
216+
) -> str:
217+
"""
218+
Create a topic by connecting to service Bus Admin client.
219+
220+
:param topic_name: Name of the topic.
221+
:param default_message_time_to_live: ISO 8601 default message time span to live value. This is
222+
the duration after which the message expires, starting from when the message is sent to Service
223+
Bus. This is the default value used when TimeToLive is not set on a message itself.
224+
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
225+
like "PT300S" is accepted.
226+
:param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
227+
memory allocated for the topic.
228+
:param requires_duplicate_detection: A value indicating if this topic requires duplicate
229+
detection.
230+
:param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
231+
duration of the duplicate detection history. The default value is 10 minutes.
232+
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
233+
like "PT300S" is accepted.
234+
:param enable_batched_operations: Value that indicates whether server-side batched operations
235+
are enabled.
236+
:param size_in_bytes: The size of the topic, in bytes.
237+
:param filtering_messages_before_publishing: Filter messages before publishing.
238+
:param authorization_rules: List of Authorization rules for resource.
239+
:param support_ordering: A value that indicates whether the topic supports ordering.
240+
:param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
241+
automatically deleted. The minimum duration is 5 minutes.
242+
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
243+
like "PT300S" is accepted.
244+
:param enable_partitioning: A value that indicates whether the topic is to be partitioned
245+
across multiple message brokers.
246+
:param enable_express: A value that indicates whether Express Entities are enabled. An express
247+
queue holds a message in memory temporarily before writing it to persistent storage.
248+
:param user_metadata: Metadata associated with the topic.
249+
:param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
250+
can be accepted by the queue. This feature is only available when using a Premium namespace
251+
and Service Bus API version "2021-05" or higher.
252+
The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
253+
"""
254+
if topic_name is None:
255+
raise TypeError("Topic name cannot be None.")
256+
257+
with self.get_conn() as service_mgmt_conn:
258+
try:
259+
topic_properties = service_mgmt_conn.get_topic(topic_name)
260+
except ResourceNotFoundError:
261+
topic_properties = None
262+
if topic_properties and topic_properties.name == topic_name:
263+
self.log.info("Topic name already exists")
264+
return topic_properties.name
265+
topic = service_mgmt_conn.create_topic(
266+
topic_name=topic_name,
267+
default_message_time_to_live=default_message_time_to_live,
268+
max_size_in_megabytes=max_size_in_megabytes,
269+
requires_duplicate_detection=requires_duplicate_detection,
270+
duplicate_detection_history_time_window=duplicate_detection_history_time_window,
271+
enable_batched_operations=enable_batched_operations,
272+
size_in_bytes=size_in_bytes,
273+
filtering_messages_before_publishing=filtering_messages_before_publishing,
274+
authorization_rules=authorization_rules,
275+
support_ordering=support_ordering,
276+
auto_delete_on_idle=auto_delete_on_idle,
277+
enable_partitioning=enable_partitioning,
278+
enable_express=enable_express,
279+
user_metadata=user_metadata,
280+
max_message_size_in_kilobytes=max_message_size_in_kilobytes,
281+
)
282+
self.log.info("Created Topic %s", topic.name)
283+
return topic.name
284+
197285
def create_subscription(
198286
self,
199287
topic_name: str,

providers/src/airflow/providers/microsoft/azure/operators/asb.py

+17-29
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
from collections.abc import Sequence
2020
from typing import TYPE_CHECKING, Any, Callable
2121

22-
from azure.core.exceptions import ResourceNotFoundError
23-
2422
from airflow.models import BaseOperator
2523
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
2624

@@ -313,33 +311,23 @@ def execute(self, context: Context) -> str:
313311
# Create the hook
314312
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
315313

316-
with hook.get_conn() as service_mgmt_conn:
317-
try:
318-
topic_properties = service_mgmt_conn.get_topic(self.topic_name)
319-
except ResourceNotFoundError:
320-
topic_properties = None
321-
if topic_properties and topic_properties.name == self.topic_name:
322-
self.log.info("Topic name already exists")
323-
return topic_properties.name
324-
topic = service_mgmt_conn.create_topic(
325-
topic_name=self.topic_name,
326-
default_message_time_to_live=self.default_message_time_to_live,
327-
max_size_in_megabytes=self.max_size_in_megabytes,
328-
requires_duplicate_detection=self.requires_duplicate_detection,
329-
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
330-
enable_batched_operations=self.enable_batched_operations,
331-
size_in_bytes=self.size_in_bytes,
332-
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
333-
authorization_rules=self.authorization_rules,
334-
support_ordering=self.support_ordering,
335-
auto_delete_on_idle=self.auto_delete_on_idle,
336-
enable_partitioning=self.enable_partitioning,
337-
enable_express=self.enable_express,
338-
user_metadata=self.user_metadata,
339-
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
340-
)
341-
self.log.info("Created Topic %s", topic.name)
342-
return topic.name
314+
return hook.create_topic(
315+
topic_name=self.topic_name,
316+
default_message_time_to_live=self.default_message_time_to_live,
317+
max_size_in_megabytes=self.max_size_in_megabytes,
318+
requires_duplicate_detection=self.requires_duplicate_detection,
319+
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
320+
enable_batched_operations=self.enable_batched_operations,
321+
size_in_bytes=self.size_in_bytes,
322+
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
323+
authorization_rules=self.authorization_rules,
324+
support_ordering=self.support_ordering,
325+
auto_delete_on_idle=self.auto_delete_on_idle,
326+
enable_partitioning=self.enable_partitioning,
327+
enable_express=self.enable_express,
328+
user_metadata=self.user_metadata,
329+
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
330+
)
343331

344332

345333
class AzureServiceBusSubscriptionCreateOperator(BaseOperator):

providers/tests/microsoft/azure/hooks/test_asb.py

+20
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,26 @@ def test_delete_queue_exception(self, mock_sb_admin_client):
119119
with pytest.raises(TypeError):
120120
hook.delete_queue(None)
121121

122+
# Test creating a topic using hook method `create_topic`
123+
@mock.patch("azure.servicebus.management.TopicProperties")
124+
@mock.patch(f"{MODULE}.AdminClientHook.get_conn")
125+
def test_create_topic(self, mock_sb_admin_client, mock_topic_properties):
126+
"""
127+
Test `create_topic` hook function with mocking connection, topic properties value and
128+
the azure service bus `create_topic` function
129+
"""
130+
topic_name = "test_topic_name"
131+
mock_topic_properties.name = topic_name
132+
mock_sb_admin_client.return_value.__enter__.return_value.create_topic.return_value = (
133+
mock_topic_properties
134+
)
135+
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
136+
with mock.patch.object(hook.log, "info") as mock_log_info:
137+
hook.create_topic(topic_name)
138+
assert mock_topic_properties.name == topic_name
139+
140+
mock_log_info.assert_called_with("Created Topic %s", topic_name)
141+
122142
# Test creating subscription with topic name and subscription name using hook method `create_subscription`
123143
@mock.patch("azure.servicebus.management.SubscriptionProperties")
124144
@mock.patch(f"{MODULE}.AdminClientHook.get_conn")

providers/tests/microsoft/azure/operators/test_asb.py

+26-6
Original file line numberDiff line numberDiff line change
@@ -255,19 +255,39 @@ def test_init(self):
255255
@mock.patch("azure.servicebus.management.TopicProperties")
256256
def test_create_topic(self, mock_topic_properties, mock_get_conn):
257257
"""
258-
Test AzureServiceBusTopicCreateOperator passed with the topic name
259-
mocking the connection details, hook create_topic function
258+
Test AzureServiceBusSubscriptionCreateOperator passed with the subscription name, topic name
259+
mocking the connection details, hook create_subscription function
260260
"""
261+
print("Wazzup doc")
261262
asb_create_topic = AzureServiceBusTopicCreateOperator(
262263
task_id="asb_create_topic",
263264
topic_name=TOPIC_NAME,
264265
)
265266
mock_topic_properties.name = TOPIC_NAME
266267
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = mock_topic_properties
267-
268-
with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
269-
asb_create_topic.execute(None)
270-
mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)
268+
# create the topic
269+
created_topic_name = asb_create_topic.execute(None)
270+
# ensure the topic name is returned
271+
assert created_topic_name == TOPIC_NAME
272+
# ensure create_subscription is called with the correct arguments on the connection
273+
mock_get_conn.return_value.__enter__.return_value.create_topic.assert_called_once_with(
274+
topic_name=TOPIC_NAME,
275+
default_message_time_to_live=None,
276+
max_size_in_megabytes=None,
277+
requires_duplicate_detection=None,
278+
duplicate_detection_history_time_window=None,
279+
enable_batched_operations=None,
280+
size_in_bytes=None,
281+
filtering_messages_before_publishing=None,
282+
authorization_rules=None,
283+
support_ordering=None,
284+
auto_delete_on_idle=None,
285+
enable_partitioning=None,
286+
enable_express=None,
287+
user_metadata=None,
288+
max_message_size_in_kilobytes=None,
289+
)
290+
print("Later Gator")
271291

272292
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook")
273293
def test_create_subscription_exception(self, mock_sb_admin_client):

0 commit comments

Comments
 (0)