Compare commits

...

5 Commits

3 changed files with 30 additions and 10 deletions

View File

@@ -34,6 +34,12 @@
对于旧数据:有风险分类信息,但在本轮聚类中没有被选为聚类代表的新闻,**不会** 被更新风险分类信息。 对于旧数据:有风险分类信息,但在本轮聚类中没有被选为聚类代表的新闻,**不会** 被更新风险分类信息。
## 关于数据聚类算法的说明
文本向量是维度为 1024 的 float16 一维数组。向量之间使用 cosine 距离计算相似度。
由于聚类的目的是去重,因此 DBSCAN 是比较合适的算法。目前指定使用参数 EPS=0.25 最小聚类数量 2。基本上有 2 条重复的或者语义相似的新闻都可以识别到同一个聚类中。
## 重复数据说明 ## 重复数据说明
由于新闻洗稿、转载、抄袭等原因,可能会出现同一篇新闻在多个平台发布的情况。牛媒数据中台把他们当作不同的新闻对待(拥有不同的 ID。聚类算法可以从语义信息层面识别到这些重复新闻包括完全重复和语义相似并把他们归为一类。 由于新闻洗稿、转载、抄袭等原因,可能会出现同一篇新闻在多个平台发布的情况。牛媒数据中台把他们当作不同的新闻对待(拥有不同的 ID。聚类算法可以从语义信息层面识别到这些重复新闻包括完全重复和语义相似并把他们归为一类。

View File

@@ -35,6 +35,17 @@ def parse_unixtime(unixtime: int) -> datetime.datetime:
return datetime.datetime.fromtimestamp(unixtime) return datetime.datetime.fromtimestamp(unixtime)
async def get_filter_query() -> str:
row = await mysql.fetch_one(
"""
select name from risk_news_keywords order by id limit 1
"""
)
if not row:
raise Exception("未找到风险关键词")
return row[0]
async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]: async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
""" """
获取指定时间段内的数据,每次请求 size 条数据。这是一个递归函数,如果当前时间段内的数据量 = size说明还有数据继续请求 获取指定时间段内的数据,每次请求 size 条数据。这是一个递归函数,如果当前时间段内的数据量 = size说明还有数据继续请求
@@ -45,7 +56,7 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
es_response = await post( es_response = await post(
url, url,
{ {
"word": "(教师|老师|教授|导师|院长) - (教育部|公告|通报|准则|建设|座谈|细则|工作|动员|专题) + (不正当|性骚扰|出轨|猥亵|不公|强迫|侮辱|举报|滥用|违法|师德|贿|造假|不端|抄袭|虚假|篡改|挪用|抑郁|威胁|霸凌|体罚)", "word": await get_filter_query(),
"size": size, "size": size,
"orders": 9, "orders": 9,
"tmode": 2, "tmode": 2,
@@ -66,8 +77,8 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
f'用时 {int(duration)} 秒,获取到 {len(docs)} 条数据,最早时间 {parse_unixtime(docs[0]["crawled_at"])},最晚时间 {parse_unixtime(docs[-1]["crawled_at"])}' f'用时 {int(duration)} 秒,获取到 {len(docs)} 条数据,最早时间 {parse_unixtime(docs[0]["crawled_at"])},最晚时间 {parse_unixtime(docs[-1]["crawled_at"])}'
) )
for d in docs: for d in docs:
d['title'] = d['title'].replace('\x00', '') d["title"] = d["title"].replace("\x00", "")
d['content'] = d['content'].replace('\x00', '') d["content"] = d["content"].replace("\x00", "")
yield d yield d
# 如果当前时间度的数据量 = size 说明还有数据,继续请求 # 如果当前时间度的数据量 = size 说明还有数据,继续请求
# 这里使用递归 # 这里使用递归

View File

@@ -20,7 +20,7 @@ async def main():
await asyncio.sleep(60 * 30) await asyncio.sleep(60 * 30)
except Exception as e: except Exception as e:
print(e) print(e)
await asyncio.sleep(60*60) await asyncio.sleep(60 * 60)
async def do_analyze(): async def do_analyze():
@@ -40,6 +40,8 @@ async def do_analyze():
analyze_result = await batch_risk_analyze(docs, risk_types) analyze_result = await batch_risk_analyze(docs, risk_types)
for task in analyze_result: for task in analyze_result:
if "" not in task.response: if "" not in task.response:
if risks_to_update.get(task.doc.id) is None:
risks_to_update[task.doc.id] = set()
continue continue
print(f"风险: {task.risk_type.name} 标题: {task.doc.title} {task.doc.id}") print(f"风险: {task.risk_type.name} 标题: {task.doc.title} {task.doc.id}")
@@ -58,15 +60,12 @@ async def do_analyze():
""", """,
{ {
"es_id": doc_id, "es_id": doc_id,
"risk_types": json.dumps(list(risks), ensure_ascii=False), "risk_types": (
json.dumps(list(risks), ensure_ascii=False) if risks else None
),
}, },
) )
await asyncio.gather(
pool.close(),
mysql.disconnect(),
)
@dataclass @dataclass
class RiskType: class RiskType:
@@ -118,6 +117,8 @@ async def batch_risk_analyze(
task.response = await risk_analyze(task, model) task.response = await risk_analyze(task, model)
queue.task_done() queue.task_done()
bar.update(1) bar.update(1)
if bar.n % 100 == 0:
print(f"已完成 {bar.n} 条风险分析")
async def producer(): async def producer():
for task in tasks: for task in tasks:
@@ -131,6 +132,8 @@ async def batch_risk_analyze(
await queue.put(None) await queue.put(None)
await asyncio.gather(*workers) await asyncio.gather(*workers)
print("风险分析完成")
return tasks return tasks