delete all

This commit is contained in:
2024-04-28 15:13:22 +08:00
parent 5f2b5ecbd5
commit 3afa0d81bb
19 changed files with 0 additions and 1718 deletions

130
README.md
View File

@@ -1,130 +0,0 @@
# MSW
简单网络框架
README暂时写中文英文水平差容易产生歧义orz...
# 介绍
- 程序会自动载入`plugins`文件夹内所有python脚本换言之所有自定义插件都可以放到plugins目录中
- 程序会为每一个plugin提供一个独立的接收队列`receive_queue`,插件可以通过调用`receive_queue.get()`获取数据
- 程序为所有plugin提供一个共用的发送队列`send_queue`,插件可以通过调用`send_queue.put()`来使用框架发送数据
- “数据”,是指`mswp.py`中定义的`Datapack`类型,这种结构储存了数据包发送目的地、来源、参数等信息
- `forwarder.py`会读取`send_queue`,判断“数据”中的`app`这一项属性将其放到对应plugin的接收队列中起到路由的作用
总的来说
plugin完成某项任务后创建`Datapack`对象并设置好该数据包的发送目的地等参数,将其放入到`send_queue`对应plugin就可以收到该数据包
# 部署
部署很简单,把文件下载下来,写好配置直接运行
## 安装
运行命令克隆仓库
> `git clone https://github.com/heimoshuiyu/msw`
## 配置config.json
配置`config.json`中的`id`,监听地址、端口等信息
单台机器可以运行多个msw只要配置不同的id和监听端口接口
## 配置地址列表
编辑创建`address.txt`,格式请参考`address_example.txt`
## 运行
> `python main.py`
或者
> `python3 main.py`
注意msw仅支持python3
# 数据包Datapack
msw中队列传递的都是一种叫Datapack的结构其结构类似http的请求包`mswp.py`定义)
示例
````
post shell msw/0.1
from: test
id: hmsy
to: miku
flag: 1a2b3c4d
shutdown now
````
- 解释:向名为`miku`的主机的`shell`插件发送`shutdown now`的命令,执行结果返回到`hmsy`主机的`test`插件
- `Datapack.method`有三种可能的值:`post`、`reply`、`file`
- `post`是最普通的,一般数据包都使用该标识
- `reply`标记该数据包是一个回复
- `file`标记该数据包是一个文件,文件由`filename`参数指定,数据包中不会包含文件的内容
- `Datapack.app`表示数据包发送目的地的plugin名字如`shell`,即`plugins/shell.py`该程序会收到此数据包
- 若是找不到对应app则产生一个警告
- `Datapack.version`表示当前程序版本,暂时还没什么用
- `Datapack.head`一个字典,储存参数
- `to`若存在,参数表示这是一个需要网络发送的数据包,数据包会被发送到`to`指定的主机
- `id`表示发送人的id由程序自动配置
- `from`表示发送的插件名,`reply`方法会用到这个参数,建议设置为`__name__`
- `flag`数据包识别码,由程序随机自动生成,调试用,暂无特殊用途
- `filename`,仅在`Datapack.method`为`file`时生效,
- `Datapack.body`数据包主内容
# 已实现功能
- `main.py`
- 这是一个守护程序,会调用`os.system()`来运行`msw.py`主程序主程序若正常退出返回值0则重启主程序
- `forwarder.py`
- 在`send_queue`和`receive_queues`中实现转发功能
- 根据`Datapack.app`选择队列进行转发
- 如果参数中包含`to`这一项,将转发给`plugins/net.py`由net插件进行网络转发
- `plugins/update.py`
- 依赖`plugins/net.py`,可以快读更新其他主机的代码
- 该插件可以将程序目录下的文件过滤并打包,然后发送给其他主机
- 其他主机收到文件会解压并替换程序原有文件然后向msw发送0值尝试重启
- `plugins/input.py`
- 循环调用`input()`,允许用户输入命令
- 命令格式`插件名:数据包内容;键:值,键2:值2`
- 例如`update:compress;update_to:*`表示向`plugins/update`发送内容为`compress`,参数`update_to=*`的数据包。`compress`和`update_to`字段都是plugins/update.py中定义的`compress`表示压缩,update表示解压缩`update_to`表示将升级包发送到这个目录,设置为*表示所有主机,*的功能是在`plugins/net.py`中定义的
- `plugins/net.py`
- 网络插件,读取`address.txt`中的地址并连接和其他主机交换地址列表建立类似dht的网络
- 如果`Datapack`中含有参数`to``forwarder.py`会无条件将数据包转发到此插件进行网络发送,发送完成后会去掉`to`参数
- 插件收到其他主机发来的数据包,解码后会放进`send_queue`队列中,如果此时还存在`to`参数,会进行多次转发,实现代理功能
- 反向代理:插件根据`to`参数确定主机地址,`to`参数内容为主机id若在连接池中有该id的连接则直接发送否则参考proxy字典决定是否发送给代理否则放入发送失败队列等待重发
- 发送文件:`Datapack.method`为`file`时启动,根据`filename`参数中制定的文件进行发送
- 接受文件:文件下载完成后才会将数据包放入`send_queue`队列中
- 心跳&mht定时交换自身拥有的地址列表、代理列表
- 重试失败次数超过39次数据包将被丢弃
- `plugins/ffmpeg.py`
- 分布式转码
- 调用系统ffmpeg对视频进行切片分发转码收集合并
- 对分发主机启动server模式并指定文件名对转码主机启动worker模式并指定分发主机的id即可开始转码
- `plugins/logger.py`
- 日志记录,会将接收到的数据包写入`log.txt`
# 编写第一个插件
建议参考`plugins/logger.py`,这个简单的日志记录器实现了数据包的接受和发送
## 示例代码
````python
import threading
from forwarder import receive_queues, send_queue
from mswp import Datapack
# 获取自身对应的接收队列
receive_queue = receive_queues[__name__]
def main():
while True:
dp = receive_queue.get() # 阻塞获取数据包
if dp.method == 'post': # 打印出数据包的head和body
print('You receive head is', str(dp.head))
print('You receive body is', dp.body.decode())
if dp.method == 'post': # 回复数据包示例
ndp = dp.reply()
ndp.body = 'recived'.encode()
send_queue.put(ndp) # 发送数据包
else: # 新建数据包示例
dp = Datapack(head={'from': __name__})
dp.head['to'] = 'hmsy' # 设置目标主机名
send_queue.put(dp)
# 发送文件示例
dp = Datapack(head={'from': __name__})
dp.method = 'file' # 标记该数据包为文件类型
dp.head['to'] = 'hsmy'
dp.head['filename'] = 'res/file.txt' # 设置数据包携带的文件
send_queue.put(dp)
# 必须以多线程方式启动主函数必须设置daemon让主线程退出后子线程也能退出
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()
````
原则上不建议直接import其他plugin建议通过发送Datapack来与其他插件交互

View File

@@ -1 +0,0 @@
127.0.0.1:3900

View File

@@ -1,9 +0,0 @@
{
"id": "miku",
"listen_port": 3939,
"listen_ip": "0.0.0.0",
"listen_num": 39,
"buffsize": 4096,
"proxy": false,
"onlyproxy": false
}

View File

@@ -1,86 +0,0 @@
import threading
import json
import time
import queue
import os
class Print_controller:
    """Serializes console output.

    Messages from any thread are pushed onto a queue and emitted by a
    single background daemon thread, so prints never interleave.
    """
    def __init__(self):
        self.padding_queue = queue.Queue()
        self.original_print = print
        self.thread = threading.Thread(target=self.start_printing, args=(), daemon=True)
        self.thread.start()
    def start_printing(self):
        # Drain the queue forever, printing one message at a time.
        while True:
            print(self.padding_queue.get())
    def print_function(self, word, dp=None):
        """Queue *word* for printing, prefixed with *dp*'s flag when given."""
        if dp:
            word = '<%s> %s' % (dp.head.get('flag'), word)
        self.padding_queue.put(word)
