diff --git a/config.json b/config.json index cce834a..d7354f1 100644 --- a/config.json +++ b/config.json @@ -1,7 +1,7 @@ { "id": "miku", "listen_port": 3900, - "listen_addr": "127.0.0.1", + "listen_ip": "127.0.0.1", "listen_num": 39, - "recv_buff": 4096 + "buffsize": 4096 } \ No newline at end of file diff --git a/mswp.py b/mswp.py index b43708c..3cf6fe4 100644 --- a/mswp.py +++ b/mswp.py @@ -1,15 +1,34 @@ import os from config import jsondata +''' +A datapack must like: +--------------------- +post log msw/1.0 +id: miku +flag: 1a2b3c4d +length: 0 +from: hatsune +to: [if has] +filename: [if has] + +[data content here +if has +support many lines...] +--------------------- +''' + class Datapack: - def __init__(self, method='post', app='all', version='msw/1.0', head=None, body=b'', check_head=True, file=None): - self.id = jsondata.try_to_read_jsondata('id', 'Unknown_id') + def __init__(self, method='post', app='all', version='msw/0.1', head=None, body=b'', + check_head=True, file=None): + self.id = jsondata.try_to_read_jsondata('id', 'unknown_id') if head is None: head = {} self.head = head else: self.head = head - self.head['id'] = self.id + if self.id == 'unknown_id': + self.head['id'] = self.id self.method = method self.file = file self.app = app @@ -49,6 +68,5 @@ class Datapack: self.head[i] = ii if only_head: return self.encode_data[index+2:] - - -dp = Datapack() + else: + return None diff --git a/plugins/input.py b/plugins/input.py index 68eb346..fe4c256 100644 --- a/plugins/input.py +++ b/plugins/input.py @@ -12,7 +12,7 @@ def main(): file_flag = False raw_data = input() - if raw_data[:6] == '(file)': + if raw_data[:6] == '(file)': # like "(file)log: filename.exe" raw_data = raw_data[6:] file_flag = True @@ -40,7 +40,7 @@ def find_the_last(indata): # find the last ":" index try: next_index = indata[first_index+1:].index(':') first_index += next_index + 1 - except Exception as e: + except: break last_index = copy.copy(first_index) last_index += 1 diff --git a/plugins/net.py b/plugins/net.py index e624e00..635fdf9 100644 --- a/plugins/net.py +++ b/plugins/net.py @@ -1,5 +1,6 @@ import threading import socket +import copy import queue import os from mswp import Datapack @@ -7,293 +8,157 @@ from forwarder import receive_queues, send_queue from config import jsondata receive_queue = receive_queues[__name__] -RECV_BUFF = jsondata.try_to_read_jsondata('recv_buff', 4096) +BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) ID = jsondata.try_to_read_jsondata('id', 'Unknown_ID') def main(): - netrecv = Netrecv() - while True: - dp = receive_queue.get() - dp.encode() - netrecv.send_queue.put(dp) + network_controller = Network_controller() + network_controller.start_accpet_connection() + -def connect(addr): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(addr) - return s - - -def process_hostname(hostname): - ip = socket.gethostbyname(hostname) - return ip - - -def read_netlisttxt_file(): - try: - with open('netlist.txt', 'r') as f: - raw_data = f.read() - return raw_data - except Exception as e: - print('Error: %s, %s\n' - 'If you are the first time to run this program, \n' - 'Please use "netlist_sample.txt" to create "netlist.txt", \n' - 'Program will continue...' % (type(e), str(e))) - return '' - - -class Netrecv: +class Network_controller: # manage id and connection def __init__(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # initial socket, bind and listen, start to accept - addr = jsondata.try_to_read_jsondata('listen_addr', '127.0.0.1') - port = jsondata.try_to_read_jsondata('listen_port', 3900) - print('MSW now trying to bind the network %s, please allow it' % str((addr, port))) - s.bind((addr, port)) + self.send_queue = queue.Queue() + self.id_dict = {} + self.lock = threading.Lock() + self.all_connection_list = [] + + #self.start_accpet_connection_thread = threading.Thread(target=self.start_accpet_connection, args=()) + #self.start_accpet_connection_thread.start() + + def start_accpet_connection(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + listen_ip = jsondata.try_to_read_jsondata('listen_ip', '127.0.0.1') + listen_port = jsondata.try_to_read_jsondata('listen_port', 3900) + s.bind((listen_ip, listen_port)) + listen_num = jsondata.try_to_read_jsondata('listen_num', 39) s.listen(listen_num) - self.s = s - self.stat = {} # # # # # - self.connection_list = [] # [(conn, addr), (conn, addr)...] Very important - self.connection_process_thread_list =[] - self.un_enougth_list = [] - self.send_queue = queue.Queue() - self.thread_check_accept_connection = threading.Thread(target=self.check_accept_connection, args=()) - self.thread_check_send_queue = threading.Thread(target=self.check_send_queue, args=()) + print('Sucessfully listen at %s:%s, max connection:%s' % (listen_ip, listen_port, listen_num)) - ######################################## - - raw_data = read_netlisttxt_file() # read file - lines = raw_data.split('\n') - self.file_addr_list = [] - for line in lines: - ip_port = line.split(':') - if len(ip_port) == 1: - ip = ip_port[0] - if not ip: # Check whether ip is null - continue - port = jsondata.get('listen_port') - if not port: - port = 3900 - ip = process_hostname(ip_port[0]) - port = int(ip_port[1]) - self.file_addr_list.append((ip, port)) - - for addr in self.file_addr_list: # Create connection - conn = connect(addr) - self.connection_list.append((conn, addr)) - - # Create receive thread and start - connection_thread = threading.Thread(target=self.process_connection, args=(conn, addr)) - self.connection_process_thread_list.append(connection_thread) - connection_thread.start() - - self.thread_check_accept_connection.start() - self.thread_check_send_queue.start() - - def check_send_queue(self): while True: - dp = receive_queue.get() + conn, addr = s.accept() + connection = Connection(conn, addr, self) - # debug code - if dp.body == b'stat': - print(self.stat) - continue + self.all_connection_list.append(connection) - if dp.method == 'file': - print('right') - print(dp.head) - dp.encode() - for id in self.stat: - for conn, addr in self.stat[id]: - conn.sendall(dp.encode_data) - file = open(dp.head['filename'], 'rb') - for data in file: - conn.send(data) - print('sended') - else: - print('wrong') - dp.encode() - for id in self.stat: - for conn, addr in self.stat[id]: - conn.sendall(dp.encode_data) + def set_connection(self, id, conn): + with self.lock: + if not self.id_dict.get(id): + self.id_dict[id] = [] + self.id_dict[id].append(conn) + + + def del_connection(self, id, conn): + with self.lock: + if id in self.id_dict: + if not self.id_dict.get(id): # if id has no connection + del(self.id_dict[id]) + else: + self.id_dict[id].remove(conn) + self.all_connection_list.remove(conn) + + + +class Connection: + def __init__(self, conn, addr, netowrk_controller): + self.conn = conn + self.addr = addr + self.netowrk_controller = netowrk_controller + self.id = None + self.buff = b'' + + self.thread = threading.Thread(target=self._init, args=()) + self.thread.start() + + + def _init(self): # init to check connection id, threading + err_code = self.check_id() + if err_code: + print('Init connection failed, connection closed') + self.conn.close() + + self.netowrk_controller.set_connection(self.id, self.conn) + + self.receive() + + + def receive(self): + still_need = 0 - def check_accept_connection(self): while True: - conn, addr = self.s.accept() - self.connection_list.append((conn, addr)) # # # # # - connection_thread = threading.Thread(target=self.process_connection, args=(conn, addr)) - self.connection_process_thread_list.append(connection_thread) - connection_thread.start() - - def remove_connection(self, conn, addr): - conn.close() - for id in self.stat: - if (conn, addr) in self.stat[id]: - self.stat[id].remove((conn, addr)) - self.connection_list.remove((conn, addr)) - print('Removed connection', str(addr)) - - def say_hello(self, conn, addr): - dp = Datapack(head={'from': __name__}) - dp.app = 'net' - dp.encode() - conn.sendall(dp.encode_data) - print('hello package has been sent') - - def process_connection(self, conn, addr): - self.say_hello(conn, addr) - - print('Connection accept %s' % str(addr)) - data = b'' - while True: - try: - new_data = conn.recv(RECV_BUFF) - except ConnectionResetError: - print('Connection Reset Error') - self.remove_connection(conn, addr) - return - - if not new_data and not data: - self.remove_connection(conn, addr) - print('return 1') - return - data += new_data - - while True: - - # try unpack # - dp = Datapack(check_head=False) - dp.encode_data = data - print('data', data) + print(still_need) + data = self.conn.recv(BUFFSIZE) + if not data: + break + self.buff += data + + if not still_need: + dp = Datapack() + dp.encode_data = self.buff try: - if data: - data = dp.decode(only_head=True) - print('decode success') - else: - print('Null data') - break + self.buff = dp.decode(only_head=True) + + if dp.method == 'file' and os.path.exists(dp.head['filename']): + os.remove(dp.head['filename']) + except Exception as e: - print('Decode error %s: %s' % (type(e), str(e))) - break - # try unpack # - - # net config data package - if dp.app == 'net': - - dp_id = dp.head['id'] - local_id = self.stat.get(dp_id) - - if not local_id: # create if not exits - self.stat[dp_id] = [] - - if not (conn, addr) in self.stat[dp_id]: - self.stat[dp_id].append((conn, addr)) - + print('Decode head failed %s: %s' % (type(e), str(e))) continue + length = int(dp.head.get('length')) + still_need = length + + if still_need <= len(self.buff): # first download complete setuation if dp.method == 'file': - length = int(dp.head['length']) - data_length = len(data) + with open(dp.head['filename'], 'ab') as f: + f.write(self.buff[:still_need]) + else: + dp.body = self.buff[:still_need] + self.buff = self.buff[still_need:] + still_need = 0 + else: # writing tmp data + if dp.method == 'file': + with open(dp.head['filename'], 'ab') as f: + still_need -= f.write(self.buff) + else: + dp.body += self.buff + still_need -= len(self.buff) + self.buff = b'' # empty buff because all tmp data has been write - # 3 condition - if length == data_length: - file = open(dp.head['filename'], 'wb') - file.write(data) - data = b'' - # return package needed - elif length > data_length: - file = open(dp.head['filename'], 'wb') - aleady_write_down = 0 - file.write(data) - aleady_write_down += len(data) - data = b'' + + # below code are using to closed connection + self.conn.close() - while aleady_write_down < length: - new_data = conn.recv(RECV_BUFF) - if not new_data: - print('return 22') - return + + def check_id(self): + ''' + return code + 0 ok + 1 unknown id + 2 connection closed + ''' - new_data_size = len(new_data) - still_need = length - aleady_write_down - print(still_need) + data = self.conn.recv(BUFFSIZE) + if not data: + return 2 - if new_data_size == still_need: # 3 condition of new_data - print('right') - file.write(new_data) - aleady_write_down += new_data_size + self.buff += data + dp = Datapack() + dp.encode_data = self.buff # maybe here needs to use copy.copy(self.buff) + self.buff = dp.decode(only_head=True) + if not dp.head.get('id'): + return 1 + self.id = dp.head['id'] - elif new_data_size < still_need: - file.write(new_data) - aleady_write_down += new_data_size - elif new_data_size > still_need: - file.write(new_data[:still_need]) - aleady_write_down += still_need - data = new_data[still_need:] - - else: - file = open(dp.head['filename'], 'wb') - file.write(data[:length]) - data = data[length:] - - file.close() - dp.encode_data = b'' - send_queue.put(dp) - - else: # normal data pack - length = int(dp.head['length']) - data_length = len(data) - - # 3 condition - if length == data_length: - print('=') - dp.body = data - data = b'' - - elif length > data_length: - while data_length < length: - new_data = conn.recv(RECV_BUFF) - if not new_data: - print('return 2') - return - - new_data_size = len(new_data) - still_need = length - data_length - print(still_need) - - if new_data_size == still_need: - print('data', data) - print('net_data', new_data) - data += new_data - data_length = len(data) - dp.body = data - data = b'' - - elif new_data_size < still_need: - print('data', data) - print('net_data', new_data) - data += new_data - data_length = len(data) - - else: - print('else') - data += new_data[:still_need] - new_data = new_data[still_need:] - data_length = len(data) - dp.body = data - data = new_data - - else: - dp.body = data[:length] - data = data[length:] - - dp.encode() - send_queue.put(dp) - print('###############\n' + dp.encode_data.decode() + '\n###############') + def sendall(self, data): + self.conn.sendall(data) thread = threading.Thread(target=main, args=()) diff --git a/test_file.py b/test_file.py index 00c3f8c..08ffc19 100644 --- a/test_file.py +++ b/test_file.py @@ -1,7 +1,13 @@ import socket import time -data = '''file log msw/1.0 +data = '''post id msw/0.1 +id: miku +length: 0 +from: test_software +flag: 1a2b3c4d + +file log msw/1.0 from: network flag: abcdefgh filename: download.txt @@ -30,6 +36,9 @@ for i in data_list: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1', 3900)) +n=0 for i in code_list: + n+=1 s.sendall(i) + print('发送%s' % n) time.sleep(1) \ No newline at end of file