Files
ozan a81a450e7e feat: monorepo consolidation — merge CLI, bot, admin, team-tool, website, docs, runner, proxy
Merged into tinqs/studio:
- cmd/tinqs-cli/    — tinqs-cli (Go binary, from bot/cli)
- cmd/tea/          — Gitea CLI tool (from tinqs/cli-tea)
- services/bot/     — Bot service (from tinqs-ltd/bot on git.arikigame.com)
- services/admin/   — Admin panel (from tinqs/admin)
- services/team-tool/ — Team Tool (from tinqs/team-tool)
- services/proxy/   — tinqs-proxy (from bot/proxy)
- web/landing/      — tinqs.com website (from tinqs/website)
- web/docs/         — Platform docs (from tinqs/docs)
- web/blog/         — Blog (placeholder)
- runner/           — Ephemeral CI runner (from tinqs/runner)

All source repos will be deleted after verification.
2026-05-22 04:55:50 +00:00

662 lines
22 KiB
TypeScript

/**
* Redis-backed inference job queue with worker pool and SSE bridge.
*
* Architecture:
* 1. HTTP handler preprocesses the request (auth, vision, reasoning cache)
* 2. Handler subscribes to a Redis pub/sub channel for this job
* 3. Handler enqueues the job (Redis RPUSH)
* 4. Worker pool (BLPOP, bounded concurrency) picks up the job
* 5. Worker calls DeepSeek (with backoff/retry)
* 6. Worker publishes response chunks (streaming) or full result (non-streaming)
* back to the job channel via Redis pub/sub
* 7. Handler receives messages and pipes them to the client as SSE
*
* No new dependencies — uses existing ioredis from lib/db.
*/
import type Redis from "ioredis";
import { redis } from "@/lib/db";
import { fetchDeepseekWithBackoff } from "@/lib/deepseek-fetch";
import {
runImageGenLoop,
type ChatBody,
type ChatResponse,
} from "@/lib/image-gen-tool";
import { resolveDesignContext } from "@/lib/image-gen-context";
import { storeReasoning, type AssistantMessage } from "@/lib/reasoning-cache";
import { StreamAccumulator, accumulateSSE } from "@/lib/reasoning-stream";
import { logInference } from "@/lib/inference-log";
import type { VisionPreprocessStats } from "@/lib/vision-preprocess";
// ── Config ──────────────────────────────────────────────────
// DeepSeek allows ~50 concurrent requests per key (dynamic, load-based).
// Default 10 workers = plenty for a small team while staying well under
// the cap. Tune via env if needed.
/**
 * Parse an env override as a positive integer.
 *
 * Falls back to `fallback` for unset, empty, non-numeric, non-finite, zero or
 * negative values; fractions are floored (e.g. "2.5" → 2). This prevents
 * "-1" from yielding zero workers and "Infinity" from spawning workers forever.
 */
function positiveIntFromEnv(raw: string | undefined, fallback: number): number {
  const n = Math.floor(Number(raw));
  return Number.isFinite(n) && n > 0 ? n : fallback;
}
const MAX_WORKERS = positiveIntFromEnv(process.env.DEEPSEEK_MAX_CONCURRENT, 10);
const QUEUE_KEY = "inference:queue:jobs";
const JOB_CHANNEL_PREFIX = "inference:job:";
const JOB_TIMEOUT_MS = 300_000; // 5 min, matches Next.js maxDuration
// ── Job type ────────────────────────────────────────────────
/**
 * A preprocessed inference request as it travels through the Redis queue
 * (serialized with JSON.stringify by enqueueJob, parsed back by a worker).
 */
export interface InferenceJob {
  /** Job id; the worker publishes results on `inference:job:<id>`. */
  id: string;

  // ── Upstream request ──
  /** URL the worker POSTs to. */
  targetUrl: string;
  /** Headers sent with the upstream POST. */
  headers: Record<string, string>;
  /** Request payload, sent as a JSON string. */
  body: Record<string, unknown>;

  // ── Dispatch flags ──
  /** Selects the streaming path (only consulted when imageGenEnabled is false). */
  isStream: boolean;
  /** Routes the job through the image-gen tool loop; takes precedence over isStream. */
  imageGenEnabled: boolean;

