Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions base_folder/src/function-factory.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import extraction from './functions/extraction';

/**
 * Registry of the functions this package exposes.
 *
 * Each property name is the identifier a function is looked up by, and the
 * value is the function implementation itself. `as const` keeps the keys as
 * literal types so that `FunctionFactoryType` below is a precise union.
 */
export const functionFactory = {
// Add your functions here
extraction,
} as const;

/** Union of the property names registered in `functionFactory`. */
export type FunctionFactoryType = keyof typeof functionFactory;
37 changes: 37 additions & 0 deletions base_folder/src/functions/extraction/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { convertToAirdropEvent } from '../../core/utils';
import { FunctionInput } from '../../core/types';
import { spawn, EventType } from '@devrev/ts-adaas';

/**
 * Picks the worker script that corresponds to the extraction phase of an event.
 *
 * The phase is read from `event.payload.event_type`. The "start" and
 * "continue" variants of the data and attachments phases share one worker.
 *
 * @param event - The incoming function input whose payload carries the event type.
 * @returns The path of the worker script (relative to this module's directory),
 *          or `undefined` when the event type is not one of the handled phases.
 */
function getWorkerPerExtractionPhase(event: FunctionInput) {
  const phase = event.payload.event_type;

  if (phase === EventType.ExtractionExternalSyncUnitsStart) {
    return __dirname + '/workers/external-sync-units-extraction';
  }

  if (phase === EventType.ExtractionMetadataStart) {
    return __dirname + '/workers/metadata-extraction';
  }

  // Both the initial and the resumed data phase are served by the same worker.
  if (phase === EventType.ExtractionDataStart || phase === EventType.ExtractionDataContinue) {
    return __dirname + '/workers/data-extraction';
  }

  // Likewise for the initial and the resumed attachments phase.
  if (phase === EventType.ExtractionAttachmentsStart || phase === EventType.ExtractionAttachmentsContinue) {
    return __dirname + '/workers/attachments-extraction';
  }

  // No worker is mapped to any other event type.
  return undefined;
}

/**
 * Entry point of the extraction function.
 *
 * Handles the given events strictly one after another: for each event it
 * resolves the worker script for the event's phase, converts the event with
 * `convertToAirdropEvent`, and awaits `spawn` before moving on to the next one.
 *
 * @param events - The function inputs to process, in order.
 */
const run = async (events: FunctionInput[]) => {
  for (let index = 0; index < events.length; index++) {
    const incoming = events[index];

    // Resolve the worker first, then convert the event.
    const workerPath = getWorkerPerExtractionPhase(incoming);
    const airdropEvent = convertToAirdropEvent(incoming);

    await spawn({
      event: airdropEvent,
      workerPath,
      initialState: {},
    });
  }
};

export default run;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { processTask } from '@devrev/ts-adaas';

