Skip to content
This repository was archived by the owner on Apr 13, 2019. It is now read-only.

Commit dd1372c

Browse files
Glenn ScottGlenn Scott
authored andcommitted
Merge pull request #13 from walendo/4023-walendo-athena-unregister-prefix
4023 walendo athena unregister prefix
2 parents 9019262 + 608f3ea commit dd1372c

11 files changed

+164
-79
lines changed

ccnx/forwarder/athena/athena.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -250,27 +250,30 @@ _processInterest(Athena *athena, CCNxInterest *interest, PARCBitVector *ingressV
250250
if (parcBitVector_NumberOfBitsSet(egressVector) == 0) {
251251
if (ccnxWireFormatMessage_ConvertInterestToInterestReturn(interest,
252252
CCNxInterestReturn_ReturnCode_NoRoute)) {
253-
254253
// NOTE: The Interest has been modified in-place. It is now an InterestReturn.
255254
parcLog_Debug(athena->log, "Returning Interest as InterestReturn (code: NoRoute)");
256-
PARCBitVector *result = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest,
257-
ingressVector);
258-
parcBitVector_Release(&result);
255+
PARCBitVector *failedLinks = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter,
256+
interest, ingressVector);
257+
if (failedLinks != NULL) {
258+
parcBitVector_Release(&failedLinks);
259+
}
259260
} else {
260261
if (ccnxName) {
261262
const char *name = ccnxName_ToString(ccnxName);
262263
parcLog_Error(athena->log, "Unable to return Interest (%s) as InterestReturn (code: NoRoute).", name);
263264
parcMemory_Deallocate(&name);
264265
} else {
265266
parcLog_Error(athena->log, "Unable to return Interest () as InterestReturn (code: NoRoute).");
266-
}
267+
}
267268
}
268269
} else {
269270
parcBitVector_SetVector(expectedReturnVector, egressVector);
270-
PARCBitVector *result = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest, egressVector);
271-
if (result) { // remove failed channels - client will resend interest unless we wish to optimize here
272-
parcBitVector_ClearVector(expectedReturnVector, result);
273-
parcBitVector_Release(&result);
271+
PARCBitVector *failedLinks =
272+
athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest, egressVector);
273+
274+
if (failedLinks) { // remove failed channels - client will resend interest unless we wish to optimize here
275+
parcBitVector_ClearVector(expectedReturnVector, failedLinks);
276+
parcBitVector_Release(&failedLinks);
274277
}
275278
}
276279
parcBitVector_Release(&egressVector);
@@ -279,12 +282,13 @@ _processInterest(Athena *athena, CCNxInterest *interest, PARCBitVector *ingressV
279282

280283
if (ccnxWireFormatMessage_ConvertInterestToInterestReturn(interest,
281284
CCNxInterestReturn_ReturnCode_NoRoute)) {
282-
283285
// NOTE: The Interest has been modified in-place. It is now an InterestReturn.
284286
parcLog_Debug(athena->log, "Returning Interest as InterestReturn (code: NoRoute)");
285-
PARCBitVector *result = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest,
286-
ingressVector);
287-
parcBitVector_Release(&result);
287+
PARCBitVector *failedLinks = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest,
288+
ingressVector);
289+
if (failedLinks != NULL) {
290+
parcBitVector_Release(&failedLinks);
291+
}
288292
} else {
289293
if (ccnxName) {
290294
const char *name = ccnxName_ToString(ccnxName);

ccnx/forwarder/athena/athena_Control.c

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,14 @@ athenaControl(Athena *athena, CCNxControl *control, PARCBitVector *ingressVector
8686
// -> ListPIT
8787
// -> Unsolicited message on channel XXX
8888
//
89+
bool commandResult = false;
90+
91+
CpiOperation operation = cpi_GetMessageOperation(control);
92+
switch (operation) {
93+
case CPI_REGISTER_PREFIX: // Add FIB route
94+
// Fall through
95+
case CPI_UNREGISTER_PREFIX: { // Remove FIB route
8996

90-
switch (cpi_GetMessageOperation(control)) {
91-
// Add FIB route
92-
case CPI_REGISTER_PREFIX: {
9397
PARCBitVector *egressVector = parcBitVector_Create();
9498

9599
CPIRouteEntry *cpiRouteEntry = cpiForwarding_RouteFromControlMessage(control);
@@ -102,35 +106,58 @@ athenaControl(Athena *athena, CCNxControl *control, PARCBitVector *ingressVector
102106
parcBitVector_Set(egressVector, linkId);
103107
}
104108

105-
if (prefix) {
106-
const char *name = ccnxName_ToString(prefix);
109+
char *prefixString = ccnxName_ToString(prefix);
110+
unsigned interface = parcBitVector_NextBitSet(egressVector, 0);
111+
if (operation == CPI_REGISTER_PREFIX) {
107112
parcLog_Debug(athena->log, "Adding %s route to interface %d",
108-
name, parcBitVector_NextBitSet(egressVector, 0));
109-
parcMemory_Deallocate(&name);
110-
} else {
111-
parcLog_Debug(athena->log, "Adding NULL route to interface %d",
112-
parcBitVector_NextBitSet(egressVector, 0));
113+
prefixString, interface);
114+
commandResult = athenaFIB_AddRoute(athena->athenaFIB, prefix, egressVector);
115+
if (!commandResult) {
116+
parcLog_Warning(athena->log, "Unable to add route %s to interface %d",
117+
prefixString, interface);
118+
}
119+
120+
} else { // Must be CPI_UNREGISTER_PREFIX
121+
parcLog_Debug(athena->log, "Removing %s route from interface %d",
122+
prefixString, interface);
123+
commandResult = athenaFIB_DeleteRoute(athena->athenaFIB, prefix, egressVector);
124+
if (!commandResult) {
125+
parcLog_Warning(athena->log, "Unable to remove route %s from interface %d",
126+
prefixString, interface);
127+
}
113128
}
114-
athenaFIB_AddRoute(athena->athenaFIB, prefix, egressVector);
129+
parcMemory_Deallocate(&prefixString);
130+
parcBitVector_Release(&egressVector);
131+
132+
// Now send an ACK for the control message back to the sender.
115133

116134
PARCJSON *json = ccnxControl_GetJson(control);
135+
parcJSON_AddBoolean(json, "RESULT", commandResult);
136+
117137
PARCJSON *jsonAck = cpiAcks_CreateAck(json);
118138

119139
CCNxControl *response = ccnxControl_CreateCPIRequest(jsonAck);
120140
parcJSON_Release(&jsonAck);
121141

122142
athena_EncodeMessage(response);
123-
PARCBitVector *result = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, response, ingressVector);
143+
144+
PARCBitVector *result =
145+
athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, response, ingressVector);
146+
124147
if (result) { // failed channels - client will resend interest unless we wish to optimize things here
125148
parcBitVector_Release(&result);
149+
parcLog_Warning(athena->log, "Unable to send ACK response via interface %d", interface);
126150
}
127151
ccnxControl_Release(&response);
152+
cpiRouteEntry_Destroy(&cpiRouteEntry);
153+
128154
} break;
155+
129156
default:
130157
parcLog_Error(athena->log, "unknown operation %d", cpi_GetMessageOperation(control));
131158
}
132159

133-
return 0;
160+
return commandResult == true? 0 : -1;
134161
}
135162

136163
CCNxMetaMessage *

ccnx/forwarder/athena/athena_TransportLinkAdapter.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -853,28 +853,32 @@ athenaTransportLinkAdapter_Send(AthenaTransportLinkAdapter *athenaTransportLinkA
853853
CCNxMetaMessage *ccnxMetaMessage,
854854
PARCBitVector *linkOutputVector)
855855
{
856-
PARCBitVector *resultVector = parcBitVector_Create();
857856
int nextLinkToWrite = 0;
858857

859858
if (athenaTransportLinkAdapter->instanceList == NULL) {
860-
return resultVector;
859+
return NULL;
861860
}
862861

862+
PARCBitVector *resultVector = parcBitVector_Create();
863+
863864
while ((nextLinkToWrite = parcBitVector_NextBitSet(linkOutputVector, nextLinkToWrite)) >= 0) {
864865
athenaTransportLinkAdapter->stats.messageSend_Attempted++;
865866
if (nextLinkToWrite >= parcArrayList_Size(athenaTransportLinkAdapter->instanceList)) {
866867
athenaTransportLinkAdapter->stats.messageSend_LinkDoesNotExist++;
868+
parcBitVector_Set(resultVector, nextLinkToWrite);
867869
nextLinkToWrite++;
868870
continue;
869871
}
870872
AthenaTransportLink *athenaTransportLink = parcArrayList_Get(athenaTransportLinkAdapter->instanceList, nextLinkToWrite);
871873
if (athenaTransportLink == NULL) {
872874
athenaTransportLinkAdapter->stats.messageSend_LinkDoesNotExist++;
875+
parcBitVector_Set(resultVector, nextLinkToWrite);
873876
nextLinkToWrite++;
874877
continue;
875878
}
876879
if (!(athenaTransportLink_GetEvent(athenaTransportLink) & AthenaTransportLinkEvent_Send)) {
877880
athenaTransportLinkAdapter->stats.messageSend_LinkNotAcceptingSendRequests++;
881+
parcBitVector_Set(resultVector, nextLinkToWrite);
878882
nextLinkToWrite++;
879883
continue;
880884
}
@@ -892,13 +896,18 @@ athenaTransportLinkAdapter_Send(AthenaTransportLinkAdapter *athenaTransportLinkA
892896
int result = athenaTransportLink_Send(athenaTransportLink, ccnxMetaMessage);
893897
if (result == 0) {
894898
athenaTransportLinkAdapter->stats.messageSent++;
895-
parcBitVector_Set(resultVector, nextLinkToWrite);
896899
} else {
897900
athenaTransportLinkAdapter->stats.messageSend_LinkSendFailed++;
901+
parcBitVector_Set(resultVector, nextLinkToWrite);
898902
}
899903
nextLinkToWrite++;
900904
}
901905

906+
if (parcBitVector_NumberOfBitsSet(resultVector) == 0) {
907+
parcBitVector_Release(&resultVector);
908+
}
909+
910+
// A NULL resultVector means message was successfully sent to all links
902911
return resultVector;
903912
}
904913

ccnx/forwarder/athena/test/test_athena.c

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@
6464
#include <errno.h>
6565

6666
#include <parc/algol/parc_SafeMemory.h>
67-
#include <parc/security/parc_CryptoHasher.h>
68-
#include <ccnx/common/ccnx_WireFormatMessage.h>
69-
#include <ccnx/common/codec/ccnxCodec_TlvPacket.h>
70-
#include <ccnx/common/validation/ccnxValidation_CRC32C.h>
7167
#include <ccnx/common/ccnx_NameSegmentNumber.h>
7268
#include <ccnx/common/internal/ccnx_InterestDefault.h>
7369

@@ -104,6 +100,8 @@ LONGBOW_TEST_FIXTURE(Global)
104100
LONGBOW_RUN_TEST_CASE(Global, athena_ProcessControl);
105101
LONGBOW_RUN_TEST_CASE(Global, athena_ProcessInterestReturn);
106102
LONGBOW_RUN_TEST_CASE(Global, athena_ForwarderEngine);
103+
104+
LONGBOW_RUN_TEST_CASE(Global, athena_ProcessControl_CPI_REGISTER_PREFIX);
107105
}
108106

109107
LONGBOW_TEST_FIXTURE_SETUP(Global)
@@ -299,6 +297,67 @@ LONGBOW_TEST_CASE(Global, athena_ProcessControl)
299297
athena_Release(&athena);
300298
}
301299

300+
LONGBOW_TEST_CASE(Global, athena_ProcessControl_CPI_REGISTER_PREFIX)
301+
{
302+
PARCURI *connectionURI;
303+
Athena *athena = athena_Create(100);
304+
305+
CCNxName *name = ccnxName_CreateFromCString("ccnx:/foo/bar");
306+
CCNxControl *control = ccnxControl_CreateAddRouteToSelfRequest(name); // CPI_REGISTER_PREFIX
307+
CCNxMetaMessage *registerPrefixCommand = ccnxMetaMessage_CreateFromControl(control);
308+
ccnxControl_Release(&control);
309+
310+
control = ccnxControl_CreateRemoveRouteToSelfRequest(name); // CPI_UNREGISTER_PREFIX
311+
CCNxMetaMessage *unregisterPrefixCommand = ccnxMetaMessage_CreateFromControl(control);
312+
ccnxControl_Release(&control);
313+
ccnxName_Release(&name);
314+
315+
connectionURI = parcURI_Parse("tcp://localhost:50100/listener/name=TCPListener");
316+
const char *result = athenaTransportLinkAdapter_Open(athena->athenaTransportLinkAdapter, connectionURI);
317+
assertTrue(result != NULL, "athenaTransportLinkAdapter_Open failed (%s)", strerror(errno));
318+
parcURI_Release(&connectionURI);
319+
320+
connectionURI = parcURI_Parse("tcp://localhost:50100/name=TCP_0");
321+
result = athenaTransportLinkAdapter_Open(athena->athenaTransportLinkAdapter, connectionURI);
322+
assertTrue(result != NULL, "athenaTransportLinkAdapter_Open failed (%s)", strerror(errno));
323+
parcURI_Release(&connectionURI);
324+
325+
int linkId = athenaTransportLinkAdapter_LinkNameToId(athena->athenaTransportLinkAdapter, "TCP_0");
326+
PARCBitVector *ingressVector = parcBitVector_Create();
327+
parcBitVector_Set(ingressVector, linkId);
328+
329+
// Call _Receive() once to prime the link. Messages are dropped until _Receive() is called once.
330+
PARCBitVector *linksRead = NULL;
331+
CCNxMetaMessage *msg = athenaTransportLinkAdapter_Receive(athena->athenaTransportLinkAdapter, &linksRead, -1);
332+
assertNull(msg, "Expected to NOT receive a message after the first call to _Receive()");
333+
334+
CCNxMetaMessage *cpiMessages[2];
335+
cpiMessages[0] = registerPrefixCommand; // CPI_REGISTER_PREFIX
336+
cpiMessages[1] = unregisterPrefixCommand; // CPI_UNREGISTER_PREFIX
337+
338+
for (int i = 0; i < 2; i++) {
339+
CCNxMetaMessage *cpiMessageToSend = cpiMessages[i];
340+
athena_ProcessMessage(athena, cpiMessageToSend, ingressVector);
341+
ccnxMetaMessage_Release(&cpiMessageToSend);
342+
343+
CCNxMetaMessage *ack = athenaTransportLinkAdapter_Receive(athena->athenaTransportLinkAdapter, &linksRead, -1);
344+
assertNotNull(ack, "Expected a CPI_ACK message back");
345+
assertTrue(ccnxMetaMessage_IsControl(ack), "Expected a control message back");
346+
parcBitVector_Release(&linksRead);
347+
348+
PARCJSON *json = ccnxControl_GetJson(ack);
349+
const PARCJSONValue *cpiAckResult = parcJSON_GetByPath(json, "CPI_ACK/REQUEST/RESULT");
350+
bool commandResult = parcJSONValue_GetBoolean(cpiAckResult);
351+
assertTrue(commandResult, "Expected the ACK to contain RESULT=true");
352+
353+
ccnxMetaMessage_Release(&ack);
354+
}
355+
356+
parcBitVector_Release(&ingressVector);
357+
athena_Release(&athena);
358+
}
359+
360+
302361
LONGBOW_TEST_CASE(Global, athena_ProcessInterestReturn)
303362
{
304363
PARCURI *connectionURI;
@@ -380,14 +439,14 @@ LONGBOW_TEST_CASE(Global, athena_ForwarderEngine)
380439

381440
athena_EncodeMessage(interest);
382441

383-
PARCBitVector *resultVector = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest, linkVector);
384-
assertNotNull(resultVector, "athenaTransportLinkAdapter_Send failed");
385-
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 1, "Exit message not sent");
442+
PARCBitVector
443+
*resultVector = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest, linkVector);
444+
assertNull(resultVector, "athenaTransportLinkAdapter_Send failed");
386445
ccnxMetaMessage_Release(&interest);
387446
parcBitVector_Release(&linkVector);
388-
parcBitVector_Release(&resultVector);
389447

