import PyPDF2
import html2text
import re
import hashlib
from nio import (
DownloadError,
MatrixRoom,
RoomMessageAudio,
RoomMessageFile,
RoomMessageText,
)
from langchain.text_splitter import MarkdownTextSplitter
from bot import Bot, print
import asyncio
import io
import yt_dlp
import os
import subprocess
from langchain.embeddings import OpenAIEmbeddings
from selenium import webdriver
# Working directories for downloaded / converted files, including
# ./cache/office which the office-document conversion writes into.
# makedirs(exist_ok=True) also avoids the exists()/mkdir() race.
for _cache_dir in ("./cache", "./cache/yt-dlp", "./cache/office"):
    os.makedirs(_cache_dir, exist_ok=True)

# One shared headless Firefox instance, used by get_html to render web pages.
print("lanuching driver")
options = webdriver.FirefoxOptions()
options.add_argument("-headless")
driver = webdriver.Firefox(options=options)
# Serializes access to the shared ``driver``; created lazily inside the
# coroutine so it is bound to the loop that asyncio.run() starts.
_driver_lock = None


async def get_html(url: str) -> str:
    """Load *url* in the shared headless Firefox and return the page HTML.

    The function awaits between navigating and reading ``page_source``, so
    concurrent calls on the single module-level ``driver`` could otherwise
    read each other's pages; a lock serializes them. The blocking Selenium
    calls run in the default executor so they do not stall the event loop.

    Returns:
        The rendered page source, or "" if the driver returned nothing.
    """
    global _driver_lock
    if _driver_lock is None:
        _driver_lock = asyncio.Lock()
    loop = asyncio.get_running_loop()
    async with _driver_lock:
        await loop.run_in_executor(None, driver.get, url)
        # presumably gives client-side scripts time to render the page
        await asyncio.sleep(3)
        html = await loop.run_in_executor(None, lambda: driver.page_source)
    return html or ""
import openai
# Embedding backend; API key and API base URL are taken from the environment
# (a missing variable raises KeyError at import time).
_embedding_settings = {
    "openai_api_key": os.environ["OPENAI_API_KEY"],
    "openai_api_base": os.environ["OPENAI_API_BASE"],
    "show_progress_bar": True,
}
embeddings_model = OpenAIEmbeddings(**_embedding_settings)
# Matrix client. The four positional arguments (homeserver, user, device id,
# access token) are read from the environment in this order.
_bot_env_keys = (
    "BOT_DB_HOMESERVER",
    "BOT_DB_USER",
    "MATRIX_CHAIN_DEVICE",
    "BOT_DB_ACCESS_TOKEN",
)
client = Bot(*[os.environ[key] for key in _bot_env_keys])

# Help text shown to users (in Chinese). It explains that files and video
# links sent to the room are added to the embedding database, lists the
# supported file formats / video sites, and documents the !clean / !clear and
# !embedding on|off commands.
client.welcome_message = """欢迎使用 matrix chain db 插件,我能将房间中的所有文件添加进embedding数据库,并为gpt提供支持
## 使用方式
- 发送文件或视频链接
目前支持文件格式:txt / pdf / md / doc / docx / ppt / pptx
目前支持视频链接:Bilibili / Youtube
## 配置选项
- !clean 或 !clear 清除该房间中所有的embedding信息
- !embedding on 或 !embedding off 开启或关闭房间内embedding功能 (默认关闭)"""

# Markdown-aware splitter. Chunk size and overlap are measured with
# client.get_token_length rather than with character counts.
spliter = MarkdownTextSplitter(
    length_function=client.get_token_length,
    chunk_size=400,
    chunk_overlap=100,
)
# MIME types of office documents that are converted with LibreOffice (soffice).
offices_mimetypes = [
    "application/wps-office.docx",
    "application/wps-office.doc",
    "application/wps-office.pptx",
    "application/wps-office.ppt",
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
    "application/vnd.ms-powerpoint",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.oasis.opendocument.text",
    "application/vnd.oasis.opendocument.presentation",
]

