diff --git a/servers/cu/src/domain/api/dryRun.js b/servers/cu/src/domain/api/dryRun.js index 14f858f3d..71ec4462e 100644 --- a/servers/cu/src/domain/api/dryRun.js +++ b/servers/cu/src/domain/api/dryRun.js @@ -126,8 +126,7 @@ export function dryRunWith (env) { * So we explicitly set cron to undefined, for posterity */ cron: undefined, - needsOnlyMemory: true, - dryRun: true + needsOnlyMemory: true }).map((res) => { const cached = { age: new Date().getTime(), ctx: res } /** diff --git a/servers/cu/src/domain/api/readState.js b/servers/cu/src/domain/api/readState.js index 7df8bea20..6cb33058d 100644 --- a/servers/cu/src/domain/api/readState.js +++ b/servers/cu/src/domain/api/readState.js @@ -37,7 +37,7 @@ export function readStateWith (env) { const loadModule = loadModuleWith(env) const evaluate = evaluateWith(env) - return ({ processId, messageId, to, ordinate, cron, needsOnlyMemory, dryRun, body }) => { + return ({ processId, messageId, to, ordinate, cron, needsOnlyMemory, body }) => { messageId = messageId || [to, ordinate, cron].filter(isNotNil).join(':') || 'latest' const stats = { @@ -84,7 +84,7 @@ export function readStateWith (env) { * there is only one instance of the work used to resolve each Async, * every time, thus preventing duplication of work */ - pending = of({ id: processId, messageId, to, ordinate, cron, stats, needsOnlyMemory, body, dryRun }) + pending = of({ id: processId, messageId, to, ordinate, cron, stats, needsOnlyMemory, body }) .chain(loadProcessMeta) .chain(loadProcess) .chain((ctx) => { diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index 2ec17cca2..73133ff98 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -192,8 +192,7 @@ export const loadMessagesSchema = z.function() assignmentId: z.string().nullish(), hashChain: z.string().nullish(), isColdStart: z.boolean(), - body: z.any().optional(), - dryRun: z.boolean().optional() + body: z.any().optional() }) ) /** diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index ce93bfb11..d94145b2e 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -192,35 +192,13 @@ export function evaluateWith (env) { continue } - const skip = message.Skip === 'true' - if (skip) logger(`Skipping message "${name}" because 'Skip' tag is set to 'true'`) - prev = await Promise.resolve(prev) .then((prev) => Promise.resolve(prev.Memory) /** * Where the actual evaluation is performed */ - .then((Memory) => { - // console.dir({ m: 'Evaluating message', message }, {depth:null}) - if (skip) { - return { - Memory, - Error: undefined, - Messages: [], - Assignments: [], - Spawns: [], - Output: { - data: '', - prompt: '', - print: false - }, - Patches: [], - GasUsed: 0 - } - } - return ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal }) - }) + .then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal })) /** * These values are folded, * so that we can potentially update the process memory cache diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 0ae9e3007..e088b1c53 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -493,8 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { assignmentId: ctx.mostRecentAssignmentId, hashChain: ctx.mostRecentHashChain, isColdStart: ctx.isColdStart, - body: ctx.body, - dryRun: ctx.dryRun + body: ctx.body }) ) } diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js index bb8b2dd62..bca646615 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -370,7 +370,6 @@ export const messageSchema = z.object({ * Whether the message is a cron generated message or not */ Cron: z.boolean(), - Skip: z.string().default('false'), 'Read-Only': z.boolean().default(false) }), AoGlobal: z.object({ diff --git a/servers/cu/src/effects/hb/index.js b/servers/cu/src/effects/hb/index.js index 481b9dd4e..652a24249 100644 --- a/servers/cu/src/effects/hb/index.js +++ b/servers/cu/src/effects/hb/index.js @@ -182,7 +182,6 @@ export const mapNode = (type) => pipe( Target: path(['Process']), Epoch: pipe(path(['Epoch']), parseInt), Nonce: pipe(path(['Slot'])(tags) ? path(['Slot']) : path(['Nonce']), parseInt), - Skip: pathOr('false', ['Skip']), Timestamp: pipe(path(['Timestamp']), parseInt), 'Block-Height': pipe( /** @@ -431,7 +430,7 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } * When the currently fetched page is drained, the next page is fetched * dynamically */ - function fetchAllPages ({ suUrl, processId, isColdStart, from, to, body, dryRun }) { + function fetchAllPages ({ suUrl, processId, isColdStart, from, to, body }) { /** * The HB SU 'from' and 'to' are both inclusive. * So when we pass from (which is the cached most recent evaluated message) @@ -454,10 +453,9 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } body.edges.length === (+to - +from + 1) && +body.edges[0]?.node?.assignment?.Tags?.find(t => t.name === 'Nonce' || t.name === 'Slot')?.value === +from if (bodyIsValid) return body - if (!dryRun) throw new Error('Body is not valid: would attempt to fetch from scheduler in loadMessages') return fetchPageDataloader.load({ suUrl, processId, from, to, pageSize }) }, - { maxRetries: 1, delay: 500, log: logger, name: `loadMessages(${JSON.stringify({ suUrl, processId, params: params.toString() })})` } + { maxRetries: 5, delay: 500, log: logger, name: `loadMessages(${JSON.stringify({ suUrl, processId, params: params.toString() })})` } ) } @@ -583,10 +581,10 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } .then(({ suUrl, processId, block: processBlock, owner: processOwner, tags: processTags, moduleId, moduleOwner, moduleTags, fromOrdinate, toOrdinate, assignmentId, hashChain, - isColdStart, body, dryRun + isColdStart, body }) => { return [ - Readable.from(fetchAllPages({ suUrl, processId, isColdStart, from: fromOrdinate, to: toOrdinate, body, dryRun })()), + Readable.from(fetchAllPages({ suUrl, processId, isColdStart, from: fromOrdinate, to: toOrdinate, body })()), Transform.from(mapAoMessage({ processId, processBlock, @@ -607,15 +605,17 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } export const loadMessageMetaWith = ({ fetch, logger }) => { return async ({ suUrl, processId, messageUid, body }) => { + const params = toParams({ processId, to: messageUid, from: messageUid, pageSize: 1 }) + return backoff( () => { const bodyIsValid = body && body.edges.length === 1 && +body.edges[0]?.node?.assignment?.Tags?.find(t => t.name === 'Nonce' || t.name === 'Slot')?.value === +messageUid if (bodyIsValid) return body - throw new Error('Body is not valid: would attempt to fetch from scheduler in loadMessageMeta') + return fetch(`${suUrl}/~scheduler@1.0/schedule?${params.toString()}`).then(okRes) }, - { maxRetries: 2, delay: 500, log: logger, name: `loadMessageMeta(${JSON.stringify({ suUrl, processId, messageUid })})` } + { maxRetries: 5, delay: 500, log: logger, name: `loadMessageMeta(${JSON.stringify({ suUrl, processId, messageUid })})` } ) .catch(async (err) => { logger(