Skip to content

Commit

Permalink
[Java] Check if a Subscription is already linked to an IpcPublication…
Browse files Browse the repository at this point in the history
… before linking it.
  • Loading branch information
mjpt777 committed Mar 1, 2017
1 parent 65c1484 commit fddb19d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
37 changes: 20 additions & 17 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,26 +868,29 @@ private static ReadablePosition[] positionArray(final List<SubscriberPosition> s

private void linkIpcSubscription(final IpcSubscriptionLink subscription, final IpcPublication publication)
{
final long joiningPosition = publication.joiningPosition();
final long registrationId = subscription.registrationId();
final int sessionId = publication.sessionId();
final int streamId = subscription.streamId();
final String channel = subscription.uri();
if (null == subscription.publication())
{
final long joiningPosition = publication.joiningPosition();
final long registrationId = subscription.registrationId();
final int sessionId = publication.sessionId();
final int streamId = subscription.streamId();
final String channel = subscription.uri();

final Position position = SubscriberPos.allocate(
countersManager, registrationId, sessionId, streamId, channel, joiningPosition);
final Position position = SubscriberPos.allocate(
countersManager, registrationId, sessionId, streamId, channel, joiningPosition);

position.setOrdered(joiningPosition);
subscription.link(publication, position);
publication.addSubscription(position);
position.setOrdered(joiningPosition);
subscription.link(publication, position);
publication.addSubscription(position);

clientProxy.onAvailableImage(
publication.correlationId(),
streamId,
sessionId,
publication.rawLog().fileName(),
Collections.singletonList(new SubscriberPosition(subscription, position)),
channel);
clientProxy.onAvailableImage(
publication.correlationId(),
streamId,
sessionId,
publication.rawLog().fileName(),
Collections.singletonList(new SubscriberPosition(subscription, position)),
channel);
}
}

private void linkSpy(final NetworkPublication publication, final SubscriptionLink subscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ class IpcSubscriptionLink extends SubscriptionLink
super(registrationId, streamId, channelUri, aeronClient, clientLivenessTimeoutNs);
}

public IpcPublication publication()
{
return publication;
}

public void link(final Object source, final ReadablePosition position)
{
this.publication = (IpcPublication)source;
Expand Down

0 comments on commit fddb19d

Please sign in to comment.