a81a450e7e
Merged into tinqs/studio: - cmd/tinqs-cli/ — tinqs-cli (Go binary, from bot/cli) - cmd/tea/ — Gitea CLI tool (from tinqs/cli-tea) - services/bot/ — Bot service (from tinqs-ltd/bot on git.arikigame.com) - services/admin/ — Admin panel (from tinqs/admin) - services/team-tool/ — Team Tool (from tinqs/team-tool) - services/proxy/ — tinqs-proxy (from bot/proxy) - web/landing/ — tinqs.com website (from tinqs/website) - web/docs/ — Platform docs (from tinqs/docs) - web/blog/ — Blog (placeholder) - runner/ — Ephemeral CI runner (from tinqs/runner) All source repos will be deleted after verification.
662 lines
22 KiB
TypeScript
662 lines
22 KiB
TypeScript
/**
 * Redis-backed inference job queue with worker pool and SSE bridge.
 *
 * Architecture:
 *   1. HTTP handler preprocesses the request (auth, vision, reasoning cache)
 *   2. Handler subscribes to a Redis pub/sub channel for this job (before
 *      enqueuing, so no message is published with nobody listening)
 *   3. Handler enqueues the job (Redis RPUSH)
 *   4. Worker pool (BLPOP, bounded concurrency) picks up the job
 *   5. Worker calls DeepSeek (with backoff/retry)
 *   6. Worker publishes response chunks (streaming) or the full result
 *      (non-streaming) back to the job channel via Redis pub/sub
 *   7. Handler receives messages and pipes them to the client as SSE
 *
 * No new dependencies — uses the existing ioredis client from lib/db.
 */
import type Redis from "ioredis";
|
|
import { redis } from "@/lib/db";
|
|
import { fetchDeepseekWithBackoff } from "@/lib/deepseek-fetch";
|
|
import {
|
|
runImageGenLoop,
|
|
type ChatBody,
|
|
type ChatResponse,
|
|
} from "@/lib/image-gen-tool";
|
|
import { resolveDesignContext } from "@/lib/image-gen-context";
|
|
import { storeReasoning, type AssistantMessage } from "@/lib/reasoning-cache";
|
|
import { StreamAccumulator, accumulateSSE } from "@/lib/reasoning-stream";
|
|
import { logInference } from "@/lib/inference-log";
|
|
import type { VisionPreprocessStats } from "@/lib/vision-preprocess";
|
|
|
|
// ── Config ──────────────────────────────────────────────────

/**
 * Parse a worker-count override from the environment.
 *
 * Returns `fallback` unless `raw` parses to a finite number >= 1; fractional
 * values are floored. Without this check a value such as "-5" would start no
 * workers at all (jobs would sit in the queue until subscribers time out),
 * "Infinity" would make the pool's spawn loop never terminate, and "2.5"
 * would silently start 3 workers.
 *
 * @param raw      The raw environment value (may be undefined).
 * @param fallback Count to use when `raw` is missing or invalid.
 */
function parseWorkerCount(raw: string | undefined, fallback: number): number {
  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed >= 1 ? Math.floor(parsed) : fallback;
}

// DeepSeek allows ~50 concurrent requests per key (dynamic, load-based).
// Default 10 workers = plenty for a small team while staying well under
// the cap. Tune via DEEPSEEK_MAX_CONCURRENT if needed.
const MAX_WORKERS = parseWorkerCount(process.env.DEEPSEEK_MAX_CONCURRENT, 10);
const QUEUE_KEY = "inference:queue:jobs";
const JOB_CHANNEL_PREFIX = "inference:job:";
const JOB_TIMEOUT_MS = 300_000; // 5 min, matches Next.js maxDuration

// ── Job type ────────────────────────────────────────────────

/**
 * A preprocessed inference request. The HTTP handler serializes it to JSON
 * onto the Redis work list; a worker parses it back and executes it.
 */
export interface InferenceJob {
  /** Job id; the pub/sub channel is `JOB_CHANNEL_PREFIX + id`. */
  id: string;