class Jsondata:
    """Thin wrapper around config.json.

    The file is read and parsed once at construction time and kept in
    ``raw_jsondata``.  A daemon thread exists for periodic auto-save, but
    saving itself is not implemented yet (the loop body is a no-op).
    """
    def __init__(self, auto_save=False, auto_save_time=10):
        # NOTE(review): raises if config.json is missing or invalid JSON.
        with open('config.json', 'r') as f:
            raw_data = f.read()
        jsondata = json.loads(raw_data)
        self.raw_jsondata = jsondata
        self.auto_save = auto_save
        self.auto_save_time = auto_save_time
        self.thread = threading.Thread(target=self.run, args=(), daemon=True)
        self.thread.start()
    def try_to_read_jsondata(self, key, or_value, template=0, output=True):
        """Return config[*key*], or *or_value* (with a warning) when absent.

        *template* is currently unused; pass output=False to silence the
        missing-key warning.
        """
        if key in self.raw_jsondata.keys():
            return self.raw_jsondata[key]
        else:
            if output:
                print('Error: could not find key value in file "config.json"\n'
                'Please set the key "%s" in file "config.json"\n'
                'Or MSW will set it as %s' % (key, or_value))
            return or_value
    def get(self, key):
        # Dict-style access without the missing-key warning.
        return self.raw_jsondata.get(key)
    def set(self, key, value):
        # In-memory only; nothing is written back to disk.
        self.raw_jsondata[key] = value
    def run(self):
        # Auto-save loop placeholder: wakes up periodically but does nothing.
        while True:
            time.sleep(self.auto_save_time)
            if self.auto_save:
                pass
def create_floder(path):
    """Ensure the directory part of *path* exists.

    (The 'floder' misspelling is kept: other modules import this name.)
    """
    parent = os.path.split(path)[0]
    if not os.path.exists(parent):
        _create_floder(parent)
def _create_floder(path):
if not path:
return
pathlist = list(os.path.split(path))
pathlist.pop()
flordpath = '/'.join(pathlist)
if not os.path.exists(flordpath):
_create_floder(flordpath)
if not os.path.exists(path):
os.mkdir(path)
# Shared, import-time singletons used across the whole application.
global_config = {}  # cross-module key/value store (e.g. the plugin name list)
msw_queue = queue.Queue()  # exit-code channel: msw.py blocks on this to restart/stop
jsondata = Jsondata()  # parsed config.json (reads the file at import time)
print_controller = Print_controller()  # serialized console output
dprint = print_controller.print_function  # drop-in print replacement used by plugins

View File

@@ -1,62 +0,0 @@
import queue
import threading
import copy
from config import global_config
# One shared outbound queue for every plugin; one inbound queue per plugin,
# keyed by the fully-qualified module name ('plugins.<name>').
send_queue = queue.Queue()
receive_queues = {}
for name in global_config['plugins_realname_list']:
    name = 'plugins.' + name
    receive_queues[name] = queue.Queue()
def add_plugins_string(indata):
    """Return *indata* qualified with the 'plugins.' package prefix."""
    return 'plugins.' + indata
def send_queue_function():
    """Forwarding loop: route each datapack from send_queue to its target.

    Routing rules, in order: app == 'all' broadcasts to every plugin queue;
    an '&'-joined app list is split into one copy per target; a 'to' header
    hands the pack to the net plugin for network delivery; otherwise it is
    delivered locally to the named plugin.
    """
    global send_queue, receive_queues
    while True:
        dp = send_queue.get()
        dp.encode()
        if dp.app == 'all':
            # Broadcast to every registered plugin.
            for q in receive_queues:
                receive_queues[q].put(dp)
        elif '&' in dp.app:
            applist = dp.app.split('&')
            dp_list = []
            for i in range(len(applist)): # split dp
                # Shallow copy: head dict is shared between the copies.
                new_dp = copy.copy(dp)
                new_dp.app = applist[i]
                dp_list.append(new_dp)
            for new_dp in dp_list:
                object_app, new_dp = process_reforware(new_dp)
                receive_queues[add_plugins_string(object_app)].put(new_dp)
        elif 'to' in dp.head: # send to net if "to" available
            put('net', dp)
        else:
            object_app, dp = process_reforware(dp)
            put(object_app, dp)
def put(appname, dp):
    """Deliver *dp* to *appname*'s receive queue; warn when the plugin is unknown."""
    qualified = add_plugins_string(appname)
    target = receive_queues.get(qualified)
    if target:
        target.put(dp)
    else:
        print('KeyError, Could not find queue %s' % qualified)
def process_reforware(dp):
    """Pop the first hop off a chained '&'-separated app route.

    Returns (first_hop, dp) with dp.app rewritten to the remaining route.
    For a plain app name, returns it unchanged.
    """
    if '&' in dp.app:
        # maxsplit=1 keeps any remaining hops intact; a plain split()
        # raised "too many values to unpack" for routes like 'a&b&c'.
        first_forward, next_forward = dp.app.split('&', 1)
        dp.app = next_forward
        return first_forward, dp
    else:
        return dp.app, dp
# Start the forwarding loop as a daemon so it dies with the main thread.
thread = threading.Thread(target=send_queue_function, args=(), daemon=True)
thread.start()

10
main.py
View File

@@ -1,10 +0,0 @@
# Watchdog: keep relaunching msw.py with the current interpreter until it
# exits with a non-zero status (exit code 0 means "please restart me").
import os
import os.path
import sys
python = sys.executable
# NOTE(review): sys.executable is not quoted — breaks if the interpreter
# path contains spaces.  os.system returns the raw wait status.
while True:
    code = os.system(python + ' msw.py')
    if code:
        break

41
msw.py
View File

@@ -1,41 +0,0 @@
# Bootstrap: generate plugins/__init__.py so that "import plugins" pulls in
# every script found in the plugins/ directory, then block until a plugin
# requests shutdown/restart via msw_queue.
import sys
import os
PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(PATH)
os.chdir(PATH)  # all relative paths (config.json, plugins/, res/) assume this cwd
import threading
from mswp import Datapack
from config import jsondata, global_config, msw_queue
print('Building plugins import script...')
plugins_list = os.listdir('plugins')
plugins_should_be_remove_list = []
# Drop dunder entries such as __init__.py and __pycache__.
for name in plugins_list:
    if '__' in name:
        plugins_should_be_remove_list.append(name)
for name in plugins_should_be_remove_list:
    plugins_list.remove(name)
plugins_import_script = ''
plugins_realname_list = []
for name in plugins_list:
    if len(name) >= 3:
        # NOTE(review): assumes every remaining entry ends in '.py';
        # name[:-3] mangles any other file type — confirm directory contents.
        name = name[:-3]
        plugins_import_script += 'import plugins.%s\n' % name
        plugins_realname_list.append(name)
with open('plugins/__init__.py', 'w') as f:
    f.write(plugins_import_script)
print('%s plugins will be import' % (len(plugins_realname_list)))
print('Plugins list: %s' % str(plugins_realname_list))
# forwarder.py reads this list at import time to build the receive queues.
global_config['plugins_realname_list'] = plugins_realname_list
import plugins
print('Plugins import finished')
# restart: block until any plugin posts an exit code, then hand it to main.py
code = msw_queue.get()
sys.exit(code)

112
mswp.py
View File