# Every MIME type accepted for ingestion: plain/markup text, PDF, and all
# office formats above.
mimetypes = [
    "text/plain",
    "application/pdf",
    "text/markdown",
    "text/html",
    *offices_mimetypes,
]
def allowed_file(mimetype: str):
    """Return True if *mimetype*, compared case-insensitively, is listed in ``mimetypes``."""
    normalized = mimetype.lower()
    return any(normalized == known for known in mimetypes)
async def _react(room, event, key):
    """Send an ``m.annotation`` reaction *key* on *event* in *room*."""
    await client.room_send(
        room.room_id,
        "m.reaction",
        {
            "m.relates_to": {
                "event_id": event.event_id,
                "key": key,
                "rel_type": "m.annotation",
            }
        },
    )


async def _write_document(room, event, md5, content, url) -> bool:
    """Perform create_embedding's inserts inside the caller's open transaction.

    Returns:
        False as soon as the document is found to be already linked to the
        room (nothing else is written); True once the room link, the chunk
        embeddings and the event link have all been inserted.

    Raises:
        ValueError: if the embedding backend returns a different number of
            vectors than chunks were sent.
    """
    await client.db.execute(
        query="""insert into documents (md5, content, token, url)
values (:md5, :content, :token, :url)
on conflict (md5) do nothing
;""",
        values={
            "md5": md5,
            "content": content,
            "token": client.get_token_length(content),
            "url": url,
        },
    )
    rows = await client.db.fetch_all(
        query="select document_md5 from room_document where room = :room and document_md5 = :md5 limit 1;",
        values={"room": room.room_id, "md5": md5},
    )
    if rows:
        return False
    await client.db.execute(
        query="""
insert into room_document (room, document_md5)
values (:room_id, :md5)
on conflict (room, document_md5) do nothing
;""",
        values={"room_id": room.room_id, "md5": md5},
    )
    # start embedding
    chunks = spliter.split_text(content)
    print("chunks", len(chunks))
    embeddings = await embeddings_model.aembed_documents(chunks, chunk_size=1600)
    print("embedding finished", len(embeddings))
    if len(chunks) != len(embeddings):
        raise ValueError(
            f"embedding count mismatch: {len(chunks)} chunks but "
            f"{len(embeddings)} embeddings"
        )
    insert_data = [
        {
            "document_md5": md5,
            "md5": hashlib.md5(chunk.encode()).hexdigest(),
            "content": chunk,
            "token": client.get_token_length(chunk),
            "embedding": str(embedding),
        }
        for chunk, embedding in zip(chunks, embeddings)
    ]
    await client.db.execute_many(
        query="""insert into embeddings (document_md5, md5, content, token, embedding)
values (:document_md5, :md5, :content, :token, :embedding)
on conflict (document_md5, md5) do nothing
;""",
        values=insert_data,
    )
    print("insert", len(insert_data), "embedding data")
    await client.db.execute(
        query="""
insert into event_document (event, document_md5)
values (:event_id, :md5)
on conflict (event) do nothing
;""",
        values={"event_id": event.event_id, "md5": md5},
    )
    return True


async def create_embedding(room, event, md5, content, url):
    """Store *content* as document *md5*, link it to *room*, and embed it.

    All writes happen in one database transaction:

    * If the room already has this document, the transaction is rolled back
      and *event* gets a 👍 reaction.
    * Otherwise *content* is split with ``spliter`` and embedded with
      ``embeddings_model``. The chunks, the room link and the event link are
      stored, the transaction is committed, and *event* gets a 😘 reaction.

    Any exception raised while writing rolls the transaction back before it
    propagates, so the connection is never left inside an open transaction.
    """
    transaction = await client.db.transaction()
    try:
        created = await _write_document(room, event, md5, content, url)
    except BaseException:
        await transaction.rollback()
        raise
    if not created:
        await transaction.rollback()
        print("document alreadly insert in room", md5, room.room_id)
        await _react(room, event, "👍")
        return
    await transaction.commit()
    await _react(room, event, "😘")
