From 9924a301080a72d94f815ea8f61a1ba178934eaa Mon Sep 17 00:00:00 2001 From: heimoshuiyu Date: Sat, 14 Dec 2019 11:47:50 +0800 Subject: [PATCH] first commit --- README.md | 5 +++ config.json | 4 +++ config.py | 36 +++++++++++++++++++ forwarder.py | 54 ++++++++++++++++++++++++++++ logger.log | 5 +++ msw.py | 33 +++++++++++++++++ mswp.py | 32 +++++++++++++++++ netlist_sample.txt | 1 + plugins/input.py | 39 ++++++++++++++++++++ plugins/logger.py | 19 ++++++++++ plugins/net.py | 89 ++++++++++++++++++++++++++++++++++++++++++++++ test_tool.py | 19 ++++++++++ 12 files changed, 336 insertions(+) create mode 100644 README.md create mode 100644 config.json create mode 100644 config.py create mode 100644 forwarder.py create mode 100644 logger.log create mode 100644 msw.py create mode 100644 mswp.py create mode 100644 netlist_sample.txt create mode 100644 plugins/input.py create mode 100644 plugins/logger.py create mode 100644 plugins/net.py create mode 100644 test_tool.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..b06775e --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# MSW +personal network personal framework +function as plugins +combine everything +M.S.W \ No newline at end of file diff --git a/config.json b/config.json new file mode 100644 index 0000000..724e6f8 --- /dev/null +++ b/config.json @@ -0,0 +1,4 @@ +{ + "test": "test", + "listen": 3900 +} \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..41bf163 --- /dev/null +++ b/config.py @@ -0,0 +1,36 @@ +import json +import threading +import json +import time + + + + +class Jsondata: + def __init__(self, auto_save=False, auto_save_time=10): + with open('config.json', 'r') as f: + raw_data = f.read() + jsondata = json.loads(raw_data) + self.raw_jsondata = jsondata + self.auto_save = auto_save + self.auto_save_time = auto_save_time + self.thread = threading.Thread(target=self.run, args=()) + self.thread.start() + + def get(self, key): + try: + return self.raw_jsondata[key] + except: + return False + + def set(self, key, value): + self.raw_jsondata[key] = value + + def run(self): + while True: + time.sleep(self.auto_save_time) + if self.auto_save: + pass + +global_config = {} +jsondata = Jsondata() \ No newline at end of file diff --git a/forwarder.py b/forwarder.py new file mode 100644 index 0000000..c6f32ba --- /dev/null +++ b/forwarder.py @@ -0,0 +1,54 @@ +import queue +import threading +import copy +from config import global_config + + +send_queue = queue.Queue() +receive_queues = {} + +for name in global_config['plugins_realname_list']: + name = 'plugins.' + name + receive_queues[name] = queue.Queue() + + +def add_plugins_string(indata): + outdata = 'plugins.' + indata + return outdata + + +def send_queue_function(): + global send_queue, receive_queues + while True: + dp = send_queue.get() + print('dp.app is', dp.app) + if dp.app == 'all': + for q in receive_queues: + receive_queues[q].put(dp) + elif ',' in dp.app: + applist = dp.app.split(',') + print(applist) + dp_list = [] + for i in range(len(applist)): # split dp + new_dp = copy.copy(dp) + new_dp.app = applist[i] + dp_list.append(new_dp) + for new_dp in dp_list: + object_app, new_dp = process_reforware(new_dp) + receive_queues[add_plugins_string(object_app)].put(new_dp) + else: + object_app, dp =process_reforware(dp) + receive_queues[add_plugins_string(object_app)].put(dp) + + +def process_reforware(dp): + if ':' in dp.app: + first_forward, next_forward = dp.app.split(':') + dp.app = next_forward + return first_forward, dp + else: + return dp.app, dp + + +thread = threading.Thread(target=send_queue_function, args=()) +thread.start() diff --git a/logger.log b/logger.log new file mode 100644 index 0000000..d7e22a5 --- /dev/null +++ b/logger.log @@ -0,0 +1,5 @@ +plugins.input: hello +plugins.input: asdfasf +plugins.input: alskdfaj +plugins.input: alsdkfj +plugins.input: testtest diff --git a/msw.py b/msw.py new file mode 100644 index 0000000..633ac42 --- /dev/null +++ b/msw.py @@ -0,0 +1,33 @@ +from mswp import Datapack +import threading +import os +from config import jsondata, global_config + + +print('Building plugins import script...') +plugins_list = os.listdir('plugins') +plugins_should_be_remove_list = [] +for name in plugins_list: + if '__' in name: + plugins_should_be_remove_list.append(name) +for name in plugins_should_be_remove_list: + plugins_list.remove(name) +plugins_import_script = '' +plugins_realname_list = [] +for name in plugins_list: + if len(name) >= 3: + name = name[:-3] + plugins_import_script += 'import plugins.%s\n' % name + plugins_realname_list.append(name) +with open('plugins/__init__.py', 'w') as f: + f.write(plugins_import_script) +print('%s plugins will be import' % (len(plugins_realname_list))) +print('They are: %s' % str(plugins_realname_list)) + + +global_config['plugins_realname_list'] = plugins_realname_list + +import plugins +print('Plugins import finished') + + diff --git a/mswp.py b/mswp.py new file mode 100644 index 0000000..e50ab16 --- /dev/null +++ b/mswp.py @@ -0,0 +1,32 @@ +class Datapack: + def __init__(self, method='post', app='all', version='msw/1.0', head={}, body=b''): + self.method = method + self.app = app + self.version = version + if not head: + self.head = {'nohead': "true"} + else: + self.head = head + self.body = body + self.encode_data = b'' + + def encode(self): + first_line = self.method.encode() + b' ' + self.app.encode() + b' ' + self.version.encode() + heads = ''.encode() + for i in self.head: + heads += i.encode() + b': ' + self.head[i].encode() + b'\n' + self.encode_data = first_line + b'\n' + heads + b'\n' + self.body + + def decode(self): + index = self.encode_data.index(b'\n\n') + upper = self.encode_data[:index] + self.body = self.encode_data[index+2:] + upper = upper.decode() + heads = upper.split('\n') + first_line = heads.pop(0) + self.method, self.app, self.version = first_line.split(' ') + for line in heads: + i, ii = line.split(': ') + self.head[i] = ii + + diff --git a/netlist_sample.txt b/netlist_sample.txt new file mode 100644 index 0000000..b066c3e --- /dev/null +++ b/netlist_sample.txt @@ -0,0 +1 @@ +sh.bakusaihazu.site:3900 \ No newline at end of file diff --git a/plugins/input.py b/plugins/input.py new file mode 100644 index 0000000..20f2db5 --- /dev/null +++ b/plugins/input.py @@ -0,0 +1,39 @@ +import threading +import copy +from mswp import Datapack +from forwarder import receive_queues, send_queue +receive_queue = receive_queues[__name__] + + +def main(): + while True: + raw_data = input() + first_index, last_index = find_the_last(raw_data) + app = raw_data[:first_index] + body = raw_data[last_index:] + app = app.replace(' ', '') + dp = Datapack(head={'from': __name__}) + dp.app = app + print(body) + dp.body = body.encode() + send_queue.put(dp) + + +def find_the_last(indata): # find the last ":" index + first_index = indata.index(':') + while True: + try: + next_index = indata[first_index+1:].index(':') + first_index += next_index + 1 + print(first_index) + except Exception as e: + break + last_index = copy.copy(first_index) + last_index += 1 + while indata[last_index] == ' ': + last_index += 1 + return first_index, last_index + + +thread = threading.Thread(target=main, args=()) +thread.start() diff --git a/plugins/logger.py b/plugins/logger.py new file mode 100644 index 0000000..ba3ed10 --- /dev/null +++ b/plugins/logger.py @@ -0,0 +1,19 @@ +import threading +from mswp import Datapack +from forwarder import receive_queues +receive_queue = receive_queues[__name__] + +def main(): + while True: + dp = receive_queue.get() + print('Writedown log: %s' % dp.body.decode()) + with open('logger.log', 'a') as f: + if dp.head.get('from'): + from_app_name = dp.head.get('from') + else: + from_app_name = 'Unknown' + f.write(from_app_name + ': ' + dp.body.decode() + '\n') + +thread = threading.Thread(target=main, args=()) +thread.start() + diff --git a/plugins/net.py b/plugins/net.py new file mode 100644 index 0000000..a717055 --- /dev/null +++ b/plugins/net.py @@ -0,0 +1,89 @@ +import threading +import socket +import queue +from mswp import Datapack +from forwarder import receive_queues, send_queue +receive_queue = receive_queues[__name__] + + +def main(): + netlist = Netlist() + while True: + dp = receive_queue.get() + dp.encode() + netlist.send_queue.put(dp) + + +def connect(addr): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(addr) + return s + + +def process_hostname(hostname): + ip = socket.gethostbyname(hostname) + return ip + + +class Netlist: # contain net list and network controller + def __init__(self): + self.send_queue = queue.Queue() + with open('netlist.txt', 'r') as f: + raw_data = f.read() + lines = raw_data.split('\n') + ips = [] + for line in lines: + ip, port = line.split(':') + ip = process_hostname(ip) + port = int(port) + ips.append((ip, port)) + self.addr_to_conn = {} + for addr in ips: + self.addr_to_conn[addr] = '' # initail connection dict + for addr in self.addr_to_conn: # Create connection + conn = connect(addr) + self.addr_to_conn[addr] = conn + self.addr_to_thread = {} + for addr in self.addr_to_conn: # Create thread + thread = threading.Thread(target=self.maintain_connection, args=(addr,)) + self.addr_to_thread[addr] = thread + for addr in self.addr_to_thread: # start thread + self.addr_to_thread[addr].start() + self.check_queue_thread = threading.Thread(target=self.check_queue, args=()) + self.check_queue_thread.start() # thread that check the queue and send one by one + + def maintain_connection(self, addr): + conn = self.addr_to_conn[addr] + print('Connection %s has connected' % str(addr)) + while True: + data = conn.recv(4096) + if not data: + print('disconnected with %s' % str(addr)) + conn.close() + return + data = data.decode() + print(data) + + def check_queue(self): + while True: + dp = self.send_queue.get() + for addr in self.addr_to_conn: + self.send_data(dp.encode_data, self.addr_to_conn[addr]) + + def send_data(self, data, conn): + threading.Thread(target=self._send_data, args=(data, conn)).start() + + def _send_data(self, data, conn): + try: + conn.sendall(data) + print('succeed send %s' % data) + except: + print('Sending %s error, data will be DROP!!' % data[0:10]) + + + + + + +thread = threading.Thread(target=main, args=()) +thread.start() diff --git a/test_tool.py b/test_tool.py new file mode 100644 index 0000000..2770528 --- /dev/null +++ b/test_tool.py @@ -0,0 +1,19 @@ +import socket +import threading + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +s.bind(('', 3900)) +s.listen(100) + + +def process(conn, addr): + print('accept connection from', str(addr)) + data = conn.recv(4096) + data = data.decode() + print(data) + + +while True: + conn, addr = s.accept() + threading.Thread(target=process, args=(conn, addr)).start()