  // ── Upstream request, replayed by the worker ──
  /** URL the worker POSTs to. */
  targetUrl: string;
  /** Headers sent with the upstream request. */
  headers: Record<string, string>;
  /** Request body; JSON-stringified before sending. */
  body: Record<string, unknown>;

  // ── Routing flags ──
  /** Forward the upstream response as SSE chunks rather than one result. */
  isStream: boolean;
  /** Run the image-generation tool loop (checked before `isStream`). */
  imageGenEnabled: boolean;

  // ── Attribution / logging ──
  user: string;
  model: string;
  source: string;
  /** Request sub-path (logged as `path`). */
  subPath: string;
  /** Reasoning-cache userScope. */
  scope: string;
  /** Epoch milliseconds at request start; used to compute duration_ms. */
  startTime: number;

  // ── Preprocessing stats carried into the inference log ──
  patchedReasoningMessages: number;
  missingReasoningMessages: number;
  visionStats: VisionPreprocessStats | null;
}

// ── Pub/sub message protocol ────────────────────────────────
// Every message is a JSON object discriminated by its `t` field:
//   "c"  SSE chunk (streaming)     { t:"c", d:"data: {...}\n\n" }
//   "r"  full result               { t:"r", s:200, d:"{...}", ct:"application/json" }
//   "e"  error                     { t:"e", s:502, m:"error text" }
//   "x"  done (no more messages)   { t:"x" }

/** Raw SSE text to forward to the client verbatim. */
type ChunkMessage = { t: "c"; d: string };

/** A complete response: HTTP status, body text, and content type. */
type ResultMessage = { t: "r"; s: number; d: string; ct: string };

/** A failure: HTTP status to report plus a human-readable message. */
type ErrorMessage = { t: "e"; s: number; m: string };

/** End-of-job marker; subscribers release their connection on receipt. */
type DoneMessage = { t: "x" };

type QueueMessage = ChunkMessage | ResultMessage | ErrorMessage | DoneMessage;

// ── Enqueue ─────────────────────────────────────────────────

/**
 * Append a job to the tail of the shared Redis work list (RPUSH); workers
 * take jobs from the head with BLPOP. Await the subscriber's `ready` promise
 * before calling this so no published message is missed.
 *
 * @throws Error when Redis is not configured.
 */
export async function enqueueJob(job: InferenceJob): Promise<void> {
  if (!redis) {
    throw new Error("Redis unavailable — cannot enqueue");
  }
  console.log(`[queue] enqueue ${job.id}`);
  const payload = JSON.stringify(job);
  await redis.rpush(QUEUE_KEY, payload);
}

// ── Subscribe (streaming) ───────────────────────────────────

/**
 * Subscribe to a job's pub/sub channel and expose its messages as an SSE
 * byte stream for the HTTP response.
 *
 * Returns:
 *   - `stream`: emits "c" chunks verbatim. It closes after "r" (whose body is
 *     forwarded as-is), after "e" (converted to an SSE error event followed by
 *     `data: [DONE]`), after "x", or after JOB_TIMEOUT_MS.
 *   - `ready`: resolves once the SUBSCRIBE is acknowledged, or when it fails,
 *     or immediately when Redis is unavailable. Await it before enqueuing the
 *     job to avoid missing messages.
 *
 * A dedicated duplicate connection is used and is disconnected on close,
 * cancel, subscribe failure, or timeout.
 */