def clean_html(html: str) -> str:
    """Convert an HTML document to text with html2text.

    Link and image markup and emphasis markers are suppressed, and
    ``body_width = 0`` disables html2text's hard line wrapping.
    """
    converter = html2text.HTML2Text()
    converter.ignore_links = True
    converter.ignore_images = True
    converter.ignore_emphasis = True
    converter.body_width = 0
    return converter.handle(html)
def clean_content(content: str, mimetype: str, document_md5: str) -> str:
    """Normalize extracted text before it is chunked and embedded.

    Steps:
      * remove NUL characters;
      * remove markdown images and links entirely, including link text;
      * drop empty paragraphs (blocks separated by a blank line);
      * strip surrounding whitespace from every line.

    Args:
        content: Raw text or markdown.
        mimetype: Currently unused; kept for caller compatibility.
        document_md5: Currently unused; kept for caller compatibility.

    Returns:
        The cleaned text, with paragraphs joined by "\\n\\n".
    """
    content = content.replace("\x00", "")
    # Images must go first: the link pattern also matches the "[alt](src)"
    # tail of "![alt](src)" and would otherwise leave a stray "!" behind.
    content = re.sub(r"!\[.*?\]\(.*?\)", "", content)
    content = re.sub(r"\[.*?\]\(.*?\)", "", content)
    # Filter empty paragraphs in one pass (repeated list.remove is quadratic).
    paragraphs = [paragraph.strip() for paragraph in content.split("\n\n")]
    content = "\n\n".join(paragraph for paragraph in paragraphs if paragraph)
    return "\n".join(line.strip() for line in content.split("\n"))
def pdf_to_text(f) -> str:
    """Extract the text of every page of a PDF.

    Args:
        f: Input for ``PyPDF2.PdfReader``. The callers in this file pass a
            binary stream (``io.BytesIO`` or a file opened in "rb" mode).

    Returns:
        The page texts joined by "\\n", so the last word of one page does not
        fuse with the first word of the next. A page with no extractable text
        contributes "".
    """
    reader = PyPDF2.PdfReader(f)
    return "\n".join(page.extract_text() or "" for page in reader.pages)
# Scratch directory for LibreOffice conversions.
_OFFICE_DIR = "./cache/office"


async def _soffice_convert(source_filepath: str, target: str) -> None:
    """Run headless LibreOffice to convert *source_filepath* into _OFFICE_DIR.

    *target* is the ``--convert-to`` argument (e.g. "txt:Text" or "pdf").
    The conversion runs as an asyncio subprocess, so the event loop is not
    blocked while soffice works.

    Raises:
        ValueError: if soffice exits with a non-zero status.
    """
    process = await asyncio.create_subprocess_exec(
        "soffice",
        "--headless",
        "--convert-to",
        target,
        "--outdir",
        _OFFICE_DIR,
        source_filepath,
    )
    returncode = await process.wait()
    if returncode != 0:
        raise ValueError("soffice conversion failed", source_filepath, returncode)


async def _office_to_text(filename: str, data: bytes, md5: str) -> str:
    """Extract text from an office document via LibreOffice.

    The upload is written as ``<md5>.<ext>``; only the (lower-cased)
    extension of the user-supplied *filename* is used, so a crafted name
    cannot write outside _OFFICE_DIR. Text documents are converted straight
    to txt; presentations are converted to pdf and read with pdf_to_text.
    Both temporary files are removed afterwards.

    Raises:
        ValueError: for an unsupported extension or a failed conversion.
    """
    ext = os.path.splitext(filename)[1][1:].lower()
    if ext in ("doc", "docx", "odt"):
        target, out_ext = "txt:Text", "txt"
    elif ext in ("ppt", "pptx", "odp"):
        target, out_ext = "pdf", "pdf"
    else:
        raise ValueError("unknown ext: ", ext)
    os.makedirs(_OFFICE_DIR, exist_ok=True)
    source_filepath = os.path.join(_OFFICE_DIR, f"{md5}.{ext}")
    # soffice names its output after the input file's stem
    out_filepath = os.path.join(_OFFICE_DIR, f"{md5}.{out_ext}")
    print("source_filepath", source_filepath)
    try:
        with open(source_filepath, "wb") as f:
            f.write(data)
        await _soffice_convert(source_filepath, target)
        if out_ext == "txt":
            with open(out_filepath, "r") as f:
                return f.read()
        with open(out_filepath, "rb") as f:
            return pdf_to_text(f)
    finally:
        for path in (source_filepath, out_filepath):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass  # never produced (e.g. conversion failed): nothing to clean