  // ── Attribution / logging ──
  user: string;
  model: string;
  source: string;
  /** Request sub-path; paths starting with "anthropic" skip the reasoning cache. */
  subPath: string;
  scope: string; // reasoning cache userScope
  /** Millisecond timestamp compared against Date.now() for duration_ms. */
  startTime: number;
  /** Reasoning-cache counters from preprocessing, logged verbatim. */
  patchedReasoningMessages: number;
  missingReasoningMessages: number;
  /** Vision preprocessing stats (or null), logged verbatim. */
  visionStats: VisionPreprocessStats | null;
}
// ── Pub/sub message protocol ────────────────────────────────
// Every message on a job channel is a JSON object; the one-letter `t` tag
// picks the variant:
//   "c" — one SSE chunk, forwarded verbatim   { t:"c", d:"data: {...}\n\n" }
//   "r" — complete response body              { t:"r", s:200, d:"{...}", ct:"application/json" }
//   "e" — worker-side error                   { t:"e", s:502, m:"error text" }
//   "x" — no more messages for this job       { t:"x" }
type QueueChunkMessage = { t: "c"; d: string };
type QueueResultMessage = { t: "r"; s: number; d: string; ct: string };
type QueueErrorMessage = { t: "e"; s: number; m: string };
type QueueDoneMessage = { t: "x" };
type QueueMessage =
  | QueueChunkMessage
  | QueueResultMessage
  | QueueErrorMessage
  | QueueDoneMessage;
// ── Enqueue ─────────────────────────────────────────────────
/**
 * Append a job to the tail of the Redis work list (workers take from the head
 * with BLPOP). Await the subscriber's `ready` promise before calling this so
 * no early message from a fast worker is missed.
 *
 * @throws Error when Redis is not configured.
 */
export async function enqueueJob(job: InferenceJob): Promise<void> {
  const client = redis;
  if (!client) {
    throw new Error("Redis unavailable — cannot enqueue");
  }
  console.log(`[queue] enqueue ${job.id}`);
  const payload = JSON.stringify(job);
  await client.rpush(QUEUE_KEY, payload);
}
// ── Subscribe (streaming) ───────────────────────────────────
/**
 * Subscribe to a job's pub/sub channel and expose the worker's messages as a
 * byte stream suitable for an SSE response body.
 *
 * Returns `{ stream, ready }`. `ready` resolves once the Redis subscription is
 * live (or immediately if Redis is unavailable / the subscribe fails) and
 * MUST be awaited before enqueuing the job, otherwise messages published
 * before the subscription exists are lost.
 *
 * Message handling:
 *  - "c": the chunk is forwarded as-is.
 *  - "r": the body is forwarded, then the stream is closed.
 *  - "e": an SSE error event plus `data: [DONE]` is emitted, then the stream is closed.
 *  - "x": the stream is closed.
 * The stream is also closed after JOB_TIMEOUT_MS. Close, cancel and timeout all
 * disconnect the dedicated subscriber connection.
 */
