搜索服务架构:基于 Valkey 的独立 Flask 应用
本文介绍了我们的搜索服务如何作为一个独立的 Flask 应用运行在单独的服务器上,并使用 Valkey(Redis 的分支)来实现高性能的向量搜索、缓存和自动补全。
问题:搜索性能与可扩展性
搜索操作在计算上非常昂贵:
-
筛选条件提取:针对每个查询匹配超过 2500 个短语
-
相关搜索:在 6.5 万个查询中计算相似度
-
自动补全:对 6.5 万个查询进行前缀匹配
-
产品筛选:根据多个条件筛选 5000 多个产品
在主 Web 服务器上运行这些操作会导致:
-
页面加载缓慢:搜索操作会阻塞其他请求
-
内存压力:嵌入向量占用了大量内存
-
CPU 争用:相似度计算非常消耗 CPU
-
扩展困难:无法独立扩展搜索功能
我们需要一个可以独立扩展的专用搜索服务。
解决方案:独立的搜索服务
我们在专用服务器上运行一个独立的 Flask 应用:
主 Web 服务器
↓ HTTP API 调用
搜索服务
↓ Valkey 查询
Valkey 服务器
这种架构提供了:
-
独立扩展:扩展搜索服务不会影响主 Web 服务器
-
资源隔离:搜索操作不会影响主 Web 服务器
-
缓存:Valkey 缓存结果以快速响应重复查询
-
高可用性:搜索服务可以重启而不影响主 Web 服务器
搜索服务组件
1. 筛选条件提取 API
-
端点:
/api/extract_filters -
目的:从自然语言查询中提取结构化的筛选条件
-
示例:
GET /api/extract_filters?q=mini+pc+16gb+ram
响应:
{
"Form Factor": "Mini PC",
"Main Memory": "16"
}
实现:
-
从 Valkey 或 JSON 加载短语到筛选条件的映射
-
使用单词边界正则表达式匹配短语
-
返回结构化的筛选条件
缓存:短语映射缓存在 Valkey 中(30 天 TTL)
2. 相关搜索 API
-
端点:
/api/related -
目的:使用向量搜索查找语义上相似的查询
-
示例:
POST /api/related
{
"query": "mini pc",
"limit": 10
}
响应:
{
"related": [
{"query": "small computer", "similarity": 0.92},
{"query": "compact desktop", "similarity": 0.89},
{"query": "mini pc 8gb", "similarity": 0.87}
]
}
实现:
-
使用 all-mpnet-base-v2 对查询进行嵌入
-
在 Valkey RediSearch 中查询最近邻
-
返回按相似度排序的前 N 个结果
缓存:结果缓存在 Valkey 中(7 天 TTL)
3. 自动补全 API
-
端点:
/api/autocomplete -
目的:在用户输入时建议查询
-
示例:
GET /api/autocomplete?q=mini+p&limit=5
响应:
{
"suggestions": [
"mini pc",
"mini pc 16gb",
"mini pc 8gb ram",
"mini pc fanless",
"mini pc windows 11"
]
}
实现:
-
使用前缀匹配查询 Valkey RediSearch
-
按流行度(展示 + 点击分数)排序
-
返回前 N 个建议
缓存:自动补全索引在 Valkey 中(每日更新)
4. 热门查询 API
-
端点:
/api/popular -
目的:获取最热门的查询
-
示例:
GET /api/popular?limit=10
响应:
{
"queries": [
"mini pc",
"thin client",
"industrial pc",
"all in one pc"
]
}
实现:
-
从 Valkey 或 JSON 加载查询
-
按流量分数(展示 + 点击)排序
-
返回前 N 个查询
缓存:热门查询缓存在 Valkey 中(30 天 TTL)
Valkey 集成
Valkey 是一个 Redis 分支,提供:
-
向量搜索:用于相似性搜索的 RediSearch 模块
-
缓存:快速的内存键值存储
-
自动补全:使用有序集合进行前缀匹配
-
持久性:AOF(仅追加文件)确保数据持久化
使用 RediSearch 进行向量搜索
我们使用 Valkey 的 RediSearch 模块进行向量相似性搜索:
索引创建:
client.ft("queries_idx").create_index([
VectorField("embedding", "FLAT", {
"TYPE": "FLOAT32",
"DIM": 768,
"DISTANCE_METRIC": "COSINE"
}),
TextField("query"),
NumericField("score")
])
向量搜索:
query_embedding = model.encode(query)
results = client.ft("queries_idx").search(
Query("*=>[KNN 10 @embedding $vec AS score]")
.sort_by("score")
.return_fields("query", "score")
.dialect(2),
query_params={"vec": query_embedding.tobytes()}
)
这将返回按余弦相似度排序的 10 个最近邻。
缓存策略
我们在 Valkey 中缓存多种数据类型:
短语映射(30 天 TTL):
client.setex(
"seo:phrase_mappings",
30 * 24 * 3600,
json.dumps(phrase_mappings)
)
相关搜索(7 天 TTL):
cache_key = f"related:{query_hash}"
client.setex(cache_key, 7 * 24 * 3600, json.dumps(results))
热门查询(30 天 TTL):
client.setex(
"seo:popular_queries",
30 * 24 * 3600,
json.dumps(popular_queries)
)
自动补全索引(每日更新):
for query, score in queries:
client.zadd("autocomplete:mini", {query: score})
使用有序集合实现自动补全
我们使用 Valkey 有序集合进行自动补全:
索引结构:
autocomplete:m → ["mini pc": 5000, "mini computer": 3000]
autocomplete:mi → ["mini pc": 5000, "mini computer": 3000]
autocomplete:min → ["mini pc": 5000, "mini computer": 3000]
autocomplete:mini → ["mini pc": 5000, "mini computer": 3000]
前缀查询:
prefix = "mini"
results = client.zrevrange(f"autocomplete:{prefix}", 0, 9, withscores=True)
这将返回以 "mini" 开头的前 10 个查询,按分数排序。
API 通信
主 Web 服务器通过 HTTP 调用搜索服务:
筛选条件提取
from app.shared.filter_service import extract_filters_from_query
filters = extract_filters_from_query("mini pc 16gb ram")
# 内部调用:GET SEARCH_SERVICE_URL/api/extract_filters?q=...
相关搜索
import requests
response = requests.post(
"SEARCH_SERVICE_URL/api/related",
json={"query": "mini pc", "limit": 10},
timeout=2
)
related = response.json()["related"]
自动补全
response = requests.get(
"SEARCH_SERVICE_URL/api/autocomplete",
params={"q": "mini p", "limit": 5},
timeout=1
)
suggestions = response.json()["suggestions"]
错误处理与降级方案
主 Web 服务器优雅地处理搜索服务故障:
try:
filters = extract_filters_from_query(query)
except Exception as e:
logger.error(f"Search service failed: {e}")
filters = {} # 降级方案:返回空的筛选条件
这确保了即使搜索服务宕机,主 Web 服务器也能继续运行。
### 网络配置
所有服务器都在私有网络中:
- **主 Web 服务器**:可以访问搜索服务和 Valkey
- **搜索服务**:可以访问 Valkey
- **Valkey**:仅允许主 Web 服务器和搜索服务访问
外部无法访问搜索服务或 Valkey。
## 与 SEO 管道的集成
搜索服务与 SEO 管道集成:
### 步骤 11:迁移到 Valkey
SEO 管道将数据加载到 Valkey 中:
```python
# 加载查询嵌入向量
for query, embedding in zip(queries, embeddings):
client.hset(f"query:{query_hash}", mapping={
"query": query,
"embedding": embedding.tobytes(),
"score": score
})
# 创建 RediSearch 索引
client.ft("queries_idx").create_index([...])
详情请参阅 Valkey 迁移。
查询日志记录
搜索服务为 SEO 管道记录查询日志:
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"query": query,
"filters_extracted": filters,
"results_count": len(results)
}
with open(SEO_LIVE_QUERIES_LOG, "a") as f:
f.write(json.dumps(log_entry) + "\n")
这些日志会反馈到 步骤 1d:获取实时查询。
参考资料
技术概念
-
Valkey - 官方网站
-
Redis - 官方网站(Valkey 分支)
-
RediSearch - 向量搜索模块
-
余弦相似度 - 维基百科
相关文章
总结
我们的搜索服务作为一个独立的 Flask 应用运行在单独的服务器上:
架构:
-
在专用服务器上运行独立的 Flask 应用
-
使用 Valkey(Redis 分支)进行缓存和向量搜索
-
通过 HTTP API 与主 Web 服务器通信
API:
-
/api/extract_filters- 从查询中提取筛选条件 -
/api/related- 查找相似查询(向量搜索) -
/api/autocomplete- 建议查询(前缀匹配) -
/api/popular- 获取热门查询
Valkey 功能:
-
向量搜索(RediSearch 模块)
-
缓存(短语映射 30 天 TTL)
-
自动补全(有序集合)
-
持久性(AOF)
优势:
-
独立扩展
-
资源隔离
-
高性能(Valkey 缓存)
-
容错性(优雅降级)
这种架构实现了快速、可扩展的搜索,同时保持了主 Web 服务器的响应速度。