From 3afa0d81bbfb625ac94085cceddfa3cbf1371edd Mon Sep 17 00:00:00 2001 From: heimoshuiyu Date: Sun, 28 Apr 2024 15:13:22 +0800 Subject: [PATCH] delete all --- README.md | 130 ---------- addrlist_example.txt | 1 - config.json | 9 - config.py | 86 ------- forwarder.py | 62 ----- main.py | 10 - msw.py | 41 --- mswp.py | 112 -------- plugins/ffmpeg.py | 315 ----------------------- plugins/input.py | 99 -------- plugins/log.py | 26 -- plugins/net.py | 563 ----------------------------------------- plugins/shell.py | 35 --- plugins/update.py | 83 ------ resources/testfile.txt | 1 - test.py | 80 ------ test_file.py | 44 ---- test_tool.py | 21 -- tmp/placeholder | 0 19 files changed, 1718 deletions(-) delete mode 100644 README.md delete mode 100644 addrlist_example.txt delete mode 100644 config.json delete mode 100644 config.py delete mode 100644 forwarder.py delete mode 100644 main.py delete mode 100644 msw.py delete mode 100644 mswp.py delete mode 100644 plugins/ffmpeg.py delete mode 100644 plugins/input.py delete mode 100644 plugins/log.py delete mode 100644 plugins/net.py delete mode 100644 plugins/shell.py delete mode 100644 plugins/update.py delete mode 100644 resources/testfile.txt delete mode 100644 test.py delete mode 100644 test_file.py delete mode 100644 test_tool.py delete mode 100644 tmp/placeholder diff --git a/README.md b/README.md deleted file mode 100644 index fe5c103..0000000 --- a/README.md +++ /dev/null @@ -1,130 +0,0 @@ -# MSW -简单网络框架 -README暂时写中文,英文水平差容易产生歧义orz... - -# 介绍 -- 程序会自动载入`plugins`文件夹内所有python脚本,换言之,所有自定义插件都可以放到plugins目录中 -- 程序会为每一个plugin提供一个独立的接收队列`recive_queue`,插件可以通过调用`recive_queue.get()`获取数据 -- 程序为所有plugin提供一个共用的发送队列`send_queue`,插件可以通过调用`send_queue.put()`来使用框架发送数据 -- “数据”,是指`mswp.py`中定义的`Datapack`类型,这种结构储存了数据包发送目的地、来源、参数等信息 -- `forwarder.py`会读取`send_queue`,判断“数据”中的`app`这一项属性,将其放到对应plugin的接收队列中,起到路由的作用 -总的来说 -plugin完成某项任务后,创建`Datapack`对象并设置好该数据包的发送目的地等参数,将其放入到`send_queue`中,对应plugin就可以收到该数据包 - -# 部署 -部署很简单,把文件下载下来,写好配置直接运行 -## 安装 -运行命令克隆仓库 -> `git clone https://github.com/heimoshuiyu/msw` - -## 配置config.json -配置`config.json`中的`id`,监听地址、端口等信息 -单台机器可以运行多个msw,只要配置不同的id和监听端口接口 -## 配置地址列表 -编辑创建`address.txt`,格式请参考`address_example.txt` -## 运行 -> `python main.py` - -或者 -> `python3 main.py` - -注意:msw仅支持python3 - -# 数据包Datapack -msw中队列传递的都是一种叫Datapack的结构,其结构类似http的请求包(由`mswp.py`定义) -示例 - -```` -post shell msw/0.1 -from: test -id: hmsy -to: miku -flag: 1a2b3c4d - -shutdown now -```` - -- 解释:向名为`miku`的主机的`shell`插件发送`shutdown now`的命令,执行结果返回到`hmsy`主机的`test`插件 -- `Datapack.method`有三种可能的值:`post`、`reply`、`file` - - `post`是最普通的,一般数据包都使用该标识 - - `reply`标记该数据包是一个回复 - - `file`标记该数据包是一个文件,文件由`filename`参数指定,数据包中不会包含文件的内容 -- `Datapack.app`表示数据包发送目的地的plugin名字,如`shell`,即`plugins/shell.py`该程序会收到此数据包 - - 若是找不到对应app,则产生一个警告 -- `Datapack.version`表示当前程序版本,暂时还没什么用 -- `Datapack.head`一个字典,储存参数 - - `to`若存在,参数表示这是一个需要网络发送的数据包,数据包会被发送到`to`指定的主机 - - `id`表示发送人的id,由程序自动配置 - - `from`表示发送的插件名,`reply`方法会用到这个参数,建议设置为`__name__` - - `flag`数据包识别码,由程序随机自动生成,调试用,暂无特殊用途 - - `filename`,仅在`Datapack.method`为`file`时生效, -- `Datapack.body`数据包主内容 - -# 已实现功能 -- `main.py` - - 这是一个守护程序,会调用`os.system()`来运行`msw.py`主程序,主程序若正常退出(返回值0)则重启主程序 -- `forwarder.py` - - 在`send_queue`和`recive_queues`中实现转发功能 - - 根据`Datapack.app`选择队列进行转发 - - 如果参数中包含`to`这一项,将转发给`plugins/net.py`,由net插件进行网络转发 -- `plugins/update.py` - - 依赖`plugins/net.py`,可以快读更新其他主机的代码 - - 该插件可以将程序目录下的文件过滤并打包,然后发送给其他主机 - - 其他主机收到文件,会解压并替换程序原有文件,然后向msw发送0值尝试重启 -- `plugins/input.py` - - 循环调用`input()`,允许用户输入命令 - - 命令格式`插件名:数据包内容;键:值,键2:值2` - - 例如`update:compress;update_to:*`表示向`plugins/update`发送内容为`compress`,参数`update_to=*`的数据包。`compress`和`update_to`字段都是plugins/update.py中定义的,`compress`表示压缩,update表示解压缩,`update_to`表示将升级包发送到这个目录,设置为*表示所有主机,*的功能是在`plugins/net.py`中定义的 -- `plugins/net.py` - - 网络插件,读取`address.txt`中的地址并连接,和其他主机交换地址列表,建立类似dht的网络 - - 如果`Datapack`中含有参数`to`,`forward.py`会无条件将数据包转发到此插件进行网络发送,发送完成后会去掉`to`参数 - - 插件收到其他主机发来的数据包,解码后会放进`send_queue`队列中,如果此时还存在`to`参数,会进行多次转发,实现代理功能 - - 反向代理:插件根据`to`参数确定主机地址,`to`参数内容为主机id,若在连接池中有该id的连接则直接发送,否则参考proxy字典决定是否发送给代理,否则放入发送失败队列等待重发 - - 发送文件:`Datapack.method`为`file`时启动,根据`filename`参数中制定的文件进行发送 - - 接受文件:文件下载完成后才会将数据包放入`send_queue`队列中 - - 心跳&mht:定时交换自身拥有的地址列表、代理列表 - - 重试失败次数超过39次,数据包将被丢弃 -- `plugins/ffmpeg.py` - - 分布式转码 - - 调用系统ffmpeg,对视频进行切片,分发,转码,收集合并 - - 对分发主机启动server模式并指定文件名,对转码主机启动worker模式并指定分发主机的id,即可开始转码 -- `plugins/logger.py` - - 日志记录,会将接收到的数据包写入`log.txt` - -# 编写第一个插件 -建议参考`plugins/logger.py`,这个简单的日志记录器实现了数据包的接受和发送 -## 示例代码 -````python -import threading -from forwarder import recive_queues, send_queue -from mswp import Datapack -# 获取自身对应的接受队列 -recive_queue = recive_queues[__name__] - -def main(): - while True: - dp = recive_queue.get() # 阻塞获取数据包 - if dp.method = 'post': # 打印出数据包的head和body - print('You recive head is', str(dp.head)) - print('You recive body is', dp.body.decode()) - if dp.method = 'post': # 回复数据包示例 - ndp = dp.reply() - ndp.body = 'recived'.encode() - send_queue.put(ndp) # 发送数据包 - else: # 新建数据包示例 - dp = Datapack(head={'from'=__name__}) - dp.head['to'] = 'hmsy' # 设置目标主机名 - send_queue.put(dp) - # 发送文件示例 - dp = Datapack(head={'from'=__name__}) - dp.method = 'file' # 标记该数据包为文件类型 - dp.head['to'] = 'hsmy' - dp.head['filename'] = 'res/file.txt' # 设置数据包携带的文件 - send_queue.put(dp) - - -# 必须以多线程方式启动主函数,必须设置daemon让主线程退出后,子线程也能退出 -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() -```` -原则上不建议直接import其他plugin,建议通过发送Datapack来与其他插件交互 diff --git a/addrlist_example.txt b/addrlist_example.txt deleted file mode 100644 index d28e93c..0000000 --- a/addrlist_example.txt +++ /dev/null @@ -1 +0,0 @@ -127.0.0.1:3900 \ No newline at end of file diff --git a/config.json b/config.json deleted file mode 100644 index 38fc8f6..0000000 --- a/config.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "id": "miku", - "listen_port": 3939, - "listen_ip": "0.0.0.0", - "listen_num": 39, - "buffsize": 4096, - "proxy": false, - "onlyproxy": false -} \ No newline at end of file diff --git a/config.py b/config.py deleted file mode 100644 index 70cd0bf..0000000 --- a/config.py +++ /dev/null @@ -1,86 +0,0 @@ -import threading -import json -import time -import queue -import os - - -class Print_controller: - def __init__(self): - self.padding_queue = queue.Queue() - self.original_print = print - - self.thread = threading.Thread(target=self.start_printing, args=(), daemon=True) - self.thread.start() - - def start_printing(self): - while True: - word = self.padding_queue.get() - print(word) - - def print_function(self, word, dp=None): - if dp: - word = '<%s> %s' % (dp.head.get('flag'), word) - self.padding_queue.put(word) - - -class Jsondata: - def __init__(self, auto_save=False, auto_save_time=10): - with open('config.json', 'r') as f: - raw_data = f.read() - jsondata = json.loads(raw_data) - self.raw_jsondata = jsondata - self.auto_save = auto_save - self.auto_save_time = auto_save_time - self.thread = threading.Thread(target=self.run, args=(), daemon=True) - self.thread.start() - - def try_to_read_jsondata(self, key, or_value, template=0, output=True): - if key in self.raw_jsondata.keys(): - return self.raw_jsondata[key] - else: - if output: - print('Error: could not find key value in file "config.json"\n' - 'Please set the key "%s" in file "config.json"\n' - 'Or MSW will set it as %s' % (key, or_value)) - return or_value - - def get(self, key): - return self.raw_jsondata.get(key) - - def set(self, key, value): - self.raw_jsondata[key] = value - - def run(self): - while True: - time.sleep(self.auto_save_time) - if self.auto_save: - pass - - -def create_floder(path): - pathlist = list(os.path.split(path)) - pathlist.pop() - flordpath = '/'.join(pathlist) - - if not os.path.exists(flordpath): - _create_floder(flordpath) - -def _create_floder(path): - if not path: - return - pathlist = list(os.path.split(path)) - pathlist.pop() - flordpath = '/'.join(pathlist) - - if not os.path.exists(flordpath): - _create_floder(flordpath) - if not os.path.exists(path): - os.mkdir(path) - -global_config = {} -msw_queue = queue.Queue() -jsondata = Jsondata() - -print_controller = Print_controller() -dprint = print_controller.print_function diff --git a/forwarder.py b/forwarder.py deleted file mode 100644 index 0dd57ba..0000000 --- a/forwarder.py +++ /dev/null @@ -1,62 +0,0 @@ -import queue -import threading -import copy -from config import global_config - - -send_queue = queue.Queue() -receive_queues = {} - -for name in global_config['plugins_realname_list']: - name = 'plugins.' + name - receive_queues[name] = queue.Queue() - - -def add_plugins_string(indata): - outdata = 'plugins.' + indata - return outdata - - -def send_queue_function(): - global send_queue, receive_queues - while True: - dp = send_queue.get() - dp.encode() - if dp.app == 'all': - for q in receive_queues: - receive_queues[q].put(dp) - elif '&' in dp.app: - applist = dp.app.split('&') - dp_list = [] - for i in range(len(applist)): # split dp - new_dp = copy.copy(dp) - new_dp.app = applist[i] - dp_list.append(new_dp) - for new_dp in dp_list: - object_app, new_dp = process_reforware(new_dp) - receive_queues[add_plugins_string(object_app)].put(new_dp) - elif 'to' in dp.head: # send to net if "to" avaliable - put('net', dp) - else: - object_app, dp = process_reforware(dp) - put(object_app, dp) - -def put(appname, dp): - realappname = add_plugins_string(appname) - if not receive_queues.get(realappname): - print('KeyError, Could not find queue %s' % realappname) - else: - receive_queues[realappname].put(dp) - - -def process_reforware(dp): - if '&' in dp.app: - first_forward, next_forward = dp.app.split('&') - dp.app = next_forward - return first_forward, dp - else: - return dp.app, dp - - -thread = threading.Thread(target=send_queue_function, args=(), daemon=True) -thread.start() diff --git a/main.py b/main.py deleted file mode 100644 index 410e0da..0000000 --- a/main.py +++ /dev/null @@ -1,10 +0,0 @@ -import os -import os.path -import sys - -python = sys.executable - -while True: - code = os.system(python + ' msw.py') - if code: - break \ No newline at end of file diff --git a/msw.py b/msw.py deleted file mode 100644 index 5ea1c5f..0000000 --- a/msw.py +++ /dev/null @@ -1,41 +0,0 @@ -import sys -import os -PATH = os.path.dirname(os.path.abspath(__file__)) -sys.path.append(PATH) -os.chdir(PATH) - -import threading -from mswp import Datapack -from config import jsondata, global_config, msw_queue - - -print('Building plugins import script...') -plugins_list = os.listdir('plugins') -plugins_should_be_remove_list = [] -for name in plugins_list: - if '__' in name: - plugins_should_be_remove_list.append(name) -for name in plugins_should_be_remove_list: - plugins_list.remove(name) -plugins_import_script = '' -plugins_realname_list = [] -for name in plugins_list: - if len(name) >= 3: - name = name[:-3] - plugins_import_script += 'import plugins.%s\n' % name - plugins_realname_list.append(name) -with open('plugins/__init__.py', 'w') as f: - f.write(plugins_import_script) -print('%s plugins will be import' % (len(plugins_realname_list))) -print('Plugins list: %s' % str(plugins_realname_list)) - - -global_config['plugins_realname_list'] = plugins_realname_list - -import plugins -print('Plugins import finished') - - -# restart -code = msw_queue.get() -sys.exit(code) diff --git a/mswp.py b/mswp.py deleted file mode 100644 index 75811d6..0000000 --- a/mswp.py +++ /dev/null @@ -1,112 +0,0 @@ -import os -import random -import hashlib -import copy -from config import jsondata - - -''' -Avaliable method are - post: used to send data, no needs to reply (deafult) - get: used to send data, but needs to reply - reply: used to reply "get" method -A datapack must like: ---------------------- -post log msw/1.0 -id: miku [auto] -flag: 1a2b3c4d [auto] -length: 0 [auto] -from: appname -to: [if has (net id)] -filename: [if has] - -[data content here -if has -support many lines...] ---------------------- -''' - -BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) -ID = jsondata.try_to_read_jsondata('id', 'unknown_id') -class Datapack: - def __init__(self, method='post', app='all', version='msw/0.1', head=None, body=b'', - file=None, gen_flag=True, delete=False): - self.id = ID - if head is None: - head = {} - self.head = head - self.head['id'] = self.id - self.method = method - self.file = file - self.delete = delete - self.failed_times = 0 - self.app = app - self.version = version - self.body = body - self.encode_data = b'' - if self.head.get('from'): - self.head['from'] = process_plugins_name(self.head['from']) - else: - self.head['from'] = 'unkonwn_app' - if gen_flag: - randseed = str(random.random()).encode() - h = hashlib.sha1() - h.update(randseed) - self.head['flag'] = h.hexdigest()[:8] - - def encode(self): - if self.method == 'file': - self.body = b'' - self.head['length'] = str(os.path.getsize(self.head['filename'])) - else: - self.head['length'] = str(len(self.body)) - - first_line = self.method.encode() + b' ' + self.app.encode() + b' ' + self.version.encode() - heads = ''.encode() - needed_to_del = [] - for i in self.head: # del the empty head - if not self.head[i]: - needed_to_del.append(i) - for i in needed_to_del: - del(self.head[i]) - for i in self.head: - heads += i.encode() + b': ' + self.head[i].encode() + b'\n' - self.encode_data = first_line + b'\n' + heads + b'\n' + self.body - - def decode(self, only_head=False): - index = self.encode_data.index(b'\n\n') - upper = self.encode_data[:index] - if not only_head: - self.body = self.encode_data[index+2:] - else: - self.body = b'' - upper = upper.decode() - heads = upper.split('\n') - first_line = heads.pop(0) - self.method, self.app, self.version = first_line.split(' ') - for line in heads: - i, ii = line.split(': ') - self.head[i] = ii - if only_head: - return self.encode_data[index+2:] - else: - return None - - - def reply(self): - ndp = copy.deepcopy(self) - ndp.app = ndp.head['from'] - ndp.method = 'reply' - ndp.delete = False - if not self.head['id'] == ID: # net package - ndp.head['to'] = self.head['id'] - ndp.head['id'] = ID - return ndp - - -def process_plugins_name(name): - if 'plugins.' in name: - name = name.replace('plugins.', '') - return name - else: - return name diff --git a/plugins/ffmpeg.py b/plugins/ffmpeg.py deleted file mode 100644 index f1e26a1..0000000 --- a/plugins/ffmpeg.py +++ /dev/null @@ -1,315 +0,0 @@ -import threading -import copy -import os -import queue -import time -import subprocess -from mswp import Datapack -from forwarder import receive_queues, send_queue -from config import msw_queue, _create_floder -from config import dprint as print -receive_queue = receive_queues[__name__] - - -''' -Usage: -ffmpeg: start;filename:res/test.mp4 -start using one file test.mp4 - -ffmpeg: autostart -auto start one file from res/ffmpeg_task - -ffmpeg: start;filename:res/output.mp4,concat:false -using tem file, output should be res/output_convert -ffmpeg: stop - -ffmpeg: enable;server:miku -ffmpeg: disable - -''' - - -def main(): - while True: - ffmpeg_controller = Ffmpeg_controller() - ffmpeg_controller.mainloop() - - -class Ffmpeg_controller: - def __init__(self): - self.ffmpeg_type = None - self.status = 0 - self.server = None - self.convert_task_queue = queue.Queue() - self.org_filename = None - self.object_filename = None - self.concat = True - self.pause = False - self.autostart = False - self.tasklist = [] - - self.convert_task_thread = threading.Thread(target=self.convert_task_func, args=()) - self.convert_task_thread.start() - - self.mainloop() - - - def mainloop(self): - _create_floder('res/ffmpeg_tmp') - _create_floder('res/ffmpeg_finished') - _create_floder('res/ffmpeg_task') - _create_floder('res/ffmpeg_old') - _create_floder('res/ffmpeg_complet') - - while True: - dp = receive_queue.get() - - if dp.method == 'post' and dp.body == b'concat': - self.org_filename = dp.head['filename'] - self.object_filename = self.org_filename[:-4] + '.mkv' - self.concat_func() - - if dp.method == 'post' and dp.body == b'autostart': - filelist = os.listdir('res/ffmpeg_task') - self.tasklist = [] - for file in filelist: - if len(file) > 3: - ext = file[-4:] - if ext in ['.mp4', '.MP4', '.mkv', '.MKV']: - self.tasklist.append('res/ffmpeg_task/' + file) - dp = Datapack() - dp.app = 'ffmpeg' - dp.body = b'start' - dp.head['filename'] = self.tasklist.pop(0) - self.autostart = dp.head['filename'] - send_queue.put(dp) - - if dp.method == 'post' and dp.body == b'start': # config ffmpeg is server or client - if dp.head.get('concat'): - if dp.head['concat'] == 'true': - self.concat = True - elif dp.head['concat'] == 'false': - self.concat = False - else: - print('unknown concat value') - continue - else: - self.concat = True - - if self.concat: - self.org_filename = dp.head['filename'] - self.object_filename = 'res/ffmpeg_complet/' + os.path.basename(self.org_filename)[:-4] + '.mkv' - - if self.concat: - ndp = dp.reply() - ndp.body = 'Spliting file %s' % dp.head['filename'] - ndp.body = ndp.body.encode() - send_queue.put(ndp) - - cmd = 'ffmpeg -i "' + os.path.normpath(dp.head['filename']) + '" -c copy \ - -f segment -segment_time 20 -reset_timestamps 1 -y \ - "res/ffmpeg_tmp/' + '%d' + '.mkv"' - - os.system(cmd) - - self.run_as_server() - - if self.concat: - self.concat_func() - - print('All process finished') - - elif dp.method == 'post' and dp.body == b'enable': # clinet mode - self.status = 1 - self.server = dp.head['server'] - self.convert_func() - - elif dp.method == 'post' and dp.body == b'status': - result = 'ffmpeg not working' - ndp = dp.reply() - ndp.body = result.encode() - - send_queue.put(ndp) - - elif dp.method == 'get': # let other client disable - ndp = dp.reply() - ndp.method = 'post' - ndp.body = b'disable' - print('let %s disabled' % dp.head['id']) - - send_queue.put(ndp) - - - def concat_func(self): - # concat all file - _filelist = os.listdir('res/ffmpeg_finished') - if 'filelist.txt' in _filelist: - _filelist.remove('filelist.txt') - - # correct order - filelist = [] - for filenum in range(len(_filelist)): - filelist.append(str(filenum) + '.mkv') - - with open('res/ffmpeg_finished/filelist.txt', 'w') as f: - for file in filelist: - f.write('file \'%s\'\n' % file) - - os.system('ffmpeg -f concat -i res/ffmpeg_finished/filelist.txt \ - -c copy -y "' + os.path.normpath(self.object_filename) + '"') - - for file in filelist: - os.remove('res/ffmpeg_finished/' + file) - pass - os.remove('res/ffmpeg_finished/filelist.txt') - - if self.autostart: - os.rename(self.autostart, self.autostart.replace('ffmpeg_task', 'ffmpeg_old')) - self.autostart = None - - - def run_as_server(self): - _padding_to_convert = os.listdir('res/ffmpeg_tmp') - padding_to_convert = [] - for file in _padding_to_convert: - file = 'res/ffmpeg_tmp/' + file - padding_to_convert.append(file) - already_in_convert = [] - finished_convert = [] # outputfilename - - while True: - dp = receive_queue.get() - - if dp.method == 'post' and dp.body == b'status': - result = '' - result += 'padding_to_convert ' + str(padding_to_convert) + '\n' - result += 'already_in_convert ' + str(already_in_convert) + '\n' - result += 'finished_convert ' + str(finished_convert) + '\n' - result += 'convert_task_queue size ' + str(self.convert_task_queue.qsize()) - ndp = dp.reply() - ndp.body = result.encode() - send_queue.put(ndp) - - elif dp.method == 'post' and dp.body == b'reset': - padding_to_convert = already_in_convert - already_in_convert = [] - - elif dp.method == 'post' and dp.body == b'stop': - break - - elif dp.method == 'post' and dp.body == b'pause': - self.pause = True - - elif dp.method == 'post' and dp.body == b'continue': - self.pause = False - - elif dp.method == 'get': - if self.pause: - ndp = dp.reply() - ndp.method = 'post' - ndp.body = b'disable' - send_queue.put(ndp) - continue - if padding_to_convert: - filename = padding_to_convert.pop(0) - already_in_convert.append(filename) - - print('%s get %s to convert' % (dp.head['id'], filename), dp) - - ndp = dp.reply() - ndp.method = 'file' - ndp.head['filename'] = filename - - send_queue.put(ndp) - - else: - if not already_in_convert: # finished - break - else: # waiting for final convert - ndp = dp.reply() - ndp.method = 'post' - ndp.body = b'disable' - send_queue.put(ndp) - - elif dp.method == 'file': - old_filename = dp.head['old_filename'] - filename = dp.head['filename'] - - os.remove(old_filename) - already_in_convert.remove(old_filename) - finished_convert.append(filename) - - total = len(padding_to_convert) + len(already_in_convert) + len(finished_convert) - print('Processing...(%d) %d/%d %s' % \ - (len(already_in_convert), \ - len(finished_convert), \ - total, \ - str(round(len(finished_convert)/total*100, 2)))) - - if not padding_to_convert and not already_in_convert: # final process - break - - print('Mapreduce finished') - - - - - def convert_func(self): # run as client - for _ in range(2): - self.send_request() - - while self.status or not self.convert_task_queue.empty(): - dp = receive_queue.get() - - if dp.method == 'post' and dp.body == b'disable': - self.status = 0 - - elif dp.method == 'post' and dp.body == b'status': - result = 'Working as client, queue size: %s' % str(self.convert_task_queue.qsize()) - - ndp = dp.reply() - ndp.body = result.encode() - send_queue.put(ndp) - - elif dp.method == 'file': - self.convert_task_queue.put(dp) - - - def convert_task_func(self): - while True: - dp = self.convert_task_queue.get() - - filename = dp.head['filename'] - output_filename = filename[:-4] + '.mkv' - output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished') - os.system('ffmpeg -i "' + os.path.normpath(filename) + '" -c:a libopus -ab 64k \ - -c:v libx265 -s 1280x720 -y "' + os.path.normpath(output_filename) + '"') - - os.remove(filename) - - ndp = dp.reply() - ndp.head['filename'] = output_filename - ndp.head['old_filename'] = filename - ndp.method = 'file' - ndp.delete = True - send_queue.put(ndp) - - self.send_request() - - - def send_request(self): - if self.status: - dp = Datapack(head={'from': __name__}) - dp.method = 'get' - dp.app = 'ffmpeg' - dp.head['to'] = self.server - - send_queue.put(dp) - - -def get_one_from_dict(d): - for key in d: - return key, d[key] - -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() diff --git a/plugins/input.py b/plugins/input.py deleted file mode 100644 index a4c253d..0000000 --- a/plugins/input.py +++ /dev/null @@ -1,99 +0,0 @@ -import threading -import copy -import os -from mswp import Datapack -from forwarder import receive_queues, send_queue -from config import msw_queue -from config import dprint as print -receive_queue = receive_queues[__name__] - - -def main(): - while True: - try: - _main() - except Exception as e: - print('Error in %s, %s: %s' % (__name__, type(e), str(e))) - - -def print_reply_func(): - while True: - dp = receive_queue.get() - dp.encode() - print(dp.encode_data.decode()) - - -def _main(): - last = '' - file_flag = False - while True: - file_flag = False - raw_data = input() - - if raw_data == 'restart': - msw_queue.put(0) - break - if raw_data == 'exit': - msw_queue.put(1) - break - if raw_data == 'update': - raw_data = 'update:compress;update_to:*' - if raw_data == '1': - raw_data = 'ffmpeg:autostart' - if raw_data == '2': - raw_data = 'ffmpeg:enable;to:*,server:miku' - if raw_data == 'r': - raw_data = last - - last = raw_data - - if raw_data[:6] == '(file)': # like "(file)log: filename.exe" - raw_data = raw_data[6:] - file_flag = True - - first_index, last_index = find_index(raw_data) - app = raw_data[:first_index] - body = raw_data[last_index:] - - ihead = {} - if ';' in body and ':' in body: - ihead_index = body.index(';') - ihead_str = body[ihead_index+1:] - body = body[:ihead_index] - - ihead_list = ihead_str.split(',') - for key_value in ihead_list: - key, value = key_value.split(':') - ihead[key] = value - - app = app.replace(' ', '') - dp = Datapack(head={'from': __name__}) - - dp.head.update(ihead) - - dp.app = app - - if file_flag: - dp.method = 'file' - dp.body = b'' - dp.head['filename'] = body - - else: - dp.body = body.encode() - - send_queue.put(dp) - print('Command has been sent', dp) - - -def find_index(raw_data): - first_index = raw_data.index(':') - last_index = first_index + 1 - while raw_data[last_index] == ' ': - last_index += 1 - return first_index, last_index - - -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() -thread_print_reply_func = threading.Thread(target=print_reply_func, args=(), daemon=True) -thread_print_reply_func.start() diff --git a/plugins/log.py b/plugins/log.py deleted file mode 100644 index e98e75d..0000000 --- a/plugins/log.py +++ /dev/null @@ -1,26 +0,0 @@ -import threading -from mswp import Datapack -from forwarder import receive_queues -from config import dprint as print -receive_queue = receive_queues[__name__] - -def main(): - while True: - dp = receive_queue.get() - - if dp.method == 'file': - word = dp.head.get('filename') - else: - word = dp.body.decode() - - print('Writedown log: %s' % (word), dp) - with open('logger.log', 'a') as f: - if dp.head.get('from'): - from_app_name = dp.head.get('from') - else: - from_app_name = 'Unknown' - f.write(from_app_name + ': ' + dp.body.decode() + '\n') - -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() - diff --git a/plugins/net.py b/plugins/net.py deleted file mode 100644 index 7a2d2e2..0000000 --- a/plugins/net.py +++ /dev/null @@ -1,563 +0,0 @@ -import threading -import socket -import copy -import queue -import json -import os -import random -import time -from mswp import Datapack -from forwarder import receive_queues, send_queue -from config import jsondata, create_floder -from config import dprint as print -receive_queue = receive_queues[__name__] - -BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) -ID = jsondata.try_to_read_jsondata('id', 'Unknown_ID') -RETRYSLEEP = 3.9 -MYPROXY = jsondata.try_to_read_jsondata('proxy', False) -ONLYPROXY = jsondata.try_to_read_jsondata('onlyproxy', False) - - -def main(): - network_controller = Network_controller() - network_controller.i_did_something() - - -class Network_controller: # manage id and connection - def __init__(self): - if ONLYPROXY and not MYPROXY: - print('config failed because you set onlyproxy true but proxy false') - return - self.send_queue = queue.Queue() - self.id_dict = {} - self.lock = threading.Lock() - self.all_connection_list = [] - self.wheel_queue = queue.Queue() - - self.netlist = [] # store nagetive connection - self.netlist_pass = [] - self.conflist = [] # store config connection - self.conflist_pass = [] - self.mhtlist = [] # store exchanged connection - self.mhtlist_pass = [] - self.proxydict = {} - - self.alllist = [self.netlist, self.netlist_pass, self.conflist, self.conflist_pass, \ - self.mhtlist, self.mhtlist_pass] - - self.start_wheel_thread = threading.Thread(target=self.start_wheel, args=(), daemon=True) - self.start_wheel_thread.start() - - self.start_accpet_connection_thread = threading.Thread(target=self.start_accpet_connection, args=(), daemon=True) - self.start_accpet_connection_thread.start() - - self.start_sending_dp_thread = threading.Thread(target=self.start_sending_dp, args=(), daemon=True) - self.start_sending_dp_thread.start() - - self.start_positive_connecting_thread = threading.Thread(target=self.start_positive_connecting, args=(), daemon=True) - self.start_positive_connecting_thread.start() - - self.start_mht_thread = threading.Thread(target=self.start_mht, args=(), daemon=True) - self.start_mht_thread.start() - - - def start_mht(self): - while True: - dp = Datapack(head={'from': __name__}) - dp.head['to'] = '*' - dp.app = 'net' - dp.method = 'get' - dp.body = b'mht' - - #print('Send mht request', dp) - - send_queue.put(dp) - - time.sleep(10) - - - def start_positive_connecting(self): - self.read_addrlist() - - while True: - for addr in self.conflist: - self.try_to_connect(addr, conntype='normal') - - time.sleep(3.9) - - for addr in self.mhtlist: - self.try_to_connect(addr, conntype='mht') - - time.sleep(3.9) - - def try_to_connect(self, addr, conntype='normal'): - thread = threading.Thread(target=self._try_to_connect, args=(addr, conntype)) - thread.start() - - - def _try_to_connect(self, addr, conntype='normal'): - conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - conn.connect(addr) - except Exception as e: - #print('Connect to %s failed, %s: %s' % (str(addr), type(e), str(e))) - del(e) - return - - connection = Connection(conn, addr, self, positive=True, conntype=conntype) - connection.i_did_something() - - - def read_addrlist(self): - if not os.path.exists('addrlist.txt'): - print('addrlist.txt not exists, config that base on addrlist_sample.txt') - else: - with open('addrlist.txt', 'r') as f: - raw_data = f.read() - raw_data = raw_data.replace('\r', '') - lines = raw_data.split('\n') - while '' in lines: - lines.remove('') - for line in lines: - ip, port = line.split(':') - ip = socket.gethostbyname(ip) - port = int(port) - - self.conflist.append((ip, port)) - - - if jsondata.try_to_read_jsondata('proxy', False): - self.proxydict[ID] = jsondata.raw_jsondata['proxy'] - - - def i_did_something(self): # go f**k your yeallow line - pass - - - def process_command(self, dp): - if dp.body == b'status': - result = '' - result += 'Online %s' % str(self.id_dict) + '\n' - result += 'proxydict %s' % str(self.proxydict) + '\n' - result += 'conflist %s' % str(self.conflist) + '\n' - result += 'conflist_pass %s' % str(self.conflist_pass) + '\n' - result += 'netlist %s' % str(self.netlist) + '\n' - result += 'netlist_pass %s' % str(self.netlist_pass) + '\n' - result += 'mhtlist %s' % str(self.mhtlist) + '\n' - result += 'mhtlist_pass %s' % str(self.mhtlist_pass) - - ndp = dp.reply() - ndp.body = result.encode() - send_queue.put(ndp) - - elif dp.body == b'mht' and dp.method == 'get': - ndp = dp.reply() - - data_dict = {} - connection_list = [] - with self.lock: - for id in self.id_dict: - connections = self.id_dict[id] - for connection in connections: - ip, port = connection.conn.getpeername() - port = int(connection.listen_port) - connection_list.append((ip, port)) - for addr in self.conflist: - if not addr in connection_list: - connection_list.append(addr) - for addr in self.conflist_pass: - if not addr in connection_list: - connection_list.append(addr) - data_dict['mht'] = connection_list - data_dict['proxy'] = self.proxydict - - ndp.body = json.dumps(data_dict).encode() - - send_queue.put(ndp) - - elif dp.method == 'reply': - mhtstr = dp.body.decode() - data_dict = json.loads(mhtstr) - mhtlist = data_dict['mht'] - with self.lock: - for addr in mhtlist: - addr = (addr[0], addr[1]) - if not self.check_in_list(addr): - self.mhtlist.append(addr) - - self.proxydict.update(data_dict['proxy']) - - else: - print('Received unknown command', dp) - - - def check_in_list(self, addr): - for l in self.alllist: - if addr in l: - return True - return False - - - def start_sending_dp(self): - while True: - dp = receive_queue.get() - - if dp.app == 'net' and not dp.head.get('to'): - self.process_command(dp) - continue - - if not dp.head.get('to'): - print('You got a no head datapack') - print(str(dp.head)) - continue - - to_str = dp.head['to'] - to_list = to_str.split('&') - to = to_list.pop(0) - to_str = '&'.join(to_list) - dp.head['to'] = to_str - - if to == '*': - with self.lock: - for id in self.id_dict: - connection = self.id_dict[id][0] - connection.sendall(dp) - elif not to: - print('not to', dp) - - elif ONLYPROXY and not to == MYPROXY: - if dp.head['to']: - dp.head['to'] = to + dp.head['to'] - else: - dp.head['to'] = to - self.send_to_id(MYPROXY, dp) - - else: - self.send_to_id(to, dp) - - - def send_to_id(self, to, dp): # send to 1 id, process proxy at the same time - - connections = self.id_dict.get(to) - if not connections: - if to == ID: - print('To id %s is yourself!' % to, dp) # maybe proxy to yourself - return - if to in self.proxydict: # neat warning dangerous code - if not ID == self.proxydict[to]: # check whether proxy is yourself - if dp.head.get('to'): - dp.head['to'] = self.proxydict[to] + '&' + to + '&' + dp.head['to'] - else: - dp.head['to'] = self.proxydict[to] + '&' + to - else: - if dp.head.get('to'): - dp.head['to'] = to + '&' + dp.head['to'] - else: - dp.head['to'] = to - self.wheel_queue.put(dp) - return - - print('To id %s has no connection now %d...' % (to, dp.failed_times), dp) - if dp.head.get('to'): - dp.head['to'] = to + '&' + dp.head['to'] - else: - dp.head['to'] = to - self.wheel_queue.put(dp) - return - - connection = connections[0] - connection.sendall(dp) - - - def start_wheel(self): - while True: - dp = self.wheel_queue.get() - dp.failed_times += 1 - if dp.failed_times > 39: - print('Datapack abandom', dp) - continue - time.sleep(RETRYSLEEP) - receive_queue.put(dp) - - - def start_accpet_connection(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - listen_ip = jsondata.try_to_read_jsondata('listen_ip', '127.0.0.1') - listen_port = jsondata.try_to_read_jsondata('listen_port', 3900) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((listen_ip, listen_port)) - - listen_num = jsondata.try_to_read_jsondata('listen_num', 39) - s.listen(listen_num) - - print('Sucessfully listen at %s:%s, max connection:%s' % (listen_ip, listen_port, listen_num)) - - while True: - conn, addr = s.accept() - connection = Connection(conn, addr, self) - connection.i_did_something() - - - def set_connection(self, connection): - id = connection.id - with self.lock: - if not self.id_dict.get(id): - self.id_dict[id] = [] - self.id_dict[id].append(connection) - self.all_connection_list.append(connection) - - xxxlist, xxxlist_pass = self.getlist(connection.conntype) - addr = (connection.addr[0], connection.listen_port) - if addr in xxxlist: - xxxlist.remove(addr) - if not addr in xxxlist_pass: - xxxlist_pass.append(addr) - - print('<%s> %s connected' % (connection.flag, id)) - - - def del_connection(self, connection): - id = connection.id - with self.lock: - self.id_dict[id].remove(connection) - self.all_connection_list.remove(connection) - if id in self.id_dict and not self.id_dict.get(id): # del the empty user - del(self.id_dict[id]) - - if connection.listen_port: # avoid "None" addr port - xxxlist, xxxlist_pass = self.getlist(connection.conntype) - addr = (connection.addr[0], connection.listen_port) - if not addr in xxxlist: - xxxlist.append(addr) - if addr in xxxlist_pass: - xxxlist_pass.remove(addr) - - print('<%s> %s disconnected' % (connection.flag, id)) - - - def getlist(self, conntype): - if conntype == 'net': - return self.netlist, self.netlist_pass - elif conntype == 'conf': - return self.conflist, self.conflist_pass - elif conntype == 'mht': - return self.mhtlist, self.mhtlist_pass - else: - print('Could not find conntype %s' % conntype) - return None, None - - -class Connection: - def __init__(self, conn, addr, netowrk_controller, positive=False, conntype='normal'): - self.conn = conn - self.addr = addr - self.netowrk_controller = netowrk_controller - self.id = None - self.flag = None - self.f = None - self.buff = b'' - self.padding_queue = queue.Queue() - self.thread_send = None - self.positive = positive - self.listen_port = addr[1] - - self.conntype = conntype - if self.conntype == 'normal': - if self.positive == True: - self.conntype = 'conf' - else: - self.conntype = 'net' - # type list - # normal(positive=True:conf, positive=False:net), mht, proxy - - self.thread_recv = threading.Thread(target=self._init, args=(), daemon=True) - self.thread_recv.start() - - - def _init(self): # init to check connection id, threading - err_code, self.flag = self.check_id() - if err_code: - #print('<%s> Init connection failed, connection closed, code: %s' % (flag, err_code)) - self.conn.close() - return - - self.netowrk_controller.set_connection(self) - - self.thread_send = threading.Thread(target=self.send_func, args=(), daemon=True) - self.thread_send.start() - - self.receive() - - - def receive(self): - still_need = 0 - - while True: - try: - data = self.conn.recv(BUFFSIZE) - except ConnectionResetError: - break - except Exception as e: - print('Connection recv error %s: %s' % (type(e), str(e))) - break - if not data: - break - self.buff += data - - if not still_need: - dp = Datapack() - dp.encode_data = self.buff - try: - self.buff = dp.decode(only_head=True) - - if dp.method == 'file': - create_floder(dp.head['filename']) - create_floder('tmp/' + dp.head['filename']) - self.f = open('tmp/' + dp.head['filename'], 'ab') - if dp.method == 'file' and os.path.exists(dp.head['filename']): - os.remove(dp.head['filename']) - - except Exception as e: - print('Decode head failed %s: %s' % (type(e), str(e))) - print(self.buff) - break - - length = int(dp.head.get('length')) - still_need = length - - if still_need > len(self.buff): - # writing tmp data - if dp.method == 'file': - still_need -= self.f.write(self.buff) - else: - dp.body += self.buff - still_need -= len(self.buff) - self.buff = b'' # empty buff because all tmp data has been write - - else: # download complete setuation - if dp.method == 'file': - self.f.write(self.buff[:still_need]) - self.f.close() - self.f = None - else: - dp.body = self.buff[:still_need] - self.buff = self.buff[still_need:] - still_need = 0 - - # bleow code are using to process datapack - if dp.method == 'file': - os.rename('tmp/' + dp.head['filename'], dp.head['filename']) - print('Received file %s from %s' % (dp.head['filename'], self.id), dp) - send_queue.put(dp) - - - # below code are using to closed connection - if self.f: - self.f.close() - self.f = None - self.conn.close() - self.netowrk_controller.del_connection(self) - - - def check_id(self): - ''' - check id package must like - ------------------------------- - post handshake msw/0.1 - id: [yourID] - listen_port: [3900] - length: 0 - - ------------------------------- - error code list: - 1: not get "id" in head - 2: receive data failed - 3: appname is not handshake - 4: id is yourself - ''' - data = None - if self.positive: - self.send_id() - try: - data = self.conn.recv(BUFFSIZE) - except ConnectionResetError: - print('One connection failed before ID check') - - if not data: - return 2, '' - - self.buff += data - dp = Datapack() - dp.encode_data = self.buff # maybe here needs to use copy.copy(self.buff) - self.buff = dp.decode(only_head=True) - if not dp.head.get('id'): - return 1, dp.head.get('flag') - - if not dp.app == 'handshake': - return 3, dp.head.get('flag') - - self.id = dp.head['id'] - self.listen_port = int(dp.head.get('listen_port')) - - if self.id == ID: - #print('you connect to your self') - return 4, dp.head.get('flag') - - if ONLYPROXY and not self.id == MYPROXY: # refuce not proxy connection - return 5, dp.head.get('flag') - - if dp.head.get('onlyuseproxy'): - if not dp.head['onlyuseproxy'] == ID: - return 6, dp.head.get('flag') - - if not self.positive: - self.send_id() - - return 0, dp.head.get('flag') - - - def send_id(self): - dp = Datapack(head={'from': __name__}) - dp.app = 'handshake' - if ONLYPROXY: - dp.head['onlyuseproxy'] = MYPROXY - dp.head['listen_port'] = str(jsondata.try_to_read_jsondata('listen_port', 3900)) - dp.encode() - self.conn.sendall(dp.encode_data) - - - def sendall(self, dp): - self.padding_queue.put(dp) - - - def send_func(self): - while True: - dp = self.padding_queue.get() - dp.encode() - self.conn.sendall(dp.encode_data) - if dp.method == 'file': - with open(dp.head['filename'], 'rb') as f: - for data in f: - try: - self.conn.sendall(data) - except Exception as e: - print('Failed to send file %s %s: %s' % (dp.head['filename'], type(e), str(e)), dp) - if dp.head.get('to'): - dp.head['to'] = self.id + '&' + dp.head['to'] - else: - dp.head['to'] = self.id - self.netowrk_controller.wheel_queue.put(dp) - break - if dp.delete: - os.remove(dp.head['filename']) - print('Send file %s to %s finished' % (dp.head['filename'], self.id), dp) - - - def i_did_something(self): - pass - - -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() diff --git a/plugins/shell.py b/plugins/shell.py deleted file mode 100644 index 531c10e..0000000 --- a/plugins/shell.py +++ /dev/null @@ -1,35 +0,0 @@ -import threading -import copy -import os -import subprocess -from mswp import Datapack -from forwarder import receive_queues, send_queue -from config import msw_queue -from config import dprint as print -receive_queue = receive_queues[__name__] - - -def main(): - while True: - dp = receive_queue.get() - command = dp.body.decode() - try: - result = subprocess.check_output(command, shell=True) - except Exception as e: - result = 'Command %s error, %s: %s' % (command, type(e), str(e)) - result = result.encode() - - ndp = dp.reply() - ndp.body = try_decode_and_encode(result) - send_queue.put(ndp) - - -def try_decode_and_encode(data): - try: - return data.decode('gb2312').encode() - except: - return data.decode('utf-8').encode() - -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() - diff --git a/plugins/update.py b/plugins/update.py deleted file mode 100644 index f08d201..0000000 --- a/plugins/update.py +++ /dev/null @@ -1,83 +0,0 @@ -import threading -import tarfile -import os -from mswp import Datapack -from forwarder import receive_queues, send_queue -from config import msw_queue -receive_queue = receive_queues[__name__] - - -remove_file_list = ['__init__.py', 'addrlist.txt', 'config.json', 'logger.log', 'update.tar.xz'] -remove_dir_list = ['.git', '.idea', '__pycache__', 'resources', 'tmp', 'res'] - - -def main(): - while True: - dp = receive_queue.get() - - if dp.method == 'post': - if dp.body == b'compress': - # compressing file - print('Starting update') - compress = Compresser() - compress.start_compress() - print('Compress finished') - - # getting to destination - to = dp.head.get('update_to') - if not to: - print('unable to locate update_to') - continue - - # sending file - ndp = Datapack(head={'from':__name__}) - ndp.method = 'file' - ndp.app = 'update' - ndp.head['filename'] = 'res/update.tar.xz' - ndp.head['to'] = to - - send_queue.put(ndp) - - - elif dp.method == 'file': - print('Starting update local file') - with tarfile.open(dp.head['filename'], 'r:xz') as f: - f.extractall() - #os.remove(dp.head['filename']) - - # restart msw program - msw_queue.put(0) - - - -class Compresser: - def __init__(self): - self.filelist = [] - - def start_compress(self): - self.filelist = self.get_filelist() - self.compress_files(self.filelist) - - def compress_files(self, filelist): - with tarfile.open('res/update.tar.xz', 'w:xz') as f: - for name in filelist: - f.add(name) - - def get_filelist(self): - filelist = [] - for root, dirs, files in os.walk('.'): - for name in remove_file_list: - if name in files: - files.remove(name) - for name in remove_dir_list: - if name in dirs: - dirs.remove(name) - for name in files: - filelist.append(os.path.join(root, name)) - for name in dirs: - pass - return filelist - - -thread = threading.Thread(target=main, args=(), daemon=True) -thread.start() diff --git a/resources/testfile.txt b/resources/testfile.txt deleted file mode 100644 index e99f498..0000000 --- a/resources/testfile.txt +++ /dev/null @@ -1 +0,0 @@ -I am here~!!! \ No newline at end of file diff --git a/test.py b/test.py deleted file mode 100644 index d35b671..0000000 --- a/test.py +++ /dev/null @@ -1,80 +0,0 @@ -import time -import threading -import socket -import queue -import sys -send_queue = queue.Queue() - -def recv(): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(('127.0.0.1', 3900)) - s.listen(39) - while True: - conn, addr = s.accept() - thread = threading.Thread(target=process_connection, args=(conn, addr), daemon=True) - thread.start() - -def process_connection(conn, addr): - while True: - data = conn.recv(4096) - if not data: - conn.close() - return - check_data.queue.put(data) - time.sleep(1) - -class Check_data: - def __init__(self): - self.queue = queue.Queue() - self.thread = threading.Thread(target=self.recv, args=(), daemon=True) - self.thread.start() - - def recv(self): - while True: - data = self.queue.get() - s_print(data) - -def send(size, c): - data = c*size - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('127.0.0.1', 3900)) - s_print('start sending %s' % c) - - start_time = time.time() - s.sendall(data) - end_time = time.time() - - s_print('Send %s finished, take %s' % (c, end_time - start_time)) - -def print_queue(): - while True: - word = send_queue.get() - print(word) - -def s_print(data): - send_queue.put(data) - -check_data = Check_data() - -time.sleep(1) - -thread_print = threading.Thread(target=print_queue, args=(), daemon=True) -thread_print.start() - - -thread_recv = threading.Thread(target=recv, args=(), daemon=True) -thread_recv.start() -print('recv thread started') -time.sleep(1) - -thread_send_1 = threading.Thread(target=send, args=(100000000, b'1'), daemon=True) -thread_send_2 = threading.Thread(target=send, args=(100000000, b'2'), daemon=True) - -thread_send_1.start() -thread_send_2.start() - -input() -sys.exit() - -# 结论,多线程同时对一个socket.sendall()调用,会导致数据混乱 diff --git a/test_file.py b/test_file.py deleted file mode 100644 index 60dbf87..0000000 --- a/test_file.py +++ /dev/null @@ -1,44 +0,0 @@ -import socket -import time - -data = '''post id msw/0.1 -id: miku -length: 0 -from: test_software -flag: 1a2b3c4d - -file log msw/1.0 -from: test -flag: abcdefgh -filename: download.txt -num: 1/1 -length: 5 - -123''' - -data2 = '''4''' - -data3 = '''5''' - -data4 = '''post log msw/1.1 -from: network -flag: 12345678 -num: 1/1 -length: 3 - -abc''' - -data_list = [data,data2,data3] -code_list = [] -for i in data_list: - code_list.append(i.encode()) - -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -s.connect(('127.0.0.1', 3900)) - -n=0 -for i in code_list: - n+=1 - s.sendall(i) - print('发送%s' % n) - time.sleep(1) \ No newline at end of file diff --git a/test_tool.py b/test_tool.py deleted file mode 100644 index f4c3c13..0000000 --- a/test_tool.py +++ /dev/null @@ -1,21 +0,0 @@ -import socket -import threading - -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - -id = b'''post handshake msw/1.0 -id: miku2 -length: 0 -flag: ee20aeff - -''' - -s.connect(('127.0.0.1',3900)) -s.sendall(id) - -print(s.recv(4096).decode(), end='') -print(s.recv(4096).decode()) - -input('finished...') - - diff --git a/tmp/placeholder b/tmp/placeholder deleted file mode 100644 index e69de29..0000000