export function subscribeStream(jobId: string): {
  stream: ReadableStream<Uint8Array>;
  ready: Promise<void>;
} {
  const encoder = new TextEncoder();
  const channel = `${JOB_CHANNEL_PREFIX}${jobId}`;
  let sub: Redis | null = null;
  let timeoutId: ReturnType<typeof setTimeout> | null = null;
  // Set once the stream is closed, errored or cancelled. Messages still in
  // flight after that are dropped. Calling enqueue()/close() on a closed
  // controller throws, which used to surface as a bogus "message parse error".
  let finished = false;
  let resolveReady!: () => void;
  const ready = new Promise<void>((r) => {
    resolveReady = r;
  });

  /** Clear the safety timer and disconnect the subscriber (idempotent). */
  function cleanup() {
    if (timeoutId) {
      clearTimeout(timeoutId);
      timeoutId = null;
    }
    if (sub) {
      try { sub.disconnect(); } catch {}
      sub = null;
    }
  }

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      /** Close the stream exactly once and release the subscriber. */
      const finish = () => {
        if (finished) return;
        finished = true;
        try { controller.close(); } catch {}
        cleanup();
      };

      if (!redis) {
        finished = true;
        controller.close();
        resolveReady();
        return;
      }

      sub = redis.duplicate();
      sub.on("error", (err) => {
        console.error(`[queue] subscribeStream sub error:`, err);
      });
      sub.subscribe(channel, (err) => {
        if (err) {
          console.error("[queue] subscribeStream subscribe failed:", err);
          if (!finished) {
            finished = true;
            try { controller.error(err); } catch {}
          }
          cleanup();
          resolveReady();
          return;
        }
        console.log(`[queue] subscribeStream subscribed to ${channel}`);
        resolveReady(); // subscription is live — safe to enqueue now
      });
      sub.on("message", (_ch: string, raw: string) => {
        if (finished) return;

        let msg: QueueMessage;
        try {
          msg = JSON.parse(raw) as QueueMessage;
        } catch (e) {
          console.error("[queue] message parse error:", e);
          return;
        }

        try {
          switch (msg.t) {
            case "c":
              controller.enqueue(encoder.encode(msg.d));
              break;
            case "r":
              // Non-streaming result on a streaming subscription (e.g. DeepSeek
              // returned an error body). Forward it then close.
              controller.enqueue(encoder.encode(msg.d));
              finish();
              break;
            case "e":
              // Send error as an SSE-formatted event so Cursor can display it
              controller.enqueue(
                encoder.encode(
                  `data: ${JSON.stringify({ error: { message: msg.m, type: "proxy_error" } })}\n\ndata: [DONE]\n\n`,
                ),
              );
              finish();
              break;
            case "x":
              finish();
              break;
          }
        } catch (e) {
          console.error("[queue] stream forward error:", e);
          finish();
        }
      });

      // Safety timeout — don't hold the connection forever
      timeoutId = setTimeout(() => {
        console.warn(`[queue] stream subscription ${jobId} timed out`);
        finish();
      }, JOB_TIMEOUT_MS);
    },
    cancel() {
      // Client went away: stop forwarding and release the subscriber.
      finished = true;
      cleanup();
    },
  });
  return { stream, ready };
}
// ── Subscribe (non-streaming / tool-loop result) ────────────
/**
 * Subscribe to a job's pub/sub channel and wait for its single final result.
 *
 * Returns `{ result, ready }`. `ready` resolves once the subscription is live
 * (or has failed / Redis is unavailable) and must be awaited before enqueuing.
 *
 * `result` behaves as follows:
 *  - It resolves with the first "r" message.
 *  - It resolves with a JSON error body for an "e" message.
 *  - It rejects if Redis is unavailable, the subscribe fails, the job ends
 *    ("x") without a result, or nothing arrives within JOB_TIMEOUT_MS.
 *
 * "c" progress chunks are ignored. The subscriber connection is disconnected
 * as soon as the promise settles, so no later "x" is needed to free it.
 */
