From c9d361f56b55d2d102d052b1f6cb80deb266c2cd Mon Sep 17 00:00:00 2001
From: heimoshuiyu
Date: Wed, 11 Sep 2024 14:48:36 +0800
Subject: [PATCH] Extract the needed fields from the pg info field
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 cucyuqing/cmd/es-sync.py | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/cucyuqing/cmd/es-sync.py b/cucyuqing/cmd/es-sync.py
index 071d6aa..7e8c847 100644
--- a/cucyuqing/cmd/es-sync.py
+++ b/cucyuqing/cmd/es-sync.py
@@ -72,15 +72,30 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
     ), size):
         yield doc
+def extract_dict_info(d: dict) -> dict:
+    d.pop("id")
+    d.pop("title")
+    d.pop("content")
+    d.pop("url")
+    d.pop("crawled_at")
+    d.pop("site")
+    d.pop("time")
+    return d


 async def batch_save_to_pg(docs: list[dict]):
     async with get_cur() as cur:
         await cur.executemany(
             """
-            INSERT INTO risk_news (id, info)
-            VALUES (%s, %s)
+            INSERT INTO risk_news (id, title, content, url,
+                                   crawled_at, platform, time, info)
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
             ON CONFLICT (id) DO NOTHING
             """,
-            [(doc['id'], json.dumps(doc)) for doc in docs]
+            [(
+                doc['id'], doc['title'], doc['content'], doc['url'],
+                parse_unixtime(doc['crawled_at']), doc['site'],
+                parse_unixtime(doc['time']),
+                json.dumps(extract_dict_info(doc)),
+            ) for doc in docs]
         )
         print(f"保存 {len(docs)} 条数据到 PG 成功")
@@ -119,7 +134,7 @@ async def batch_save_to_mysql(docs: list[dict]):
             "like_count": doc.get("like_count", 0),
             "comment_count": doc.get("comment_count", 0),
             "share_count": doc.get("share_count", 0),
-            "platform": doc.get("platform"),
+            "platform": doc['site'],
             "published_at": parse_unixtime(doc["crawled_at"]),
             "es_id": doc["id"]
         }
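
Reviewer note, not part of the patch: extract_dict_info() mutates the doc
in place, and dict.pop() without a default raises KeyError for any ES
document that lacks one of the seven keys. The tuple in the list
comprehension still sees the values only because Python evaluates its
elements left to right, so doc['id'], doc['title'], etc. are read before
extract_dict_info(doc) pops them; and if the same doc objects are later
handed to batch_save_to_mysql, which reads doc['site'], doc["crawled_at"]
and doc["id"], those keys will already be gone. A minimal non-mutating
sketch, assuming docs may lack some keys (the key list is taken from the
patch; the constant name PROMOTED_KEYS is invented here):

    # Keys this patch promotes to dedicated risk_news columns.
    PROMOTED_KEYS = frozenset(
        {"id", "title", "content", "url", "crawled_at", "site", "time"}
    )

    def extract_dict_info(d: dict) -> dict:
        # Return a new dict holding everything that was NOT promoted to a
        # column; the original doc is left untouched, and keys missing
        # from d simply never appear, so no KeyError is possible.
        return {k: v for k, v in d.items() if k not in PROMOTED_KEYS}

This variant removes the evaluation-order dependency inside the tuple and
keeps docs reusable by later consumers such as batch_save_to_mysql.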