390-
CCNxMetaMessage *response = athenaTransportLinkAdapter_Receive(athena->athenaTransportLinkAdapter, &resultVector, -1);
448+
CCNxMetaMessage
449+
*response = athenaTransportLinkAdapter_Receive(athena->athenaTransportLinkAdapter, &resultVector, -1);
391450
assertNotNull(resultVector, "athenaTransportLinkAdapter_Receive failed");
392451
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) > 0, "athenaTransportLinkAdapter_Receive failed");
393452
parcBitVector_Release(&resultVector);

ccnx/forwarder/athena/test/test_athena_InterestControl.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -534,11 +534,9 @@ LONGBOW_TEST_CASE(Global, athenaInterestControl_Spawn)
534534
athena_EncodeMessage(interest);
535535

536536
PARCBitVector *resultVector = athenaTransportLinkAdapter_Send(athena->athenaTransportLinkAdapter, interest, linkVector);
537-
assertNotNull(resultVector, "athenaTransportLinkAdapter_Send failed");
538-
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 1, "Exit message not sent");
537+
assertNull(resultVector, "athenaTransportLinkAdapter_Send failed");
539538
ccnxMetaMessage_Release(&interest);
540539
parcBitVector_Release(&linkVector);
541-
parcBitVector_Release(&resultVector);
542540