export function subscribeResult(jobId: string): {
  result: Promise<{ status: number; body: string; ct: string }>;
  ready: Promise<void>;
} {
  type QueueResult = { status: number; body: string; ct: string };
  const channel = `${JOB_CHANNEL_PREFIX}${jobId}`;
  let sub: Redis | null = null;
  let resolveReady!: () => void;
  const ready = new Promise<void>((r) => {
    resolveReady = r;
  });

  /** Disconnect the dedicated subscriber connection (idempotent). */
  const disconnect = () => {
    if (sub) {
      try { sub.disconnect(); } catch {}
      sub = null;
    }
  };

  const result = new Promise<QueueResult>((resolve, reject) => {
    if (!redis) {
      resolveReady();
      reject(new Error("Redis unavailable"));
      return;
    }

    let settled = false;
    let timeout: ReturnType<typeof setTimeout> | undefined;
    /** Mark settled, stop the timer and disconnect; false if already settled. */
    const release = (): boolean => {
      if (settled) return false;
      settled = true;
      if (timeout !== undefined) clearTimeout(timeout);
      disconnect();
      return true;
    };
    const succeed = (value: QueueResult) => {
      if (release()) resolve(value);
    };
    const fail = (err: Error) => {
      if (release()) reject(err);
    };

    sub = redis.duplicate();
    timeout = setTimeout(() => {
      fail(new Error(`Queue timeout (${JOB_TIMEOUT_MS / 1000}s)`));
    }, JOB_TIMEOUT_MS);
    sub.on("error", (err) => {
      console.error(`[queue] subscribeResult sub error:`, err);
    });
    sub.subscribe(channel, (err) => {
      if (err) {
        console.error(`[queue] subscribeResult subscribe failed:`, err);
        resolveReady();
        fail(err);
        return;
      }
      console.log(`[queue] subscribeResult subscribed to ${channel}`);
      resolveReady(); // subscription is live
    });
    sub.on("message", (_ch: string, raw: string) => {
      console.log(`[queue] subscribeResult got message on ${_ch}: ${raw.slice(0, 50)}`);
      let msg: QueueMessage;
      try {
        msg = JSON.parse(raw) as QueueMessage;
      } catch (e) {
        console.error("[queue] message parse error:", e);
        return;
      }
      switch (msg.t) {
        case "r":
          console.log(`[queue] subscribeResult resolving result (status=${msg.s}, body=${msg.d.length} chars)`);
          succeed({ status: msg.s, body: msg.d, ct: msg.ct });
          break;
        case "e":
          console.log(`[queue] subscribeResult resolving error (status=${msg.s})`);
          succeed({
            status: msg.s,
            body: JSON.stringify({ error: msg.m, message: msg.m }),
            ct: "application/json",
          });
          break;
        case "x":
          // Only reachable if the job ended without publishing "r"/"e"
          // (otherwise we already disconnected); don't wait for the timeout.
          console.log(`[queue] subscribeResult done, disconnecting`);
          fail(new Error("Queue job finished without a result"));
          break;
      }
    });
  });
  return { result, ready };
}
// ── Worker pool ─────────────────────────────────────────────
// Module-level guard so the pool starts at most once; also reported by queueStats().
let poolStarted = false;
/**
 * Start MAX_WORKERS worker loops in the background.
 *
 * Idempotent: returns immediately if the pool was already started or Redis is
 * unavailable.
 */
export function ensureWorkerPool(): void {
  if (poolStarted || !redis) return;
  poolStarted = true;
  console.log(`[queue] Starting ${MAX_WORKERS} inference workers`);
  for (let i = 0; i < MAX_WORKERS; i++) {
    // runWorker is fire-and-forget. Attach a handler so a failure outside its
    // loop (e.g. while creating its connection) is logged instead of becoming
    // an unhandled promise rejection.
    void runWorker(i).catch((e: unknown) => {
      console.error(`[queue] Worker ${i} crashed:`, e);
    });
  }
}
/**
 * One worker loop, running forever:
 *  1. Pop a job from the queue (BLPOP with a 5 s timeout, then poll again).
 *  2. Process it to completion.
 *  3. Repeat.
 *
 * BLPOP blocks the connection it is issued on, so each worker uses its own
 * duplicated connection. Any error is logged, followed by a 1 s pause.
 */