@@ -1,112 +0,0 @@
import os
import random
import hashlib
import copy
from config import jsondata
'''
Available methods are:
post: used to send data, no reply needed (default)
get: used to send data, a reply is expected
reply: used to answer a "get" method
A datapack must look like:
---------------------
post log msw/1.0
id: miku [auto]
flag: 1a2b3c4d [auto]
length: 0 [auto]
from: appname
to: [if present (net id)]
filename: [if present]
[data content here
if present
may span many lines...]
---------------------
'''
# Wire/read limits pulled from config.json (with fallbacks).
BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096)
ID = jsondata.try_to_read_jsondata('id', 'unknown_id')
class Datapack:
    """In-memory representation of one MSW protocol message.

    Wire format (see module docstring): a first line
    '<method> <app> <version>', then 'key: value' header lines, a blank
    line, then the raw body bytes.
    """
    def __init__(self, method='post', app='all', version='msw/0.1', head=None, body=b'',
                 file=None, gen_flag=True, delete=False):
        self.id = ID
        if head is None:
            head = {}
        self.head = head
        self.head['id'] = self.id
        self.method = method  # 'post', 'get', 'reply' or 'file'
        self.file = file
        self.delete = delete  # net.py removes the file after sending when True
        self.failed_times = 0  # retry counter used by net.py's wheel queue
        self.app = app  # destination plugin name ('all' broadcasts)
        self.version = version
        self.body = body
        self.encode_data = b''
        if self.head.get('from'):
            self.head['from'] = process_plugins_name(self.head['from'])
        else:
            # NOTE(review): 'unkonwn_app' misspelling is part of the wire
            # data; left untouched for compatibility.
            self.head['from'] = 'unkonwn_app'
        if gen_flag:
            # Random 8-hex-digit debugging flag identifying this datapack.
            randseed = str(random.random()).encode()
            h = hashlib.sha1()
            h.update(randseed)
            self.head['flag'] = h.hexdigest()[:8]
    def encode(self):
        """Serialize this datapack into self.encode_data (bytes)."""
        if self.method == 'file':
            # File payloads travel separately; 'length' is the on-disk size.
            self.body = b''
            self.head['length'] = str(os.path.getsize(self.head['filename']))
        else:
            self.head['length'] = str(len(self.body))
        first_line = self.method.encode() + b' ' + self.app.encode() + b' ' + self.version.encode()
        heads = ''.encode()
        needed_to_del = []
        for i in self.head: # del the empty head
            if not self.head[i]:
                needed_to_del.append(i)
        for i in needed_to_del:
            del(self.head[i])
        for i in self.head:
            heads += i.encode() + b': ' + self.head[i].encode() + b'\n'
        self.encode_data = first_line + b'\n' + heads + b'\n' + self.body
    def decode(self, only_head=False):
        """Parse self.encode_data back into fields.

        With only_head=True the body is left empty and the bytes after the
        header separator are RETURNED (net.py keeps them as leftover
        buffer); otherwise the body is stored on self and None is returned.
        """
        # Headers end at the first blank line (b'\n\n').
        index = self.encode_data.index(b'\n\n')
        upper = self.encode_data[:index]
        if not only_head:
            self.body = self.encode_data[index+2:]
        else:
            self.body = b''
        upper = upper.decode()
        heads = upper.split('\n')
        first_line = heads.pop(0)
        self.method, self.app, self.version = first_line.split(' ')
        for line in heads:
            i, ii = line.split(': ')
            self.head[i] = ii
        if only_head:
            return self.encode_data[index+2:]
        else:
            return None
    def reply(self):
        """Build an answer datapack addressed back to the sender's plugin."""
        ndp = copy.deepcopy(self)
        ndp.app = ndp.head['from']
        ndp.method = 'reply'
        ndp.delete = False
        if not self.head['id'] == ID: # net package
            # Came over the network: route the reply to the origin host.
            ndp.head['to'] = self.head['id']
        ndp.head['id'] = ID
        return ndp
def process_plugins_name(name):
    """Strip the 'plugins.' package prefix from *name* when present."""
    return name.replace('plugins.', '')

View File

@@ -1,315 +0,0 @@
import threading
import copy
import os
import queue
import time
import subprocess
from mswp import Datapack
from forwarder import receive_queues, send_queue
from config import msw_queue, _create_floder
from config import dprint as print
receive_queue = receive_queues[__name__]
'''
Usage:
ffmpeg: start;filename:res/test.mp4
start using one file test.mp4
ffmpeg: autostart
auto start one file from res/ffmpeg_task
ffmpeg: start;filename:res/output.mp4,concat:false
using tem file, output should be res/output_convert
ffmpeg: stop
ffmpeg: enable;server:miku
ffmpeg: disable
'''
def main():
    # Each Ffmpeg_controller services one full job; build a fresh one per cycle.
    # NOTE(review): Ffmpeg_controller.__init__ itself calls mainloop() and
    # never returns, so the explicit mainloop() call below looks unreachable
    # — confirm before relying on the loop restarting.
    while True:
        ffmpeg_controller = Ffmpeg_controller()
        ffmpeg_controller.mainloop()