export function subscribeStream(jobId: string): {
  stream: ReadableStream<Uint8Array>;
  ready: Promise<void>;
} {
  const channel = `${JOB_CHANNEL_PREFIX}${jobId}`;
  const encoder = new TextEncoder();

  let subscriber: Redis | null = null;
  let timer: ReturnType<typeof setTimeout> | null = null;
  let markReady!: () => void;
  const ready = new Promise<void>((resolve) => {
    markReady = resolve;
  });

  /** Stop the safety timer and drop the subscriber connection (idempotent). */
  const teardown = (): void => {
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    if (subscriber) {
      try {
        subscriber.disconnect();
      } catch {
        // already gone — nothing to release
      }
      subscriber = null;
    }
  };

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      if (!redis) {
        controller.close();
        markReady();
        return;
      }

      /**
       * Emit an optional final payload (null = none), then close the stream
       * and tear down. An enqueue failure propagates to the caller before
       * close/teardown run.
       */
      const finish = (payload: string | null): void => {
        if (payload !== null) controller.enqueue(encoder.encode(payload));
        try {
          controller.close();
        } catch {
          // already closed or errored
        }
        teardown();
      };

      const conn = redis.duplicate();
      subscriber = conn;

      conn.on("error", (err) => {
        console.error(`[queue] subscribeStream sub error:`, err);
      });

      conn.subscribe(channel, (err) => {
        if (err) {
          console.error("[queue] subscribeStream subscribe failed:", err);
          try {
            controller.error(err);
          } catch {
            // stream already settled
          }
          teardown();
          markReady();
          return;
        }
        console.log(`[queue] subscribeStream subscribed to ${channel}`);
        markReady(); // subscription is live — safe to enqueue now
      });

      conn.on("message", (_ch: string, raw: string) => {
        try {
          const msg: QueueMessage = JSON.parse(raw);
          if (msg.t === "c") {
            controller.enqueue(encoder.encode(msg.d));
          } else if (msg.t === "r") {
            // Non-streaming result on a streaming subscription (e.g. DeepSeek
            // returned an error body). Forward it then close.
            finish(msg.d);
          } else if (msg.t === "e") {
            // Send error as an SSE-formatted event so Cursor can display it
            finish(
              `data: ${JSON.stringify({ error: { message: msg.m, type: "proxy_error" } })}\n\ndata: [DONE]\n\n`,
            );
          } else if (msg.t === "x") {
            finish(null);
          }
        } catch (e) {
          console.error("[queue] message parse error:", e);
        }
      });

      // Safety timeout — don't hold the connection forever
      timer = setTimeout(() => {
        console.warn(`[queue] stream subscription ${jobId} timed out`);
        finish(null);
      }, JOB_TIMEOUT_MS);
    },
    cancel() {
      teardown();
    },
  });

  return { stream, ready };
}

// ── Subscribe (non-streaming / tool-loop result) ────────────

/**
 * Subscribe to a job's pub/sub channel and wait for its single outcome.
 *
 * Returns:
 *   - `result`: resolves with the "r" payload. On "e" it resolves with the
 *     worker's status and a JSON body `{ error, message }`. It rejects if
 *     Redis is unavailable, if the SUBSCRIBE fails, or if nothing arrives
 *     within JOB_TIMEOUT_MS.
 *   - `ready`: resolves once the SUBSCRIBE is acknowledged, or when it fails,
 *     or immediately when Redis is unavailable. Await it before enqueuing the
 *     job.
 *
 * The subscriber connection is released as soon as the outcome is known,
 * rather than waiting for the trailing "x". The safety timer is cleared on
 * "r"/"e", so an undelivered "x" (e.g. the worker's second publish failed)
 * would otherwise leave the duplicate connection open indefinitely.
 */
