diff --git a/cucyuqing/cmd/es-sync.py b/cucyuqing/cmd/es-sync.py index b172888..071d6aa 100644 --- a/cucyuqing/cmd/es-sync.py +++ b/cucyuqing/cmd/es-sync.py @@ -11,6 +11,7 @@ import pydantic from typing import AsyncIterable from cucyuqing.config import ES_API from cucyuqing.pg import get_cur, pool +from cucyuqing.mysql import mysql async def post(url: str, json: dict) -> dict: @@ -94,8 +95,42 @@ async def save_es_sync_log(start_time: datetime.datetime, end_time: datetime.dat ) print(f"保存同步日志成功,时间段 {start_time} 到 {end_time},共 {documents_count} 条数据") +async def batch_save_to_mysql(docs: list[dict]): + query = """ + INSERT IGNORE INTO risk_news ( + title, content, origin_url, + read_count, like_count, comment_count, share_count, platform, + published_at, es_id, + created_at, updated_at + ) VALUES ( + :title, :content, :origin_url, + :read_count, :like_count, :comment_count, :share_count, :platform, + :published_at, :es_id, + NOW(), NOW() + ) + """ + + values = [ + { + "title": doc["title"], + "content": doc["content"], + "origin_url": doc["url"], + "read_count": doc.get("read_count", 0), + "like_count": doc.get("like_count", 0), + "comment_count": doc.get("comment_count", 0), + "share_count": doc.get("share_count", 0), + "platform": doc.get("platform"), + "published_at": parse_unixtime(doc["crawled_at"]), + "es_id": doc["id"] + } + for doc in docs + ] + + await mysql.execute_many(query=query, values=values) + print(f"保存 {len(docs)} 条数据到 mysql 成功") + async def sync(): - await pool.open() + await asyncio.gather(pool.open(), mysql.connect()) while True: # 获取待同步时间段 async with get_cur() as cur: @@ -128,8 +163,12 @@ async def sync(): ) for interval in es_intervals: docs = [doc async for doc in fetch(interval)] - await batch_save_to_pg(docs) + await asyncio.gather( + batch_save_to_pg(docs), + batch_save_to_mysql(docs), + ) await save_es_sync_log(interval.start_time, interval.end_time, len(docs)) + async def main(): diff --git a/cucyuqing/config.py b/cucyuqing/config.py index 94e5f36..8f71773 100644 --- a/cucyuqing/config.py +++ b/cucyuqing/config.py @@ -21,3 +21,4 @@ def must_get_env(key: str): ES_API = get_env_with_default("ES_API", "http://192.168.1.45:1444") PG_DSN = must_get_env("PG_DSN") +MYSQL_DSN = must_get_env("MYSQL_DSN") diff --git a/cucyuqing/mysql.py b/cucyuqing/mysql.py new file mode 100644 index 0000000..dd4d0ca --- /dev/null +++ b/cucyuqing/mysql.py @@ -0,0 +1,5 @@ +'''MYSQL 数据库连接池''' +from databases import Database +from cucyuqing.config import MYSQL_DSN + +mysql = Database(MYSQL_DSN) diff --git a/requirements.txt b/requirements.txt index 4714423..9075859 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ psycopg[pool] aiohttp fastapi pydantic -aiomysql +databases[aiomysql]