Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MessageContext reference to the DataUnit #2341

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
java-version: 11.0.16+8
distribution: 'temurin'
- name: Cache Maven packages
uses: actions/cache@v2
uses: actions/cache@v4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change intentional?

Copy link
Contributor Author

@gayaldassanayake gayaldassanayake Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the checks of all pending PRs are failing due to using this deprecated v2 version. When bumped to V4 it works fine

with:
path: ~/.m2
key: ${{ runner.os }}-m2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static void addCallback(MessageContext messageContext, String callbackId)
dataUnit.setCallbackId(callbackId);
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(messageContext));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(messageContext, null));
dataUnit.setMessageContext(messageContext);

CallbackSentEvent callbackSentEvent = new CallbackSentEvent(dataUnit);
addEventAndIncrementCallbackCount(messageContext, callbackSentEvent);
Expand Down Expand Up @@ -73,6 +74,7 @@ public static void callbackCompletionEvent(MessageContext oldMessageContext, Str
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(oldMessageContext));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(oldMessageContext, null));
dataUnit.setIsOutOnlyFlow(StatisticDataCollectionHelper.isOutOnlyFlow(oldMessageContext));
dataUnit.setMessageContext(oldMessageContext);

CallbackCompletionEvent callbackCompletionEvent = new CallbackCompletionEvent(dataUnit);
addEventAndDecrementCallbackCount(oldMessageContext, callbackCompletionEvent);
Expand All @@ -99,6 +101,7 @@ public static void updateParentsForCallback(MessageContext oldMessageContext, St
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(oldMessageContext));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(oldMessageContext, null));
dataUnit.setIsOutOnlyFlow(StatisticDataCollectionHelper.isOutOnlyFlow(oldMessageContext));
dataUnit.setMessageContext(oldMessageContext);

CallbackReceivedEvent callbackReceivedEvent = new CallbackReceivedEvent(dataUnit);
addEvent(oldMessageContext, callbackReceivedEvent);
Expand All @@ -125,6 +128,7 @@ public static void reportCallbackHandlingCompletion(MessageContext synapseOutMsg
dataUnit.setSynapseEnvironment(synapseOutMsgCtx.getEnvironment());
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(synapseOutMsgCtx));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(synapseOutMsgCtx, null));
dataUnit.setMessageContext(synapseOutMsgCtx);

