Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions packages/guardrails/profile/plugins/team.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const live = new Map<string, Run>()
const seen = new WeakMap<object, Seen>()
const sweeping = new Map<string, Promise<void>>()
const models = new Map<string, Lane>()
const sweepWait = 1000
const rulesWait = 2000

type Need = {
done: boolean
Expand Down Expand Up @@ -1068,6 +1070,18 @@ async function sweep(client: Client, dir: string) {
return task
}

async function bounded<T>(task: Promise<T>, millis: number) {
return Promise.race([task, Bun.sleep(millis).then(() => undefined)])
}

async function presweep(client: Client, dir: string) {
await bounded(sweep(client, dir), sweepWait)
}
Comment on lines +1077 to +1079

async function parentRules(client: Client, ctx: Pick<Ctx, "sessionID" | "directory">) {
return (await bounded(rules(client, ctx), rulesWait)) ?? []
}
Comment on lines +1081 to +1083

function note(run: Run) {
const head = [`run_id: ${run.id}`, `type: ${run.kind}`, `state: ${run.state}`]
const list = run.tasks.map((item) =>
Expand Down Expand Up @@ -1398,21 +1412,8 @@ export default async function team(input: { client: Client; worktree: string; di
const canIsolate = Boolean(ctx.worktree && ctx.worktree !== "/")
const strategy = args.strategy ?? "parallel"
const limit = args.limit ?? cap
await sweep(input.client, runRoot)
defs(args.tasks)
if (args.tasks.length < 1) throw new Error("team requires at least one task")
const req = {
...ctx,
permission: await rules(input.client, ctx),
}
req.metadata({
title: "team run",
metadata: {
tasks: args.tasks.length,
strategy,
},
})

const run: Run = {
id: crypto.randomUUID(),
kind: "team",
Expand Down Expand Up @@ -1455,6 +1456,19 @@ export default async function team(input: { client: Client; worktree: string; di
}),
}
await save(runRoot, run)
ctx.metadata({
title: "team run",
metadata: {
run_id: run.id,
tasks: args.tasks.length,
strategy,
},
})
await presweep(input.client, runRoot)
const req = {
...ctx,
permission: await parentRules(input.client, ctx),
}

const done = new Set<string>()
const list = run.tasks
Expand Down Expand Up @@ -1535,12 +1549,6 @@ export default async function team(input: { client: Client; worktree: string; di
const runRoot = projectRoot(ctx.directory, ctx.worktree)
const canIsolate = Boolean(ctx.worktree && ctx.worktree !== "/")
const detachedAbort = new AbortController()
await sweep(input.client, runRoot)
const req = {
...ctx,
abort: detachedAbort.signal,
permission: await rules(input.client, ctx),
}
const step: Step = {
id: slug(args.description || args.agent || "worker") || "worker",
description: args.description || "background worker",
Expand Down Expand Up @@ -1579,12 +1587,18 @@ export default async function team(input: { client: Client; worktree: string; di
tasks: [step],
}
await save(runRoot, run)
req.metadata({
ctx.metadata({
title: args.description || "background run",
metadata: {
run_id: run.id,
},
})
await presweep(input.client, runRoot)
const req = {
...ctx,
abort: detachedAbort.signal,
permission: await parentRules(input.client, ctx),
}

const task = job(req, run, step)
.then(async () => {
Expand Down Expand Up @@ -1645,7 +1659,7 @@ export default async function team(input: { client: Client; worktree: string; di
},
async execute(args, ctx) {
const runRoot = projectRoot(ctx.directory, ctx.worktree)
await sweep(input.client, runRoot)
await presweep(input.client, runRoot)
const list = args.run_id
? [live.get(args.run_id) ?? (await load(runRoot, args.run_id))].filter(isRun)
: await scan(runRoot)
Expand Down
144 changes: 144 additions & 0 deletions packages/opencode/test/plugin/team.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,150 @@ test("team uses default limit when caller omits limit", async () => {
expect(out).toContain("default-limit: done")
})

test("team creates run before stale sweep can block launch", async () => {
await using tmp = await tmpdir({
git: true,
init: async (dir) => {
await Bun.write(path.join(dir, "README.md"), "# test\n")
await fs.mkdir(path.join(dir, ".opencode", "guardrails", "team-runs"), { recursive: true })
await Bun.write(
path.join(dir, ".opencode", "guardrails", "team-runs", "stale-run.json"),
JSON.stringify(
{
id: "stale-run",
kind: "team",
state: "running",
session: "ses_parent",
directory: dir,
created_at: new Date(0).toISOString(),
updated_at: new Date(0).toISOString(),
tasks: [
{
id: "stale",
description: "stale",
prompt: "old work",
depends: [],
agent: "general",
write: false,
worktree: false,
provider: "openai",
model: "gpt-5.5",
variant: "high",
state: "running",
dir,
session: "ses_child_stale_blocked",
patch: "",
output: "",
error: "",
},
],
},
null,
2,
) + "\n",
)
await Bun.$`git add README.md`.cwd(dir).quiet()
await Bun.$`git commit -m "seed"`.cwd(dir).quiet()
},
})

const metadata: Record<string, unknown>[] = []
const plugin = await team({
client: {
permission: {
async list() {
return { data: [] }
},
},
question: {
async list() {
return { data: [] }
},
},
session: {
async get() {
return { data: { permission: [] } }
},
async create() {
return { data: { id: "ses_child_fresh_after_blocked_sweep" } }
},
async promptAsync() {
return {}
},
async prompt() {
return {}
},
async status() {
return {
data: {
ses_child_stale_blocked: { type: "busy" },
ses_child_fresh_after_blocked_sweep: { type: "idle" },
},
}
},
async messages(input) {
if (input.path.id === "ses_child_stale_blocked") await new Promise(() => {})
return {
data: [
{
info: {
role: "assistant",
time: { completed: Date.now() },
},
parts: [
{
type: "text",
text: "fresh work completed",
},
],
},
],
}
},
async abort() {
return {}
},
},
},
worktree: tmp.path,
directory: tmp.path,
})

const out = await plugin.tool.team.execute(
{
strategy: "parallel",
limit: 1,
tasks: [
{
id: "fresh",
prompt: "inspect current work",
write: false,
worktree: false,
},
],
},
{
sessionID: "ses_parent",
messageID: "msg_parent",
agent: "implement",
directory: tmp.path,
worktree: tmp.path,
abort: new AbortController().signal,
ask: async () => {},
metadata(input) {
metadata.push(input.metadata ?? {})
},
},
)

expect(out).toContain("run_id:")
expect(out).toContain("- fresh: done")
expect(metadata.some((item) => typeof item.run_id === "string")).toBe(true)

const runs = await fs.readdir(path.join(tmp.path, ".opencode", "guardrails", "team-runs"))
expect(runs.length).toBeGreaterThanOrEqual(2)
})

test("team worker model inherits the parent session model", async () => {
await using tmp = await tmpdir({
git: true,
Expand Down
Loading