async def post(url: str, json: dict) -> dict:
    """POST *json* as a JSON body to *url* and return the decoded JSON reply.

    A fresh aiohttp session is opened per call; acceptable for this
    low-frequency sync script.
    """
    # NOTE: the parameter name `json` shadows the stdlib module inside this
    # function; kept unchanged for caller compatibility.
    async with aiohttp.ClientSession() as http:
        async with http.post(url, json=json) as reply:
            return await reply.json()
async def batch_save_to_pg(docs: list[dict]):
    """Bulk-insert *docs* into risk_news; ids already present are skipped."""
    rows = [(doc["id"], json.dumps(doc)) for doc in docs]
    async with get_cur() as cur:
        await cur.executemany(
            """
            INSERT INTO risk_news (id, info)
            VALUES (%s, %s)
            ON CONFLICT (id) DO NOTHING
            """,
            rows,
        )
    print(f"保存 {len(docs)} 条数据到 PG 成功")
async def main():
    """Entry point: run sync passes forever, pausing 60 s between passes."""
    while True:
        await sync()
        print("同步完成,等待下一轮同步")
        await asyncio.sleep(60)


if __name__ == "__main__":
    asyncio.run(main())
@asynccontextmanager
async def get_cur(auto_commit: bool = True) -> AsyncGenerator[AsyncCursor, None]:
    """
    Cursor factory as an async context manager.

    Usage:
        async with get_cur() as cur:
            await cur.execute("SELECT 1")
    """
    agen = _get_cur(auto_commit)
    async for cursor in agen:
        yield cursor