// Worker entry point: passes a `task` handler and an `onTimeout` handler to processTask.
// Both handlers are empty placeholders; each receives `adapter` but does nothing with it yet.
// NOTE(review): the file path for this hunk is not visible here — confirm which extraction phase this worker serves.
processTask({
// Placeholder: no work is performed yet.
task: async ({ adapter }) => {},
// Placeholder: nothing is done on timeout yet.
onTimeout: async ({ adapter }) => {},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { processTask } from "@devrev/ts-adaas";

// Worker entry point: passes a `task` handler and an `onTimeout` handler to processTask.
// Both handlers are empty placeholders; each receives `adapter` but does nothing with it yet.
// NOTE(review): the file path for this hunk is not visible here — confirm which extraction phase this worker serves.
processTask({
// Placeholder: no work is performed yet.
task: async ({ adapter }) => {},
// Placeholder: nothing is done on timeout yet.
onTimeout: async ({ adapter }) => {},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { processTask } from "@devrev/ts-adaas";

// Worker entry point: passes a `task` handler and an `onTimeout` handler to processTask.
// Both handlers are empty placeholders; each receives `adapter` but does nothing with it yet.
// NOTE(review): the file path for this hunk is not visible here — confirm which extraction phase this worker serves.
processTask({
// Placeholder: no work is performed yet.
task: async ({ adapter }) => {},
// Placeholder: nothing is done on timeout yet.
onTimeout: async ({ adapter }) => {},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { processTask } from '@devrev/ts-adaas';

// Worker entry point: passes a `task` handler and an `onTimeout` handler to processTask.
// Both handlers are empty placeholders; each receives `adapter` but does nothing with it yet.
// NOTE(review): the file path for this hunk is not visible here — confirm which extraction phase this worker serves.
processTask({
// Placeholder: no work is performed yet.
task: async ({ adapter }) => {},
// Placeholder: nothing is done on timeout yet.
onTimeout: async ({ adapter }) => {},
});
60 changes: 56 additions & 4 deletions base_folder/test/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) {
resp.status(400).send(errMsg);
return;
}
// reject the request with a 400 if an empty array is provided
if (Array.isArray(events) && events.length === 0) {
let errMsg = 'Invalid request format: body is an empty array';
error = {
err_type: RuntimeErrorType.InvalidRequest,
err_msg: errMsg,
} as RuntimeError;
console.error(error.err_msg);
// Return validation error status for empty events input
resp.status(400).send(errMsg);
return;
}
// if the request is synchronous, there should be a single event
if (!isAsync) {
if (events.length > 1) {
Expand All @@ -88,6 +100,29 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) {
return;
}
} else {
// Preflight validation for async requests: ensure each event is minimally valid
for (let event of events) {
if (!event || !event.execution_metadata) {
let errMsg = 'Invalid request format: missing execution_metadata';
console.error(errMsg);
resp.status(400).send(errMsg);
return;
}
const functionName: FunctionFactoryType = event.execution_metadata.function_name as FunctionFactoryType;
if (functionName === undefined) {
let errMsg = 'Function name not provided in event';
console.error(errMsg);
resp.status(400).send(errMsg);
return;
}
const f = functionFactory[functionName];
if (f == undefined) {
let errMsg = `Function ${event.execution_metadata.function_name} not found in factory`;
console.error(errMsg);
resp.status(400).send(errMsg);
return;
}
}
// return a success response back to the server
resp.status(200).send();
}
Expand All @@ -101,8 +136,13 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) {
err_msg: errMsg,
} as RuntimeError;
console.error(error.err_msg);
resp.status(400).send(errMsg);
return;
if (!isAsync) {
resp.status(400).send(errMsg);
return;
} else {
// For async requests, response has already been sent; skip further processing for this event
continue;
}
}
const functionName: FunctionFactoryType = event.execution_metadata.function_name as FunctionFactoryType;
if (functionName === undefined) {
Expand All @@ -111,7 +151,13 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) {
err_msg: 'Function name not provided in event',
} as RuntimeError;
console.error(error.err_msg);
receivedError = true;
if (!isAsync) {
resp.status(400).send(error.err_msg);
return;
} else {
receivedError = true;
continue;
}
} else {
const f = functionFactory[functionName];
try {
Expand All @@ -121,7 +167,13 @@ async function handleEvent(events: any[], isAsync: boolean, resp: Response) {
err_msg: `Function ${event.execution_metadata.function_name} not found in factory`,
} as RuntimeError;
console.error(error.err_msg);
receivedError = true;
if (!isAsync) {
resp.status(400).send(error.err_msg);
return;
} else {
receivedError = true;
continue;
}
} else {
result = await run(f, [event]);
}
Expand Down
3 changes: 3 additions & 0 deletions devrev-snapin-template.plain
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

- The Conformance Tests should spin a server at http://localhost:8002 for testing callbacks (The Callback Server).
- The Conformance Tests *MUST NOT* send the event directly to The Callback Server.
- The Implementation Code *MUST NOT* send the event directly to The Callback Server.

- The DevRev Server runs at http://localhost:8003, so it must not be mocked.

Expand Down Expand Up @@ -93,6 +94,8 @@

- The Function being implemented *MUST not* be called "extraction" or "loading".

- {% include "templates/should_not_modify_extraction_function.plain" %}

***Functional Requirements:***

- Implement The Function that only checks if The Function can be invoked.
Expand Down
1 change: 1 addition & 0 deletions docs/data-extraction-rules-for-emitting-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Here are the rules when `EXTRACTION_DATA_START` or `EXTRACTION_DATA_CONTINUE` event is received by the snap-in:

- If there's a timeout ("onTimeout" method in "spawn" method), the snap-in should emit an event with type `EXTRACTION_DATA_PROGRESS` (no "data" payload).
- Note: Event `EXTRACTION_DATA_PROGRESS` **MUST NEVER** be emitted in the "task" method in The Worker Thread. The only place where `EXTRACTION_DATA_PROGRESS` can be emitted is the "onTimeout" method in The Worker Thread.
- If the extraction is successful and all the resources have been extracted, the snap-in should emit an event with type `EXTRACTION_DATA_DONE` (no "data" payload).
- In case of unexpected error, the snap-in should emit an event with type `EXTRACTION_DATA_ERROR` ("data" payload should contain attribute "error" - error message as a string).
- In case of rate limiting (status code 429) for "data extraction", the snap-in *must* emit an event with type `EXTRACTION_DATA_DELAY`. If emitting event of type `EXTRACTION_DATA_DELAY`, you *must* also provide a "delay" attribute, specifying the delay in seconds as an integer.
Expand Down
22 changes: 1 addition & 21 deletions docs/data-extraction.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,7 @@ This state object is shared across all invocations and keeps track of where the

## Triggering event

Airdrop initiates data extraction by starting the snap-in with a message with event type
`EXTRACTION_DATA_START` when transitioning to the data extraction phase.

During the data extraction phase, the snap-in extracts data from an external system,
prepares batches of data and uploads them in the form of artifacts (files) to DevRev.

The snap-in must respond to Airdrop with a message with event type of `EXTRACTION_DATA_PROGRESS`,
together with an optional progress estimate when the maximum Airdrop snap-in runtime (13 minutes) has been reached.

If the extraction has been rate-limited by the external system and back-off is required, the snap-in
must respond to Airdrop with a message with event type `EXTRACTION_DATA_DELAY` and specifying
back-off time with `delay` attribute (in seconds as an integer).

In both cases, Airdrop starts the snap-in with a message with event type `EXTRACTION_DATA_CONTINUE`.
In case of `EXTRACTION_DATA_PROGRESS` the restarting is immediate,
meanwhile in case of `EXTRACTION_DATA_DELAY` the restarting is delayed for the given number of seconds.

Once the data extraction is done, the snap-in must respond to Airdrop with a message with event type `EXTRACTION_DATA_DONE`.

If data extraction failed in any moment of extraction, the snap-in must respond to Airdrop with a
message with event type `EXTRACTION_DATA_ERROR`.
Refer to the resource [data-extraction-rules-for-emitting-events.md](docs/data-extraction-rules-for-emitting-events.md) for the rules for emitting events for The Extraction Function.

## Implementation

Expand Down
6 changes: 3 additions & 3 deletions run_devrev_snapin_conformance_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ NPM_INSTALL_OUTPUT_FILTER="up to date in|added [0-9]* packages, removed [0-9]* p
ANSI_ESCAPE_PATTERN="s/\x1b\[[0-9;]*[mK]//g"

# Maximum number of characters to display from log files
SNAP_IN_LOG_MAX_CHARS=30000
DEVREV_SERVER_LOG_MAX_CHARS=15000
PROXY_SERVER_LOG_MAX_CHARS=10000
SNAP_IN_LOG_MAX_CHARS=80000
DEVREV_SERVER_LOG_MAX_CHARS=40000
PROXY_SERVER_LOG_MAX_CHARS=20000

# Function to print a log file, truncating it if it's too large
print_log_file() {
Expand Down
20 changes: 10 additions & 10 deletions templates/attachment_extraction_acceptance_test.plain
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
- Test The Extraction Function with the following flow (The Attachment Test Flow):
- Step 1: Invoke The Extraction Function using the resource [{{ data_extraction_resource_name }}](test_data/{{ data_extraction_resource_name }}).
- Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE".
- Step 2: Invoke The Extraction Function using the resource [{{ attachment_extraction_resource_name }}](test_data/{{ attachment_extraction_resource_name }}).
- Expect The Callback Server to receive from DevRev a **single** event (`:= event_from_callback_server`) with "event_type" that equals "EXTRACTION_ATTACHMENTS_DONE".
- Expect `event_from_callback_server["event_data"]["artifacts"]` to be an array. Expect this array to not be empty. Expect this array to have length 1.
- Let `artifact_object = event_from_callback_server["event_data"]["artifacts"][0]`.
- Expect `artifact_object["item_type"] = "ssor_attachment"` and `artifact_object["item_count"] = {{ expected_attachment_count }}`.
- Make a GET request to "http://localhost:8003/is_uploaded/{artifact_object['id']}". Expect response code to be equal to 200.
- Note: The Attachment Test Flow should be executed within a single test.
Test The Extraction Function with the following flow (The Attachment Test Flow):
- Step 1: Invoke The Extraction Function using the resource [{{ data_extraction_resource_name }}](test_data/{{ data_extraction_resource_name }}).
- Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE".
- Step 2: Invoke The Extraction Function using the resource [{{ attachment_extraction_resource_name }}](test_data/{{ attachment_extraction_resource_name }}).
- Expect The Callback Server to receive from DevRev a **single** event (`:= event_from_callback_server`) with "event_type" that equals "EXTRACTION_ATTACHMENTS_DONE".
- Expect `event_from_callback_server["event_data"]["artifacts"]` to be an array. Expect this array to not be empty. Expect this array to have length 1.
- Let `artifact_object = event_from_callback_server["event_data"]["artifacts"][0]`.
- Expect `artifact_object["item_type"] = "ssor_attachment"` and `artifact_object["item_count"] = {{ expected_attachment_count }}`.
- Make a GET request to "http://localhost:8003/is_uploaded/{artifact_object['id']}". Expect response code to be equal to 200.
- Note: The Attachment Test Flow should be executed within a single test.
20 changes: 10 additions & 10 deletions templates/chef_cli_normalization_validation.plain
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
- Then, validate The Normalization Function with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-data -m <The External Domain Metadata JSON object file> -r {{ record_type }}":
- To invoke the function that returns The External Domain Metadata JSON object, send the event [external_domain_metadata_event_payload.json](test_data/external_domain_metadata_event_payload.json) to The Snap-In server.
- The External Domain Metadata JSON object should then be stored in temporary file and provided as path to the validation of The Normalization Function.
- The Chef CLI reads the extracted file (The Extracted File) from stdin.
- The Extracted File is inside folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH". To retrieve the relevant file name, command `ls EXTRACTED_FILES_FOLDER_PATH | grep extractor_{{ record_type }} | sort -r | head -n 1` should be executed. The test should always fail if The Extracted File is not present.
- Please note that the expected behaviour is that only after the invocation of The Extraction Function, the folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH" should exist. The test should always fail if before the execution of The Chef CLI, the folder "EXTRACTED_FILES_FOLDER_PATH" does not exist or if The Extracted File is not found.
- The path to The Chef CLI executable is provided in the environment variable `CHEF_CLI_PATH`.
- For the validation of The Normalization Function to be successful The Chef CLI must return an empty output. The implementation of The Extraction Function directly infuences the output of The Chef CLI.
- The stdout and stderr of The Chef CLI *must* be printed to the console.
- The test should always fail if The Chef CLI is not available.
validate The Normalization Function with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-data -m <The External Domain Metadata JSON object file> -r {{ record_type }}":
- To invoke the function that returns The External Domain Metadata JSON object, send the event [external_domain_metadata_event_payload.json](test_data/external_domain_metadata_event_payload.json) to The Snap-In server.
- The External Domain Metadata JSON object should then be stored in a temporary file, and the path to that file should be provided to the validation of The Normalization Function.
- The Chef CLI reads the extracted file (The Extracted File) from stdin.
- The Extracted File is inside folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH". To retrieve the relevant file name, command `ls EXTRACTED_FILES_FOLDER_PATH | grep extractor_{{ record_type }} | sort -r | head -n 1` should be executed. The test should always fail if The Extracted File is not present.
- Please note that the expected behaviour is that only after the invocation of The Extraction Function, the folder provided in the environment variable "EXTRACTED_FILES_FOLDER_PATH" should exist. The test should always fail if before the execution of The Chef CLI, the folder "EXTRACTED_FILES_FOLDER_PATH" does not exist or if The Extracted File is not found.
- The path to The Chef CLI executable is provided in the environment variable `CHEF_CLI_PATH`.
- For the validation of The Normalization Function to be successful The Chef CLI must return an empty output. The implementation of The Extraction Function directly influences the output of The Chef CLI.
- The stdout and stderr of The Chef CLI *must* be printed to the console.
- The test should always fail if The Chef CLI is not available.
12 changes: 6 additions & 6 deletions templates/data_extraction_acceptance_test.plain
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
- Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}).
- Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE".
- Let `artifact_array = <event_from_callback_server>["event_data"]["artifacts"]` (array of objects).
- Expect `len(artifact_array) > 0`.
- Out of `artifact_array`, expect one of the elements to have "item_type" equal to "{{ artifact_name }}" (`:= {{ artifact_name }}_artifact`) and `{{ artifact_name }}_artifact["item_count"]={{ expected_item_count }}`.
- Note: If `{{ artifact_name }}_artifact["item_count"] < {{ expected_item_count }}`, this indicates that not all the {{ artifact_name }} data was extracted.
Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}).
- Expect The Callback Server to receive from DevRev a **single** event with "event_type" that equals "EXTRACTION_DATA_DONE".
- Let `artifact_array = <event_from_callback_server>["event_data"]["artifacts"]` (array of objects).
- Expect `len(artifact_array) > 0`.
- Out of `artifact_array`, expect one of the elements to have "item_type" equal to "{{ artifact_name }}" (`:= {{ artifact_name }}_artifact`) and `{{ artifact_name }}_artifact["item_count"]={{ expected_item_count }}`.
- Note: If `{{ artifact_name }}_artifact["item_count"] < {{ expected_item_count }}`, this indicates that not all the {{ artifact_name }} data was extracted.
5 changes: 5 additions & 0 deletions templates/data_fetching_return_expectation.plain
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The Functions implemented *should* also return:
- `status_code` - a status code from The API.
- `api_delay` - Delay in seconds to wait before retrying the request to The API (if no rate limit, set to 0)
- `message` - Information about the call to The Function.
- Note: Each of The Functions should handle rate limiting.
2 changes: 2 additions & 0 deletions templates/external_domain_metadata_boilerplate.plain
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