async function runWorker(id: number): Promise<void> {
  if (!redis) return;
  const conn = redis.duplicate();
  // Mirror the subscriber connections: log connection errors explicitly
  // rather than leaving the duplicated client without an "error" listener.
  conn.on("error", (err) => {
    console.error(`[queue] Worker ${id} connection error:`, err);
  });
  console.log(`[queue] Worker ${id} ready`);
  // eslint-disable-next-line no-constant-condition
  while (true) {
    try {
      const result = await conn.blpop(QUEUE_KEY, 5); // 5s timeout, then loop
      if (!result) continue;
      const job: InferenceJob = JSON.parse(result[1]);
      console.log(
        `[queue] Worker ${id} picked ${job.id} (${job.user}, stream=${job.isStream}, imggen=${job.imageGenEnabled})`,
      );
      await processJob(job);
      console.log(`[queue] Worker ${id} finished ${job.id} (${Date.now() - job.startTime}ms total)`);
    } catch (e) {
      console.error(`[queue] Worker ${id} error:`, e);
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
}
// ── Job processing ──────────────────────────────────────────
/**
 * Dispatch one job to its handler. Jobs with image generation enabled go to
 * the tool loop; the rest go to the streaming or non-streaming passthrough.
 *
 * Handler errors are not rethrown. They are published to the subscriber as an
 * "e" (status 502) message followed by "x", and recorded via logInference.
 * If publishing that error fails, the failure is logged and the inference-log
 * entry is still written.
 */
async function processJob(job: InferenceJob): Promise<void> {
  const channel = `${JOB_CHANNEL_PREFIX}${job.id}`;
  try {
    if (job.imageGenEnabled) {
      await processToolLoopJob(job, channel);
    } else if (job.isStream) {
      await processStreamingJob(job, channel);
    } else {
      await processNonStreamingJob(job, channel);
    }
  } catch (e) {
    // Non-Error throwables have no `.message`; stringify them instead.
    const message = e instanceof Error ? e.message : String(e);
    try {
      // Publish error to subscriber then close
      await pub(channel, { t: "e", s: 502, m: message });
      await pub(channel, { t: "x" });
    } catch (pubErr) {
      // The subscriber will only notice via its own timeout.
      console.error(`[queue] failed to publish error for ${job.id}:`, pubErr);
    }
    logInference({
      user: job.user,
      model: job.model,
      error: message,
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
    });
  }
}
/**
 * Streaming — forward SSE chunks from DeepSeek to the subscriber via pub/sub.
 *
 * A non-200 (or bodiless) upstream response is published as a single "r"
 * message followed by "x". Otherwise every decoded chunk is published as "c",
 * then "x". Afterwards the accumulated SSE text is used to store
 * reasoning_content (OpenAI-style paths only) and to log token usage.
 */
async function processStreamingJob(job: InferenceJob, channel: string): Promise<void> {
  const response = await fetchDeepseekWithBackoff(
    job.targetUrl,
    {
      method: "POST",
      headers: job.headers,
      body: JSON.stringify(job.body),
    },
    { logLabel: `[queue:${job.id}]` },
  );
  // Non-OK or no body → send as single result
  if (response.status !== 200 || !response.body) {
    const text = await response.text();
    await pub(channel, {
      t: "r",
      s: response.status,
      d: text,
      ct: response.headers.get("Content-Type") || "application/json",
    });
    await pub(channel, { t: "x" });
    logInference({
      user: job.user,
      model: job.model,
      error: response.status !== 200 ? `DeepSeek ${response.status}` : undefined,
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
    });
    return;
  }

  // Stream each chunk to the subscriber
  const reader = (response.body as ReadableStream<Uint8Array>).getReader();
  const decoder = new TextDecoder();
  const sseChunks: string[] = [];
  /** Record a decoded piece and forward it; skips the "" decode() can return mid-character. */
  const forward = async (text: string) => {
    if (!text) return;
    sseChunks.push(text);
    await pub(channel, { t: "c", d: text });
  };
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      await forward(decoder.decode(value, { stream: true }));
    }
    // Flush any bytes still buffered by `stream: true`.
    await forward(decoder.decode());
  } catch (e) {
    // Cancel the upstream body rather than leaving it half-read.
    await reader.cancel(e).catch(() => {});
    throw e;
  } finally {
    reader.releaseLock();
  }
  await pub(channel, { t: "x" });

  const fullSSE = sseChunks.join("");

  // Post-process: reasoning cache
  const isOpenAIPath = !job.subPath.startsWith("anthropic");
  if (isOpenAIPath && fullSSE.length > 0) {
    try {
      const enc = new TextEncoder();
      const captureStream = new ReadableStream<Uint8Array>({
        start(controller) {
          controller.enqueue(enc.encode(fullSSE));
          controller.close();
        },
      });
      const acc = new StreamAccumulator();
      await accumulateSSE(captureStream, acc);
      for (const m of acc.messages()) {
        if (typeof m.reasoning_content === "string" && m.reasoning_content.length > 0) {
          await storeReasoning(job.scope, m, m.reasoning_content);
        }
      }
    } catch (e) {
      console.error("[queue] reasoning cache error:", e);
    }
  }

  // Extract usage from the final SSE frame so the cache hit rate is measurable
  // for streaming requests. The last data chunk before [DONE] carries
  // DeepSeek's usage.prompt_cache_hit_tokens et al.
  const usageData: Record<string, unknown> | null =
    fullSSE.length > 0 ? extractUsageFromSSE(fullSSE) : null;
  const reasoningDetails =
    (usageData?.completion_tokens_details as Record<string, unknown> | undefined) ?? {};
  logInference({
    user: job.user,
    model: job.model,
    input_tokens: num(usageData?.prompt_tokens),
    output_tokens: num(usageData?.completion_tokens),
    prompt_cache_hit_tokens: num(usageData?.prompt_cache_hit_tokens),
    prompt_cache_miss_tokens: num(usageData?.prompt_cache_miss_tokens),
    reasoning_tokens: num(reasoningDetails.reasoning_tokens),
    total_tokens: num(usageData?.total_tokens),
    path: job.subPath,
    duration_ms: Date.now() - job.startTime,
    source: job.source,
    patched_reasoning_messages: job.patchedReasoningMessages,
    missing_reasoning_messages: job.missingReasoningMessages,
    vision_images: job.visionStats?.images,
    vision_cache_hits: job.visionStats?.cache_hits,
    vision_failures: job.visionStats?.failures,
    vision_tokens_in: job.visionStats?.tokens_in,
    vision_tokens_out: job.visionStats?.tokens_out,
    vision_cost_usd: job.visionStats?.cost_usd,
    vision_latency_ms: job.visionStats?.latency_ms,
  });
}
/**
 * Non-streaming — collect the full DeepSeek response and publish it once
 * ("r" followed by "x").
 *
 * Afterwards exactly one logInference entry is written:
 *  - with token usage when the body is JSON carrying `usage`;
 *  - otherwise a basic entry, flagged `DeepSeek <status>` for non-200 responses.
 *
 * reasoning_content from OpenAI-style `choices` is stored in the reasoning
 * cache. Failures there are logged and do not produce a second log entry.
 */
