Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions lib_ecstasy/src/main/x/ecstasy/Range.x
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,34 @@ const Range<Element extends Orderable>
};
}

/**
* Determine if this `Range` is above the specified.
*
* @return True iff the specified value is below this range's lower bound, or if the specified
* value is equal to this range's lower bound and the lower bound is exclusive.
*/
Boolean isAbove(Element value) {
return switch (value <=> lowerBound) {
case (Lesser): True; // below lower bound
case (Equal ): lowerExclusive; // at lower bound
default: False;
};
}

/**
* Determine if this `Range` is below the specified value.
*
* @return True iff the specified value is above this range's upper bound, or if the specified
* value is equal to this range's upper bound and the upper bound is exclusive.
*/
Boolean isBelow(Element value) {
return switch (value <=> upperBound) {
case (Greater): True; // above upper bound
case (Equal ): upperExclusive; // at upper bound
default: False;
};
}

/**
* This range contains that range iff every value within that range is also in this range.
*/
Expand Down
157 changes: 152 additions & 5 deletions lib_jsondb/src/main/x/jsondb/TxManager.x
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,13 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
FileStore store = root.store;

this.sysDir = store.dirFor (root.path + "sys");
this.logFile = store.fileFor(root.path + "sys" + "txlog.json");
this.logFile = store.fileFor(root.path + "sys" + LogFileName);
this.statusFile = store.fileFor(root.path + "sys" + "txmgr.json");

// Configuration from injectables
this.maxLogSize = configureMaxLogSize();
this.cleanupThreshold = configureCleanupThreshold();

// build the quick lookup information for the optional transactional "modifiers"
// (validators, rectifiers, and distributors)
Boolean hasValidators = catalog.metadata?.dbObjectInfos.any(info -> !info.validators .empty) : False;
Expand Down Expand Up @@ -166,6 +170,21 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)

// ----- properties ----------------------------------------------------------------------------

/**
* The injection name prefix for all JSON DB TxManager configuration.
*/
static String ConfigTxManagerPrefix = jsondb.ConfigPrefix + ".txManager";

/**
* The name of the transaction log file.
*/
static String LogFileName = "txlog.json";

/**
* The name of the transaction log file archive directory.
*/
static String LogFileArchiveName = "txlog-archive";

/**
* The clock shared by all of the services in the database.
*/
Expand Down Expand Up @@ -270,7 +289,7 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
}

/**
* A JSON Mapping to use to serialize instances of LogFileInfo.
* A JSON Mapping to use to serialize LogFileInfo arrays.
*/
@Lazy protected Mapping<LogFileInfo[]> logFileInfoArrayMapping.calc() {
return internalJsonSchema.ensureMapping(LogFileInfo[]);
Expand All @@ -281,11 +300,20 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
*/
protected/private LogFileInfo[] logInfos = new LogFileInfo[];

/**
* The injection name for the TxManager maxLogSize configuration property.
*/
static String ConfigTxManagerMaxLogSize = ConfigTxManagerPrefix + ".maxLogSize";

/**
* The default maximum size log to store in any one log file.
*/
static Int DefaultMaxLogSize = 100K;

/**
* The maximum size log to store in any one log file.
* TODO this setting should be configurable (need a "Prefs" API)
*/
public/protected Int maxLogSize = 100K;
public/protected Int maxLogSize;

/**
* Previous modified date/time of the log.
Expand Down Expand Up @@ -358,6 +386,23 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
*/
protected Time lastActivity = EPOCH;

/**
* The injection name for the TxManager cleanupThreshold configuration property.
*/
static String ConfigTxManagerCleanupThreshold = ConfigTxManagerPrefix + ".cleanupThreshold";

/**
* The default value for the `cleanupThreshold`.
*/
static Int DefaultCleanupThreshold = 10K;

/**
* Cleanup is triggered in the maintenance phase when the number of transactions since the last
* cleanup multiplied by the number of seconds passed since the last cleanup is greater than or
* equal to this value.
*/
public/protected Int cleanupThreshold;

/**
* Used to determine when the next storage cleanup should occur.
*/
Expand Down Expand Up @@ -2997,10 +3042,11 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
// period, or after so many transactions have gone through, or some combination
Int seconds = (now - lastCleanupTime).seconds;
Int txCount = lastCommitted - lastCleanupTx;
if (seconds * txCount > 10k) {
if (seconds * txCount > cleanupThreshold) {
lastCleanupTime = now;
lastCleanupTx = lastCommitted;
cleanUpStorages();
cleanupLogfiles();
}
} catch (Exception e) {
log($"Exception occurred during background maintenance: {e}");
Expand Down Expand Up @@ -3043,6 +3089,44 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
}
}

