-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathpubsub_fn.py
192 lines (154 loc) · 5.44 KB
/
pubsub_fn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# Copyright 2022 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Functions to handle events from Google Cloud Pub/Sub.
"""
# pylint: disable=protected-access
import dataclasses as _dataclasses
import datetime as _dt
import functools as _functools
import typing as _typing
import json as _json
import base64 as _base64
import cloudevents.http as _ce
import firebase_functions.private.util as _util
from firebase_functions.core import CloudEvent, T, _with_init
from firebase_functions.options import PubSubOptions
@_dataclasses.dataclass(frozen=True)
class Message(_typing.Generic[T]):
"""
Interface representing a Google Cloud Pub/Sub message.
"""
message_id: str
"""
Autogenerated ID that uniquely identifies this message.
"""
publish_time: str
"""
Time the message was published.
"""
attributes: dict[str, str]
"""
User-defined attributes published with the message, if any.
"""
data: str
"""
The data payload of this message object as a base64-encoded string.
"""
ordering_key: str
"""
User-defined key used to ensure ordering amongst messages with the same key.
"""
@property
def json(self) -> T | None:
try:
if self.data is not None:
return _json.loads(_base64.b64decode(self.data).decode("utf-8"))
else:
return None
except Exception as error:
raise ValueError(
f"Unable to parse Pub/Sub message data as JSON: {error}"
) from error
@_dataclasses.dataclass(frozen=True)
class MessagePublishedData(_typing.Generic[T]):
    """
    The payload of an event raised when a message is published to Pub/Sub.

    'T' is the type of the JSON value carried in ``Message.data``.
    """

    message: Message[T]
    """
    The Google Cloud Pub/Sub message that was published.
    """

    subscription: str
    """
    The subscription resource associated with the event.
    """
# Event type handed to Pub/Sub handlers: a CloudEvent whose `data` is a
# MessagePublishedData[T].
_E1 = CloudEvent[MessagePublishedData[T]]
# Signature of the handler functions accepted by `on_message_published`:
# they take the event and return None.
_C1 = _typing.Callable[[_E1], None]
def _parse_pubsub_timestamp(timestamp: str) -> _dt.datetime:
    """
    Parse an RFC 3339 style timestamp string into a timezone-aware datetime.

    Accepts timestamps with or without fractional seconds. Fractions longer
    than six digits (e.g. nanosecond precision) are truncated to microseconds,
    because that is the finest resolution ``datetime`` can represent.

    :param timestamp: A string such as ``2022-01-01T00:00:00Z`` or
        ``2022-01-01T00:00:00.123456789+00:00``.
    :returns: The parsed ``datetime``.
    :raises ValueError: If the string does not match the expected layout.
    """
    head, dot, tail = timestamp.partition(".")
    if not dot:
        # No fractional part at all, so `%f` must not be in the format.
        return _dt.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S%z")
    # `tail` is "<fraction digits><UTC offset>"; split it where the digits end.
    digit_count = len(tail) - len(tail.lstrip("0123456789"))
    fraction, offset = tail[:digit_count], tail[digit_count:]
    # `%f` accepts at most six digits.
    return _dt.datetime.strptime(
        f"{head}.{fraction[:6]}{offset}",
        "%Y-%m-%dT%H:%M:%S.%f%z",
    )


def _message_handler(
    func: _C1,
    raw: _ce.CloudEvent,
) -> None:
    """
    Convert a raw CloudEvent into a typed Pub/Sub event and pass it to `func`.

    The event `time` and the message `publish_time` are parsed into
    ``datetime`` objects, the message is wrapped in ``Message`` /
    ``MessagePublishedData``, and the resulting ``CloudEvent`` is handed to
    `func` via ``_with_init``.

    :param func: The handler to invoke with the converted event.
    :param raw: The incoming event; its payload is not modified.
    :raises KeyError: If a required event attribute or payload key is missing.
    :raises ValueError: If a timestamp cannot be parsed.
    """
    event_attributes = raw._get_attributes()
    event_data: _typing.Any = raw.get_data()
    event_dict = {"data": event_data, **event_attributes}
    data = event_dict["data"]
    # Shallow copy: the keys below are rewritten/removed, and doing that on
    # the dict owned by `raw` would make a second pass over the same event fail.
    message_dict = dict(data["message"])

    # Convert the timestamp strings into datetime objects
    event_time = _parse_pubsub_timestamp(event_dict["time"])
    message_dict["publish_time"] = _parse_pubsub_timestamp(
        message_dict["publish_time"]
    )

    # Pop unnecessary keys from the message data
    # (we get these keys from the snake case alternatives that are provided)
    message_dict.pop("messageId", None)
    message_dict.pop("publishTime", None)

    # `orderingKey` doesn't come with a snake case alternative,
    # there is no `ordering_key` in the raw request.
    ordering_key = message_dict.pop("orderingKey", None)

    # Include empty attributes property if missing
    message_dict.setdefault("attributes", {})

    message: MessagePublishedData = MessagePublishedData(
        message=Message(
            **message_dict,
            ordering_key=ordering_key,
        ),
        subscription=data["subscription"],
    )

    event: CloudEvent[MessagePublishedData] = CloudEvent(
        data=message,
        id=event_dict["id"],
        source=event_dict["source"],
        specversion=event_dict["specversion"],
        # `subject` is optional; absent means None.
        subject=event_dict.get("subject"),
        time=event_time,
        type=event_dict["type"],
    )

    _with_init(func)(event)
@_util.copy_func_kwargs(PubSubOptions)
def on_message_published(**kwargs) -> _typing.Callable[[_C1], _C1]:
    """
    Decorator factory for functions that run when a message is published
    to a Pub/Sub topic.

    Example:

    .. code-block:: python

      @on_message_published(topic="hello-world")
      def example(event: CloudEvent[MessagePublishedData[object]]) -> None:
          pass

    :param \\*\\*kwargs: Pub/Sub options.
    :type \\*\\*kwargs: as :exc:`firebase_functions.options.PubSubOptions`
    :rtype: :exc:`typing.Callable`
            \\[ \\[ :exc:`firebase_functions.core.CloudEvent` \\[
            :exc:`firebase_functions.pubsub_fn.MessagePublishedData` \\[
            :exc:`typing.Any` \\] \\] \\], `None` \\]
            A function that takes a CloudEvent and returns ``None``.
    """
    pubsub_options = PubSubOptions(**kwargs)

    def on_message_published_inner_decorator(func: _C1):

        @_functools.wraps(func)
        def on_message_published_wrapped(raw: _ce.CloudEvent):
            # Delegate conversion of the raw event and the call to `func`.
            return _message_handler(func, raw)

        # Attach the endpoint built from the options, keyed by the handler's
        # own name, to the wrapper that is returned.
        endpoint = pubsub_options._endpoint(func_name=func.__name__)
        _util.set_func_endpoint_attr(on_message_published_wrapped, endpoint)
        return on_message_published_wrapped

    return on_message_published_inner_decorator