Commit d8b16cc

migrate ingestion tests
1 parent f7f46e0 commit d8b16cc

6 files changed: +231 -92 lines changed


.github/workflows/end2end.yaml

Lines changed: 3 additions & 4 deletions
@@ -599,13 +599,12 @@ jobs:
           GIT_ACCESS_TOKEN: ${{ steps.app-token.outputs.token }}
         with:
           deploy_metadata: ${{ env.ENABLE_RING_TESTS }}
+      - name: Debug wait
+        uses: ./.github/actions/debug-wait
+        timeout-minutes: 180
       - name: Run backbeat end to end tests
         run: bash run-e2e-test.sh "end2end" ${E2E_IMAGE_NAME}:${E2E_IMAGE_TAG} "backbeat" "default"
         working-directory: ./.github/scripts/end2end
-      - name: Debug wait
-        uses: ./.github/actions/debug-wait
-        timeout-minutes: 60
-        if: failure() && runner.debug == '1'
       - name: Archive and publish artifacts
         uses: ./.github/actions/archive-artifacts
         with:

tests/zenko_tests/node_tests/backbeat/IngestionUtility.js

Lines changed: 158 additions & 63 deletions
@@ -26,7 +26,18 @@ class IngestionUtility extends ReplicationUtility {
             Key: objName,
             VersionId: versionId,
         }))
-            .then(data => cb(null, data))
+            .then(async (data) => {
+                // AWS SDK v3 returns a readable stream in data.Body
+                // We need to collect the stream data into a buffer
+                if (data.Body) {
+                    const chunks = [];
+                    for await (const chunk of data.Body) {
+                        chunks.push(chunk);
+                    }
+                    data.Body = Buffer.concat(chunks);
+                }
+                cb(null, data);
+            })
             .catch(err => cb(err));
     }

@@ -36,37 +47,60 @@ class IngestionUtility extends ReplicationUtility {
             Key: objName,
             VersionId: versionId,
         }))
-            .then(data => cb(null, data))
+            .then(async (data) => {
+                // AWS SDK v3 returns a readable stream in data.Body
+                // We need to collect the stream data into a buffer
+                if (data.Body) {
+                    const chunks = [];
+                    for await (const chunk of data.Body) {
+                        chunks.push(chunk);
+                    }
+                    data.Body = Buffer.concat(chunks);
+                }
+                cb(null, data);
+            })
             .catch(err => cb(err));
     }
 
     createIngestionBucket(bucketName, locationName, cb) {
+        console.log(`[INGESTION] Creating ingestion bucket: ${bucketName} with location: ${locationName}`);
         const locationNameWithSuffix = `${locationName}:ingest`;
-        return this.s3.send(new CreateBucketCommand({
+        this.s3.send(new CreateBucketCommand({
             Bucket: bucketName,
             CreateBucketConfiguration: {
                 LocationConstraint: locationNameWithSuffix,
             },
         }))
             .then(() => {
+                console.log(`[INGESTION] Successfully created bucket: ${bucketName}`);
+                console.log(`[INGESTION] Waiting 10 seconds for backbeat ingestion processes to be up-to-date`);
                 // When resuming an ingestion-enabled location,
                 // backbeat gets the list of buckets with ingestion-enabled
                 // to check if the location is valid.
                 // Backbeat sets the list of buckets with ingestion-enabled periodically,
                 // so the list might be outdated for few seconds leading to a 404 API error response.
                 // Also backbeat "ingestion producer" process applies update every 5 seconds.
                 // For this reason, we are waiting 10 seconds to make sure ingestion processes are up-to-date.
-                return setTimeout(() => backbeatAPIUtils.resumeIngestion(locationName, false, null, (err, body) => {
-                    if (err) {
-                        return cb(err);
-                    }
-                    if (body.code) {
-                        return cb(`error resuming ingestion: ${JSON.stringify(body)}`);
-                    }
-                    return cb();
-                }), 10000);
+                setTimeout(() => {
+                    console.log(`[INGESTION] Resuming ingestion for location: ${locationName}`);
+                    backbeatAPIUtils.resumeIngestion(locationName, false, null, (err, body) => {
+                        if (err) {
+                            console.error(`[INGESTION] Error resuming ingestion:`, err);
+                            return cb(err);
+                        }
+                        if (body.code) {
+                            console.error(`[INGESTION] Error resuming ingestion - bad response:`, body);
+                            return cb(`error resuming ingestion: ${JSON.stringify(body)}`);
+                        }
+                        console.log(`[INGESTION] Successfully resumed ingestion for location: ${locationName}`);
+                        return cb();
+                    });
+                }, 10000);
             })
-            .catch(err => cb(err));
+            .catch(err => {
+                console.error(`[INGESTION] Error creating bucket: ${bucketName}`, err);
+                cb(err);
+            });
     }
 
     putObjectWithProperties(bucketName, objectName, content, cb) {
@@ -84,27 +118,49 @@ class IngestionUtility extends ReplicationUtility {
     }
 
     waitUntilIngested(bucketName, key, versionId, cb) {
+        console.log(`[INGESTION] Waiting for object to be ingested: ${bucketName}/${key} (version: ${versionId})`);
         let status;
+        let attemptCount = 0;
+        const maxAttempts = 90; // 3 minutes max (90 * 2 seconds = 180 seconds)
         const expectedCode = 'NotFound';
         return async.doWhilst(
-            callback => this.s3.send(new HeadObjectCommand({
-                Bucket: bucketName,
-                Key: key,
-                VersionId: versionId,
-            }))
-                .then(() => {
-                    status = true;
-                    return callback();
-                })
-                .catch(err => {
-                    if (err.name !== expectedCode) {
-                        return callback(err);
-                    }
-                    status = false;
-                    return setTimeout(callback, 2000);
-                }),
+            callback => {
+                attemptCount++;
+                if (attemptCount > maxAttempts) {
+                    console.error(`[INGESTION] Timeout: Object not ingested after ${maxAttempts} attempts (${maxAttempts * 2} seconds)`);
+                    return callback(new Error(`Ingestion timeout: Object ${bucketName}/${key} not ingested after ${maxAttempts * 2} seconds`));
+                }
+                console.log(`[INGESTION] Attempt ${attemptCount}/${maxAttempts}: Checking if object exists in destination bucket`);
+                this.s3.send(new HeadObjectCommand({
+                    Bucket: bucketName,
+                    Key: key,
+                    VersionId: versionId,
+                }))
+                    .then(() => {
+                        console.log(`[INGESTION] Object found in destination bucket after ${attemptCount} attempts`);
+                        status = true;
+                        return callback();
+                    })
+                    .catch(err => {
+                        console.log(`[INGESTION] Err: ${err}`);
+                        if (err.name !== expectedCode) {
+                            console.error(`[INGESTION] Unexpected error checking object existence:`, err);
+                            return callback(err);
+                        }
+                        console.log(`[INGESTION] Object not yet ingested, waiting 2 seconds before retry (${attemptCount}/${maxAttempts})...`);
+                        status = false;
+                        return setTimeout(callback, 2000);
+                    });
+            },
             () => !status,
-            cb,
+            err => {
+                if (err) {
+                    console.error(`[INGESTION] Failed waiting for ingestion after ${attemptCount} attempts:`, err);
+                } else {
+                    console.log(`[INGESTION] Successfully waited for ingestion after ${attemptCount} attempts`);
+                }
+                cb(err);
+            },
         );
     }

@@ -131,59 +187,98 @@ class IngestionUtility extends ReplicationUtility {
         return async.doWhilst(
             callback => this.s3.send(new ListObjectVersionsCommand({ Bucket: bucketName }))
                 .then(data => {
-                    const versionLength = data.Versions.length;
-                    const deleteLength = data.DeleteMarkers.length;
+                    const versionLength = (data.Versions || []).length;
+                    const deleteLength = (data.DeleteMarkers || []).length;
                     objectsEmpty = (versionLength + deleteLength) === 0;
                     if (objectsEmpty) {
                         return callback();
                     }
                     return setTimeout(callback, 2000);
                 })
-                .catch(err => cb(err)),
+                .catch(err => callback(err)),
             () => !objectsEmpty,
             cb,
         );
     }
 
     compareObjectsRINGS3C(srcBucket, destBucket, key, versionId, optionalFields, cb) {
+        console.log(`[INGESTION] Starting object comparison: ${srcBucket}/${key} -> ${destBucket}/${key} (version: ${versionId})`);
         return async.series([
-            next => this.waitUntilIngested(
-                destBucket,
-                key,
-                versionId,
-                next,
-            ),
-            next => this.getSourceObject(srcBucket, key, versionId, next),
-            next => this.getDestObject(destBucket, key, versionId, next),
+            next => {
+                console.log(`[INGESTION] Step 1: Waiting for object to be ingested`);
+                this.waitUntilIngested(
+                    destBucket,
+                    key,
+                    versionId,
+                    err => {
+                        if (err) {
+                            console.error(`[INGESTION] Step 1 failed:`, err);
+                        } else {
+                            console.log(`[INGESTION] Step 1 completed: Object is ingested`);
+                        }
+                        next(err);
+                    },
+                );
+            },
+            next => {
+                console.log(`[INGESTION] Step 2: Getting source object`);
+                this.getSourceObject(srcBucket, key, versionId, (err, data) => {
+                    if (err) {
+                        console.error(`[INGESTION] Step 2 failed:`, err);
+                    } else {
+                        console.log(`[INGESTION] Step 2 completed: Got source object`);
+                    }
+                    next(err, data);
+                });
+            },
+            next => {
+                console.log(`[INGESTION] Step 3: Getting destination object`);
+                this.getDestObject(destBucket, key, versionId, (err, data) => {
+                    if (err) {
+                        console.error(`[INGESTION] Step 3 failed:`, err);
+                    } else {
+                        console.log(`[INGESTION] Step 3 completed: Got destination object`);
+                    }
+                    next(err, data);
+                });
+            },
         ], (err, data) => {
             if (err) {
+                console.error(`[INGESTION] Object comparison failed:`, err);
                 return cb(err);
             }
+            console.log(`[INGESTION] Starting object data comparison`);
             const srcData = data[1];
             const destData = data[2];
-            assert.strictEqual(
-                srcData.ContentLength,
-                destData.ContentLength,
-            );
-            this._compareObjectBody(srcData.Body, destData.Body);
-            assert.deepStrictEqual(srcData.Metadata, destData.Metadata);
-            assert.strictEqual(srcData.ETag, destData.ETag);
-            assert.strictEqual(srcData.ContentType, destData.ContentType);
-            assert.strictEqual(srcData.VersionId, destData.VersionId);
-            assert.strictEqual(
-                srcData.LastModified.toString(),
-                destData.LastModified.toString(),
-            );
-            if (optionalFields) {
-                optionalFields.forEach(field => {
-                    if (field === 'Metadata') {
-                        assert.strictEqual(srcData.customKey, destData.customKey);
-                    } else {
-                        assert.strictEqual(srcData[field], destData[field]);
-                    }
-                });
+            try {
+                assert.strictEqual(
+                    srcData.ContentLength,
+                    destData.ContentLength,
+                );
+                this._compareObjectBody(srcData.Body, destData.Body);
+                assert.deepStrictEqual(srcData.Metadata, destData.Metadata);
+                assert.strictEqual(srcData.ETag, destData.ETag);
+                assert.strictEqual(srcData.ContentType, destData.ContentType);
+                assert.strictEqual(srcData.VersionId, destData.VersionId);
+                assert.strictEqual(
+                    srcData.LastModified.toString(),
+                    destData.LastModified.toString(),
+                );
+                if (optionalFields) {
+                    optionalFields.forEach(field => {
+                        if (field === 'Metadata') {
+                            assert.strictEqual(srcData.customKey, destData.customKey);
+                        } else {
+                            assert.strictEqual(srcData[field], destData[field]);
+                        }
+                    });
+                }
+                console.log(`[INGESTION] Object comparison completed successfully`);
+                return cb();
+            } catch (comparisonError) {
+                console.error(`[INGESTION] Object comparison assertion failed:`, comparisonError);
+                return cb(comparisonError);
             }
-            return cb();
         });
     }
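
The stream-collection logic added above is repeated verbatim in each getObject wrapper (and again in LifecycleUtility below). A possible follow-up, not part of this commit, would be a small shared helper; a minimal sketch in Node.js, with the helper name streamToBuffer chosen here only for illustration:

// streamToBuffer.js - hypothetical shared helper, not part of this commit.
// Collects an AWS SDK v3 response Body (an async-iterable readable stream)
// into a single Buffer, mirroring the inline logic added in the diff above.
async function streamToBuffer(stream) {
    const chunks = [];
    for await (const chunk of stream) {
        chunks.push(chunk);
    }
    return Buffer.concat(chunks);
}

module.exports = streamToBuffer;

Each wrapper's .then(async (data) => { ... }) body would then reduce to data.Body = await streamToBuffer(data.Body);.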

tests/zenko_tests/node_tests/backbeat/LifecycleUtility.js

Lines changed: 12 additions & 1 deletion
@@ -134,7 +134,18 @@ class LifecycleUtility extends ReplicationUtility {
             params.VersionId = versionId;
         }
         this.s3.send(new GetObjectCommand(params))
-            .then(data => cb(null, data))
+            .then(async (data) => {
+                // AWS SDK v3 returns a readable stream in data.Body
+                // We need to collect the stream data into a buffer
+                if (data.Body) {
+                    const chunks = [];
+                    for await (const chunk of data.Body) {
+                        chunks.push(chunk);
+                    }
+                    data.Body = Buffer.concat(chunks);
+                }
+                cb(null, data);
+            })
             .catch(err => cb(err));
     }
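
The same manual stream collection appears here. If the installed @aws-sdk/client-s3 is recent enough to expose the SDK's stream transform helpers on the response Body (an assumption about the dependency version, not something this commit relies on), the loop could be replaced by transformToByteArray(); a minimal sketch:

// Alternative sketch, assuming a @aws-sdk/client-s3 version whose GetObject
// response Body is an SdkStream exposing transformToByteArray().
const { GetObjectCommand } = require('@aws-sdk/client-s3');

async function getObjectBuffer(s3, bucket, key, versionId) {
    const data = await s3.send(new GetObjectCommand({
        Bucket: bucket,
        Key: key,
        VersionId: versionId,
    }));
    // transformToByteArray() drains the stream and resolves to a Uint8Array.
    data.Body = Buffer.from(await data.Body.transformToByteArray());
    return data;
}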

tests/zenko_tests/node_tests/backbeat/ReplicationUtility.js

Lines changed: 6 additions & 3 deletions
@@ -168,11 +168,14 @@ class ReplicationUtility {
     }
 
     putObject(bucketName, objectName, content, cb) {
-        this.s3.send(new PutObjectCommand({
+        const params = {
             Bucket: bucketName,
             Key: objectName,
-            Body: content,
-        }))
+        };
+        if (content) {
+            params.Body = content;
+        }
+        this.s3.send(new PutObjectCommand(params))
             .then(data => cb(null, data))
             .catch(err => cb(err));
     }
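
With Body now set only when content is truthy, callers can create empty objects by passing null or undefined as content, and the command no longer sends an undefined Body. A short usage sketch (bucket and key names are illustrative, and util stands for a ReplicationUtility instance):

// Hypothetical usage of the updated putObject signature.
util.putObject('example-bucket', 'empty-object', null, err => {
    if (err) {
        throw err;
    }
    // Uploaded with no Body: the resulting object has ContentLength 0.
});

util.putObject('example-bucket', 'object-with-data', Buffer.from('hello world'), err => {
    if (err) {
        throw err;
    }
});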