async function processNonStreamingJob(job: InferenceJob, channel: string): Promise<void> {
  const response = await fetchDeepseekWithBackoff(
    job.targetUrl,
    {
      method: "POST",
      headers: job.headers,
      body: JSON.stringify(job.body),
    },
    { logLabel: `[queue:${job.id}]` },
  );
  const text = await response.text();
  const ct = response.headers.get("Content-Type") || "application/json";
  await pub(channel, { t: "r", s: response.status, d: text, ct });
  await pub(channel, { t: "x" });

  type Usage = {
    prompt_tokens?: number;
    completion_tokens?: number;
    prompt_cache_hit_tokens?: number;
    prompt_cache_miss_tokens?: number;
    total_tokens?: number;
    completion_tokens_details?: { reasoning_tokens?: number };
  };
  // Parse once; a non-JSON body (e.g. an HTML error page) leaves this null.
  let parsed: { usage?: Usage; choices?: unknown } | null = null;
  try {
    const value: unknown = JSON.parse(text);
    if (value !== null && typeof value === "object") {
      parsed = value as { usage?: Usage; choices?: unknown };
    }
  } catch {
    parsed = null;
  }

  // Token logging — exactly one entry per job.
  const u = parsed?.usage;
  if (u) {
    logInference({
      user: job.user,
      model: job.model,
      input_tokens: u.prompt_tokens || 0,
      output_tokens: u.completion_tokens || 0,
      prompt_cache_hit_tokens: u.prompt_cache_hit_tokens || 0,
      prompt_cache_miss_tokens: u.prompt_cache_miss_tokens || 0,
      reasoning_tokens: u.completion_tokens_details?.reasoning_tokens || 0,
      total_tokens: u.total_tokens || 0,
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
      patched_reasoning_messages: job.patchedReasoningMessages,
      missing_reasoning_messages: job.missingReasoningMessages,
      vision_images: job.visionStats?.images,
      vision_cache_hits: job.visionStats?.cache_hits,
      vision_failures: job.visionStats?.failures,
      vision_tokens_in: job.visionStats?.tokens_in,
      vision_tokens_out: job.visionStats?.tokens_out,
      vision_cost_usd: job.visionStats?.cost_usd,
      vision_latency_ms: job.visionStats?.latency_ms,
    });
  } else {
    // No usage block (non-JSON body or an error JSON) — log a basic entry,
    // flagged the same way as the streaming path's non-OK case.
    logInference({
      user: job.user,
      model: job.model,
      error: response.status !== 200 ? `DeepSeek ${response.status}` : undefined,
      path: job.subPath,
      duration_ms: Date.now() - job.startTime,
      source: job.source,
    });
  }

  // Post-process: reasoning cache (OpenAI-style paths only)
  if (parsed && !job.subPath.startsWith("anthropic") && Array.isArray(parsed.choices)) {
    try {
      for (const ch of parsed.choices as Array<{ message?: AssistantMessage } | null | undefined>) {
        const msg = ch?.message;
        if (
          msg &&
          typeof msg.reasoning_content === "string" &&
          msg.reasoning_content.length > 0
        ) {
          await storeReasoning(job.scope, msg, msg.reasoning_content);
        }
      }
    } catch (e) {
      console.error("[queue] reasoning cache error:", e);
    }
  }
}
/**
 * Image-gen tool loop — multi-round DeepSeek + fal.ai, publish final result.
 *
 * Progress updates are published as "c" chunks while the loop runs. The final
 * chat response is published once as "r", followed by "x". Each DeepSeek round
 * goes through fetchDeepseekWithBackoff, like the other job types.
 * Reasoning-cache storage afterwards is best-effort. Its failures are logged
 * and do not turn a delivered result into an inference error.
 */
