-
Notifications
You must be signed in to change notification settings - Fork 7
add service registry events allowing to listen on service references #237
base: develop
Are you sure you want to change the base?
Conversation
…ecube/scalecube-services.git into support_service_registry_events
@@ -22,6 +29,11 @@ | |||
private final Map<String, ServiceEndpoint> serviceEndpoints = new NonBlockingHashMap<>(); | |||
private final Map<String, List<ServiceReference>> referencesByQualifier = new NonBlockingHashMap<>(); | |||
|
|||
private final FluxProcessor<RegistryEvent, RegistryEvent> events = | |||
DirectProcessor.<RegistryEvent>create().serialize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe better .serialize()
need to move to sink like:
FluxSink<RegistryEvent> sink = events.serialize().sink();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the reason you think its better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can emit some events from different threads therefor we need that our sink will be with serialize()
. And in another hand, to subscribe to events we don't need use redundant .serialize()
and it's it is unnecessary for FluxProcessor<RegistryEvent, RegistryEvent> events
referencesByQualifier.values().forEach(list -> list.removeIf(sr -> sr.endpointId().equals(endpointId))); | ||
referencesByQualifier.values() | ||
.forEach(list -> { | ||
list.stream().filter(sr -> sr.endpointId().equals(endpointId)).collect(Collectors.toSet()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove redundant creating a set .collect(Collectors.toSet())
What's the practical purpose? Uses cases? |
Motivation: the idea is to be subscribed to specific service reference and not to service endpoint from discovery. in such case i can subscribe in discovery and start filtering the relevant reference. by listening on service registry i can subscribe to references and filter / handle service reference discovery. |
Looks like this doubles functionality that we already have. |
its not same api where from discovery you get service endpoints and not service references |
@ronenhamias
What are benefits from knowledge that some particular service reference is added or removed? |
@dmytro-lazebnyi you need to manage the lifecycle of these services are beeing added removed: lets imagine i want to subscribe to all quote services in the cluster as they appear for example becouse new instrument are elastically appearing in the cluster and i want to make sure i subscribe only once to specific reference. |
add service registry events allowing to listen on service references changed
Motivation:
the idea is to be subscribed to specific service reference and not to service endpoint from discovery.
for example in service layer i want to know when a service is available to subscribe to it.
in such case i can subscribe in discovery and start filtering the relevant reference.
but in routing context i dont usually have access to discovery.
by listening on service registry i can subscribe to references and filter / handle service reference discovery.