diff --git a/deno.json b/deno.json index f78ff66..7ff1383 100644 --- a/deno.json +++ b/deno.json @@ -11,6 +11,7 @@ }, "imports": { "@std/assert": "jsr:@std/assert@^1.0.0", + "@std/path": "jsr:@std/path@^1.0.0", "zod": "npm:zod@^3.23.0" }, "compilerOptions": { diff --git a/deno.lock b/deno.lock index bf67beb..917a230 100644 --- a/deno.lock +++ b/deno.lock @@ -4,6 +4,7 @@ "jsr:@std/assert@*": "1.0.19", "jsr:@std/assert@1": "1.0.19", "jsr:@std/internal@^1.0.12": "1.0.12", + "jsr:@std/path@1": "1.1.4", "npm:zod@^3.23.0": "3.25.76" }, "jsr": { @@ -15,6 +16,12 @@ }, "@std/internal@1.0.12": { "integrity": "972a634fd5bc34b242024402972cd5143eac68d8dffaca5eaa4dba30ce17b027" + }, + "@std/path@1.1.4": { + "integrity": "1d2d43f39efb1b42f0b1882a25486647cb851481862dc7313390b2bb044314b5", + "dependencies": [ + "jsr:@std/internal" + ] } }, "npm": { @@ -25,6 +32,7 @@ "workspace": { "dependencies": [ "jsr:@std/assert@1", + "jsr:@std/path@1", "npm:zod@^3.23.0" ] } diff --git a/src/poller/loop.ts b/src/poller/loop.ts new file mode 100644 index 0000000..b9b072a --- /dev/null +++ b/src/poller/loop.ts @@ -0,0 +1,237 @@ +import type { Config } from "../config.ts"; +import type { GiteaClient } from "../gitea/client.ts"; +import { GiteaApiError } from "../gitea/client.ts"; +import type { NetbirdClient } from "../netbird/client.ts"; +import { DesiredStateSchema } from "../state/schema.ts"; +import type { DesiredState, SetupKeyConfig } from "../state/schema.ts"; +import { + type EnrollmentDetection, + processEnrollmentEvents, +} from "./poller.ts"; +import { loadPollerState, savePollerState } from "./state.ts"; + +// ----------------------------------------------------------------------------- +// Context +// ----------------------------------------------------------------------------- + +export interface PollerContext { + config: Config; + netbird: NetbirdClient; + gitea: GiteaClient; + reconcileInProgress: { value: boolean }; +} + +// ----------------------------------------------------------------------------- +// Single poll iteration +// ----------------------------------------------------------------------------- + +/** + * Runs one poll cycle: fetch desired state from Gitea, check for new + * enrollment events in NetBird, rename enrolled peers, and commit + * `enrolled: true` back to the repo. + * + * Skips entirely when a reconcile is in progress to avoid racing with + * the webhook-triggered reconciliation path. + */ +export async function pollOnce(ctx: PollerContext): Promise { + const { config, netbird, gitea, reconcileInProgress } = ctx; + + if (reconcileInProgress.value) { + console.log(JSON.stringify({ msg: "poll_skipped", reason: "reconcile_in_progress" })); + return; + } + + const pollerState = await loadPollerState(config.dataDir); + + // Fetch current desired state from Gitea (main branch) + const file = await gitea.getFileContent("netbird.json", "main"); + const desired: DesiredState = DesiredStateSchema.parse( + JSON.parse(file.content), + ); + + // Build set of unenrolled setup key names + const unenrolledKeys = new Set(); + for ( + const [name, key] of Object.entries(desired.setup_keys) as [ + string, + SetupKeyConfig, + ][] + ) { + if (!key.enrolled) { + unenrolledKeys.add(name); + } + } + + if (unenrolledKeys.size === 0) { + console.log(JSON.stringify({ msg: "poll_no_unenrolled_keys" })); + return; + } + + // Fetch events and detect enrollments + const events = await netbird.listEvents(); + const enrollments = processEnrollmentEvents( + events, + unenrolledKeys, + pollerState.lastEventTimestamp, + ); + + if (enrollments.length === 0) { + return; + } + + console.log(JSON.stringify({ + msg: "poll_enrollments_detected", + count: enrollments.length, + })); + + // Process each enrollment: rename peer, commit enrolled:true + let latestTimestamp = pollerState.lastEventTimestamp; + // Track the file SHA across iterations — each successful commit changes it + let currentSha = file.sha; + let currentDesired = desired; + + for (const enrollment of enrollments) { + await processEnrollment( + ctx, + enrollment, + currentDesired, + currentSha, + (newSha, newDesired) => { + currentSha = newSha; + currentDesired = newDesired; + }, + ); + + if (!latestTimestamp || enrollment.timestamp > latestTimestamp) { + latestTimestamp = enrollment.timestamp; + } + } + + // Persist the latest event timestamp so next poll skips processed events + await savePollerState(config.dataDir, { + lastEventTimestamp: latestTimestamp, + }); +} + +// ----------------------------------------------------------------------------- +// Process a single enrollment +// ----------------------------------------------------------------------------- + +/** + * Renames a peer to match its setup key name, then commits + * `enrolled: true` to the repo via Gitea's optimistic concurrency. + * + * If the file SHA has changed (409 conflict), logs a warning and moves + * on — the next poll cycle will retry with fresh state. + */ +async function processEnrollment( + ctx: PollerContext, + enrollment: EnrollmentDetection, + desired: DesiredState, + fileSha: string, + onCommit: (newSha: string, newDesired: DesiredState) => void, +): Promise { + const { netbird, gitea } = ctx; + const { setupKeyName, peerId, peerHostname } = enrollment; + + // Rename the peer to match the setup key name + try { + await netbird.updatePeer(peerId, { name: setupKeyName }); + console.log(JSON.stringify({ + msg: "peer_renamed", + peer_id: peerId, + from: peerHostname, + to: setupKeyName, + })); + } catch (err) { + console.error(JSON.stringify({ + msg: "peer_rename_failed", + peer_id: peerId, + error: err instanceof Error ? err.message : String(err), + })); + return; + } + + // Mark enrolled:true in desired state and commit via Gitea + const updated = structuredClone(desired); + updated.setup_keys[setupKeyName].enrolled = true; + const content = JSON.stringify(updated, null, 2); + + try { + await gitea.updateFile( + "netbird.json", + content, + fileSha, + `chore: mark ${setupKeyName} as enrolled`, + "main", + ); + + // Fetch the new SHA for subsequent commits in this poll cycle. + // The updateFile response from Gitea doesn't return the new blob SHA + // in a convenient form, so we re-read it. + const freshFile = await gitea.getFileContent("netbird.json", "main"); + onCommit(freshFile.sha, updated); + + console.log(JSON.stringify({ + msg: "enrollment_committed", + setup_key: setupKeyName, + })); + } catch (err) { + if (err instanceof GiteaApiError && err.status === 409) { + // SHA mismatch — file was modified between read and write. + // Will be retried on next poll cycle with fresh state. + console.warn(JSON.stringify({ + msg: "enrollment_commit_conflict", + setup_key: setupKeyName, + })); + return; + } + console.error(JSON.stringify({ + msg: "enrollment_commit_failed", + setup_key: setupKeyName, + error: err instanceof Error ? err.message : String(err), + })); + } +} + +// ----------------------------------------------------------------------------- +// Background loop +// ----------------------------------------------------------------------------- + +/** + * Starts the poller background loop on a configurable interval. + * + * Returns an AbortController — call `.abort()` for graceful shutdown. + * Errors in individual poll cycles are caught and logged without + * crashing the process. + */ +export function startPollerLoop(ctx: PollerContext): AbortController { + const controller = new AbortController(); + const intervalMs = ctx.config.pollIntervalSeconds * 1000; + + const run = async () => { + if (controller.signal.aborted) return; + try { + await pollOnce(ctx); + } catch (err) { + console.error(JSON.stringify({ + msg: "poll_error", + error: err instanceof Error ? err.message : String(err), + })); + } + }; + + const id = setInterval(run, intervalMs); + + controller.signal.addEventListener("abort", () => { + clearInterval(id); + console.log(JSON.stringify({ msg: "poller_stopped" })); + }); + + console.log(JSON.stringify({ + msg: "poller_started", + interval_seconds: ctx.config.pollIntervalSeconds, + })); + + return controller; +} diff --git a/src/poller/state.ts b/src/poller/state.ts new file mode 100644 index 0000000..f62e054 --- /dev/null +++ b/src/poller/state.ts @@ -0,0 +1,24 @@ +import { join } from "@std/path"; + +export interface PollerState { + lastEventTimestamp: string | null; +} + +export async function loadPollerState(dataDir: string): Promise { + const path = join(dataDir, "poller-state.json"); + try { + const text = await Deno.readTextFile(path); + return JSON.parse(text) as PollerState; + } catch { + return { lastEventTimestamp: null }; + } +} + +export async function savePollerState( + dataDir: string, + state: PollerState, +): Promise { + const path = join(dataDir, "poller-state.json"); + await Deno.mkdir(dataDir, { recursive: true }); + await Deno.writeTextFile(path, JSON.stringify(state, null, 2)); +}