Compare commits
3 Commits
1545a85b09
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
2d29d8631c
|
|||
|
3dc47712e4
|
|||
|
9dc23b714c
|
@@ -35,6 +35,17 @@ def parse_unixtime(unixtime: int) -> datetime.datetime:
|
||||
return datetime.datetime.fromtimestamp(unixtime)
|
||||
|
||||
|
||||
async def get_filter_query() -> str:
|
||||
row = await mysql.fetch_one(
|
||||
"""
|
||||
select name from risk_news_keywords order by id limit 1
|
||||
"""
|
||||
)
|
||||
if not row:
|
||||
raise Exception("未找到风险关键词")
|
||||
return row[0]
|
||||
|
||||
|
||||
async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
|
||||
"""
|
||||
获取指定时间段内的数据,每次请求 size 条数据。这是一个递归函数,如果当前时间段内的数据量 = size,说明还有数据,继续请求
|
||||
@@ -45,7 +56,7 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
|
||||
es_response = await post(
|
||||
url,
|
||||
{
|
||||
"word": "(教师|老师|教授|导师|院长) - (教育部|公告|通报|准则|建设|座谈|细则|工作|动员|专题) + (不正当|性骚扰|出轨|猥亵|不公|强迫|侮辱|举报|滥用|违法|师德|贿|造假|不端|抄袭|虚假|篡改|挪用|抑郁|威胁|霸凌|体罚)",
|
||||
"word": await get_filter_query(),
|
||||
"size": size,
|
||||
"orders": 9,
|
||||
"tmode": 2,
|
||||
@@ -66,8 +77,8 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
|
||||
f'用时 {int(duration)} 秒,获取到 {len(docs)} 条数据,最早时间 {parse_unixtime(docs[0]["crawled_at"])},最晚时间 {parse_unixtime(docs[-1]["crawled_at"])}'
|
||||
)
|
||||
for d in docs:
|
||||
d['title'] = d['title'].replace('\x00', '')
|
||||
d['content'] = d['content'].replace('\x00', '')
|
||||
d["title"] = d["title"].replace("\x00", "")
|
||||
d["content"] = d["content"].replace("\x00", "")
|
||||
yield d
|
||||
# 如果当前时间度的数据量 = size 说明还有数据,继续请求
|
||||
# 这里使用递归
|
||||
|
||||
@@ -20,7 +20,7 @@ async def main():
|
||||
await asyncio.sleep(60 * 30)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
await asyncio.sleep(60*60)
|
||||
await asyncio.sleep(60 * 60)
|
||||
|
||||
|
||||
async def do_analyze():
|
||||
@@ -40,6 +40,8 @@ async def do_analyze():
|
||||
analyze_result = await batch_risk_analyze(docs, risk_types)
|
||||
for task in analyze_result:
|
||||
if "是" not in task.response:
|
||||
if risks_to_update.get(task.doc.id) is None:
|
||||
risks_to_update[task.doc.id] = set()
|
||||
continue
|
||||
print(f"风险: {task.risk_type.name} 标题: {task.doc.title} {task.doc.id}")
|
||||
|
||||
@@ -58,15 +60,12 @@ async def do_analyze():
|
||||
""",
|
||||
{
|
||||
"es_id": doc_id,
|
||||
"risk_types": json.dumps(list(risks), ensure_ascii=False),
|
||||
"risk_types": (
|
||||
json.dumps(list(risks), ensure_ascii=False) if risks else None
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
await asyncio.gather(
|
||||
pool.close(),
|
||||
mysql.disconnect(),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RiskType:
|
||||
|
||||
Reference in New Issue
Block a user