export function subscribeResult(jobId: string): {
  result: Promise<{ status: number; body: string; ct: string }>;
  ready: Promise<void>;
} {
  const channel = `${JOB_CHANNEL_PREFIX}${jobId}`;

  let sub: Redis | null = null;
  let resolveReady!: () => void;
  const ready = new Promise<void>((r) => {
    resolveReady = r;
  });

  /** Disconnect the subscriber if still connected (safe to call repeatedly). */
  function release(): void {
    if (sub) {
      try {
        sub.disconnect();
      } catch {
        // already disconnected — nothing to release
      }
      sub = null;
    }
  }

  const result = new Promise<{ status: number; body: string; ct: string }>(
    (resolve, reject) => {
      if (!redis) {
        resolveReady();
        reject(new Error("Redis unavailable"));
        return;
      }

      const conn = redis.duplicate();
      sub = conn;

      const timeout = setTimeout(() => {
        release();
        reject(new Error(`Queue timeout (${JOB_TIMEOUT_MS / 1000}s)`));
      }, JOB_TIMEOUT_MS);

      conn.on("error", (err) => {
        console.error(`[queue] subscribeResult sub error:`, err);
      });

      conn.subscribe(channel, (err) => {
        if (err) {
          console.error(`[queue] subscribeResult subscribe failed:`, err);
          clearTimeout(timeout);
          release();
          resolveReady();
          reject(err);
          return;
        }
        console.log(`[queue] subscribeResult subscribed to ${channel}`);
        resolveReady(); // subscription is live
      });

      conn.on("message", (_ch: string, raw: string) => {
        console.log(`[queue] subscribeResult got message on ${_ch}: t=${raw.slice(0, 50)}`);
        try {
          const msg: QueueMessage = JSON.parse(raw);
          switch (msg.t) {
            case "r":
              clearTimeout(timeout);
              console.log(`[queue] subscribeResult resolving result (status=${msg.s}, body=${msg.d.length} chars)`);
              resolve({ status: msg.s, body: msg.d, ct: msg.ct });
              release(); // outcome known — the trailing "x" is not needed
              break;
            case "e":
              clearTimeout(timeout);
              console.log(`[queue] subscribeResult resolving error (status=${msg.s})`);
              resolve({
                status: msg.s,
                body: JSON.stringify({ error: msg.m, message: msg.m }),
                ct: "application/json",
              });
              release(); // outcome known — the trailing "x" is not needed
              break;
            case "x":
              console.log(`[queue] subscribeResult done, disconnecting`);
              release();
              break;
          }
        } catch (e) {
          console.error("[queue] message parse error:", e);
        }
      });
    },
  );

  return { result, ready };
}

// ── Worker pool ─────────────────────────────────────────────

/** True once this process has launched its workers; reported by queueStats(). */
let poolStarted = false;

/**
 * Start MAX_WORKERS queue consumers, at most once per process.
 * No-op if the pool is already running or Redis is unavailable.
 *
 * Workers are fire-and-forget. Each promise gets a rejection handler so that
 * a failure outside a worker's own try/catch (e.g. while creating its
 * connection) is logged instead of surfacing as an unhandled rejection.
 */
export function ensureWorkerPool(): void {
  if (poolStarted || !redis) return;
  poolStarted = true;
  console.log(`[queue] Starting ${MAX_WORKERS} inference workers`);
  for (let i = 0; i < MAX_WORKERS; i++) {
    runWorker(i).catch((e) => {
      console.error(`[queue] Worker ${i} exited:`, e);
    });
  }
}

/**
 * One long-lived queue consumer.
 *
 * Loops forever: BLPOP the next job (5 s timeout, then loop again), process
 * it, and log the total latency measured from `job.startTime`. Any error,
 * including a malformed queue payload, is logged and followed by a 1 s pause
 * before the next attempt.
 *
 * @param id Worker index, used only in log lines.
 */
