From 400d705b1c124e7da1ebce8e44b1f298ac17f6e0 Mon Sep 17 00:00:00 2001
From: "Barry M. Caceres"
Date: Mon, 29 Sep 2025 11:21:05 -0700
Subject: [PATCH 1/2] Updated redo snippets to avoid extensive calling of
 countRedoRecords()

---
 csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs      | 7 +++----
 csharp/snippets/redo/RedoContinuousViaFutures/Program.cs | 3 +++
 java/snippets/redo/LoadWithRedoViaLoop.java              | 8 ++++----
 java/snippets/redo/RedoContinuousViaFutures.java         | 3 +++
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs b/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs
index d22cf47..ef294ee 100644
--- a/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs
+++ b/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs
@@ -109,11 +109,10 @@
   }
 
   // now that we have loaded the records, check for redos and handle them
-  while (engine.CountRedoRecords() > 0)
+  for (string redo = engine.GetRedoRecord();
+      redo != null;
+      redo = engine.GetRedoRecord())
   {
-    // get the next redo record
-    string redo = engine.GetRedoRecord();
-
     try
     {
       // process the redo record
diff --git a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs
index 7bd8195..ec404f5 100644
--- a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs
+++ b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs
@@ -116,6 +116,9 @@ TaskScheduler taskScheduler
     } while (pendingFutures.Count >= MaximumBacklog);
 
     // check if there are no redo records right now
+    // NOTE: we do NOT want to call countRedoRecords() in a loop that
+    // is processing redo records; we call it here AFTER we believe we
+    // have processed all pending redos to confirm it is still zero
     if (engine.CountRedoRecords() == 0)
     {
       OutputRedoStatistics();
diff --git a/java/snippets/redo/LoadWithRedoViaLoop.java b/java/snippets/redo/LoadWithRedoViaLoop.java
index bf853b4..0e8ad45 100644
--- a/java/snippets/redo/LoadWithRedoViaLoop.java
+++ b/java/snippets/redo/LoadWithRedoViaLoop.java
@@ -98,10 +98,10 @@ public static void main(String[] args) {
       }
 
       // now that we have loaded the records, check for redos and handle them
-      while (engine.countRedoRecords() > 0) {
-        // get the next redo record
-        String redo = engine.getRedoRecord();
-
+      for (String redo = engine.getRedoRecord();
+          redo != null;
+          redo = engine.getRedoRecord())
+      {
         try {
           // process the redo record
           engine.processRedoRecord(redo, SZ_NO_FLAGS);
diff --git a/java/snippets/redo/RedoContinuousViaFutures.java b/java/snippets/redo/RedoContinuousViaFutures.java
index eae1d5d..e897d1a 100644
--- a/java/snippets/redo/RedoContinuousViaFutures.java
+++ b/java/snippets/redo/RedoContinuousViaFutures.java
@@ -103,6 +103,9 @@ public static void main(String[] args) {
       } while (pendingFutures.size() >= MAXIMUM_BACKLOG);
 
       // check if there are no redo records right now
+      // NOTE: we do NOT want to call countRedoRecords() in a loop that
+      // is processing redo records; we call it here AFTER we believe we
+      // have processed all pending redos to confirm it is still zero
      if (engine.countRedoRecords() == 0) {
        outputRedoStatistics();
        System.out.println();
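For readers skimming the hunks above, the pattern the first patch switches to is summarized below as a minimal Java sketch. It assumes an initialized SzEngine named engine and the snippets' SZ_NO_FLAGS constant (both taken from the patch); the catch clause and the logging line are illustrative assumptions, since the surrounding error handling is not visible in the hunks. The key point is that getRedoRecord() returns null once no redo records remain, so countRedoRecords() never needs to be polled once per record.

    // drain every currently available redo record without polling countRedoRecords()
    for (String redo = engine.getRedoRecord();
         redo != null;
         redo = engine.getRedoRecord()) {
        try {
            // process the redo record with no flags
            engine.processRedoRecord(redo, SZ_NO_FLAGS);
        } catch (SzException e) {
            // assumed handling: log the failure and continue with the next redo record
            System.err.println("Failed to process redo record: " + e);
        }
    }

From b9ccde3b61f9f402f5ba86f331a7cafadb2a9cdb Mon Sep 17 00:00:00 2001
From: "Barry M.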
Caceres" Date: Mon, 29 Sep 2025 17:06:21 -0700 Subject: [PATCH 2/2] Linting fixes -- mainly enforcing 4-space indentation instead of 2-space indentiation --- csharp/runner/SnippetRunner/Program.cs | 996 +++++++++--------- .../deleting/DeleteViaFutures/Program.cs | 420 ++++---- .../snippets/loading/LoadRecords/Program.cs | 88 +- .../loading/LoadViaFutures/Program.cs | 426 ++++---- .../loading/LoadWithInfoViaFutures/Program.cs | 486 ++++----- .../loading/LoadWithStatsViaLoop/Program.cs | 276 ++--- .../redo/LoadWithRedoViaLoop/Program.cs | 330 +++--- .../snippets/redo/RedoContinuous/Program.cs | 228 ++-- .../redo/RedoContinuousViaFutures/Program.cs | 398 +++---- .../redo/RedoWithInfoContinuous/Program.cs | 302 +++--- .../searching/SearchRecords/Program.cs | 106 +- .../searching/SearchViaFutures/Program.cs | 420 ++++---- .../stewardship/ForceResolve/Program.cs | 168 +-- .../stewardship/ForceUnresolve/Program.cs | 168 +-- 14 files changed, 2406 insertions(+), 2406 deletions(-) diff --git a/csharp/runner/SnippetRunner/Program.cs b/csharp/runner/SnippetRunner/Program.cs index ad20d99..862ff70 100644 --- a/csharp/runner/SnippetRunner/Program.cs +++ b/csharp/runner/SnippetRunner/Program.cs @@ -40,448 +40,448 @@ DirectoryInfo? runnerDir = null; switch (dir.Name) { - case "snippets": - snippetDir = dir; - break; - case "runner": - runnerDir = dir; - break; - case "csharp": - csharpDir = dir; - break; - default: - HandleWrongDirectory(); - break; + case "snippets": + snippetDir = dir; + break; + case "runner": + runnerDir = dir; + break; + case "csharp": + csharpDir = dir; + break; + default: + HandleWrongDirectory(); + break; } // if no snippet dir, try to find the csharp dir from the runner dir if (snippetDir == null && runnerDir != null) { - csharpDir = Directory.GetParent(runnerDir.FullName); - if (!"csharp".Equals(csharpDir?.Name, Ordinal)) - { - HandleWrongDirectory(); - } + csharpDir = Directory.GetParent(runnerDir.FullName); + if (!"csharp".Equals(csharpDir?.Name, Ordinal)) + { + HandleWrongDirectory(); + } } // if no snippet dir, try to find it using the csharp dir if (snippetDir == null && csharpDir != null) { - snippetDir = new DirectoryInfo(Path.Combine(csharpDir.FullName, "snippets")); - if (!snippetDir.Exists) - { - HandleWrongDirectory(); - } + snippetDir = new DirectoryInfo(Path.Combine(csharpDir.FullName, "snippets")); + if (!snippetDir.Exists) + { + HandleWrongDirectory(); + } } if (snippetDir == null) { - HandleWrongDirectory(); - Environment.Exit(1); - return; + HandleWrongDirectory(); + Environment.Exit(1); + return; } try { - SortedDictionary> snippetsMap - = GetSnippetsMap(snippetDir); - - SortedDictionary> snippetOptions - = new SortedDictionary>(); - foreach (KeyValuePair> entry in snippetsMap) - { - string group = entry.Key; - IDictionary snippetMap = entry.Value; - List<(string, string, string)> tuples - = new List<(string, string, string)>(snippetMap.Count); - - foreach (KeyValuePair subEntry in snippetMap) - { - string snippet = subEntry.Key; - string snippetPath = subEntry.Value; - tuples.Add((group, snippet, snippetPath)); - } - snippetOptions.Add(group, tuples.AsReadOnly()); - } - - foreach (KeyValuePair> entry in snippetsMap) - { - string group = entry.Key; - IDictionary snippetMap = entry.Value; - foreach (KeyValuePair subEntry in snippetMap) - { - string snippet = subEntry.Key; - string snippetPath = subEntry.Value; - List<(string, string, string)> tuples = new List<(string, string, string)>(1); - tuples.Add((group, snippet, snippetPath)); - 
snippetOptions.Add(snippet, tuples.AsReadOnly()); - } - } + SortedDictionary> snippetsMap + = GetSnippetsMap(snippetDir); - if (args.Length == 0) - { - PrintUsage(snippetsMap); - Environment.Exit(1); - } + SortedDictionary> snippetOptions + = new SortedDictionary>(); + foreach (KeyValuePair> entry in snippetsMap) + { + string group = entry.Key; + IDictionary snippetMap = entry.Value; + List<(string, string, string)> tuples + = new List<(string, string, string)>(snippetMap.Count); - // check for settings in the environment - string? settings - = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); + foreach (KeyValuePair subEntry in snippetMap) + { + string snippet = subEntry.Key; + string snippetPath = subEntry.Value; + tuples.Add((group, snippet, snippetPath)); + } + snippetOptions.Add(group, tuples.AsReadOnly()); + } - // validate the settings if we have them - if (settings != null) - { - settings = settings.Trim(); - JsonObject? settingsJson = null; - try + foreach (KeyValuePair> entry in snippetsMap) { - settingsJson = JsonNode.Parse(settings)?.AsObject(); - if (settingsJson == null) - { - throw new ArgumentNullException("Setting must be a JSON object: " + settings); - } + string group = entry.Key; + IDictionary snippetMap = entry.Value; + foreach (KeyValuePair subEntry in snippetMap) + { + string snippet = subEntry.Key; + string snippetPath = subEntry.Value; + List<(string, string, string)> tuples = new List<(string, string, string)>(1); + tuples.Add((group, snippet, snippetPath)); + snippetOptions.Add(snippet, tuples.AsReadOnly()); + } } - catch (Exception e) + + if (args.Length == 0) { - Console.Error.WriteLine(e); - Console.Error.WriteLine("The provided Senzing settings were not valid JSON:"); - Console.Error.WriteLine(); - Environment.Exit(1); - throw; + PrintUsage(snippetsMap); + Environment.Exit(1); } - } - // validate the SENZING_DIR - InstallLocations? installLocations = null; - try - { - installLocations = InstallLocations.FindLocations(); + // check for settings in the environment + string? settings + = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); - } - catch (Exception e) - { - Console.Error.WriteLine(e); - Environment.Exit(1); - throw; - } - if (installLocations == null) - { - Console.Error.WriteLine("Could not find the Senzing installation."); - Console.Error.WriteLine("Try setting the SENZING_DIR environment variable."); - Environment.Exit(1); - return; - } - - List<(string, string)> snippets = new List<(string, string)>(100); - for (int index = 0; index < args.Length; index++) - { - string arg = args[index]; - if (arg.Equals("all", Ordinal)) + // validate the settings if we have them + if (settings != null) { - foreach (IDictionary snippetMap in snippetsMap.Values) - { - foreach (KeyValuePair entry in snippetMap) + settings = settings.Trim(); + JsonObject? settingsJson = null; + try + { + settingsJson = JsonNode.Parse(settings)?.AsObject(); + if (settingsJson == null) + { + throw new ArgumentNullException("Setting must be a JSON object: " + settings); + } + } + catch (Exception e) { - string snippet = entry.Key; - string snippetPath = entry.Value; - if (!snippets.Contains((snippet, snippetPath))) - { - snippets.Add((snippet, snippetPath)); - } + Console.Error.WriteLine(e); + Console.Error.WriteLine("The provided Senzing settings were not valid JSON:"); + Console.Error.WriteLine(); + Environment.Exit(1); + throw; } - } - continue; } - if (!snippetOptions.ContainsKey(arg)) + + // validate the SENZING_DIR + InstallLocations? 
installLocations = null; + try { - Console.Error.WriteLine("Unrecognized code snippet or snippet group: " + arg); - Environment.Exit(1); + installLocations = InstallLocations.FindLocations(); + } - IList<(string, string, string)> tuples = snippetOptions[arg]; - foreach ((string group, string snippet, string path) in tuples) + catch (Exception e) { - if (!snippets.Contains((snippet, path))) - { - snippets.Add((snippet, path)); - } + Console.Error.WriteLine(e); + Environment.Exit(1); + throw; + } + if (installLocations == null) + { + Console.Error.WriteLine("Could not find the Senzing installation."); + Console.Error.WriteLine("Try setting the SENZING_DIR environment variable."); + Environment.Exit(1); + return; } - } - - // check if we do not have settings and if not setup a temporary repository - if (settings == null) - { - settings = SetupTempRepository(installLocations); - } - - long defaultConfigID; - - SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); - try - { - SzConfigManager configMgr = env.GetConfigManager(); - defaultConfigID = configMgr.GetDefaultConfigID(); - - } - catch (SzException e) - { - Console.Error.WriteLine(e); - Environment.Exit(1); - return; - - } - finally - { - env.Destroy(); - } - foreach ((string snippet, string snippetPath) in snippets) - { - Console.WriteLine(); - Stopwatch stopwatch = Stopwatch.StartNew(); - Dictionary properties = new Dictionary(); - string resourceName = $"""{assemblyName}.Resources.{snippet}.properties"""; - LoadProperties(properties, resourceName); - Console.WriteLine("Preparing repository for " + snippet + "..."); - env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); - try + List<(string, string)> snippets = new List<(string, string)>(100); + for (int index = 0; index < args.Length; index++) { - // first purge the repository - SzDiagnostic diagnostic = env.GetDiagnostic(); - diagnostic.PurgeRepository(); - - // now set the configuration - SzConfigManager configMgr = env.GetConfigManager(); - // check if we need to configure sources - if (properties.ContainsKey(SourceKeyPrefix + 0)) - { - SzConfig config = configMgr.CreateConfig(); - for (int index = 0; - properties.ContainsKey(SourceKeyPrefix + index); - index++) + string arg = args[index]; + if (arg.Equals("all", Ordinal)) { - string sourceKey = SourceKeyPrefix + index; - string source = properties[sourceKey]; - source = source.Trim(); - Console.WriteLine("Adding data source: " + source); - config.RegisterDataSource(source); + foreach (IDictionary snippetMap in snippetsMap.Values) + { + foreach (KeyValuePair entry in snippetMap) + { + string snippet = entry.Key; + string snippetPath = entry.Value; + if (!snippets.Contains((snippet, snippetPath))) + { + snippets.Add((snippet, snippetPath)); + } + } + } + continue; } - string snippetConfig = config.Export(); - - // register the config - configMgr.SetDefaultConfig(snippetConfig); - } - else - { - // set the default config to the initial default - configMgr.SetDefaultConfigID(defaultConfigID); - } - - // check if there are files we need to load - if (properties.ContainsKey(LoadKeyPrefix + 0)) - { - SzEngine engine = env.GetEngine(); - for (int index = 0; properties.ContainsKey(LoadKeyPrefix + index); index++) + if (!snippetOptions.ContainsKey(arg)) { - string loadKey = LoadKeyPrefix + index; - string fileName = properties[loadKey]; - fileName = fileName.Trim(); - Console.WriteLine("Loading records from file resource: " + fileName); - Stream? 
stream = assembly.GetManifestResourceStream(fileName); - if (stream == null) - { - throw new ArgumentException( - "Missing resource (" + fileName + ") for load file (" - + loadKey + ") for snippet (" + snippet + ")"); - } - StreamReader rdr = new StreamReader(stream, Encoding.UTF8); - try - { - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + Console.Error.WriteLine("Unrecognized code snippet or snippet group: " + arg); + Environment.Exit(1); + } + IList<(string, string, string)> tuples = snippetOptions[arg]; + foreach ((string group, string snippet, string path) in tuples) + { + if (!snippets.Contains((snippet, path))) { - line = line.Trim(); - if (line.Length == 0) continue; - if (line.StartsWith('#')) continue; - JsonObject? record = JsonNode.Parse(line)?.AsObject(); - if (record == null) - { - throw new JsonException("Failed to parse line as JSON: " + line); - } - string dataSource = record.ContainsKey(DataSource) - ? record[DataSource]?.GetValue() ?? TestSource : TestSource; - string? recordID = record.ContainsKey(RecordID) - ? record[RecordID]?.GetValue() : null; - engine.AddRecord(dataSource, recordID, line, SzNoFlags); + snippets.Add((snippet, path)); } - } - finally - { - rdr.Close(); - stream.Close(); - } - } - } + } + + // check if we do not have settings and if not setup a temporary repository + if (settings == null) + { + settings = SetupTempRepository(installLocations); + } + + long defaultConfigID; + + SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); + try + { + SzConfigManager configMgr = env.GetConfigManager(); + defaultConfigID = configMgr.GetDefaultConfigID(); } catch (SzException e) { - Console.Error.WriteLine(e); - Environment.Exit(1); - return; + Console.Error.WriteLine(e); + Environment.Exit(1); + return; + } finally { - env.Destroy(); + env.Destroy(); } - long duration = stopwatch.ElapsedMilliseconds; - Console.WriteLine("Prepared repository for " + snippet + ". 
(" + duration + "ms)"); - ExecuteSnippet(snippet, snippetPath, installLocations, settings, properties); - } + foreach ((string snippet, string snippetPath) in snippets) + { + Console.WriteLine(); + Stopwatch stopwatch = Stopwatch.StartNew(); + Dictionary properties = new Dictionary(); + string resourceName = $"""{assemblyName}.Resources.{snippet}.properties"""; + LoadProperties(properties, resourceName); + Console.WriteLine("Preparing repository for " + snippet + "..."); + env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); + try + { + // first purge the repository + SzDiagnostic diagnostic = env.GetDiagnostic(); + diagnostic.PurgeRepository(); + + // now set the configuration + SzConfigManager configMgr = env.GetConfigManager(); + // check if we need to configure sources + if (properties.ContainsKey(SourceKeyPrefix + 0)) + { + SzConfig config = configMgr.CreateConfig(); + for (int index = 0; + properties.ContainsKey(SourceKeyPrefix + index); + index++) + { + string sourceKey = SourceKeyPrefix + index; + string source = properties[sourceKey]; + source = source.Trim(); + Console.WriteLine("Adding data source: " + source); + config.RegisterDataSource(source); + } + string snippetConfig = config.Export(); + + // register the config + configMgr.SetDefaultConfig(snippetConfig); + } + else + { + // set the default config to the initial default + configMgr.SetDefaultConfigID(defaultConfigID); + } - Console.WriteLine(); + // check if there are files we need to load + if (properties.ContainsKey(LoadKeyPrefix + 0)) + { + SzEngine engine = env.GetEngine(); + for (int index = 0; properties.ContainsKey(LoadKeyPrefix + index); index++) + { + string loadKey = LoadKeyPrefix + index; + string fileName = properties[loadKey]; + fileName = fileName.Trim(); + Console.WriteLine("Loading records from file resource: " + fileName); + Stream? stream = assembly.GetManifestResourceStream(fileName); + if (stream == null) + { + throw new ArgumentException( + "Missing resource (" + fileName + ") for load file (" + + loadKey + ") for snippet (" + snippet + ")"); + } + StreamReader rdr = new StreamReader(stream, Encoding.UTF8); + try + { + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + line = line.Trim(); + if (line.Length == 0) continue; + if (line.StartsWith('#')) continue; + JsonObject? record = JsonNode.Parse(line)?.AsObject(); + if (record == null) + { + throw new JsonException("Failed to parse line as JSON: " + line); + } + string dataSource = record.ContainsKey(DataSource) + ? record[DataSource]?.GetValue() ?? TestSource : TestSource; + string? recordID = record.ContainsKey(RecordID) + ? record[RecordID]?.GetValue() : null; + engine.AddRecord(dataSource, recordID, line, SzNoFlags); + } + } + finally + { + rdr.Close(); + stream.Close(); + } + + } + } + + } + catch (SzException e) + { + Console.Error.WriteLine(e); + Environment.Exit(1); + return; + } + finally + { + env.Destroy(); + } + long duration = stopwatch.ElapsedMilliseconds; + Console.WriteLine("Prepared repository for " + snippet + ". (" + duration + "ms)"); + + ExecuteSnippet(snippet, snippetPath, installLocations, settings, properties); + } + + Console.WriteLine(); } catch (Exception e) { - Console.Error.WriteLine(e); - Environment.Exit(1); - throw; + Console.Error.WriteLine(e); + Environment.Exit(1); + throw; } static void LoadProperties(IDictionary properties, String resourceName) { - Assembly assembly = Assembly.GetExecutingAssembly(); - Stream? 
stream = assembly.GetManifestResourceStream(resourceName); - if (stream != null) - { - StreamReader rdr = new StreamReader(stream, Encoding.UTF8); - try + Assembly assembly = Assembly.GetExecutingAssembly(); + Stream? stream = assembly.GetManifestResourceStream(resourceName); + if (stream != null) { - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - if (line.Trim().Length == 0) continue; - if (line.StartsWith('#')) continue; - if (line.StartsWith('!')) continue; - int index = line.IndexOf('=', Ordinal); - if (index < 1) continue; - string key = line.Substring(0, index).Trim(); - string value = ""; - if (index < line.Length - 1) + StreamReader rdr = new StreamReader(stream, Encoding.UTF8); + try { - value = line.Substring(index + 1); + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + if (line.Trim().Length == 0) continue; + if (line.StartsWith('#')) continue; + if (line.StartsWith('!')) continue; + int index = line.IndexOf('=', Ordinal); + if (index < 1) continue; + string key = line.Substring(0, index).Trim(); + string value = ""; + if (index < line.Length - 1) + { + value = line.Substring(index + 1); + } + value = value.Trim(); + while (value.EndsWith('\\')) + { + line = rdr.ReadLine(); + if (line == null) break; + line = line.Trim(); + value = string.Concat(value.AsSpan(0, value.Length - 1), line); + } + properties[key] = value; + } } - value = value.Trim(); - while (value.EndsWith('\\')) + finally { - line = rdr.ReadLine(); - if (line == null) break; - line = line.Trim(); - value = string.Concat(value.AsSpan(0, value.Length - 1), line); + rdr.Close(); + stream.Close(); } - properties[key] = value; - } } - finally - { - rdr.Close(); - stream.Close(); - } - } } static SortedDictionary> GetSnippetsMap(DirectoryInfo snippetDir) { - SortedDictionary> snippetsMap - = new SortedDictionary>(); + SortedDictionary> snippetsMap + = new SortedDictionary>(); - foreach (string dir in Directory.GetDirectories(snippetDir.FullName)) - { - string? group = Path.GetFileName(dir); - if (group == null) - { - continue; - } - snippetsMap.TryGetValue(group, out SortedDictionary? snippetMap); - if (snippetMap == null) + foreach (string dir in Directory.GetDirectories(snippetDir.FullName)) { - snippetMap = new SortedDictionary(); - snippetsMap.Add(group, snippetMap); - } + string? group = Path.GetFileName(dir); + if (group == null) + { + continue; + } + snippetsMap.TryGetValue(group, out SortedDictionary? snippetMap); + if (snippetMap == null) + { + snippetMap = new SortedDictionary(); + snippetsMap.Add(group, snippetMap); + } - foreach (string subdir in Directory.GetDirectories(dir)) - { - string? snippet = Path.GetFileName(subdir); - if (snippet == null) - { - continue; - } - string csprojPath = Path.Combine(subdir, snippet + ".csproj"); - if (!File.Exists(csprojPath)) - { - continue; - } - snippetMap.Add(group + "." + snippet, subdir); + foreach (string subdir in Directory.GetDirectories(dir)) + { + string? snippet = Path.GetFileName(subdir); + if (snippet == null) + { + continue; + } + string csprojPath = Path.Combine(subdir, snippet + ".csproj"); + if (!File.Exists(csprojPath)) + { + continue; + } + snippetMap.Add(group + "." + snippet, subdir); + } } - } - return snippetsMap; + return snippetsMap; } static void PrintUsage(SortedDictionary> snippetsMap) { - Assembly assembly = Assembly.GetExecutingAssembly(); - string? 
assemblyName = assembly.GetName().Name; - Console.Error.WriteLine($"""dotnet run --project {assemblyName} [ all | | ]*"""); - Console.Error.WriteLine(); - Console.Error.WriteLine(" - Specifying no arguments will print this message"); - Console.Error.WriteLine(" - Specifying \"all\" will run all snippets"); - Console.Error.WriteLine(" - Specifying one or more groups will run all snippets in those groups"); - Console.Error.WriteLine(" - Specifying one or more snippets will run those snippet"); - Console.Error.WriteLine(); - Console.Error.WriteLine("Examples:"); - Console.Error.WriteLine(); - Console.Error.WriteLine($""" dotnet run --project {assemblyName} all"""); - Console.Error.WriteLine(); - Console.Error.WriteLine($""" dotnet run --project {assemblyName} loading.LoadRecords loading.LoadViaFutures"""); - Console.Error.WriteLine(); - Console.Error.WriteLine($""" dotnet run --project {assemblyName} initialization deleting loading.LoadRecords"""); - Console.Error.WriteLine(); - Console.Error.WriteLine("Snippet Group Names:"); - foreach (string group in snippetsMap.Keys) - { - Console.Error.WriteLine(" - " + group); - } - Console.Error.WriteLine(); - Console.Error.WriteLine("Snippet Names:"); - foreach (IDictionary snippetMap in snippetsMap.Values) - { - foreach (string snippet in snippetMap.Keys) + Assembly assembly = Assembly.GetExecutingAssembly(); + string? assemblyName = assembly.GetName().Name; + Console.Error.WriteLine($"""dotnet run --project {assemblyName} [ all | | ]*"""); + Console.Error.WriteLine(); + Console.Error.WriteLine(" - Specifying no arguments will print this message"); + Console.Error.WriteLine(" - Specifying \"all\" will run all snippets"); + Console.Error.WriteLine(" - Specifying one or more groups will run all snippets in those groups"); + Console.Error.WriteLine(" - Specifying one or more snippets will run those snippet"); + Console.Error.WriteLine(); + Console.Error.WriteLine("Examples:"); + Console.Error.WriteLine(); + Console.Error.WriteLine($""" dotnet run --project {assemblyName} all"""); + Console.Error.WriteLine(); + Console.Error.WriteLine($""" dotnet run --project {assemblyName} loading.LoadRecords loading.LoadViaFutures"""); + Console.Error.WriteLine(); + Console.Error.WriteLine($""" dotnet run --project {assemblyName} initialization deleting loading.LoadRecords"""); + Console.Error.WriteLine(); + Console.Error.WriteLine("Snippet Group Names:"); + foreach (string group in snippetsMap.Keys) { - Console.Error.WriteLine(" - " + snippet); + Console.Error.WriteLine(" - " + group); } - } - Console.Error.WriteLine(); + Console.Error.WriteLine(); + Console.Error.WriteLine("Snippet Names:"); + foreach (IDictionary snippetMap in snippetsMap.Values) + { + foreach (string snippet in snippetMap.Keys) + { + Console.Error.WriteLine(" - " + snippet); + } + } + Console.Error.WriteLine(); } static void HandleWrongDirectory() { - Console.Error.WriteLine( - "Must be run from the csharp, csharp/runner or csharp/snippets directory"); - Environment.Exit(1); + Console.Error.WriteLine( + "Must be run from the csharp, csharp/runner or csharp/snippets directory"); + Environment.Exit(1); } static void SetupEnvironment(ProcessStartInfo startInfo, InstallLocations installLocations, string settings) { - System.Collections.IDictionary origEnv = Environment.GetEnvironmentVariables(); - foreach (DictionaryEntry entry in origEnv) - { - startInfo.Environment[entry.Key?.ToString() ?? ""] - = entry.Value?.ToString() ?? 
""; - } - startInfo.Environment["SENZING_ENGINE_CONFIGURATION_JSON"] = settings; + System.Collections.IDictionary origEnv = Environment.GetEnvironmentVariables(); + foreach (DictionaryEntry entry in origEnv) + { + startInfo.Environment[entry.Key?.ToString() ?? ""] + = entry.Value?.ToString() ?? ""; + } + startInfo.Environment["SENZING_ENGINE_CONFIGURATION_JSON"] = settings; } static void ExecuteSnippet(string snippet, @@ -490,155 +490,155 @@ static void ExecuteSnippet(string snippet, string settings, IDictionary properties) { - ProcessStartInfo startInfo = new ProcessStartInfo( - "dotnet", - "run --project " + snippetPath); - SetupEnvironment(startInfo, senzingInstall, settings); - startInfo.WindowStyle = ProcessWindowStyle.Hidden; - startInfo.UseShellExecute = false; - startInfo.RedirectStandardInput = true; - - Console.WriteLine(); - Console.WriteLine("---------------------------------------"); - Console.WriteLine("Executing " + snippet + "..."); - Stopwatch stopWatch = Stopwatch.StartNew(); - - Process? process = Process.Start(startInfo); - if (process == null) - { - throw new ArgumentNullException("Failed to execute snippet; " + snippet); - } - - if (properties != null && properties.ContainsKey(InputKeyPrefix + 0)) - { - // sleep for 1 second to give the process a chance to start up - Thread.Sleep(1000); - for (int index = 0; - properties.ContainsKey(InputKeyPrefix + index); - index++) + ProcessStartInfo startInfo = new ProcessStartInfo( + "dotnet", + "run --project " + snippetPath); + SetupEnvironment(startInfo, senzingInstall, settings); + startInfo.WindowStyle = ProcessWindowStyle.Hidden; + startInfo.UseShellExecute = false; + startInfo.RedirectStandardInput = true; + + Console.WriteLine(); + Console.WriteLine("---------------------------------------"); + Console.WriteLine("Executing " + snippet + "..."); + Stopwatch stopWatch = Stopwatch.StartNew(); + + Process? process = Process.Start(startInfo); + if (process == null) + { + throw new ArgumentNullException("Failed to execute snippet; " + snippet); + } + + if (properties != null && properties.ContainsKey(InputKeyPrefix + 0)) { - string inputLine = properties[InputKeyPrefix + index]; - Console.WriteLine(inputLine); - Console.Out.Flush(); + // sleep for 1 second to give the process a chance to start up + Thread.Sleep(1000); + for (int index = 0; + properties.ContainsKey(InputKeyPrefix + index); + index++) + { + string inputLine = properties[InputKeyPrefix + index]; + Console.WriteLine(inputLine); + Console.Out.Flush(); - inputLine = (inputLine == null) ? "" : inputLine.Trim(); - process.StandardInput.WriteLine(inputLine); - process.StandardInput.Flush(); + inputLine = (inputLine == null) ? "" : inputLine.Trim(); + process.StandardInput.WriteLine(inputLine); + process.StandardInput.Flush(); + } } - } - int exitValue = 0; - int expectedExitValue = 0; - if (properties != null && properties.ContainsKey(DestroyAfterKey)) - { - string propValue = properties[DestroyAfterKey]; - int delay = Int32.Parse(propValue, CultureInfo.InvariantCulture); - bool exited = process.WaitForExit(delay); - if (!exited && !process.HasExited) + int exitValue = 0; + int expectedExitValue = 0; + if (properties != null && properties.ContainsKey(DestroyAfterKey)) { - expectedExitValue = (Environment.OSVersion.Platform == PlatformID.Win32NT) - ? 1 : SigtermExitCode; - Console.WriteLine(); - Console.WriteLine("Runner destroying " + snippet + " process..."); - - - ProcessStartInfo killStartInfo - = (Environment.OSVersion.Platform == PlatformID.Win32NT) - ? 
new ProcessStartInfo("taskkill", ["/F", "/PID", "" + process.Id]) - : new ProcessStartInfo("kill", "" + process.Id); - - startInfo.WindowStyle = ProcessWindowStyle.Hidden; - startInfo.UseShellExecute = false; - Process? killer = Process.Start(killStartInfo); - if (killer == null) - { - process.Kill(true); - process.WaitForExit(); - } - else - { - killer.WaitForExit(); + string propValue = properties[DestroyAfterKey]; + int delay = Int32.Parse(propValue, CultureInfo.InvariantCulture); + bool exited = process.WaitForExit(delay); + if (!exited && !process.HasExited) + { + expectedExitValue = (Environment.OSVersion.Platform == PlatformID.Win32NT) + ? 1 : SigtermExitCode; + Console.WriteLine(); + Console.WriteLine("Runner destroying " + snippet + " process..."); + + + ProcessStartInfo killStartInfo + = (Environment.OSVersion.Platform == PlatformID.Win32NT) + ? new ProcessStartInfo("taskkill", ["/F", "/PID", "" + process.Id]) + : new ProcessStartInfo("kill", "" + process.Id); + + startInfo.WindowStyle = ProcessWindowStyle.Hidden; + startInfo.UseShellExecute = false; + Process? killer = Process.Start(killStartInfo); + if (killer == null) + { + process.Kill(true); + process.WaitForExit(); + } + else + { + killer.WaitForExit(); + process.WaitForExit(); + } + } + exitValue = process.ExitCode; + } + else + { + // wait indefinitely for the process to terminate process.WaitForExit(); - } + exitValue = process.ExitCode; } - exitValue = process.ExitCode; - } - else - { - // wait indefinitely for the process to terminate - process.WaitForExit(); - exitValue = process.ExitCode; - } - - if (exitValue != expectedExitValue) - { - throw new Exception("Failed to execute snippet; " + snippet - + " (" + exitValue + ")"); - } - stopWatch.Stop(); - int duration = stopWatch.Elapsed.Milliseconds; - Console.WriteLine("Executed " + snippet + ". (" + duration + "ms)"); + + if (exitValue != expectedExitValue) + { + throw new Exception("Failed to execute snippet; " + snippet + + " (" + exitValue + ")"); + } + stopWatch.Stop(); + int duration = stopWatch.Elapsed.Milliseconds; + Console.WriteLine("Executed " + snippet + ". (" + duration + "ms)"); } static string SetupTempRepository(InstallLocations senzingInstall) { - DirectoryInfo? supportDir = senzingInstall.SupportDirectory; - DirectoryInfo? resourcesDir = senzingInstall.ResourceDirectory; - DirectoryInfo? templatesDir = senzingInstall.TemplatesDirectory; - DirectoryInfo? configDir = senzingInstall.ConfigDirectory; - if (supportDir == null || configDir == null - || resourcesDir == null || templatesDir == null) - { - throw new Exception( - "At least one of the required directories is missing from " - + "the installation. installLocations=[ " - + senzingInstall + " ]"); - } - - DirectoryInfo schemaDir = new DirectoryInfo( - Path.Combine(resourcesDir.FullName, "schema")); - string schemaFile = Path.Combine( - schemaDir.FullName, "szcore-schema-sqlite-create.sql"); - string configFile = Path.Combine( - templatesDir.FullName, "g2config.json"); - - // lay down the database schema - string databaseFile = Path.Combine( - Path.GetTempPath(), "G2C-" + Path.GetRandomFileName() + ".db"); - String jdbcUrl = "jdbc:sqlite:" + databaseFile; - - SqliteConnection? sqlite = null; - try - { - String connectSpec = "Data Source=" + databaseFile; - sqlite = new SqliteConnection(connectSpec); - sqlite.Open(); - SqliteCommand cmd = sqlite.CreateCommand(); - - string[] sqlLines = File.ReadAllLines(schemaFile, Encoding.UTF8); - - foreach (string sql in sqlLines) + DirectoryInfo? 
supportDir = senzingInstall.SupportDirectory; + DirectoryInfo? resourcesDir = senzingInstall.ResourceDirectory; + DirectoryInfo? templatesDir = senzingInstall.TemplatesDirectory; + DirectoryInfo? configDir = senzingInstall.ConfigDirectory; + if (supportDir == null || configDir == null + || resourcesDir == null || templatesDir == null) { - if (sql.Trim().Length == 0) continue; + throw new Exception( + "At least one of the required directories is missing from " + + "the installation. installLocations=[ " + + senzingInstall + " ]"); + } + + DirectoryInfo schemaDir = new DirectoryInfo( + Path.Combine(resourcesDir.FullName, "schema")); + string schemaFile = Path.Combine( + schemaDir.FullName, "szcore-schema-sqlite-create.sql"); + string configFile = Path.Combine( + templatesDir.FullName, "g2config.json"); + + // lay down the database schema + string databaseFile = Path.Combine( + Path.GetTempPath(), "G2C-" + Path.GetRandomFileName() + ".db"); + String jdbcUrl = "jdbc:sqlite:" + databaseFile; + + SqliteConnection? sqlite = null; + try + { + String connectSpec = "Data Source=" + databaseFile; + sqlite = new SqliteConnection(connectSpec); + sqlite.Open(); + SqliteCommand cmd = sqlite.CreateCommand(); + + string[] sqlLines = File.ReadAllLines(schemaFile, Encoding.UTF8); + + foreach (string sql in sqlLines) + { + if (sql.Trim().Length == 0) continue; #pragma warning disable CA2100 - cmd.CommandText = sql.Trim(); + cmd.CommandText = sql.Trim(); #pragma warning restore CA2100 - cmd.ExecuteNonQuery(); + cmd.ExecuteNonQuery(); + } } - } - finally - { - if (sqlite != null) + finally { - sqlite.Close(); + if (sqlite != null) + { + sqlite.Close(); + } } - } - - string supportPath = supportDir.FullName.Replace("\\", "\\\\", Ordinal); - string configPath = configDir.FullName.Replace("\\", "\\\\", Ordinal); - string resourcePath = resourcesDir.FullName.Replace("\\", "\\\\", Ordinal); - string baseConfig = File.ReadAllText(configFile).Replace("\\", "\\\\", Ordinal); - string databasePath = databaseFile.Replace("\\", "\\\\", Ordinal); - string settings = $$""" + + string supportPath = supportDir.FullName.Replace("\\", "\\\\", Ordinal); + string configPath = configDir.FullName.Replace("\\", "\\\\", Ordinal); + string resourcePath = resourcesDir.FullName.Replace("\\", "\\\\", Ordinal); + string baseConfig = File.ReadAllText(configFile).Replace("\\", "\\\\", Ordinal); + string databasePath = databaseFile.Replace("\\", "\\\\", Ordinal); + string settings = $$""" { "PIPELINE": { "SUPPORTPATH": "{{supportPath}}", @@ -651,21 +651,21 @@ static string SetupTempRepository(InstallLocations senzingInstall) } """.Trim(); - SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); - try - { - env.GetConfigManager().SetDefaultConfig(baseConfig); + SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); + try + { + env.GetConfigManager().SetDefaultConfig(baseConfig); - } - catch (Exception) - { - Console.Error.WriteLine(settings); - throw; - } - finally - { - env.Destroy(); - } + } + catch (Exception) + { + Console.Error.WriteLine(settings); + throw; + } + finally + { + env.Destroy(); + } - return settings; + return settings; } diff --git a/csharp/snippets/deleting/DeleteViaFutures/Program.cs b/csharp/snippets/deleting/DeleteViaFutures/Program.cs index 792c048..789a5f5 100644 --- a/csharp/snippets/deleting/DeleteViaFutures/Program.cs +++ b/csharp/snippets/deleting/DeleteViaFutures/Program.cs @@ -18,8 +18,8 @@ string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -56,225 +56,225 @@ TaskScheduler taskScheduler try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - int lineNumber = 0; - bool eof = false; + int lineNumber = 0; + bool eof = false; - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) + while (!eof) { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); + // read the next line + string? line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? recordID = recordJson[RecordID]?.GetValue(); + + Task task = factory.StartNew(() => + { + // call the DeleteRecord() function with no flags + engine.DeleteRecord(dataSourceCode, recordID); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, record)); + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + } } - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); - Task task = factory.StartNew(() => + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) { - // call the DeleteRecord() function with no flags - engine.DeleteRecord(dataSourceCode, recordID); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, record)); - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - } + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); } - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - HandlePendingFutures(pendingFutures, true); + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + HandlePendingFutures(pendingFutures, true); } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - rdr.Close(); - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Successful delete operations : " + successCount); - Console.WriteLine("Failed delete operations : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " deletions to be retried in " + retryFile); - } - Console.Out.Flush(); + rdr.Close(); + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Successful delete operations : " + successCount); + Console.WriteLine("Failed delete operations : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " deletions to be retried in " + retryFile); + } + Console.Out.Flush(); } static void HandlePendingFutures(IList<(Task, Record)> pendingFutures, bool blocking) { - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Record record) = pendingFutures[index]; + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Record record) = pendingFutures[index]; - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; + // if not blocking and this one is 
not done then continue + if (!blocking && !task.IsCompleted) continue; - // remove the pending future from the list - pendingFutures.RemoveAt(index--); + // remove the pending future from the list + pendingFutures.RemoveAt(index--); - try - { - try - { - // wait for completion -- if non-blocking then this - // task is already completed and this will just - // throw any exception that might have occurred - if (blocking && !task.IsCompleted) + try { - task.Wait(); - } + try + { + // wait for completion -- if non-blocking then this + // task is already completed and this will just + // throw any exception that might have occurred + if (blocking && !task.IsCompleted) + { + task.Wait(); + } + + // if we get here then increment the success count + successCount++; + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } - // if we get here then increment the success count - successCount++; - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; } - else + catch (SzBadInputException e) { - throw; - } - } + LogFailedRecord(Error, e, record.LineNumber, record.Line); + errorCount++; // increment the error count - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, record.LineNumber, record.Line); - errorCount++; // increment the error count + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRecord(Warning, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRecord(Warning, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(record.Line); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(record.Line); + } + } + catch (Exception e) + { + // catch any other exception (incl. 
SzException) here + LogFailedRecord(Critical, e, record.LineNumber, record.Line); + errorCount++; + throw; // rethrow since exception is critical + } } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, record.LineNumber, record.Line); - errorCount++; - throw; // rethrow since exception is critical - } - } } /// @@ -292,46 +292,46 @@ static void LogFailedRecord(string errorType, int lineNumber, string recordJson) { - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string DefaultFilePath = "../../resources/data/del-500.jsonl"; + private const string DefaultFilePath = "../../resources/data/del-500.jsonl"; - private const string RetryPrefix = "retry-"; + private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; + private const string RetrySuffix = ".jsonl"; - private const string DataSource = "DATA_SOURCE"; + private const string DataSource = "DATA_SOURCE"; - private const string RecordID = "RECORD_ID"; + private const string RecordID = "RECORD_ID"; - private const int ThreadCount = 8; + private const int ThreadCount = 8; - private const int BacklogFactor = 10; + private const int BacklogFactor = 10; - private const int MaximumBacklog = ThreadCount * BacklogFactor; + private const int MaximumBacklog = ThreadCount * BacklogFactor; - private const int PauseTimeout = 100; + private const int PauseTimeout = 100; - private const string Error = "ERROR"; + private const string Error = "ERROR"; - private const string Warning = "WARNING"; + private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string Critical = "CRITICAL"; - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/loading/LoadRecords/Program.cs b/csharp/snippets/loading/LoadRecords/Program.cs index d4d872b..28a648e 100644 --- a/csharp/snippets/loading/LoadRecords/Program.cs +++ b/csharp/snippets/loading/LoadRecords/Program.cs @@ -11,8 +11,8 @@ string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -28,44 +28,44 @@ try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - // loop through the example records and add them to the repository - foreach (KeyValuePair<(string, string), string> pair in GetRecords()) - { - (string dataSourceCode, string recordID) = pair.Key; - string recordDefinition = pair.Value; + // loop through the example records and add them to the repository + foreach (KeyValuePair<(string, string), string> pair in GetRecords()) + { + (string dataSourceCode, string recordID) = pair.Key; + string recordDefinition = pair.Value; - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); - Console.WriteLine("Record " + recordID + " added"); - Console.Out.Flush(); - } + Console.WriteLine("Record " + recordID + " added"); + Console.Out.Flush(); + } } catch (SzException e) { - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); - throw; + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); + throw; } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); } /// @@ -78,12 +78,12 @@ /// static IDictionary<(string, string), string> GetRecords() { - IDictionary<(string, string), string> records - = new SortedDictionary<(string, string), string>(); + IDictionary<(string, string), string> records + = new SortedDictionary<(string, string), string>(); - records.Add( - ("TEST", "1001"), - """ + records.Add( + ("TEST", "1001"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "1001", @@ -99,9 +99,9 @@ } """); - records.Add( - ("TEST", "1002"), - """ + records.Add( + ("TEST", "1002"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "1002", @@ -120,9 +120,9 @@ } """); - records.Add( - ("TEST", "1003"), - """ + records.Add( + ("TEST", "1003"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "1003", @@ -135,9 +135,9 @@ } """); - records.Add( - ("TEST", "1004"), - """ + records.Add( + ("TEST", "1004"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "1004", @@ -153,9 +153,9 @@ } """); - records.Add( - ("TEST", "1005"), - """ + records.Add( + ("TEST", "1005"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "1005", @@ -173,5 +173,5 @@ } """); - return records; + return records; } diff --git 
a/csharp/snippets/loading/LoadViaFutures/Program.cs b/csharp/snippets/loading/LoadViaFutures/Program.cs index f8894e1..1f4eac3 100644 --- a/csharp/snippets/loading/LoadViaFutures/Program.cs +++ b/csharp/snippets/loading/LoadViaFutures/Program.cs @@ -18,8 +18,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -56,227 +56,227 @@ TaskScheduler taskScheduler try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - int lineNumber = 0; - bool eof = false; + int lineNumber = 0; + bool eof = false; - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) + while (!eof) { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); + // read the next line + string? line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? recordID = recordJson[RecordID]?.GetValue(); + + Task task = factory.StartNew(() => + { + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, record.Line); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, record)); + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + } } - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); - Task task = factory.StartNew(() => + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) { - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, record.Line); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, record)); - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - } + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); } - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - HandlePendingFutures(pendingFutures, true); + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + HandlePendingFutures(pendingFutures, true); } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // close the reader - rdr.Close(); - - // close the file stream - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } - Console.Out.Flush(); + // close the reader + rdr.Close(); + + // close the file stream + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } + Console.Out.Flush(); } static void HandlePendingFutures(IList<(Task, Record)> pendingFutures, bool blocking) { - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Record record) = pendingFutures[index]; + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Record record) = pendingFutures[index]; - // if not blocking and this one is not 
done then continue - if (!blocking && !task.IsCompleted) continue; + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; - // remove the pending future from the list - pendingFutures.RemoveAt(index--); + // remove the pending future from the list + pendingFutures.RemoveAt(index--); - try - { - try - { - // wait for completion -- if non-blocking then this - // task is already completed and this will just - // throw any exception that might have occurred - if (blocking && !task.IsCompleted) + try { - task.Wait(); - } + try + { + // wait for completion -- if non-blocking then this + // task is already completed and this will just + // throw any exception that might have occurred + if (blocking && !task.IsCompleted) + { + task.Wait(); + } + + // if we get here then increment the success count + successCount++; + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } - // if we get here then increment the success count - successCount++; - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; } - else + catch (SzBadInputException e) { - throw; - } - } + LogFailedRecord(Error, e, record.LineNumber, record.Line); + errorCount++; // increment the error count - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, record.LineNumber, record.Line); - errorCount++; // increment the error count + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRecord(Warning, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRecord(Warning, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(record.Line); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(record.Line); + } + } + catch (Exception e) + { + // catch any other exception (incl. 
SzException) here + LogFailedRecord(Critical, e, record.LineNumber, record.Line); + errorCount++; + throw; // rethrow since exception is critical + } } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, record.LineNumber, record.Line); - errorCount++; - throw; // rethrow since exception is critical - } - } } /// @@ -294,46 +294,46 @@ static void LogFailedRecord(string errorType, int lineNumber, string recordJson) { - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; + private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; - private const string RetryPrefix = "retry-"; + private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; + private const string RetrySuffix = ".jsonl"; - private const string DataSource = "DATA_SOURCE"; + private const string DataSource = "DATA_SOURCE"; - private const string RecordID = "RECORD_ID"; + private const string RecordID = "RECORD_ID"; - private const int ThreadCount = 8; + private const int ThreadCount = 8; - private const int BacklogFactor = 10; + private const int BacklogFactor = 10; - private const int MaximumBacklog = ThreadCount * BacklogFactor; + private const int MaximumBacklog = ThreadCount * BacklogFactor; - private const int PauseTimeout = 100; + private const int PauseTimeout = 100; - private const string Error = "ERROR"; + private const string Error = "ERROR"; - private const string Warning = "WARNING"; + private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string Critical = "CRITICAL"; - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs b/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs index 56030bc..c25a32b 100644 --- a/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs +++ b/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs @@ -18,8 +18,8 @@ string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -57,128 +57,128 @@ TaskScheduler taskScheduler try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - int lineNumber = 0; - bool eof = false; + int lineNumber = 0; + bool eof = false; - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) + while (!eof) { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); + // read the next line + string? line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? recordID = recordJson[RecordID]?.GetValue(); + + Task task = factory.StartNew(() => + { + // call the addRecord() function with info flags + return engine.AddRecord( + dataSourceCode, recordID, record.Line, SzWithInfo); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, record)); + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + } } - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(engine, pendingFutures, false); - Task task = factory.StartNew(() => + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) { - // call the addRecord() function with info flags - return engine.AddRecord( - dataSourceCode, recordID, record.Line, SzWithInfo); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, record)); - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - } + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); } - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(engine, pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - HandlePendingFutures(engine, pendingFutures, true); + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + HandlePendingFutures(engine, pendingFutures, true); } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - rdr.Close(); - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Total entities created : " + entityIDSet.Count); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } - Console.Out.Flush(); + rdr.Close(); + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Total entities created : " + entityIDSet.Count); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } + Console.Out.Flush(); } @@ -186,99 +186,99 @@ static void HandlePendingFutures(SzEngine engine, IList<(Task, Record)> pendingFutures, bool blocking) { - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Record record) = pendingFutures[index]; + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) 
+ { + // get the next pending future + (Task task, Record record) = pendingFutures[index]; - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; - // remove the pending future from the list - pendingFutures.RemoveAt(index--); + // remove the pending future from the list + pendingFutures.RemoveAt(index--); - try - { - try - { - // this will block if the task is not yet completed, - // however we only get here with a pending task if - // the blocking parameter is true - string info = task.Result; - - // if we get here then increment the success count - successCount++; - - // process the info - ProcessInfo(engine, info); - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) + try { - // get the inner exception - throw e.InnerException; + try + { + // this will block if the task is not yet completed, + // however we only get here with a pending task if + // the blocking parameter is true + string info = task.Result; + + // if we get here then increment the success count + successCount++; + + // process the info + ProcessInfo(engine, info); + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } + } - else + catch (SzBadInputException e) { - throw; - } - } + LogFailedRecord(Error, e, record.LineNumber, record.Line); + errorCount++; // increment the error count - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, record.LineNumber, record.Line); - errorCount++; // increment the error count + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRecord(Warning, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRecord(Warning, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(record.Line); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + 
Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(record.Line); + } + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedRecord(Critical, e, record.LineNumber, record.Line); + errorCount++; + throw; // rethrow since exception is critical + } } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, record.LineNumber, record.Line); - errorCount++; - throw; // rethrow since exception is critical - } - } } /// @@ -296,39 +296,39 @@ static void HandlePendingFutures(SzEngine engine, /// The info message static void ProcessInfo(SzEngine engine, string info) { - JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); - if (jsonObject == null) return; - if (!jsonObject.ContainsKey(AffectedEntities)) return; - - JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); - if (affectedArr == null) return; + JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); + if (jsonObject == null) return; + if (!jsonObject.ContainsKey(AffectedEntities)) return; - for (int index = 0; index < affectedArr.Count; index++) - { - JsonObject? affected = affectedArr[index]?.AsObject(); - long entityID = affected?[EntityID]?.GetValue() ?? 0L; - if (entityID == 0L) continue; + JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); + if (affectedArr == null) return; - try + for (int index = 0; index < affectedArr.Count; index++) { - engine.GetEntity(entityID, null); - entityIDSet.Add(entityID); + JsonObject? affected = affectedArr[index]?.AsObject(); + long entityID = affected?[EntityID]?.GetValue() ?? 0L; + if (entityID == 0L) continue; - } - catch (SzNotFoundException) - { - entityIDSet.Remove(entityID); + try + { + engine.GetEntity(entityID, null); + entityIDSet.Add(entityID); + } + catch (SzNotFoundException) + { + entityIDSet.Remove(entityID); + + } + catch (SzException e) + { + // simply log the exception, do not rethrow + Console.Error.WriteLine(); + Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); + Console.Error.WriteLine(e); + Console.Error.Flush(); + } } - catch (SzException e) - { - // simply log the exception, do not rethrow - Console.Error.WriteLine(); - Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); - Console.Error.WriteLine(e); - Console.Error.Flush(); - } - } } /// @@ -346,56 +346,56 @@ static void LogFailedRecord(string errorType, int lineNumber, string recordJson) { - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; + private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; - private const string RetryPrefix = "retry-"; + private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; + private const string RetrySuffix = ".jsonl"; - private const string DataSource = "DATA_SOURCE"; + private const string DataSource = "DATA_SOURCE"; - private const string RecordID = "RECORD_ID"; + private const string RecordID = "RECORD_ID"; - 
private const string AffectedEntities = "AFFECTED_ENTITIES"; + private const string AffectedEntities = "AFFECTED_ENTITIES"; - private const string EntityID = "ENTITY_ID"; + private const string EntityID = "ENTITY_ID"; - private const int ThreadCount = 8; + private const int ThreadCount = 8; - private const int BacklogFactor = 10; + private const int BacklogFactor = 10; - private const int MaximumBacklog = ThreadCount * BacklogFactor; + private const int MaximumBacklog = ThreadCount * BacklogFactor; - private const int PauseTimeout = 100; + private const int PauseTimeout = 100; - private const string Error = "ERROR"; + private const string Error = "ERROR"; - private const string Warning = "WARNING"; + private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string Critical = "CRITICAL"; - private static int errorCount; + private static int errorCount; - private static int successCount; + private static int successCount; - private static int retryCount; + private static int retryCount; - private static FileInfo? retryFile; + private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + private static StreamWriter? retryWriter; - private static readonly ISet entityIDSet = new HashSet(); + private static readonly ISet entityIDSet = new HashSet(); } internal sealed record Record(int LineNumber, string Line) { } diff --git a/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs b/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs index 8db6944..5249a1a 100644 --- a/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs +++ b/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs @@ -14,8 +14,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -37,142 +37,142 @@ StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - int lineNumber = 0; + int lineNumber = 0; - // loop through the example records and add them to the repository - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - // increment the line number - lineNumber++; + // loop through the example records and add them to the repository + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + // increment the line number + lineNumber++; - // trim the line - line = line.Trim(); + // trim the line + line = line.Trim(); - // skip any blank lines - if (line.Length == 0) continue; + // skip any blank lines + if (line.Length == 0) continue; - // skip any commented lines - if (line.StartsWith('#')) continue; + // skip any commented lines + if (line.StartsWith('#')) continue; - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); - - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); - - successCount++; - - // check if it is time obtain stats - if ((successCount % StatsInterval) == 0) - { try { - string stats = engine.GetStats(); - if (stats.Length > StatsTruncate) - { - stats = string.Concat(stats.AsSpan(0, StatsTruncate), " ..."); - } - Console.WriteLine("* STATS: " + stats); + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? recordID = recordJson[RecordID]?.GetValue(); + + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); + + successCount++; + + // check if it is time obtain stats + if ((successCount % StatsInterval) == 0) + { + try + { + string stats = engine.GetStats(); + if (stats.Length > StatsTruncate) + { + stats = string.Concat(stats.AsSpan(0, StatsTruncate), " ..."); + } + Console.WriteLine("* STATS: " + stats); + + } + catch (SzException e) + { + // trap the stats exception so it is not misinterpreted + // as an exception from engine.addRecord() + Console.WriteLine("**** FAILED TO OBTAIN STATS: " + e); + } + } } - catch (SzException e) + catch (SzBadInputException e) { - // trap the stats exception so it is not misinterpreted - // as an exception from engine.addRecord() - Console.WriteLine("**** FAILED TO OBTAIN STATS: " + e); - } - } - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count - } - catch (SzRetryableException e) - { - LogFailedRecord(Warning, e, lineNumber, line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(line); - } + } + catch (SzRetryableException e) + { + LogFailedRecord(Warning, e, lineNumber, line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(line); + } + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedRecord(Critical, e, lineNumber, line); + errorCount++; + throw; // rethrow since exception is critical + } } - catch (Exception e) - { - // catch any other exception (incl. 
SzException) here - LogFailedRecord(Critical, e, lineNumber, line); - errorCount++; - throw; // rethrow since exception is critical - } - } } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - rdr.Close(); - - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } - Console.Out.Flush(); + rdr.Close(); + + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } + Console.Out.Flush(); } /// @@ -190,40 +190,40 @@ static void LogFailedRecord(string errorType, int lineNumber, string recordJson) { - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; + private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; - private const string RetryPrefix = "retry-"; + private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; + private const string RetrySuffix = ".jsonl"; - private const string DataSource = "DATA_SOURCE"; + private const string DataSource = "DATA_SOURCE"; - private const string RecordID = "RECORD_ID"; + private const string RecordID = "RECORD_ID"; - private const string Error = "ERROR"; + private const string Error = "ERROR"; - private const string Warning = "WARNING"; + private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string Critical = "CRITICAL"; - private const int StatsInterval = 100; + private const int StatsInterval = 100; - private const int StatsTruncate = 70; + private const int StatsTruncate = 70; - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? 
retryWriter; } \ No newline at end of file diff --git a/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs b/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs index ef294ee..6859175 100644 --- a/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs +++ b/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs @@ -14,8 +14,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -31,144 +31,144 @@ try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - // loop through the input files - foreach (string filePath in InputFiles) - { - FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); - - StreamReader rdr = new StreamReader(fs, Encoding.UTF8); - - try + // loop through the input files + foreach (string filePath in InputFiles) { - int lineNumber = 0; - // loop through the example records and add them to the repository - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - // increment the line number - lineNumber++; - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; + FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); - // skip any commented lines - if (line.StartsWith('#')) continue; + StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? recordID = recordJson[RecordID]?.GetValue(); - - // call the addRecord() function with info flags - engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); - - successCount++; + int lineNumber = 0; + // loop through the example records and add them to the repository + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + // increment the line number + lineNumber++; + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? 
recordID = recordJson[RecordID]?.GetValue(); + + // call the addRecord() function with info flags + engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); + + successCount++; + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, filePath, lineNumber, line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + LogFailedRecord(Warning, e, filePath, lineNumber, line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + TrackRetryRecord(line); + + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedRecord(Critical, e, filePath, lineNumber, line); + errorCount++; + throw; // rethrow since exception is critical + } + } } - catch (SzBadInputException e) + finally + { + rdr.Close(); + fs.Close(); + } + } + + // now that we have loaded the records, check for redos and handle them + for (string redo = engine.GetRedoRecord(); + redo != null; + redo = engine.GetRedoRecord()) + { + try { - LogFailedRecord(Error, e, filePath, lineNumber, line); - errorCount++; // increment the error count + // process the redo record + engine.ProcessRedoRecord(redo, SzNoFlags); + + // increment the redone count + redoneCount++; } catch (SzRetryableException e) { - LogFailedRecord(Warning, e, filePath, lineNumber, line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - TrackRetryRecord(line); + LogFailedRedo(Warning, e, redo); + errorCount++; + retryCount++; + TrackRetryRecord(redo); } catch (Exception e) { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, filePath, lineNumber, line); - errorCount++; - throw; // rethrow since exception is critical + LogFailedRedo(Critical, e, redo); + errorCount++; + throw; } - } - } - finally - { - rdr.Close(); - fs.Close(); - } - } - - // now that we have loaded the records, check for redos and handle them - for (string redo = engine.GetRedoRecord(); - redo != null; - redo = engine.GetRedoRecord()) - { - try - { - // process the redo record - engine.ProcessRedoRecord(redo, SzNoFlags); - - // increment the redone count - redoneCount++; - - } - catch (SzRetryableException e) - { - LogFailedRedo(Warning, e, redo); - errorCount++; - retryCount++; - TrackRetryRecord(redo); - } - catch (Exception e) - { - LogFailedRedo(Critical, e, redo); - errorCount++; - throw; - } - } } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } - Console.Out.Flush(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Redos successfully processed : " 
+ redoneCount); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } + Console.Out.Flush(); } /// @@ -180,24 +180,24 @@ /// static void TrackRetryRecord(string recordJson) { - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); + } } /// @@ -217,15 +217,15 @@ static void LogFailedRecord(string errorType, int lineNumber, string recordJson) { - string fileName = Path.GetFileName(filePath); - - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD IN " + fileName - + " AT LINE " + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + string fileName = Path.GetFileName(filePath); + + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD IN " + fileName + + " AT LINE " + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } /// @@ -239,35 +239,35 @@ static void LogFailedRedo(string errorType, Exception exception, string redoRecord) { - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private static readonly IList InputFiles = new ReadOnlyCollection( - new string[] { + private static readonly IList InputFiles = new ReadOnlyCollection( + new string[] { "../../resources/data/truthset/customers.jsonl", "../../resources/data/truthset/reference.jsonl", "../../resources/data/truthset/watchlist.jsonl" - }); - - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string DataSource = "DATA_SOURCE"; - private const string RecordID = "RECORD_ID"; - private const string Error = "ERROR"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; - - // setup some class-wide variables - private static int errorCount; - private static int successCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? 
retryWriter; + }); + + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string DataSource = "DATA_SOURCE"; + private const string RecordID = "RECORD_ID"; + private const string Error = "ERROR"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; + + // setup some class-wide variables + private static int errorCount; + private static int successCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/redo/RedoContinuous/Program.cs b/csharp/snippets/redo/RedoContinuous/Program.cs index efedff1..2f43e33 100644 --- a/csharp/snippets/redo/RedoContinuous/Program.cs +++ b/csharp/snippets/redo/RedoContinuous/Program.cs @@ -15,8 +15,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -32,102 +32,102 @@ AppDomain.CurrentDomain.ProcessExit += (s, e) => { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - OutputRedoStatistics(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + OutputRedoStatistics(); }; try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - while (true) - { - // get the next redo record - string redo = engine.GetRedoRecord(); - - // check if no redo records are available - if (redo == null) - { - OutputRedoStatistics(); - Console.WriteLine(); - Console.WriteLine( - "No redo records to process. Pausing for " - + RedoPauseDescription + "...."); - Console.WriteLine("Press CTRL-C to exit."); - try - { - Thread.Sleep(RedoPauseTimeout); - } - catch (ThreadInterruptedException) - { - // ignore the exception - } - continue; - } - - try - { - // process the redo record - engine.ProcessRedoRecord(redo, SzNoFlags); - - // increment the redone count - redoneCount++; - - } - catch (SzRetryableException e) - { - LogFailedRedo(Warning, e, redo); - errorCount++; - retryCount++; - TrackRetryRecord(redo); - - } - catch (Exception e) + while (true) { - LogFailedRedo(Critical, e, redo); - errorCount++; - throw; + // get the next redo record + string redo = engine.GetRedoRecord(); + + // check if no redo records are available + if (redo == null) + { + OutputRedoStatistics(); + Console.WriteLine(); + Console.WriteLine( + "No redo records to process. 
Pausing for " + + RedoPauseDescription + "...."); + Console.WriteLine("Press CTRL-C to exit."); + try + { + Thread.Sleep(RedoPauseTimeout); + } + catch (ThreadInterruptedException) + { + // ignore the exception + } + continue; + } + + try + { + // process the redo record + engine.ProcessRedoRecord(redo, SzNoFlags); + + // increment the redone count + redoneCount++; + + } + catch (SzRetryableException e) + { + LogFailedRedo(Warning, e, redo); + errorCount++; + retryCount++; + TrackRetryRecord(redo); + + } + catch (Exception e) + { + LogFailedRedo(Critical, e, redo); + errorCount++; + throw; + } } - } } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } static void OutputRedoStatistics() { - Console.WriteLine(); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Total failed records/redos : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine( - retryCount + " records/redos to be retried in " + retryFile); - } - Console.Out.Flush(); + Console.WriteLine(); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Total failed records/redos : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine( + retryCount + " records/redos to be retried in " + retryFile); + } + Console.Out.Flush(); } /// @@ -139,24 +139,24 @@ static void OutputRedoStatistics() /// static void TrackRetryRecord(string recordJson) { - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); + } } /// @@ -170,30 +170,30 @@ static void LogFailedRedo(string errorType, Exception exception, string redoRecord) { - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); 
- Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string RedoPauseDescription = "30 seconds"; + private const string RedoPauseDescription = "30 seconds"; - private const int RedoPauseTimeout = 30000; + private const int RedoPauseTimeout = 30000; - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; - // setup some class-wide variables - private static int errorCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + // setup some class-wide variables + private static int errorCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; } #pragma warning restore CA1303 // Do not pass literals as localized parameters (example messages) diff --git a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs index ec404f5..5d34686 100644 --- a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs +++ b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs @@ -17,8 +17,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -49,186 +49,186 @@ TaskScheduler taskScheduler AppDomain.CurrentDomain.ProcessExit += (s, e) => { #pragma warning disable CA1031 // Need to catch all exceptions here - try - { - HandlePendingFutures(pendingFutures, true); - } - catch (Exception exception) - { - Console.Error.WriteLine(exception); - } + try + { + HandlePendingFutures(pendingFutures, true); + } + catch (Exception exception) + { + Console.Error.WriteLine(exception); + } #pragma warning restore CA1031 // Need to catch all exceptions here - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - OutputRedoStatistics(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + OutputRedoStatistics(); }; try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - while (true) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) - { + // get the engine from the environment + SzEngine engine = env.GetEngine(); - // get the next redo record - string redo = engine.GetRedoRecord(); + while (true) + { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { - // check if no redo records are available - if (redo == null) break; + // get the next redo record + string redo = engine.GetRedoRecord(); - 
Task task = factory.StartNew(() => - { - engine.ProcessRedoRecord(redo, SzNoFlags); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); + // check if no redo records are available + if (redo == null) break; - // add the future to the pending future list - pendingFutures.Add((task, redo)); - } + Task task = factory.StartNew(() => + { + engine.ProcessRedoRecord(redo, SzNoFlags); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); + // add the future to the pending future list + pendingFutures.Add((task, redo)); + } - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - try + do { - Thread.Sleep(HandlePauseTimeout); - - } - catch (ThreadInterruptedException) + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) + { + try + { + Thread.Sleep(HandlePauseTimeout); + + } + catch (ThreadInterruptedException) + { + // do nothing + } + } + } while (pendingFutures.Count >= MaximumBacklog); + + // check if there are no redo records right now + // NOTE: we do NOT want to call countRedoRecords() in a loop that + // is processing redo records, we call it here AFTER we believe + // have processed all pending redos to confirm still zero + if (engine.CountRedoRecords() == 0) { - // do nothing + OutputRedoStatistics(); + Console.WriteLine(); + Console.WriteLine( + "No redo records to process. Pausing for " + + RedoPauseDescription + "...."); + Console.WriteLine("Press CTRL-C to exit."); + try + { + Thread.Sleep(RedoPauseTimeout); + } + catch (ThreadInterruptedException) + { + // ignore the exception + } + continue; } - } - } while (pendingFutures.Count >= MaximumBacklog); - - // check if there are no redo records right now - // NOTE: we do NOT want to call countRedoRecords() in a loop that - // is processing redo records, we call it here AFTER we believe - // have processed all pending redos to confirm still zero - if (engine.CountRedoRecords() == 0) - { - OutputRedoStatistics(); - Console.WriteLine(); - Console.WriteLine( - "No redo records to process. 
Pausing for " - + RedoPauseDescription + "...."); - Console.WriteLine("Press CTRL-C to exit."); - try - { - Thread.Sleep(RedoPauseTimeout); - } - catch (ThreadInterruptedException) - { - // ignore the exception - } - continue; } - } } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } static void HandlePendingFutures(IList<(Task, string)> pendingFutures, bool blocking) { - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, string redoRecord) = pendingFutures[index]; + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, string redoRecord) = pendingFutures[index]; - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; - // remove the pending future from the list - pendingFutures.RemoveAt(index--); + // remove the pending future from the list + pendingFutures.RemoveAt(index--); - try - { - try - { - // wait for completion -- if non-blocking then this - // task is already completed and this will just - // throw any exception that might have occurred - if (blocking && !task.IsCompleted) + try { - task.Wait(); - } + try + { + // wait for completion -- if non-blocking then this + // task is already completed and this will just + // throw any exception that might have occurred + if (blocking && !task.IsCompleted) + { + task.Wait(); + } + + // if we get here then increment the success count + redoneCount++; + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } - // if we get here then increment the success count - redoneCount++; - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) + } + catch (SzRetryableException e) { - // get the inner exception - throw e.InnerException; + // handle thread interruption and cancellation as retries + LogFailedRedo(Warning, e, redoRecord); + 
errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + TrackRetryRecord(redoRecord); } - else + catch (Exception e) { - throw; + // catch any other exception (incl. SzException) here + LogFailedRedo(Critical, e, redoRecord); + errorCount++; + throw; // rethrow since exception is critical } - } - - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRedo(Warning, e, redoRecord); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - TrackRetryRecord(redoRecord); } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRedo(Critical, e, redoRecord); - errorCount++; - throw; // rethrow since exception is critical - } - } } /// @@ -240,24 +240,24 @@ static void HandlePendingFutures(IList<(Task, string)> pendingFutures, bool bloc /// static void TrackRetryRecord(string recordJson) { - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); + } } /// @@ -271,58 +271,58 @@ static void LogFailedRedo(string errorType, Exception exception, string redoRecord) { - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } static void OutputRedoStatistics() { - Console.WriteLine(); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Total failed records/redos : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine( - retryCount + " records/redos to be retried in " + retryFile); - } - Console.Out.Flush(); + Console.WriteLine(); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Total failed records/redos : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine( + retryCount + " records/redos to be retried in " + retryFile); + } + Console.Out.Flush(); } public partial class Program { - private const string RedoPauseDescription = "30 seconds"; + private const string RedoPauseDescription = "30 seconds"; - private const int RedoPauseTimeout = 30000; + private const int RedoPauseTimeout = 30000; - private 
const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; - // setup some class-wide variables - private static int errorCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + // setup some class-wide variables + private static int errorCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; - private const int ThreadCount = 8; + private const int ThreadCount = 8; - private const int BacklogFactor = 10; + private const int BacklogFactor = 10; - private const int MaximumBacklog = ThreadCount * BacklogFactor; + private const int MaximumBacklog = ThreadCount * BacklogFactor; - private const int HandlePauseTimeout = 100; + private const int HandlePauseTimeout = 100; } #pragma warning restore CA1303 // Do not pass literals as localized parameters (example messages) diff --git a/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs b/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs index fd6f98d..3c6691f 100644 --- a/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs +++ b/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs @@ -15,8 +15,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -32,106 +32,106 @@ AppDomain.CurrentDomain.ProcessExit += (s, e) => { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - OutputRedoStatistics(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + OutputRedoStatistics(); }; try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - while (true) - { - // get the next redo record - string redo = engine.GetRedoRecord(); - - // check if no redo records are available - if (redo == null) - { - OutputRedoStatistics(); - Console.WriteLine(); - Console.WriteLine( - "No redo records to process. Pausing for " - + RedoPauseDescription + "...."); - Console.WriteLine("Press CTRL-C to exit."); - try - { - Thread.Sleep(RedoPauseTimeout); - } - catch (ThreadInterruptedException) - { - // ignore the exception - } - continue; - } - - try + while (true) { - // process the redo record - string info = engine.ProcessRedoRecord(redo, SzWithInfo); - - // increment the redone count - redoneCount++; - - // process the info - ProcessInfo(engine, info); - + // get the next redo record + string redo = engine.GetRedoRecord(); + + // check if no redo records are available + if (redo == null) + { + OutputRedoStatistics(); + Console.WriteLine(); + Console.WriteLine( + "No redo records to process. 
Pausing for " + + RedoPauseDescription + "...."); + Console.WriteLine("Press CTRL-C to exit."); + try + { + Thread.Sleep(RedoPauseTimeout); + } + catch (ThreadInterruptedException) + { + // ignore the exception + } + continue; + } + + try + { + // process the redo record + string info = engine.ProcessRedoRecord(redo, SzWithInfo); + + // increment the redone count + redoneCount++; + + // process the info + ProcessInfo(engine, info); + + } + catch (SzRetryableException e) + { + LogFailedRedo(Warning, e, redo); + errorCount++; + retryCount++; + TrackRetryRecord(redo); + + } + catch (Exception e) + { + LogFailedRedo(Critical, e, redo); + errorCount++; + throw; + } } - catch (SzRetryableException e) - { - LogFailedRedo(Warning, e, redo); - errorCount++; - retryCount++; - TrackRetryRecord(redo); - - } - catch (Exception e) - { - LogFailedRedo(Critical, e, redo); - errorCount++; - throw; - } - } } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } static void OutputRedoStatistics() { - Console.WriteLine(); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Total entities affected : " + entityIDSet.Count); - Console.WriteLine("Total failed records/redos : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine( - retryCount + " records/redos to be retried in " + retryFile); - } - Console.Out.Flush(); + Console.WriteLine(); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Total entities affected : " + entityIDSet.Count); + Console.WriteLine("Total failed records/redos : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine( + retryCount + " records/redos to be retried in " + retryFile); + } + Console.Out.Flush(); } /// @@ -143,24 +143,24 @@ static void OutputRedoStatistics() /// static void TrackRetryRecord(string recordJson) { - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new 
FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); + } } /// @@ -178,37 +178,37 @@ static void TrackRetryRecord(string recordJson) /// The info message static void ProcessInfo(SzEngine engine, string info) { - JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); - if (jsonObject == null) return; - if (!jsonObject.ContainsKey(AffectedEntities)) return; - - JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); - if (affectedArr == null) return; + JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); + if (jsonObject == null) return; + if (!jsonObject.ContainsKey(AffectedEntities)) return; - for (int index = 0; index < affectedArr.Count; index++) - { - JsonObject? affected = affectedArr[index]?.AsObject(); - long entityID = affected?[EntityID]?.GetValue() ?? 0L; - if (entityID == 0L) continue; + JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); + if (affectedArr == null) return; - try - { - engine.GetEntity(entityID, null); - entityIDSet.Add(entityID); - } - catch (SzNotFoundException) - { - entityIDSet.Remove(entityID); - } - catch (SzException e) + for (int index = 0; index < affectedArr.Count; index++) { - // simply log the exception, do not rethrow - Console.Error.WriteLine(); - Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); - Console.Error.WriteLine(e); - Console.Error.Flush(); + JsonObject? affected = affectedArr[index]?.AsObject(); + long entityID = affected?[EntityID]?.GetValue() ?? 0L; + if (entityID == 0L) continue; + + try + { + engine.GetEntity(entityID, null); + entityIDSet.Add(entityID); + } + catch (SzNotFoundException) + { + entityIDSet.Remove(entityID); + } + catch (SzException e) + { + // simply log the exception, do not rethrow + Console.Error.WriteLine(); + Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); + Console.Error.WriteLine(e); + Console.Error.Flush(); + } } - } } /// @@ -222,33 +222,33 @@ static void LogFailedRedo(string errorType, Exception exception, string redoRecord) { - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string RedoPauseDescription = "30 seconds"; - - private const int RedoPauseTimeout = 30000; - - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; - private const string AffectedEntities = "AFFECTED_ENTITIES"; - private const string EntityID = "ENTITY_ID"; - - // setup some class-wide variables - private static int errorCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? 
retryWriter; - private static readonly ISet entityIDSet = new HashSet(); + private const string RedoPauseDescription = "30 seconds"; + + private const int RedoPauseTimeout = 30000; + + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; + private const string AffectedEntities = "AFFECTED_ENTITIES"; + private const string EntityID = "ENTITY_ID"; + + // setup some class-wide variables + private static int errorCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; + private static readonly ISet entityIDSet = new HashSet(); } diff --git a/csharp/snippets/searching/SearchRecords/Program.cs b/csharp/snippets/searching/SearchRecords/Program.cs index 5c56532..69d3a73 100644 --- a/csharp/snippets/searching/SearchRecords/Program.cs +++ b/csharp/snippets/searching/SearchRecords/Program.cs @@ -13,8 +13,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -30,67 +30,67 @@ try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - // loop through the example records and add them to the repository - foreach (string criteria in GetSearchCriteria()) - { - // call the searchByAttributes() function with default flags - string result = engine.SearchByAttributes( - criteria, SzSearchByAttributesDefaultFlags); + // loop through the example records and add them to the repository + foreach (string criteria in GetSearchCriteria()) + { + // call the searchByAttributes() function with default flags + string result = engine.SearchByAttributes( + criteria, SzSearchByAttributesDefaultFlags); - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - Console.WriteLine(); - JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); - if (jsonArr == null || jsonArr.Count == 0) - { - Console.WriteLine("No results for criteria: " + criteria); - } - else - { - Console.WriteLine("Results for criteria: " + criteria); - for (int index = 0; index < jsonArr.Count; index++) - { - JsonObject? obj = jsonArr[index]?.AsObject(); - obj = obj?["ENTITY"]?.AsObject(); - obj = obj?["RESOLVED_ENTITY"]?.AsObject(); - if (obj == null) + Console.WriteLine(); + JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); + if (jsonArr == null || jsonArr.Count == 0) + { + Console.WriteLine("No results for criteria: " + criteria); + } + else { - throw new JsonException("Unexpected result format: " + result); + Console.WriteLine("Results for criteria: " + criteria); + for (int index = 0; index < jsonArr.Count; index++) + { + JsonObject? obj = jsonArr[index]?.AsObject(); + obj = obj?["ENTITY"]?.AsObject(); + obj = obj?["RESOLVED_ENTITY"]?.AsObject(); + if (obj == null) + { + throw new JsonException("Unexpected result format: " + result); + } + long? entityID = obj["ENTITY_ID"]?.GetValue(); + string? name = obj["ENTITY_NAME"]?.GetValue(); + Console.WriteLine(entityID + ": " + name); + } } - long? 
entityID = obj["ENTITY_ID"]?.GetValue(); - string? name = obj["ENTITY_NAME"]?.GetValue(); - Console.WriteLine(entityID + ": " + name); - } + Console.Out.Flush(); } - Console.Out.Flush(); - } } catch (SzException e) { - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); - throw; + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); + throw; } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); } /// @@ -103,9 +103,9 @@ /// static IList GetSearchCriteria() { - IList records = new List(); - records.Add( - """ + IList records = new List(); + records.Add( + """ { "NAME_FULL": "Susan Moony", "DATE_OF_BIRTH": "15/6/1998", @@ -113,8 +113,8 @@ static IList GetSearchCriteria() } """); - records.Add( - """ + records.Add( + """ { "NAME_FIRST": "Robert", "NAME_LAST": "Smith", @@ -122,8 +122,8 @@ static IList GetSearchCriteria() } """); - records.Add( - """ + records.Add( + """ { "NAME_FIRST": "Makio", "NAME_LAST": "Yamanaka", @@ -131,5 +131,5 @@ static IList GetSearchCriteria() } """); - return records; + return records; } \ No newline at end of file diff --git a/csharp/snippets/searching/SearchViaFutures/Program.cs b/csharp/snippets/searching/SearchViaFutures/Program.cs index fb77272..e824eaa 100644 --- a/csharp/snippets/searching/SearchViaFutures/Program.cs +++ b/csharp/snippets/searching/SearchViaFutures/Program.cs @@ -18,8 +18,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -56,228 +56,228 @@ TaskScheduler taskScheduler StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - int lineNumber = 0; - bool eof = false; + int lineNumber = 0; + bool eof = false; - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) + while (!eof) { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { + // read the next line + string? 
line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; - // check for EOF - if (line == null) - { - eof = true; - break; - } + // skip any commented lines + if (line.StartsWith('#')) continue; - // trim the line - line = line.Trim(); + // construct the Record instance + Criteria criteria = new Criteria(lineNumber, line); - // skip any blank lines - if (line.Length == 0) continue; + try + { + Task task = factory.StartNew(() => + { + // call the addRecord() function with no flags + return engine.SearchByAttributes( + criteria.Line, SzSearchByAttributesDefaultFlags); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, criteria)); - // skip any commented lines - if (line.StartsWith('#')) continue; + } + catch (SzBadInputException e) + { + LogFailedSearch(Error, e, lineNumber, line); + errorCount++; // increment the error count + } + } - // construct the Record instance - Criteria criteria = new Criteria(lineNumber, line); + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); - try - { - Task task = factory.StartNew(() => + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) { - // call the addRecord() function with no flags - return engine.SearchByAttributes( - criteria.Line, SzSearchByAttributesDefaultFlags); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, criteria)); - - } - catch (SzBadInputException e) - { - LogFailedSearch(Error, e, lineNumber, line); - errorCount++; // increment the error count - } + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); } - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future - HandlePendingFutures(pendingFutures, true); + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future + HandlePendingFutures(pendingFutures, true); } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - rdr.Close(); - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Searches successfully completed : " + successCount); - Console.WriteLine("Total entities found via searches : " + foundEntities.Count); - Console.WriteLine("Searches failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - 
retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " searches to be retried in " + retryFile); - } - Console.Out.Flush(); + rdr.Close(); + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Searches successfully completed : " + successCount); + Console.WriteLine("Total entities found via searches : " + foundEntities.Count); + Console.WriteLine("Searches failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " searches to be retried in " + retryFile); + } + Console.Out.Flush(); } static void HandlePendingFutures(IList<(Task, Criteria)> pendingFutures, bool blocking) { - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Criteria criteria) = pendingFutures[index]; + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Criteria criteria) = pendingFutures[index]; - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; - // remove the pending future from the list - pendingFutures.RemoveAt(index--); + // remove the pending future from the list + pendingFutures.RemoveAt(index--); - try - { - try - { - // this will block if the task is not yet completed, - // however we only get here with a pending task if - // the blocking parameter is true - string results = task.Result; - - // if we get here then increment the success count - successCount++; - - // parse the search results - JsonObject? jsonObj = JsonNode.Parse(results)?.AsObject(); - JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); - if (jsonArr != null) + try { - for (int index2 = 0; index2 < jsonArr.Count; index2++) - { - JsonObject? obj = jsonArr[index2]?.AsObject(); - obj = obj?["ENTITY"]?.AsObject(); - obj = obj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = obj?["ENTITY_ID"]?.GetValue(); - if (entityID != null) + try + { + // this will block if the task is not yet completed, + // however we only get here with a pending task if + // the blocking parameter is true + string results = task.Result; + + // if we get here then increment the success count + successCount++; + + // parse the search results + JsonObject? jsonObj = JsonNode.Parse(results)?.AsObject(); + JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); + if (jsonArr != null) + { + for (int index2 = 0; index2 < jsonArr.Count; index2++) + { + JsonObject? obj = jsonArr[index2]?.AsObject(); + obj = obj?["ENTITY"]?.AsObject(); + obj = obj?["RESOLVED_ENTITY"]?.AsObject(); + long? entityID = obj?["ENTITY_ID"]?.GetValue(); + if (entityID != null) + { + foundEntities.Add(entityID ?? 0L); + } + } + } + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) { - foundEntities.Add(entityID ?? 
0L); + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } } - } - } - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; } - else + catch (SzBadInputException e) { - throw; - } - } + LogFailedSearch(Error, e, criteria.LineNumber, criteria.Line); + errorCount++; // increment the error count - } - catch (SzBadInputException e) - { - LogFailedSearch(Error, e, criteria.LineNumber, criteria.Line); - errorCount++; // increment the error count + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedSearch(Warning, e, criteria.LineNumber, criteria.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedSearch(Warning, e, criteria.LineNumber, criteria.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(criteria.Line); - } + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(criteria.Line); + } + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedSearch(Critical, e, criteria.LineNumber, criteria.Line); + errorCount++; + throw; // rethrow since exception is critical + } } - catch (Exception e) - { - // catch any other exception (incl. 
SzException) here - LogFailedSearch(Critical, e, criteria.LineNumber, criteria.Line); - errorCount++; - throw; // rethrow since exception is critical - } - } } /// @@ -295,44 +295,44 @@ static void LogFailedSearch(string errorType, int lineNumber, string criteriaJson) { - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO SEARCH CRITERIA AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(criteriaJson); - Console.Error.WriteLine(exception); - Console.Error.Flush(); + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO SEARCH CRITERIA AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(criteriaJson); + Console.Error.WriteLine(exception); + Console.Error.Flush(); } public partial class Program { - private const string DefaultFilePath = "../../resources/data/search-5K.jsonl"; + private const string DefaultFilePath = "../../resources/data/search-5K.jsonl"; - private const string RetryPrefix = "retry-"; + private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; + private const string RetrySuffix = ".jsonl"; - private const int ThreadCount = 8; + private const int ThreadCount = 8; - private const int BacklogFactor = 10; + private const int BacklogFactor = 10; - private const int MaximumBacklog = ThreadCount * BacklogFactor; + private const int MaximumBacklog = ThreadCount * BacklogFactor; - private const int PauseTimeout = 100; + private const int PauseTimeout = 100; - private const string Error = "ERROR"; + private const string Error = "ERROR"; - private const string Warning = "WARNING"; + private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; + private const string Critical = "CRITICAL"; - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; - private static readonly HashSet foundEntities = new HashSet(); + private static readonly HashSet foundEntities = new HashSet(); } internal sealed record Criteria(int LineNumber, string Line) { } diff --git a/csharp/snippets/stewardship/ForceResolve/Program.cs b/csharp/snippets/stewardship/ForceResolve/Program.cs index 1e822c9..4c65c2e 100644 --- a/csharp/snippets/stewardship/ForceResolve/Program.cs +++ b/csharp/snippets/stewardship/ForceResolve/Program.cs @@ -15,8 +15,8 @@ string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -32,99 +32,99 @@ try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - IDictionary<(string, string), string> records = GetRecords(); + IDictionary<(string, string), string> records = GetRecords(); - // loop through the example records and add them to the repository - foreach (KeyValuePair<(string, string), string> pair in records) - { - (string dataSourceCode, string recordID) = pair.Key; - string recordDefinition = pair.Value; + // loop through the example records and add them to the repository + foreach (KeyValuePair<(string, string), string> pair in records) + { + (string dataSourceCode, string recordID) = pair.Key; + string recordDefinition = pair.Value; - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); - Console.WriteLine("Record " + recordID + " added"); - Console.Out.Flush(); - } + Console.WriteLine("Record " + recordID + " added"); + Console.Out.Flush(); + } - Console.WriteLine(); - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); + Console.WriteLine(); + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " originally resolves to entity " + entityID); - } - Console.WriteLine(); - Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " originally resolves to entity " + entityID); + } + Console.WriteLine(); + Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); - string record1 = engine.GetRecord(TestDataSource, "1", SzRecordDefaultFlags); - string record3 = engine.GetRecord(TestDataSource, "3", SzRecordDefaultFlags); + string record1 = engine.GetRecord(TestDataSource, "1", SzRecordDefaultFlags); + string record3 = engine.GetRecord(TestDataSource, "3", SzRecordDefaultFlags); - JsonObject?[] jsonObjects = { + JsonObject?[] jsonObjects = { JsonNode.Parse(record1)?.AsObject()?["JSON_DATA"]?.AsObject(), JsonNode.Parse(record3)?.AsObject()?["JSON_DATA"]?.AsObject() }; - foreach (JsonObject? obj in jsonObjects) - { - if (obj == null) + foreach (JsonObject? 
obj in jsonObjects) + { + if (obj == null) + { + throw new JsonException("Parsed record is unexpectedly null: " + + record1 + " / " + record3); + } + obj["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R1-TEST_R3\""); + obj["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_RESOLVE\""); + } + engine.AddRecord(TestDataSource, "1", jsonObjects[0]?.ToJsonString()); + engine.AddRecord(TestDataSource, "3", jsonObjects[1]?.ToJsonString()); + + Console.WriteLine(); + + foreach ((string dataSourceCode, string recordID) in records.Keys) { - throw new JsonException("Parsed record is unexpectedly null: " - + record1 + " / " + record3); + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); + + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " now resolves to entity " + entityID); } - obj["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R1-TEST_R3\""); - obj["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_RESOLVE\""); - } - engine.AddRecord(TestDataSource, "1", jsonObjects[0]?.ToJsonString()); - engine.AddRecord(TestDataSource, "3", jsonObjects[1]?.ToJsonString()); - - Console.WriteLine(); - - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); - - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " now resolves to entity " + entityID); - } - Console.WriteLine(); + Console.WriteLine(); } catch (SzException e) { - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); - throw; + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); + throw; } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); } /// @@ -137,12 +137,12 @@ /// static IDictionary<(string, string), string> GetRecords() { - SortedDictionary<(string, string), string> records - = new SortedDictionary<(string, string), string>(); + SortedDictionary<(string, string), string> records + = new SortedDictionary<(string, string), string>(); - records.Add( - ("TEST", "1"), - """ + records.Add( + ("TEST", "1"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "1", @@ -154,9 +154,9 @@ } """); - records.Add( - ("TEST", "2"), - """ + records.Add( + ("TEST", "2"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "2", @@ -167,9 +167,9 @@ } """); - records.Add( - ("TEST", "3"), - """ + records.Add( + ("TEST", "3"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "3", @@ -179,12 +179,12 @@ } """); - return records; + return 
records; } public partial class Program { - private const string TestDataSource = "Test"; + private const string TestDataSource = "Test"; } #pragma warning restore CA1303 // Do not pass literals as localized parameters (example messages) diff --git a/csharp/snippets/stewardship/ForceUnresolve/Program.cs b/csharp/snippets/stewardship/ForceUnresolve/Program.cs index 7050303..0729440 100644 --- a/csharp/snippets/stewardship/ForceUnresolve/Program.cs +++ b/csharp/snippets/stewardship/ForceUnresolve/Program.cs @@ -15,8 +15,8 @@ string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) { - Console.Error.WriteLine("Unable to get settings."); - throw new ArgumentException("Unable to get settings"); + Console.Error.WriteLine("Unable to get settings."); + throw new ArgumentException("Unable to get settings"); } // create a descriptive instance name (can be anything) @@ -32,103 +32,103 @@ try { - // get the engine from the environment - SzEngine engine = env.GetEngine(); + // get the engine from the environment + SzEngine engine = env.GetEngine(); - IDictionary<(string, string), string> records = GetRecords(); + IDictionary<(string, string), string> records = GetRecords(); - // loop through the example records and add them to the repository - foreach (KeyValuePair<(string, string), string> pair in records) - { - (string dataSourceCode, string recordID) = pair.Key; - string recordDefinition = pair.Value; + // loop through the example records and add them to the repository + foreach (KeyValuePair<(string, string), string> pair in records) + { + (string dataSourceCode, string recordID) = pair.Key; + string recordDefinition = pair.Value; - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); - Console.WriteLine("Record " + recordID + " added"); - Console.Out.Flush(); - } + Console.WriteLine("Record " + recordID + " added"); + Console.Out.Flush(); + } - Console.WriteLine(); - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); + Console.WriteLine(); + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? 
entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " originally resolves to entity " + entityID); - } - Console.WriteLine(); - Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " originally resolves to entity " + entityID); + } + Console.WriteLine(); + Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); - string record4 = engine.GetRecord(TestDataSource, "4", SzRecordDefaultFlags); - string record6 = engine.GetRecord(TestDataSource, "6", SzRecordDefaultFlags); + string record4 = engine.GetRecord(TestDataSource, "4", SzRecordDefaultFlags); + string record6 = engine.GetRecord(TestDataSource, "6", SzRecordDefaultFlags); - JsonObject? obj4 = JsonNode.Parse(record4)?.AsObject(); - JsonObject? obj6 = JsonNode.Parse(record6)?.AsObject(); + JsonObject? obj4 = JsonNode.Parse(record4)?.AsObject(); + JsonObject? obj6 = JsonNode.Parse(record6)?.AsObject(); - obj4 = obj4?["JSON_DATA"]?.AsObject(); - obj6 = obj6?["JSON_DATA"]?.AsObject(); + obj4 = obj4?["JSON_DATA"]?.AsObject(); + obj6 = obj6?["JSON_DATA"]?.AsObject(); - if (obj4 == null || obj6 == null) - { - throw new JsonException("The JSON_DATA parses as null: " - + record4 + " / " + record6); - } + if (obj4 == null || obj6 == null) + { + throw new JsonException("The JSON_DATA parses as null: " + + record4 + " / " + record6); + } - obj4["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R4-TEST_R6\""); - obj4["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); + obj4["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R4-TEST_R6\""); + obj4["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); - obj6["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R6-TEST_R4\""); - obj6["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); + obj6["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R6-TEST_R4\""); + obj6["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); - engine.AddRecord(TestDataSource, "4", obj4.ToJsonString()); - engine.AddRecord(TestDataSource, "6", obj6.ToJsonString()); + engine.AddRecord(TestDataSource, "4", obj4.ToJsonString()); + engine.AddRecord(TestDataSource, "6", obj6.ToJsonString()); - Console.WriteLine(); + Console.WriteLine(); - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? 
entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " now resolves to entity " + entityID); - } - Console.WriteLine(); + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " now resolves to entity " + entityID); + } + Console.WriteLine(); } catch (SzException e) { - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); - throw; + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); + throw; } catch (Exception e) { - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); - throw; + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); + throw; } finally { - // IMPORTANT: make sure to destroy the environment - env.Destroy(); + // IMPORTANT: make sure to destroy the environment + env.Destroy(); } /// @@ -141,12 +141,12 @@ /// static IDictionary<(string, string), string> GetRecords() { - SortedDictionary<(string, string), string> records - = new SortedDictionary<(string, string), string>(); + SortedDictionary<(string, string), string> records + = new SortedDictionary<(string, string), string>(); - records.Add( - ("TEST", "4"), - """ + records.Add( + ("TEST", "4"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "4", @@ -157,9 +157,9 @@ } """); - records.Add( - ("TEST", "5"), - """ + records.Add( + ("TEST", "5"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "5", @@ -170,9 +170,9 @@ } """); - records.Add( - ("TEST", "6"), - """ + records.Add( + ("TEST", "6"), + """ { "DATA_SOURCE": "TEST", "RECORD_ID": "6", @@ -182,12 +182,12 @@ } """); - return records; + return records; } public partial class Program { - private const string TestDataSource = "Test"; + private const string TestDataSource = "Test"; } #pragma warning restore CA1303 // Do not pass literals as localized parameters (example messages)
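
Note: the two stewardship diffs above (ForceResolve and ForceUnresolve) both exercise the same round trip — read a stored record, stamp its JSON_DATA with TRUSTED_ID_NUMBER / TRUSTED_ID_TYPE, and re-add it so the engine is steered to resolve (or unresolve) the tagged records. Below is a minimal sketch of that pattern, not part of the patch itself: it reuses only the SzEngine calls and the SzRecordDefaultFlags constant already shown in the snippets, the helper name ForceResolveTogether and its parameters are illustrative, and the usual using directives (System.Text.Json, System.Text.Json.Nodes, and the Senzing SDK namespaces) are assumed.

static void ForceResolveTogether(SzEngine engine,
                                 string dataSourceCode,
                                 string recordID,
                                 string trustedID)
{
    // fetch the record as currently stored (same flags the snippets use)
    string record = engine.GetRecord(dataSourceCode, recordID, SzRecordDefaultFlags);

    // extract the original record definition from the JSON_DATA property
    JsonObject? json = JsonNode.Parse(record)?.AsObject()?["JSON_DATA"]?.AsObject();
    if (json == null)
    {
        throw new JsonException("Unexpected record format: " + record);
    }

    // tag the record with the shared trusted ID; records carrying the same
    // TRUSTED_ID_NUMBER with type FORCE_RESOLVE are steered to resolve together
    json["TRUSTED_ID_NUMBER"] = trustedID;
    json["TRUSTED_ID_TYPE"] = "FORCE_RESOLVE";

    // re-adding the record with the same data source and record ID replaces it
    engine.AddRecord(dataSourceCode, recordID, json.ToJsonString());
}

In the snippets this round trip is performed twice — records 1 and 3 with FORCE_RESOLVE, and records 4 and 6 with FORCE_UNRESOLVE (using reciprocal TRUSTED_ID_NUMBER values) — after which each entity is re-fetched to show how the resolution changed.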