Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion flagsmith/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import webhooks
from .flagsmith import Flagsmith

__all__ = ("Flagsmith",)
__all__ = ("Flagsmith", "webhooks")
41 changes: 41 additions & 0 deletions flagsmith/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import hashlib
import hmac
from typing import Union


def generate_signature(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see any reason that this function should be part of the public interface for this module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could help for generating fake requests in case you want to test webhook listeners without depending on Flagsmith, or if for whatever reason you need to invoke a webhook manually. It could also help troubleshooting if verify_signature fails and you don't understand why (wrong secret, signature or payload).

It's the same as how most JWT libraries have methods for generating and verifying signatures, but the vast majority of users will only be verifying signatures.

In any case I don't feel too strongly about this, I'm happy to remove generate_signature from the public interface and add it later if we need to. Let me know what you prefer!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, yep, ok, happy to leave it in.

request_body: Union[str, bytes],
shared_secret: str,
) -> str:
"""Generates a signature for a webhook request body using HMAC-SHA256.

:param request_body: The raw request body, as string or bytes.
:param shared_secret: The shared secret configured for this specific webhook.
:return: The hex-encoded signature.
"""
if isinstance(request_body, str):
request_body = request_body.encode()

shared_secret_bytes = shared_secret.encode()

return hmac.new(
key=shared_secret_bytes,
msg=request_body,
digestmod=hashlib.sha256,
).hexdigest()


def verify_signature(
request_body: Union[str, bytes],
received_signature: str,
shared_secret: str,
) -> bool:
"""Verifies a webhook's signature to determine if the request was sent by Flagsmith.

:param request_body: The raw request body, as string or bytes.
:param received_signature: The signature as received in the X-Flagsmith-Signature request header.
:param shared_secret: The shared secret configured for this specific webhook.
:return: True if the signature is valid, False otherwise.
"""
expected_signature = generate_signature(request_body, shared_secret)
return hmac.compare_digest(expected_signature, received_signature)
50 changes: 50 additions & 0 deletions tests/test_webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import json

from flagsmith.webhooks import generate_signature, verify_signature


def test_generate_signature() -> None:
# Given
request_body = json.dumps({"data": {"foo": 123}})
shared_secret = "shh"

# When
signature = generate_signature(request_body, shared_secret)

# Then
assert isinstance(signature, str)
assert len(signature) == 64 # SHA-256 hex digest is 64 characters


def test_verify_signature_valid() -> None:
# Given
request_body = json.dumps({"data": {"foo": 123}})
shared_secret = "shh"

# When
signature = generate_signature(request_body, shared_secret)

# Then
assert verify_signature(
request_body=request_body,
received_signature=signature,
shared_secret=shared_secret,
)
# Test with bytes instead of str
assert verify_signature(
request_body=request_body.encode(),
received_signature=signature,
shared_secret=shared_secret,
)


def test_verify_signature_invalid() -> None:
# Given
request_body = json.dumps({"event": "flag_updated", "data": {"id": 123}})

# Then
assert not verify_signature(
request_body=request_body,
received_signature="bad",
shared_secret="?",
)