commit 9924a301080a72d94f815ea8f61a1ba178934eaa Author: heimoshuiyu Date: Sat Dec 14 11:47:50 2019 +0800 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..b06775e --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# MSW +personal network personal framework +function as plugins +combine everything +M.S.W \ No newline at end of file diff --git a/config.json b/config.json new file mode 100644 index 0000000..724e6f8 --- /dev/null +++ b/config.json @@ -0,0 +1,4 @@ +{ + "test": "test", + "listen": 3900 +} \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..41bf163 --- /dev/null +++ b/config.py @@ -0,0 +1,36 @@ +import json +import threading +import json +import time + + + + +class Jsondata: + def __init__(self, auto_save=False, auto_save_time=10): + with open('config.json', 'r') as f: + raw_data = f.read() + jsondata = json.loads(raw_data) + self.raw_jsondata = jsondata + self.auto_save = auto_save + self.auto_save_time = auto_save_time + self.thread = threading.Thread(target=self.run, args=()) + self.thread.start() + + def get(self, key): + try: + return self.raw_jsondata[key] + except: + return False + + def set(self, key, value): + self.raw_jsondata[key] = value + + def run(self): + while True: + time.sleep(self.auto_save_time) + if self.auto_save: + pass + +global_config = {} +jsondata = Jsondata() \ No newline at end of file diff --git a/forwarder.py b/forwarder.py new file mode 100644 index 0000000..c6f32ba --- /dev/null +++ b/forwarder.py @@ -0,0 +1,54 @@ +import queue +import threading +import copy +from config import global_config + + +send_queue = queue.Queue() +receive_queues = {} + +for name in global_config['plugins_realname_list']: + name = 'plugins.' + name + receive_queues[name] = queue.Queue() + + +def add_plugins_string(indata): + outdata = 'plugins.' + indata + return outdata + + +def send_queue_function(): + global send_queue, receive_queues + while True: + dp = send_queue.get() + print('dp.app is', dp.app) + if dp.app == 'all': + for q in receive_queues: + receive_queues[q].put(dp) + elif ',' in dp.app: + applist = dp.app.split(',') + print(applist) + dp_list = [] + for i in range(len(applist)): # split dp + new_dp = copy.copy(dp) + new_dp.app = applist[i] + dp_list.append(new_dp) + for new_dp in dp_list: + object_app, new_dp = process_reforware(new_dp) + receive_queues[add_plugins_string(object_app)].put(new_dp) + else: + object_app, dp =process_reforware(dp) + receive_queues[add_plugins_string(object_app)].put(dp) + + +def process_reforware(dp): + if ':' in dp.app: + first_forward, next_forward = dp.app.split(':') + dp.app = next_forward + return first_forward, dp + else: + return dp.app, dp + + +thread = threading.Thread(target=send_queue_function, args=()) +thread.start() diff --git a/logger.log b/logger.log new file mode 100644 index 0000000..d7e22a5 --- /dev/null +++ b/logger.log @@ -0,0 +1,5 @@ +plugins.input: hello +plugins.input: asdfasf +plugins.input: alskdfaj +plugins.input: alsdkfj +plugins.input: testtest diff --git a/msw.py b/msw.py new file mode 100644 index 0000000..633ac42 --- /dev/null +++ b/msw.py @@ -0,0 +1,33 @@ +from mswp import Datapack +import threading +import os +from config import jsondata, global_config + + +print('Building plugins import script...') +plugins_list = os.listdir('plugins') +plugins_should_be_remove_list = [] +for name in plugins_list: + if '__' in name: + plugins_should_be_remove_list.append(name) +for name in plugins_should_be_remove_list: + plugins_list.remove(name) +plugins_import_script = '' +plugins_realname_list = [] +for name in plugins_list: + if len(name) >= 3: + name = name[:-3] + plugins_import_script += 'import plugins.%s\n' % name + plugins_realname_list.append(name) +with open('plugins/__init__.py', 'w') as f: + f.write(plugins_import_script) +print('%s plugins will be import' % (len(plugins_realname_list))) +print('They are: %s' % str(plugins_realname_list)) + + +global_config['plugins_realname_list'] = plugins_realname_list + +import plugins +print('Plugins import finished') + + diff --git a/mswp.py b/mswp.py new file mode 100644 index 0000000..e50ab16 --- /dev/null +++ b/mswp.py @@ -0,0 +1,32 @@ +class Datapack: + def __init__(self, method='post', app='all', version='msw/1.0', head={}, body=b''): + self.method = method + self.app = app + self.version = version + if not head: + self.head = {'nohead': "true"} + else: + self.head = head + self.body = body + self.encode_data = b'' + + def encode(self): + first_line = self.method.encode() + b' ' + self.app.encode() + b' ' + self.version.encode() + heads = ''.encode() + for i in self.head: + heads += i.encode() + b': ' + self.head[i].encode() + b'\n' + self.encode_data = first_line + b'\n' + heads + b'\n' + self.body + + def decode(self): + index = self.encode_data.index(b'\n\n') + upper = self.encode_data[:index] + self.body = self.encode_data[index+2:] + upper = upper.decode() + heads = upper.split('\n') + first_line = heads.pop(0) + self.method, self.app, self.version = first_line.split(' ') + for line in heads: + i, ii = line.split(': ') + self.head[i] = ii + + diff --git a/netlist_sample.txt b/netlist_sample.txt new file mode 100644 index 0000000..b066c3e --- /dev/null +++ b/netlist_sample.txt @@ -0,0 +1 @@ +sh.bakusaihazu.site:3900 \ No newline at end of file diff --git a/plugins/input.py b/plugins/input.py new file mode 100644 index 0000000..20f2db5 --- /dev/null +++ b/plugins/input.py @@ -0,0 +1,39 @@ +import threading +import copy +from mswp import Datapack +from forwarder import receive_queues, send_queue +receive_queue = receive_queues[__name__] + + +def main(): + while True: + raw_data = input() + first_index, last_index = find_the_last(raw_data) + app = raw_data[:first_index] + body = raw_data[last_index:] + app = app.replace(' ', '') + dp = Datapack(head={'from': __name__}) + dp.app = app + print(body) + dp.body = body.encode() + send_queue.put(dp) + + +def find_the_last(indata): # find the last ":" index + first_index = indata.index(':') + while True: + try: + next_index = indata[first_index+1:].index(':') + first_index += next_index + 1 + print(first_index) + except Exception as e: + break + last_index = copy.copy(first_index) + last_index += 1 + while indata[last_index] == ' ': + last_index += 1 + return first_index, last_index + + +thread = threading.Thread(target=main, args=()) +thread.start() diff --git a/plugins/logger.py b/plugins/logger.py new file mode 100644 index 0000000..ba3ed10 --- /dev/null +++ b/plugins/logger.py @@ -0,0 +1,19 @@ +import threading +from mswp import Datapack +from forwarder import receive_queues +receive_queue = receive_queues[__name__] + +def main(): + while True: + dp = receive_queue.get() + print('Writedown log: %s' % dp.body.decode()) + with open('logger.log', 'a') as f: + if dp.head.get('from'): + from_app_name = dp.head.get('from') + else: + from_app_name = 'Unknown' + f.write(from_app_name + ': ' + dp.body.decode() + '\n') + +thread = threading.Thread(target=main, args=()) +thread.start() + diff --git a/plugins/net.py b/plugins/net.py new file mode 100644 index 0000000..a717055 --- /dev/null +++ b/plugins/net.py @@ -0,0 +1,89 @@ +import threading +import socket +import queue +from mswp import Datapack +from forwarder import receive_queues, send_queue +receive_queue = receive_queues[__name__] + + +def main(): + netlist = Netlist() + while True: + dp = receive_queue.get() + dp.encode() + netlist.send_queue.put(dp) + + +def connect(addr): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(addr) + return s + + +def process_hostname(hostname): + ip = socket.gethostbyname(hostname) + return ip + + +class Netlist: # contain net list and network controller + def __init__(self): + self.send_queue = queue.Queue() + with open('netlist.txt', 'r') as f: + raw_data = f.read() + lines = raw_data.split('\n') + ips = [] + for line in lines: + ip, port = line.split(':') + ip = process_hostname(ip) + port = int(port) + ips.append((ip, port)) + self.addr_to_conn = {} + for addr in ips: + self.addr_to_conn[addr] = '' # initail connection dict + for addr in self.addr_to_conn: # Create connection + conn = connect(addr) + self.addr_to_conn[addr] = conn + self.addr_to_thread = {} + for addr in self.addr_to_conn: # Create thread + thread = threading.Thread(target=self.maintain_connection, args=(addr,)) + self.addr_to_thread[addr] = thread + for addr in self.addr_to_thread: # start thread + self.addr_to_thread[addr].start() + self.check_queue_thread = threading.Thread(target=self.check_queue, args=()) + self.check_queue_thread.start() # thread that check the queue and send one by one + + def maintain_connection(self, addr): + conn = self.addr_to_conn[addr] + print('Connection %s has connected' % str(addr)) + while True: + data = conn.recv(4096) + if not data: + print('disconnected with %s' % str(addr)) + conn.close() + return + data = data.decode() + print(data) + + def check_queue(self): + while True: + dp = self.send_queue.get() + for addr in self.addr_to_conn: + self.send_data(dp.encode_data, self.addr_to_conn[addr]) + + def send_data(self, data, conn): + threading.Thread(target=self._send_data, args=(data, conn)).start() + + def _send_data(self, data, conn): + try: + conn.sendall(data) + print('succeed send %s' % data) + except: + print('Sending %s error, data will be DROP!!' % data[0:10]) + + + + + + +thread = threading.Thread(target=main, args=()) +thread.start() diff --git a/test_tool.py b/test_tool.py new file mode 100644 index 0000000..2770528 --- /dev/null +++ b/test_tool.py @@ -0,0 +1,19 @@ +import socket +import threading + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +s.bind(('', 3900)) +s.listen(100) + + +def process(conn, addr): + print('accept connection from', str(addr)) + data = conn.recv(4096) + data = data.decode() + print(data) + + +while True: + conn, addr = s.accept() + threading.Thread(target=process, args=(conn, addr)).start()