Files
msw/plugins/net.py
2020-03-31 18:22:59 +08:00

377 lines
12 KiB
Python

import threading
import socket
import copy
import queue
import json
import os
import time
from mswp import Datapack
from forwarder import receive_queues, send_queue
from config import jsondata
from config import dprint as print
receive_queue = receive_queues[__name__]
BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096)
ID = jsondata.try_to_read_jsondata('id', 'Unknown_ID')
RETRYSLEEP = 5
def main():
network_controller = Network_controller()
network_controller.i_did_something()
class Network_controller: # manage id and connection
def __init__(self):
self.send_queue = queue.Queue()
self.id_dict = {}
self.lock = threading.Lock()
self.all_connection_list = []
self.wheel_queue = queue.Queue()
self.netlist = [] # store nagetive connection
self.addrlist = [] # store config connection
self.dhtlist = [] # store exchanged connection
self.proxylist = [] # store connection behind proxy
self.start_wheel_thread = threading.Thread(target=self.start_wheel, args=(), daemon=True)
self.start_wheel_thread.start()
self.start_accpet_connection_thread = threading.Thread(target=self.start_accpet_connection, args=(), daemon=True)
self.start_accpet_connection_thread.start()
self.start_sending_dp_thread = threading.Thread(target=self.start_sending_dp, args=(), daemon=True)
self.start_sending_dp_thread.start()
self.start_positive_connecting_thread = threading.Thread(target=self.start_positive_connecting, args=(), daemon=True)
self.start_positive_connecting_thread.start()
self.start_mht_thread = threading.Thread(target=self.start_mht, args=(), daemon=True)
self.start_mht_thread.start()
def start_mht(self):
while True:
dp = Datapack(head={'from': __name__})
dp.head['to'] = '*'
dp.app = 'net'
dp.method = 'get'
dp.body = b'mht'
print('Send mht request', dp)
send_queue.put(dp)
time.sleep(10)
def start_positive_connecting(self):
self.read_addrlist()
for addr in self.addrlist:
self.try_to_connect(addr)
def try_to_connect(self, addr):
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
conn.connect(addr)
except Exception as e:
print('Connect to %s failed, %s: %s' % (str(addr), type(e), str(e)))
return
connection = Connection(conn, addr, self, positive=True)
connection.i_did_something()
def read_addrlist(self):
if not os.path.exists('addrlist.txt'):
print('addrlist.txt not exists, config that base on addrlist_sample.txt')
else:
with open('addrlist.txt', 'r') as f:
raw_data = f.read()
raw_data = raw_data.replace('\r', '')
lines = raw_data.split('\n')
for line in lines:
ip, port = line.split(':')
port = int(port)
self.addrlist.append((ip, port))
def i_did_something(self): # go f**k your yeallow line
pass
def process_command(self, dp):
if dp.body == b'status':
print('Online %s' % str(list(self.id_dict.keys())))
elif dp.body == b'mht' and dp.method == 'get':
ndp = dp.reply()
connection_list = []
with self.lock:
for id in self.id_dict:
connections = self.id_dict[id]
for connection in connections:
ip, port = connection.conn.getpeername()
listen_port = connection.listen_port
connection_list.append((ip, listen_port))
ndp.body = json.dumps(connection_list).encode()
send_queue.put(ndp)
else:
print('Received unknown command', dp)
def start_sending_dp(self):
while True:
dp = receive_queue.get()
if dp.app == 'net' and not dp.head.get('to'):
self.process_command(dp)
continue
to_str = dp.head['to']
to_list = to_str.split('&')
to = to_list.pop(0)
to_str = '&'.join(to_list)
dp.head['to'] = to_str
if to == '*':
with self.lock:
for id in self.id_dict:
connection = self.id_dict[id][0]
connection.sendall(dp)
else:
connections = self.id_dict.get(to)
if not connections:
if to == ID:
print('To id %s is yourself!' % to, dp)
continue
print('To id %s has no connection now' % to, dp)
self.wheel_queue.put(dp)
continue
connection = connections[0]
connection.sendall(dp)
def start_wheel(self):
while True:
dp = self.wheel_queue.get()
time.sleep(RETRYSLEEP)
receive_queue.put(dp)
def start_accpet_connection(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_ip = jsondata.try_to_read_jsondata('listen_ip', '127.0.0.1')
listen_port = jsondata.try_to_read_jsondata('listen_port', 3900)
s.bind((listen_ip, listen_port))
listen_num = jsondata.try_to_read_jsondata('listen_num', 39)
s.listen(listen_num)
print('Sucessfully listen at %s:%s, max connection:%s' % (listen_ip, listen_port, listen_num))
while True:
conn, addr = s.accept()
connection = Connection(conn, addr, self)
connection.i_did_something()
def set_connection(self, id, connection):
with self.lock:
if not self.id_dict.get(id):
self.id_dict[id] = []
self.id_dict[id].append(connection)
self.all_connection_list.append(connection)
print('%s connected' % id)
def del_connection(self, id, connection):
with self.lock:
self.id_dict[id].remove(connection)
self.all_connection_list.remove(connection)
if id in self.id_dict and not self.id_dict.get(id): # del the empty user
del(self.id_dict[id])
print('%s disconnected' % id)
class Connection:
def __init__(self, conn, addr, netowrk_controller, positive=False):
self.conn = conn
self.addr = addr
self.netowrk_controller = netowrk_controller
self.id = None
self.buff = b''
self.padding_queue = queue.Queue()
self.thread_send = None
self.positive = positive
self.listen_port = None
self.thread_recv = threading.Thread(target=self._init, args=(), daemon=True)
self.thread_recv.start()
def _init(self): # init to check connection id, threading
err_code, flag = self.check_id()
if err_code:
print('<%s> Init connection failed, connection closed, code: %s' % (flag, err_code))
self.conn.close()
self.netowrk_controller.set_connection(self.id, self)
self.thread_send = threading.Thread(target=self.send_func, args=(), daemon=True)
self.thread_send.start()
self.receive()
def receive(self):
still_need = 0
while True:
try:
data = self.conn.recv(BUFFSIZE)
except ConnectionResetError:
break
except Exception as e:
print('Connection recv error %s: %s' % (type(e), str(e)))
break
if not data:
break
self.buff += data
if not still_need:
dp = Datapack()
dp.encode_data = self.buff
try:
self.buff = dp.decode(only_head=True)
if dp.method == 'file' and os.path.exists(dp.head['filename']):
os.remove(dp.head['filename'])
except Exception as e:
print('Decode head failed %s: %s' % (type(e), str(e)))
continue
length = int(dp.head.get('length'))
still_need = length
if still_need > len(self.buff):
# writing tmp data
if dp.method == 'file':
with open(dp.head['filename'], 'ab') as f:
still_need -= f.write(self.buff)
else:
dp.body += self.buff
still_need -= len(self.buff)
self.buff = b'' # empty buff because all tmp data has been write
else: # download complete setuation
if dp.method == 'file':
with open(dp.head['filename'], 'ab') as f:
f.write(self.buff[:still_need])
else:
dp.body = self.buff[:still_need]
self.buff = self.buff[still_need:]
still_need = 0
# bleow code are using to process datapack
if dp.method == 'file':
print('Received file %s' % dp.head['filename'], dp)
send_queue.put(dp)
# below code are using to closed connection
self.conn.close()
self.netowrk_controller.del_connection(self.id, self)
def check_id(self):
'''
check id package must like
-------------------------------
post handshake msw/0.1
id: [yourID]
listen_port: [3900]
length: 0
-------------------------------
error code list:
1: not get "id" in head
2: receive data failed
3: appname is not handshake
4: id is yourself
'''
data = None
if self.positive:
self.send_id()
try:
data = self.conn.recv(BUFFSIZE)
except ConnectionResetError:
print('One connection failed before ID check')
if not data:
return 2, ''
self.buff += data
dp = Datapack()
dp.encode_data = self.buff # maybe here needs to use copy.copy(self.buff)
self.buff = dp.decode(only_head=True)
if not dp.head.get('id'):
return 1, dp.head.get('flag')
if not dp.app == 'handshake':
return 3, dp.head.get('flag')
self.id = dp.head['id']
self.listen_port = dp.head.get('listen_port')
if self.id == jsondata.try_to_read_jsondata('id', 'unknown_id'):
print('you connect to your self')
return 4, dp.head.get('flag')
if not self.positive:
self.send_id()
return 0, dp.head.get('flag')
def send_id(self):
dp = Datapack(head={'from': __name__})
dp.app = 'handshake'
dp.head['listen_port'] = str(jsondata.try_to_read_jsondata('listen_port', 3900))
dp.encode()
self.conn.sendall(dp.encode_data)
def sendall(self, dp):
self.padding_queue.put(dp)
def send_func(self):
while True:
dp = self.padding_queue.get()
dp.encode()
if dp.method == 'file':
self.conn.sendall(dp.encode_data)
with open(dp.head['filename'], 'rb') as f:
for data in f:
self.conn.sendall(data)
else:
self.conn.sendall(dp.encode_data)
def i_did_something(self):
pass
thread = threading.Thread(target=main, args=(), daemon=True)
thread.start()