class Ffmpeg_controller:
    """Distributed transcoding: split a video into segments (server),
    farm segments out to worker hosts (client), collect and concat results.

    Commands arrive as datapacks on this plugin's receive queue; see the
    module-level usage comment for the command strings.
    """
    def __init__(self):
        self.ffmpeg_type = None
        self.status = 0            # client mode: 1 = enabled, 0 = disabled
        self.server = None         # id of the host handing out segments
        self.convert_task_queue = queue.Queue()
        self.org_filename = None
        self.object_filename = None
        self.concat = True         # whether to split/merge around the convert phase
        self.pause = False
        self.autostart = False     # path of the task file picked by 'autostart'
        self.tasklist = []
        self.convert_task_thread = threading.Thread(target=self.convert_task_func, args=())
        self.convert_task_thread.start()
        self.mainloop()
    def mainloop(self):
        """Top-level command dispatch loop (blocks forever)."""
        _create_floder('res/ffmpeg_tmp')
        _create_floder('res/ffmpeg_finished')
        _create_floder('res/ffmpeg_task')
        _create_floder('res/ffmpeg_old')
        _create_floder('res/ffmpeg_complet')
        while True:
            dp = receive_queue.get()
            if dp.method == 'post' and dp.body == b'concat':
                # Manual concat of whatever is in res/ffmpeg_finished.
                self.org_filename = dp.head['filename']
                self.object_filename = self.org_filename[:-4] + '.mkv'
                self.concat_func()
            if dp.method == 'post' and dp.body == b'autostart':
                # Pick the first video in res/ffmpeg_task and start it.
                filelist = os.listdir('res/ffmpeg_task')
                self.tasklist = []
                for file in filelist:
                    if len(file) > 3:
                        ext = file[-4:]
                        if ext in ['.mp4', '.MP4', '.mkv', '.MKV']:
                            self.tasklist.append('res/ffmpeg_task/' + file)
                dp = Datapack()
                dp.app = 'ffmpeg'
                dp.body = b'start'
                dp.head['filename'] = self.tasklist.pop(0)
                self.autostart = dp.head['filename']
                send_queue.put(dp)
            if dp.method == 'post' and dp.body == b'start': # config ffmpeg is server or client
                # 'concat:false' resumes from already-split segments.
                if dp.head.get('concat'):
                    if dp.head['concat'] == 'true':
                        self.concat = True
                    elif dp.head['concat'] == 'false':
                        self.concat = False
                    else:
                        print('unknown concat value')
                        continue
                else:
                    self.concat = True
                if self.concat:
                    self.org_filename = dp.head['filename']
                    self.object_filename = 'res/ffmpeg_complet/' + os.path.basename(self.org_filename)[:-4] + '.mkv'
                if self.concat:
                    ndp = dp.reply()
                    ndp.body = 'Spliting file %s' % dp.head['filename']
                    ndp.body = ndp.body.encode()
                    send_queue.put(ndp)
                    # Split into ~20s segments named 0.mkv, 1.mkv, ...
                    cmd = 'ffmpeg -i "' + os.path.normpath(dp.head['filename']) + '" -c copy \
-f segment -segment_time 20 -reset_timestamps 1 -y \
"res/ffmpeg_tmp/' + '%d' + '.mkv"'
                    os.system(cmd)
                self.run_as_server()
                if self.concat:
                    self.concat_func()
                print('All process finished')
            elif dp.method == 'post' and dp.body == b'enable': # clinet mode
                self.status = 1
                self.server = dp.head['server']
                self.convert_func()
            elif dp.method == 'post' and dp.body == b'status':
                result = 'ffmpeg not working'
                ndp = dp.reply()
                ndp.body = result.encode()
                send_queue.put(ndp)
            elif dp.method == 'get': # let other client disable
                # A work request while idle: tell the worker to stand down.
                ndp = dp.reply()
                ndp.method = 'post'
                ndp.body = b'disable'
                print('let %s disabled' % dp.head['id'])
                send_queue.put(ndp)
    def concat_func(self):
        """Merge the converted segments back into one output file."""
        # concat all file
        _filelist = os.listdir('res/ffmpeg_finished')
        if 'filelist.txt' in _filelist:
            _filelist.remove('filelist.txt')
        # correct order: segments were named 0.mkv, 1.mkv, ... by the splitter
        filelist = []
        for filenum in range(len(_filelist)):
            filelist.append(str(filenum) + '.mkv')
        with open('res/ffmpeg_finished/filelist.txt', 'w') as f:
            for file in filelist:
                f.write('file \'%s\'\n' % file)
        os.system('ffmpeg -f concat -i res/ffmpeg_finished/filelist.txt \
-c copy -y "' + os.path.normpath(self.object_filename) + '"')
        for file in filelist:
            os.remove('res/ffmpeg_finished/' + file)
            pass
        os.remove('res/ffmpeg_finished/filelist.txt')
        if self.autostart:
            # Archive the source task file so 'autostart' won't pick it again.
            os.rename(self.autostart, self.autostart.replace('ffmpeg_task', 'ffmpeg_old'))
            self.autostart = None
    def run_as_server(self):
        """Hand segments out to workers and collect converted files back."""
        _padding_to_convert = os.listdir('res/ffmpeg_tmp')
        padding_to_convert = []
        for file in _padding_to_convert:
            file = 'res/ffmpeg_tmp/' + file
            padding_to_convert.append(file)
        already_in_convert = []
        finished_convert = [] # outputfilename
        while True:
            dp = receive_queue.get()
            if dp.method == 'post' and dp.body == b'status':
                result = ''
                result += 'padding_to_convert ' + str(padding_to_convert) + '\n'
                result += 'already_in_convert ' + str(already_in_convert) + '\n'
                result += 'finished_convert ' + str(finished_convert) + '\n'
                result += 'convert_task_queue size ' + str(self.convert_task_queue.qsize())
                ndp = dp.reply()
                ndp.body = result.encode()
                send_queue.put(ndp)
            elif dp.method == 'post' and dp.body == b'reset':
                # Requeue everything handed out but never returned.
                padding_to_convert = already_in_convert
                already_in_convert = []
            elif dp.method == 'post' and dp.body == b'stop':
                break
            elif dp.method == 'post' and dp.body == b'pause':
                self.pause = True
            elif dp.method == 'post' and dp.body == b'continue':
                self.pause = False
            elif dp.method == 'get':
                # A worker is asking for a segment.
                if self.pause:
                    ndp = dp.reply()
                    ndp.method = 'post'
                    ndp.body = b'disable'
                    send_queue.put(ndp)
                    continue
                if padding_to_convert:
                    filename = padding_to_convert.pop(0)
                    already_in_convert.append(filename)
                    print('%s get %s to convert' % (dp.head['id'], filename), dp)
                    ndp = dp.reply()
                    ndp.method = 'file'
                    ndp.head['filename'] = filename
                    send_queue.put(ndp)
                else:
                    if not already_in_convert: # finished
                        break
                    else: # waiting for final convert
                        ndp = dp.reply()
                        ndp.method = 'post'
                        ndp.body = b'disable'
                        send_queue.put(ndp)
            elif dp.method == 'file':
                # A converted segment came back.
                old_filename = dp.head['old_filename']
                filename = dp.head['filename']
                os.remove(old_filename)
                already_in_convert.remove(old_filename)
                finished_convert.append(filename)
                total = len(padding_to_convert) + len(already_in_convert) + len(finished_convert)
                print('Processing...(%d) %d/%d %s' % \
                    (len(already_in_convert), \
                    len(finished_convert), \
                    total, \
                    str(round(len(finished_convert)/total*100, 2))))
                if not padding_to_convert and not already_in_convert: # final process
                    break
        print('Mapreduce finished')
    def convert_func(self): # run as client
        """Worker loop: receive segment files and queue them for transcoding."""
        # Prime the pipeline with two outstanding work requests.
        for _ in range(2):
            self.send_request()
        while self.status or not self.convert_task_queue.empty():
            dp = receive_queue.get()
            if dp.method == 'post' and dp.body == b'disable':
                self.status = 0
            elif dp.method == 'post' and dp.body == b'status':
                result = 'Working as client, queue size: %s' % str(self.convert_task_queue.qsize())
                ndp = dp.reply()
                ndp.body = result.encode()
                send_queue.put(ndp)
            elif dp.method == 'file':
                self.convert_task_queue.put(dp)
    def convert_task_func(self):
        """Background transcoder: convert queued segments and send them back."""
        while True:
            dp = self.convert_task_queue.get()
            filename = dp.head['filename']
            output_filename = filename[:-4] + '.mkv'
            output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished')
            os.system('ffmpeg -i "' + os.path.normpath(filename) + '" -c:a libopus -ab 64k \
-c:v libx265 -s 1280x720 -y "' + os.path.normpath(output_filename) + '"')
            os.remove(filename)
            ndp = dp.reply()
            ndp.head['filename'] = output_filename
            ndp.head['old_filename'] = filename
            ndp.method = 'file'
            ndp.delete = True  # net.py deletes the local copy after sending
            send_queue.put(ndp)
            self.send_request()
    def send_request(self):
        # Ask the server (via 'get') for one more segment, only while enabled.
        if self.status:
            dp = Datapack(head={'from': __name__})
            dp.method = 'get'
            dp.app = 'ffmpeg'
            dp.head['to'] = self.server
            send_queue.put(dp)
def get_one_from_dict(d):
    """Return an arbitrary (key, value) pair from *d*, or None when empty."""
    for key, value in d.items():
        return key, value
# Run the ffmpeg plugin in the background; daemon so shutdown is not blocked.
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()

View File

@@ -1,99 +0,0 @@
import threading
import copy
import os
from mswp import Datapack
from forwarder import receive_queues, send_queue
from config import msw_queue
from config import dprint as print
receive_queue = receive_queues[__name__]
def main():
    """Keep the interactive loop alive: log any error and restart _main()."""
    while True:
        try:
            _main()
        except Exception as exc:
            print('Error in %s, %s: %s' % (__name__, type(exc), str(exc)))
def print_reply_func():
    """Echo every datapack addressed to this plugin to the console."""
    while True:
        packet = receive_queue.get()
        packet.encode()
        print(packet.encode_data.decode())
def _main():
    """Read user commands from stdin and turn them into datapacks.

    Command format: '<app>:<body>;<key>:<value>,<key2>:<value2>'.
    Special inputs: 'restart'/'exit' signal msw.py via msw_queue; 'update',
    '1', '2' are shortcuts; 'r' repeats the last command; a '(file)' prefix
    marks the body as a filename to send as a file datapack.
    """
    last = ''
    file_flag = False
    while True:
        file_flag = False
        raw_data = input()
        if raw_data == 'restart':
            # Exit code 0 makes the main.py watchdog restart the program.
            msw_queue.put(0)
            break
        if raw_data == 'exit':
            msw_queue.put(1)
            break
        if raw_data == 'update':
            raw_data = 'update:compress;update_to:*'
        if raw_data == '1':
            raw_data = 'ffmpeg:autostart'
        if raw_data == '2':
            raw_data = 'ffmpeg:enable;to:*,server:miku'
        if raw_data == 'r':
            raw_data = last
        last = raw_data
        if raw_data[:6] == '(file)': # like "(file)log: filename.exe"
            raw_data = raw_data[6:]
            file_flag = True
        first_index, last_index = find_index(raw_data)
        app = raw_data[:first_index]
        body = raw_data[last_index:]
        ihead = {}
        if ';' in body and ':' in body:
            # Everything after the first ';' is a comma-separated header list.
            ihead_index = body.index(';')
            ihead_str = body[ihead_index+1:]
            body = body[:ihead_index]
            ihead_list = ihead_str.split(',')
            for key_value in ihead_list:
                key, value = key_value.split(':')
                ihead[key] = value
        app = app.replace(' ', '')
        dp = Datapack(head={'from': __name__})
        dp.head.update(ihead)
        dp.app = app
        if file_flag:
            dp.method = 'file'
            dp.body = b''
            dp.head['filename'] = body
        else:
            dp.body = body.encode()
        send_queue.put(dp)
        print('Command has been sent', dp)
