This is a Python implementation of the ShapeShifter UFTP protocol.
This library implements the full UFTP protocol that you can use for Shapeshifter communications. It implements all three roles: Distribution System Operator (DSO), Aggregator (AGR) and Common Reference Operator (CRO) in both directions (client and service).
Features of this package:
- Building, parsing and validation of the XML messages
- Signing and verifying of the XML messages using signatures
- DNS for service discovery and key retrieval
- Convenient clients for each role-pair
- Convenient services for each role
- JSON-serializable dataclasses for easy transport to other systems
- Fully internal queing system for full-duplex communication with minimal user code required
- Compatible with version 3.0.0 and 3.1.0 of the Shapeshifter protocol.
pip install shapeshifter-uftpIf you want to develop shapeshifter-uftp, you can fork or clone this repository and run the tests:
$ pip install .
$ pip install .[dev]
$ pytest .Shapehifter always requires the use of a Client and a Service, because all responses are asynchronous.
You choose the server class based on your role in the Shapeshifter conversation. If you are an Aggregator (also known as a CSP), you can use this setup:
from datetime import datetime, timedelta, timezone
from shapeshifter_uftp import ShapeshifterAgrService
from shapeshifter_uftp.uftp import (FlexOffer, FlexOfferOption,
FlexOfferOptionISP, FlexRequest,
FlexRequestResponse, FlexOrder, FlexOrderResponse,
AcceptedRejected)
from xsdata.models.datatype import XmlDate
class DemoAggregator(ShapeshifterAgrService):
"""
Aggregator service that implements callbacks for
each of the messages that can be received.
"""
def process_agr_portfolio_query_response(self, message):
print(f"Received a message: {message}")
def process_agr_portfolio_update_response(self, message):
print(f"Received a message: {message}")
def process_d_prognosis_response(self, message):
print(f"Received a message: {message}")
def process_flex_request(self, message: FlexRequest):
print(f"Received a message: {message}")
# Example of how to send a new message after
# processing an incoming message.
dso_client = self.dso_client(message.sender_domain)
# Send the FlexRequestResponse
dso_client.send_flex_request_response(
FlexRequestResponse(
flex_request_message_id=message.message_id,
conversation_id=message.conversation_id,
result=AcceptedRejected.ACCEPTED
)
)
# Send the FlexOffer
dso_client.send_flex_offer(
FlexOffer(
flex_request_message_id=message.message_id,
conversation_id=message.conversation_id,
isp_duration="PT15M",
period=XmlDate(2023, 1, 1),
congestion_point="ean.123456789012",
expiration_date_time=datetime.now(timezone.utc).isoformat(),
offer_options=[
FlexOfferOption(
isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
option_reference="MyOption",
price=2.30,
min_activation_factor=0.5,
)
],
)
)
def process_flex_offer_response(self, message: FlexOffer):
print(f"Received a message: {message}")
def process_flex_offer_revocation_response(self, message):
print(f"Received a message: {message}")
def process_flex_order(self, message: FlexOrder):
print(f"Received a message: {message}")
dso_client = self.dso_client(message.sender_domain)
dso_client.send_flex_order_response(
FlexOrderResponse(
flex_order_message_id=message.message_id,
conversation_id=message.conversation_id,
result=AcceptedRejected.ACCEPTED
)
)
def process_flex_reservation_update(self, message):
print(f"Received a message: {message}")
def process_flex_settlement(self, message):
print(f"Received a message: {message}")
def process_metering_response(self, message):
print(f"Received a message: {message}")
def key_lookup(sender_domain, sender_role):
"""
Lookup function for public keys, so that incoming
messages can be verified.
"""
known_senders = {
("dso.demo", "DSO"): "NsTbq/iABU6tbsjriBg/Z5dSfQstulD0GpMI2fLDWec=",
("cro.demo", "CRO"): "ySUYU87usErRFKGJafwvVDLGhnBVJCCNYfQvmwv8ObM=",
}
return known_senders.get((sender_domain, sender_role))
def endpoint_lookup(sender_domain, sender_role):
"""
Lookup function for endpoints, so that the service
knowns where to send responses to.
"""
known_senders = {
("dso.demo", "DSO"): "http://localhost:8081/shapeshifter/api/v3/message",
("cro.demo", "CRO"): "http://localhost:8082/shapeshifter/api/v3/message",
}
return known_senders.get((sender_domain, sender_role))
aggregator = DemoAggregator(
sender_domain="aggregator.demo",
signing_key="mz5XYCNKxpx48K+9oipUhsjBZed3L7rTVKLsWmG1HOqRLIeuGpIa1KAt6AlbVGqJvewd8v1J0uVUTqpGt7F8tw==",
key_lookup_function=key_lookup,
endpoint_lookup_function=endpoint_lookup,
port=8080,
)
# Start the Aggregator Service
aggregator.run_in_thread()
# Create a client object to talk to a DSO
dso_client = aggregator.dso_client("dso.demo")
# Create a Flex Offer Message
flex_offer_message = FlexOffer(
isp_duration="PT15M",
period=XmlDate(2023, 1, 1),
congestion_point="ean.123456789012",
expiration_date_time=datetime.now(timezone.utc).isoformat(),
flex_request_message_id=str(uuid4())
offer_options=[
FlexOfferOption(
isps=[FlexOfferOptionISP(power=1, start=1, duration=1)],
option_reference="MyOption",
price=2.30,
min_activation_factor=0.5,
)
],
)
# As a demo, press enter to send another FlexOffer message to the DSO.
while True:
try:
input("Press return to send a FlexOffer message to the DSO")
response = dso_client.send_flex_offer(flex_offer_message)
print(f"Response was: {response}")
except:
aggregator.stop()
breakTo use OAuth in outgoing requests, you can use the provided OAuthClient class. To use it in a bare Shapeshifter client:
from shapeshifter_uftp import ShapeshifterAgrDsoClient, OAuthClient
oauth_client = OAuthClient(
url="https://oauth.provider.url",
client_id="my-client-id",
client_secret="my-client-secret"
)
client = ShapeshifterAgrDsoClient(
sender_domain="my.aggregator.domain",
signing_key="abcdef",
recipient_domain="some.dso",
recipient_endpoint="https://some.dso.endpoint/shapeshifter/api/v3/message",
recipient_signing_key="123456",
oauth_client=oauth_client,
)
# If you use any of the sending methods, the oauth client will
# make sure you're authenticated.
client.send_flex_request_response(...)Similarly, if you have a Service instance that dynamically needs to retrieve the OAuth information for each different recipient server, you can provide an oauth_lookup_function that takes a (sender_domain, sender_role) and returns an instance of OAuthClient.