async function processToolLoopJob(job: InferenceJob, channel: string): Promise<void> {
  // Progress callback — streams live status updates to the client while images generate
  const sendProgress = async (msg: string) => {
    const sse = `data: ${JSON.stringify({ choices: [{ delta: { content: `\n\n🚀 ${msg}\n` } }] })}\n\n`;
    await pub(channel, { t: "c", d: sse });
  };
  // Resolve design context from source + user (ABAC-aware) — always injected
  const context = resolveDesignContext(job.source, job.user);
  console.log(`[tool-loop] injecting design context: ${context.label} (source=${job.source}, user=${job.user})`);
  const { response: finalResp, stats: loopStats } = await runImageGenLoop(
    job.body as ChatBody,
    async (b) => {
      // Each round is a non-streaming call; reuse the shared backoff/retry.
      const r = await fetchDeepseekWithBackoff(
        job.targetUrl,
        {
          method: "POST",
          headers: job.headers,
          body: JSON.stringify({ ...b, stream: false }),
        },
        { logLabel: `[queue:${job.id}]` },
      );
      if (!r.ok) {
        throw new Error(`deepseek ${r.status}: ${(await r.text()).slice(0, 300)}`);
      }
      return (await r.json()) as ChatResponse;
    },
    sendProgress,
    context.prefix,
  );
  await pub(channel, { t: "r", s: 200, d: JSON.stringify(finalResp), ct: "application/json" });
  await pub(channel, { t: "x" });

  // Reasoning cache — best-effort. The client already has its result, so a
  // failure here must not bubble up to processJob as an inference error.
  try {
    if (Array.isArray(finalResp.choices)) {
      for (const ch of finalResp.choices) {
        const msg = (ch as { message?: AssistantMessage }).message;
        if (
          msg &&
          typeof msg.reasoning_content === "string" &&
          msg.reasoning_content.length > 0
        ) {
          await storeReasoning(job.scope, msg, msg.reasoning_content);
        }
      }
    }
  } catch (e) {
    console.error("[queue] reasoning cache error:", e);
  }

  // Log
  const u =
    ((finalResp as { usage?: Record<string, number> }).usage as
      | (Record<string, number> & {
          completion_tokens_details?: { reasoning_tokens?: number };
        })
      | undefined) ?? {};
  logInference({
    user: job.user,
    model: job.model,
    input_tokens: u.prompt_tokens || 0,
    output_tokens: u.completion_tokens || 0,
    prompt_cache_hit_tokens: u.prompt_cache_hit_tokens || 0,
    prompt_cache_miss_tokens: u.prompt_cache_miss_tokens || 0,
    reasoning_tokens: u.completion_tokens_details?.reasoning_tokens || 0,
    total_tokens: u.total_tokens || 0,
    path: job.subPath,
    duration_ms: Date.now() - job.startTime,
    source: job.source,
    patched_reasoning_messages: job.patchedReasoningMessages,
    missing_reasoning_messages: job.missingReasoningMessages,
    vision_images: job.visionStats?.images,
    vision_cache_hits: job.visionStats?.cache_hits,
    vision_failures: job.visionStats?.failures,
    vision_tokens_in: job.visionStats?.tokens_in,
    vision_tokens_out: job.visionStats?.tokens_out,
    vision_cost_usd: job.visionStats?.cost_usd,
    vision_latency_ms: job.visionStats?.latency_ms,
    tool_loop_rounds: loopStats.rounds,
    tool_loop_images: loopStats.images_generated,
    tool_loop_calls: loopStats.tool_calls_executed,
    tool_loop_failures: loopStats.tool_calls_failed,
    tool_loop_cost_usd: loopStats.total_cost_usd,
  });
}
// ── Helpers ──────────────────────────────────────────────────
/**
 * Extract token usage from a DeepSeek SSE byte stream.
 *
 * DeepSeek's final SSE data frame before [DONE] carries a usage block:
 *   data: {...,"usage":{"prompt_tokens":100,"completion_tokens":50,
 *   "prompt_cache_hit_tokens":80,"prompt_cache_miss_tokens":20,
 *   "total_tokens":150,"completion_tokens_details":{"reasoning_tokens":5}}}
 *
 * Scans `data:` lines from the end and returns the `usage` object of the last
 * JSON payload that has one. Because it works line by line, the very first
 * frame is parsed too, and streams that interleave `event:` / comment lines
 * are handled. LF and CRLF line endings are both accepted.
 *
 * Returns null if no usage block is found (non-fatal — the client already
 * received all bytes; this is purely for logging/analytics).
 */