def find_index(raw_data):
    """Locate the app/body split in a raw command string.

    Returns (first_index, last_index): the index of the first ':' and the
    index of the first non-space character after it (start of the body).

    Raises ValueError when the string contains no ':'.
    """
    first_index = raw_data.index(':')
    last_index = first_index + 1
    # Bounds guard: without it, input ending in spaces (empty body)
    # raised IndexError while skipping the padding.
    while last_index < len(raw_data) and raw_data[last_index] == ' ':
        last_index += 1
    return first_index, last_index
# Two daemon threads: one reads stdin, one prints replies addressed here.
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()
thread_print_reply_func = threading.Thread(target=print_reply_func, args=(), daemon=True)
thread_print_reply_func.start()

View File

@@ -1,26 +0,0 @@
import threading
from mswp import Datapack
from forwarder import receive_queues
from config import dprint as print
receive_queue = receive_queues[__name__]
def main():
    """Consume datapacks addressed to the logger and append them to logger.log.

    For file-type datapacks the logged text is the filename (their body is
    empty); otherwise it is the decoded body.
    """
    while True:
        dp = receive_queue.get()
        if dp.method == 'file':
            word = dp.head.get('filename')
        else:
            word = dp.body.decode()
        print('Writedown log: %s' % (word), dp)
        # Fall back to 'Unknown' when the sender set no 'from' header.
        from_app_name = dp.head.get('from') or 'Unknown'
        with open('logger.log', 'a') as f:
            # Write the same text we printed: previously this wrote
            # dp.body.decode(), which is empty for file-type datapacks.
            f.write('%s: %s\n' % (from_app_name, word))
# Run the logger loop in the background; daemon so shutdown is not blocked.
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()

View File

@@ -1,563 +0,0 @@
import threading
import socket
import copy
import queue
import json
import os
import random
import time
from mswp import Datapack
from forwarder import receive_queues, send_queue
from config import jsondata, create_floder
from config import dprint as print
receive_queue = receive_queues[__name__]  # this plugin's inbound queue
# Tunables loaded from config.json (with defaults).
BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096)
ID = jsondata.try_to_read_jsondata('id', 'Unknown_ID')
RETRYSLEEP = 3.9  # seconds between delivery retries in the wheel queue
MYPROXY = jsondata.try_to_read_jsondata('proxy', False)  # id of the proxy this node uses
ONLYPROXY = jsondata.try_to_read_jsondata('onlyproxy', False)  # if set, talk only through MYPROXY
def main():
    # Construct the controller; all real work runs in its daemon threads.
    network_controller = Network_controller()
    network_controller.i_did_something()
