diff --git a/forwarder.py b/forwarder.py index 3cd2680..b45957a 100644 --- a/forwarder.py +++ b/forwarder.py @@ -25,8 +25,8 @@ def send_queue_function(): if dp.app == 'all': for q in receive_queues: receive_queues[q].put(dp) - elif ',' in dp.app: - applist = dp.app.split(',') + elif '&' in dp.app: + applist = dp.app.split('&') dp_list = [] for i in range(len(applist)): # split dp new_dp = copy.copy(dp) @@ -35,6 +35,8 @@ def send_queue_function(): for new_dp in dp_list: object_app, new_dp = process_reforware(new_dp) receive_queues[add_plugins_string(object_app)].put(new_dp) + elif 'to' in dp.head: # send to net if "to" avaliable + receive_queues[add_plugins_string('net')].put(dp) else: object_app, dp = process_reforware(dp) receive_queues[add_plugins_string(object_app)].put(dp) diff --git a/mswp.py b/mswp.py index 3cf6fe4..6bdf27f 100644 --- a/mswp.py +++ b/mswp.py @@ -5,11 +5,11 @@ from config import jsondata A datapack must like: --------------------- post log msw/1.0 -id: miku -flag: 1a2b3c4d -length: 0 -from: hatsune -to: [if has] +id: miku [auto] +flag: 1a2b3c4d [auto] +length: 0 [auto] +from: appname +to: [if has (net id)] filename: [if has] [data content here @@ -18,6 +18,8 @@ support many lines...] --------------------- ''' +BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) + class Datapack: def __init__(self, method='post', app='all', version='msw/0.1', head=None, body=b'', check_head=True, file=None): @@ -42,12 +44,19 @@ class Datapack: def encode(self): if self.method == 'file': + self.body = b'' self.head['length'] = str(os.path.getsize(self.head['filename'])) else: self.head['length'] = str(len(self.body)) first_line = self.method.encode() + b' ' + self.app.encode() + b' ' + self.version.encode() heads = ''.encode() + needed_to_del = [] + for i in self.head: # del the empty head + if not self.head[i]: + needed_to_del.append(i) + for i in needed_to_del: + del(self.head[i]) for i in self.head: heads += i.encode() + b': ' + self.head[i].encode() + b'\n' self.encode_data = first_line + b'\n' + heads + b'\n' + self.body diff --git a/plugins/input.py b/plugins/input.py index fe4c256..2dbc03b 100644 --- a/plugins/input.py +++ b/plugins/input.py @@ -10,17 +10,26 @@ def main(): file_flag = False while True: file_flag = False + net_flag = False raw_data = input() if raw_data[:6] == '(file)': # like "(file)log: filename.exe" raw_data = raw_data[6:] file_flag = True + if raw_data[:5] == '(net ': # like "(net miku)log: hello" or "(file)(net miku)log: filename.exe" + index = raw_data.index(')') + to = raw_data[5:index] + raw_data = raw_data[index+1:] + net_flag = True + first_index, last_index = find_the_last(raw_data) app = raw_data[:first_index] body = raw_data[last_index:] app = app.replace(' ', '') dp = Datapack(head={'from': __name__}) + if net_flag: + dp.head.update({'to': to}) dp.app = app if file_flag: diff --git a/plugins/net.py b/plugins/net.py index 635fdf9..c4d56f9 100644 --- a/plugins/net.py +++ b/plugins/net.py @@ -3,6 +3,7 @@ import socket import copy import queue import os +import time from mswp import Datapack from forwarder import receive_queues, send_queue from config import jsondata @@ -10,23 +11,76 @@ receive_queue = receive_queues[__name__] BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) ID = jsondata.try_to_read_jsondata('id', 'Unknown_ID') +RETRYSLEEP = 5 def main(): network_controller = Network_controller() - network_controller.start_accpet_connection() + network_controller.i_did_something() - class Network_controller: # manage id and connection def __init__(self): self.send_queue = queue.Queue() self.id_dict = {} self.lock = threading.Lock() self.all_connection_list = [] + self.wheel_queue = queue.Queue() - #self.start_accpet_connection_thread = threading.Thread(target=self.start_accpet_connection, args=()) - #self.start_accpet_connection_thread.start() + self.start_wheel_thread = threading.Thread(target=self.start_wheel, args=()) + self.start_wheel_thread.start() + + self.start_accpet_connection_thread = threading.Thread(target=self.start_accpet_connection, args=()) + self.start_accpet_connection_thread.start() + + self.start_sending_dp_thread = threading.Thread(target=self.start_sending_dp, args=()) + self.start_sending_dp_thread.start() + + def i_did_something(self): # go f**k your yeallow line + pass + + + def process_command(self, dp): + if dp.body == b'status': + print('Online %s' % self.id_dict) + + + def start_sending_dp(self): + while True: + dp = receive_queue.get() + + if dp.app == 'net': + self.process_command(dp) + continue + + to_str = dp.head['to'] + to_list = to_str.split(':') + to = to_list.pop() + + connections = self.id_dict.get(to) + if not connections: + if to == ID: + print('To id %s is yourself!' % to) + continue + print('To id %s has no connection now' % to) + self.wheel_queue.put(dp) + continue + + to_str = ':'.join(to_list) + dp.head['to'] = to_str + dp.encode() + + connection = connections[0] + connection.sendall(dp) + + + def start_wheel(self): + while True: + dp = self.wheel_queue.get() + time.sleep(RETRYSLEEP) + receive_queue.put(dp) + + def start_accpet_connection(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -46,23 +100,23 @@ class Network_controller: # manage id and connection self.all_connection_list.append(connection) - def set_connection(self, id, conn): + def set_connection(self, id, connection): with self.lock: if not self.id_dict.get(id): self.id_dict[id] = [] - self.id_dict[id].append(conn) + self.id_dict[id].append(connection) + self.all_connection_list.append(connection) + print('%s has connected' % id) - def del_connection(self, id, conn): + def del_connection(self, id, connection): with self.lock: - if id in self.id_dict: - if not self.id_dict.get(id): # if id has no connection - del(self.id_dict[id]) - else: - self.id_dict[id].remove(conn) - self.all_connection_list.remove(conn) + self.id_dict[id].remove(connection) + self.all_connection_list.remove(connection) + if id in self.id_dict and not self.id_dict.get(id): # del the empty user + del(self.id_dict[id]) + print('%s disconnected' % id) - class Connection: def __init__(self, conn, addr, netowrk_controller): @@ -71,9 +125,12 @@ class Connection: self.netowrk_controller = netowrk_controller self.id = None self.buff = b'' + self.padding_queue = queue.Queue() - self.thread = threading.Thread(target=self._init, args=()) - self.thread.start() + self.thread_recv = threading.Thread(target=self._init, args=()) + self.thread_recv.start() + + self.thread_send = None def _init(self): # init to check connection id, threading @@ -82,8 +139,11 @@ class Connection: print('Init connection failed, connection closed') self.conn.close() - self.netowrk_controller.set_connection(self.id, self.conn) + self.netowrk_controller.set_connection(self.id, self) + self.thread_send = threading.Thread(target=self.send_func, args=()) + self.thread_send.start() + self.receive() @@ -91,8 +151,13 @@ class Connection: still_need = 0 while True: - print(still_need) - data = self.conn.recv(BUFFSIZE) + try: + data = self.conn.recv(BUFFSIZE) + except ConnectionResetError: + break + except Exception as e: + print('Connection recv error %s: %s' % (type(e), str(e))) + break if not data: break self.buff += data @@ -129,21 +194,26 @@ class Connection: dp.body += self.buff still_need -= len(self.buff) self.buff = b'' # empty buff because all tmp data has been write - + + # bleow code are using to process datapack + send_queue.put(dp) # below code are using to closed connection self.conn.close() + self.netowrk_controller.del_connection(self.id, self) def check_id(self): ''' - return code - 0 ok - 1 unknown id - 2 connection closed + check id package must like + ------------------------------- + post handshake msw/0.1 + id: [yourID] + length: 0 + + ------------------------------- ''' - data = self.conn.recv(BUFFSIZE) if not data: return 2 @@ -157,8 +227,23 @@ class Connection: self.id = dp.head['id'] - def sendall(self, data): - self.conn.sendall(data) + def sendall(self, dp): + self.padding_queue.put(dp) + + + def send_func(self): + while True: + dp = self.padding_queue.get() + dp.encode() + if dp.method == 'file': + print('确认发送文件') + self.conn.sendall(dp.encode_data) + with open(dp.head['filename'], 'rb') as f: + for data in f: + print('开始发送文件内容') + self.conn.sendall(data) + else: + self.conn.sendall(dp.encode_data) thread = threading.Thread(target=main, args=()) diff --git a/test.py b/test.py new file mode 100644 index 0000000..d35b671 --- /dev/null +++ b/test.py @@ -0,0 +1,80 @@ +import time +import threading +import socket +import queue +import sys +send_queue = queue.Queue() + +def recv(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(('127.0.0.1', 3900)) + s.listen(39) + while True: + conn, addr = s.accept() + thread = threading.Thread(target=process_connection, args=(conn, addr), daemon=True) + thread.start() + +def process_connection(conn, addr): + while True: + data = conn.recv(4096) + if not data: + conn.close() + return + check_data.queue.put(data) + time.sleep(1) + +class Check_data: + def __init__(self): + self.queue = queue.Queue() + self.thread = threading.Thread(target=self.recv, args=(), daemon=True) + self.thread.start() + + def recv(self): + while True: + data = self.queue.get() + s_print(data) + +def send(size, c): + data = c*size + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', 3900)) + s_print('start sending %s' % c) + + start_time = time.time() + s.sendall(data) + end_time = time.time() + + s_print('Send %s finished, take %s' % (c, end_time - start_time)) + +def print_queue(): + while True: + word = send_queue.get() + print(word) + +def s_print(data): + send_queue.put(data) + +check_data = Check_data() + +time.sleep(1) + +thread_print = threading.Thread(target=print_queue, args=(), daemon=True) +thread_print.start() + + +thread_recv = threading.Thread(target=recv, args=(), daemon=True) +thread_recv.start() +print('recv thread started') +time.sleep(1) + +thread_send_1 = threading.Thread(target=send, args=(100000000, b'1'), daemon=True) +thread_send_2 = threading.Thread(target=send, args=(100000000, b'2'), daemon=True) + +thread_send_1.start() +thread_send_2.start() + +input() +sys.exit() + +# 结论,多线程同时对一个socket.sendall()调用,会导致数据混乱 diff --git a/test_file.py b/test_file.py index 08ffc19..60dbf87 100644 --- a/test_file.py +++ b/test_file.py @@ -8,7 +8,7 @@ from: test_software flag: 1a2b3c4d file log msw/1.0 -from: network +from: test flag: abcdefgh filename: download.txt num: 1/1 diff --git a/test_tool.py b/test_tool.py index f02d4d4..742464c 100644 --- a/test_tool.py +++ b/test_tool.py @@ -3,35 +3,18 @@ import threading s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -s.bind(('127.0.0.1', 3966)) -s.listen(100) - -id = '''post net msw/1.0 +id = b'''post handshake msw/1.0 id: miku2 -from: test length: 0 ''' -id = id.encode() +s.connect(('127.0.0.1',3900)) +s.sendall(id) -def process(conn, addr): - conn.sendall(id) - print('accept connection from', str(addr)) - while True: - data = conn.recv(4096) - if not data: - conn.close() - return - try: - data = data.decode() - print(data) - except UnicodeDecodeError: - print('Decode error') - print(data[:39]) +print(s.recv(4096).decode(), end='') +print(s.recv(4096).decode()) + +input('finished...') - -while True: - conn, addr = s.accept() - threading.Thread(target=process, args=(conn, addr)).start()