protected void cleanupLogfiles() {
if (logInfos.size <= 1) {
// there is at most one log file, so nothing to clean up
return;
}

// ensure the log file archive directory exists under the sys directory
Directory archiveDir = sysDir.dirFor(LogFileArchiveName).ensure();
Set<Int> txSet = new ArrayOrderedSet<Int>(byReadId.keys.toArray(Constant));
Boolean archived = False;

if (Int oldestTx := txSet.first()) {
for (Int i : 0 ..< logInfos.size) {
LogFileInfo info = logInfos[i];
if (info.name != LogFileName && info.txIds.isBelow(oldestTx)) {
archived = True;
File infoFile = sysDir.fileFor(info.name);
if (infoFile.exists) {
File archiveFile = archiveDir.fileFor(infoFile.name);
if (archiveFile.exists) {
// this should never happen
archiveFile.delete();
}
infoFile.store.move(infoFile.path, archiveFile.path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty branchy and nested. In Java there are several ways to tighten down something like this and make it more stream style / temporary variable removal. Is there any way in ecstasy to prettify and shorten this flow and still make it more readable?

(Also there are probably things that I would have done here like guard ifs, early returns, avoid uncessary elses, but that is a personal preference, and your existing code is quite readable, just curious if there are way to reduce this in xtc and maintain readability mainly - nitpick)

}
} else {
// the logInfos array is ordered by oldest first, so we can stop as soon as we
// find the first log file that is not older than the oldest transaction
if (archived) {
// some files were archived, so remove all the moved files from logInfos
logInfos.deleteAll(0 ..< i);
writeStatus();
}
break;
}
}
}
}

// ----- internal ------------------------------------------------------------------------------

Expand Down Expand Up @@ -3136,4 +3220,67 @@ service TxManager<Schema extends RootSchema>(Catalog<Schema> catalog)
void recycleClient(Client<Schema> client) {
return clientCache.reversed.add(client);
}

// ----- configuration -------------------------------------------------------------------------

/**
* Determines the value to use for the maximum log file size.
*
* If the value has been configured using an injectable the injected value is validated to be
* greater than zero.
*
* @return the configured maximum log file size or the default value if not configured
*/
static Int configureMaxLogSize() {
@Inject(ConfigTxManagerMaxLogSize) Int? maxLogSize;
if (maxLogSize.is(Int)) {
assert maxLogSize > 0
as $|invalid maxLogSize ({maxLogSize}) configured using \
|"{ConfigTxManagerMaxLogSize}", value must be greater than zero
;
return maxLogSize;
}
return DefaultMaxLogSize;
}

/**
* Determines the value to use for the cleanup threshold.
*
* If the value has been configured using an injectable the injected value is validated to be
* greater than zero.
*
* @return the configured cleanup threshold. or the default value if not configured
*/
static Int configureCleanupThreshold() {
@Inject(ConfigTxManagerCleanupThreshold) Int? cleanupThreshold;
if (cleanupThreshold.is(Int)) {
assert cleanupThreshold > 0
as $|invalid cleanupThreshold ({cleanupThreshold}) configured using
|"{ConfigTxManagerCleanupThreshold}", value must be greater than zero
;
return cleanupThreshold;
}
return DefaultCleanupThreshold;
}

// ----- testing -------------------------------------------------------------------------------

/**
* @return a copy of the current LogFileInfo array.
*/
@Test(Test.Omit)
protected LogFileInfo[] getLogFileInfos() {
return logInfos.freeze(False);
}

/**
* @return a copy of the current status file contents.
*/
@Test(Test.Omit)
protected LogFileInfo[] getStatusFileContent() {
if (LogFileInfo[] infos := readStatus()) {
return infos.freeze(False);
}
return [];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ service TestClient

@Lazy TestSchema testSchema.calc() = conn.as(TestSchema);

/**
* @return the underlying connection, ensuring that it is not null.
*/
Connection ensureConnection() {
Connection? conn = this.conn;
assert conn.is(Connection);
return conn;
}

/**
* The `TestSchema` implementation.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import jsondb.TxManager;
import jsondb.TxManager.LogFileInfo;

import oodb.Connection;

import test_db.*;

import xunit.annotations.RegisterExtension;
import xunit.annotations.TestInjectables;

/**
* Tests for the Json DB TxManager transaction log file management.
*/
class TxManagerLogTest {

@RegisterExtension
static TestClientProvider clientProvider = new TestClientProvider();

@Test
void shouldHaveTxLogAfterCommit() {
assert TestClient client := clientProvider.getClient();
TestSchema schema = client.testSchema;

assert TxManager<TestSchema> manager := client.&txManager
.revealAs((protected TxManager<TestSchema>));

// put some data in to make sure we have some Tx log entries
schema.mapData.put("key1", "value1");
// there should be a single tx log file
LogFileInfo[] logInfos = manager.getLogFileInfos();
assert logInfos.size == 1;
assert logInfos[0].name == TxManager.LogFileName;
assertLogsExist(logInfos, manager.sysDir);
}

@Test
// inject a small max log file size to force logs to roll
@TestInjectables(Map:[TxManager.ConfigTxManagerMaxLogSize="300"])
void shouldRollTransactionLogs() {
assert TestClient client := clientProvider.getClient();
TestSchema schema = client.testSchema;

assert TxManager<TestSchema> manager := client.&txManager
.revealAs((protected TxManager<TestSchema>));

// put some data in to make sure we have some Tx log entries
schema.mapData.put("key1", "value1");
// there should be a single tx log file
LogFileInfo[] logInfos = manager.getLogFileInfos();
assert logInfos.size == 1;
assertLogsExist(logInfos, manager.sysDir);

// put some data in to roll the logs
schema.mapData.put("key2", "value2");
schema.mapData.put("key3", "value3");
logInfos = manager.getLogFileInfos();
assert logInfos.size > 1;
}

@Test
// inject a very small max log file size to force logs to roll every transaction
@TestInjectables(Map:[TxManager.ConfigTxManagerMaxLogSize="10"])
void shouldNotArchiveInUseTransactionLogs() {
assert TestClient client := clientProvider.getClient();
TestSchema schema = client.testSchema;

assert TxManager<TestSchema> manager := client.&txManager
.revealAs((protected TxManager<TestSchema>));

// put some data in to make sure we have some Tx log entries
Int txCount = 10;
for (Int i : 0 ..< txCount) {
schema.mapData.put("key1", $"value-{i}");
}

// there should be a one log file per transaction plus an empty current log
LogFileInfo[] logInfosBefore = manager.getLogFileInfos();
Int logCountBefore = logInfosBefore.size;
assert logCountBefore == txCount + 1;
assertLogsExist(logInfosBefore, manager.sysDir);

// trigger log file cleanup
// we need to have a current transaction to trigger cleanup
Connection<TestSchema> conn = client.ensureConnection();
using (conn.createTransaction()) {
schema.mapData.get("key1"); // this will create a readId in TxManager
manager.cleanupLogfiles();
}

// there should be two log files left, the empty current file and the
// previous timestamped file
LogFileInfo[] logInfosAfter = manager.getLogFileInfos();
Int logCountAfter = logInfosAfter.size;
assert logInfosAfter.size == 2;
assert logInfosBefore[logCountBefore - 2] == logInfosAfter[0];
assert logInfosBefore[logCountBefore - 1] == logInfosAfter[1];
assertLogsExist(logInfosAfter, manager.sysDir);

// check files are in the archive directory
Directory archiveDir = manager.sysDir.dirFor(TxManager.LogFileArchiveName);
for (Int i : 0 ..< (logCountBefore - logCountAfter)) {
File logFile = archiveDir.fileFor(logInfosBefore[i].name);
assert logFile.exists as $"Archived tx log file {logFile.name} does not exist";
}

// check we did not archive the current log and in-use log
File logFile1 = archiveDir.fileFor(logInfosAfter[0].name);
File logFile2 = archiveDir.fileFor(logInfosAfter[1].name);
assert !logFile1.exists as $"Should not have archived {logFile1}";
assert !logFile2.exists as $"Should not have archived {logFile2}";

LogFileInfo[] statusInfos = manager.getStatusFileContent();
assert statusInfos.size == 2;
}

void assertLogsExist(LogFileInfo[] logInfos, Directory dir) {
for (LogFileInfo logInfo : logInfos) {
File logFile = dir.fileFor(logInfo.name);
assert logFile.exists as $"Tx log file {logFile.name} does not exist";
}
}

}
Loading