class Network_controller: # manage id and connection
    """Owns every live Connection, the id->connections map, the proxy table
    and the retry "wheel" queue.  Shared mutable state is guarded by
    self.lock.  Peer addresses live in three list pairs (conf/net/mht),
    each with a *_pass companion holding addresses currently connected.
    """
    def __init__(self):
        # Refuse to start with an inconsistent proxy configuration.
        if ONLYPROXY and not MYPROXY:
            print('config failed because you set onlyproxy true but proxy false')
            return
        self.send_queue = queue.Queue()
        self.id_dict = {}  # host id -> [Connection, ...]
        self.lock = threading.Lock()
        self.all_connection_list = []
        self.wheel_queue = queue.Queue()  # retry queue for undeliverable datapacks
        self.netlist = [] # store passively accepted (incoming) connections
        self.netlist_pass = []
        self.conflist = [] # store addresses from the config address file
        self.conflist_pass = []
        self.mhtlist = [] # store addresses learned via mht exchange
        self.mhtlist_pass = []
        self.proxydict = {}  # host id -> id of a proxy that can reach it
        self.alllist = [self.netlist, self.netlist_pass, self.conflist, self.conflist_pass, \
            self.mhtlist, self.mhtlist_pass]
        self.start_wheel_thread = threading.Thread(target=self.start_wheel, args=(), daemon=True)
        self.start_wheel_thread.start()
        self.start_accpet_connection_thread = threading.Thread(target=self.start_accpet_connection, args=(), daemon=True)
        self.start_accpet_connection_thread.start()
        self.start_sending_dp_thread = threading.Thread(target=self.start_sending_dp, args=(), daemon=True)
        self.start_sending_dp_thread.start()
        self.start_positive_connecting_thread = threading.Thread(target=self.start_positive_connecting, args=(), daemon=True)
        self.start_positive_connecting_thread.start()
        self.start_mht_thread = threading.Thread(target=self.start_mht, args=(), daemon=True)
        self.start_mht_thread.start()
    def start_mht(self):
        # Heartbeat: every 10s broadcast a 'get mht' so peers exchange
        # their address and proxy tables with us.
        while True:
            dp = Datapack(head={'from': __name__})
            dp.head['to'] = '*'
            dp.app = 'net'
            dp.method = 'get'
            dp.body = b'mht'
            #print('Send mht request', dp)
            send_queue.put(dp)
            time.sleep(10)
    def start_positive_connecting(self):
        # Keep dialing every known (not yet connected) address.
        self.read_addrlist()
        while True:
            for addr in self.conflist:
                self.try_to_connect(addr, conntype='normal')
            time.sleep(3.9)
            for addr in self.mhtlist:
                self.try_to_connect(addr, conntype='mht')
            time.sleep(3.9)
    def try_to_connect(self, addr, conntype='normal'):
        # Dial in a worker thread so a slow peer cannot stall the loop.
        thread = threading.Thread(target=self._try_to_connect, args=(addr, conntype))
        thread.start()
    def _try_to_connect(self, addr, conntype='normal'):
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect(addr)
        except Exception as e:
            #print('Connect to %s failed, %s: %s' % (str(addr), type(e), str(e)))
            del(e)
            return
        # The Connection registers itself after a successful handshake.
        connection = Connection(conn, addr, self, positive=True, conntype=conntype)
        connection.i_did_something()
    def read_addrlist(self):
        # NOTE(review): the README mentions 'address.txt' but the code reads
        # 'addrlist.txt' — confirm which filename is canonical.
        if not os.path.exists('addrlist.txt'):
            print('addrlist.txt not exists, config that base on addrlist_sample.txt')
        else:
            with open('addrlist.txt', 'r') as f:
                raw_data = f.read()
            raw_data = raw_data.replace('\r', '')
            lines = raw_data.split('\n')
            while '' in lines:
                lines.remove('')
            for line in lines:
                ip, port = line.split(':')
                ip = socket.gethostbyname(ip)
                port = int(port)
                self.conflist.append((ip, port))
        if jsondata.try_to_read_jsondata('proxy', False):
            # Advertise our own proxy so peers learn it via mht exchange.
            self.proxydict[ID] = jsondata.raw_jsondata['proxy']
    def i_did_something(self): # go f**k your yeallow line
        pass
    def process_command(self, dp):
        """Handle datapacks addressed to the net plugin itself."""
        if dp.body == b'status':
            result = ''
            result += 'Online %s' % str(self.id_dict) + '\n'
            result += 'proxydict %s' % str(self.proxydict) + '\n'
            result += 'conflist %s' % str(self.conflist) + '\n'
            result += 'conflist_pass %s' % str(self.conflist_pass) + '\n'
            result += 'netlist %s' % str(self.netlist) + '\n'
            result += 'netlist_pass %s' % str(self.netlist_pass) + '\n'
            result += 'mhtlist %s' % str(self.mhtlist) + '\n'
            result += 'mhtlist_pass %s' % str(self.mhtlist_pass)
            ndp = dp.reply()
            ndp.body = result.encode()
            send_queue.put(ndp)
        elif dp.body == b'mht' and dp.method == 'get':
            # Answer an mht request with our connected addresses + proxies.
            ndp = dp.reply()
            data_dict = {}
            connection_list = []
            with self.lock:
                for id in self.id_dict:
                    connections = self.id_dict[id]
                    for connection in connections:
                        ip, port = connection.conn.getpeername()
                        # Use the peer's advertised listen port, not the
                        # ephemeral source port of this socket.
                        port = int(connection.listen_port)
                        connection_list.append((ip, port))
                for addr in self.conflist:
                    if not addr in connection_list:
                        connection_list.append(addr)
                for addr in self.conflist_pass:
                    if not addr in connection_list:
                        connection_list.append(addr)
            data_dict['mht'] = connection_list
            data_dict['proxy'] = self.proxydict
            ndp.body = json.dumps(data_dict).encode()
            send_queue.put(ndp)
        elif dp.method == 'reply':
            # Merge a peer's mht answer into our own tables.
            mhtstr = dp.body.decode()
            data_dict = json.loads(mhtstr)
            mhtlist = data_dict['mht']
            with self.lock:
                for addr in mhtlist:
                    addr = (addr[0], addr[1])  # JSON turned tuples into lists
                    if not self.check_in_list(addr):
                        self.mhtlist.append(addr)
            self.proxydict.update(data_dict['proxy'])
        else:
            print('Received unknown command', dp)
    def check_in_list(self, addr):
        # True when *addr* is already known in any of the six address lists.
        for l in self.alllist:
            if addr in l:
                return True
        return False
    def start_sending_dp(self):
        """Outbound loop: route every datapack that carries a 'to' header."""
        while True:
            dp = receive_queue.get()
            if dp.app == 'net' and not dp.head.get('to'):
                # Addressed to the net plugin itself (status/mht/...).
                self.process_command(dp)
                continue
            if not dp.head.get('to'):
                print('You got a no head datapack')
                print(str(dp.head))
                continue
            # Pop the first hop off the '&'-chained route.
            to_str = dp.head['to']
            to_list = to_str.split('&')
            to = to_list.pop(0)
            to_str = '&'.join(to_list)
            dp.head['to'] = to_str
            if to == '*':
                # Broadcast to every connected host (first connection each).
                with self.lock:
                    for id in self.id_dict:
                        connection = self.id_dict[id][0]
                        connection.sendall(dp)
            elif not to:
                print('not to', dp)
            elif ONLYPROXY and not to == MYPROXY:
                # Proxy-only mode: restore the hop and route via our proxy.
                # NOTE(review): unlike send_to_id, no '&' separator is
                # inserted between `to` and the remaining route here —
                # confirm this is intentional.
                if dp.head['to']:
                    dp.head['to'] = to + dp.head['to']
                else:
                    dp.head['to'] = to
                self.send_to_id(MYPROXY, dp)
            else:
                self.send_to_id(to, dp)
    def send_to_id(self, to, dp): # send to 1 id, process proxy at the same time
        connections = self.id_dict.get(to)
        if not connections:
            if to == ID:
                print('To id %s is yourself!' % to, dp) # maybe proxy to yourself
                return
            if to in self.proxydict: # neat warning dangerous code
                if not ID == self.proxydict[to]: # check whether proxy is yourself
                    # Prepend proxy hop then the original target.
                    if dp.head.get('to'):
                        dp.head['to'] = self.proxydict[to] + '&' + to + '&' + dp.head['to']
                    else:
                        dp.head['to'] = self.proxydict[to] + '&' + to
                else:
                    if dp.head.get('to'):
                        dp.head['to'] = to + '&' + dp.head['to']
                    else:
                        dp.head['to'] = to
                self.wheel_queue.put(dp)
                return
            # No route at all: restore the hop and schedule a retry.
            print('To id %s has no connection now %d...' % (to, dp.failed_times), dp)
            if dp.head.get('to'):
                dp.head['to'] = to + '&' + dp.head['to']
            else:
                dp.head['to'] = to
            self.wheel_queue.put(dp)
            return
        connection = connections[0]
        connection.sendall(dp)
    def start_wheel(self):
        # Retry loop: sleep, then feed failed datapacks back into the
        # outbound queue; drop after 39 failed attempts.
        while True:
            dp = self.wheel_queue.get()
            dp.failed_times += 1
            if dp.failed_times > 39:
                print('Datapack abandom', dp)
                continue
            time.sleep(RETRYSLEEP)
            receive_queue.put(dp)
    def start_accpet_connection(self):
        """Listen for incoming peers and wrap each socket in a Connection."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listen_ip = jsondata.try_to_read_jsondata('listen_ip', '127.0.0.1')
        listen_port = jsondata.try_to_read_jsondata('listen_port', 3900)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((listen_ip, listen_port))
        listen_num = jsondata.try_to_read_jsondata('listen_num', 39)
        s.listen(listen_num)
        print('Sucessfully listen at %s:%s, max connection:%s' % (listen_ip, listen_port, listen_num))
        while True:
            conn, addr = s.accept()
            connection = Connection(conn, addr, self)
            connection.i_did_something()
    def set_connection(self, connection):
        """Register a handshaken connection and move its addr to the *_pass list."""
        id = connection.id
        with self.lock:
            if not self.id_dict.get(id):
                self.id_dict[id] = []
            self.id_dict[id].append(connection)
            self.all_connection_list.append(connection)
            xxxlist, xxxlist_pass = self.getlist(connection.conntype)
            addr = (connection.addr[0], connection.listen_port)
            if addr in xxxlist:
                xxxlist.remove(addr)
            if not addr in xxxlist_pass:
                xxxlist_pass.append(addr)
        print('<%s> %s connected' % (connection.flag, id))
    def del_connection(self, connection):
        """Unregister a closed connection and make its addr dialable again."""
        id = connection.id
        with self.lock:
            self.id_dict[id].remove(connection)
            self.all_connection_list.remove(connection)
            if id in self.id_dict and not self.id_dict.get(id): # del the empty user
                del(self.id_dict[id])
            if connection.listen_port: # avoid "None" addr port
                xxxlist, xxxlist_pass = self.getlist(connection.conntype)
                addr = (connection.addr[0], connection.listen_port)
                if not addr in xxxlist:
                    xxxlist.append(addr)
                if addr in xxxlist_pass:
                    xxxlist_pass.remove(addr)
        print('<%s> %s disconnected' % (connection.flag, id))
    def getlist(self, conntype):
        # Map a connection type to its (pending, connected) address lists.
        if conntype == 'net':
            return self.netlist, self.netlist_pass
        elif conntype == 'conf':
            return self.conflist, self.conflist_pass
        elif conntype == 'mht':
            return self.mhtlist, self.mhtlist_pass
        else:
            print('Could not find conntype %s' % conntype)
            return None, None
class Connection:
    def __init__(self, conn, addr, netowrk_controller, positive=False, conntype='normal'):
        # One Connection per socket.  *positive* means we dialed out;
        # *conntype* tags which address list the peer came from.
        # ('netowrk_controller' misspelling kept: it is the public parameter name.)
        self.conn = conn
        self.addr = addr
        self.netowrk_controller = netowrk_controller
        self.id = None  # peer id, learned during the handshake
        self.flag = None  # handshake datapack flag, used in log lines
        self.f = None  # open file handle while a file transfer is in progress
        self.buff = b''  # receive buffer carrying partial datapacks
        self.padding_queue = queue.Queue()  # outbound datapacks for send_func
        self.thread_send = None
        self.positive = positive
        self.listen_port = addr[1]  # replaced by the peer's advertised port on handshake
        self.conntype = conntype
        if self.conntype == 'normal':
            if self.positive == True:
                self.conntype = 'conf'
            else:
                self.conntype = 'net'
        # type list
        # normal(positive=True:conf, positive=False:net), mht, proxy
        self.thread_recv = threading.Thread(target=self._init, args=(), daemon=True)
        self.thread_recv.start()
    def _init(self): # init to check connection id, threading
        # Handshake first; on failure close quietly.  On success register
        # with the controller, start the sender thread, then block in
        # receive() until the socket dies.
        err_code, self.flag = self.check_id()
        if err_code:
            #print('<%s> Init connection failed, connection closed, code: %s' % (flag, err_code))
            self.conn.close()
            return
        self.netowrk_controller.set_connection(self)
        self.thread_send = threading.Thread(target=self.send_func, args=(), daemon=True)
        self.thread_send.start()
        self.receive()
    def receive(self):
        """Receive loop: reassemble datapacks from the byte stream.

        Header-only decode yields the declared 'length'; the body is then
        accumulated across recv() calls (streamed to a tmp file for
        file-type datapacks).  Completed datapacks go to send_queue for the
        forwarder to route.  Cleans up and unregisters on disconnect.
        """
        still_need = 0  # body bytes still expected for the current datapack
        while True:
            try:
                data = self.conn.recv(BUFFSIZE)
            except ConnectionResetError:
                break
            except Exception as e:
                print('Connection recv error %s: %s' % (type(e), str(e)))
                break
            if not data:
                break
            self.buff += data
            if not still_need:
                # Start of a new datapack: parse its header.
                dp = Datapack()
                dp.encode_data = self.buff
                try:
                    self.buff = dp.decode(only_head=True)
                    if dp.method == 'file':
                        # Stream the payload into tmp/ until complete.
                        create_floder(dp.head['filename'])
                        create_floder('tmp/' + dp.head['filename'])
                        self.f = open('tmp/' + dp.head['filename'], 'ab')
                    if dp.method == 'file' and os.path.exists(dp.head['filename']):
                        os.remove(dp.head['filename'])
                except Exception as e:
                    print('Decode head failed %s: %s' % (type(e), str(e)))
                    print(self.buff)
                    break
                length = int(dp.head.get('length'))
                still_need = length
            if still_need > len(self.buff):
                # writing tmp data
                if dp.method == 'file':
                    still_need -= self.f.write(self.buff)
                else:
                    dp.body += self.buff
                    still_need -= len(self.buff)
                self.buff = b'' # empty buff because all tmp data has been written
            else: # download complete situation
                if dp.method == 'file':
                    self.f.write(self.buff[:still_need])
                    self.f.close()
                    self.f = None
                else:
                    dp.body = self.buff[:still_need]
                # Whatever follows belongs to the next datapack.
                self.buff = self.buff[still_need:]
                still_need = 0
                # below code is used to process the completed datapack
                if dp.method == 'file':
                    os.rename('tmp/' + dp.head['filename'], dp.head['filename'])
                    print('Received file %s from %s' % (dp.head['filename'], self.id), dp)
                send_queue.put(dp)
        # below code is used to close the connection
        if self.f:
            self.f.close()
            self.f = None
        self.conn.close()
        self.netowrk_controller.del_connection(self)
def check_id(self):
    '''
    Validate the handshake packet that opens every connection.

    Expected packet::

        post handshake msw/0.1
        id: [yourID]
        listen_port: [3900]
        length: 0

    Returns a ``(err_code, flag)`` tuple; ``flag`` is the peer's packet
    flag (or '' when nothing was received).  Error codes:

        0: handshake accepted
        1: not get "id" in head
        2: receive data failed
        3: appname is not handshake
        4: id is yourself
        5: ONLYPROXY is set and the peer is not our proxy
        6: peer only accepts its own proxy, and we are not it
    '''
    data = None
    # Outgoing ("positive") connections introduce themselves first.
    if self.positive:
        self.send_id()
    try:
        data = self.conn.recv(BUFFSIZE)
    except ConnectionResetError:
        print('One connection failed before ID check')
    if not data:
        return 2, ''
    self.buff += data
    dp = Datapack()
    dp.encode_data = self.buff  # maybe here needs to use copy.copy(self.buff)
    # decode(only_head=True) returns the unconsumed remainder of the buffer.
    self.buff = dp.decode(only_head=True)
    if not dp.head.get('id'):
        return 1, dp.head.get('flag')
    if not dp.app == 'handshake':
        return 3, dp.head.get('flag')
    self.id = dp.head['id']
    self.listen_port = int(dp.head.get('listen_port'))
    if self.id == ID:
        # We connected to ourselves.
        return 4, dp.head.get('flag')
    if ONLYPROXY and not self.id == MYPROXY:  # refuse non-proxy connections
        return 5, dp.head.get('flag')
    if dp.head.get('onlyuseproxy'):
        if not dp.head['onlyuseproxy'] == ID:
            return 6, dp.head.get('flag')
    # Incoming ("passive") connections reply with our id after validation.
    if not self.positive:
        self.send_id()
    return 0, dp.head.get('flag')
def send_id(self):
    """Send our handshake Datapack identifying this node to the peer."""
    dp = Datapack(head={'from': __name__})
    dp.app = 'handshake'
    if ONLYPROXY:
        # Tell the peer we only talk through our proxy.
        dp.head['onlyuseproxy'] = MYPROXY
    port = jsondata.try_to_read_jsondata('listen_port', 3900)
    dp.head['listen_port'] = str(port)
    dp.encode()
    self.conn.sendall(dp.encode_data)
def sendall(self, dp):
    # Queue the datapack for the dedicated sender thread (send_func);
    # funneling writes through one thread avoids interleaved socket writes.
    self.padding_queue.put(dp)
def send_func(self):
    """Sender loop (runs on its own thread): serialize queued Datapacks.

    Packets arrive via padding_queue (fed by sendall()).  The encoded
    head is written first; for ``file`` packets the file content is then
    streamed from disk in fixed-size chunks.  If the file transfer
    fails, the packet is re-routed via netowrk_controller.wheel_queue —
    and, as a bug fix, the file is NOT deleted and success is NOT
    logged on that path (the original fell through to both).
    """
    while True:
        dp = self.padding_queue.get()
        dp.encode()
        self.conn.sendall(dp.encode_data)
        if dp.method != 'file':
            continue
        sent_ok = True
        with open(dp.head['filename'], 'rb') as f:
            # Fixed-size chunks: line iteration is fragile on binary data.
            while True:
                chunk = f.read(65536)
                if not chunk:
                    break
                try:
                    self.conn.sendall(chunk)
                except Exception as e:
                    print('Failed to send file %s %s: %s' % (dp.head['filename'], type(e), str(e)), dp)
                    # Prepend ourselves to the route so the wheel can retry elsewhere.
                    if dp.head.get('to'):
                        dp.head['to'] = self.id + '&' + dp.head['to']
                    else:
                        dp.head['to'] = self.id
                    self.netowrk_controller.wheel_queue.put(dp)
                    sent_ok = False
                    break
        if not sent_ok:
            continue
        if dp.delete:
            os.remove(dp.head['filename'])
        print('Send file %s to %s finished' % (dp.head['filename'], self.id), dp)
def i_did_something(self):
    # Placeholder hook; currently a no-op.
    pass
# Start this module's main loop on a daemon thread at import time
# (main is defined earlier in this file).
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()

View File

@@ -1,35 +0,0 @@
import threading
import copy
import os
import subprocess
from mswp import Datapack
from forwarder import receive_queues, send_queue
from config import msw_queue
from config import dprint as print
receive_queue = receive_queues[__name__]
def main():
    """Shell plugin loop: execute received commands, reply with output.

    Each Datapack body is decoded to a shell command line and run with
    ``shell=True`` (NOTE: by design this executes arbitrary remote
    commands — that is the plugin's purpose, but treat peers as fully
    trusted).  Output (or the error text) is normalized to UTF-8 bytes
    and sent back as a reply Datapack.

    Bug fix: ``result.encode()`` is applied only on the error path —
    ``check_output`` already returns bytes, so encoding unconditionally
    raised AttributeError on every successful command.
    """
    while True:
        dp = receive_queue.get()
        command = dp.body.decode()
        try:
            result = subprocess.check_output(command, shell=True)
        except Exception as e:
            # Report the failure text in place of the command output.
            result = ('Command %s error, %s: %s' % (command, type(e), str(e))).encode()
        ndp = dp.reply()
        ndp.body = try_decode_and_encode(result)
        send_queue.put(ndp)
def try_decode_and_encode(data):
    """Normalize command-output bytes to UTF-8 bytes.

    Chinese Windows shells commonly emit GB2312/GBK, so that decoding is
    tried first and transcoded to UTF-8.  Otherwise the data is assumed
    to already be UTF-8.  If neither decoding fits, the raw bytes are
    returned unchanged (best effort) instead of raising — the original
    bare ``except`` re-raised on invalid UTF-8.

    :param data: raw bytes captured from a subprocess.
    :return: UTF-8 encoded bytes (or the original bytes on failure).
    """
    try:
        return data.decode('gb2312').encode()
    except UnicodeDecodeError:
        pass
    try:
        return data.decode('utf-8').encode()
    except UnicodeDecodeError:
        return data
# Launch the shell plugin's command loop as a daemon thread at import time.
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()

View File

@@ -1,83 +0,0 @@
import threading
import tarfile
import os
from mswp import Datapack
from forwarder import receive_queues, send_queue
from config import msw_queue
# Per-plugin inbox routed by the forwarder.
receive_queue = receive_queues[__name__]
# Names excluded from the update archive: local state/config files...
remove_file_list = ['__init__.py', 'addrlist.txt', 'config.json', 'logger.log', 'update.tar.xz']
# ...and VCS/IDE/cache/runtime directories.
remove_dir_list = ['.git', '.idea', '__pycache__', 'resources', 'tmp', 'res']
def main():
    """Update plugin loop.

    * ``post`` packets with body ``b'compress'``: pack this node's code
      tree into res/update.tar.xz and send it as a ``file`` packet to the
      host named in the ``update_to`` header.
    * ``file`` packets: extract the received archive over the working
      directory and ask the supervisor (via msw_queue) to restart.
    """
    while True:
        dp = receive_queue.get()
        if dp.method == 'post':
            if dp.body == b'compress':
                # Compress the code tree to res/update.tar.xz.
                print('Starting update')
                compress = Compresser()
                compress.start_compress()
                print('Compress finished')
                # Resolve the destination host.
                to = dp.head.get('update_to')
                if not to:
                    print('unable to locate update_to')
                    continue
                # Send the archive as a file packet.
                ndp = Datapack(head={'from': __name__})
                ndp.method = 'file'
                ndp.app = 'update'
                ndp.head['filename'] = 'res/update.tar.xz'
                ndp.head['to'] = to
                send_queue.put(ndp)
        elif dp.method == 'file':
            print('Starting update local file')
            # NOTE(review): extractall() on a received archive is vulnerable
            # to path traversal ("tar slip") if the sender is untrusted —
            # consider the tarfile extraction filters.
            with tarfile.open(dp.head['filename'], 'r:xz') as f:
                f.extractall()
            #os.remove(dp.head['filename'])
            # Value 0 tells the supervisor to restart the msw program.
            msw_queue.put(0)
class Compresser:
    """Packs the project tree (minus excluded names) into res/update.tar.xz."""

    def __init__(self):
        # Paths included in the most recent archive (for inspection).
        self.filelist = []

    def start_compress(self):
        """Collect the file list and write the archive."""
        self.filelist = self.get_filelist()
        self.compress_files(self.filelist)

    def compress_files(self, filelist):
        """Write every path in *filelist* into res/update.tar.xz (LZMA).

        :param filelist: iterable of paths relative to the working dir.
        """
        with tarfile.open('res/update.tar.xz', 'w:xz') as f:
            for name in filelist:
                f.add(name)

    def get_filelist(self):
        """Walk '.' and return all file paths, honoring the exclusion lists.

        Directories are pruned in place so os.walk never descends into
        them.  (The original also had a dead ``for name in dirs: pass``
        loop, removed here.)
        """
        filelist = []
        for root, dirs, files in os.walk('.'):
            # In-place slice assignment is required for os.walk pruning.
            dirs[:] = [d for d in dirs if d not in remove_dir_list]
            for name in files:
                if name not in remove_file_list:
                    filelist.append(os.path.join(root, name))
        return filelist
# Launch the update plugin's loop as a daemon thread at import time.
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()

View File

@@ -1 +0,0 @@
I am here~!!!

80
test.py
View File

@@ -1,80 +0,0 @@
import time
import threading
import socket
import queue
import sys
send_queue = queue.Queue()
def recv():
    """Accept loop: spawn one handler thread per incoming connection."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('127.0.0.1', 3900))
    server.listen(39)
    while True:
        conn, addr = server.accept()
        worker = threading.Thread(
            target=process_connection, args=(conn, addr), daemon=True)
        worker.start()