function extractUsageFromSSE(sseText: string): Record<string, unknown> | null {
  const lines = sseText.split(/\r?\n/);
  for (let i = lines.length - 1; i >= 0; i--) {
    const line = lines[i] ?? "";
    if (!line.startsWith("data:")) continue; // event:, id:, comments, blanks
    const payload = line.slice("data:".length).trim();
    if (!payload || payload === "[DONE]") continue;
    try {
      const parsed: unknown = JSON.parse(payload);
      if (parsed !== null && typeof parsed === "object") {
        const usage = (parsed as { usage?: unknown }).usage;
        if (usage !== null && typeof usage === "object") {
          return usage as Record<string, unknown>;
        }
      }
    } catch {
      // skip malformed lines
    }
  }
  return null;
}
/** Coerce an untyped usage field to a number; anything that is not a number becomes 0. */
function num(v: unknown): number {
  if (typeof v !== "number") {
    return 0;
  }
  return v;
}
/**
 * Publish one protocol message on a job channel. No-op when Redis is
 * unavailable.
 *
 * Logs the number of subscribers that received the message. 0 means nobody is
 * listening, e.g. the subscriber already disconnected after cancel or timeout.
 */
async function pub(channel: string, msg: QueueMessage): Promise<void> {
  if (!redis) return;
  const receivers = await redis.publish(channel, JSON.stringify(msg));
  console.log(`[queue] pub ${channel} t=${msg.t} → ${receivers} receivers`);
}
// ── Queue stats (for /health) ───────────────────────────────
/**
 * Health snapshot with three fields:
 *  - queue_depth: jobs still waiting in the Redis list (0 when Redis is unavailable);
 *  - max_workers: the configured worker count;
 *  - pool_started: whether this module has started its worker pool.
 */
export async function queueStats(): Promise<{
  queue_depth: number;
  max_workers: number;
  pool_started: boolean;
}> {
  const queueDepth = redis ? await redis.llen(QUEUE_KEY) : 0;
  return {
    queue_depth: queueDepth,
    max_workers: MAX_WORKERS,
    pool_started: poolStarted,
  };
}