CallbackHandledEvent callbackHandledEvent = new CallbackHandledEvent(dataUnit);
addEventAndDecrementCallbackCount(synapseOutMsgCtx, callbackHandledEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public static void closeEntryEvent(MessageContext messageContext, String compone
}
StatisticDataCollectionHelper
.collectData(messageContext, isContentAltering, isCollectingTracing, statisticDataUnit);
statisticDataUnit.setMessageContext(messageContext);

StatisticsCloseEvent closeEvent = new StatisticsCloseEvent(statisticDataUnit);
if (currentIndex == null) {
Expand Down Expand Up @@ -139,6 +140,7 @@ public static void closeFlowForcefully(MessageContext messageContext, boolean er
dataUnit.setSynapseEnvironment(messageContext.getEnvironment());
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(messageContext));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(messageContext, null));
dataUnit.setMessageContext(messageContext);

EndFlowEvent endFlowEvent = new EndFlowEvent(dataUnit);
if (!error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static void reportFault(MessageContext messageContext) {
BasicStatisticDataUnit dataUnit = new BasicStatisticDataUnit();
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(messageContext));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(messageContext, null));
dataUnit.setMessageContext(messageContext);

FaultEvent faultEvent = new FaultEvent(dataUnit);
faultEvent.getDataUnit().generateElasticMetadata(messageContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,15 @@ public class OpenEventCollector extends RuntimeStatisticCollector {
*/
public static Integer reportEntryEvent(MessageContext messageContext, String componentName,
AspectConfiguration aspectConfiguration, ComponentType componentType) {
boolean isCollectingStatistics = (aspectConfiguration != null && aspectConfiguration.isStatisticsEnable());

// Enable statistics, if user enabled for all artifacts
if (!isCollectingStatistics) {
isCollectingStatistics = isCollectingStatistics || RuntimeStatisticCollector.isCollectingAllStatistics();
}
boolean isCollectingStatistics = (aspectConfiguration != null && aspectConfiguration.isStatisticsEnable())
|| RuntimeStatisticCollector.isCollectingAllStatistics();

boolean isCollectingTracing = false;
if (isCollectingProperties() || isCollectingPayloads() || isCollectingVariables()) {
isCollectingTracing = (aspectConfiguration != null && aspectConfiguration.isTracingEnabled());
}
boolean isCollectingTracing = (isCollectingProperties() || isCollectingPayloads() || isCollectingVariables())
&& aspectConfiguration != null && aspectConfiguration.isTracingEnabled();

Boolean isFlowStatisticEnabled =
(Boolean) messageContext.getProperty(StatisticsConstants.FLOW_STATISTICS_IS_COLLECTED);//todo try to use single object for "FLOW_TRACE_IS_COLLECTED"
Boolean isTracingEnabled;
if (isCollectingStatistics) {
messageContext.setProperty(StatisticsConstants.FLOW_STATISTICS_IS_COLLECTED, true);
setStatisticsTraceId(messageContext);
Expand All @@ -80,7 +74,7 @@ public static Integer reportEntryEvent(MessageContext messageContext, String com
messageContext.setProperty(StatisticsConstants.FLOW_STATISTICS_IS_COLLECTED, false);
}

isTracingEnabled = (Boolean) messageContext.getProperty(StatisticsConstants.FLOW_TRACE_IS_COLLECTED);
Boolean isTracingEnabled = (Boolean) messageContext.getProperty(StatisticsConstants.FLOW_TRACE_IS_COLLECTED);
if (shouldReportStatistic(messageContext)) {
StatisticDataUnit statisticDataUnit = new StatisticDataUnit();
statisticDataUnit.setComponentName(componentName);
Expand All @@ -103,6 +97,7 @@ public static Integer reportEntryEvent(MessageContext messageContext, String com
statisticDataUnit.setIsIndividualStatisticCollected(isCollectingStatistics);
}
StatisticDataCollectionHelper.collectData(messageContext, true, isTracingEnabled, statisticDataUnit);
statisticDataUnit.setMessageContext(messageContext);

StatisticsOpenEvent openEvent = new StatisticsOpenEvent(statisticDataUnit);
addEventAndIncrementCount(messageContext, openEvent);
Expand Down Expand Up @@ -263,6 +258,7 @@ public static void reportFlowAsynchronousEvent(MessageContext messageContext) {
BasicStatisticDataUnit dataUnit = new BasicStatisticDataUnit();
dataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(messageContext));
dataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(messageContext, null));
dataUnit.setMessageContext(messageContext);
AsynchronousExecutionEvent asynchronousExecutionEvent = new AsynchronousExecutionEvent(dataUnit);

if (isOpenTelemetryEnabled()) {
Expand Down Expand Up @@ -291,6 +287,7 @@ private static void reportMediatorStatistics(MessageContext messageContext, Stri
statisticDataUnit.setParentIndex(parentIndex);
StatisticDataCollectionHelper
.collectData(messageContext, isContentAltering, isCollectingTracing, statisticDataUnit);
statisticDataUnit.setMessageContext(messageContext);

StatisticsOpenEvent openEvent = new StatisticsOpenEvent(statisticDataUnit);
addEventAndIncrementCount(messageContext, openEvent);
Expand All @@ -308,6 +305,7 @@ public static void openContinuationEvents(MessageContext synCtx) {

basicStatisticDataUnit.setCurrentIndex(StatisticDataCollectionHelper.getParentFlowPosition(synCtx, null));
basicStatisticDataUnit.setStatisticId(StatisticDataCollectionHelper.getStatisticTraceId(synCtx));
basicStatisticDataUnit.setMessageContext(synCtx);

ParentReopenEvent parentReopenEvent = new ParentReopenEvent(basicStatisticDataUnit);
addEvent(synCtx, parentReopenEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class BasicStatisticDataUnit {
*/
private ElasticMetadata elasticMetadata;

/**
* Message context of the message flow.
*/
private MessageContext messageContext;

public String getStatisticId() {
return statisticId;
}
Expand Down Expand Up @@ -117,4 +122,12 @@ public void generateElasticMetadata(MessageContext messageContext) {
public ElasticMetadata getElasticMetadata() {
return elasticMetadata;
}

public void setMessageContext(MessageContext messageContext) {
this.messageContext = messageContext;
}

public MessageContext getMessageContext() {
return messageContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ public ElasticMetadata(MessageContext msgCtx) {
this.contextProperties = ((Axis2MessageContext) msgCtx).getProperties();
}

public ElasticMetadata(SynapseConfiguration synapseConfiguration, boolean faultResponse, String messageId,
Map<String, Object> contextEntries, Map<String, Object> contextProperties) {
this.synapseConfiguration = synapseConfiguration;
this.faultResponse = faultResponse;
this.messageId = messageId;
this.contextEntries = contextEntries;
this.contextProperties = contextProperties;
}

public SynapseConfiguration getSynapseConfiguration() {
return synapseConfiguration;
}
Expand Down
Loading