main.py
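"""
Google Cloud Function that is triggered by changes to a Cloud Storage bucket,
runs honeyflare's process_bucket_object over the new object, and reports
telemetry about each invocation to a separate Honeycomb meta dataset.
"""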
import json
import os
import time
import traceback

# We don't use flask ourselves, but it's used as the runtime by GCP and is thus
# available to us
from flask import abort  # pylint: disable=import-error
from google.cloud import storage

from honeyflare import (
    create_libhoney_client,
    process_bucket_object,
    RetriableError,
    logfmt,
    vault,
)

# Ignoring invalid names here due to all the globals we cache (which aren't
# necessarily constants)
# pylint: disable=invalid-name

storage_client = storage.Client()

# Check for required envvars to fail early on invalid deployments
honeycomb_dataset = os.environ.get("HONEYCOMB_DATASET")
if honeycomb_dataset is None:
    raise ValueError("Missing environment variable HONEYCOMB_DATASET")

honeycomb_meta_dataset = os.environ.get("HONEYCOMB_META_DATASET")
if honeycomb_meta_dataset is None:
    raise ValueError("Missing environment variable HONEYCOMB_META_DATASET")

honeycomb_key = os.environ.get("HONEYCOMB_KEY")
if honeycomb_key is None:
    raise ValueError("Missing environment variable HONEYCOMB_KEY")
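
# The settings below are optional; values that carry structured data
# (PATTERNS, QUERY_PARAM_FILTER, SAMPLING_RATES) are passed as JSON strings.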
patterns = os.environ.get("PATTERNS")
if patterns is not None:
    patterns = json.loads(patterns)

honeycomb_api = os.environ.get("HONEYCOMB_API", "https://api.honeycomb.io")

query_param_filter = os.environ.get("QUERY_PARAM_FILTER")
if query_param_filter is not None:
    query_param_filter = set(json.loads(query_param_filter))

lock_bucket = os.environ.get("LOCK_BUCKET")
if lock_bucket is not None:
    lock_bucket = storage_client.bucket(lock_bucket)
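
# SAMPLING_RATES is a JSON object mapping response status codes to sample
# rates, e.g. '{"200": 10}' (illustrative value).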
# Convert string keys (the only kind permitted by json) to ints
sampling_rate_by_status = {
    int(key): val
    for key, val in json.loads(os.environ.get("SAMPLING_RATES", "{}")).items()
}


def main(event, context):
"""
Triggered by a change to a Cloud Storage bucket.
:param event: Event payload (dict).
:param context: Metadata for the event (google.cloud.functions.Context)
"""
global honeycomb_key, lock_bucket
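
    # honeycomb_key may be a vault:// reference; resolve it once and rebind the
    # module-level global so later warm invocations reuse the resolved secret.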
    if honeycomb_key.startswith("vault://"):
        honeycomb_key = vault.get_vault_secret(honeycomb_key)
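
    # This client reports telemetry about the invocation itself to the meta
    # dataset, separate from the dataset the processed log events are sent to.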
    meta_client = create_libhoney_client(
        honeycomb_key, honeycomb_meta_dataset, honeycomb_api
    )
    meta_event = meta_client.new_event()
    instrument_invocation(meta_event, event, context)

    start_time = time.time()
    try:
        bucket = storage_client.bucket(event["bucket"])
        events_handled = process_bucket_object(
            bucket,
            event["name"],
            honeycomb_dataset,
            honeycomb_key,
            honeycomb_api,
            patterns=patterns,
            query_param_filter=query_param_filter,
            lock_bucket=lock_bucket,
            sampling_rate_by_status=sampling_rate_by_status,
        )
        meta_event.add_field("events", events_handled)
        meta_event.add_field("success", True)
    except RetriableError as err:
        # Hard exit to make sure this is retried
        meta_event.add_field("success", False)
        meta_event.add_field("retriable", True)
        meta_event.add_field("error", err.__class__.__name__)
        meta_event.add_field("error_message", str(err))
        # To prevent the stacktrace from being logged on retries, abort instead
        # of re-raising
        abort(500)
    except Exception as err:  # pylint: disable=broad-except
        # Swallow these but make sure they are logged and reported so that we
        # can fix them
        traceback.print_exc()
        meta_event.add_field("success", False)
        meta_event.add_field("error", err.__class__.__name__)
        meta_event.add_field("error_message", str(err))
    finally:
        meta_event.add_field("processing_time_seconds", time.time() - start_time)
        print(logfmt.format(meta_event.fields()))
        meta_event.send()
        meta_client.close()


def instrument_invocation(libhoney_event, event, context):
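    """
    Copy fields describing the triggering storage object and the invocation
    context onto the given libhoney event, under event.* and context.* keys.
    """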
    for event_key in ("name", "bucket", "contentType", "timeCreated", "size"):
        libhoney_event.add_field("event.%s" % event_key, event[event_key])

    owner = event.get("owner")
    if owner:
        libhoney_event.add_field("event.owner", owner.get("entityId"))

    for context_property in ("event_id", "timestamp", "event_type", "resource"):
        value = getattr(context, context_property, None)
        libhoney_event.add_field("context.%s" % context_property, value)