System Overview & Design Principles
This document specifies the complete production architecture for a stateful, multilingual Conversational Analytics Platform that translates natural-language questions into analytical queries against a StarRocks One Big Table (OBT). It is designed for multi-tenant enterprise deployment, targeting sub-3-second end-to-end latency at P95 with a 99.9% availability SLO. Headline targets:

| Target | Definition |
|---|---|
| P95 < 3s | End-to-end latency: prompt → chart rendered |
| 99.9% availability | ≤ 8.7 hrs/year of downtime |
| 500 RPS | Concurrent queries per region |
| > 92% SQL accuracy | Correct SQL on first attempt (RAG + OBT) |
Guiding Design Principles
- Single-table grounding: The OBT schema forces the model to emit only SELECT … WHERE … GROUP BY patterns. Multi-table JOIN paths — the primary source of Text-to-SQL failure — are eliminated by design.
- Externalised state: Every turn reads from and writes to a Redis session store. Context persists across chart updates, filter refinements, and follow-up questions without enlarging the LLM context window.
- Decomposed LLM calls: Intent classification, SQL generation, Plotly code generation, and validation are independent calls. Each is independently retryable, observable, and replaceable.
- Defence in depth: Security is enforced at four independent layers: API gateway, application, database (read-only SA), and network policy. A compromise at any single layer does not grant data access.
Service Topology & Inter-Service Communication
The platform is decomposed into eight independently deployable services, each with its own scaling policy and health contract. Services communicate over internal Kubernetes DNS with mTLS enforced by Istio.
Service Communication Matrix
| Caller | Callee | Protocol | Auth | Timeout / Retry |
|---|---|---|---|---|
| React SPA | Kong API Gateway | HTTPS + WebSocket | JWT (OIDC) | 30s / 0 |
| Kong Gateway | query-service | HTTP/2 gRPC | mTLS + Service Account | 28s / 1 |
| query-service | intent-service | HTTP/2 internal | mTLS | 2s / 2 fast-fail |
| query-service | vLLM (OpenAI API) | HTTP/1.1 streaming | API Key (k8s secret) | 25s / 1 |
| query-service | Milvus | gRPC | mTLS | 500ms / 3 |
| query-service | StarRocks | MySQL protocol | TLS + read-only SA | 15s / 1 |
| query-service | Redis Cluster | RESP3 TLS | ACL + password | 200ms / 3 |
| Spark jobs | StarRocks BE | StarRocks Spark Connector | TLS + write SA | 600s / 3 |
| Kafka Consumer | StarRocks BE (stream) | StarRocks Routine Load | SASL + TLS | stream |
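To make the timeout/retry column concrete, here is a sketch of how query-service might construct its vLLM HTTP client per the matrix (25s budget, one retry). The service URL, header wiring, and environment variable are assumptions, not part of the deployed code.

# Hypothetical sketch of one matrix row: query-service → vLLM client.
import os
import httpx

vllm_client = httpx.AsyncClient(
    base_url="http://vllm-sql-gen:8000/v1",          # internal k8s DNS (assumed name)
    timeout=httpx.Timeout(25.0, connect=2.0),        # 25s total budget per the matrix
    transport=httpx.AsyncHTTPTransport(retries=1),   # single connect-level retry
    headers={"Authorization": f"Bearer {os.environ['VLLM_API_KEY']}"},  # from k8s secret
)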
One Big Table (OBT) Schema Design
The OBT is the cornerstone of this architecture's SQL accuracy. By pre-joining all dimensional attributes into a single wide table, we eliminate the primary failure mode of Text-to-SQL systems: incorrect or missing JOIN predicates. The trade-off is controlled data redundancy, which is acceptable in a read-heavy analytics workload.
Standard star-schema prompting requires the model to know table relationships, foreign keys, and JOIN cardinalities — none of which are reliably inferable from column names alone. OBT reduces the entire schema to SELECT [metrics] FROM analytics_obt WHERE [dimensions]. There is exactly one table for the model to reason about.
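A representative (hypothetical) question–SQL pair, in the shape the qa corpus stores such examples, makes the pattern concrete; the column names come from the DDL below.

# Hypothetical few-shot pair illustrating the single-table pattern.
EXAMPLE_PAIR = {
    "question": "Top 5 provinces by net revenue in March 2024",
    "sql": (
        "SELECT province_name, SUM(net_revenue) AS total_net_revenue "
        "FROM analytics_obt "
        "WHERE event_year = 2024 AND event_month = 3 "
        "GROUP BY province_name "
        "ORDER BY total_net_revenue DESC LIMIT 5"
    ),
}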
Primary OBT Table Definition
CREATE TABLE analytics_obt (
-- ─── Partition & Bucketing Keys ────────────────────────────────
event_date DATE NOT NULL, -- PARTITION KEY
tenant_id VARCHAR(36) NOT NULL, -- row-level tenant isolation
-- ─── Time Dimensions ────────────────────────────────────────────
event_datetime DATETIME NOT NULL,
event_year SMALLINT,
event_quarter TINYINT,
event_month TINYINT,
event_week_of_year TINYINT,
event_day_of_week TINYINT,
is_weekend BOOLEAN,
-- ─── Geography Dimensions ───────────────────────────────────────
country_code VARCHAR(3),
region_name VARCHAR(128),
province_name VARCHAR(128),
district_name VARCHAR(128),
store_id BIGINT,
store_name VARCHAR(256),
store_tier VARCHAR(32), -- 'flagship','standard','kiosk'
-- ─── Product Dimensions ─────────────────────────────────────────
product_id BIGINT,
product_name VARCHAR(512),
sku_code VARCHAR(64),
category_l1 VARCHAR(128),
category_l2 VARCHAR(128),
category_l3 VARCHAR(128),
brand_name VARCHAR(128),
unit_cost DECIMAL(18,4),
-- ─── Customer Dimensions ────────────────────────────────────────
customer_id BIGINT,
customer_segment VARCHAR(64), -- 'VIP','regular','new'
customer_age_band VARCHAR(16), -- '18-24','25-34', etc.
customer_gender VARCHAR(16),
acquisition_channel VARCHAR(64),
-- ─── Sales Agent Dimensions ─────────────────────────────────────
agent_id BIGINT,
agent_name VARCHAR(256),
agent_team VARCHAR(128),
agent_region VARCHAR(128),
    -- ─── Fact / Measures (order_id / order_line_id are sort key columns above) ──
quantity INT,
unit_price DECIMAL(18,4),
gross_revenue DECIMAL(18,4),
discount_amount DECIMAL(18,4),
net_revenue DECIMAL(18,4),
cogs DECIMAL(18,4),
gross_margin DECIMAL(18,4),
return_flag BOOLEAN,
return_amount DECIMAL(18,4),
-- ─── Metadata ───────────────────────────────────────────────────
ingested_at DATETIME,
source_system VARCHAR(64),
_row_hash VARCHAR(64) -- dedup key from source
)
DUPLICATE KEY(event_date, tenant_id, order_id, order_line_id)
PARTITION BY RANGE(event_date) (
START ("2022-01-01") END ("2027-01-01") EVERY (INTERVAL 1 MONTH)
)
DISTRIBUTED BY HASH(tenant_id, store_id) BUCKETS 128
PROPERTIES (
"replication_num" = "3",
"compression" = "ZSTD",
"enable_persistent_index" = "true"
);
Materialised Views for Common Aggregations
StarRocks supports asynchronous materialised views that are refreshed on a schedule by the engine. Pre-computing high-frequency aggregations reduces query latency for common patterns without changing the OBT interface the model interacts with.
-- Daily revenue rollup — covers 80% of "revenue by day" queries
CREATE MATERIALIZED VIEW mv_daily_revenue
REFRESH ASYNC EVERY(INTERVAL 1 HOUR)
AS
SELECT
event_date, tenant_id, province_name, category_l1,
SUM(gross_revenue) AS total_gross,
SUM(net_revenue) AS total_net,
SUM(quantity) AS total_units,
COUNT(DISTINCT order_id) AS order_count
FROM analytics_obt
GROUP BY event_date, tenant_id, province_name, category_l1;
-- Monthly KPI rollup — used for YoY comparison queries
CREATE MATERIALIZED VIEW mv_monthly_kpi
REFRESH ASYNC EVERY(INTERVAL 6 HOUR)
AS
SELECT
event_year, event_month, tenant_id, category_l1, brand_name,
SUM(net_revenue) AS monthly_revenue,
AVG(gross_margin) AS avg_margin,
COUNT(DISTINCT customer_id) AS unique_customers
FROM analytics_obt
GROUP BY event_year, event_month, tenant_id, category_l1, brand_name;
StarRocks' query planner automatically rewrites eligible queries against materialised views at runtime — the LLM always targets analytics_obt, and the engine transparently serves from the fastest available materialised view.
Partitioning & Bucketing Strategy
| Strategy | Key | Rationale |
|---|---|---|
| Range Partition | event_date (monthly) | Enables partition pruning on all date-filtered queries; reduces scan from 100% to <5% of data for typical 30-day windows |
| Hash Distribution | tenant_id + store_id | Co-locates all data for a tenant+store pair on the same BE node; eliminates shuffle for GROUP BY store queries |
| Bucket Count | 128 buckets | Sized for 6 BE nodes with room to scale to 24; each bucket ≈ 200MB compressed at 5TB total |
| ZSTD Compression | all columns | Typical 4–6× compression ratio for analytics data; per-block decompression CPU cost is negligible relative to scan cost |
| Bloom Filter Index | _row_hash | Fast point lookups for dedup checks during ingestion (persistent indexes apply only to PRIMARY KEY tables) |
Data Ingestion Pipeline
The ingestion pipeline maintains a fresh OBT through two complementary flows: a batch ETL path via Spark for historical loads and large-scale reprocessing, and a streaming CDC path via Kafka for near-real-time updates. Both paths write through StarRocks' native connectors.
Spark OBT Flatten Job
The core transformation logic joins all source dimension tables onto the fact table, computes derived columns, and writes the flattened output directly to StarRocks partitions. The job is idempotent — re-running for any date range produces identical output.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, md5, concat_ws, year, quarter, month,
    weekofyear, dayofweek, when, current_timestamp,
)
import os  # reads the writer credential from the environment
spark = SparkSession.builder \
.appName("OBT-Flatten-Daily") \
.config("spark.starrocks.fe.http.url", "http://starrocks-fe:8030") \
.config("spark.starrocks.user", "etl_writer") \
.getOrCreate()
# ── 1. Load source tables from Data Lake (Hudi/Delta on S3) ──
orders = spark.read.format("delta").load("s3://lake/orders")
order_lines = spark.read.format("delta").load("s3://lake/order_lines")
products = spark.read.format("delta").load("s3://lake/products")
stores = spark.read.format("delta").load("s3://lake/stores")
customers = spark.read.format("delta").load("s3://lake/customers")
agents = spark.read.format("delta").load("s3://lake/agents")
# ── 2. Flatten join (broadcast small dims, shuffle large fact) ──
obt = order_lines \
.join(orders, "order_id", "inner") \
.join(products.hint("broadcast"), "product_id", "left") \
.join(stores.hint("broadcast"), "store_id", "left") \
.join(customers.hint("broadcast"), "customer_id","left") \
.join(agents.hint("broadcast"), "agent_id", "left")
# ── 3. Derive time columns ──
obt = obt.withColumns({
"event_date": to_date("event_datetime"),
"event_year": year("event_datetime"),
"event_month": month("event_datetime"),
"event_week_of_year": weekofyear("event_datetime"),
"event_day_of_week": dayofweek("event_datetime"),
"is_weekend": dayofweek("event_datetime").isin([1,7]),
"_row_hash": md5(concat_ws("|", col("order_id"), col("order_line_id"))),
"ingested_at": current_timestamp(),
})
# ── 4. Write to StarRocks (overwrites the touched date partitions) ──
# StarRocks routes rows to partitions via event_date itself;
# the connector does not use Spark's partitionBy().
obt.write.format("starrocks") \
    .option("starrocks.fe.http.url", "http://starrocks-fe:8030") \
    .option("starrocks.fe.jdbc.url", "jdbc:mysql://starrocks-fe:9030") \
    .option("starrocks.table.identifier", "analytics.analytics_obt") \
    .option("starrocks.user", "etl_writer") \
    .option("starrocks.password", os.environ["ETL_WRITER_PASSWORD"]) \
    .option("starrocks.write.properties.transaction_timeout_sec", "3600") \
    .option("starrocks.write.properties.partial_update", "false") \
    .mode("overwrite") \
    .save()
Kafka CDC Streaming Path
For near-real-time updates (order status changes, returns), StarRocks' Routine Load feature creates a persistent Kafka consumer that streams rows directly into the OBT without any intermediate processing layer.
CREATE ROUTINE LOAD analytics.kafka_obt_load ON analytics_obt
-- JSON payloads: keys are matched to column names, so no CSV delimiter clause
COLUMNS (event_date, tenant_id, order_id, order_line_id,
         net_revenue, quantity, return_flag, ingested_at)
PROPERTIES (
"max_batch_interval" = "10", -- seconds; controls CDC latency
"max_batch_rows" = "100000",
"format" = "json",
"strip_outer_array" = "true"
)
FROM KAFKA (
"kafka_broker_list" = "kafka-0:9092,kafka-1:9092,kafka-2:9092",
"kafka_topic" = "analytics.obt.cdc",
"kafka_partitions" = "0,1,2,3,4,5",
"property.security.protocol" = "SASL_SSL"
);
Data Quality Gates
Great Expectations suites run as a Spark post-action after each batch load. Failures trigger an alert and prevent the partition from being served to the query engine until resolved. The gates cover three classes of checks (a sketch follows the list):

- Completeness: net_revenue NOT NULL on all rows; no orphaned order lines without a parent order.
- Range validity: net_revenue BETWEEN -50000 AND 500000; quantity BETWEEN 1 AND 10000.
- Referential integrity: all tenant_id values exist in the tenant registry; all store_id values have a matching store record.
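A minimal sketch of such a gate using Great Expectations' legacy Spark dataset API. Only the completeness and range checks are shown; the referential checks join against the registry tables, and the real suites are maintained separately.

# Hypothetical post-load gate using Great Expectations' legacy Spark API.
from great_expectations.dataset import SparkDFDataset

def run_quality_gate(obt_df) -> bool:
    gdf = SparkDFDataset(obt_df)
    gdf.expect_column_values_to_not_be_null("net_revenue")
    gdf.expect_column_values_to_be_between("net_revenue", min_value=-50000, max_value=500000)
    gdf.expect_column_values_to_be_between("quantity", min_value=1, max_value=10000)
    result = gdf.validate()
    return result.success  # False blocks the partition and fires the alert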
vLLM Inference Cluster
The inference layer is built on vLLM with the OpenAI-compatible server API, hosted on dedicated NVIDIA L4 GPU nodes. It serves three deployments of the same base model (Qwen2.5-Coder-7B): the primary SQL/Plotly code generator, the intent classifier, and the reflection validator, each with its own sampling config.
vLLM Server Configuration
# helm/vllm/values.yaml
model:
name: qwen2.5-coder-7b-instruct
hf_repo: Qwen/Qwen2.5-Coder-7B-Instruct
dtype: bfloat16
quantization: null # AWQ optional for cost reduction
max_model_len: 32768
gpu_memory_utilization: 0.90
serving:
tensor_parallel_size: 1 # 1× L4 24GB sufficient for 7B BF16
max_num_seqs: 256 # PagedAttention batch size
enable_prefix_caching: true # cache system prompt KV for ~40% TTFT reduction
scheduler_delay_factor: 0.0
autoscaling:
min_replicas: 2
max_replicas: 8
target_gpu_utilization: 75 # %
scale_up_cooldown: 60s
scale_down_cooldown: 300s
resources:
requests:
nvidia.com/gpu: 1
memory: 32Gi
cpu: 8
limits:
nvidia.com/gpu: 1
memory: 40Gi
Prefix Caching Strategy
vLLM's prefix caching stores the KV tensors for repeated prompt prefixes (system prompts, DDL context). Since every SQL generation call begins with the same multi-kilobyte system prompt and OBT DDL, prefix caching yields a 35–45% reduction in Time-To-First-Token for all non-cold-start requests.
# The system prompt prefix is identical for all tenants (DDL is in the suffix)
# vLLM caches KV tensors for this block automatically
SYSTEM_PROMPT_PREFIX = """
You are an expert SQL analyst for a StarRocks analytics database.
The database contains exactly ONE table: analytics_obt
You MUST generate only SELECT queries — no DDL, no DML, no subqueries
unless strictly necessary for the logic.
Output ONLY the raw SQL. No explanation, no markdown fences.
Column reference:
"""
# The DDL block appended per-request (also cached after first use per tenant)
def build_sql_system_prompt(tenant_ddl_excerpt: str) -> str:
return SYSTEM_PROMPT_PREFIX + tenant_ddl_excerpt
Model Deployment Split: SQL, Intent, Reflection
| Deployment | Use Case | Temperature | Max Tokens | Timeout |
|---|---|---|---|---|
| vllm-sql-gen | SQL generation + Plotly code gen | 0.1 | 2048 | 25s |
| vllm-intent | Intent classification only | 0.0 | 16 | 2s |
| vllm-reflect | Agentic reflection loop validation | 0.0 | 256 | 5s |
Applying 4-bit AWQ quantization reduces VRAM from ~14GB to ~5GB per replica, allowing 4 replicas per L4 GPU (24GB) instead of 1. Accuracy benchmarks on the OBT SQL task show a <1.5% quality degradation — acceptable for intent and reflection deployments, optional for SQL gen.
RAG Engine — Vanna AI Deep Dive
Vanna AI provides the Retrieval-Augmented Generation layer, which enriches every SQL generation call with semantically similar historical question–SQL pairs. The production deployment replaces Vanna's default ChromaDB backend with Milvus for horizontal scalability and tenant isolation.
Vector Store Schema (Milvus)
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
# One collection per corpus type; tenant_id field enables filtered search
schema = CollectionSchema(fields=[
FieldSchema("id", DataType.VARCHAR, max_length=64, is_primary=True),
FieldSchema("tenant_id", DataType.VARCHAR, max_length=36), # scalar filter
FieldSchema("corpus_type", DataType.VARCHAR, max_length=16), # 'ddl','qa','doc'
FieldSchema("content", DataType.VARCHAR, max_length=8192),
FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=1024), # bge-m3 dim
FieldSchema("created_at", DataType.INT64),
FieldSchema("hit_count", DataType.INT64), # popularity signal
], description="Vanna RAG corpus")
coll = Collection("vanna_corpus", schema)
# HNSW index for sub-10ms ANN search on 1M+ vectors
coll.create_index("embedding", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 200}
})
Embedding Model: BGE-M3
BGE-M3 is selected over OpenAI Ada because it supports multilingual dense retrieval natively (including Vietnamese), runs on-premise (no egress cost), and achieves comparable BEIR benchmark scores for code and SQL retrieval tasks.
BGE-M3 is served via text-embeddings-inference (Hugging Face TEI) as a sidecar to the vLLM cluster, producing 1024-dimensional dense vectors, with optional sparse lexical weights and ColBERT-style multi-vector output.
Production retrieval uses sparse + dense fusion: BM25 keyword recall plus HNSW ANN, ranked by Reciprocal Rank Fusion (RRF). This handles both exact column-name matches and semantic paraphrases.
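RRF itself is simple: a document's fused score is the sum of 1/(k + rank) over each result list it appears in. Milvus computes this server-side; the sketch below exists only to make the scoring concrete (k = 60 is the conventional constant).

# Minimal RRF sketch: fuse a BM25 ranking and an ANN ranking.
def rrf_fuse(bm25_ids: list[str], ann_ids: list[str], k: int = 60) -> list[str]:
    scores: dict[str, float] = {}
    for ranking in (bm25_ids, ann_ids):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)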
RAG Retrieval Pipeline
import asyncio

from vanna.base import VannaBase  # Vanna's abstract base class

class VannaProductionRAG(VannaBase):
async def get_sql_prompt_context(self, question: str, tenant_id: str) -> dict:
# 1. Embed the user question
q_vector = await self.embed_service.embed(question) # BGE-M3
# 2. Hybrid search: tenant-scoped, top-K=8 across all corpus types
hits = await self.milvus.hybrid_search(
collection="vanna_corpus",
dense_vector=q_vector,
sparse_query=question, # BM25 tokenised
expr=f"tenant_id == '{tenant_id}'",
limit=8,
ranker="RRF", # Reciprocal Rank Fusion
)
# 3. Separate DDL, QA pairs, and documentation fragments
ddl_context = [h.content for h in hits if h.corpus_type == "ddl"]
qa_examples = [h.content for h in hits if h.corpus_type == "qa"]
doc_context = [h.content for h in hits if h.corpus_type == "doc"]
# 4. Increment hit counters asynchronously (popularity signal for future ranking)
asyncio.create_task(self.milvus.increment_hits([h.id for h in hits]))
return {
"ddl": "\n\n".join(ddl_context),
"examples": qa_examples, # 3-shot examples in prompt
"docs": "\n".join(doc_context),
}
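The returned context then feeds the SQL-generation call. A sketch of that assembly, reusing build_sql_system_prompt from the prefix-caching section; the user-message template wording is illustrative.

# Sketch: assemble the final SQL-generation messages from the RAG context.
def build_sql_messages(question: str, ctx: dict) -> list[dict]:
    system = build_sql_system_prompt(ctx["ddl"])      # prefix-cached by vLLM
    examples = "\n\n".join(ctx["examples"][:3])       # 3-shot examples
    user = (
        f"Relevant documentation:\n{ctx['docs']}\n\n"
        f"Example question-SQL pairs:\n{examples}\n\n"
        f"Question: {question}\nSQL:"
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]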
Training the Corpus
Vanna's corpus is seeded and continuously updated from five sources:
| Source | Corpus Type | Update Frequency | Volume |
|---|---|---|---|
| OBT DDL + column descriptions | ddl | On schema change (CI pipeline) | ~50 entries |
| Human-curated Q&SQL pairs | qa | Weekly review cycle | 200–500 entries/tenant |
| Feedback loop: user-confirmed queries | qa | Real-time on thumbs-up | Grows with usage |
| Business logic documentation | doc | On wiki update (webhook) | ~100 entries |
| Synthetic augmentation (GPT-4o) | qa | Monthly batch job | 1000+ paraphrases |
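As an illustration of the feedback-loop row, a thumbs-up might append the confirmed pair to the tenant's corpus roughly like this. The async milvus wrapper and embed_service are the same objects used in the retrieval pipeline above; the exact method names are assumptions.

# Hedged sketch of the real-time feedback loop: a user-confirmed
# question-SQL pair is embedded and appended to the tenant corpus.
import time
import uuid

async def add_confirmed_pair(milvus, embed_service, tenant_id: str,
                             question: str, sql: str) -> None:
    content = f"Question: {question}\nSQL: {sql}"
    vector = await embed_service.embed(content)  # BGE-M3, 1024-dim
    await milvus.insert("vanna_corpus", [{
        "id": uuid.uuid4().hex,
        "tenant_id": tenant_id,
        "corpus_type": "qa",
        "content": content,
        "embedding": vector,
        "created_at": int(time.time()),
        "hit_count": 0,
    }])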
FastAPI Query Service — Internals
The query-service is an async FastAPI application that orchestrates the full request lifecycle. It runs under Gunicorn with Uvicorn worker processes inside the pod, and exposes both a REST endpoint and a WebSocket streaming endpoint for progressive response rendering.
Application Layer Architecture
# src/query_service/main.py
from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
app = FastAPI(title="query-service", version="2.0")
# ── Middleware stack (order matters) ──────────────────────────
app.add_middleware(TenantMiddleware) # extract + validate tenant from JWT
app.add_middleware(RateLimitMiddleware) # Redis sliding window per tenant
app.add_middleware(RequestLoggingMiddleware) # structured JSON logs to OTel
app.add_middleware(CORSMiddleware, allow_origin_regex=r"https://.*\.yourapp\.com")  # wildcard subdomains require a regex; allow_origins does not glob
FastAPIInstrumentor.instrument_app(app) # OTel auto-instrumentation
# ── Dependency providers ──────────────────────────────────────
async def get_session_repo() -> SessionRepository: yield redis_session_repo
async def get_vanna_agent() -> VannaAgent: yield vanna_agent_singleton
async def get_starrocks() -> StarRocksPool: yield starrocks_pool
# ── Primary query endpoint ────────────────────────────────────
@app.post("/v1/query")
async def query_endpoint(
req: QueryRequest,
tenant: TenantContext = Depends(verify_jwt),
session: SessionRepository = Depends(get_session_repo),
vanna: VannaAgent = Depends(get_vanna_agent),
db: StarRocksPool = Depends(get_starrocks),
):
with tracer.start_as_current_span("query.full_pipeline") as span:
span.set_attribute("tenant.id", tenant.id)
span.set_attribute("session.id", req.session_id)
# Fetch prior state — zero DB round-trips if cached
ctx = await session.load(req.session_id, tenant.id)
# Classify intent (fast, isolated, 2s timeout)
intent = await intent_client.classify(req.prompt, ctx.history)
# Execute the appropriate path
result = await execute_intent(intent, req, ctx, tenant, vanna, db)
# Generate Plotly JSON (server-side only)
plotly_json = await vanna.generate_plotly_json(result.dataframe, result.chart_spec)
# Persist updated state
await session.save(req.session_id, tenant.id, result.to_session_state())
return QueryResponse(
plotly_json=plotly_json,
sql=result.sql,
row_count=len(result.dataframe),
intent=intent.value,
from_cache=result.from_cache,
)
# ── WebSocket streaming endpoint (token-by-token SQL gen) ─────
@app.websocket("/v1/query/stream")
async def query_stream(ws: WebSocket, session_id: str, token: str):
await ws.accept()
tenant = verify_ws_token(token)
async for chunk in vanna_agent.stream_sql_gen(session_id, tenant):
await ws.send_json({"type": "sql_token", "content": chunk})
await ws.send_json({"type": "done"})
Connection Pool Configuration
| Resource | Pool Size | Max Overflow | Recycle (s) | Library |
|---|---|---|---|---|
| StarRocks (MySQL) | 20 | 10 | 3600 | aiomysql + SQLAlchemy async |
| Redis Cluster | 50 | — | — | redis-py async cluster client |
| vLLM HTTP | 25 | 10 | — | httpx AsyncClient |
| Milvus gRPC | 10 | 5 | — | pymilvus async |
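As an illustration, the StarRocks row of this table maps onto SQLAlchemy's async engine options like so (the DSN and credentials are placeholders):

# Hypothetical construction of the StarRocks pool from the table above.
from sqlalchemy.ext.asyncio import create_async_engine

starrocks_engine = create_async_engine(
    "mysql+aiomysql://query_ro:***@starrocks-fe:9030/analytics",  # placeholder DSN
    pool_size=20,        # steady-state connections
    max_overflow=10,     # burst headroom
    pool_recycle=3600,   # retire connections older than 1 hour
    pool_pre_ping=True,  # validate a connection before checkout
)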
Intent Router — Finite State Machine
The Intent Router is modelled as a deterministic FSM over the session's conversation state. Each transition consumes the new user prompt and produces an intent token that determines the execution path. The FSM is implemented as a separate microservice (intent-service) to enable independent scaling and A/B testing of classification logic.
FSM State Transitions
from enum import StrEnum  # requires Python 3.11+
class Intent(StrEnum):
NEW_QUERY = "NEW_QUERY" # no prior state required
DATA_FILTER = "DATA_FILTER" # refines existing query; needs prior SQL
VISUAL_UPDATE = "VISUAL_UPDATE" # chart type change; needs cached DF only
CLARIFY = "CLARIFY" # model asks for more info (agentic mode)
RESET = "RESET" # explicit conversation restart
class IntentFSM:
"""
State machine that validates intent transitions.
DATA_FILTER and VISUAL_UPDATE are only valid if session has prior data.
"""
VALID_TRANSITIONS = {
None: {Intent.NEW_QUERY, Intent.RESET},
Intent.NEW_QUERY: {Intent.NEW_QUERY, Intent.DATA_FILTER, Intent.VISUAL_UPDATE, Intent.RESET},
Intent.DATA_FILTER: {Intent.NEW_QUERY, Intent.DATA_FILTER, Intent.VISUAL_UPDATE, Intent.RESET},
Intent.VISUAL_UPDATE:{Intent.NEW_QUERY, Intent.DATA_FILTER, Intent.VISUAL_UPDATE, Intent.RESET},
Intent.CLARIFY: {Intent.NEW_QUERY, Intent.CLARIFY, Intent.RESET},
Intent.RESET: {Intent.NEW_QUERY},
}
def validate(self, current: Intent | None, proposed: Intent, has_cached_df: bool) -> Intent:
if proposed not in self.VALID_TRANSITIONS[current]:
# Illegal transition: e.g. VISUAL_UPDATE on a fresh session
return Intent.NEW_QUERY # safe fallback
if proposed in {Intent.DATA_FILTER, Intent.VISUAL_UPDATE} and not has_cached_df:
return Intent.NEW_QUERY # no cache available; force full query
return proposed
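For example, the guard degrades an impossible refinement on a fresh session to a full query:

fsm = IntentFSM()

# Fresh session: VISUAL_UPDATE is not a legal first transition and there is
# no cached DataFrame, so the FSM falls back to a full NEW_QUERY execution.
assert fsm.validate(None, Intent.VISUAL_UPDATE, has_cached_df=False) == Intent.NEW_QUERY

# Established session with cached data: the refinement proceeds as proposed.
assert fsm.validate(Intent.NEW_QUERY, Intent.DATA_FILTER, has_cached_df=True) == Intent.DATA_FILTER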
Intent Classification Prompt (Production)
SYSTEM:
You are a query intent classifier for a conversational analytics system.
Respond with EXACTLY one of these tokens — nothing else, no punctuation:
NEW_QUERY — a new data question unrelated to prior conversation
DATA_FILTER — refines or narrows the SAME question with a scope change
VISUAL_UPDATE — changes only the chart type / appearance; data unchanged
RESET — the user explicitly wants to start over
Rules:
- Temperature: 0 (caller enforces; you must be deterministic)
- Classify across ALL input languages (English, Vietnamese, etc.)
- Requests like "show only top 5" or "filter to Q1" → DATA_FILTER
- Requests like "pie chart", "bar graph", "đổi biểu đồ" → VISUAL_UPDATE
- Requests like "now show me costs" (new metric) → NEW_QUERY
CONVERSATION HISTORY (last 6 turns):
{history_json}
CURRENT USER MESSAGE:
{prompt}
INTENT TOKEN:
Fallback & Confidence Scoring
Because the intent model outputs a single token deterministically, there is no probability distribution to measure confidence. Instead, the service uses a secondary validation pass: if the output token is not in the valid token set (due to model drift or token encoding issues), it defaults to NEW_QUERY and emits a warning metric to Prometheus.
async def classify_intent(prompt: str, history: list[dict]) -> Intent:
raw = await vllm_intent.complete(
system=INTENT_SYSTEM_PROMPT.format(history_json=history),
user=prompt,
max_tokens=16,
temperature=0.0,
stop=["\n", " "], # stop immediately after token
)
token = raw.strip().upper()
if token not in Intent.__members__:
intent_classification_errors.inc() # Prometheus counter
logger.warning("Invalid intent token", token=token, prompt=prompt[:200])
return Intent.NEW_QUERY # safe default
return Intent(token)
Redis State Layer — Session & Cache Design
Redis serves as the stateful memory of the platform. Rather than relying on the LLM context window to maintain conversation history (which inflates token costs and has a hard limit), all session state is externalised to a six-node Redis Cluster: 3 primaries, each with one replica.
Key Schema
| Key Pattern | Type | TTL | Contents |
|---|---|---|---|
| sess:{tenant}:{session_id} | Hash | 4 hours (sliding) | last_intent, last_sql, last_chart_spec, turn_count, user_locale |
| df:{tenant}:{session_id} | String (binary) | 4 hours (sliding) | Compressed Arrow IPC bytes of last DataFrame (<5MB limit) |
| hist:{tenant}:{session_id} | List (capped) | 4 hours | Last 10 turns: [{role, content, intent, ts}] |
| rl:{tenant}:minute | String (counter) | 60s | Requests in current window (rate limiting) |
| sql_cache:{tenant}:{prompt_hash} | String | 1 hour | Pre-validated SQL for common questions (dedup identical prompts) |
| tenant:{tenant_id} | Hash | no expiry | Tenant config: rate_limit, allowed_tables, locale, schema_version |
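As an illustration, the rl: counter row above supports a simple window check. This is a fixed-window sketch; the production middleware may use a more precise sliding-window scheme, and the limit value comes from the tenant:{tenant_id} hash.

# Minimal sketch of the per-tenant rate limit backed by rl:{tenant}:minute.
async def check_rate_limit(redis, tenant_id: str, limit: int) -> bool:
    key = f"rl:{tenant_id}:minute"
    count = await redis.incr(key)
    if count == 1:
        await redis.expire(key, 60)  # open a fresh 60s window on first hit
    return count <= limit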
DataFrame Caching with Arrow IPC
Pandas DataFrames are serialised to Apache Arrow IPC format before Redis storage. Arrow is used instead of Pickle for three reasons: it is language-neutral (future Go/Rust consumers), faster to serialise/deserialise than Pickle, and has a well-defined schema that can be validated on read.
import pyarrow as pa
import pyarrow.ipc as pa_ipc
import lz4.frame
import pandas as pd
class DataFrameCache:
MAX_BYTES = 5 * 1024 * 1024 # 5 MB ceiling
    async def set(self, key: str, df: pd.DataFrame, ttl: int = 14400,
                  _truncated: bool = False):
        table = pa.Table.from_pandas(df, preserve_index=False)
        sink = pa.BufferOutputStream()
        writer = pa_ipc.new_stream(sink, table.schema)
        writer.write_table(table)
        writer.close()
        raw_bytes = sink.getvalue().to_pybytes()
        compressed = lz4.frame.compress(raw_bytes, compression_level=3)
        if len(compressed) > self.MAX_BYTES:
            if _truncated:
                # Still too large after one truncation; skip caching rather than loop
                logger.warning("DataFrame still exceeds 5MB after truncation; not cached", key=key)
                return
            # Truncate to first 10_000 rows and retry exactly once
            logger.warning("DataFrame exceeds 5MB; truncating to 10k rows", key=key)
            return await self.set(key, df.head(10_000), ttl, _truncated=True)
        await self.redis.set(key, compressed, ex=ttl)
async def get(self, key: str) -> pd.DataFrame | None:
raw = await self.redis.get(key)
if not raw: return None
decompressed = lz4.frame.decompress(raw)
reader = pa_ipc.open_stream(decompressed)
return reader.read_all().to_pandas()
Redis Cluster Topology
- Sharding: 16384 hash slots split evenly across the 3 primaries, each placed in a separate availability zone for failure isolation. Writes go to the primary only.
- Replication: Each primary has one replica in a different AZ. Replicas are read-eligible. If a primary fails, Redis Cluster automatically promotes its replica in under 30 seconds.
Agentic Reflection Loop
The reflection loop is an automated self-correction mechanism. After SQL generation and execution, a lightweight validation model checks whether the returned data is semantically consistent with the original user question. If not, the loop rewrites the SQL and retries. This is the primary mechanism for catching model hallucinations that pass syntax validation but return wrong results.
1. Execute. The generated SQL is run against StarRocks. Execution errors (syntax, unknown column) are caught immediately and trigger the rewrite path without consuming the reflection token budget.
2. Extract evidence. The first 5 rows and column statistics (min, max, dtype, null rate) are extracted from the result DataFrame and formatted into a compact evidence summary for the reflection prompt:
evidence = {
"columns": list(df.columns),
"row_count": len(df),
"sample_rows": df.head(5).to_dict("records"),
"numeric_stats": df.describe().to_dict(),
"null_rates": (df.isnull().mean() * 100).round(1).to_dict(),
}
3. Reflect. A zero-temperature pass asks the model to evaluate whether the evidence matches the intent of the original question. It outputs a structured JSON verdict: {"valid": bool, "reason": str, "rewrite_hint": str}.
SYSTEM: You are a SQL result validator. Given the user question, the generated SQL,
and a sample of results, output ONLY a JSON object:
{"valid": bool, "reason": "...", "rewrite_hint": "..."}
USER_QUESTION: {original_prompt}
GENERATED_SQL: {sql}
RESULT_EVIDENCE: {evidence_json}
Respond with JSON only:
On a valid verdict:
- DataFrame forwarded to visualisation stage
- SQL and reflection result logged to audit store
- Confirmed question–SQL pair added to Vanna corpus (async)

On an invalid verdict:
- rewrite_hint appended to the next SQL generation prompt
- Retry counter incremented in session state
- After 3 failures: return an error with explanation to the user
- All failed attempts logged to the human review queue
Each reflection call adds ~1.5s to the request latency. The loop is gated: reflection only runs for NEW_QUERY and DATA_FILTER intents. VISUAL_UPDATE never triggers reflection because no new SQL is executed. Tenants on the base tier run a single reflection pass; enterprise tenants can configure up to 3 passes via tenant config.
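To tie the stages together, the gated loop can be driven by a control function along these lines. The helpers generate_sql, run_sql, reflect, build_evidence, SQLExecutionError, and ReflectionExhausted are illustrative names, and the budget accounting here is simplified relative to the description above.

# Hedged sketch of the reflection loop driver.
async def execute_with_reflection(question: str, tenant: TenantContext,
                                  max_passes: int):
    # Invoked only for NEW_QUERY / DATA_FILTER; VISUAL_UPDATE bypasses this path.
    hint = None
    for _ in range(max_passes + 1):
        sql = await generate_sql(question, tenant, rewrite_hint=hint)
        try:
            df = await run_sql(sql, tenant)
        except SQLExecutionError as exc:
            # Simplification: the production loop retries execution errors
            # without charging the reflection budget.
            hint = str(exc)
            continue
        verdict = await reflect(question, sql, build_evidence(df))
        if verdict["valid"]:
            return df, sql
        hint = verdict["rewrite_hint"]  # feed back into the next generation
    raise ReflectionExhausted(question)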