Extract the needed fields out of the PG info column

2024-09-11 14:48:36 +08:00
parent 8009a8295c
commit c9d361f56b


@@ -72,15 +72,30 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
     ), size):
         yield doc

+def extract_dict_info(d: dict) -> dict:
+    d.pop("id")
+    d.pop("title")
+    d.pop("content")
+    d.pop("url")
+    d.pop("crawled_at")
+    d.pop("site")
+    d.pop("time")
+    return d

 async def batch_save_to_pg(docs: list[dict]):
     async with get_cur() as cur:
         await cur.executemany(
             """
-            INSERT INTO risk_news (id, info)
-            VALUES (%s, %s)
+            INSERT INTO risk_news (id, title, content, url,
+                                   crawled_at, platform, time, info)
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
             ON CONFLICT (id) DO NOTHING
             """,
-            [(doc['id'], json.dumps(doc)) for doc in docs]
+            [(
+                doc['id'], doc['title'], doc['content'], doc['url'],
+                parse_unixtime(doc['crawled_at']), doc['site'],
+                parse_unixtime(doc['time']),
+                json.dumps(extract_dict_info(doc)),
+            ) for doc in docs]
         )
     print(f"保存 {len(docs)} 条数据到 PG 成功")
@@ -119,7 +134,7 @@ async def batch_save_to_mysql(docs: list[dict]):
             "like_count": doc.get("like_count", 0),
             "comment_count": doc.get("comment_count", 0),
             "share_count": doc.get("share_count", 0),
-            "platform": doc.get("platform"),
+            "platform": doc['site'],
             "published_at": parse_unixtime(doc["crawled_at"]),
             "es_id": doc["id"]
         }
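
Both writers now take the platform value from the document's site key (the MySQL record previously used doc.get("platform")). The new INSERT also assumes the risk_news table already carries the promoted columns; the migration is not part of this commit, so the sketch below is only a hypothetical table shape that would satisfy it, with assumed column types.

# Hypothetical migration sketch -- not part of this commit; column types are assumptions.
# It could be run once through the same get_cur() helper used by batch_save_to_pg.
RISK_NEWS_DDL = """
CREATE TABLE IF NOT EXISTS risk_news (
    id          TEXT PRIMARY KEY,   -- ON CONFLICT (id) relies on this uniqueness
    title       TEXT,
    content     TEXT,
    url         TEXT,
    crawled_at  TIMESTAMPTZ,
    platform    TEXT,               -- filled from doc['site'] in this commit
    time        TIMESTAMPTZ,
    info        JSONB               -- leftover fields after extract_dict_info()
);
"""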