def process_connection(conn, addr):
    """Read one connection until EOF, forwarding each chunk to check_data."""
    while True:
        chunk = conn.recv(4096)
        if not chunk:
            conn.close()
            return
        check_data.queue.put(chunk)
        # Throttle so chunk boundaries are observable in the output.
        time.sleep(1)
class Check_data:
    """Owns a queue drained by a background thread that prints each item."""

    def __init__(self):
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self.recv, daemon=True)
        self.thread.start()

    def recv(self):
        # Drain forever; s_print serializes output through the printer thread.
        while True:
            s_print(self.queue.get())
def send(size, c):
    """Connect to the local server and time one sendall of size copies of c."""
    payload = c * size
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 3900))
    s_print('start sending %s' % c)
    start_time = time.time()
    sock.sendall(payload)
    end_time = time.time()
    s_print('Send %s finished, take %s' % (c, end_time - start_time))
def print_queue():
    """Printer loop: the only thread allowed to call print()."""
    while True:
        print(send_queue.get())
def s_print(data):
    # Thread-safe print: hand the item to the printer thread via send_queue.
    send_queue.put(data)
# Wire up the experiment: collector, printer thread, then the server.
check_data = Check_data()
time.sleep(1)
thread_print = threading.Thread(target=print_queue, args=(), daemon=True)
thread_print.start()
thread_recv = threading.Thread(target=recv, args=(), daemon=True)
thread_recv.start()
print('recv thread started')
time.sleep(1)
# Two senders each push 100 MB concurrently over their own connections.
thread_send_1 = threading.Thread(target=send, args=(100000000, b'1'), daemon=True)
thread_send_2 = threading.Thread(target=send, args=(100000000, b'2'), daemon=True)
thread_send_1.start()
thread_send_2.start()
input()
sys.exit()
# Conclusion: multiple threads calling sendall() on the same socket
# at the same time garbles the data stream.

