-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathapp.py
104 lines (80 loc) · 3.23 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import json
import logging
import uuid
import restate
from restate.exceptions import TerminalError
import stripe_utils
from stripe_utils import (
PaymentRequest,
verify_payment_request,
create_payment_intent,
RESTATE_CALLBACK_ID,
is_payment_intent,
parse_webhook_call,
)
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s",
)
logger = logging.getLogger(__name__)
payment_service = restate.Service("payments")
@payment_service.handler("processPayment")
async def process_payment(ctx: restate.Context, req: PaymentRequest):
verify_payment_request(req)
# Generate a deterministic idempotency key
idempotency_key = await ctx.run("idempotency key", lambda: str(uuid.uuid4()))
# Initiate a listener for external calls for potential webhook callbacks
intent_webhook_id, intent_promise = ctx.awakeable()
# Make a synchronous call to the payment service
async def payment_intent() -> dict:
return await create_payment_intent(
{
"payment_method_id": req.payment_method_id,
"amount": req.amount,
"idempotency_key": idempotency_key,
"intent_webhook_id": intent_webhook_id,
"delayed_status": req.delayed_status,
}
)
payment_intent = await ctx.run("stripe call", payment_intent)
if payment_intent["status"] != "processing":
# The call to Stripe completed immediately / synchronously: processing done
logger.info("Request %s was processed synchronously!", idempotency_key)
stripe_utils.ensure_success(payment_intent["status"])
return
# We did not get the response on the synchronous path, talking to Stripe.
# No worries, Stripe will let us know when it is done processing via a webhook.
logger.info(
f"Payment intent for {idempotency_key} still processing, awaiting webhook call..."
)
# We will now wait for the webhook call to complete this promise.
# Check out the handler below.
processed_payment_intent = await intent_promise
logger.info(f"Webhook call for {idempotency_key} received!")
stripe_utils.ensure_success(processed_payment_intent["status"])
@payment_service.handler("processWebhook")
async def process_webhook(ctx: restate.Context):
req = ctx.request()
sig = req.headers.get("stripe-signature")
event = parse_webhook_call(req.body, sig)
if not is_payment_intent(event):
logger.info(f"Unhandled event type {event['type']}")
return {"received": True}
payment_intent = event["data"]["object"]
logger.info(
"Received webhook call for payment intent %s", json.dumps(payment_intent)
)
webhook_promise = payment_intent["metadata"].get(RESTATE_CALLBACK_ID)
if not webhook_promise:
raise TerminalError(
f"Missing callback property: {RESTATE_CALLBACK_ID}", status_code=404
)
ctx.resolve_awakeable(webhook_promise, payment_intent)
return {"received": True}
app = restate.app([payment_service])
if __name__ == "__main__":
import hypercorn
import asyncio
conf = hypercorn.Config()
conf.bind = ["0.0.0.0:9080"]
asyncio.run(hypercorn.asyncio.serve(app, conf))