Compare commits

cae3877048...master (7 commits)

| SHA1 |
|---|
| 2d29d8631c |
| 3dc47712e4 |
| 9dc23b714c |
| 1545a85b09 |
| 115d95bbef |
| 34ad16ff02 |
| 8a6db8f8f2 |

@@ -34,6 +34,12 @@
 For old data: news items that already have risk-classification information but were not selected as cluster representatives in the current clustering round will **not** have their risk classification updated.
 
+## Notes on the clustering algorithm
+
+A text vector is a one-dimensional float16 array with 1024 dimensions. Similarity between vectors is computed using cosine distance.
+
+Since the goal of clustering is deduplication, DBSCAN is a suitable algorithm. The current parameters are EPS=0.25 with a minimum cluster size of 2, so essentially any 2 duplicate or semantically similar news items are identified into the same cluster.
+
 ## Notes on duplicate data
 
 Because of article rewriting, reposting, and plagiarism, the same news story may be published on multiple platforms. 牛媒数据中台 treats these as different news items (with different IDs). The clustering algorithm can identify such duplicates at the semantic level (both exact duplicates and semantically similar ones) and group them into a single cluster.

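The README section added above pins down the clustering setup: 1024-dimensional float16 embeddings, cosine distance, DBSCAN with EPS=0.25 and a minimum cluster size of 2. As a minimal sketch of that configuration (not the repository's actual code; the random embeddings here are placeholder data), scikit-learn's DBSCAN can be run as:

```python
import numpy as np
from sklearn.cluster import DBSCAN

# Placeholder embeddings: one 1024-dim float16 vector per news item.
embeddings = np.random.rand(8, 1024).astype(np.float16)

# DBSCAN matching the README's parameters: cosine distance, eps=0.25,
# and clusters of at least 2 items. Cast up from float16, which
# scikit-learn's distance routines do not handle natively.
labels = DBSCAN(eps=0.25, min_samples=2, metric="cosine").fit_predict(
    embeddings.astype(np.float32)
)

# Label -1 marks unclustered (unique) news; every other label groups
# exact or semantically similar duplicates together.
for cluster_id in sorted(set(labels) - {-1}):
    print(f"cluster {cluster_id}:", np.flatnonzero(labels == cluster_id).tolist())
```

With min_samples=2, any two vectors within cosine distance 0.25 of each other already form a cluster, which matches the stated goal of catching pairwise duplicates.
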
@@ -35,6 +35,17 @@ def parse_unixtime(unixtime: int) -> datetime.datetime:
     return datetime.datetime.fromtimestamp(unixtime)
 
 
+async def get_filter_query() -> str:
+    row = await mysql.fetch_one(
+        """
+        select name from risk_news_keywords order by id limit 1
+        """
+    )
+    if not row:
+        raise Exception("risk keywords not found")
+    return row[0]
+
+
 async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
     """
     Fetch data in the given time interval, size records per request. This is a recursive function: if the number of records in the current interval equals size, there is more data, so keep requesting.

@@ -45,7 +56,7 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
     es_response = await post(
         url,
         {
-            "word": "(教师|老师|教授|导师|院长) - (教育部|公告|通报|准则|建设|座谈|细则|工作|动员|专题) + (不正当|性骚扰|出轨|猥亵|不公|强迫|侮辱|举报|滥用|违法|师德|贿|造假|不端|抄袭|虚假|篡改|挪用|抑郁|威胁|霸凌|体罚)",
+            "word": await get_filter_query(),
             "size": size,
             "orders": 9,
             "tmode": 2,

@@ -66,8 +77,8 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
         f'Took {int(duration)} seconds, fetched {len(docs)} records, earliest {parse_unixtime(docs[0]["crawled_at"])}, latest {parse_unixtime(docs[-1]["crawled_at"])}'
     )
     for d in docs:
-        d['title'] = d['title'].replace('\x00', '')
-        d['content'] = d['content'].replace('\x00', '')
+        d["title"] = d["title"].replace("\x00", "")
+        d["content"] = d["content"].replace("\x00", "")
         yield d
     # If the record count for the current interval equals size, there is more data, so keep requesting
     # recursion is used here

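The docstring and comments above describe fetch's paging strategy: request size records at a time, and if a full page comes back, recurse into the remainder of the interval. Below is a minimal sketch of that pattern, assuming a hypothetical get_page coroutine in place of the real search API:

```python
import asyncio
from typing import AsyncIterable


async def get_page(start: int, end: int, size: int) -> list[dict]:
    # Hypothetical stand-in for the real search API: returns up to
    # `size` records with "crawled_at" in [start, end), oldest first.
    return []


async def fetch_all(start: int, end: int, size: int = 1000) -> AsyncIterable[dict]:
    docs = await get_page(start, end, size)
    for d in docs:
        yield d
    # A full page means the interval may hold more data: recurse,
    # resuming just after the last record seen.
    if len(docs) == size:
        async for d in fetch_all(docs[-1]["crawled_at"] + 1, end, size):
            yield d


async def main() -> None:
    async for doc in fetch_all(0, 1_700_000_000):
        print(doc["title"])


if __name__ == "__main__":
    asyncio.run(main())
```
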
@@ -20,7 +20,7 @@ async def main():
             await asyncio.sleep(60 * 30)
         except Exception as e:
             print(e)
-            await asyncio.sleep(60*60)
+            await asyncio.sleep(60 * 60)
 
 
 async def do_analyze():

@@ -40,6 +40,8 @@ async def do_analyze():
     analyze_result = await batch_risk_analyze(docs, risk_types)
     for task in analyze_result:
         if "是" not in task.response:
+            if risks_to_update.get(task.doc.id) is None:
+                risks_to_update[task.doc.id] = set()
             continue
         print(f"Risk: {task.risk_type.name} Title: {task.doc.title} {task.doc.id}")
 

@@ -58,15 +60,12 @@ async def do_analyze():
             """,
             {
                 "es_id": doc_id,
-                "risk_types": json.dumps(list(risks), ensure_ascii=False),
+                "risk_types": (
+                    json.dumps(list(risks), ensure_ascii=False) if risks else None
+                ),
             },
         )
 
-    await asyncio.gather(
-        pool.close(),
-        mysql.disconnect(),
-    )
-
 
 @dataclass
 class RiskType:

@@ -118,6 +117,8 @@ async def batch_risk_analyze(
             task.response = await risk_analyze(task, model)
             queue.task_done()
             bar.update(1)
+            if bar.n % 100 == 0:
+                print(f"Finished {bar.n} risk-analysis items")
 
     async def producer():
         for task in tasks:

@@ -131,6 +132,8 @@ async def batch_risk_analyze(
     await queue.put(None)
     await asyncio.gather(*workers)
 
+    print("Risk analysis finished")
+
     return tasks
 
 

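As the context lines above show, batch_risk_analyze drains a shared asyncio.Queue with worker tasks, a producer, and a None sentinel to signal shutdown. A stripped-down sketch of that producer/worker shape (hypothetical task payloads and a sleep standing in for the real analysis call; not the repository's actual signatures):

```python
import asyncio


async def run_batch(tasks: list[str], concurrency: int = 4) -> None:
    queue: asyncio.Queue = asyncio.Queue()

    async def worker() -> None:
        while True:
            task = await queue.get()
            if task is None:  # sentinel: no more work for this worker
                queue.task_done()
                return
            await asyncio.sleep(0.01)  # stand-in for the real analysis call
            queue.task_done()

    async def producer() -> None:
        for task in tasks:
            await queue.put(task)
        for _ in range(concurrency):
            await queue.put(None)  # one sentinel per worker

    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    await producer()
    await asyncio.gather(*workers)


asyncio.run(run_batch([f"doc-{i}" for i in range(10)]))
```
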
@@ -44,9 +44,9 @@ async def run_dbscan() -> DBScanResult:
         SELECT id, title, content, embedding
         FROM risk_news
         WHERE NOT embedding_updated_at IS NULL
-          AND time > now() - interval '14 day'
+          AND time > now() - interval '7 day'
         ORDER BY time desc
-        LIMIT 10000
+        LIMIT 100000
         ;"""
     )
     rows = await cur.fetchall()

requirements_version.txt (new file, 48 lines)
@@ -0,0 +1,48 @@
+aiohappyeyeballs==2.4.3
+aiohttp==3.10.10
+aiomysql==0.2.0
+aiosignal==1.3.1
+annotated-types==0.7.0
+anyio==4.6.2.post1
+attrs==24.2.0
+certifi==2024.8.30
+charset-normalizer==3.4.0
+databases==0.9.0
+distro==1.9.0
+fastapi==0.115.2
+filelock==3.16.1
+frozenlist==1.4.1
+fsspec==2024.9.0
+greenlet==3.1.1
+h11==0.14.0
+httpcore==1.0.6
+httpx==0.27.2
+huggingface-hub==0.26.0
+idna==3.10
+jiter==0.6.1
+joblib==1.4.2
+multidict==6.1.0
+numpy==2.1.2
+openai==1.52.0
+packaging==24.1
+propcache==0.2.0
+psycopg==3.2.3
+psycopg-binary==3.2.3
+psycopg-pool==3.2.3
+pydantic==2.9.2
+pydantic_core==2.23.4
+PyMySQL==1.1.1
+python-dotenv==1.0.1
+PyYAML==6.0.2
+requests==2.32.3
+scikit-learn==1.5.2
+scipy==1.14.1
+sniffio==1.3.1
+SQLAlchemy==2.0.36
+starlette==0.40.0
+threadpoolctl==3.5.0
+tokenizers==0.20.1
+tqdm==4.66.5
+typing_extensions==4.12.2
+urllib3==2.2.3
+yarl==1.15.4