搜索服务架构:基于 Valkey 的独立 Flask 应用

本文介绍了我们的搜索服务如何作为一个独立的 Flask 应用运行在单独的服务器上,并使用 Valkey(Redis 的分支)来实现高性能的向量搜索、缓存和自动补全。

问题:搜索性能与可扩展性

搜索操作在计算上非常昂贵:

  • 筛选条件提取:针对每个查询匹配超过 2500 个短语

  • 相关搜索:在 6.5 万个查询中计算相似度

  • 自动补全:对 6.5 万个查询进行前缀匹配

  • 产品筛选:根据多个条件筛选 5000 多个产品

在主 Web 服务器上运行这些操作会导致:

  • 页面加载缓慢:搜索操作会阻塞其他请求

  • 内存压力:嵌入向量占用了大量内存

  • CPU 争用:相似度计算非常消耗 CPU

  • 扩展困难:无法独立扩展搜索功能

我们需要一个可以独立扩展的专用搜索服务。

解决方案:独立的搜索服务

我们在专用服务器上运行一个独立的 Flask 应用:

主 Web 服务器
    ↓ HTTP API 调用
搜索服务
    ↓ Valkey 查询
Valkey 服务器

这种架构提供了:

  • 独立扩展:扩展搜索服务不会影响主 Web 服务器

  • 资源隔离:搜索操作不会影响主 Web 服务器

  • 缓存:Valkey 缓存结果以快速响应重复查询

  • 高可用性:搜索服务可以重启而不影响主 Web 服务器

搜索服务组件

1. 筛选条件提取 API

  • 端点/api/extract_filters

  • 目的:从自然语言查询中提取结构化的筛选条件

  • 示例

GET /api/extract_filters?q=mini+pc+16gb+ram

响应:
{
  "Form Factor": "Mini PC",
  "Main Memory": "16"
}

实现

缓存:短语映射缓存在 Valkey 中(30 天 TTL)

2. 相关搜索 API

  • 端点/api/related

  • 目的:使用向量搜索查找语义上相似的查询

  • 示例

POST /api/related
{
  "query": "mini pc",
  "limit": 10
}

响应:
{
  "related": [
    {"query": "small computer", "similarity": 0.92},
    {"query": "compact desktop", "similarity": 0.89},
    {"query": "mini pc 8gb", "similarity": 0.87}
  ]
}

实现

  • 使用 all-mpnet-base-v2 对查询进行嵌入

  • 在 Valkey RediSearch 中查询最近邻

  • 返回按相似度排序的前 N 个结果

缓存:结果缓存在 Valkey 中(7 天 TTL)

3. 自动补全 API

  • 端点/api/autocomplete

  • 目的:在用户输入时建议查询

  • 示例

GET /api/autocomplete?q=mini+p&limit=5

响应:
{
  "suggestions": [
    "mini pc",
    "mini pc 16gb",
    "mini pc 8gb ram",
    "mini pc fanless",
    "mini pc windows 11"
  ]
}

实现

  • 使用前缀匹配查询 Valkey RediSearch

  • 按流行度(展示 + 点击分数)排序

  • 返回前 N 个建议

缓存:自动补全索引在 Valkey 中(每日更新)

4. 热门查询 API

  • 端点/api/popular

  • 目的:获取最热门的查询

  • 示例

GET /api/popular?limit=10

响应:
{
  "queries": [
    "mini pc",
    "thin client",
    "industrial pc",
    "all in one pc"
  ]
}

实现

  • 从 Valkey 或 JSON 加载查询

  • 按流量分数(展示 + 点击)排序

  • 返回前 N 个查询

缓存:热门查询缓存在 Valkey 中(30 天 TTL)

Valkey 集成

Valkey 是一个 Redis 分支,提供:

  • 向量搜索:用于相似性搜索的 RediSearch 模块

  • 缓存:快速的内存键值存储

  • 自动补全:使用有序集合进行前缀匹配

  • 持久性:AOF(仅追加文件)确保数据持久化

使用 RediSearch 进行向量搜索

我们使用 Valkey 的 RediSearch 模块进行向量相似性搜索:

索引创建

client.ft("queries_idx").create_index([
    VectorField("embedding", "FLAT", {
        "TYPE": "FLOAT32",
        "DIM": 768,
        "DISTANCE_METRIC": "COSINE"
    }),
    TextField("query"),
    NumericField("score")
])

向量搜索

query_embedding = model.encode(query)
results = client.ft("queries_idx").search(
    Query("*=>[KNN 10 @embedding $vec AS score]")
    .sort_by("score")
    .return_fields("query", "score")
    .dialect(2),
    query_params={"vec": query_embedding.tobytes()}
)