async function runWorker(id: number): Promise<void> {
  if (!redis) return;
  const conn = redis.duplicate();
  // Log connection-level errors with the worker id. The subscriber
  // connections in this module also register an "error" listener.
  conn.on("error", (err) => {
    console.error(`[queue] Worker ${id} connection error:`, err);
  });
  console.log(`[queue] Worker ${id} ready`);

  // eslint-disable-next-line no-constant-condition
  while (true) {
    try {
      const result = await conn.blpop(QUEUE_KEY, 5); // 5s timeout, then loop
      if (!result) continue;

      const job: InferenceJob = JSON.parse(result[1]);
      console.log(
        `[queue] Worker ${id} ← ${job.id} (${job.user}, stream=${job.isStream}, imggen=${job.imageGenEnabled})`,
      );

      await processJob(job);

      console.log(`[queue] Worker ${id} ✓ ${job.id} (${Date.now() - job.startTime}ms total)`);
    } catch (e) {
      console.error(`[queue] Worker ${id} error:`, e);
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
}

// ── Job processing ──────────────────────────────────────────

/**
 * Dispatch a job to the matching processor. On failure, report an error
 * ("e" then "x") to the subscriber and write an error entry to the inference
 * log.
 *
 * Routing order: image-gen tool loop, then streaming, then non-streaming.
 *
 * Non-Error throw values are stringified so the subscriber always receives
 * a string `m`; an undefined field would be dropped by JSON.stringify. The
 * failure is logged even if publishing the error itself fails, and that
 * publish error still propagates to the worker loop.
 */
async function processJob(job: InferenceJob): Promise<void> {
  const channel = `${JOB_CHANNEL_PREFIX}${job.id}`;

  try {
    if (job.imageGenEnabled) {
      await processToolLoopJob(job, channel);
    } else if (job.isStream) {
      await processStreamingJob(job, channel);
    } else {
      await processNonStreamingJob(job, channel);
    }
  } catch (e) {
    const message = e instanceof Error ? e.message : String(e);
    try {
      // Publish error to subscriber then close
      await pub(channel, { t: "e", s: 502, m: message });
      await pub(channel, { t: "x" });
    } finally {
      logInference({
        user: job.user,
        model: job.model,
        error: message,
        path: job.subPath,
        duration_ms: Date.now() - job.startTime,
        source: job.source,
      });
    }
  }
}

/**
 * Streaming — forward SSE chunks from DeepSeek to the subscriber via pub/sub.
 *
 * A non-200 status or a missing body is published as a single "r" result.
 * Otherwise every decoded chunk is published as a "c" message and buffered.
 * After the closing "x", the buffered text feeds two things:
 *   - the reasoning cache (paths not starting with "anthropic" only);
 *   - the token-usage fields of the inference log entry.
 */
async function processStreamingJob(job: InferenceJob, channel: string): Promise<void> {
  const response = await fetchDeepseekWithBackoff(
    job.targetUrl,
    {
      method: "POST",
      headers: job.headers,
      body: JSON.stringify(job.body),
    },
    { logLabel: `[queue:${job.id}]` },
  );

  // Non-OK or no body → send as single result
  if (response.status !== 200 || !response.body) {
    const text = await response.text();
    await pub(channel, {
      t: "r",
      s: response.status,
      d: text,
      ct: response.headers.get("Content-Type") || "application/json",
    });
    await pub(channel, { t: "x" });
    logInference({
      user: job.user,
      model: job.model,
      error: response.status !== 200 ? `DeepSeek ${response.status}` : undefined,
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
    });
    return;
  }

  // Stream each chunk to the subscriber, keeping a copy for post-processing.
  const reader = (response.body as ReadableStream<Uint8Array>).getReader();
  const decoder = new TextDecoder();
  const sseChunks: string[] = [];

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      const text = decoder.decode(value, { stream: true });
      sseChunks.push(text);
      await pub(channel, { t: "c", d: text });
    }
    // Flush the decoder. With { stream: true } it holds back an incomplete
    // trailing multi-byte sequence; without this final call those bytes are
    // silently lost (the flush emits U+FFFD for them instead).
    const tail = decoder.decode();
    if (tail) {
      sseChunks.push(tail);
      await pub(channel, { t: "c", d: tail });
    }
  } finally {
    reader.releaseLock();
  }

  await pub(channel, { t: "x" });

  // Joined once; shared by both post-processing steps below.
  const fullSSE = sseChunks.join("");

  // Post-process: reasoning cache
  const isOpenAIPath = !job.subPath.startsWith("anthropic");
  if (isOpenAIPath && fullSSE) {
    try {
      const enc = new TextEncoder();
      const captureStream = new ReadableStream<Uint8Array>({
        start(controller) {
          controller.enqueue(enc.encode(fullSSE));
          controller.close();
        },
      });
      const acc = new StreamAccumulator();
      await accumulateSSE(captureStream, acc);
      for (const m of acc.messages()) {
        if (typeof m.reasoning_content === "string" && m.reasoning_content.length > 0) {
          await storeReasoning(job.scope, m, m.reasoning_content);
        }
      }
    } catch (e) {
      console.error("[queue] reasoning cache error:", e);
    }
  }

  // Extract usage from the final SSE frame so the cache hit rate is
  // measurable for streaming requests. The last data chunk before [DONE]
  // carries DeepSeek's usage.prompt_cache_hit_tokens et al.
  const usageData = fullSSE ? extractUsageFromSSE(fullSSE) : null;
  const reasoningDetails =
    (usageData?.completion_tokens_details as Record<string, unknown> | undefined) ?? {};

  logInference({
    user: job.user,
    model: job.model,
    input_tokens: num(usageData?.prompt_tokens),
    output_tokens: num(usageData?.completion_tokens),
    prompt_cache_hit_tokens: num(usageData?.prompt_cache_hit_tokens),
    prompt_cache_miss_tokens: num(usageData?.prompt_cache_miss_tokens),
    reasoning_tokens: num(reasoningDetails.reasoning_tokens),
    total_tokens: num(usageData?.total_tokens),
    path: job.subPath,
    duration_ms: Date.now() - job.startTime,
    source: job.source,
    patched_reasoning_messages: job.patchedReasoningMessages,
    missing_reasoning_messages: job.missingReasoningMessages,
    vision_images: job.visionStats?.images,
    vision_cache_hits: job.visionStats?.cache_hits,
    vision_failures: job.visionStats?.failures,
    vision_tokens_in: job.visionStats?.tokens_in,
    vision_tokens_out: job.visionStats?.tokens_out,
    vision_cost_usd: job.visionStats?.cost_usd,
    vision_latency_ms: job.visionStats?.latency_ms,
  });
}

