feat: add poller background loop with Gitea state commit
This commit is contained in:
parent
122db3540f
commit
4041984159
@ -11,6 +11,7 @@
|
||||
},
|
||||
"imports": {
|
||||
"@std/assert": "jsr:@std/assert@^1.0.0",
|
||||
"@std/path": "jsr:@std/path@^1.0.0",
|
||||
"zod": "npm:zod@^3.23.0"
|
||||
},
|
||||
"compilerOptions": {
|
||||
|
||||
8
deno.lock
generated
8
deno.lock
generated
@ -4,6 +4,7 @@
|
||||
"jsr:@std/assert@*": "1.0.19",
|
||||
"jsr:@std/assert@1": "1.0.19",
|
||||
"jsr:@std/internal@^1.0.12": "1.0.12",
|
||||
"jsr:@std/path@1": "1.1.4",
|
||||
"npm:zod@^3.23.0": "3.25.76"
|
||||
},
|
||||
"jsr": {
|
||||
@ -15,6 +16,12 @@
|
||||
},
|
||||
"@std/internal@1.0.12": {
|
||||
"integrity": "972a634fd5bc34b242024402972cd5143eac68d8dffaca5eaa4dba30ce17b027"
|
||||
},
|
||||
"@std/path@1.1.4": {
|
||||
"integrity": "1d2d43f39efb1b42f0b1882a25486647cb851481862dc7313390b2bb044314b5",
|
||||
"dependencies": [
|
||||
"jsr:@std/internal"
|
||||
]
|
||||
}
|
||||
},
|
||||
"npm": {
|
||||
@ -25,6 +32,7 @@
|
||||
"workspace": {
|
||||
"dependencies": [
|
||||
"jsr:@std/assert@1",
|
||||
"jsr:@std/path@1",
|
||||
"npm:zod@^3.23.0"
|
||||
]
|
||||
}
|
||||
|
||||
237
src/poller/loop.ts
Normal file
237
src/poller/loop.ts
Normal file
@ -0,0 +1,237 @@
|
||||
import type { Config } from "../config.ts";
|
||||
import type { GiteaClient } from "../gitea/client.ts";
|
||||
import { GiteaApiError } from "../gitea/client.ts";
|
||||
import type { NetbirdClient } from "../netbird/client.ts";
|
||||
import { DesiredStateSchema } from "../state/schema.ts";
|
||||
import type { DesiredState, SetupKeyConfig } from "../state/schema.ts";
|
||||
import {
|
||||
type EnrollmentDetection,
|
||||
processEnrollmentEvents,
|
||||
} from "./poller.ts";
|
||||
import { loadPollerState, savePollerState } from "./state.ts";
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Context
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
export interface PollerContext {
|
||||
config: Config;
|
||||
netbird: NetbirdClient;
|
||||
gitea: GiteaClient;
|
||||
reconcileInProgress: { value: boolean };
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Single poll iteration
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Runs one poll cycle: fetch desired state from Gitea, check for new
|
||||
* enrollment events in NetBird, rename enrolled peers, and commit
|
||||
* `enrolled: true` back to the repo.
|
||||
*
|
||||
* Skips entirely when a reconcile is in progress to avoid racing with
|
||||
* the webhook-triggered reconciliation path.
|
||||
*/
|
||||
export async function pollOnce(ctx: PollerContext): Promise<void> {
|
||||
const { config, netbird, gitea, reconcileInProgress } = ctx;
|
||||
|
||||
if (reconcileInProgress.value) {
|
||||
console.log(JSON.stringify({ msg: "poll_skipped", reason: "reconcile_in_progress" }));
|
||||
return;
|
||||
}
|
||||
|
||||
const pollerState = await loadPollerState(config.dataDir);
|
||||
|
||||
// Fetch current desired state from Gitea (main branch)
|
||||
const file = await gitea.getFileContent("netbird.json", "main");
|
||||
const desired: DesiredState = DesiredStateSchema.parse(
|
||||
JSON.parse(file.content),
|
||||
);
|
||||
|
||||
// Build set of unenrolled setup key names
|
||||
const unenrolledKeys = new Set<string>();
|
||||
for (
|
||||
const [name, key] of Object.entries(desired.setup_keys) as [
|
||||
string,
|
||||
SetupKeyConfig,
|
||||
][]
|
||||
) {
|
||||
if (!key.enrolled) {
|
||||
unenrolledKeys.add(name);
|
||||
}
|
||||
}
|
||||
|
||||
if (unenrolledKeys.size === 0) {
|
||||
console.log(JSON.stringify({ msg: "poll_no_unenrolled_keys" }));
|
||||
return;
|
||||
}
|
||||
|
||||
// Fetch events and detect enrollments
|
||||
const events = await netbird.listEvents();
|
||||
const enrollments = processEnrollmentEvents(
|
||||
events,
|
||||
unenrolledKeys,
|
||||
pollerState.lastEventTimestamp,
|
||||
);
|
||||
|
||||
if (enrollments.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(JSON.stringify({
|
||||
msg: "poll_enrollments_detected",
|
||||
count: enrollments.length,
|
||||
}));
|
||||
|
||||
// Process each enrollment: rename peer, commit enrolled:true
|
||||
let latestTimestamp = pollerState.lastEventTimestamp;
|
||||
// Track the file SHA across iterations — each successful commit changes it
|
||||
let currentSha = file.sha;
|
||||
let currentDesired = desired;
|
||||
|
||||
for (const enrollment of enrollments) {
|
||||
await processEnrollment(
|
||||
ctx,
|
||||
enrollment,
|
||||
currentDesired,
|
||||
currentSha,
|
||||
(newSha, newDesired) => {
|
||||
currentSha = newSha;
|
||||
currentDesired = newDesired;
|
||||
},
|
||||
);
|
||||
|
||||
if (!latestTimestamp || enrollment.timestamp > latestTimestamp) {
|
||||
latestTimestamp = enrollment.timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
// Persist the latest event timestamp so next poll skips processed events
|
||||
await savePollerState(config.dataDir, {
|
||||
lastEventTimestamp: latestTimestamp,
|
||||
});
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Process a single enrollment
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Renames a peer to match its setup key name, then commits
|
||||
* `enrolled: true` to the repo via Gitea's optimistic concurrency.
|
||||
*
|
||||
* If the file SHA has changed (409 conflict), logs a warning and moves
|
||||
* on — the next poll cycle will retry with fresh state.
|
||||
*/
|
||||
async function processEnrollment(
|
||||
ctx: PollerContext,
|
||||
enrollment: EnrollmentDetection,
|
||||
desired: DesiredState,
|
||||
fileSha: string,
|
||||
onCommit: (newSha: string, newDesired: DesiredState) => void,
|
||||
): Promise<void> {
|
||||
const { netbird, gitea } = ctx;
|
||||
const { setupKeyName, peerId, peerHostname } = enrollment;
|
||||
|
||||
// Rename the peer to match the setup key name
|
||||
try {
|
||||
await netbird.updatePeer(peerId, { name: setupKeyName });
|
||||
console.log(JSON.stringify({
|
||||
msg: "peer_renamed",
|
||||
peer_id: peerId,
|
||||
from: peerHostname,
|
||||
to: setupKeyName,
|
||||
}));
|
||||
} catch (err) {
|
||||
console.error(JSON.stringify({
|
||||
msg: "peer_rename_failed",
|
||||
peer_id: peerId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
// Mark enrolled:true in desired state and commit via Gitea
|
||||
const updated = structuredClone(desired);
|
||||
updated.setup_keys[setupKeyName].enrolled = true;
|
||||
const content = JSON.stringify(updated, null, 2);
|
||||
|
||||
try {
|
||||
await gitea.updateFile(
|
||||
"netbird.json",
|
||||
content,
|
||||
fileSha,
|
||||
`chore: mark ${setupKeyName} as enrolled`,
|
||||
"main",
|
||||
);
|
||||
|
||||
// Fetch the new SHA for subsequent commits in this poll cycle.
|
||||
// The updateFile response from Gitea doesn't return the new blob SHA
|
||||
// in a convenient form, so we re-read it.
|
||||
const freshFile = await gitea.getFileContent("netbird.json", "main");
|
||||
onCommit(freshFile.sha, updated);
|
||||
|
||||
console.log(JSON.stringify({
|
||||
msg: "enrollment_committed",
|
||||
setup_key: setupKeyName,
|
||||
}));
|
||||
} catch (err) {
|
||||
if (err instanceof GiteaApiError && err.status === 409) {
|
||||
// SHA mismatch — file was modified between read and write.
|
||||
// Will be retried on next poll cycle with fresh state.
|
||||
console.warn(JSON.stringify({
|
||||
msg: "enrollment_commit_conflict",
|
||||
setup_key: setupKeyName,
|
||||
}));
|
||||
return;
|
||||
}
|
||||
console.error(JSON.stringify({
|
||||
msg: "enrollment_commit_failed",
|
||||
setup_key: setupKeyName,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Background loop
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Starts the poller background loop on a configurable interval.
|
||||
*
|
||||
* Returns an AbortController — call `.abort()` for graceful shutdown.
|
||||
* Errors in individual poll cycles are caught and logged without
|
||||
* crashing the process.
|
||||
*/
|
||||
export function startPollerLoop(ctx: PollerContext): AbortController {
|
||||
const controller = new AbortController();
|
||||
const intervalMs = ctx.config.pollIntervalSeconds * 1000;
|
||||
|
||||
const run = async () => {
|
||||
if (controller.signal.aborted) return;
|
||||
try {
|
||||
await pollOnce(ctx);
|
||||
} catch (err) {
|
||||
console.error(JSON.stringify({
|
||||
msg: "poll_error",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
const id = setInterval(run, intervalMs);
|
||||
|
||||
controller.signal.addEventListener("abort", () => {
|
||||
clearInterval(id);
|
||||
console.log(JSON.stringify({ msg: "poller_stopped" }));
|
||||
});
|
||||
|
||||
console.log(JSON.stringify({
|
||||
msg: "poller_started",
|
||||
interval_seconds: ctx.config.pollIntervalSeconds,
|
||||
}));
|
||||
|
||||
return controller;
|
||||
}
|
||||
24
src/poller/state.ts
Normal file
24
src/poller/state.ts
Normal file
@ -0,0 +1,24 @@
|
||||
import { join } from "@std/path";
|
||||
|
||||
export interface PollerState {
|
||||
lastEventTimestamp: string | null;
|
||||
}
|
||||
|
||||
export async function loadPollerState(dataDir: string): Promise<PollerState> {
|
||||
const path = join(dataDir, "poller-state.json");
|
||||
try {
|
||||
const text = await Deno.readTextFile(path);
|
||||
return JSON.parse(text) as PollerState;
|
||||
} catch {
|
||||
return { lastEventTimestamp: null };
|
||||
}
|
||||
}
|
||||
|
||||
export async function savePollerState(
|
||||
dataDir: string,
|
||||
state: PollerState,
|
||||
): Promise<void> {
|
||||
const path = join(dataDir, "poller-state.json");
|
||||
await Deno.mkdir(dataDir, { recursive: true });
|
||||
await Deno.writeTextFile(path, JSON.stringify(state, null, 2));
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user