async def _extract_content(mimetype: str, body: bytes, filename: str, md5: str) -> str:
    """Turn a downloaded file body into raw text according to its MIME type.

    Raises:
        ValueError: for a MIME type this plugin cannot handle.
    """
    if mimetype in ("text/plain", "text/markdown"):
        return body.decode()
    if mimetype == "text/html":
        return clean_html(body.decode())
    if mimetype == "application/pdf":
        return pdf_to_text(io.BytesIO(body))
    if mimetype in offices_mimetypes:
        content = await _office_to_text(filename, body, md5)
        print("converted txt", content)
        return content
    raise ValueError("unknown mimetype", mimetype)


@client.ignore_self_message
@client.handel_no_gpt
@client.log_message
@client.with_typing
@client.replace_command_mark
@client.safe_try
async def message_file(room: MatrixRoom, event: RoomMessageFile):
    """Ingest an uploaded file into the room's embedding store.

    The file is downloaded and identified by the md5 of its bytes. Text that
    is already stored under that md5 in ``documents`` is reused; otherwise it
    is extracted according to the MIME type. The text is then cleaned and
    handed to create_embedding.

    Raises:
        ValueError: for an unsupported MIME type or extension, a download
            error, a failed conversion, or a file without extractable text.
    """
    print("received file")
    # allowed_file() compares case-insensitively; normalize once so the
    # dispatch in _extract_content agrees with it.
    mimetype = (event.flattened().get("content.info.mimetype", "") or "").lower()
    if not allowed_file(mimetype):
        print("not allowed file", event.body)
        raise ValueError("not allowed file")
    resp = await client.download(event.url)
    if isinstance(resp, DownloadError):
        raise ValueError("file donwload error")
    assert isinstance(resp.body, bytes)
    md5 = hashlib.md5(resp.body).hexdigest()
    document_fetch_result = await client.db.fetch_one(
        query="select content from documents where md5 = :md5 limit 1;",
        values={"md5": md5},
    )
    content = document_fetch_result[0] if document_fetch_result else ""
    if content:
        print("document", md5, "alreadly exists")
    else:
        content = await _extract_content(mimetype, resp.body, event.body, md5)
    content = clean_content(content, mimetype, md5)
    print("content length", len(content))
    # same guard as the web-page path: nothing to embed
    if not content:
        raise ValueError("Empty content")
    await create_embedding(room, event, md5, content, event.url)
# Register message_file as the handler for RoomMessageFile events.
client.add_event_callback(message_file, RoomMessageFile)
# URL fragments identifying video links handed to yt-dlp; allow_yt_dlp does a
# plain substring match against these.
yt_dlp_support = ["b23.tv/", "www.bilibili.com/video/", "youtube.com/"]
def allow_yt_dlp(link: str) -> bool:
    """Return True if *link* is an http(s) URL containing a yt_dlp_support fragment.

    The fragment test is a plain substring check anywhere in the link.
    """
    is_http = link.startswith(("http://", "https://"))
    return is_http and any(fragment in link for fragment in yt_dlp_support)
def allow_web(link: str) -> bool:
    """Log *link* and report whether it uses the http or https scheme."""
    print("checking web url", link)
    return link.startswith(("http://", "https://"))