/**
 * Non-streaming — collect the full DeepSeek response and publish it once.
 *
 * After the subscriber has the result, this writes exactly one
 * inference-log entry:
 *   - it includes token usage when the body is JSON carrying `usage`;
 *   - it carries an `error` field for non-200 statuses, as the streaming
 *     path does.
 * It then stores any `reasoning_content` in the reasoning cache for paths
 * not starting with "anthropic".
 *
 * Reasoning-cache failures are contained here. If they reached processJob's
 * catch, it would publish a spurious error after the result was already
 * delivered and log the job a second time.
 */
async function processNonStreamingJob(job: InferenceJob, channel: string): Promise<void> {
  const response = await fetchDeepseekWithBackoff(
    job.targetUrl,
    {
      method: "POST",
      headers: job.headers,
      body: JSON.stringify(job.body),
    },
    { logLabel: `[queue:${job.id}]` },
  );

  const text = await response.text();
  const ct = response.headers.get("Content-Type") || "application/json";

  await pub(channel, { t: "r", s: response.status, d: text, ct });
  await pub(channel, { t: "x" });

  // Parse defensively — error pages and empty bodies are not JSON.
  let parsed: { usage?: unknown; choices?: unknown } | null = null;
  try {
    const value: unknown = JSON.parse(text);
    if (value !== null && typeof value === "object") {
      parsed = value as { usage?: unknown; choices?: unknown };
    }
  } catch {
    // not JSON — logged below without token counts
  }

  // Same error convention as the streaming path, so failed requests are
  // logged even though a JSON error body carries no `usage`.
  const error = response.status !== 200 ? `DeepSeek ${response.status}` : undefined;
  const usageRaw = parsed?.usage;
  const u =
    usageRaw !== null && typeof usageRaw === "object"
      ? (usageRaw as Record<string, unknown>)
      : null;

  if (u) {
    const details =
      (u.completion_tokens_details as Record<string, unknown> | undefined) ?? {};
    logInference({
      user: job.user,
      model: job.model,
      error,
      input_tokens: num(u.prompt_tokens),
      output_tokens: num(u.completion_tokens),
      prompt_cache_hit_tokens: num(u.prompt_cache_hit_tokens),
      prompt_cache_miss_tokens: num(u.prompt_cache_miss_tokens),
      reasoning_tokens: num(details.reasoning_tokens),
      total_tokens: num(u.total_tokens),
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
      patched_reasoning_messages: job.patchedReasoningMessages,
      missing_reasoning_messages: job.missingReasoningMessages,
      vision_images: job.visionStats?.images,
      vision_cache_hits: job.visionStats?.cache_hits,
      vision_failures: job.visionStats?.failures,
      vision_tokens_in: job.visionStats?.tokens_in,
      vision_tokens_out: job.visionStats?.tokens_out,
      vision_cost_usd: job.visionStats?.cost_usd,
      vision_latency_ms: job.visionStats?.latency_ms,
    });
  } else {
    logInference({
      user: job.user,
      model: job.model,
      error,
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
    });
  }

  // Reasoning cache (paths not starting with "anthropic" only).
  const choices = parsed?.choices;
  if (!job.subPath.startsWith("anthropic") && Array.isArray(choices)) {
    try {
      for (const ch of choices) {
        const msg = ch?.message;
        if (
          msg &&
          typeof msg.reasoning_content === "string" &&
          msg.reasoning_content.length > 0
        ) {
          await storeReasoning(job.scope, msg as AssistantMessage, msg.reasoning_content);
        }
      }
    } catch (e) {
      console.error("[queue] reasoning cache error:", e);
    }
  }
}

