Compare commits: ad3ef8e504...cae3877048 (5 commits)
| Author | SHA1 | Date |
|---|---|---|
| | cae3877048 | |
| | 3d36dcadf6 | |
| | a051674f2e | |
| | 47ae4dc8d5 | |
| | d52f37e5f0 | |
README.md (+90 lines)
@@ -1 +1,91 @@
# 中传三期大模型舆情监测项目 (CUC Phase 3 LLM-based Public Opinion Monitoring)

## LLM Risk Analysis Feature

Under `风险预警` (Risk Alerts) → `我的设置` (My Settings) you can configure the *filter keywords* and the *LLM prompts*.

The data processing pipeline is summarized below.
1. Based on the configured *filter keywords*, news is filtered from the Niumedia (牛媒) public-opinion data platform and imported into the database.

   The ingestion job runs once per hour, and each run imports one hour's worth of data. To leave enough processing time for data arriving near the end of each hour, processing is delayed by one hour, so the overall ingestion latency for filtered news stays within 1-2 hours (see the first sketch after this list).

   The time here refers to when the Niumedia data platform ingested the item, not its publication time, so items older than two hours may still be back-filled later. This is especially common for target sites that the Niumedia crawler monitors less often than every 2 hours. The downstream processing logic already accounts for this.
2. Text feature extraction

   Text feature extraction runs every ten minutes and processes the news whose text-vector field in the database is still empty (see the second sketch after this list).
3. Clustering

   DBSCAN is run on the text feature vectors to cluster the news, discarding noise articles (roughly half of the input) and taking the article closest to each cluster's center as the representative for later analysis (see the third sketch after this list). Each run produces roughly 80-400 clusters. The clustering input is all news from the last 7 days.
4. LLM risk judgement

   For each risk type's LLM prompt, every cluster representative is sent to the model for a risk judgement, with prompts similar to:

   `你是一个新闻风险分析器,分析以下新闻是否包含学术不端风险。你只能回答是或否`

   ("You are a news risk analyzer. Determine whether the following news involves academic-misconduct risk. Answer only 是 (yes) or 否 (no).")

   The program determines the analysis result by checking whether the text returned by the model contains the keyword "是" (yes) or "否" (no).
5. Writing results back

   For every news item judged to contain **any risk**, the program updates (overwrites) its risk-classification field. After waiting about one minute for Elasticsearch to refresh its index, these classifications can be filtered on the *风险监控* (Risk Monitoring) page of the frontend.

   For old data: news items that already carry risk-classification information but were not selected as cluster representatives in the current round will **not** have their risk classification updated.
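A minimal sketch of the hourly ingestion window described in step 1. The one-hour window and one-hour delay come from the description above; the helper name `ingestion_window` and the exact alignment to hour boundaries are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone


def ingestion_window(now: datetime) -> tuple[datetime, datetime]:
    """Return the [start, end) platform-ingestion-time window for an hourly run.

    The run processes the hour that ended one hour ago, which is why the
    overall ingestion latency for filtered news stays within 1-2 hours.
    """
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    end = current_hour - timedelta(hours=1)   # leave one hour of slack for late data
    start = end - timedelta(hours=1)          # one hour's worth of data per run
    return start, end


# A run at 10:00 imports data the platform ingested between 08:00 and 09:00.
print(ingestion_window(datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)))
```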
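A minimal sketch of the ten-minute embedding backfill in step 2, assuming psycopg-style async cursors as used elsewhere in this repo. The `news` table and its columns, the embedding model name, and the `OPENAI_EMBEDDING_*` config exports are placeholder assumptions, not confirmed names.

```python
import asyncio
import json

import openai

from cucyuqing.pg import pool, get_cur
from cucyuqing.config import OPENAI_EMBEDDING_API_KEY, OPENAI_EMBEDDING_BASE_URL  # assumed exports


async def embed_missing(model: str = "text-embedding-3-small") -> None:
    """Embed every news item whose text-vector field is still empty."""
    client = openai.AsyncOpenAI(
        api_key=OPENAI_EMBEDDING_API_KEY, base_url=OPENAI_EMBEDDING_BASE_URL
    )
    async with get_cur() as cur:
        # "news", "embedding" and "title" are placeholder names for the PG schema.
        await cur.execute("SELECT id, title FROM news WHERE embedding IS NULL LIMIT 100")
        rows = await cur.fetchall()
        for news_id, title in rows:
            resp = await client.embeddings.create(model=model, input=title)
            await cur.execute(
                "UPDATE news SET embedding = %s WHERE id = %s",
                (json.dumps(resp.data[0].embedding), news_id),
            )


async def main() -> None:
    await pool.open()
    while True:
        await embed_missing()
        await asyncio.sleep(60 * 10)  # every ten minutes
```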
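Step 3 keeps, for each cluster, the article closest to the cluster center. A minimal sketch of that selection, assuming the embeddings of one cluster are rows of a NumPy array; the actual implementation lives in `cucyuqing.dbscan` (see the diff further below).

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


def pick_representative(cluster_embeddings: np.ndarray) -> int:
    """Return the index of the document closest to the cluster center.

    The center is the mean embedding of the cluster; closeness is cosine
    similarity, matching the cosine metric used for DBSCAN itself.
    """
    center = cluster_embeddings.mean(axis=0, keepdims=True)
    similarities = cosine_similarity(cluster_embeddings, center).ravel()
    return int(similarities.argmax())


# Toy example with three 2-d embeddings; the middle one is closest to the mean.
emb = np.array([[1.0, 0.0], [0.7, 0.7], [0.0, 1.0]])
print(pick_representative(emb))  # -> 1
```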
## Duplicate Data

Because of article spinning, reposting, and plagiarism, the same story may be published on multiple platforms. The Niumedia data platform treats these as separate news items (with different IDs). The clustering algorithm can recognize such duplicates at the semantic level (both exact duplicates and semantically similar copies) and groups them into a single cluster.
## Deployment

### Environment Variables
Configuration can be provided through system environment variables or a `.env` file; the `.env` file takes higher priority.
```
ES_API=http://<address>
PG_DSN='postgresql://username:password@address:5432/cucyuqing?sslmode=disable'
MYSQL_DSN='mysql://username:password@address:3306/niumedia'
OPENAI_EMBEDDING_API_KEY='key'
OPENAI_EMBEDDING_BASE_URL='http://<address>/v1'
OPENAI_RISK_LLM_API_KEY='key'
OPENAI_RISK_LLM_BASE_URL='https://<address>/v1'
```
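A minimal sketch of how these values might be loaded in `cucyuqing.config`, assuming the `python-dotenv` package; `override=True` is what would make the `.env` file win over variables already present in the system environment (the exact loading logic in this repo is an assumption).

```python
# Hypothetical config.py sketch: read settings from the environment / .env file.
import os

from dotenv import load_dotenv  # assumes python-dotenv is installed

# override=True lets values from .env take priority over the system environment.
load_dotenv(override=True)

ES_API = os.environ["ES_API"]
PG_DSN = os.environ["PG_DSN"]
MYSQL_DSN = os.environ["MYSQL_DSN"]
OPENAI_EMBEDDING_API_KEY = os.environ["OPENAI_EMBEDDING_API_KEY"]
OPENAI_EMBEDDING_BASE_URL = os.environ["OPENAI_EMBEDDING_BASE_URL"]
OPENAI_RISK_LLM_API_KEY = os.environ["OPENAI_RISK_LLM_API_KEY"]
OPENAI_RISK_LLM_BASE_URL = os.environ["OPENAI_RISK_LLM_BASE_URL"]
```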
### Dependencies

Using a virtual environment:
```bash
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple/
```
Or using Docker:

```bash
docker build -t <image-name>:latest .
```
### Startup

Start the ES sync service:

```bash
python -m cmd.es-sync
```
Start the text feature extraction service:

```bash
python -m cmd.embedding
```
Start the LLM risk analysis service:

```bash
python -m cmd.risk-analyze
```
@@ -1,5 +1,5 @@
from os import system
from typing import Iterable, Required
from dataclasses import dataclass
from typing import Iterable
import openai
import asyncio
from openai.types.chat import ChatCompletionMessageParam
@@ -10,22 +10,103 @@ from cucyuqing.utils import print
from cucyuqing.config import OPENAI_RISK_LLM_API_KEY, OPENAI_RISK_LLM_BASE_URL
from cucyuqing.pg import pool, get_cur
from cucyuqing.mysql import mysql
from cucyuqing.dbscan import run_dbscan
from cucyuqing.dbscan import Document, run_dbscan


async def main():
    await pool.open()
    while True:
        try:
            await do_analyze()
            await asyncio.sleep(60 * 30)
        except Exception as e:
            print(e)
            await asyncio.sleep(60*60)


async def do_analyze():
    await asyncio.gather(
        pool.open(),
        mysql.connect(),
    )
    # Fetch the risk types and their corresponding prompts
    risk_types = await get_risk_type_prompt()
    print("共有风险类型:", len(risk_types))

    dbscan_result = await run_dbscan()
    docs = [cluster[0] for cluster in dbscan_result.clusters]
    analyze_rusult = await batch_risk_analyze([doc.title for doc in docs])
    for result, doc in zip(analyze_rusult, docs):
        print(f"风险: {result} 标题: {doc.title}")
    print("共有待分析文档:", len(docs), "噪声", len(dbscan_result.noise))

    risks_to_update: dict[str, set[str]] = {}
    analyze_result = await batch_risk_analyze(docs, risk_types)
    for task in analyze_result:
        if "是" not in task.response:
            continue
        print(f"风险: {task.risk_type.name} 标题: {task.doc.title} {task.doc.id}")

        # Merge each document's risks into a single set
        if task.doc.id not in risks_to_update:
            risks_to_update[task.doc.id] = set()
        risks_to_update[task.doc.id].add(task.risk_type.name)

    # Update the database
    for doc_id, risks in risks_to_update.items():
        await mysql.execute(
            """
            UPDATE risk_news
            SET risk_types = :risk_types, updated_at = now()
            WHERE es_id = :es_id
            """,
            {
                "es_id": doc_id,
                "risk_types": json.dumps(list(risks), ensure_ascii=False),
            },
        )

    await asyncio.gather(
        pool.close(),
        mysql.disconnect(),
    )


@dataclass
class RiskType:
    name: str
    prompt: str


@dataclass
class Task:
    doc: Document
    risk_type: RiskType
    response: str = ""


async def get_risk_type_prompt() -> list[RiskType]:
    """Fetch the risk types and their corresponding prompts from the database"""
    rows = await mysql.fetch_all(
        """
        SELECT rp.content, rt.name
        FROM risk_prompt rp
        JOIN risk_type rt ON rp.risk_type_id = rt.id
        ORDER BY rp.id DESC
        """
    )

    return [RiskType(prompt=row[0], name=row[1]) for row in rows]


async def batch_risk_analyze(
    texts: list, model: str = "gpt-4o-mini", threads: int = 10
) -> list:
    tasks = [{"input": text} for text in texts]
    docs: list[Document],
    risk_types: list[RiskType],
    model: str = "gpt-4o-mini",
    threads: int = 10,
) -> list[Task]:
    """Text risk analysis (parallel batch processing)"""

    # Build the task list as the cross product of docs and risk_types
    tasks: list[Task] = [
        Task(doc=doc, risk_type=rt) for doc in docs for rt in risk_types
    ]
    bar = tqdm.tqdm(total=len(tasks))
    queue = asyncio.Queue()
@@ -34,7 +115,7 @@ async def batch_risk_analyze(
        task = await queue.get()
        if task is None:
            break
        task["response"] = await risk_analyze(task["input"], model)
        task.response = await risk_analyze(task, model)
        queue.task_done()
        bar.update(1)
@@ -50,19 +131,16 @@ async def batch_risk_analyze(
        await queue.put(None)
    await asyncio.gather(*workers)

    return [task["response"] for task in tasks]
    return tasks


async def risk_analyze(text: str, model: str) -> str:
async def risk_analyze(task: Task, model: str) -> str:
    """Run risk analysis on a single text"""
    llm = openai.AsyncOpenAI(
        api_key=OPENAI_RISK_LLM_API_KEY, base_url=OPENAI_RISK_LLM_BASE_URL
    )
    system_message = (
        "你是一个新闻风险分析器,你要判断以下文本是否有风险,你只要回答是或者否。"
    )

    hash = hashlib.md5(
        model.encode() + b"|" + text.encode() + b"|" + system_message.encode()
        f"{model}|{task.doc.get_text_for_llm()}|{task.risk_type.prompt}".encode()
    ).hexdigest()

    # Look up the cache
@@ -75,14 +153,15 @@ async def risk_analyze(text: str, model: str) -> str:
            return row[0]

    messages: Iterable[ChatCompletionMessageParam] = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": text},
        {"role": "system", "content": task.risk_type.prompt},
        {"role": "user", "content": task.doc.get_text_for_llm()},
    ]
    resp = await llm.chat.completions.create(
        messages=messages,
        model=model,
        temperature=0,
        stop="\n",
        max_tokens=10,
    )

    completions = resp.choices[0].message.content or ""
@@ -8,20 +8,34 @@ from sklearn.metrics import pairwise_distances
from cucyuqing.pg import pool, get_cur


@dataclass
class Document:
    id: int
    id: str
    """The ID is the 32-character hex ID used in ES"""

    title: str
    content: str
    similarity: float = 0.0

    def get_text_for_llm(self) -> str:
        """Only the title is used for risk analysis

        Empty titles have already been handled at ingestion time.
        If the title was empty at ingestion, the first 20 characters of the
        content, or its first Chinese sentence, was used as the title.
        """
        return self.title


@dataclass
class DBScanResult:
    noise: list[Document]
    clusters: list[list[Document]]


from sklearn.metrics.pairwise import cosine_similarity


async def run_dbscan() -> DBScanResult:
    # Fetch the data from the PG database
    async with get_cur() as cur:
@@ -37,16 +51,17 @@ async def run_dbscan() -> DBScanResult:
        )
        rows = await cur.fetchall()
    docs: list[Document] = [
        Document(row[0], row[1], row[2])
        for row in rows
        Document(str(row[0]).replace("-", ""), row[1], row[2]) for row in rows
    ]
    embeddings = [numpy.array(json.loads(row[3])) for row in rows]

    # Compute the cosine distance matrix
    cosine_distances = pairwise_distances(embeddings, metric='cosine')
    cosine_distances = pairwise_distances(embeddings, metric="cosine")

    # Initialize the DBSCAN model
    dbscan = DBSCAN(eps=0.25, min_samples=2, metric='precomputed')  # Adjust eps as needed
    dbscan = DBSCAN(
        eps=0.25, min_samples=2, metric="precomputed"
    )  # Adjust eps as needed

    # Run the clustering
    dbscan.fit(cosine_distances)
@@ -58,9 +73,9 @@ async def run_dbscan() -> DBScanResult:
    ret: DBScanResult = DBScanResult(noise=[], clusters=[])
    unique_labels = set(labels)
    for label in unique_labels:
        class_member_mask = (labels == label)
        class_member_mask = labels == label
        cluster_docs = [docs[i] for i in range(len(labels)) if class_member_mask[i]]  # type: ignore
        cluster_embeddings = [embeddings[i] for i in range(len(labels)) if class_member_mask[i]]  # type: ignore

        if label == -1:
            # -1 is the label for noise points
@@ -78,17 +93,19 @@ async def run_dbscan() -> DBScanResult:
                doc.similarity = similarities[i]
                sorted_cluster_docs.append(doc)
            ret.clusters.append(sorted_cluster_docs)


    return ret


async def main():
    await pool.open()
    result = await run_dbscan()
    print(f"噪声文档: {len(result.noise)}")
    for i, cluster in enumerate(result.clusters):
        print('----------------')
        print("----------------")
        for doc in cluster:
            print(f"聚类 {i} 文档: {doc.title} 相似度: {doc.similarity}")


if __name__ == '__main__':
    asyncio.run(main())

if __name__ == "__main__":
    asyncio.run(main())