View File

@@ -1,44 +0,0 @@
import socket
import time
# Hand-built Datapack byte streams for exercising the server's framing:
# `data` carries an id packet followed by the head of a file packet whose
# 5-byte body is completed by the later sends of data2 and data3.
data = '''post id msw/0.1
id: miku
length: 0
from: test_software
flag: 1a2b3c4d
file log msw/1.0
from: test
flag: abcdefgh
filename: download.txt
num: 1/1
length: 5
123'''
data2 = '''4'''
data3 = '''5'''
# NOTE(review): data4 is defined but never placed in data_list, so it is
# never sent.
data4 = '''post log msw/1.1
from: network
flag: 12345678
num: 1/1
length: 3
abc'''
data_list = [data,data2,data3]
code_list = []
for i in data_list:
    code_list.append(i.encode())
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 3900))
n=0
# Send the fragments one per second so the server reassembles across recvs.
for i in code_list:
    n+=1
    s.sendall(i)
    print('发送%s' % n)  # "sending <n>"
    time.sleep(1)

View File

@@ -1,21 +0,0 @@
import socket
import threading
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# A minimal handshake packet claiming id "miku2" with a zero-length body.
# NOTE(review): the name `id` shadows the builtin; fine for a throwaway script.
id = b'''post handshake msw/1.0
id: miku2
length: 0
flag: ee20aeff
'''
s.connect(('127.0.0.1',3900))
s.sendall(id)
# Expect the peer's handshake reply; two recvs in case it arrives split.
print(s.recv(4096).decode(), end='')
print(s.recv(4096).decode())
input('finished...')

View File