- The External Domain Metadata JSON object *MUST* be imported without "as" statement in The Implementation Code.

- {% include "templates/should_not_modify_extraction_function.plain" %}

***Test Requirements:***

- Validate generated The External Domain Metadata JSON object with the chef-cli tool (The Chef CLI) using the command "chef-cli validate-metadata":
Expand Down
6 changes: 6 additions & 0 deletions templates/external_sync_unit_acceptance_test.plain
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Test The Extraction Function using the resource [{{ resource_name }}](test_data/{{ resource_name }}).
- Expect The Callback Server to receive *a single* event with "event_type" "EXTRACTION_EXTERNAL_SYNC_UNITS_DONE".
- Let `external_sync_units_array = <event_from_callback_server>["event_data"]["external_sync_units"]` (array of objects).
- Expect `external_sync_units_array` to exist and be an array.
- Expect `len(external_sync_units_array) = {{ expected_external_sync_unit_count }}`.
- Out of `external_sync_units_array`, expect that there exists an element that has "name" equal to "{{ expected_external_sync_unit_name }}".
2 changes: 2 additions & 0 deletions templates/initial_domain_mapping_boilerplate.plain
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

- The External Domain Metadata JSON object should not be modified. If there are discrepancies between The External Domain Metadata JSON object and The Initial Domain Mapping JSON object, assume The External Domain Metadata JSON object is correct and The Initial Domain Mapping JSON object needs to be adjusted.

- {% include "templates/should_not_modify_extraction_function.plain" %}

***Test Requirements:***

- Validate generated The Initial Domain Mapping JSON object with the chef-cli tool (The Chef CLI) using the command "chef-cli initial-mapping check -m <The External Domain Metadata JSON object file>":
Expand Down
3 changes: 3 additions & 0 deletions templates/internal_client.plain
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The {{ external_system_name }} Internal Client is a TypeScript service that communicates with The API. These are the rules for The {{ external_system_name }} Internal Client:
- If we need to create a new request to The API, we must create a new method in The {{ external_system_name }} Internal Client.
- Communication with The API must be completely abstracted away from The Function. The Function must be able to initialize The {{ external_system_name }} Internal Client, call the relevant method from The {{ external_system_name }} Internal Client and get the response from The API.
Loading
Loading