/**
 * Image-gen tool loop — multi-round DeepSeek + fal.ai, publish final result.
 *
 * Progress lines are published as "c" chunks while images generate. The
 * final chat completion is published as a single "r" result.
 *
 * Each DeepSeek round goes through fetchDeepseekWithBackoff, so it gets the
 * same backoff/retry handling as the streaming and non-streaming paths.
 *
 * Reasoning-cache failures are caught locally. The result has already been
 * delivered; letting them reach processJob would publish a spurious error
 * and log the job twice.
 */
async function processToolLoopJob(job: InferenceJob, channel: string): Promise<void> {
  // Progress callback — streams live status updates to the client while images generate
  const sendProgress = async (msg: string) => {
    const sse = `data: ${JSON.stringify({ choices: [{ delta: { content: `\n\n🚀 ${msg}\n` } }] })}\n\n`;
    await pub(channel, { t: "c", d: sse });
  };

  // Resolve design context from source + user (ABAC-aware) — always injected
  const context = resolveDesignContext(job.source, job.user);
  console.log(`[tool-loop] injecting design context: ${context.label} (source=${job.source}, user=${job.user})`);

  const { response: finalResp, stats: loopStats } = await runImageGenLoop(
    job.body as ChatBody,
    async (b) => {
      const r = await fetchDeepseekWithBackoff(
        job.targetUrl,
        {
          method: "POST",
          headers: job.headers,
          body: JSON.stringify({ ...b, stream: false }),
        },
        { logLabel: `[queue:${job.id}]` },
      );
      const text = await r.text();
      if (r.status < 200 || r.status > 299) {
        throw new Error(`deepseek ${r.status}: ${text.slice(0, 300)}`);
      }
      return JSON.parse(text) as ChatResponse;
    },
    sendProgress,
    context.prefix,
  );

  await pub(channel, { t: "r", s: 200, d: JSON.stringify(finalResp), ct: "application/json" });
  await pub(channel, { t: "x" });

  // Reasoning cache
  if (Array.isArray(finalResp.choices)) {
    try {
      for (const ch of finalResp.choices) {
        const msg = (ch as { message?: AssistantMessage }).message;
        if (
          msg &&
          typeof msg.reasoning_content === "string" &&
          msg.reasoning_content.length > 0
        ) {
          await storeReasoning(job.scope, msg, msg.reasoning_content);
        }
      }
    } catch (e) {
      console.error("[queue] reasoning cache error:", e);
    }
  }

  // Log — usage fields are read defensively with num(), as in the streaming path.
  const usage = ((finalResp as { usage?: unknown }).usage ?? {}) as Record<string, unknown>;
  const reasoningDetails =
    (usage.completion_tokens_details as Record<string, unknown> | undefined) ?? {};

  logInference({
    user: job.user,
    model: job.model,
    input_tokens: num(usage.prompt_tokens),
    output_tokens: num(usage.completion_tokens),
    prompt_cache_hit_tokens: num(usage.prompt_cache_hit_tokens),
    prompt_cache_miss_tokens: num(usage.prompt_cache_miss_tokens),
    reasoning_tokens: num(reasoningDetails.reasoning_tokens),
    total_tokens: num(usage.total_tokens),
    path: job.subPath,
    duration_ms: Date.now() - job.startTime,
    source: job.source,
    patched_reasoning_messages: job.patchedReasoningMessages,
    missing_reasoning_messages: job.missingReasoningMessages,
    vision_images: job.visionStats?.images,
    vision_cache_hits: job.visionStats?.cache_hits,
    vision_failures: job.visionStats?.failures,
    vision_tokens_in: job.visionStats?.tokens_in,
    vision_tokens_out: job.visionStats?.tokens_out,
    vision_cost_usd: job.visionStats?.cost_usd,
    vision_latency_ms: job.visionStats?.latency_ms,
    tool_loop_rounds: loopStats.rounds,
    tool_loop_images: loopStats.images_generated,
    tool_loop_calls: loopStats.tool_calls_executed,
    tool_loop_failures: loopStats.tool_calls_failed,
    tool_loop_cost_usd: loopStats.total_cost_usd,
  });
}

