import PyPDF2 import html2text import re import hashlib from nio import ( DownloadError, MatrixRoom, RoomMessageAudio, RoomMessageFile, RoomMessageText, ) from langchain.text_splitter import MarkdownTextSplitter from bot import Bot, print import asyncio import io import yt_dlp import os import subprocess from langchain.embeddings import OpenAIEmbeddings from selenium import webdriver if not os.path.exists("./cache"): os.mkdir("./cache") if not os.path.exists("./cache/yt-dlp"): os.mkdir("./cache/yt-dlp") print("lanuching driver") options = webdriver.FirefoxOptions() options.add_argument("-headless") driver = webdriver.Firefox(options=options) async def get_html(url: str) -> str: driver.get(url) await asyncio.sleep(3) return driver.page_source or "" import openai embeddings_model = OpenAIEmbeddings( openai_api_key=os.environ["OPENAI_API_KEY"], openai_api_base=os.environ["OPENAI_API_BASE"], show_progress_bar=True, ) client = Bot( os.environ["BOT_DB_HOMESERVER"], os.environ["BOT_DB_USER"], os.environ["MATRIX_CHAIN_DEVICE"], os.environ["BOT_DB_ACCESS_TOKEN"], ) client.welcome_message = """欢迎使用 matrix chain db 插件,我能将房间中的所有文件添加进embedding数据库,并为gpt提供支持 ## 使用方式 - 发送文件或视频链接 目前支持文件格式:txt / pdf / md / doc / docx / ppt / pptx 目前支持视频链接:Bilibili / Youtube ## 配置选项 - !clean 或 !clear 清除该房间中所有的embedding信息 - !embedding on 或 !embedding off 开启或关闭房间内embedding功能 (默认关闭)""" spliter = MarkdownTextSplitter( chunk_size=400, chunk_overlap=100, length_function=client.get_token_length, ) offices_mimetypes = [ "application/wps-office.docx", "application/wps-office.doc", "application/wps-office.pptx", "application/wps-office.ppt", "application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.wordprocessingml.template", "application/vnd.ms-powerpoint", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.oasis.opendocument.text", "application/vnd.oasis.opendocument.presentation", ] mimetypes = [ "text/plain", "application/pdf", "text/markdown", "text/html", ] + offices_mimetypes def allowed_file(mimetype: str): return mimetype.lower() in mimetypes async def create_embedding(room, event, md5, content, url): transaction = await client.db.transaction() await client.db.execute( query="""insert into documents (md5, content, token, url) values (:md5, :content, :token, :url) on conflict (md5) do nothing ;""", values={ "md5": md5, "content": content, "token": client.get_token_length(content), "url": url, }, ) rows = await client.db.fetch_all( query="select document_md5 from room_document where room = :room and document_md5 = :md5 limit 1;", values={"room": room.room_id, "md5": md5}, ) if len(rows) > 0: await transaction.rollback() print("document alreadly insert in room", md5, room.room_id) await client.room_send( room.room_id, "m.reaction", { "m.relates_to": { "event_id": event.event_id, "key": "👍", "rel_type": "m.annotation", } }, ) return await client.db.execute( query=""" insert into room_document (room, document_md5) values (:room_id, :md5) on conflict (room, document_md5) do nothing ;""", values={"room_id": room.room_id, "md5": md5}, ) # start embedding chunks = spliter.split_text(content) print("chunks", len(chunks)) embeddings = await embeddings_model.aembed_documents(chunks, chunk_size=1600) print("embedding finished", len(embeddings)) if len(chunks) != len(embeddings): raise ValueError("asdf") insert_data: list[dict] = [] for chunk, embedding in zip(chunks, embeddings): insert_data.append( { "document_md5": md5, "md5": hashlib.md5(chunk.encode()).hexdigest(), "content": chunk, "token": client.get_token_length(chunk), "embedding": str(embedding), } ) await client.db.execute_many( query="""insert into embeddings (document_md5, md5, content, token, embedding) values (:document_md5, :md5, :content, :token, :embedding) on conflict (document_md5, md5) do nothing ;""", values=insert_data, ) print("insert", len(insert_data), "embedding data") await client.db.execute( query=""" insert into event_document (event, document_md5) values (:event_id, :md5) on conflict (event) do nothing ;""", values={"event_id": event.event_id, "md5": md5}, ) await transaction.commit() await client.room_send( room.room_id, "m.reaction", { "m.relates_to": { "event_id": event.event_id, "key": "😘", "rel_type": "m.annotation", } }, ) def clean_html(html: str) -> str: h2t = html2text.HTML2Text() h2t.ignore_emphasis = True h2t.ignore_images = True h2t.ignore_links = True h2t.body_width = 0 content = h2t.handle(html) return content def clean_content(content: str, mimetype: str, document_md5: str) -> str: # clean 0x00 content = content.replace("\x00", "") # clean links content = re.sub(r"\[.*?\]\(.*?\)", "", content) content = re.sub(r"!\[.*?\]\(.*?\)", "", content) # clean lines lines = [i.strip() for i in content.split("\n\n")] while "" in lines: lines.remove("") content = "\n\n".join(lines) content = "\n".join([i.strip() for i in content.split("\n")]) return content def pdf_to_text(f) -> str: pdf_reader = PyPDF2.PdfReader(f) num_pages = len(pdf_reader.pages) content = "" for page_number in range(num_pages): page = pdf_reader.pages[page_number] content += page.extract_text() return content @client.ignore_self_message @client.handel_no_gpt @client.log_message @client.with_typing @client.replace_command_mark @client.safe_try async def message_file(room: MatrixRoom, event: RoomMessageFile): print("received file") mimetype = event.flattened().get("content.info.mimetype", "") if not allowed_file(mimetype): print("not allowed file", event.body) raise ValueError("not allowed file") resp = await client.download(event.url) if isinstance(resp, DownloadError): raise ValueError("file donwload error") assert isinstance(resp.body, bytes) md5 = hashlib.md5(resp.body).hexdigest() document_fetch_result = await client.db.fetch_one( query="select content from documents where md5 = :md5 limit 1;", values={"md5": md5}, ) # get content content = document_fetch_result[0] if document_fetch_result else "" # document not exists if content: print("document", md5, "alreadly exists") else: if mimetype == "text/plain" or mimetype == "text/markdown": content = resp.body.decode() elif mimetype == "text/html": content = clean_html(resp.body.decode()) elif mimetype == "application/pdf": f = io.BytesIO(resp.body) content = pdf_to_text(f) elif mimetype in offices_mimetypes: # save file to temp dir base = event.body.rsplit(".", 1)[0] ext = event.body.rsplit(".", 1)[1] print("base", base) source_filepath = os.path.join("./cache/office", event.body) txt_filename = base + ".txt" txt_filepath = os.path.join("./cache/office", txt_filename) print("source_filepath", source_filepath) with open(source_filepath, "wb") as f: f.write(resp.body) if ext in ["doc", "docx", "odt"]: process = subprocess.Popen( [ "soffice", "--headless", "--convert-to", "txt:Text", "--outdir", "./cache/office", source_filepath, ] ) process.wait() with open(txt_filepath, "r") as f: content = f.read() elif ext in ["ppt", "pptx", "odp"]: pdf_filename = base + ".pdf" pdf_filepath = os.path.join("./cache/office", pdf_filename) process = subprocess.Popen( [ "soffice", "--headless", "--convert-to", "pdf", "--outdir", "./cache/office", source_filepath, ] ) process.wait() with open(pdf_filepath, "rb") as f: content = pdf_to_text(f) else: raise ValueError("unknown ext: ", ext) print("converted txt", content) else: raise ValueError("unknown mimetype", mimetype) content = clean_content(content, mimetype, md5) print("content length", len(content)) await create_embedding(room, event, md5, content, event.url) client.add_event_callback(message_file, RoomMessageFile) yt_dlp_support = ["b23.tv/", "www.bilibili.com/video/", "youtube.com/"] def allow_yt_dlp(link: str) -> bool: if not link.startswith("http://") and not link.startswith("https://"): return False allow = False for u in yt_dlp_support: if u in link: allow = True break return allow def allow_web(link: str) -> bool: print("checking web url", link) if not link.startswith("http://") and not link.startswith("https://"): return False return True @client.message_callback_common_wrapper async def message_text(room: MatrixRoom, event: RoomMessageText) -> None: if event.body.startswith("!"): should_react = True if event.body.startswith("!clear") or event.body.startswith("!clean"): # save to db async with client.db.transaction(): await client.db.execute( query=""" delete from embeddings e using room_document rd where e.document_md5 = rd.document_md5 and rd.room = :room_id; """, values={"room_id": room.room_id}, ) await client.db.execute( query="delete from room_document where room = :room_id;", values={"room_id": room.room_id}, ) elif event.body.startswith("!embedding"): sp = event.body.split() if len(sp) < 2: return if not sp[1].lower() in ["on", "off"]: return status = sp[1].lower() == "on" await client.db.execute( query=""" insert into room_configs (room, embedding) values (:room_id, :status) on conflict (room) do update set embedding = excluded.embedding ;""", values={"room_id": room.room_id, "status": status}, ) else: should_react = False if should_react: await client.room_send( room.room_id, "m.reaction", { "m.relates_to": { "event_id": event.event_id, "key": "😘", "rel_type": "m.annotation", } }, ) return if allow_yt_dlp(event.body.split()[0]): # handle yt-dlp ydl_opts = { "format": "wa*", # ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments "postprocessors": [ { # Extract audio using ffmpeg "key": "FFmpegExtractAudio", #'preferredcodec': 'opus', #'preferredquality': 64, } ], } url = event.body.split()[0] info = None with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) filepath = info["requested_downloads"][0]["filepath"] filename = info["requested_downloads"][0]["filename"] title = info["title"] realfilepath = os.path.join("./cache/yt-dlp", filename) os.rename(filepath, realfilepath) result = openai.Audio.transcribe( file=open(realfilepath, "rb"), model="large-v2", prompt=title, ) result = "\n".join([i.text for i in result["segments"]]) print(event.sender, result) md5 = hashlib.md5(result.encode()).hexdigest() await create_embedding(room, event, md5, result, url) return if allow_web(event.body.split()[0]): url = event.body.split()[0] print("downloading", url) html = await get_html(url) md5 = hashlib.md5(html.encode()).hexdigest() content = clean_html(html) content = clean_content(content, "text/markdown", md5) if not content: raise ValueError("Empty content") print(content) await create_embedding(room, event, md5, content, url) return client.add_event_callback(message_text, RoomMessageText) asyncio.run(client.sync_forever())