Skip to content

Commit dfef557

Browse files
authored
issue #3599 fhir-flow better support non IBM FHIR Server upstream systems (#3600)
* issue #3599 use next links for fhir-flow checkpoints Signed-off-by: Robin Arnold <[email protected]> * issue #3599 make fhir-flow optimizations optional Signed-off-by: Robin Arnold <[email protected]> * issue #3599 handle null entry.id Signed-off-by: Robin Arnold <[email protected]>
1 parent e4d2b30 commit dfef557

13 files changed

+333
-238
lines changed

fhir-flow/README.md

+17-15
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,19 @@ java \
9292
--downstream-tenant tenant2
9393
```
9494

95-
The fhir-flow application periodically writes out CHECKPOINT values to the log file:
95+
The fhir-flow application periodically writes opaque CHECKPOINT values to the log file:
9696

9797
```
98-
2022-04-19 17:03:11.982 00000001 INFO impl.UpstreamFHIRHistoryReader CHECKPOINT: 26573
98+
2022-04-19 17:03:11.982 00000001 INFO impl.UpstreamFHIRHistoryReader CHECKPOINT: X2NvdW50PTUxMiZfZXhjbHVkZVRyYW5zYWN0aW9uVGltZW91dFdpbmRvdz10cnVlJl9zb3J0PW5vbmUmX2NoYW5nZUlkTWFya2VyPTEwMjM=
9999
```
100100

101101
When the run duration time is reached, the application waits for any pending work to complete then writes a FINAL CHECKPOINT message:
102102

103103
```
104-
2022-04-19 17:03:12.983 00000001 INFO impl.UpstreamFHIRHistoryReader FINAL CHECKPOINT: 27723
104+
2022-04-19 17:03:12.983 00000001 INFO impl.UpstreamFHIRHistoryReader FINAL CHECKPOINT: X2NvdW50PTUxMiZfZXhjbHVkZVRyYW5zYWN0aW9uVGltZW91dFdpbmRvdz10cnVlJl9zb3J0PW5vbmUmX2NoYW5nZUlkTWFya2VyPTEwMjM=
105105
```
106106

107-
The (Java `long`) value can be used to resume processing using the `--change-id-marker` parameter:
107+
The checkpoint value can be used to resume processing using the `--from-checkpoint` parameter:
108108

109109
```
110110
java \
@@ -115,25 +115,27 @@ java \
115115
--upstream-tenant tenant1 \
116116
--downstream-properties local.properties \
117117
--downstream-tenant tenant2 \
118-
--change-id-marker 27723
118+
--from-checkpoint "X2NvdW50PTUxMiZfZXhjbHVkZVRyYW5zYWN0aW9uVGltZW91dFdpbmRvdz10cnVlJl9zb3J0PW5vbmUmX2NoYW5nZUlkTWFya2VyPTEwMjM="
119119
```
120120

121121
## Command Line Options
122122

123123
| Option | Description |
124124
| ------ | ----------- |
125-
| --run-duration | The number of seconds to run before terminating |
126-
| --upstream-properties | A Java properties file containing connection details for the upstream FHIR server |
127-
| --upstream-tenant | The IBM FHIR Server upstream tenant name |
128-
| --downstream-properties | A Java properties file containing connection details for the downstream FHIR server |
129-
| --downstresam-tenant | The IBM FHIR Server downstream tenant name |
130-
| --change-id-marker | Start processing from this previously reported checkpoint value |
131-
| --partition-count | The number of parallel partitions to use for writing to the downstream FHIR server |
132-
| --partition-queue-size | The number of interactions that can be queued into any partition before blocking further fetches. This puts an upper bound on memory consumption when changes can be fetched more quickly than written to the downstream system, which is often the case. |
133-
| --reader-pool-size | The size of the thread-pool used to support asynchronous reading of upstream resources |
134-
| --drain-for-seconds | After the run-duration time has elapsed, wait this number of seconds for the downstream partition queues to empty before exiting |
125+
| --run-duration {seconds} | The number of seconds to run before terminating |
126+
| --upstream-properties {properties-file} | A Java properties file containing connection details for the upstream FHIR server |
127+
| --upstream-tenant {tenant-name} | The IBM FHIR Server upstream tenant name |
128+
| --downstream-properties {properties-file} | A Java properties file containing connection details for the downstream FHIR server |
129+
| --downstresam-tenant {tenant-name} | The IBM FHIR Server downstream tenant name |
130+
| --from-checkpoint {checkpoint-value} | Start processing from this previously reported checkpoint value |
131+
| --partition-count {n} | The number of parallel partitions to use for writing to the downstream FHIR server |
132+
| --partition-queue-size {n} | The number of interactions that can be queued into any partition before blocking further fetches. This puts an upper bound on memory consumption when changes can be fetched more quickly than written to the downstream system, which is often the case. |
133+
| --reader-pool-size {n} | The size of the thread-pool used to support asynchronous reading of upstream resources |
134+
| --drain-for-seconds {seconds} | After the run-duration time has elapsed, wait this number of seconds for the downstream partition queues to empty before exiting |
135135
| --parse-resource | Parse each resource received from the upstream system. The default mode is to not parse the resource, and treat the payload as an opaque string which is simply passed from upstream to downstream - thus saving a significant amount of CPU and pressure on the GC. |
136136
| --log-data | When in log-only mode (not writing to an actual downstream system), include the resource payload data when logging each interaction. |
137+
| --exclude-transaction-window | When upstream is an IBM FHIR, use the `_excludeTransactionTimeoutWindow=true` query parameter when fetching history to avoid potential issues with missing data in high-volume scenarios. |
138+
| --prefer-return-minimal | Use the `Prefer: return=minimal` header for upstream history requests. The IBM FHIR Server uses this as an optimization to skip inclusion of the resource in the response Bundle. Only meta-data related to the change history is returned, allowing for the resource data to be read separately using a VREAD interaction. Better throughput can be achieved by performing the VREAD interactions in parallel |
137139

138140
# Ideas for Future Development
139141

fhir-flow/src/main/java/com/ibm/fhir/flow/api/FlowInteraction.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
* Represents a resource being passed from the reader to the writer
1111
*/
1212
public abstract class FlowInteraction {
13-
// the change sequence number reported by the upstream server
14-
private final long changeId;
13+
// To assist tracking this with the bundle content
14+
private final String entryId;
1515

1616
// the ticket being used to track completion of this interaction
1717
private final ITrackerTicket trackerTicket;
@@ -22,12 +22,12 @@ public abstract class FlowInteraction {
2222
/**
2323
* Protected constructor
2424
*
25-
* @param changeId
25+
* @param entryId
2626
* @param trackerTicket
2727
* @param identifier
2828
*/
29-
protected FlowInteraction(long changeId, ITrackerTicket trackerTicket, ResourceIdentifier identifier) {
30-
this.changeId = changeId;
29+
protected FlowInteraction(String entryId, ITrackerTicket trackerTicket, ResourceIdentifier identifier) {
30+
this.entryId = entryId;
3131
this.trackerTicket = trackerTicket;
3232
this.identifier = identifier;
3333
}
@@ -52,11 +52,11 @@ public ResourceIdentifier getIdentifier() {
5252
}
5353

5454
/**
55-
* Getter for the changeId value
55+
* Getter for the entryId value
5656
* @return
5757
*/
58-
public long getChangeId() {
59-
return this.changeId;
58+
public String getEntryId() {
59+
return this.entryId;
6060
}
6161

6262
/**

fhir-flow/src/main/java/com/ibm/fhir/flow/api/ICheckpointTracker.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,35 @@
66

77
package com.ibm.fhir.flow.api;
88

9-
import java.util.List;
10-
119
/**
1210
* Supports tracking of work units so that a checkpoint can be written
1311
* periodically to indicate that all work up to a given point-in-time
1412
* has been completed.
13+
*
14+
*
15+
* @param <T> the type of the value used to represent a checkpoint
1516
*/
16-
public interface ICheckpointTracker {
17+
public interface ICheckpointTracker<T> {
1718

1819
/**
1920
* Add this request to the queue
2021
* @param requestId
2122
* @param an {@link ITrackerTicket} which can be used to signal completion of the work item (thread-safe)
23+
* @param workItems how many individual pieces of work are associated with this checkpoint value
2224
*/
23-
ITrackerTicket track(long requestId);
25+
ITrackerTicket track(T checkpointValue, int workItems);
2426

2527
/**
26-
* Track all of the requestIds in the given list. The output list
27-
* will be in the same order as the input list. All entries will
28-
* be added using a single monitor lock, which should be more
29-
* efficient than synchronizing on each individual item
30-
* @param requestIds
28+
* Get the current checkpoint value
3129
* @return
3230
*/
33-
List<ITrackerTicket> track(List<Long> requestIds);
31+
T getCheckpoint();
3432

3533
/**
36-
* Get the current checkpoint value
34+
* Get the total number of entries processed up to the current checkpoint
3735
* @return
3836
*/
39-
long getCheckpoint();
37+
long getProcessed();
4038

4139
/**
4240
* Is the tracker queue empty

fhir-flow/src/main/java/com/ibm/fhir/flow/api/IFlowInteractionHandler.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@ public interface IFlowInteractionHandler {
1515

1616
/**
1717
* Perform a logical DELETE interaction
18-
* @param changeId
18+
* @param entryId
1919
* @param identifier
2020
*/
21-
void delete(long changeId, ResourceIdentifier identifier);
21+
void delete(String entryId, ResourceIdentifier identifier);
2222

2323
/**
2424
* Perform a create-or-update (PUT) interaction
25-
* @param changeId
25+
* @param entryId
2626
* @param identifier
2727
* @param resourceData
2828
* @param resource
2929
*/
30-
void createOrUpdate(long changeId, ResourceIdentifier identifier, String resourceData, Resource resource);
30+
void createOrUpdate(String entryId, ResourceIdentifier identifier, String resourceData, Resource resource);
3131
}

fhir-flow/src/main/java/com/ibm/fhir/flow/app/Main.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,20 @@ public class Main {
6767
private int resourcesPerHistoryCall = 512;
6868

6969
// Start processing from this point in the change stream
70-
private long changeIdMarker = -1;
70+
private String startFromCheckpoint = null;
7171

7272
// How many seconds to run for (default forever)
7373
private long runDurationSeconds = -1;
7474

7575
// How many seconds to wait for queued work to complete after the scan completes
7676
private long drainForSeconds = 600;
7777

78+
// Request upstream (IBM FHIR Server) system to exclude the transaction timeout window
79+
private boolean excludeTransactionWindow = false;
80+
81+
// Use the Prefer: return=minimal header to optimize upstream history request
82+
private boolean preferReturnMinimal = false;
83+
7884
/**
7985
* Parse the command line arguments
8086
* @param args
@@ -132,11 +138,11 @@ public void parseArgs(String[] args) {
132138
throw new IllegalArgumentException("missing value for --reader-pool-size");
133139
}
134140
break;
135-
case "--change-id-marker":
141+
case "--from-checkpoint":
136142
if (i < args.length + 1) {
137-
this.changeIdMarker = Long.parseLong(args[++i]);
143+
this.startFromCheckpoint = args[++i];
138144
} else {
139-
throw new IllegalArgumentException("missing value for --change-id-marker");
145+
throw new IllegalArgumentException("missing value for --from-checkpoint");
140146
}
141147
break;
142148
case "--run-duration":
@@ -153,9 +159,15 @@ public void parseArgs(String[] args) {
153159
throw new IllegalArgumentException("missing value for --drain-for-seconds");
154160
}
155161
break;
162+
case "--exclude-transaction-window":
163+
this.excludeTransactionWindow = true;
164+
break;
156165
case "--parse-resource":
157166
this.parseResource = true;
158167
break;
168+
case "--prefer-return-minimal":
169+
this.preferReturnMinimal = true;
170+
break;
159171
case "--log-data":
160172
this.logData = true;
161173
break;
@@ -221,7 +233,11 @@ public void process() {
221233
downstreamWriter = new DownstreamLogWriter(partitionCount, partitionQueueSize, this.logData);
222234
}
223235

224-
UpstreamFHIRHistoryReader historyReader = new UpstreamFHIRHistoryReader(this.resourcesPerHistoryCall, this.changeIdMarker, this.drainForSeconds);
236+
UpstreamFHIRHistoryReader historyReader = new UpstreamFHIRHistoryReader(this.resourcesPerHistoryCall,
237+
this.startFromCheckpoint,
238+
this.excludeTransactionWindow,
239+
this.preferReturnMinimal,
240+
this.drainForSeconds);
225241
historyReader.setClient(upstreamClient);
226242
historyReader.setFlowPool(readerPool);
227243
historyReader.setFlowWriter(downstreamWriter);

0 commit comments

Comments
 (0)