// ── Helpers ──────────────────────────────────────────────────

/**
 * Extract token usage from DeepSeek SSE text.
 *
 * DeepSeek's final SSE data frame before [DONE] carries a usage block:
 *   data: {...,"usage":{"prompt_tokens":100,"completion_tokens":50,
 *     "prompt_cache_hit_tokens":80,"prompt_cache_miss_tokens":20,
 *     "total_tokens":150,"completion_tokens_details":{"reasoning_tokens":5}}}
 *
 * Scans `data:` lines from the end and returns the `usage` object of the
 * last frame that has one.
 *
 * Working line by line, rather than splitting on "\ndata:", has two
 * benefits:
 *   - it also finds a frame at the very start of the text;
 *   - it skips SSE comment lines (e.g. ": keep-alive") between frames.
 * CRLF line endings are accepted.
 *
 * Returns null if no usage block is found (non-fatal — the client already
 * received all bytes; this is purely for logging/analytics).
 */
function extractUsageFromSSE(sseText: string): Record<string, unknown> | null {
  const linesNewestFirst = sseText.split(/\r?\n/).reverse();
  for (const rawLine of linesNewestFirst) {
    const line = rawLine.trim();
    if (!line.startsWith("data:")) continue; // blank, comment, or event: line
    const payload = line.slice("data:".length).trim();
    if (!payload || payload === "[DONE]") continue;
    try {
      const parsed: unknown = JSON.parse(payload);
      if (parsed !== null && typeof parsed === "object") {
        const usage = (parsed as { usage?: unknown }).usage;
        if (usage !== null && typeof usage === "object") {
          return usage as Record<string, unknown>;
        }
      }
    } catch {
      // skip malformed frames
    }
  }
  return null;
}

/** Coerce an untyped usage field to a number: numbers pass through unchanged, anything else becomes 0. */
function num(v: unknown): number {
  if (typeof v !== "number") {
    return 0;
  }
  return v;
}

/**
 * Publish one protocol message on a job channel and log how many
 * subscribers received it. Does nothing when Redis is unavailable.
 */
async function pub(channel: string, msg: QueueMessage): Promise<void> {
  const client = redis;
  if (!client) return;
  const payload = JSON.stringify(msg);
  const receivers = await client.publish(channel, payload);
  console.log(`[queue] pub ${channel} t=${msg.t} → ${receivers} receivers`);
}

// ── Queue stats (for /health) ───────────────────────────────

/**
 * Snapshot for the health endpoint: number of pending jobs in the Redis
 * list, the configured worker count, and whether this process has started
 * its pool. Queue depth is reported as 0 when Redis is unavailable.
 */
export async function queueStats(): Promise<{
  queue_depth: number;
  max_workers: number;
  pool_started: boolean;
}> {
  const depth = redis ? await redis.llen(QUEUE_KEY) : 0;
  return {
    queue_depth: depth,
    max_workers: MAX_WORKERS,
    pool_started: poolStarted,
  };
}
|