543541
//usleep(1000);
544542

ccnx/forwarder/athena/test/test_athena_TransportLinkAdapter.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,9 +334,7 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkAdapter_SendReceive)
334334
athena_EncodeMessage(sendMessage);
335335

336336
resultVector = athenaTransportLinkAdapter_Send(athenaTransportLinkAdapter, sendMessage, linkVector);
337-
assertNotNull(resultVector, "athenaTransportLinkAdapter_Send failed");
338-
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == parcBitVector_NumberOfBitsSet(linkVector), "athenaTransportLinkAdapter_Send failed to send to multiple links");
339-
parcBitVector_Release(&resultVector);
337+
assertNull(resultVector, "athenaTransportLinkAdapter_Send failed");
340338

341339
usleep(1000);
342340

@@ -378,14 +376,14 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkAdapter_SendReceive)
378376
assertTrue(closeResult == 0, "athenaTransportLinkAdapter_CloseByName failed (%s)", strerror(errno));
379377

380378
resultVector = athenaTransportLinkAdapter_Send(athenaTransportLinkAdapter, sendMessage, linkVector);
381-
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 1, "athenaTransportLinkAdapter_Send should have partially failed");
379+
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 2, "athenaTransportLinkAdapter_Send should have partially failed");
382380
parcBitVector_Release(&linkVector);
383381
parcBitVector_Release(&resultVector);
384382

385383
linkVector = parcBitVector_Create();
386384
parcBitVector_Set(linkVector, 100);
387385
resultVector = athenaTransportLinkAdapter_Send(athenaTransportLinkAdapter, sendMessage, linkVector);
388-
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 0, "athenaTransportLinkAdapter_Send should have failed to send to unknown link");
386+
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 1, "athenaTransportLinkAdapter_Send should have failed to send to unknown link");
389387
parcBitVector_Release(&resultVector);
390388
parcBitVector_Release(&linkVector);
391389
ccnxMetaMessage_Release(&sendMessage);

0 commit comments

Comments
 (0)