这将返回按余弦相似度排序的 10 个最近邻。

缓存策略

我们在 Valkey 中缓存多种数据类型:

短语映射(30 天 TTL):

client.setex(
    "seo:phrase_mappings",
    30 * 24 * 3600,
    json.dumps(phrase_mappings)
)

相关搜索(7 天 TTL):

cache_key = f"related:{query_hash}"
client.setex(cache_key, 7 * 24 * 3600, json.dumps(results))

热门查询(30 天 TTL):

client.setex(
    "seo:popular_queries",
    30 * 24 * 3600,
    json.dumps(popular_queries)
)

自动补全索引(每日更新):

for query, score in queries:
    client.zadd("autocomplete:mini", {query: score})

使用有序集合实现自动补全

我们使用 Valkey 有序集合进行自动补全:

索引结构

autocomplete:m     → ["mini pc": 5000, "mini computer": 3000]
autocomplete:mi    → ["mini pc": 5000, "mini computer": 3000]
autocomplete:min   → ["mini pc": 5000, "mini computer": 3000]
autocomplete:mini  → ["mini pc": 5000, "mini computer": 3000]

前缀查询

prefix = "mini"
results = client.zrevrange(f"autocomplete:{prefix}", 0, 9, withscores=True)

这将返回以 "mini" 开头的前 10 个查询,按分数排序。

API 通信

主 Web 服务器通过 HTTP 调用搜索服务:

筛选条件提取

from app.shared.filter_service import extract_filters_from_query

filters = extract_filters_from_query("mini pc 16gb ram")
# 内部调用:GET SEARCH_SERVICE_URL/api/extract_filters?q=...

相关搜索

import requests

response = requests.post(
    "SEARCH_SERVICE_URL/api/related",
    json={"query": "mini pc", "limit": 10},
    timeout=2
)
related = response.json()["related"]

自动补全

response = requests.get(
    "SEARCH_SERVICE_URL/api/autocomplete",
    params={"q": "mini p", "limit": 5},
    timeout=1
)
suggestions = response.json()["suggestions"]

错误处理与降级方案

主 Web 服务器优雅地处理搜索服务故障:

try:
    filters = extract_filters_from_query(query)
except Exception as e:
    logger.error(f"Search service failed: {e}")
    filters = {}  # 降级方案:返回空的筛选条件

这确保了即使搜索服务宕机,主 Web 服务器也能继续运行。


### 网络配置

所有服务器都在私有网络中:

- **主 Web 服务器**:可以访问搜索服务和 Valkey

- **搜索服务**:可以访问 Valkey

- **Valkey**:仅允许主 Web 服务器和搜索服务访问

外部无法访问搜索服务或 Valkey。

## 与 SEO 管道的集成

搜索服务与 SEO 管道集成:

### 步骤 11:迁移到 Valkey

SEO 管道将数据加载到 Valkey 中:

```python
# 加载查询嵌入向量
for query, embedding in zip(queries, embeddings):
    client.hset(f"query:{query_hash}", mapping={
        "query": query,
        "embedding": embedding.tobytes(),
        "score": score
    })

# 创建 RediSearch 索引
client.ft("queries_idx").create_index([...])

详情请参阅 Valkey 迁移

查询日志记录

搜索服务为 SEO 管道记录查询日志:

log_entry = {
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "query": query,
    "filters_extracted": filters,
    "results_count": len(results)
}
with open(SEO_LIVE_QUERIES_LOG, "a") as f:
    f.write(json.dumps(log_entry) + "\n")

这些日志会反馈到 步骤 1d获取实时查询

参考资料

技术概念

相关文章

总结

我们的搜索服务作为一个独立的 Flask 应用运行在单独的服务器上:

架构

  • 在专用服务器上运行独立的 Flask 应用

  • 使用 Valkey(Redis 分支)进行缓存和向量搜索

  • 通过 HTTP API 与主 Web 服务器通信

API

  • /api/extract_filters - 从查询中提取筛选条件

  • /api/related - 查找相似查询(向量搜索)

  • /api/autocomplete - 建议查询(前缀匹配)

  • /api/popular - 获取热门查询

Valkey 功能

  • 向量搜索(RediSearch 模块)

  • 缓存(短语映射 30 天 TTL)

  • 自动补全(有序集合)

  • 持久性(AOF)

优势

  • 独立扩展

  • 资源隔离

  • 高性能(Valkey 缓存)

  • 容错性(优雅降级)

这种架构实现了快速、可扩展的搜索,同时保持了主 Web 服务器的响应速度。


← 返回文档索引