def _download_audio(url: str):
    """Download media for *url* with yt-dlp and move it into ./cache/yt-dlp.

    Blocking; intended to run in an executor.

    Returns:
        (path of the cached audio file, video title).
    """
    ydl_opts = {
        "format": "wa*",
        # FFmpegExtractAudio with default codec/quality settings.
        # See help(yt_dlp.postprocessor) for the available postprocessors.
        "postprocessors": [{"key": "FFmpegExtractAudio"}],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
    filepath = info["requested_downloads"][0]["filepath"]
    # Name the cached copy after the final (post-processed) file itself.
    realfilepath = os.path.join("./cache/yt-dlp", os.path.basename(filepath))
    os.rename(filepath, realfilepath)
    return realfilepath, info["title"]


def _transcribe(path: str, title: str) -> str:
    """Transcribe the audio at *path*, returning one line per segment.

    The title is passed as the prompt. Blocking; intended to run in an
    executor. The audio file is closed once the upload finishes.
    """
    with open(path, "rb") as audio:
        result = openai.Audio.transcribe(
            file=audio,
            model="large-v2",
            prompt=title,
        )
    # NOTE(review): assumes the configured endpoint returns a "segments" list
    # whose items expose .text; confirm against the transcription backend.
    return "\n".join([segment.text for segment in result["segments"]])


async def _run_command(room: MatrixRoom, body: str) -> bool:
    """Apply a "!"-prefixed configuration command.

    Returns:
        True when a command was recognized and applied (the caller then
        reacts). False for unknown commands and malformed "!embedding"
        arguments.
    """
    if body.startswith("!clear") or body.startswith("!clean"):
        async with client.db.transaction():
            # Embeddings are keyed by document, not by room: only delete the
            # embeddings of documents that no other room still links to.
            await client.db.execute(
                query="""
delete from embeddings e
using room_document rd
where e.document_md5 = rd.document_md5 and
rd.room = :room_id and
not exists (
select 1 from room_document rd2
where rd2.document_md5 = rd.document_md5 and
rd2.room <> rd.room
);
""",
                values={"room_id": room.room_id},
            )
            await client.db.execute(
                query="delete from room_document where room = :room_id;",
                values={"room_id": room.room_id},
            )
        return True
    if body.startswith("!embedding"):
        args = body.split()
        if len(args) < 2 or args[1].lower() not in ("on", "off"):
            return False
        status = args[1].lower() == "on"
        await client.db.execute(
            query="""
insert into room_configs (room, embedding)
values (:room_id, :status)
on conflict (room) do update set embedding = excluded.embedding
;""",
            values={"room_id": room.room_id, "status": status},
        )
        return True
    return False


@client.message_callback_common_wrapper
async def message_text(room: MatrixRoom, event: RoomMessageText) -> None:
    """Handle text messages: configuration commands, video links, web pages.

    * "!clear" / "!clean" removes the room's documents and embeddings.
    * "!embedding on|off" toggles the room's embedding setting.
      Recognized commands get a 😘 reaction.
    * A first word accepted by allow_yt_dlp is downloaded and transcribed;
      the transcript is embedded.
    * Otherwise an http(s) first word is rendered with get_html, cleaned,
      and embedded.

    Raises:
        ValueError: if a web page yields no text.
    """
    if event.body.startswith("!"):
        if await _run_command(room, event.body):
            await client.room_send(
                room.room_id,
                "m.reaction",
                {
                    "m.relates_to": {
                        "event_id": event.event_id,
                        "key": "😘",
                        "rel_type": "m.annotation",
                    }
                },
            )
        return
    words = event.body.split()
    if not words:
        return
    url = words[0]
    if allow_yt_dlp(url):
        # Download and transcription are blocking; keep them off the event loop.
        loop = asyncio.get_running_loop()
        audio_path, title = await loop.run_in_executor(None, _download_audio, url)
        result = await loop.run_in_executor(None, _transcribe, audio_path, title)
        print(event.sender, result)
        md5 = hashlib.md5(result.encode()).hexdigest()
        await create_embedding(room, event, md5, result, url)
        return
    if allow_web(url):
        print("downloading", url)
        html = await get_html(url)
        md5 = hashlib.md5(html.encode()).hexdigest()
        content = clean_html(html)
        content = clean_content(content, "text/markdown", md5)
        if not content:
            raise ValueError("Empty content")
        print(content)
        await create_embedding(room, event, md5, content, url)
        return
# Register message_text as the handler for RoomMessageText events, then run
# client.sync_forever() inside a new event loop created by asyncio.run().
client.add_event_callback(message_text, RoomMessageText)
asyncio.run(client.sync_forever())