first commit
3 .gitignore vendored Normal file
@@ -0,0 +1,3 @@
__pycache__
/venv
/.env
1 cucyuqing/__init__.py Normal file
@@ -0,0 +1 @@
# Package directory for the CUC public opinion monitoring project
143 cucyuqing/cmd/es-sync.py Normal file
@@ -0,0 +1,143 @@
"""
ES data sync script
"""

import asyncio
import datetime
import json
import time
from typing import AsyncIterable

import aiohttp
import pydantic

from cucyuqing.config import ES_API
from cucyuqing.pg import get_cur, pool


async def post(url: str, payload: dict) -> dict:
    """POST a JSON payload to `url` and return the decoded JSON response."""
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as resp:
            return await resp.json()


class ESInterval(pydantic.BaseModel):
    start_time: datetime.datetime
    end_time: datetime.datetime


def format_datetime(dt: datetime.datetime) -> str:
    # ES_API's cstart/cend parameters use hour precision: YYYYMMDDHH
    return dt.strftime("%Y%m%d%H")


def parse_unixtime(unixtime: int) -> datetime.datetime:
    return datetime.datetime.fromtimestamp(unixtime)


async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
    """
    Fetch the documents in the given time window, `size` records per request.
    This function recurses: if a window returns exactly `size` documents,
    there may be more, so it keeps requesting.
    """
    url = f"{ES_API}/api/dc/search"
    print(f"Syncing window {interval.start_time} to {interval.end_time}")
    begin = time.time()
    es_response = await post(
        url,
        {
            # Keyword query, kept verbatim; the (a|b) - (c) + (d) syntax is specific to ES_API
            "word": "(教师|老师|教授|导师|院长) - (教育部|公告|通报|准则|建设|座谈|细则|工作|动员|专题) + (不正当|性骚扰|出轨|猥亵|不公|强迫|侮辱|举报|滥用|违法|师德|贿|造假|不端|抄袭|虚假|篡改|挪用|抑郁|威胁|霸凌|体罚)",
            "size": size,
            "orders": 9,
            "tmode": 2,
            "cstart": format_datetime(interval.start_time),
            "cend": format_datetime(interval.end_time),
        },
    )
    duration = time.time() - begin
    if not es_response.get("data"):
        print("[warning]: no data returned", es_response)
        return

    docs = es_response["data"]["docs"]
    print(
        f'Took {int(duration)}s, fetched {len(docs)} documents, earliest {parse_unixtime(docs[0]["crawled_at"])}, latest {parse_unixtime(docs[-1]["crawled_at"])}'
    )
    for d in docs:
        yield d
    # A full page (len == size) means there may be more data; recurse with a
    # window that restarts at the last document's crawled_at
    if len(docs) == size:
        print("Requesting the next page")
        async for doc in fetch(ESInterval(
            start_time=parse_unixtime(docs[-1]["crawled_at"]),
            end_time=interval.end_time,
        ), size):
            yield doc


async def batch_save_to_pg(docs: list[dict]):
    async with get_cur() as cur:
        await cur.executemany(
            """
            INSERT INTO risk_news (id, info)
            VALUES (%s, %s)
            ON CONFLICT (id) DO NOTHING
            """,
            [(doc['id'], json.dumps(doc)) for doc in docs]
        )
        print(f"Saved {len(docs)} documents to PG")


async def save_es_sync_log(start_time: datetime.datetime, end_time: datetime.datetime, documents_count: int):
    async with get_cur() as cur:
        await cur.execute(
            """
            INSERT INTO es_collect_logs (start_time, end_time, documents_count)
            VALUES (%s, %s, %s)
            """,
            (start_time, end_time, documents_count)
        )
        print(f"Saved sync log for window {start_time} to {end_time}, {documents_count} documents")


async def sync():
    await pool.open()
    while True:
        # Pick up to 10 hourly slots from the past day that have no sync log yet
        async with get_cur() as cur:
            await cur.execute(
                """
                WITH RECURSIVE time_slots AS (
                    -- Enumerate hourly start times for the past day,
                    -- ending at the last completed hour
                    SELECT date_trunc('hour', now() - interval '1 hour') - interval '1 day' AS start_time
                    UNION ALL
                    SELECT start_time + INTERVAL '1 hour'
                    FROM time_slots
                    WHERE start_time < date_trunc('hour', now() - interval '1 hour')
                )
                SELECT
                    ts.start_time,
                    ts.start_time + interval '1 hour' AS end_time
                FROM time_slots ts
                LEFT JOIN es_collect_logs ecl
                    ON ts.start_time = ecl.start_time
                WHERE ecl.documents_count IS NULL
                ORDER BY ts.start_time DESC
                LIMIT 10;
                """
            )
            es_intervals = [
                ESInterval(start_time=row[0], end_time=row[1]) async for row in cur
            ]

        if not es_intervals:
            # No unsynced slots left; return so main() can sleep before the next round
            return

        print(
            f"Starting ES sync: {len(es_intervals)} time slots, from {es_intervals[0].start_time} to {es_intervals[-1].end_time}"
        )
        for interval in es_intervals:
            docs = [doc async for doc in fetch(interval)]
            await batch_save_to_pg(docs)
            await save_es_sync_log(interval.start_time, interval.end_time, len(docs))


async def main():
    while True:
        await sync()
        print("Sync round complete, waiting for the next one")
        await asyncio.sleep(60)


if __name__ == "__main__":
    asyncio.run(main())
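
A note on the pagination in fetch(): because each recursive call restarts the window at the last document's crawled_at, the boundary document is fetched twice, and it is the ON CONFLICT (id) DO NOTHING in batch_save_to_pg that absorbs those duplicates. A minimal self-contained sketch of the same contract, where fake_search is a hypothetical stand-in backend and only the recursion pattern mirrors es-sync.py:

import asyncio
from typing import AsyncIterable

async def fake_search(start: int, end: int, size: int) -> list[dict]:
    # Stand-in backend: up to `size` items with "timestamps" in [start, end]
    return [{"crawled_at": t} for t in range(start, min(start + size, end + 1))]

async def paginate(start: int, end: int, size: int = 3) -> AsyncIterable[dict]:
    docs = await fake_search(start, end, size)
    for d in docs:
        yield d
    if len(docs) == size:  # a full page means there may be more
        async for d in paginate(docs[-1]["crawled_at"], end, size):
            yield d

async def demo():
    items = [d async for d in paginate(0, 10)]
    # 16 items but only 11 distinct timestamps: boundary items repeat,
    # which is why the real script relies on ON CONFLICT DO NOTHING
    print(len(items), len({d["crawled_at"] for d in items}))

asyncio.run(demo())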
23 cucyuqing/config.py Normal file
@@ -0,0 +1,23 @@
"""
Global, shared, read-only configuration
"""

import os

import dotenv

dotenv.load_dotenv()


def get_env_with_default(key: str, default: str):
    value = os.getenv(key)
    if not value:
        print(f"Warning: environment variable {key} is not set, using default {default}")
        return default
    return value


def must_get_env(key: str):
    value = os.getenv(key)
    if not value:
        raise ValueError(f"Environment variable {key} is not set")
    return value


ES_API = get_env_with_default("ES_API", "http://192.168.1.45:1444")
PG_DSN = must_get_env("PG_DSN")
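
For reference, a hypothetical .env consumed by dotenv.load_dotenv() above. The ES_API value shown is just the code's own default; the PG_DSN value is a placeholder, not a real DSN from this project:

ES_API=http://192.168.1.45:1444
PG_DSN=postgresql://user:password@localhost:5432/cucyuqing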
0 cucyuqing/es.py Normal file
39 cucyuqing/pg.py Normal file
@@ -0,0 +1,39 @@
"""
PostgreSQL connection pool
"""

from contextlib import asynccontextmanager
from typing import AsyncGenerator

from psycopg import AsyncCursor
from psycopg_pool import AsyncConnectionPool

from cucyuqing.config import PG_DSN

pool = AsyncConnectionPool(
    conninfo=PG_DSN,
    min_size=1,
    max_size=5,
    open=False,  # important: callers open the pool explicitly at startup
    check=AsyncConnectionPool.check_connection,
)


async def _get_cur(auto_commit: bool = True) -> AsyncGenerator[AsyncCursor, None]:
    """
    Async generator implementation, intended for FastAPI dependency injection.
    """
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            yield cur
        if auto_commit:
            await conn.commit()


@asynccontextmanager
async def get_cur(auto_commit: bool = True) -> AsyncGenerator[AsyncCursor, None]:
    """
    Usage:
        async with get_cur() as cur:
            await cur.execute("SELECT 1")
    """
    async for cur in _get_cur(auto_commit):
        yield cur
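
The _get_cur docstring mentions FastAPI dependency injection. A minimal sketch of that wiring; the route and wrapper below are hypothetical, not part of this commit, and note the pool still has to be opened at startup because it is created with open=False:

from typing import AsyncGenerator

from fastapi import Depends, FastAPI
from psycopg import AsyncCursor

from cucyuqing.pg import _get_cur, pool

app = FastAPI()

@app.on_event("startup")
async def open_pool():
    await pool.open()  # the pool is created with open=False

async def db_cur() -> AsyncGenerator[AsyncCursor, None]:
    # Wrapping pins auto_commit so FastAPI does not expose it as a query parameter
    async for cur in _get_cur(auto_commit=True):
        yield cur

@app.get("/health")  # hypothetical route
async def health(cur: AsyncCursor = Depends(db_cur)):
    await cur.execute("SELECT 1")
    return {"db_ok": (await cur.fetchone()) == (1,)}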
1 pyrightconfig.json Normal file
@@ -0,0 +1 @@
{}
7 requirements.txt Normal file
@@ -0,0 +1,7 @@
python-dotenv
psycopg[binary,pool]
aiohttp
fastapi
pydantic
aiomysql