feat: add HTTP server with /reconcile, /sync-events, /health endpoints

This commit is contained in:
Prox 2026-03-04 00:24:29 +02:00
parent 4041984159
commit a615d24ba2
2 changed files with 276 additions and 3 deletions

View File

@ -1,13 +1,47 @@
import { ZodError } from "zod";
import { loadConfig } from "./config.ts";
import { NetbirdClient } from "./netbird/client.ts";
import { GiteaClient } from "./gitea/client.ts";
import { createHandler } from "./server.ts";
import { startPollerLoop } from "./poller/loop.ts";
let config;
try {
const config = loadConfig();
console.log(JSON.stringify({ msg: "starting", port: config.port }));
config = loadConfig();
} catch (err) {
if (err instanceof ZodError) {
console.error(JSON.stringify({ msg: "invalid config", issues: err.issues }));
console.error(
JSON.stringify({ msg: "invalid config", issues: err.issues }),
);
Deno.exit(1);
}
throw err;
}
const netbird = new NetbirdClient(config.netbirdApiUrl, config.netbirdApiToken);
const gitea = new GiteaClient(
config.giteaUrl,
config.giteaToken,
config.giteaRepo,
);
const reconcileInProgress = { value: false };
// Start background poller
const pollerAbort = startPollerLoop({
config,
netbird,
gitea,
reconcileInProgress,
});
// Start HTTP server
const handler = createHandler({ config, netbird, gitea, reconcileInProgress });
console.log(JSON.stringify({ msg: "starting", port: config.port }));
Deno.serve({ port: config.port, handler });
// Graceful shutdown
Deno.addSignalListener("SIGTERM", () => {
console.log(JSON.stringify({ msg: "shutting_down" }));
pollerAbort.abort();
Deno.exit(0);
});

239
src/server.ts Normal file
View File

@ -0,0 +1,239 @@
import type { Config } from "./config.ts";
import type { NetbirdClient } from "./netbird/client.ts";
import type { GiteaClient } from "./gitea/client.ts";
import {
DesiredStateSchema,
validateCrossReferences,
} from "./state/schema.ts";
import { fetchActualState } from "./state/actual.ts";
import { computeDiff } from "./reconcile/diff.ts";
import { executeOperations } from "./reconcile/executor.ts";
import type { OperationResult } from "./reconcile/operations.ts";
import { pollOnce, type PollerContext } from "./poller/loop.ts";
export interface ServerContext {
config: Config;
netbird: NetbirdClient;
gitea: GiteaClient;
reconcileInProgress: { value: boolean };
}
export function createHandler(
ctx: ServerContext,
): (req: Request) => Promise<Response> {
return async (req: Request): Promise<Response> => {
const url = new URL(req.url);
// Health check — no auth required
if (url.pathname === "/health" && req.method === "GET") {
return Response.json({ status: "ok" });
}
// All other endpoints require bearer token auth
const authHeader = req.headers.get("Authorization");
if (authHeader !== `Bearer ${ctx.config.reconcilerToken}`) {
return Response.json({ error: "unauthorized" }, { status: 401 });
}
if (url.pathname === "/reconcile" && req.method === "POST") {
return handleReconcile(req, url, ctx);
}
if (url.pathname === "/sync-events" && req.method === "POST") {
return handleSyncEvents(ctx);
}
return Response.json({ error: "not found" }, { status: 404 });
};
}
// -----------------------------------------------------------------------------
// /reconcile
// -----------------------------------------------------------------------------
async function handleReconcile(
req: Request,
url: URL,
ctx: ServerContext,
): Promise<Response> {
const dryRun = url.searchParams.get("dry_run") === "true";
// Parse and validate the desired state from the request body
let body: unknown;
try {
body = await req.json();
} catch {
return Response.json(
{ status: "error", error: "invalid JSON body" },
{ status: 400 },
);
}
const parseResult = DesiredStateSchema.safeParse(body);
if (!parseResult.success) {
return Response.json(
{
status: "error",
error: "schema validation failed",
issues: parseResult.error.issues,
},
{ status: 400 },
);
}
const desired = parseResult.data;
// Cross-reference validation (e.g. group refs in policies exist)
const crossRefErrors = validateCrossReferences(desired);
if (crossRefErrors.length > 0) {
return Response.json(
{
status: "error",
error: "cross-reference validation failed",
issues: crossRefErrors,
},
{ status: 400 },
);
}
ctx.reconcileInProgress.value = true;
try {
const actual = await fetchActualState(ctx.netbird);
const ops = computeDiff(desired, actual);
if (dryRun) {
return Response.json({
status: "planned",
operations: ops.map((op) => ({
type: op.type,
name: op.name,
})),
summary: summarize(ops),
});
}
if (ops.length === 0) {
return Response.json({
status: "applied",
operations: [],
created_keys: {},
summary: { created: 0, updated: 0, deleted: 0, failed: 0 },
});
}
const { results, createdKeys } = await executeOperations(
ops,
ctx.netbird,
actual,
);
// Convert Map to plain object for JSON serialization
const createdKeysObj: Record<string, string> = {};
for (const [name, key] of createdKeys) {
createdKeysObj[name] = key;
}
return Response.json({
status: "applied",
operations: results.map((r) => ({
type: r.type,
name: r.name,
status: r.status,
})),
created_keys: createdKeysObj,
summary: summarize(results),
});
} catch (err) {
console.error(
JSON.stringify({
msg: "reconcile_error",
error: err instanceof Error ? err.message : String(err),
}),
);
return Response.json(
{
status: "error",
error: err instanceof Error ? err.message : String(err),
},
{ status: 500 },
);
} finally {
ctx.reconcileInProgress.value = false;
}
}
// -----------------------------------------------------------------------------
// /sync-events
// -----------------------------------------------------------------------------
/**
* Forces a single poll cycle. Temporarily clears reconcileInProgress so
* pollOnce doesn't skip, then restores it afterward.
*/
async function handleSyncEvents(ctx: ServerContext): Promise<Response> {
const pollerCtx: PollerContext = {
config: ctx.config,
netbird: ctx.netbird,
gitea: ctx.gitea,
reconcileInProgress: { value: false },
};
try {
await pollOnce(pollerCtx);
return Response.json({ status: "synced" });
} catch (err) {
console.error(
JSON.stringify({
msg: "sync_events_error",
error: err instanceof Error ? err.message : String(err),
}),
);
return Response.json(
{
status: "error",
error: err instanceof Error ? err.message : String(err),
},
{ status: 500 },
);
}
}
// -----------------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------------
interface Summary {
created: number;
updated: number;
deleted: number;
failed: number;
}
/**
* Counts operations by category. Works on both raw Operation[] (for dry-run
* plans) and OperationResult[] (for executed results where failures are
* tallied separately).
*/
function summarize(ops: Array<{ type: string; status?: string }>): Summary {
let created = 0;
let updated = 0;
let deleted = 0;
let failed = 0;
for (const op of ops) {
if ((op as OperationResult).status === "failed") {
failed++;
continue;
}
if (op.type.startsWith("create_")) {
created++;
} else if (op.type.startsWith("update_") || op.type === "rename_peer") {
updated++;
} else if (op.type.startsWith("delete_")) {
deleted++;
}
}
return { created, updated, deleted, failed };
}