2020-05-30 18:21:24 +08:00
2020-04-08 22:37:54 +08:00
2020-04-01 11:43:06 +08:00
2020-03-29 11:10:09 +08:00
2020-04-03 21:00:57 +08:00
2020-04-01 16:20:56 +08:00
2020-04-01 11:43:06 +08:00
2020-03-29 13:52:36 +08:00
2020-04-07 20:07:44 +08:00
2020-05-30 18:21:24 +08:00
2020-03-28 11:58:45 +08:00
2020-03-28 16:11:50 +08:00
2020-03-28 11:58:45 +08:00

MSW

简单网络框架
README暂时写中文英文水平差容易产生歧义orz...

介绍

  • 程序会自动载入plugins文件夹内所有python脚本换言之所有自定义插件都可以放到plugins目录中
  • 程序会为每一个plugin提供一个独立的接收队列recive_queue,插件可以通过调用recive_queue.get()获取数据
  • 程序为所有plugin提供一个共用的发送队列send_queue,插件可以通过调用send_queue.put()来使用框架发送数据
  • “数据”,是指mswp.py中定义的Datapack类型,这种结构储存了数据包发送目的地、来源、参数等信息
  • forwarder.py会读取send_queue,判断“数据”中的app这一项属性将其放到对应plugin的接收队列中起到路由的作用
    总的来说
    plugin完成某项任务后创建Datapack对象并设置好该数据包的发送目的地等参数,将其放入到send_queue对应plugin就可以收到该数据包

部署

部署很简单,把文件下载下来,写好配置直接运行

安装

运行命令克隆仓库

git clone https://github.com/heimoshuiyu/msw

配置config.json

配置config.json中的id,监听地址、端口等信息
单台机器可以运行多个msw只要配置不同的id和监听端口接口

配置地址列表

编辑创建address.txt,格式请参考address_example.txt

运行

python main.py

或者

python3 main.py

注意msw仅支持python3

数据包Datapack

msw中队列传递的都是一种叫Datapack的结构其结构类似http的请求包mswp.py定义)
示例

post shell msw/0.1
from: test
id: hmsy
to: miku
flag: 1a2b3c4d

shutdown now
  • 解释:向名为miku的主机的shell插件发送shutdown now的命令,执行结果返回到hmsy主机的test插件
  • Datapack.method有三种可能的值:postreplyfile
    • post是最普通的,一般数据包都使用该标识
    • reply标记该数据包是一个回复
    • file标记该数据包是一个文件,文件由filename参数指定,数据包中不会包含文件的内容
  • Datapack.app表示数据包发送目的地的plugin名字shell,即plugins/shell.py该程序会收到此数据包
    • 若是找不到对应app则产生一个警告
  • Datapack.version表示当前程序版本,暂时还没什么用
  • Datapack.head一个字典,储存参数
    • to若存在,参数表示这是一个需要网络发送的数据包,数据包会被发送到to指定的主机
    • id表示发送人的id由程序自动配置
    • from表示发送的插件名,reply方法会用到这个参数,建议设置为__name__
    • flag数据包识别码,由程序随机自动生成,调试用,暂无特殊用途
    • filename,仅在Datapack.methodfile时生效,
  • Datapack.body数据包主内容

已实现功能

  • main.py
    • 这是一个守护程序,会调用os.system()来运行msw.py主程序主程序若正常退出返回值0则重启主程序
  • forwarder.py
    • send_queuerecive_queues中实现转发功能
    • 根据Datapack.app选择队列进行转发
    • 如果参数中包含to这一项,将转发给plugins/net.py由net插件进行网络转发
  • plugins/update.py
    • 依赖plugins/net.py,可以快读更新其他主机的代码
    • 该插件可以将程序目录下的文件过滤并打包,然后发送给其他主机
    • 其他主机收到文件会解压并替换程序原有文件然后向msw发送0值尝试重启
  • plugins/input.py
    • 循环调用input(),允许用户输入命令
    • 命令格式插件名:数据包内容;键:值,键2:值2
    • 例如update:compress;update_to:*表示向plugins/update发送内容为compress,参数update_to=*的数据包。compressupdate_to字段都是plugins/update.py中定义的compress表示压缩,update表示解压缩update_to表示将升级包发送到这个目录,设置为*表示所有主机,*的功能是在plugins/net.py中定义的
  • plugins/net.py
    • 网络插件,读取address.txt中的地址并连接和其他主机交换地址列表建立类似dht的网络
    • 如果Datapack中含有参数toforward.py会无条件将数据包转发到此插件进行网络发送,发送完成后会去掉to参数
    • 插件收到其他主机发来的数据包,解码后会放进send_queue队列中,如果此时还存在to参数,会进行多次转发,实现代理功能
    • 反向代理:插件根据to参数确定主机地址,to参数内容为主机id若在连接池中有该id的连接则直接发送否则参考proxy字典决定是否发送给代理否则放入发送失败队列等待重发
    • 发送文件:Datapack.methodfile时启动,根据filename参数中制定的文件进行发送
    • 接受文件:文件下载完成后才会将数据包放入send_queue队列中
    • 心跳&mht定时交换自身拥有的地址列表、代理列表
    • 重试失败次数超过39次数据包将被丢弃
  • plugins/ffmpeg.py
    • 分布式转码
    • 调用系统ffmpeg对视频进行切片分发转码收集合并
    • 对分发主机启动server模式并指定文件名对转码主机启动worker模式并指定分发主机的id即可开始转码
  • plugins/logger.py
    • 日志记录,会将接收到的数据包写入log.txt

编写第一个插件

建议参考plugins/logger.py,这个简单的日志记录器实现了数据包的接受和发送

示例代码

import threading
from forwarder import recive_queues, send_queue
from mswp import Datapack
# 获取自身对应的接受队列
recive_queue = recive_queues[__name__]

def main():
    while True:
        dp = recive_queue.get() # 阻塞获取数据包
        if dp.method = 'post': # 打印出数据包的head和body
            print('You recive head is', str(dp.head))
            print('You recive body is', dp.body.decode())
        if dp.method = 'post': # 回复数据包示例
            ndp = dp.reply()
            ndp.body = 'recived'.encode()
            send_queue.put(ndp) # 发送数据包
        else: # 新建数据包示例
            dp = Datapack(head={'from'=__name__})
            dp.head['to'] = 'hmsy' # 设置目标主机名
            send_queue.put(dp)
            # 发送文件示例
            dp = Datapack(head={'from'=__name__})
            dp.method = 'file' # 标记该数据包为文件类型
            dp.head['to'] = 'hsmy'
            dp.head['filename'] = 'res/file.txt'  # 设置数据包携带的文件
            send_queue.put(dp)
            

# 必须以多线程方式启动主函数必须设置daemon让主线程退出后子线程也能退出
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()

原则上不建议直接import其他plugin建议通过发送Datapack来与其他插件交互

Description
No description provided
Readme 6.8 MiB
Languages
Python 59.5%
Go 36.8%
Shell 2.5%
Makefile 1.2%