实现 同步数据到 mysql
This commit is contained in:
@@ -11,6 +11,7 @@ import pydantic
|
|||||||
from typing import AsyncIterable
|
from typing import AsyncIterable
|
||||||
from cucyuqing.config import ES_API
|
from cucyuqing.config import ES_API
|
||||||
from cucyuqing.pg import get_cur, pool
|
from cucyuqing.pg import get_cur, pool
|
||||||
|
from cucyuqing.mysql import mysql
|
||||||
|
|
||||||
|
|
||||||
async def post(url: str, json: dict) -> dict:
|
async def post(url: str, json: dict) -> dict:
|
||||||
@@ -94,8 +95,42 @@ async def save_es_sync_log(start_time: datetime.datetime, end_time: datetime.dat
|
|||||||
)
|
)
|
||||||
print(f"保存同步日志成功,时间段 {start_time} 到 {end_time},共 {documents_count} 条数据")
|
print(f"保存同步日志成功,时间段 {start_time} 到 {end_time},共 {documents_count} 条数据")
|
||||||
|
|
||||||
|
async def batch_save_to_mysql(docs: list[dict]):
    """Insert a batch of documents into the MySQL ``risk_news`` table.

    Each document is mapped to one row and all rows are submitted with a
    single ``mysql.execute_many`` call. The statement is ``INSERT IGNORE``,
    so rows MySQL rejects as duplicates are skipped rather than raising
    (presumably keyed on a unique ``es_id`` -- confirm against the schema).

    Args:
        docs: Documents to store. Every document must provide ``title``,
            ``content``, ``url``, ``crawled_at`` and ``id``; the four
            counters default to 0 and ``platform`` to ``None`` when absent.

    Raises:
        KeyError: If a document lacks one of the required keys.
    """
    insert_sql = """
        INSERT IGNORE INTO risk_news (
            title, content, origin_url,
            read_count, like_count, comment_count, share_count, platform,
            published_at, es_id,
            created_at, updated_at
        ) VALUES (
            :title, :content, :origin_url,
            :read_count, :like_count, :comment_count, :share_count, :platform,
            :published_at, :es_id,
            NOW(), NOW()
        )
    """

    # Counters that may be missing from a document; missing ones are stored as 0.
    counter_fields = ("read_count", "like_count", "comment_count", "share_count")

    rows = []
    for doc in docs:
        row = {
            "title": doc["title"],
            "content": doc["content"],
            "origin_url": doc["url"],
        }
        for field in counter_fields:
            row[field] = doc.get(field, 0)
        row["platform"] = doc.get("platform")
        # NOTE(review): published_at is filled from the document's crawl time,
        # not a publish time -- confirm this mapping is intentional.
        row["published_at"] = parse_unixtime(doc["crawled_at"])
        row["es_id"] = doc["id"]
        rows.append(row)

    await mysql.execute_many(query=insert_sql, values=rows)
    print(f"保存 {len(docs)} 条数据到 mysql 成功")
|
||||||
|
|
||||||
async def sync():
|
async def sync():
|
||||||
await pool.open()
|
await asyncio.gather(pool.open(), mysql.connect())
|
||||||
while True:
|
while True:
|
||||||
# 获取待同步时间段
|
# 获取待同步时间段
|
||||||
async with get_cur() as cur:
|
async with get_cur() as cur:
|
||||||
@@ -128,8 +163,12 @@ async def sync():
|
|||||||
)
|
)
|
||||||
for interval in es_intervals:
|
for interval in es_intervals:
|
||||||
docs = [doc async for doc in fetch(interval)]
|
docs = [doc async for doc in fetch(interval)]
|
||||||
await batch_save_to_pg(docs)
|
await asyncio.gather(
|
||||||
|
batch_save_to_pg(docs),
|
||||||
|
batch_save_to_mysql(docs),
|
||||||
|
)
|
||||||
await save_es_sync_log(interval.start_time, interval.end_time, len(docs))
|
await save_es_sync_log(interval.start_time, interval.end_time, len(docs))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|||||||
@@ -21,3 +21,4 @@ def must_get_env(key: str):
|
|||||||
|
|
||||||
ES_API = get_env_with_default("ES_API", "http://192.168.1.45:1444")
|
ES_API = get_env_with_default("ES_API", "http://192.168.1.45:1444")
|
||||||
PG_DSN = must_get_env("PG_DSN")
|
PG_DSN = must_get_env("PG_DSN")
|
||||||
|
# Connection string for the MySQL database, read from the MYSQL_DSN
# environment variable (presumably fails at import time when unset --
# confirm against must_get_env).
MYSQL_DSN = must_get_env("MYSQL_DSN")
|
||||||
|
|||||||
5
cucyuqing/mysql.py
Normal file
5
cucyuqing/mysql.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
'''MySQL database connection pool.'''
|
||||||
|
from databases import Database
|
||||||
|
from cucyuqing.config import MYSQL_DSN
|
||||||
|
|
||||||
|
# Shared module-level Database handle built from MYSQL_DSN. Callers are
# expected to await mysql.connect() (as sync() does) before issuing queries.
mysql = Database(MYSQL_DSN)
|
||||||
@@ -4,4 +4,4 @@ psycopg[pool]
|
|||||||
aiohttp
|
aiohttp
|
||||||
fastapi
|
fastapi
|
||||||
pydantic
|
pydantic
|
||||||
aiomysql
|
databases[aiomysql]
|
||||||
|
|||||||
Reference in New Issue
Block a user