Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARTEMIS-1986] PagingTest#testDeletePhysicalPages will fail if a record about deleting a page is not saved in journal #258

Open
wants to merge 1 commit into
base: jboss-1.5.5-x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public interface PageSubscription {
*/
void reloadACK(PagePosition position);

void reloadPageCompletion(PagePosition position) throws Exception;
boolean reloadPageCompletion(PagePosition position) throws Exception;

void reloadPageInfo(long pageNr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,12 @@ public PageSubscriptionCounter getCounter() {
* cursor/subscription.
*/
@Override
public void reloadPageCompletion(PagePosition position) throws Exception {
public boolean reloadPageCompletion(PagePosition position) throws Exception {
// if page doesn't exist, completed record is removed
if (pageStore != null && !pageStore.checkPageFileExists((int)position.getPageNr())) {
return false;
}

// if the current page is complete, we must move it out of the way
if (pageStore != null && pageStore.getCurrentPage() != null &&
pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
Expand All @@ -201,6 +206,7 @@ public void reloadPageCompletion(PagePosition position) throws Exception {
synchronized (consumedPages) {
consumedPages.put(Long.valueOf(position.getPageNr()), info);
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ public Page getCurrentPage() {
@Override
public boolean checkPageFileExists(final int pageNumber) {
String fileName = createFileName(pageNumber);

try {
checkFileFactory();
} catch (Exception ignored) {
}

SequentialFile file = fileFactory.createSequentialFile(fileName);
return file.exists();
}
Expand All @@ -545,6 +551,12 @@ public Page createPage(final int pageNumber) throws Exception {
return page;
}

private void checkFileFactory() throws Exception {
if (fileFactory == null) {
fileFactory = storeFactory.newFileFactory(getStoreName());
}
}

@Override
public void forceAnotherPage() throws Exception {
openNewPage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,12 @@ public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);

if (sub != null) {
sub.reloadPageCompletion(encoding.position);
if (!sub.reloadPageCompletion(encoding.position)) {
if (logger.isDebugEnabled()) {
logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress());
}
messageJournal.appendDeleteRecord(record.id, false);
}
} else {
ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,153 @@ public void testDeletePhysicalPages() throws Exception {

}

// 4 messages are send/received, it creates 2 pages, where for second page there is no delete completion record in journal
// server is restarted and 4 messages sent/received again. There should be no lost message.
@Test
public void testRestartWithCompleteAndDeletedPhysicalPage() throws Exception {
clearDataRecreateServerDirs();

Configuration config = createDefaultInVMConfig();

final AtomicBoolean mainCleanup = new AtomicBoolean(true);

class InterruptedCursorProvider extends PageCursorProviderImpl {

InterruptedCursorProvider(PagingStore pagingStore,
StorageManager storageManager,
Executor executor,
int maxCacheSize) {
super(pagingStore, storageManager, executor, maxCacheSize);
}

@Override
public void cleanup() {
if (mainCleanup.get()) {
super.cleanup();
} else {
try {
pagingStore.unlock();
} catch (Throwable ignored) {
}
}
}
}

server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryNIO getPagingStoreFactory() {
return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
@Override
public PageCursorProvider newCursorProvider(PagingStore store,
StorageManager storageManager,
AddressSettings addressSettings,
Executor executor) {
return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
}
};
}

};

addServer(server);

AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(MESSAGE_SIZE).setMaxSizeBytes(2*MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);

server.getAddressSettingsRepository().addMatch("#", defaultSetting);

server.start();

locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);

sf = createSessionFactory(locator);
ClientSession session = sf.createSession(true, true, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);

Queue queue = server.locateQueue(ADDRESS);

ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

ClientMessage message;

for (int i = 0; i < 4; i++) {
message = session.createMessage(true);

ActiveMQBuffer bodyLocal = message.getBodyBuffer();

bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);

producer.send(message);
session.commit();

//last page (#2, whch contains only message #3) is marked as complete - is full - but no delete complete record is added
if (i == 3) {
queue.getPageSubscription().getPagingStore().forceAnotherPage();
}

}

Assert.assertEquals(3, queue.getPageSubscription().getPagingStore().getCurrentWritingPage());

ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();

for (int i = 0; i < 4; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("Before restart - message "+ i + " is empty.",message);
message.acknowledge();
}



server.stop();
mainCleanup.set(false);



// Deleting the paging data. Simulating a failure
// a dumb user, or anything that will remove the data
deleteDirectory(new File(getPageDir()));

logger.trace("Server restart");

server.start();

locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = sf.createSession(null, null, false, false, true, false, 0);
producer = session.createProducer(PagingTest.ADDRESS);

for (int i = 0; i < 4; i++) {
message = session.createMessage(true);

ActiveMQBuffer bodyLocal = message.getBodyBuffer();

bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);


producer.send(message);
}
session.commit();

mainCleanup.set(true);

queue = server.locateQueue(ADDRESS);
queue.getPageSubscription().cleanupEntries(false);
queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();

consumer = session.createConsumer(ADDRESS);
session.start();

for (int i = 0; i < 4; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("After restart - message "+ i + " is empty.",message);
message.acknowledge();
}

server.stop();

}

@Test
public void testMissingTXEverythingAcked() throws Exception {
clearDataRecreateServerDirs();
Expand Down