diff --git a/cucyuqing/cmd/es-sync.py b/cucyuqing/cmd/es-sync.py
index 7e8c847..8ec6088 100644
--- a/cucyuqing/cmd/es-sync.py
+++ b/cucyuqing/cmd/es-sync.py
@@ -66,12 +66,16 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
     # 这里使用递归
     if len(docs) == size:
         print("继续请求下一页数据")
-        async for doc in fetch(ESInterval(
-            start_time=parse_unixtime(docs[-1]["crawled_at"]),
-            end_time=interval.end_time,
-        ), size):
+        async for doc in fetch(
+            ESInterval(
+                start_time=parse_unixtime(docs[-1]["crawled_at"]),
+                end_time=interval.end_time,
+            ),
+            size,
+        ):
             yield doc
 
+
 def extract_dict_info(d: dict) -> dict:
     d.pop("id")
     d.pop("title")
@@ -81,6 +85,8 @@ def extract_dict_info(d: dict) -> dict:
     d.pop("site")
     d.pop("time")
     return d
+
+
 async def batch_save_to_pg(docs: list[dict]):
     async with get_cur() as cur:
         await cur.executemany(
@@ -89,26 +95,39 @@ async def batch_save_to_pg(docs: list[dict]):
                 crawled_at, platform, time, info)
             VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
             ON CONFLICT (id) DO NOTHING
-            """,
-            [(
-                doc['id'], doc['title'], doc['content'], doc['url'],
-                parse_unixtime(doc['crawled_at']), doc['site'],
-                parse_unixtime(doc['time']),
-                json.dumps(extract_dict_info(doc)),
-            ) for doc in docs]
+            ;""",
+            [
+                (
+                    doc["id"],
+                    doc["title"],
+                    doc["content"],
+                    doc["url"],
+                    parse_unixtime(doc["crawled_at"]),
+                    doc["site"],
+                    parse_unixtime(doc["time"]),
+                    json.dumps(extract_dict_info(doc)),
+                )
+                for doc in docs
+            ],
         )
         print(f"保存 {len(docs)} 条数据到 PG 成功")
 
-async def save_es_sync_log(start_time: datetime.datetime, end_time: datetime.datetime, documents_count: int):
+
+async def save_es_sync_log(
+    start_time: datetime.datetime, end_time: datetime.datetime, documents_count: int
+):
     async with get_cur() as cur:
         await cur.execute(
             """
             INSERT INTO es_collect_logs (start_time, end_time, documents_count)
             VALUES (%s, %s, %s)
             """,
-            (start_time, end_time, documents_count)
+            (start_time, end_time, documents_count),
         )
-        print(f"保存同步日志成功,时间段 {start_time} 到 {end_time},共 {documents_count} 条数据")
+        print(
+            f"保存同步日志成功,时间段 {start_time} 到 {end_time},共 {documents_count} 条数据"
+        )
+
 
 async def batch_save_to_mysql(docs: list[dict]):
     query = """
@@ -134,9 +153,9 @@ async def batch_save_to_mysql(docs: list[dict]):
             "like_count": doc.get("like_count", 0),
             "comment_count": doc.get("comment_count", 0),
             "share_count": doc.get("share_count", 0),
-            "platform": doc['site'],
+            "platform": doc["site"],
             "published_at": parse_unixtime(doc["crawled_at"]),
-            "es_id": doc["id"]
+            "es_id": doc["id"],
         }
         for doc in docs
     ]
@@ -144,6 +163,7 @@ async def batch_save_to_mysql(docs: list[dict]):
     await mysql.execute_many(query=query, values=values)
     print(f"保存 {len(docs)} 条数据到 mysql 成功")
+
 
 async def sync():
     await asyncio.gather(pool.open(), mysql.connect())
     while True:
@@ -183,7 +203,6 @@ async def sync():
             batch_save_to_mysql(docs),
         )
         await save_es_sync_log(interval.start_time, interval.end_time, len(docs))
-
 
 
 async def main():