Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,129 changes: 241 additions & 888 deletions docs/MEMORY-INTEGRATION.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion infra/openshell/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ COPY pnpm-workspace.yaml package.json pnpm-lock.yaml tsconfig.base.json ./
COPY packages/sandbox-server/package.json packages/sandbox-server/
COPY packages/agents/package.json packages/agents/
COPY packages/logger/package.json packages/logger/
COPY packages/memory/package.json packages/memory/
COPY packages/zosma-mem/package.json packages/zosma-mem/
COPY packages/db/package.json packages/db/
COPY packages/integrations/package.json packages/integrations/
COPY packages/skills/reports/package.json packages/skills/reports/
Expand Down
16 changes: 8 additions & 8 deletions packages/agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@
"check": "tsc --noEmit"
},
"dependencies": {
"@mariozechner/pi-agent-core": "^0.61.0",
"@mariozechner/pi-ai": "^0.61.0",
"@mariozechner/pi-coding-agent": "^0.61.0",
"@openzosma/logger": "workspace:*",
"@openzosma/memory": "workspace:*",
"@sinclair/typebox": "^0.34.48",
"dotenv": "17.3.1",
"uuid": "^13.0.0",
"@openzosma/db": "workspace:*",
"@openzosma/integrations": "workspace:*",
"@openzosma/logger": "workspace:*",
"@openzosma/skill-reports": "workspace:*",
"@openzosma/zosma-mem": "workspace:*",
"@sinclair/typebox": "^0.34.48",
"dotenv": "17.3.1",
"pg": "^8.13.1",
"@mariozechner/pi-agent-core": "^0.61.0"
"uuid": "^13.0.0"
},
"devDependencies": {
"@types/node": "^22.15.2",
"typescript": "^5.7.3",
"@types/pg": "^8.11.10"
"@types/pg": "^8.11.10",
"typescript": "^5.7.3"
}
}
187 changes: 166 additions & 21 deletions packages/agents/src/pi.agent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { randomUUID } from "node:crypto"
import { join } from "node:path"
import type { Api, Model } from "@mariozechner/pi-ai"
import type { AgentSession as PiSdkSession } from "@mariozechner/pi-coding-agent"
import {
AuthStorage,
Expand All @@ -8,8 +10,10 @@ import {
createAgentSession,
} from "@mariozechner/pi-coding-agent"
import { createLogger } from "@openzosma/logger"
import { bootstrapMemory } from "@openzosma/memory"
import { createMemoryBridge, resolveMemoryExtensionPaths } from "@openzosma/zosma-mem/bridge"
import type { MemoryBridge } from "@openzosma/zosma-mem/bridge"
import { DEFAULT_SYSTEM_PROMPT } from "./pi/config.js"
import { ensureBrainInit, extractFacts } from "./pi/memory.js"
import { resolveModel } from "./pi/model.js"
import {
createDefaultTools,
Expand All @@ -35,7 +39,7 @@ const LLM_IDLE_TIMEOUT_MS = Number(process.env.OPENZOSMA_LLM_IDLE_TIMEOUT_MS) ||
* Build a ModelRegistry that knows about a custom provider and its API key.
* This is needed because pi-coding-agent's AgentSession validates the API key
* via ModelRegistry.getApiKey() before each prompt. Without registration,
* custom providers (like "local") fail with "No API key found".
* custom providers (like openai, anthropic, etc.) fail with "No API key found".
*/
function buildModelRegistry(providerName: string, apiKey: string, baseUrl: string): ModelRegistry {
const authStorage = AuthStorage.inMemory()
Expand All @@ -50,38 +54,71 @@ function buildModelRegistry(providerName: string, apiKey: string, baseUrl: strin
class PiAgentSession implements AgentSession {
private sessionPromise: Promise<PiSdkSession>
private messages: AgentMessage[] = []
private memoryBridge: MemoryBridge
private model: Model<Api>
private apiKey: string
private inFlightExtracts = new Set<Promise<void>>()

constructor(opts: AgentSessionOpts) {
bootstrapMemory({
workspaceDir: opts.workspaceDir,
memoryDir: opts.memoryDir,
const { model, apiKey } = resolveModel({
provider: opts.provider,
model: opts.model,
baseUrl: opts.baseUrl,
})
this.model = model
this.apiKey = apiKey

// Stable memory dir: use the explicit memoryDir from opts if provided,
// otherwise fall back to the default path inside the workspace.
const memoryDir = opts.memoryDir ?? join(opts.workspaceDir, ".pi", "agent", "memory")
log.info("Memory directory set", { memoryDir })
this.memoryBridge = createMemoryBridge({ memoryDir })

// Ensure pi-brain .memory/ structure exists in the workspace so its
// extension tools don't return "Brain not initialized" errors to the LLM.
try {
ensureBrainInit(opts.workspaceDir)
} catch (err) {
log.warn("ensureBrainInit failed (non-fatal)", {
error: err instanceof Error ? err.message : String(err),
})
}

const toolList = [...createDefaultTools(opts.workspaceDir, opts.toolsEnabled)]
const reportTools = createReportTools(opts.toolsEnabled, opts.workspaceDir)
const customTools = [
...reportTools,
...(opts.dbPool ? [createQueryDatabaseTool(opts.dbPool), createListDatabaseSchemasTool(opts.dbPool)] : []),
]
const { model, apiKey } = resolveModel({
provider: opts.provider,
model: opts.model,
baseUrl: opts.baseUrl,
})

// Build the final system prompt: optional prefix + main prompt
// Build the final system prompt:
// [systemPromptPrefix] + [base prompt] + [systemPromptSuffix]
// systemPromptPrefix: caller-supplied context (e.g. agent-config overrides)
// base prompt: DEFAULT_SYSTEM_PROMPT or per-config override
// systemPromptSuffix: gateway-injected context (e.g. database integration list)
const basePrompt = opts.systemPrompt ?? DEFAULT_SYSTEM_PROMPT
const finalPrompt = opts.systemPromptPrefix ? `${opts.systemPromptPrefix}\n\n${basePrompt}` : basePrompt
const parts = [opts.systemPromptPrefix, basePrompt, opts.systemPromptSuffix].filter(Boolean)
const finalPrompt = parts.join("\n\n")

log.info("PiAgentSession: building system prompt", {
hasPrefix: !!opts.systemPromptPrefix,
hasSuffix: !!opts.systemPromptSuffix,
prefixLength: opts.systemPromptPrefix?.length ?? 0,
prefixPreview: opts.systemPromptPrefix?.slice(0, 80) ?? "(none)",
suffixLength: opts.systemPromptSuffix?.length ?? 0,
finalPromptLength: finalPrompt.length,
})

// Resolve pi-brain and pi-dcp extension paths. Missing packages are
// silently skipped — the agent degrades gracefully without them.
const { paths: memoryExtensionPaths, missing: missingExtensions } = resolveMemoryExtensionPaths()
if (missingExtensions.length > 0) {
log.warn("Memory extensions not found — skipping", { missing: missingExtensions })
}

const resourceLoader = new DefaultResourceLoader({
cwd: opts.workspaceDir,
systemPrompt: finalPrompt,
additionalExtensionPaths: memoryExtensionPaths,
})

// For custom/local providers not in the built-in registry, create a
Expand Down Expand Up @@ -114,7 +151,41 @@ class PiAgentSession implements AgentSession {
async *sendMessage(content: string, signal?: AbortSignal): AsyncGenerator<AgentStreamEvent> {
const session = await this.sessionPromise

const promptContent = content
// Retrieve relevant memories and track which ones we injected.
// We'll use this to record reinforcement signals later.
let injectedMemoryIds: string[] = []
let injectedMemoryEntities: Array<{ id: string; content: string }> = []
let memoryContextBlock = ""
try {
const {
context: memoryContext,
ids: injectedIds,
entities: injectedEntities,
} = await this.memoryBridge.loadContext(content)
injectedMemoryIds = injectedIds
injectedMemoryEntities = injectedEntities
log.info("Loaded memory context", {
memories: injectedIds.length,
query: content.slice(0, 80),
contextLength: memoryContext?.length ?? 0,
})
if (memoryContext) {
memoryContextBlock = memoryContext
log.info("Memory context will be prepended to user message", {
length: memoryContext.length,
injectedIds: injectedMemoryIds.length,
})
}
} catch (err) {
log.warn("Failed to load memory context (non-fatal)", {
error: err instanceof Error ? err.message : String(err),
})
}

// Prepend memory context directly into the prompt so the LLM sees it as
// grounding context alongside the user message. steer() is designed for
// mid-stream interrupts and is not reliable as a pre-turn injection.
const promptContent = memoryContextBlock ? `${memoryContextBlock}\n\nUser message: ${content}` : content

const userMsg: AgentMessage = {
id: randomUUID(),
Expand Down Expand Up @@ -292,7 +363,7 @@ class PiAgentSession implements AgentSession {
}
}
if (errorMessages.length > 0) {
enqueue({ type: "error", error: errorMessages.join("; ") })
enqueue({ type: "error", error: `Agent errors: ${errorMessages.join("; ")}` })
}
enqueue({ type: "turn_end", id: randomUUID() })
done = true
Expand Down Expand Up @@ -335,9 +406,9 @@ class PiAgentSession implements AgentSession {
case "auto_retry_start":
log.warn("Auto-retry started (LLM error, retrying)", {
attempt: "attempt" in event ? event.attempt : undefined,
maxAttempts: "maxAttempts" in event ? event.maxAttempts : undefined,
delayMs: "delayMs" in event ? event.delayMs : undefined,
errorMessage: "errorMessage" in event ? event.errorMessage : undefined,
maxAttempts: "maxAttempts" in event ? (event.maxAttempts as number) : undefined,
delayMs: "delayMs" in event ? (event.delayMs as number) : undefined,
errorMessage: "errorMessage" in event ? (event.errorMessage as string) : undefined,
})
enqueue({
type: "auto_retry_start",
Expand All @@ -351,8 +422,8 @@ class PiAgentSession implements AgentSession {
case "auto_retry_end":
log.info("Auto-retry ended", {
success: "success" in event ? event.success : undefined,
attempt: "attempt" in event ? event.attempt : undefined,
finalError: "finalError" in event ? event.finalError : undefined,
attempt: "attempt" in event ? (event.attempt as number) : undefined,
finalError: "finalError" in event ? (event.finalError as string) : undefined,
})
enqueue({
type: "auto_retry_end",
Expand Down Expand Up @@ -389,7 +460,7 @@ class PiAgentSession implements AgentSession {
const promptPromise = session.prompt(promptContent).catch((err: unknown) => {
const errorMsg = err instanceof Error ? err.message : "Unknown agent error"
log.error("[DIAG] session.prompt() rejected", { error: errorMsg, ms: elapsed(), piEventSeq })
enqueue({ type: "error", error: errorMsg })
enqueue({ type: "error", error: `Agent error: ${errorMsg}` })
done = true
if (resolveWaiting) {
resolveWaiting()
Expand Down Expand Up @@ -453,6 +524,66 @@ class PiAgentSession implements AgentSession {
createdAt: new Date().toISOString(),
}
this.messages.push(assistantMsg)

// Record reinforcement signals for memories that were injected and used.
// This improves future retrieval by boosting the salience of helpful memories.
if (injectedMemoryIds.length > 0) {
try {
// Content-based correlation: check whether key words from each
// injected memory's content appear in the response. This avoids
// boosting unrelated high-salience facts just because the response
// was non-empty.
const responseWords = new Set(
fullResponseText
.toLowerCase()
.split(/\W+/)
.filter((w) => w.length > 3),
)
let usedCount = 0
let ignoredCount = 0

for (const entity of injectedMemoryEntities) {
const contentWords = entity.content
.toLowerCase()
.split(/\W+/)
.filter((w) => w.length > 3)
const overlap = contentWords.filter((w) => responseWords.has(w)).length
// Require at least 2 content words to appear in the response
// to count as "used". Single-word matches are too noisy.
const wasUsed = overlap >= 2
await this.memoryBridge.recordUsage(entity.id, wasUsed ? "used" : "ignored")
if (wasUsed) usedCount++
else ignoredCount++
}

log.info("Memory reinforcement recorded", {
usedCount,
ignoredCount,
totalInjected: injectedMemoryIds.length,
})
} catch (err) {
log.warn("Memory reinforcement failed (non-fatal)", {
error: err instanceof Error ? err.message : String(err),
})
}
}

// Post-turn memory ingestion: extract memorable facts from this exchange
// and store them so future conversations can recall them.
// This is non-blocking and non-critical — errors are logged and ignored.
const extractPromise = extractFacts(this.model, this.apiKey, content, fullResponseText)
.then((facts) => {
if (facts.length === 0) return
log.info("Memory: ingesting extracted facts", { count: facts.length })
return this.memoryBridge.ingestFacts(facts)
})
.catch((err: unknown) => {
log.warn("Memory ingestion failed (non-fatal)", {
error: err instanceof Error ? err.message : String(err),
})
})
this.inFlightExtracts.add(extractPromise)
extractPromise.finally(() => this.inFlightExtracts.delete(extractPromise))
}
}

Expand All @@ -469,6 +600,20 @@ class PiAgentSession implements AgentSession {
/**
 * Return the messages recorded on this session so far.
 *
 * @returns The session's internal message array itself (not a copy), so
 *          entries pushed onto it later are visible through the returned
 *          reference.
 */
getMessages(): AgentMessage[] {
  const recorded = this.messages
  return recorded
}

/**
 * Shut the session down.
 *
 * Steps, in order:
 *  1. Wait for every tracked in-flight fact-extraction promise to settle.
 *  2. Run the memory bridge's GC; a failure here is logged as a warning and
 *     otherwise ignored.
 *  3. Call the memory bridge's `shutdown()`.
 *
 * @returns A promise that resolves once the steps above have run. Note that
 *          `shutdown()` is invoked outside the try/catch and is not awaited,
 *          so an exception it throws synchronously rejects this promise.
 */
async dispose(): Promise<void> {
  // Snapshot the set first: entries remove themselves as they settle, and we
  // only need to wait for the extractions that are pending right now.
  const pendingExtracts = Array.from(this.inFlightExtracts)
  await Promise.allSettled(pendingExtracts)

  try {
    await this.memoryBridge.gc()
  } catch (gcError) {
    const reason = gcError instanceof Error ? gcError.message : String(gcError)
    log.warn("Memory GC on dispose failed (non-fatal)", { error: reason })
  }

  this.memoryBridge.shutdown()
}
}

export class PiAgentProvider implements AgentProvider {
Expand Down
Loading
Loading