commit 09b22517df
parent c9d361f56b
2024-09-11 16:25:17 +08:00


@@ -66,12 +66,16 @@ async def fetch(interval: ESInterval, size=1000) -> AsyncIterable[dict]:
     # Recursion is used here
     if len(docs) == size:
         print("Requesting the next page of data")
-        async for doc in fetch(ESInterval(
-            start_time=parse_unixtime(docs[-1]["crawled_at"]),
-            end_time=interval.end_time,
-        ), size):
+        async for doc in fetch(
+            ESInterval(
+                start_time=parse_unixtime(docs[-1]["crawled_at"]),
+                end_time=interval.end_time,
+            ),
+            size,
+        ):
             yield doc
 
+
 def extract_dict_info(d: dict) -> dict:
     d.pop("id")
     d.pop("title")
@@ -81,6 +85,8 @@ def extract_dict_info(d: dict) -> dict:
     d.pop("site")
     d.pop("time")
     return d
+
+
 async def batch_save_to_pg(docs: list[dict]):
     async with get_cur() as cur:
         await cur.executemany(
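This hunk only adds the two blank lines black requires between top-level definitions. The surrounding function, extract_dict_info, pops every key that already has a dedicated PG column so the remainder can be stored as the JSON info payload. Two caveats: dict.pop(key) raises KeyError if the key is absent (d.pop(key, None) is the tolerant form), and the function mutates the doc in place; the executemany call in the next hunk gets away with that only because tuple elements are evaluated left to right, so doc["id"] and friends are read before json.dumps(extract_dict_info(doc)) strips them. A non-mutating sketch of the same idea (KNOWN_COLUMNS is illustrative, not from the project):

    KNOWN_COLUMNS = {"id", "title", "content", "url", "crawled_at", "site", "time"}


    def extract_dict_info_copy(d: dict) -> dict:
        # keep only the keys without a dedicated column; the caller's dict is untouched
        return {k: v for k, v in d.items() if k not in KNOWN_COLUMNS}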
@@ -89,26 +95,39 @@ async def batch_save_to_pg(docs: list[dict]):
             crawled_at, platform, time, info)
             VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
             ON CONFLICT (id) DO NOTHING
-            """,
-            [(
-                doc['id'], doc['title'], doc['content'], doc['url'],
-                parse_unixtime(doc['crawled_at']), doc['site'],
-                parse_unixtime(doc['time']),
-                json.dumps(extract_dict_info(doc)),
-            ) for doc in docs]
+            ;""",
+            [
+                (
+                    doc["id"],
+                    doc["title"],
+                    doc["content"],
+                    doc["url"],
+                    parse_unixtime(doc["crawled_at"]),
+                    doc["site"],
+                    parse_unixtime(doc["time"]),
+                    json.dumps(extract_dict_info(doc)),
+                )
+                for doc in docs
+            ],
         )
         print(f"Saved {len(docs)} documents to PG")
 
-async def save_es_sync_log(start_time: datetime.datetime, end_time: datetime.datetime, documents_count: int):
+
+async def save_es_sync_log(
+    start_time: datetime.datetime, end_time: datetime.datetime, documents_count: int
+):
     async with get_cur() as cur:
         await cur.execute(
             """
             INSERT INTO es_collect_logs (start_time, end_time, documents_count)
             VALUES (%s, %s, %s)
             """,
-            (start_time, end_time, documents_count)
+            (start_time, end_time, documents_count),
        )
-        print(f"Sync log saved, interval {start_time} to {end_time}, {documents_count} documents in total")
+        print(
+            f"Sync log saved, interval {start_time} to {end_time}, {documents_count} documents in total"
+        )
 
+
 async def batch_save_to_mysql(docs: list[dict]):
     query = """
@@ -134,9 +153,9 @@ async def batch_save_to_mysql(docs: list[dict]):
             "like_count": doc.get("like_count", 0),
             "comment_count": doc.get("comment_count", 0),
             "share_count": doc.get("share_count", 0),
-            "platform": doc['site'],
+            "platform": doc["site"],
             "published_at": parse_unixtime(doc["crawled_at"]),
-            "es_id": doc["id"]
+            "es_id": doc["id"],
         }
         for doc in docs
     ]
@@ -144,6 +163,7 @@ async def batch_save_to_mysql(docs: list[dict]):
     await mysql.execute_many(query=query, values=values)
     print(f"Saved {len(docs)} documents to mysql")
 
+
 async def sync():
     await asyncio.gather(pool.open(), mysql.connect())
     while True:
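execute_many(query=query, values=values) with a list of dicts matches the encode/databases API, where the query uses :name placeholders and each dict supplies one row. A usage sketch under that assumption (DSN, table, and columns are illustrative, not the project's schema):

    import asyncio

    from databases import Database

    mysql = Database("mysql://user:pass@localhost/example")  # hypothetical DSN

    QUERY = """
        INSERT INTO articles (title, platform, es_id)
        VALUES (:title, :platform, :es_id)
    """


    async def demo() -> None:
        await mysql.connect()
        # each dict binds the :named placeholders in QUERY for one row
        await mysql.execute_many(
            query=QUERY,
            values=[{"title": "hello", "platform": "example-site", "es_id": "abc123"}],
        )
        await mysql.disconnect()


    asyncio.run(demo())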
@@ -183,7 +203,6 @@ async def sync():
             batch_save_to_mysql(docs),
         )
         await save_es_sync_log(interval.start_time, interval.end_time, len(docs))
-
 
 
 async def main():
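The last hunk only trims a blank line, but it shows the ordering that matters in sync(): the PG and MySQL writes run concurrently under asyncio.gather, and save_es_sync_log runs only after that await returns. With the default return_exceptions=False, gather propagates the first exception, so a failed write skips the log line and leaves the interval unrecorded. A self-contained illustration of that failure path (the coroutine names are made up):

    import asyncio


    async def pg_write() -> str:
        return "pg ok"


    async def mysql_write() -> None:
        raise RuntimeError("mysql write failed")


    async def demo() -> None:
        try:
            await asyncio.gather(pg_write(), mysql_write())
            print("both writes succeeded, sync log would be written here")
        except RuntimeError as exc:
            print(f"no sync log written: {exc}")


    asyncio.run(demo())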