diff --git a/netlist_sample.txt b/addrlist_example.txt similarity index 100% rename from netlist_sample.txt rename to addrlist_example.txt diff --git a/config.py b/config.py index 0d80de3..4b46711 100644 --- a/config.py +++ b/config.py @@ -2,6 +2,24 @@ import threading import json import time import queue +import os + + +class Print_controller: + def __init__(self): + self.padding_queue = queue.Queue() + self.thread = threading.Thread(target=self.start_printing, args=(), daemon=True) + self.original_print = print + + def start_printing(self): + while True: + word = self.padding_queue.get() + print(word) + + def print_function(self, word, dp=None): + if dp: + word = '<%s> %s' % (dp.head.get('flag'), word) + self.padding_queue.put(word) class Jsondata: @@ -39,4 +57,7 @@ class Jsondata: global_config = {} msw_queue = queue.Queue() -jsondata = Jsondata() \ No newline at end of file +jsondata = Jsondata() + +print_controller = Print_controller() +print = print_controller.print_function diff --git a/forwarder.py b/forwarder.py index a73d428..d97bc57 100644 --- a/forwarder.py +++ b/forwarder.py @@ -43,8 +43,8 @@ def send_queue_function(): def process_reforware(dp): - if ':' in dp.app: - first_forward, next_forward = dp.app.split(':') + if '&' in dp.app: + first_forward, next_forward = dp.app.split('&') dp.app = next_forward return first_forward, dp else: diff --git a/logger.log b/logger.log deleted file mode 100644 index c62018d..0000000 --- a/logger.log +++ /dev/null @@ -1,11 +0,0 @@ -plugins.input: hello -plugins.input: asdfasf -plugins.input: alskdfaj -plugins.input: alsdkfj -plugins.input: testtest -plugins.input: hello -plugins.input: asldjfalsdjf -plugins.input: lajsdflajsdlkf -plugins.input: alsdjflasjdlkfjasldfjalsdfj -network: -network: diff --git a/mswp.py b/mswp.py index a74dbce..2e94ccc 100644 --- a/mswp.py +++ b/mswp.py @@ -25,23 +25,16 @@ BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) class Datapack: def __init__(self, method='post', app='all', version='msw/0.1', head=None, body=b'', - check_head=True, file=None, gen_flag=True): + file=None, gen_flag=True): self.id = jsondata.try_to_read_jsondata('id', 'unknown_id') if head is None: head = {} - self.head = head - else: - self.head = head - if self.id == 'unknown_id': - self.head['id'] = self.id + self.head = head + self.head['id'] = self.id self.method = method self.file = file self.app = app self.version = version - if not self.head and check_head: - self.head = {'nohead': "true"} - else: - self.head = head self.body = body self.encode_data = b'' if gen_flag: diff --git a/plugins/input.py b/plugins/input.py index a379784..41beb8a 100644 --- a/plugins/input.py +++ b/plugins/input.py @@ -39,13 +39,28 @@ def _main(): raw_data = raw_data[index+1:] net_flag = True - first_index, last_index = find_the_last(raw_data) + first_index, last_index = find_index(raw_data) app = raw_data[:first_index] body = raw_data[last_index:] + + ihead = {} + if ';' in body and ':' in body: + ihead_index = body.index(';') + ihead_str = body[ihead_index+1:] + body = body[:ihead_index] + + ihead_list = ihead_str.split(',') + for key_value in ihead_list: + key, value = key_value.split(':') + ihead[key] = value + app = app.replace(' ', '') dp = Datapack(head={'from': __name__}) if net_flag: dp.head.update({'to': to}) + + dp.head.update(ihead) + dp.app = app if file_flag: @@ -59,20 +74,10 @@ def _main(): send_queue.put(dp) -def find_the_last(indata): # find the last ":" index - first_index = indata.index(':') - while True: - try: - next_index = indata[first_index+1:].index(':') - first_index += next_index + 1 - except: - break - last_index = copy.copy(first_index) - last_index += 1 - try: - while indata[last_index] == ' ': - last_index += 1 - except IndexError: +def find_index(raw_data): + first_index = raw_data.index(':') + last_index = first_index + 1 + while raw_data[last_index] == ' ': last_index += 1 return first_index, last_index diff --git a/plugins/net.py b/plugins/net.py index b045694..855cecd 100644 --- a/plugins/net.py +++ b/plugins/net.py @@ -16,7 +16,8 @@ RETRYSLEEP = 5 def main(): network_controller = Network_controller() network_controller.i_did_something() - + + class Network_controller: # manage id and connection def __init__(self): @@ -26,6 +27,11 @@ class Network_controller: # manage id and connection self.all_connection_list = [] self.wheel_queue = queue.Queue() + self.netlist = [] # store nagetive connection + self.addrlist = [] # store config connection + self.dhtlist = [] # store exchanged connection + self.proxylist = [] # store connection behind proxy + self.start_wheel_thread = threading.Thread(target=self.start_wheel, args=(), daemon=True) self.start_wheel_thread.start() @@ -34,6 +40,39 @@ class Network_controller: # manage id and connection self.start_sending_dp_thread = threading.Thread(target=self.start_sending_dp, args=(), daemon=True) self.start_sending_dp_thread.start() + + self.start_positive_connecting_thread = threading.Thread(target=self.start_positive_connecting, args=(), daemon=True) + self.start_positive_connecting_thread.start() + + + def start_positive_connecting(self): + self.read_addrlist() + for addr in self.addrlist: + self.try_to_connect(addr) + + + + def try_to_connect(self, addr): + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.connect(addr) + connection = Connection(conn, addr, self, positive=True) + connection.i_did_something() + + + + def read_addrlist(self): + if not os.path.exists('addrlist.txt'): + print('addrlist.txt not exists, config that base on addrlist_sample.txt') + else: + with open('addrlist.txt', 'r') as f: + raw_data = f.read() + raw_data = raw_data.replace('\r', '') + lines = raw_data.split('\n') + for line in lines: + ip, port = line.split(':') + port = int(port) + + self.addrlist.append((ip, port)) def i_did_something(self): # go f**k your yeallow line @@ -96,8 +135,7 @@ class Network_controller: # manage id and connection while True: conn, addr = s.accept() connection = Connection(conn, addr, self) - - self.all_connection_list.append(connection) + connection.i_did_something() def set_connection(self, id, connection): @@ -119,18 +157,19 @@ class Network_controller: # manage id and connection class Connection: - def __init__(self, conn, addr, netowrk_controller): + def __init__(self, conn, addr, netowrk_controller, positive=False): self.conn = conn self.addr = addr self.netowrk_controller = netowrk_controller self.id = None self.buff = b'' self.padding_queue = queue.Queue() + self.thread_send = None + self.positive = positive self.thread_recv = threading.Thread(target=self._init, args=(), daemon=True) self.thread_recv.start() - self.thread_send = None def _init(self): # init to check connection id, threading @@ -218,6 +257,13 @@ class Connection: 2: receive data failed 3: appname is not handshake ''' + if self.positive: + ndp = Datapack(head={'from': __name__}) + ndp.app = 'handshake' + ndp.encode() + print(ndp.encode_data.decode()) + self.conn.sendall(ndp.encode_data) + data = self.conn.recv(BUFFSIZE) if not data: return 2, '' @@ -246,15 +292,17 @@ class Connection: dp = self.padding_queue.get() dp.encode() if dp.method == 'file': - print('确认发送文件') self.conn.sendall(dp.encode_data) with open(dp.head['filename'], 'rb') as f: for data in f: - print('开始发送文件内容') self.conn.sendall(data) else: self.conn.sendall(dp.encode_data) + + def i_did_something(self): + pass + thread = threading.Thread(target=main, args=(), daemon=True) thread.start() diff --git a/plugins/update.py b/plugins/update.py index d57dbec..2b4c121 100644 --- a/plugins/update.py +++ b/plugins/update.py @@ -7,7 +7,7 @@ receive_queue = receive_queues[__name__] remove_file_list = ['__init__.py', 'netlist.txt', 'config.json', 'logger.log'] -remove_dir_list = ['.git', '.idea', '__pycache__'] +remove_dir_list = ['.git', '.idea', '__pycache__', 'resources'] def main(): @@ -16,27 +16,27 @@ def main(): if dp.method == 'post': if dp.body == b'compress': + # compressing file print('Starting update') compress = Compresser() - filelist = compress.get_filelist() - compress.compress_files(filelist) + compress.start_compress() print('Compress finished') + + # getting to destination + to = dp.head.get('update_to') + if not to: + print('unable to locate update_to') + continue - elif dp.body == b'all': - print('Start update other client') - compress = Compresser() - filelist = compress.get_filelist() - compress.compress_files(filelist) - print('Compress finished') + # sending file + ndp = Datapack(head={'from':__name__}) + ndp.method = 'file' + ndp.app = 'update' + ndp.head['filename'] = 'resources/update.tar.xz' + ndp.head['to'] = to - dp = Datapack(head={'from': __name__}) - dp.method = 'file' - dp.app = 'net:update' - dp.head['filename'] = 'resources/update.tar.xz' + send_queue.put(ndp) - dp.encode() - - send_queue.put(dp) elif dp.method == 'file': print('Starting update local file') @@ -49,6 +49,10 @@ class Compresser: def __init__(self): self.filelist = [] + def start_compress(self): + self.filelist = self.get_filelist() + self.compress_files(self.filelist) + def compress_files(self, filelist): with tarfile.open('resources/update.tar.xz', 'w:xz') as f: for name in filelist: