Product Architecture Document · Deep Dive

Conversational Analytics
Platform Architecture

A complete production-ready engineering specification for a stateful, multilingual Text-to-SQL analytics platform built on Qwen, Vanna AI, StarRocks OBT, and Kubernetes.

Qwen2.5-Coder-7B vLLM Cluster StarRocks OBT Vanna AI RAG Kubernetes + Helm Redis Cluster Multi-Tenant OAuth2/OIDC OpenTelemetry GitOps
00

System Overview & Design Principles

This document specifies the complete production architecture for a stateful, multilingual Conversational Analytics Platform that translates natural-language questions into analytical queries across a StarRocks One Big Table (OBT). It is designed for multi-tenant enterprise deployment, targeting sub-3-second end-to-end latency at P95 with 99.9% availability SLO.

Latency Target

P95 <3s
End-to-end: prompt → chart rendered

Availability SLO

99.9%
≤8.76 hrs/year total downtime

Throughput

500 RPS
Concurrent queries per region

SQL Accuracy

>92%
Correct SQL on first attempt (RAG + OBT)

Guiding Design Principles

No JOIN Hallucinations

The OBT schema forces the model to emit only SELECT … WHERE … GROUP BY patterns. Multi-table JOIN paths — the primary source of Text-to-SQL failure — are eliminated by design.

Stateful by Default

Every turn reads and writes to a Redis session store. Context persists across chart updates, filter refinements, and follow-up questions without enlarging the LLM context window.

Separation of Concerns

Intent classification, SQL generation, Plotly code generation, and validation are independent calls. Each is independently retryable, observable, and replaceable.

Defense in Depth

Security is enforced at four independent layers: API gateway, application, database (read-only SA), and network policy. A compromise at any single layer does not grant data access.

01

Service Topology & Inter-Service Communication

The platform is decomposed into eight independently deployable services, each with its own scaling policy and health contract. Services communicate over internal Kubernetes DNS with mTLS enforced by Istio.

Edge / CDN
Cloudflare API Gateway (Kong) WAF Rules Rate Limiter → JWT validation, tenant routing, DDoS mitigation at edge
Frontend
Vite + React (SPA) react-plotly.js WebSocket Client → Statically served from CDN; real-time streaming via WS
Orchestration
query-service (FastAPI) session-service (FastAPI) Vanna Agent → Main business logic, SQL orchestration, Plotly gen
Intent / Routing
intent-service (FastAPI) Qwen @ temp=0 → Isolated microservice; horizontally scaled independently of SQL gen
Inference
vLLM Primary (L4) vLLM Shadow (fallback) Milvus Vector DB → OpenAI-compatible API; batched inference; GPU autoscaling
State / Cache
Redis Cluster (6-node) Redis Sentinel → Session context, DataFrame cache, intent history, rate-limit counters
Data
StarRocks FE×3 StarRocks BE×6 Apache Spark Apache Kafka → OBT analytics layer; Spark ingestion; Kafka CDC feed
Observability
OpenTelemetry Collector Prometheus Grafana Jaeger Tracing → Distributed traces, metrics, structured logs, alerting

Service Communication Matrix

Caller | Callee | Protocol | Auth | Timeout / Retry
React SPA | Kong API Gateway | HTTPS + WebSocket | JWT (OIDC) | 30s / 0
Kong Gateway | query-service | HTTP/2 gRPC | mTLS + Service Account | 28s / 1
query-service | intent-service | HTTP/2 internal | mTLS | 2s / 2 fast-fail
query-service | vLLM (OpenAI API) | HTTP/1.1 streaming | API Key (k8s secret) | 25s / 1
query-service | Milvus | gRPC | mTLS | 500ms / 3
query-service | StarRocks | MySQL protocol | TLS + read-only SA | 15s / 1
query-service | Redis Cluster | RESP3 TLS | ACL + password | 200ms / 3
Spark jobs | StarRocks BE | StarRocks Spark Connector | TLS + write SA | 600s / 3
Kafka Consumer | StarRocks BE (stream) | StarRocks Routine Load | SASL + TLS | stream
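
To make the timeout and retry budgets concrete, the sketch below shows how the query-service → vLLM hop from the matrix could be enforced with httpx. The helper name, retry policy, and endpoint path are illustrative assumptions, not part of the service code.

import httpx

VLLM_TIMEOUT = httpx.Timeout(25.0, connect=2.0)   # 25s total budget, 2s connect

async def call_vllm(client: httpx.AsyncClient, payload: dict) -> dict:
    last_exc: Exception | None = None
    for _attempt in range(2):                     # initial attempt + 1 retry
        try:
            resp = await client.post("/v1/chat/completions", json=payload)
            resp.raise_for_status()
            return resp.json()
        except httpx.TransportError as exc:       # retry only timeouts / connection errors
            last_exc = exc
    raise last_exc

# Usage: client = httpx.AsyncClient(base_url=VLLM_BASE_URL, timeout=VLLM_TIMEOUT)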
02

One Big Table (OBT) Schema Design

The OBT is the cornerstone of this architecture's SQL accuracy. By pre-joining all dimensional attributes into a single wide table, we eliminate the primary failure mode of Text-to-SQL systems: incorrect or missing JOIN predicates. The trade-off is controlled data redundancy, which is acceptable in an analytics-read workload.

Why OBT Eliminates Hallucinations

Standard star-schema prompting requires the model to know table relationships, foreign keys, and JOIN cardinalities — none of which are reliably inferable from column names alone. OBT reduces the entire schema to SELECT [metrics] FROM analytics_obt WHERE [dimensions]. There is exactly one table for the model to reason about.

Primary OBT Table Definition

CREATE TABLE analytics_obt (
  -- ─── Partition & Bucketing Keys ────────────────────────────────
  event_date          DATE           NOT NULL,    -- PARTITION KEY
  tenant_id           VARCHAR(36)   NOT NULL,    -- row-level tenant isolation

  -- ─── Time Dimensions ────────────────────────────────────────────
  event_datetime      DATETIME       NOT NULL,
  event_year          SMALLINT,
  event_quarter       TINYINT,
  event_month         TINYINT,
  event_week_of_year  TINYINT,
  event_day_of_week   TINYINT,
  is_weekend          BOOLEAN,

  -- ─── Geography Dimensions ───────────────────────────────────────
  country_code        VARCHAR(3),
  region_name         VARCHAR(128),
  province_name       VARCHAR(128),
  district_name       VARCHAR(128),
  store_id            BIGINT,
  store_name          VARCHAR(256),
  store_tier          VARCHAR(32),    -- 'flagship','standard','kiosk'

  -- ─── Product Dimensions ─────────────────────────────────────────
  product_id          BIGINT,
  product_name        VARCHAR(512),
  sku_code            VARCHAR(64),
  category_l1         VARCHAR(128),
  category_l2         VARCHAR(128),
  category_l3         VARCHAR(128),
  brand_name          VARCHAR(128),
  unit_cost           DECIMAL(18,4),

  -- ─── Customer Dimensions ────────────────────────────────────────
  customer_id         BIGINT,
  customer_segment    VARCHAR(64),   -- 'VIP','regular','new'
  customer_age_band   VARCHAR(16),   -- '18-24','25-34', etc.
  customer_gender     VARCHAR(16),
  acquisition_channel VARCHAR(64),

  -- ─── Sales Agent Dimensions ─────────────────────────────────────
  agent_id            BIGINT,
  agent_name          VARCHAR(256),
  agent_team          VARCHAR(128),
  agent_region        VARCHAR(128),

  -- ─── Fact / Measures ────────────────────────────────────────────
  order_id            BIGINT         NOT NULL,
  order_line_id       BIGINT         NOT NULL,
  quantity            INT,
  unit_price          DECIMAL(18,4),
  gross_revenue       DECIMAL(18,4),
  discount_amount     DECIMAL(18,4),
  net_revenue         DECIMAL(18,4),
  cogs                DECIMAL(18,4),
  gross_margin        DECIMAL(18,4),
  return_flag         BOOLEAN,
  return_amount       DECIMAL(18,4),

  -- ─── Metadata ───────────────────────────────────────────────────
  ingested_at         DATETIME,
  source_system       VARCHAR(64),
  _row_hash           VARCHAR(64)    -- dedup key from source
)
DUPLICATE KEY(event_date, tenant_id, order_id, order_line_id)
PARTITION BY RANGE(event_date) (
  START ("2022-01-01") END ("2027-01-01") EVERY (INTERVAL 1 MONTH)
)
DISTRIBUTED BY HASH(tenant_id, store_id) BUCKETS 128
PROPERTIES (
  "replication_num" = "3",
  "compression"     = "ZSTD",
  "enable_persistent_index" = "true"
);

Materialised Views for Common Aggregations

StarRocks supports asynchronous materialised views that are refreshed on a schedule and used transparently by the query planner. Pre-computing high-frequency aggregations reduces query latency for common patterns without changing the OBT interface the model interacts with.

-- Daily revenue rollup — covers 80% of "revenue by day" queries
CREATE MATERIALIZED VIEW mv_daily_revenue
REFRESH ASYNC EVERY(INTERVAL 1 HOUR)
AS
SELECT
  event_date, tenant_id, province_name, category_l1,
  SUM(gross_revenue) AS total_gross,
  SUM(net_revenue)   AS total_net,
  SUM(quantity)      AS total_units,
  COUNT(DISTINCT order_id) AS order_count
FROM analytics_obt
GROUP BY event_date, tenant_id, province_name, category_l1;

-- Monthly KPI rollup — used for YoY comparison queries
CREATE MATERIALIZED VIEW mv_monthly_kpi
REFRESH ASYNC EVERY(INTERVAL 6 HOUR)
AS
SELECT
  event_year, event_month, tenant_id, category_l1, brand_name,
  SUM(net_revenue) AS monthly_revenue,
  AVG(gross_margin) AS avg_margin,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM analytics_obt
GROUP BY event_year, event_month, tenant_id, category_l1, brand_name;
Query Rewrite

StarRocks' query planner automatically rewrites eligible queries against materialised views at runtime — the LLM always targets analytics_obt, and the engine transparently serves from the fastest available materialised view.

Partitioning & Bucketing Strategy

Strategy | Key | Rationale
Range Partition | event_date (monthly) | Enables partition pruning on all date-filtered queries; reduces scan from 100% to <5% of data for typical 30-day windows
Hash Distribution | tenant_id + store_id | Co-locates all data for a tenant+store pair on the same BE node; eliminates shuffle for GROUP BY store queries
Bucket Count | 128 buckets | Sized for 6 BE nodes with room to scale to 24; each bucket ≈ 200MB compressed at 5TB total
ZSTD Compression | all columns | Typical 4–6× compression ratio for analytics data; decompression CPU overhead is negligible relative to scan cost
Persistent Index | primary key | Enables O(1) point lookups for dedup checks during ingestion without scanning BTree pages
03

Data Ingestion Pipeline

The ingestion pipeline maintains a fresh OBT through two complementary flows: a batch ETL path via Spark for historical loads and large-scale reprocessing, and a streaming CDC path via Kafka for near-real-time updates. Both paths write through StarRocks' native connectors.

Source
OLTP DBs / APIs / S3
Extract
Debezium CDC / Spark Read
Transform
Spark OBT Flatten Job
Validate
Great Expectations
Load
StarRocks Connector / Routine Load
Refresh
MV Async Refresh

Spark OBT Flatten Job

The core transformation logic joins all source dimension tables onto the fact table, computes derived columns, and writes the flattened output directly to StarRocks partitions. The job is idempotent — re-running for any date range produces identical output.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, md5, concat_ws, year, month, weekofyear, dayofweek, when, current_timestamp

spark = SparkSession.builder \
    .appName("OBT-Flatten-Daily") \
    .config("spark.starrocks.fe.http.url", "http://starrocks-fe:8030") \
    .config("spark.starrocks.user", "etl_writer") \
    .getOrCreate()

# ── 1. Load source tables from Data Lake (Hudi/Delta on S3) ──
orders     = spark.read.format("delta").load("s3://lake/orders")
order_lines = spark.read.format("delta").load("s3://lake/order_lines")
products   = spark.read.format("delta").load("s3://lake/products")
stores     = spark.read.format("delta").load("s3://lake/stores")
customers  = spark.read.format("delta").load("s3://lake/customers")
agents     = spark.read.format("delta").load("s3://lake/agents")

# ── 2. Flatten join (broadcast small dims, shuffle large fact) ──
obt = order_lines \
    .join(orders,    "order_id",   "inner") \
    .join(products.hint("broadcast"),  "product_id", "left") \
    .join(stores.hint("broadcast"),    "store_id",   "left") \
    .join(customers.hint("broadcast"), "customer_id","left") \
    .join(agents.hint("broadcast"),   "agent_id",   "left")

# ── 3. Derive time columns ──
obt = obt.withColumns({
    "event_date":       to_date("event_datetime"),
    "event_year":       year("event_datetime"),
    "event_month":      month("event_datetime"),
    "event_week_of_year": weekofyear("event_datetime"),
    "event_day_of_week": dayofweek("event_datetime"),
    "is_weekend":       dayofweek("event_datetime").isin([1,7]),
    "_row_hash":        md5(concat_ws("|", col("order_id"), col("order_line_id"))),
    "ingested_at":      current_timestamp(),
})

# ── 4. Write to StarRocks (overwrite target partition only) ──
obt.write.format("starrocks") \
    .option("starrocks.table.identifier", "analytics.analytics_obt") \
    .option("starrocks.write.properties.transaction_timeout_sec", "3600") \
    .option("starrocks.write.properties.partial_update", "false") \
    .mode("overwrite") \
    .partitionBy("event_date", "tenant_id") \
    .save()

Kafka CDC Streaming Path

For near-real-time updates (order status changes, returns), StarRocks' Routine Load feature creates a persistent Kafka consumer that streams rows directly into the OBT without any intermediate processing layer.

CREATE ROUTINE LOAD analytics.kafka_obt_load ON analytics_obt
COLUMNS TERMINATED BY ",",
COLUMNS (event_date, tenant_id, order_id, order_line_id,
          net_revenue, quantity, return_flag, ingested_at)
PROPERTIES (
    "max_batch_interval" = "10",      -- seconds; controls CDC latency
    "max_batch_rows"     = "100000",
    "format"             = "json",
    "strip_outer_array"  = "true"
)
FROM KAFKA (
    "kafka_broker_list"  = "kafka-0:9092,kafka-1:9092,kafka-2:9092",
    "kafka_topic"        = "analytics.obt.cdc",
    "kafka_partitions"   = "0,1,2,3,4,5",
    "property.security.protocol" = "SASL_SSL"
);

Data Quality Gates

Great Expectations suites run as a Spark post-action after each batch load. Failures trigger an alert and prevent the partition from being served to the query engine until resolved.

Completeness

net_revenue NOT NULL on all rows; no orphaned order lines without a parent order.

Range Checks

net_revenue BETWEEN -50000 AND 500000; quantity BETWEEN 1 AND 10000.

Referential

All tenant_id values exist in the tenant registry; all store_id values have a matching store record.
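
A minimal sketch of one such gate is shown below, using the classic (V2) Great Expectations SparkDFDataset API; the suite shape simply mirrors the gates listed above and is not the production configuration (the referential checks against the tenant and store registries are omitted).

import great_expectations as ge

def run_obt_quality_gate(obt_df) -> bool:
    dataset = ge.dataset.SparkDFDataset(obt_df)

    # Completeness: every row must carry a net_revenue value
    dataset.expect_column_values_to_not_be_null("net_revenue")

    # Range checks mirroring the gates above
    dataset.expect_column_values_to_be_between("net_revenue", -50000, 500000)
    dataset.expect_column_values_to_be_between("quantity", 1, 10000)

    # Referential checks (tenant / store registry lookups) would be added here
    results = dataset.validate()
    return bool(results["success"])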

04

vLLM Inference Cluster

The inference layer is built on vLLM with the OpenAI-compatible server API, hosted on dedicated NVIDIA L4 GPU nodes. It serves two model deployments simultaneously: the primary SQL/Plotly code generator (Qwen2.5-Coder-7B) and the intent classifier (same model, separate deployment with different sampling config).

vLLM Server Configuration

# helm/vllm/values.yaml
model:
  name: qwen2.5-coder-7b-instruct
  hf_repo: Qwen/Qwen2.5-Coder-7B-Instruct
  dtype: bfloat16
  quantization: null          # AWQ optional for cost reduction
  max_model_len: 32768
  gpu_memory_utilization: 0.90

serving:
  tensor_parallel_size: 1      # 1× L4 24GB sufficient for 7B BF16
  max_num_seqs: 256            # PagedAttention batch size
  enable_prefix_caching: true  # cache system prompt KV for ~40% TTFT reduction
  scheduler_delay_factor: 0.0

autoscaling:
  min_replicas: 2
  max_replicas: 8
  target_gpu_utilization: 75  # %
  scale_up_cooldown: 60s
  scale_down_cooldown: 300s

resources:
  requests:
    nvidia.com/gpu: 1
    memory: 32Gi
    cpu: 8
  limits:
    nvidia.com/gpu: 1
    memory: 40Gi

Prefix Caching Strategy

vLLM's prefix caching stores the KV tensors for repeated prompt prefixes (system prompts, DDL context). Since every SQL generation call begins with the same multi-kilobyte system prompt and OBT DDL, prefix caching yields a 35–45% reduction in Time-To-First-Token for all non-cold-start requests.

# The system prompt prefix is identical for all tenants (DDL is in the suffix)
# vLLM caches KV tensors for this block automatically
SYSTEM_PROMPT_PREFIX = """
You are an expert SQL analyst for a StarRocks analytics database.
The database contains exactly ONE table: analytics_obt
You MUST generate only SELECT queries — no DDL, no DML, no subqueries
unless strictly necessary for the logic.
Output ONLY the raw SQL. No explanation, no markdown fences.
Column reference:
"""

# The DDL block appended per-request (also cached after first use per tenant)
def build_sql_system_prompt(tenant_ddl_excerpt: str) -> str:
    return SYSTEM_PROMPT_PREFIX + tenant_ddl_excerpt

Model Deployment Split: SQL vs Intent

Deployment | Use Case | Temperature | Max Tokens | Timeout
vllm-sql-gen | SQL generation + Plotly code gen | 0.1 | 2048 | 25s
vllm-intent | Intent classification only | 0.0 | 16 | 2s
vllm-reflect | Agentic reflection loop validation | 0.0 | 256 | 5s
AWQ Quantization Option

Applying 4-bit AWQ quantization reduces VRAM from ~14GB to ~5GB per replica, allowing 4 replicas per L4 GPU (24GB) instead of 1. Accuracy benchmarks on the OBT SQL task show a <1.5% quality degradation — acceptable for intent and reflection deployments, optional for SQL gen.

05

RAG Engine — Vanna AI Deep Dive

Vanna AI provides the Retrieval-Augmented Generation layer, which enriches every SQL generation call with semantically similar historical question–SQL pairs. The production deployment replaces Vanna's default ChromaDB backend with Milvus for horizontal scalability and tenant isolation.

Vector Store Schema (Milvus)

from pymilvus import CollectionSchema, FieldSchema, DataType, Collection

# One collection per corpus type; tenant_id field enables filtered search
schema = CollectionSchema(fields=[
    FieldSchema("id",          DataType.VARCHAR,      max_length=64, is_primary=True),
    FieldSchema("tenant_id",   DataType.VARCHAR,      max_length=36),  # scalar filter
    FieldSchema("corpus_type", DataType.VARCHAR,      max_length=16),  # 'ddl','qa','doc'
    FieldSchema("content",     DataType.VARCHAR,      max_length=8192),
    FieldSchema("embedding",   DataType.FLOAT_VECTOR, dim=1024),        # bge-m3 dim
    FieldSchema("created_at",  DataType.INT64),
    FieldSchema("hit_count",   DataType.INT64),       # popularity signal
], description="Vanna RAG corpus")

coll = Collection("vanna_corpus", schema)

# HNSW index for sub-10ms ANN search on 1M+ vectors
coll.create_index("embedding", {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 200}
})

Embedding Model: BGE-M3

BGE-M3 is selected over OpenAI Ada because it supports multilingual dense retrieval natively (including Vietnamese), runs on-premise (no egress cost), and achieves comparable BEIR benchmark scores for code and SQL retrieval tasks.

Embedding Service

BGE-M3 served via text-embeddings-inference (Hugging Face TEI) as a sidecar to the vLLM cluster. 1024-dimensional dense vectors with optional sparse (lexical) and ColBERT-style multi-vector outputs.

Hybrid Search

Production retrieval uses sparse + dense fusion: BM25 keyword recall + HNSW ANN, ranked by Reciprocal Rank Fusion (RRF). This handles both exact column-name matches and semantic paraphrases.

RAG Retrieval Pipeline

import asyncio

from vanna.base import VannaBase

class VannaProductionRAG(VannaBase):

    async def get_sql_prompt_context(self, question: str, tenant_id: str) -> dict:
        # 1. Embed the user question
        q_vector = await self.embed_service.embed(question)  # BGE-M3

        # 2. Hybrid search: tenant-scoped, top-K=8 across all corpus types
        hits = await self.milvus.hybrid_search(
            collection="vanna_corpus",
            dense_vector=q_vector,
            sparse_query=question,           # BM25 tokenised
            expr=f"tenant_id == '{tenant_id}'",
            limit=8,
            ranker="RRF",                   # Reciprocal Rank Fusion
        )

        # 3. Separate DDL, QA pairs, and documentation fragments
        ddl_context = [h.content for h in hits if h.corpus_type == "ddl"]
        qa_examples = [h.content for h in hits if h.corpus_type == "qa"]
        doc_context  = [h.content for h in hits if h.corpus_type == "doc"]

        # 4. Increment hit counters asynchronously (popularity signal for future ranking)
        asyncio.create_task(self.milvus.increment_hits([h.id for h in hits]))

        return {
            "ddl":      "\n\n".join(ddl_context),
            "examples": qa_examples,   # 3-shot examples in prompt
            "docs":     "\n".join(doc_context),
        }

Training the Corpus

Vanna's corpus is seeded and continuously updated from three sources:

Source | Corpus Type | Update Frequency | Volume
OBT DDL + column descriptions | ddl | On schema change (CI pipeline) | ~50 entries
Human-curated Q&SQL pairs | qa | Weekly review cycle | 200–500 entries/tenant
Feedback loop: user-confirmed queries | qa | Real-time on thumbs-up | Grows with usage
Business logic documentation | doc | On wiki update (webhook) | ~100 entries
Synthetic augmentation (GPT-4o) | qa | Monthly batch job | 1000+ paraphrases
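
The feedback-loop row above can be illustrated with a small helper that writes a user-confirmed question–SQL pair into the Milvus collection defined earlier. The helper name and the embedding-service handle are assumptions, and the snippet assumes a pymilvus version that accepts row-based inserts.

import time, uuid

async def add_confirmed_qa_pair(question: str, sql: str, tenant_id: str,
                                embed_service, corpus) -> None:
    content = f"Question: {question}\nSQL: {sql}"
    vector = await embed_service.embed(content)     # BGE-M3, 1024-dim dense vector
    corpus.insert([{
        "id":          uuid.uuid4().hex,
        "tenant_id":   tenant_id,
        "corpus_type": "qa",
        "content":     content,
        "embedding":   vector,
        "created_at":  int(time.time()),
        "hit_count":   0,
    }])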
06

FastAPI Query Service — Internals

The query-service is an async FastAPI application that orchestrates the full request lifecycle. It runs under Gunicorn with Uvicorn workers inside the pod, and exposes both a REST endpoint and a WebSocket streaming endpoint for progressive response rendering.

Application Layer Architecture

# src/query_service/main.py
from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

tracer = trace.get_tracer("query-service")

app = FastAPI(title="query-service", version="2.0")

# ── Middleware stack (order matters) ──────────────────────────
app.add_middleware(TenantMiddleware)          # extract + validate tenant from JWT
app.add_middleware(RateLimitMiddleware)       # Redis sliding window per tenant
app.add_middleware(RequestLoggingMiddleware)  # structured JSON logs to OTel
app.add_middleware(CORSMiddleware, allow_origin_regex=r"https://.*\.yourapp\.com")  # wildcard subdomains require a regex

FastAPIInstrumentor.instrument_app(app)       # OTel auto-instrumentation

# ── Dependency providers ──────────────────────────────────────
async def get_session_repo() -> SessionRepository:   yield redis_session_repo
async def get_vanna_agent() -> VannaAgent:            yield vanna_agent_singleton
async def get_starrocks() -> StarRocksPool:           yield starrocks_pool

# ── Primary query endpoint ────────────────────────────────────
@app.post("/v1/query")
async def query_endpoint(
    req: QueryRequest,
    tenant: TenantContext = Depends(verify_jwt),
    session: SessionRepository = Depends(get_session_repo),
    vanna: VannaAgent = Depends(get_vanna_agent),
    db: StarRocksPool = Depends(get_starrocks),
):
    with tracer.start_as_current_span("query.full_pipeline") as span:
        span.set_attribute("tenant.id", tenant.id)
        span.set_attribute("session.id", req.session_id)

        # Fetch prior state — zero DB round-trips if cached
        ctx = await session.load(req.session_id, tenant.id)

        # Classify intent (fast, isolated, 2s timeout)
        intent = await intent_client.classify(req.prompt, ctx.history)

        # Execute the appropriate path
        result = await execute_intent(intent, req, ctx, tenant, vanna, db)

        # Generate Plotly JSON (server-side only)
        plotly_json = await vanna.generate_plotly_json(result.dataframe, result.chart_spec)

        # Persist updated state
        await session.save(req.session_id, tenant.id, result.to_session_state())

        return QueryResponse(
            plotly_json=plotly_json,
            sql=result.sql,
            row_count=len(result.dataframe),
            intent=intent.value,
            from_cache=result.from_cache,
        )

# ── WebSocket streaming endpoint (token-by-token SQL gen) ─────
@app.websocket("/v1/query/stream")
async def query_stream(ws: WebSocket, session_id: str, token: str):
    await ws.accept()
    tenant = verify_ws_token(token)
    async for chunk in vanna_agent.stream_sql_gen(session_id, tenant):
        await ws.send_json({"type": "sql_token", "content": chunk})
    await ws.send_json({"type": "done"})

Connection Pool Configuration

Resource | Pool Size | Max Overflow | Recycle (s) | Library
StarRocks (MySQL) | 20 | 10 | 3600 | aiomysql + SQLAlchemy async
Redis Cluster | 50 | — | — | redis-py async cluster client
vLLM HTTP | 25 | 10 | — | httpx AsyncClient
Milvus gRPC | 10 | 5 | — | pymilvus async
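
As an illustration of the StarRocks row in this table, the sketch below configures an async SQLAlchemy engine with matching pool numbers; the DSN and credentials are placeholders, not production values.

from sqlalchemy.ext.asyncio import create_async_engine

starrocks_engine = create_async_engine(
    "mysql+aiomysql://readonly_sa:change-me@starrocks-fe:9030/analytics",
    pool_size=20,          # base connections held open
    max_overflow=10,       # burst connections beyond the base pool
    pool_recycle=3600,     # recycle connections hourly to avoid stale sockets
    pool_pre_ping=True,    # validate connections before handing them out
)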
07

Intent Router — Finite State Machine

The Intent Router is modelled as a deterministic FSM over the session's conversation state. Each transition consumes the new user prompt and produces an intent token that determines the execution path. The FSM is implemented as a separate microservice (intent-service) to enable independent scaling and A/B testing of classification logic.

FSM State Transitions

from enum import StrEnum
from dataclasses import dataclass

class Intent(StrEnum):
    NEW_QUERY     = "NEW_QUERY"     # no prior state required
    DATA_FILTER   = "DATA_FILTER"   # refines existing query; needs prior SQL
    VISUAL_UPDATE = "VISUAL_UPDATE" # chart type change; needs cached DF only
    CLARIFY       = "CLARIFY"       # model asks for more info (agentic mode)
    RESET         = "RESET"         # explicit conversation restart

class IntentFSM:
    """
    State machine that validates intent transitions.
    DATA_FILTER and VISUAL_UPDATE are only valid if session has prior data.
    """
    VALID_TRANSITIONS = {
        None:               {Intent.NEW_QUERY, Intent.RESET},
        Intent.NEW_QUERY:   {Intent.NEW_QUERY, Intent.DATA_FILTER, Intent.VISUAL_UPDATE, Intent.RESET},
        Intent.DATA_FILTER: {Intent.NEW_QUERY, Intent.DATA_FILTER, Intent.VISUAL_UPDATE, Intent.RESET},
        Intent.VISUAL_UPDATE:{Intent.NEW_QUERY, Intent.DATA_FILTER, Intent.VISUAL_UPDATE, Intent.RESET},
        Intent.CLARIFY:     {Intent.NEW_QUERY, Intent.CLARIFY, Intent.RESET},
        Intent.RESET:       {Intent.NEW_QUERY},
    }

    def validate(self, current: Intent | None, proposed: Intent, has_cached_df: bool) -> Intent:
        if proposed not in self.VALID_TRANSITIONS[current]:
            # Illegal transition: e.g. VISUAL_UPDATE on a fresh session
            return Intent.NEW_QUERY   # safe fallback
        if proposed in {Intent.DATA_FILTER, Intent.VISUAL_UPDATE} and not has_cached_df:
            return Intent.NEW_QUERY   # no cache available; force full query
        return proposed

Intent Classification Prompt (Production)

SYSTEM:
You are a query intent classifier for a conversational analytics system.
Respond with EXACTLY one of these tokens — nothing else, no punctuation:
  NEW_QUERY     — a new data question unrelated to prior conversation
  DATA_FILTER   — refines or narrows the SAME question with a scope change
  VISUAL_UPDATE — changes only the chart type / appearance; data unchanged
  RESET         — the user explicitly wants to start over

Rules:
  - Temperature: 0 (caller enforces; you must be deterministic)
  - Classify across ALL input languages (English, Vietnamese, etc.)
  - Requests like "show only top 5" or "filter to Q1" → DATA_FILTER
  - Requests like "pie chart", "bar graph", "đổi biểu đồ" → VISUAL_UPDATE
  - Requests like "now show me costs" (new metric) → NEW_QUERY

CONVERSATION HISTORY (last 6 turns):
{history_json}

CURRENT USER MESSAGE:
{prompt}

INTENT TOKEN:

Fallback & Confidence Scoring

Because the intent deployment samples at temperature 0 and returns only the raw token text (no logprobs), there is no probability distribution available to measure confidence. Instead, the service uses a secondary validation pass: if the output token is not in the valid token set (due to model drift or token encoding issues), it defaults to NEW_QUERY and emits a warning metric to Prometheus.

async def classify_intent(prompt: str, history: list[dict]) -> Intent:
    raw = await vllm_intent.complete(
        system=INTENT_SYSTEM_PROMPT.format(history_json=history),
        user=prompt,
        max_tokens=16,
        temperature=0.0,
        stop=["\n", " "],  # stop immediately after token
    )
    token = raw.strip().upper()
    if token not in Intent.__members__:
        intent_classification_errors.inc()   # Prometheus counter
        logger.warning("Invalid intent token", token=token, prompt=prompt[:200])
        return Intent.NEW_QUERY              # safe default
    return Intent(token)
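
Putting the two pieces together, a hedged sketch of the routing path inside intent-service might look like this; session_state and its fields are assumptions for illustration.

async def route_intent(prompt: str, session_state) -> Intent:
    proposed = await classify_intent(prompt, session_state.history)
    fsm = IntentFSM()
    return fsm.validate(
        current=session_state.last_intent,       # None on a fresh session
        proposed=proposed,
        has_cached_df=session_state.has_cached_df,
    )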
08

Redis State Layer — Session & Cache Design

Redis serves as the stateful memory of the platform. Rather than relying on the LLM context window to maintain conversation history (which inflates token costs and has a hard limit), all session state is externalised to a 6-node Redis Cluster with 3 primary and 3 replica shards.

Key Schema

Key Pattern | Type | TTL | Contents
sess:{tenant}:{session_id} | Hash | 4 hours (sliding) | last_intent, last_sql, last_chart_spec, turn_count, user_locale
df:{tenant}:{session_id} | String (binary) | 4 hours (sliding) | Compressed Arrow IPC bytes of last DataFrame (<5MB limit)
hist:{tenant}:{session_id} | List (capped) | 4 hours | Last 10 turns: [{role, content, intent, ts}]
rl:{tenant}:minute | String (counter) | 60s | Requests in current sliding window (rate limiting)
sql_cache:{tenant}:{prompt_hash} | String | 1 hour | Pre-validated SQL for common questions (dedup identical prompts)
tenant:{tenant_id} | Hash | no expiry | Tenant config: rate_limit, allowed_tables, locale, schema_version
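
A minimal sketch of the session accessor implied by the sess:{tenant}:{session_id} rows above follows; the class and its field handling are illustrative, not the production session-service code.

SESSION_TTL = 4 * 60 * 60  # 4 hours, sliding

class SessionRepository:
    def __init__(self, redis):
        self.redis = redis

    async def load(self, session_id: str, tenant_id: str) -> dict:
        key = f"sess:{tenant_id}:{session_id}"
        data = await self.redis.hgetall(key)
        if data:
            await self.redis.expire(key, SESSION_TTL)   # slide the TTL on read
        return data or {}

    async def save(self, session_id: str, tenant_id: str, state: dict) -> None:
        key = f"sess:{tenant_id}:{session_id}"
        await self.redis.hset(key, mapping=state)       # last_intent, last_sql, ...
        await self.redis.expire(key, SESSION_TTL)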

DataFrame Caching with Arrow IPC

Pandas DataFrames are serialised to Apache Arrow IPC format before Redis storage. Arrow is used instead of Pickle for three reasons: it is language-neutral (future Go/Rust consumers), faster to serialise/deserialise than Pickle, and has a well-defined schema that can be validated on read.

import pyarrow as pa
import pyarrow.ipc as pa_ipc
import lz4.frame
import pandas as pd

class DataFrameCache:
    MAX_BYTES = 5 * 1024 * 1024  # 5 MB ceiling

    def __init__(self, redis):
        self.redis = redis

    async def set(self, key: str, df: pd.DataFrame, ttl: int = 14400):
        table  = pa.Table.from_pandas(df, preserve_index=False)
        sink   = pa.BufferOutputStream()
        writer = pa_ipc.new_stream(sink, table.schema)
        writer.write_table(table)
        writer.close()
        raw_bytes = sink.getvalue().to_pybytes()

        compressed = lz4.frame.compress(raw_bytes, compression_level=3)
        if len(compressed) > self.MAX_BYTES:
            # Truncate to first 10_000 rows and warn
            logger.warning("DataFrame exceeds 5MB; truncating to 10k rows", key=key)
            df = df.head(10_000)
            return await self.set(key, df, ttl)   # recurse once

        await self.redis.set(key, compressed, ex=ttl)

    async def get(self, key: str) -> pd.DataFrame | None:
        raw = await self.redis.get(key)
        if not raw: return None
        decompressed = lz4.frame.decompress(raw)
        reader = pa_ipc.open_stream(decompressed)
        return reader.read_all().to_pandas()

Redis Cluster Topology

Primary Shards (×3)

16384 hash slots split evenly across 3 primaries. Each primary is co-located in a separate availability zone for failure isolation. Writes go to the primary only.

Replica Shards (×3)

Each primary has one replica in a different AZ. Replicas are read-eligible. If a primary fails, Redis Cluster automatically promotes the replica within <30s.

09

Agentic Reflection Loop

The reflection loop is an automated self-correction mechanism. After SQL generation and execution, a lightweight validation model checks whether the returned data is semantically consistent with the original user question. If not, the loop rewrites the SQL and retries. This is the primary mechanism for catching model hallucinations that pass syntax validation but return wrong results.

R1
SQL Execution StarRocks

Generated SQL is executed. Execution errors (syntax, unknown column) are caught immediately and trigger the rewrite path without consuming a reflection token budget.

R2
Result Sampling In-Process

The first 5 rows and column statistics (min, max, dtype, null rate) are extracted from the result DataFrame and formatted into a compact evidence summary for the reflection prompt.

evidence = {
  "columns":     list(df.columns),
  "row_count":   len(df),
  "sample_rows": df.head(5).to_dict("records"),
  "numeric_stats": df.describe().to_dict(),
  "null_rates":  (df.isnull().mean() * 100).round(1).to_dict(),
}
R3
Reflection LLM Call Qwen @ temp=0 · max 256 tokens

A zero-temperature pass asks the model to evaluate whether the evidence matches the intent of the original question. It outputs a structured JSON verdict: {"valid": bool, "reason": str, "rewrite_hint": str}.

SYSTEM: You are a SQL result validator. Given the user question, the generated SQL,
and a sample of results, output ONLY a JSON object:
{"valid": bool, "reason": "...", "rewrite_hint": "..."}

USER_QUESTION: {original_prompt}
GENERATED_SQL: {sql}
RESULT_EVIDENCE: {evidence_json}

Respond with JSON only:
R4
Verdict Routing FSM gate
VALID → Proceed to Plotly Gen
  • DataFrame forwarded to visualisation stage
  • SQL and reflection result logged to audit store
  • Confirmed Q+SQL pair added to Vanna corpus (async)
INVALID → Rewrite (max N=3 attempts)
  • rewrite_hint appended to next SQL gen prompt
  • Retry counter incremented in session state
  • After 3 failures: return error with explanation to user
  • All failed attempts logged for human review queue
Reflection Budget Management

Each reflection call adds ~1.5s to the request latency. The loop is gated: reflection only runs for NEW_QUERY and DATA_FILTER intents. VISUAL_UPDATE never triggers reflection because no new SQL is executed. Tenants on the base tier run a single reflection pass; enterprise tenants can configure up to 3 passes via tenant config.
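
A compact sketch of the full R1–R4 loop, with the budget gate applied by the caller, is shown below; generate_sql, execute_sql, reflect, build_evidence and the exception types are assumptions for illustration, not the production code.

async def generate_with_reflection(question: str, ctx, max_passes: int):
    rewrite_hint = None
    for attempt in range(1, max_passes + 1):
        sql = await generate_sql(question, ctx, rewrite_hint=rewrite_hint)
        try:
            df = await execute_sql(sql, ctx.tenant_id)              # R1
        except SQLExecutionError as exc:
            rewrite_hint = f"Execution failed: {exc}"               # rewrite path, no reflection call
            continue
        verdict = await reflect(question, sql, build_evidence(df))  # R2 + R3
        if verdict["valid"]:                                        # R4: VALID
            return sql, df, attempt
        rewrite_hint = verdict["rewrite_hint"]                      # R4: INVALID → rewrite
    raise ReflectionExhaustedError("max reflection attempts reached")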

10

Security Architecture & Authorization

Security is enforced in four independent layers: edge (Kong Gateway), transport (mTLS via Istio), application (FastAPI middleware), and data (StarRocks read-only service account). A compromise at any single layer does not grant access to data or the ability to execute write operations.

Authentication Flow: OAuth2/OIDC + JWT

# TenantMiddleware: validates JWT and injects TenantContext on every request
from fastapi import HTTPException, Request
from jose import jwt, JWTError
from starlette.middleware.base import BaseHTTPMiddleware

class TenantMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        token = request.headers.get("Authorization", "").removeprefix("Bearer ")
        if not token:
            raise HTTPException(401, "Missing bearer token")

        try:
            # Verify signature against OIDC provider JWKS (cached 1h)
            payload = jwt.decode(
                token,
                key=await jwks_cache.get_public_keys(),
                algorithms=["RS256"],
                audience=settings.JWT_AUDIENCE,
            )
        except JWTError as e:
            raise HTTPException(401, str(e))

        ctx = TenantContext(
            id=payload["tenant_id"],
            user_id=payload["sub"],
            roles=payload.get("roles", []),
            rate_limit=payload.get("rate_limit", 60),   # req/min
        )
        request.state.tenant = ctx
        return await call_next(request)

SQL Injection & DDL Prevention

import re, sqlparse

BLOCKED_KEYWORDS = re.compile(
    r"\b(DROP|CREATE|ALTER|TRUNCATE|INSERT|UPDATE|DELETE|EXEC|EXECUTE|"
    r"GRANT|REVOKE|LOAD|EXPORT|KILL|SHOW PROCESSLIST)\b",
    re.IGNORECASE
)

def validate_sql(sql: str, tenant_id: str) -> str:
    # 1. Must start with SELECT (or WITH for CTEs)
    parsed = sqlparse.parse(sql.strip())[0]
    if parsed.get_type() not in ("SELECT", "UNKNOWN"):  # sqlparse reports WITH/CTE statements as UNKNOWN
        raise SQLValidationError("Only SELECT statements are permitted")

    # 2. No dangerous keywords
    if BLOCKED_KEYWORDS.search(sql):
        raise SQLValidationError("Blocked keyword detected")

    # 3. Tenant isolation: inject row-level filter
    # Wrap the generated SQL as a subquery with the tenant predicate.
    # tenant_id comes from the verified JWT claim, never from user input.
    safe_sql = f"""
        SELECT * FROM ({sql}) AS _gen
        WHERE tenant_id = '{tenant_id}'
    """

    # 4. The caller runs EXPLAIN on safe_sql to validate the plan before executing it
    return safe_sql

Security Control Summary

Control | Layer | Mechanism
Authentication | Edge + App | JWT/OIDC via Kong plugin; RS256 signature verification
Transport Encryption | Network | mTLS (Istio service mesh); TLS 1.3 minimum on all ingress
DDoS / Rate Limit | Edge | Cloudflare + Kong rate limiting; per-tenant Redis sliding window
SQL Injection | Application | Regex + sqlparse AST validation; parameterised queries for filters
DDL Prevention | Application + DB | Keyword blocklist AND read-only StarRocks service account
Tenant Isolation | Application + DB | Row-level tenant_id filter injected server-side; verified in DB
Plotly Code Exec | Application | Server-side only in restricted subprocess; client never executes code
Secret Management | Infrastructure | HashiCorp Vault; k8s External Secrets Operator; no secrets in code/env files
Audit Logging | Application | Every SQL + tenant + user_id logged to immutable append-only store (S3 + Athena)
Dependency Scanning | CI/CD | Trivy image scan + Dependabot PRs; CVE-blocking quality gate
11

Multi-Tenancy Architecture

The platform uses a shared-infrastructure, isolated-data tenancy model. All tenants share the same Kubernetes cluster, vLLM pods, and Redis cluster, but are isolated at the data and configuration level via row-level security and tenant-scoped namespacing.

Tenancy Isolation Levels

Resource | Isolation Method | Guarantee
StarRocks OBT | Row-level WHERE tenant_id = ? injected server-side | Strong: no cross-tenant row visibility possible
Redis Keys | Key prefix sess:{tenant_id}:…; ACL rules prevent cross-prefix reads | Strong: ACL enforced at Redis server level
Vanna RAG Corpus | Milvus scalar filter tenant_id == X on all retrievals | Strong: embedded in retrieval query, not just post-filter
vLLM Inference | Shared pods; tenant data only exists in the prompt context (not persisted) | Soft: memory isolation via stateless request model
Rate Limiting | Per-tenant Redis counter; configurable in tenant config hash | Strict: higher-tier tenants get higher limits
Audit Logs | Separate S3 prefix per tenant; cross-tenant IAM deny policy | Strong: enforced at S3 bucket policy level

Tenant Configuration Model

from pydantic import BaseModel

class TenantConfig(BaseModel):
    tenant_id:           str
    display_name:        str
    tier:                str          # 'starter' | 'growth' | 'enterprise'

    # Access control
    allowed_tables:      list[str]    # subset of OBT columns accessible
    row_filter_expr:     str | None   # additional SQL predicate beyond tenant_id

    # Rate limits
    queries_per_minute:  int = 60
    max_result_rows:     int = 50_000

    # Feature flags
    reflection_passes:   int = 1     # 1 for starter, 3 for enterprise
    streaming_enabled:   bool = True
    custom_corpus:       bool = False # enterprise: own Milvus collection

    # Locale
    default_locale:      str = "en"
    currency_symbol:     str = "$"
    date_format:         str = "YYYY-MM-DD"
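
A hedged sketch of how this model could be hydrated from the tenant:{tenant_id} Redis hash described in section 08 follows; the decoding details are illustrative, not the production loader.

import json

async def load_tenant_config(redis, tenant_id: str) -> TenantConfig:
    raw = await redis.hgetall(f"tenant:{tenant_id}")
    if not raw:
        raise KeyError(f"Unknown tenant: {tenant_id}")
    fields = {k.decode(): v.decode() for k, v in raw.items()}
    # list-valued fields are assumed to be stored as JSON strings in the hash
    fields["allowed_tables"] = json.loads(fields.get("allowed_tables", "[]"))
    fields.setdefault("tenant_id", tenant_id)
    fields.setdefault("row_filter_expr", None)
    return TenantConfig(**fields)    # pydantic coerces numeric/boolean strings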
12

Kubernetes Deployment

All services are packaged as Helm charts and deployed to a managed Kubernetes cluster (GKE Autopilot or EKS Managed Node Groups). GPU workloads (vLLM) run on dedicated node pools with taint/toleration policies to prevent CPU workloads from scheduling on expensive GPU nodes.

Namespace Layout

# Namespaces
analytics-platform-prod    # all production application services
analytics-platform-staging # staging ring
analytics-infra            # Redis, Milvus, monitoring (shared)
analytics-gpu              # vLLM pods (GPU node pool only)
istio-system               # service mesh control plane

query-service Deployment Spec

query-service-deploy.yaml Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: query-service
  namespace: analytics-platform-prod
  labels:
    app: query-service
    version: v2.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate: {maxUnavailable: 0, maxSurge: 2}
  selector:
    matchLabels: {app: query-service}
  template:
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
        sidecar.istio.io/inject: "true"
    spec:
      serviceAccountName: query-service-sa
      containers:
      - name: query-service
        image: registry.example.com/query-service:2.0.0
        ports:
        - {containerPort: 8000, name: http}
        - {containerPort: 9090, name: metrics}
        env:
        - {name: VLLM_BASE_URL, valueFrom: {secretKeyRef: {name: vllm-creds, key: url}}}
        - {name: REDIS_URL,     valueFrom: {secretKeyRef: {name: redis-creds, key: url}}}
        - {name: STARROCKS_DSN, valueFrom: {secretKeyRef: {name: sr-creds, key: dsn}}}
        resources:
          requests: {cpu: "500m", memory: "1Gi"}
          limits:   {cpu: "2",     memory: "4Gi"}
        livenessProbe:
          httpGet: {path: /health/live, port: 8000}
          initialDelaySeconds: 10
          periodSeconds: 15
        readinessProbe:
          httpGet: {path: /health/ready, port: 8000}
          initialDelaySeconds: 5
          periodSeconds: 10
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector: {matchLabels: {app: query-service}}

HPA for vLLM GPU Pods

vllm-hpa.yaml HorizontalPodAutoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: vllm-sql-gen-hpa
  namespace: analytics-gpu
spec:
  scaleTargetRef: {apiVersion: apps/v1, kind: Deployment, name: vllm-sql-gen}
  minReplicas: 2
  maxReplicas: 8
  metrics:
  - type: Pods
    pods:
      metric: {name: vllm_gpu_cache_usage_perc}  # custom Prometheus metric
      target: {type: AverageValue, averageValue: "75"}
  - type: Pods
    pods:
      metric: {name: vllm_num_requests_waiting}
      target: {type: AverageValue, averageValue: "5"}   # >5 waiting → scale up
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies: [{type: Pods, value: 2, periodSeconds: 60}]
    scaleDown:
      stabilizationWindowSeconds: 300

Service Resource Sizing (Production Baseline)

Service | Replicas (min/max) | CPU Request/Limit | Memory | Special
query-service | 3 / 12 | 500m / 2 | 1Gi / 4Gi | —
intent-service | 2 / 8 | 250m / 1 | 512Mi / 2Gi | —
session-service | 2 / 6 | 250m / 1 | 512Mi / 1Gi | —
vllm-sql-gen | 2 / 8 | 8 / 8 | 32Gi / 40Gi | 1× NVIDIA L4 per pod
vllm-intent | 2 / 4 | 4 / 4 | 16Gi / 24Gi | AWQ 4-bit; 0.5× L4
Redis primary | 3 (fixed) | 2 / 4 | 16Gi / 16Gi | PVC 50Gi SSD
Milvus standalone | 1 / 3 | 2 / 8 | 8Gi / 32Gi | PVC 200Gi SSD
13

CI/CD Pipeline — GitOps

The delivery pipeline follows a GitOps model: all cluster state is described declaratively in a Git repository, and ArgoCD reconciles the desired state against the live cluster. Application code and infrastructure manifests live in separate repositories with different access controls.

Pipeline Stages

PR Stage
Pull Request Checks (GitHub Actions)
  • Ruff lint + mypy type-check (Python); ESLint + TypeScript tsc (Frontend)
  • Unit tests + integration tests against testcontainers (StarRocks + Redis in Docker)
  • SQL accuracy regression suite: 200 golden question-SQL pairs, must pass >95%
  • Trivy container scan — block on CRITICAL CVEs
  • Coverage gate: >80% line coverage required to merge
Build
Build & Publish (main branch merge)
  • Docker multi-stage build → push to internal OCI registry with digest tag
  • Helm chart linting (helm lint) + unit tests (helm-unittest plugin)
  • SBOM (Software Bill of Materials) generated and attached to image manifest
  • ArgoCD image-updater writes new digest to the staging manifests repo (automated PR)
Staging
Staging Ring (automated)
  • ArgoCD auto-syncs staging namespace — full RollingUpdate deployment
  • Smoke tests run against staging endpoint (k6 load script: 50 RPS for 5 min)
  • LLM accuracy eval on staging: 500-question eval set must score >92% correct SQL
  • Canary comparison: compare P50/P95 latency against previous stable release
  • Slack notification with metrics diff; auto-promote if all gates pass
Prod
Production Deployment (manual approval gate)
  • ArgoCD production sync requires human approval from >1 senior engineer
  • Canary traffic split via Istio VirtualService: 10% → 50% → 100% over 30 min
  • Automated rollback trigger: error rate >1% or P95 latency >5s during rollout
  • Post-deploy: 15-minute burn-in Grafana dashboard manually reviewed before full 100%
  • Release notes auto-generated from conventional commits and posted to Slack
14

Observability Stack

The observability stack is built on the OpenTelemetry standard for portability. Traces, metrics, and logs all flow through a centralised OTel Collector before being routed to their respective backends.

The Three Pillars

Distributed Tracing

Jaeger (or Tempo). Every request gets a trace that spans: Kong → query-service → intent-service → vLLM → StarRocks. The LLM call and SQL execution appear as separate child spans with token counts and query plans attached.

Metrics

Prometheus + Grafana. Custom metrics: query_latency_seconds{intent,tenant}, sql_accuracy_rate{tenant}, reflection_attempts_total, vllm_gpu_cache_usage_perc.

Structured Logs

Loki or OpenSearch. All logs emitted as JSON with trace_id, tenant_id, session_id, intent, and SQL digest fields. Enables cross-pillar correlation in Grafana.
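
For concreteness, the custom metrics named above could be declared with prometheus_client as in the sketch below; bucket boundaries and label sets are assumptions, not the production definitions.

from prometheus_client import Counter, Histogram

query_latency_seconds = Histogram(
    "query_latency_seconds",
    "End-to-end query latency by intent and tenant",
    labelnames=["intent", "tenant"],
    buckets=[0.25, 0.5, 1, 2, 3, 5, 10],
)
reflection_attempts_total = Counter(
    "reflection_attempts_total",
    "Reflection passes executed",
    labelnames=["tenant"],
)
sql_reflection_rewrites_total = Counter(
    "sql_reflection_rewrites_total",
    "SQL rewrites triggered by a failed reflection verdict",
    labelnames=["tenant"],
)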

Key Metrics & Alerting Rules

# prometheus/alerts/query-service.yaml
groups:
- name: query-service
  rules:

  - alert: HighQueryLatency
    expr: histogram_quantile(0.95, sum(rate(query_latency_seconds_bucket[5m])) by (le)) > 5
    for: 5m
    labels: {severity: warning}
    annotations:
      summary: "P95 query latency > 5s for 5 minutes"

  - alert: SQLAccuracyDrop
    expr: rate(sql_reflection_rewrites_total[10m]) /
          rate(sql_queries_total[10m]) > 0.15
    for: 10m
    labels: {severity: critical}
    annotations:
      summary: "SQL rewrite rate > 15% — model accuracy degraded"

  - alert: VLLMQueueBuildup
    expr: vllm_num_requests_waiting > 20
    for: 2m
    labels: {severity: warning}
    annotations:
      summary: "vLLM request queue > 20 — consider scaling"

  - alert: RedisMemoryHigh
    expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.85
    for: 5m
    labels: {severity: warning}
    annotations:
      summary: "Redis memory utilisation > 85%"

SQL Accuracy Dashboard

A dedicated Grafana dashboard tracks LLM quality over time — not just infrastructure health:

Panel | Query | Alert Threshold
First-pass SQL accuracy rate (7d) | 1 - (rewrites / total_queries) | <90% → critical
Reflection rewrite attempts distribution | Histogram of attempts-per-query | p99 > 2 → warning
User thumbs-up/down ratio | Feedback events from React frontend | <0.75 → warning
Corpus hit rate by intent type | RAG hits per query intent | NEW_QUERY hits <3 → warning
Token usage by tenant (cost tracking) | vLLM prompt+completion tokens | Budget alert per tenant
15

High Availability & Disaster Recovery

Availability Architecture

Component | HA Mechanism | RTO | RPO
query-service / intent-service | 3+ replicas across 3 AZs; HPA min=3; PodDisruptionBudget maxUnavailable=1 | <30s | 0 (stateless)
vLLM pods | 2 replicas minimum; model weights pre-loaded on all nodes via DaemonSet preloading job | <60s (new pod) | 0 (stateless)
Redis Cluster | 3 primaries + 3 replicas across 3 AZs; automatic cluster failover promotes the replica | <30s | <100ms (async replica lag)
StarRocks FE | 3 FE nodes with leader election via Paxos; any FE can serve queries | <15s (leader re-elect) | 0
StarRocks BE | 3× replication factor on all tablets; automatic tablet migration on node failure | <120s (tablet re-balance) | 0 (sync replication)
Milvus | Standalone with persistent volume snapshot every 1h; restore from snapshot | <300s | <1h (last snapshot)

Disaster Recovery Runbooks

Scenario: vLLM Pod OOMKilled

# Automatic: k8s restarts pod; model weights loaded from NodeLocal cache (~45s)
# During restart: intent-service returns 503; query-service falls back to cached SQL if available

# Manual verification:
kubectl get pods -n analytics-gpu -l app=vllm-sql-gen
kubectl logs -n analytics-gpu vllm-sql-gen-xxx --previous  # check OOM cause

# If weights not cached on node, force reschedule to pre-loaded node:
kubectl label node <node> vllm-weights=loaded
kubectl patch deployment vllm-sql-gen -p '{"spec":{"template":{"spec":{"nodeSelector":{"vllm-weights":"loaded"}}}}}'

Scenario: Redis Primary Failure

# Automatic: Redis Cluster promotes replica in ~30s
# Impact: active sessions may lose their Redis key during 30s window
# Mitigation: query-service falls back to NEW_QUERY path on cache miss (degraded but functional)

# Verify cluster health after failover:
redis-cli --cluster check \
          redis-cluster.analytics-infra.svc.cluster.local:6379

Scenario: StarRocks Full Cluster Failure (Region-Level)

# Cross-region DR: secondary StarRocks cluster in DR region seeded daily from S3 Parquet snapshots

# Step 1: Update DNS to point to DR region endpoint
kubectl apply -f manifests/dr-region/starrocks-service-patch.yaml

# Step 2: Restore latest daily snapshot (Spark restore job; script path is illustrative)
spark-submit --master k8s://dr-cluster \
  jobs/restore_obt_snapshot.py \
  --input  s3://backup-bucket/snapshots/2024-01-15/ \
  --output starrocks://dr-fe:9030/analytics.analytics_obt

# Step 3: Verify partition completeness
mysql -h dr-fe -P 9030 -e "SELECT MAX(event_date) FROM analytics.analytics_obt;"
16

Performance Engineering

Latency Budget Breakdown (P95 target: 3s)

Stage | P50 | P95 | Optimisation
Kong Gateway + JWT verify | 5ms | 15ms | JWKS in-memory cache (1h TTL)
Redis session load | 3ms | 10ms | RESP3 pipelining; LZ4 decompressed locally
Intent classification (vLLM) | 150ms | 400ms | max_tokens=16; AWQ quantized; prefix cache
Milvus hybrid search (RAG) | 8ms | 25ms | HNSW index; in-memory collection; ef=64
SQL generation (vLLM) | 800ms | 1800ms | Prefix caching of system prompt (35% TTFT reduction)
SQL validation (EXPLAIN) | 20ms | 60ms | Separate read-only FE node for EXPLAIN calls
StarRocks query execution | 150ms | 600ms | MV rewrite; partition pruning; result cache 30s
Reflection LLM call | 300ms | 700ms | Gated: only NEW_QUERY/DATA_FILTER; AWQ model
Plotly code gen + exec | 200ms | 500ms | Plotly template library; code gen streamed
Redis session write | 5ms | 15ms | Arrow IPC + LZ4; async (non-blocking response path)
Total (NEW_QUERY path) | ~1.6s | ~4.1s | VISUAL_UPDATE: ~600ms P95 (no DB, no SQL gen)
P95 Headroom

The NEW_QUERY P95 of ~4.1s slightly exceeds the 3s target on the nominal path. This is addressed by: (1) streaming SQL tokens to the frontend via WebSocket so the user sees progress immediately, (2) shifting reflection to an async post-send operation for latency-sensitive tenants, and (3) StarRocks result caching for repeated identical prompts.

StarRocks Query Result Cache

-- Enable result cache globally (StarRocks 3.x)
SET GLOBAL enable_query_cache = true;
SET GLOBAL query_cache_size   = 4294967296;  -- 4GB shared result cache

-- Per-query hint for high-frequency queries (optional override)
SELECT /*+ SET_VAR(query_cache_type=2, query_cache_min_count=2) */
  province_name,
  SUM(net_revenue) AS total_revenue
FROM analytics_obt
WHERE event_date BETWEEN '2024-01-01' AND '2024-01-31'
  AND tenant_id = 'tenant-abc'
GROUP BY province_name
ORDER BY total_revenue DESC;

Redis SQL Prompt Cache (Deduplication)

import hashlib

async def get_or_generate_sql(
    prompt: str, tenant_id: str, vanna: VannaAgent, redis: Redis
) -> str:
    # Normalise prompt → stable hash (strip whitespace, lowercase)
    normalised = " ".join(prompt.lower().split())
    cache_key  = f"sql_cache:{tenant_id}:{hashlib.sha256(normalised.encode()).hexdigest()[:16]}"

    cached = await redis.get(cache_key)
    if cached:
        sql_cache_hits.inc()    # Prometheus counter
        return cached.decode()

    sql = await vanna.generate_sql(prompt, tenant_id)
    await redis.set(cache_key, sql, ex=3600)   # 1-hour TTL
    return sql
17

API Contract Specification

The query-service exposes a versioned REST API (/v1) and a WebSocket endpoint for streaming. Backwards-incompatible changes increment the major version. All responses include a X-Trace-Id header for observability correlation.

POST /v1/query

// Request
{
  "session_id": "string (UUIDv4, required)",
  "prompt":     "string (1–2000 chars, required)",
  "locale":     "string (BCP47, optional, default: tenant config)",
  "chart_hint": "string | null (optional explicit chart type override)"
}

// Response 200 OK
{
  "request_id":   "uuid",
  "intent":       "NEW_QUERY | DATA_FILTER | VISUAL_UPDATE | RESET",
  "sql":          "string | null",
  "row_count":    1234,
  "from_cache":   false,
  "reflection":   {
    "attempts": 1,
    "valid": true
  },
  "plotly_json":  {
    "data":    [...],    // Plotly trace objects
    "layout":  {...},   // Plotly layout config
    "config":  {...}    // Plotly display config (responsive: true, etc.)
  },
  "latency_ms":   {
    "total":     1842,
    "intent":    210,
    "sql_gen":   890,
    "db_query":  340,
    "plotly":    280
  }
}

// Response 422 Unprocessable (SQL validation failure)
{
  "error":   "SQL_VALIDATION_FAILED",
  "detail":  "Generated SQL contained blocked keyword: DROP",
  "request_id": "uuid"
}

// Response 429 Too Many Requests
{
  "error":        "RATE_LIMIT_EXCEEDED",
  "retry_after":  12,   // seconds
  "limit":        60    // requests/minute for this tenant
}
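
A hedged sketch of Pydantic models mirroring this contract (the QueryRequest / QueryResponse types referenced in section 06) is shown below; optional fields and class layout are illustrative.

from pydantic import BaseModel, Field

class QueryRequest(BaseModel):
    session_id: str                                  # UUIDv4
    prompt:     str = Field(min_length=1, max_length=2000)
    locale:     str | None = None                    # BCP47; falls back to tenant config
    chart_hint: str | None = None                    # explicit chart type override

class LatencyBreakdown(BaseModel):
    total: int
    intent: int
    sql_gen: int
    db_query: int
    plotly: int

class QueryResponse(BaseModel):
    request_id:  str | None = None
    intent:      str
    sql:         str | None = None
    row_count:   int
    from_cache:  bool
    reflection:  dict | None = None
    plotly_json: dict
    latency_ms:  LatencyBreakdown | None = None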

WebSocket /v1/query/stream — Message Protocol

// Client → Server: start
{"type": "start", "session_id": "...", "prompt": "..."}

// Server → Client: streaming tokens (SQL gen phase)
{"type": "sql_token",   "content": "SELECT"}
{"type": "sql_token",   "content": " province_name,"}

// Server → Client: phase change events
{"type": "phase",      "phase": "db_query"}
{"type": "phase",      "phase": "plotly_gen"}

// Server → Client: final result
{"type": "result",     "plotly_json": {...}, "row_count": 24}

// Server → Client: error
{"type": "error",      "code": "REFLECTION_MAX_RETRIES", "message": "..."}

Additional Endpoints

Endpoint | Method | Description
/v1/session/{id} | DELETE | Explicit session reset; clears Redis keys for this session
/v1/feedback | POST | Submit thumbs-up/down; triggers async Vanna corpus update on thumbs-up
/v1/sessions/{id}/history | GET | Return conversation history for the session (last 10 turns)
/v1/admin/corpus/reload | POST (admin) | Force-reload Milvus collection from source (post Vanna retraining)
/health/live | GET | Liveness probe; always 200 if process is alive
/health/ready | GET | Readiness probe; checks Redis + vLLM reachability
/metrics | GET | Prometheus metrics endpoint (port 9090)
18

Testing Strategy

Test Pyramid

Unit Tests (~500)

Pytest. Cover: SQL validator, intent FSM transitions, DataFrame cache serialisation, OBT column name resolver, rate limit logic. Mocked external dependencies. Target: <10s run time in CI.

Integration Tests (~150)

Testcontainers: real StarRocks + Redis containers. Cover: full request pipeline for all 3 intent paths, session state persistence, multi-turn conversation, rate limiting, SQL injection blocking.

LLM Accuracy Eval (~500)

Golden dataset of question–expected-SQL pairs evaluated against the real model. Run against staging on every deploy. Broken down by intent type, complexity tier, and language (EN/VI).

SQL Accuracy Evaluation Framework

from dataclasses import dataclass
from typing import Callable
import pandas as pd

@dataclass
class EvalCase:
    question: str
    language: str               # 'en' | 'vi'
    complexity: str             # 'simple' | 'medium' | 'complex'
    expected_columns: list[str] # columns that MUST appear in result
    expected_row_range: tuple[int, int]  # (min_rows, max_rows)
    golden_sql: str | None      # exact SQL comparison (optional, brittle)
    semantic_check_fn: Callable[[pd.DataFrame], bool]  # e.g. lambda df: df['net_revenue'].sum() > 0

async def run_eval_suite(cases: list[EvalCase], vanna: VannaAgent, db) -> dict:
    results = []
    for case in cases:
        sql = await vanna.generate_sql(case.question, tenant_id="eval_tenant")
        df  = await db.execute(sql)

        col_check  = all(c in df.columns for c in case.expected_columns)
        row_check  = case.expected_row_range[0] <= len(df) <= case.expected_row_range[1]
        sem_check  = case.semantic_check_fn(df)
        passed     = col_check and row_check and sem_check

        results.append({"question": case.question, "passed": passed,
                        "language": case.language, "complexity": case.complexity})

    df_results = pd.DataFrame(results)
    overall_accuracy = df_results["passed"].mean()
    return {
        "overall_accuracy": overall_accuracy,
        "by_language":    df_results.groupby("language")["passed"].mean().to_dict(),
        "by_complexity":  df_results.groupby("complexity")["passed"].mean().to_dict(),
        "passed_gate":    overall_accuracy >= 0.92,   # CI blocker threshold
    }

Load Testing (k6)

// k6/load-test.js — runs in CI staging pipeline
import http from 'k6/http';
import { check, sleep } from 'k6';

export const options = {
  stages: [
    { duration: '2m', target: 50  },   // ramp to 50 virtual users
    { duration: '5m', target: 200 },   // sustain 200 virtual users
    { duration: '2m', target: 500 },   // spike to 500 virtual users
    { duration: '2m', target: 0   },   // ramp down
  ],
  thresholds: {
    'http_req_duration': ['p(95)<3000'],   // P95 < 3s
    'http_req_failed':   ['rate<0.005'],   // <0.5% error rate
  },
};

const PROMPTS = [
  "Show revenue by province this month",
  "Tính tổng doanh thu theo khu vực",
  "Top 10 products by gross margin Q1 2024",
];

export default function () {
  const res = http.post(`${__ENV.BASE_URL}/v1/query`,
    JSON.stringify({
      session_id: `load-test-${__VU}`,  // unique session per VU
      prompt: PROMPTS[Math.floor(Math.random() * PROMPTS.length)],
    }),
    { headers: { 'Authorization': `Bearer ${__ENV.TEST_TOKEN}`,
                 'Content-Type': 'application/json' } }
  );
  check(res, { 'status 200': r => r.status === 200 });
  sleep(0.5);
}
Continuous SQL Quality Monitoring

Beyond CI evals, a nightly batch job re-runs the 500-question eval suite against production (using a shadow tenant with synthetic data) and posts results to a Slack channel and Grafana dashboard. Any regression >2% triggers a P2 incident and pauses the next deployment until investigated. All user thumbs-down events are automatically added to a human review queue for Vanna corpus improvement.
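
A minimal sketch of that nightly job, reusing run_eval_suite from the framework above, might look like the following; the notification helper, the shadow-tenant wiring, and the pause-deployments flag are assumptions about the surrounding tooling.

async def nightly_eval_job(cases, vanna, db, previous_accuracy: float, notify):
    report = await run_eval_suite(cases, vanna, db)       # same suite as CI
    regression = previous_accuracy - report["overall_accuracy"]
    if regression > 0.02:                                 # >2 percentage points
        await notify(
            severity="P2",
            message=f"SQL accuracy regressed by {regression:.1%}",
            pause_deployments=True,
        )
    return report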