diff --git a/mswp.py b/mswp.py index 497ece0..75811d6 100644 --- a/mswp.py +++ b/mswp.py @@ -39,12 +39,15 @@ class Datapack: self.method = method self.file = file self.delete = delete + self.failed_times = 0 self.app = app self.version = version self.body = body self.encode_data = b'' if self.head.get('from'): self.head['from'] = process_plugins_name(self.head['from']) + else: + self.head['from'] = 'unkonwn_app' if gen_flag: randseed = str(random.random()).encode() h = hashlib.sha1() diff --git a/plugins/ffmpeg.py b/plugins/ffmpeg.py index 6d3bb77..4aab044 100644 --- a/plugins/ffmpeg.py +++ b/plugins/ffmpeg.py @@ -11,6 +11,24 @@ from config import dprint as print receive_queue = receive_queues[__name__] +''' +Usage: +ffmpeg: start;filename:res/test.mp4 +start using one file test.mp4 + +ffmpeg: autostart +auto start one file from res/ffmpeg_task + +ffmpeg: start;filename:res/output.mp4,concat:false +using tem file, output should be res/output_convert +ffmpeg: stop + +ffmpeg: enable;server:miku +ffmpeg: disable + +''' + + def main(): while True: ffmpeg_controller = Ffmpeg_controller() @@ -22,12 +40,15 @@ class Ffmpeg_controller: self.ffmpeg_type = None self.status = 0 self.server = None - self.conver_task_queue = queue.Queue() + self.convert_task_queue = queue.Queue() self.org_filename = None + self.object_filename = None self.concat = True + self.pause = False + self.tasklist = [] - self.conver_task_thread = threading.Thread(target=self.conver_task_func, args=()) - self.conver_task_thread.start() + self.convert_task_thread = threading.Thread(target=self.convert_task_func, args=()) + self.convert_task_thread.start() self.mainloop() @@ -35,10 +56,31 @@ class Ffmpeg_controller: def mainloop(self): _create_floder('res/ffmpeg_tmp') _create_floder('res/ffmpeg_finished') + _create_floder('res/ffmpeg_task') + _create_floder('res/ffmpeg_complet') while True: dp = receive_queue.get() + if dp.method == 'post' and dp.body == b'concat': + self.org_filename = dp.head['filename'] + self.object_filename = self.org_filename[:-4] + '.mkv' + self.concat_func() + + if dp.method == 'post' and dp.body == b'autostart': + filelist = os.listdir('res/ffmpeg_task') + self.tasklist = [] + for file in filelist: + if len(file) > 3: + ext = file[-4:] + if ext in ['.mp4', '.MP4', '.mkv', '.MKV']: + self.tasklist.append('res/ffmpeg_task/' + file) + dp = Datapack() + dp.app = 'ffmpeg' + dp.body = b'start' + dp.head['filename'] = self.tasklist.pop(0) + send_queue.put(dp) + if dp.method == 'post' and dp.body == b'start': # config ffmpeg is server or client if dp.head.get('concat'): if dp.head['concat'] == 'true': @@ -48,8 +90,12 @@ class Ffmpeg_controller: else: print('unknown concat value') continue - - self.org_filename = dp.head['filename'] + else: + self.concat = True + + if self.concat: + self.org_filename = dp.head['filename'] + self.object_filename = 'res/ffmpeg_complet/' + os.path.basename(self.org_filename)[:-4] + '.mkv' if self.concat: ndp = dp.reply() @@ -57,35 +103,23 @@ class Ffmpeg_controller: ndp.body = ndp.body.encode() send_queue.put(ndp) - cmd = 'ffmpeg -i ' + dp.head['filename'] + ' -c copy -f segment -segment_time 20 \ - -reset_timestamps 1 -y res/ffmpeg_tmp/' + '%d' + '.mkv' + cmd = 'ffmpeg -i "' + os.path.normpath(dp.head['filename']) + '" -c copy \ + -f segment -segment_time 20 -reset_timestamps 1 -y \ + "res/ffmpeg_tmp/' + '%d' + '.mkv"' os.system(cmd) self.run_as_server() if self.concat: - # concat all file - filelist = os.listdir('res/ffmpeg_finished') - if 'filelist.txt' in filelist: - filelist.remove('filelist.txt') - with open('res/ffmpeg_finished/filelist.txt', 'w') as f: - for file in filelist: - f.write('file \'%s\'\n' % file) - object_filename = self.org_filename[:-4] + '_conver.mkv' - subprocess.check_output('ffmpeg -f concat -i res/ffmpeg_finished/filelist.txt \ - -c copy -y ' + object_filename, shell=True) - - for file in filelist: - os.remove('res/ffmpeg_finished/' + file) - os.remove('res/ffmpeg_finished/filelist.txt') + self.concat_func() print('All process finished') elif dp.method == 'post' and dp.body == b'enable': # clinet mode self.status = 1 self.server = dp.head['server'] - self.conver_func() + self.convert_func() elif dp.method == 'post' and dp.body == b'status': result = 'ffmpeg not working' @@ -103,6 +137,30 @@ class Ffmpeg_controller: send_queue.put(ndp) + def concat_func(self): + # concat all file + _filelist = os.listdir('res/ffmpeg_finished') + if 'filelist.txt' in _filelist: + _filelist.remove('filelist.txt') + + # correct order + filelist = [] + for filenum in range(len(_filelist)): + filelist.append(str(filenum) + '.mkv') + + with open('res/ffmpeg_finished/filelist.txt', 'w') as f: + for file in filelist: + f.write('file \'%s\'\n' % file) + + os.system('ffmpeg -f concat -i res/ffmpeg_finished/filelist.txt \ + -c copy -y "' + os.path.normpath(self.object_filename) + '"') + + for file in filelist: + os.remove('res/ffmpeg_finished/' + file) + pass + os.remove('res/ffmpeg_finished/filelist.txt') + + def run_as_server(self): _padding_to_convert = os.listdir('res/ffmpeg_tmp') padding_to_convert = [] @@ -120,12 +178,27 @@ class Ffmpeg_controller: result += 'padding_to_convert ' + str(padding_to_convert) + '\n' result += 'already_in_convert ' + str(already_in_convert) + '\n' result += 'finished_convert ' + str(finished_convert) + '\n' - result += 'conver_task_queue size ' + str(self.conver_task_queue.qsize()) + result += 'convert_task_queue size ' + str(self.convert_task_queue.qsize()) ndp = dp.reply() ndp.body = result.encode() send_queue.put(ndp) + elif dp.method == 'post' and dp.body == b'stop': + break + + elif dp.method == 'post' and dp.body == b'pause': + self.pause = True + + elif dp.method == 'post' and dp.body == b'continue': + self.pause = False + elif dp.method == 'get': + if self.pause: + ndp = dp.reply() + ndp.method = 'post' + ndp.body = b'disable' + send_queue.put(ndp) + continue if padding_to_convert: filename = padding_to_convert.pop(0) already_in_convert.append(filename) @@ -145,6 +218,7 @@ class Ffmpeg_controller: ndp = dp.reply() ndp.method = 'post' ndp.body = b'disable' + send_queue.put(ndp) elif dp.method == 'file': old_filename = dp.head['old_filename'] @@ -154,6 +228,13 @@ class Ffmpeg_controller: already_in_convert.remove(old_filename) finished_convert.append(filename) + total = len(padding_to_convert) + len(already_in_convert) + len(finished_convert) + print('Processing...(%d) %d/%d %s' % \ + (len(already_in_convert), \ + len(finished_convert), \ + total, \ + str(round(len(finished_convert)/total*100, 2)))) + if not padding_to_convert and not already_in_convert: # final process break @@ -162,36 +243,36 @@ class Ffmpeg_controller: - def conver_func(self): # run as client + def convert_func(self): # run as client for _ in range(2): self.send_request() - while self.status or not self.conver_task_queue.empty(): + while self.status or not self.convert_task_queue.empty(): dp = receive_queue.get() if dp.method == 'post' and dp.body == b'disable': self.status = 0 elif dp.method == 'post' and dp.body == b'status': - result = 'Working as client, queue size: %s' % str(self.conver_task_queue.qsize()) + result = 'Working as client, queue size: %s' % str(self.convert_task_queue.qsize()) ndp = dp.reply() ndp.body = result.encode() send_queue.put(ndp) elif dp.method == 'file': - self.conver_task_queue.put(dp) + self.convert_task_queue.put(dp) - def conver_task_func(self): + def convert_task_func(self): while True: - dp = self.conver_task_queue.get() + dp = self.convert_task_queue.get() filename = dp.head['filename'] - output_filename = filename[:-4] + '_conver.mkv' + output_filename = filename[:-4] + '.mkv' output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished') - os.system('ffmpeg -i ' + filename + ' -c:a libopus -ab 64k \ - -c:v libx265 -s 1280x720 -y ' + output_filename) + os.system('ffmpeg -i "' + os.path.normpath(filename) + '" -c:a libopus -ab 64k \ + -c:v libx265 -s 1280x720 -y "' + os.path.normpath(output_filename) + '"') os.remove(filename) diff --git a/plugins/input.py b/plugins/input.py index 042d20b..a4c253d 100644 --- a/plugins/input.py +++ b/plugins/input.py @@ -24,6 +24,7 @@ def print_reply_func(): def _main(): + last = '' file_flag = False while True: file_flag = False @@ -38,9 +39,13 @@ def _main(): if raw_data == 'update': raw_data = 'update:compress;update_to:*' if raw_data == '1': - raw_data = 'ffmpeg:start;filename:test.mp4,concat:false' + raw_data = 'ffmpeg:autostart' if raw_data == '2': raw_data = 'ffmpeg:enable;to:*,server:miku' + if raw_data == 'r': + raw_data = last + + last = raw_data if raw_data[:6] == '(file)': # like "(file)log: filename.exe" raw_data = raw_data[6:] diff --git a/plugins/net.py b/plugins/net.py index e457169..99761ad 100644 --- a/plugins/net.py +++ b/plugins/net.py @@ -14,7 +14,7 @@ receive_queue = receive_queues[__name__] BUFFSIZE = jsondata.try_to_read_jsondata('buffsize', 4096) ID = jsondata.try_to_read_jsondata('id', 'Unknown_ID') -RETRYSLEEP = 5 +RETRYSLEEP = 3.9 MYPROXY = jsondata.try_to_read_jsondata('proxy', False) ONLYPROXY = jsondata.try_to_read_jsondata('onlyproxy', False) @@ -84,17 +84,19 @@ class Network_controller: # manage id and connection for addr in self.conflist: self.try_to_connect(addr, conntype='normal') - time.sleep(3) + time.sleep(3.9) for addr in self.mhtlist: self.try_to_connect(addr, conntype='mht') - time.sleep(3) - - time.sleep(4) - + time.sleep(3.9) def try_to_connect(self, addr, conntype='normal'): + thread = threading.Thread(target=self._try_to_connect, args=(addr, conntype)) + thread.start() + + + def _try_to_connect(self, addr, conntype='normal'): conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: conn.connect(addr) @@ -208,6 +210,7 @@ class Network_controller: # manage id and connection if not dp.head.get('to'): print('You got a no head datapack') print(str(dp.head)) + continue to_str = dp.head['to'] to_list = to_str.split('&') @@ -242,19 +245,24 @@ class Network_controller: # manage id and connection print('To id %s is yourself!' % to, dp) # maybe proxy to yourself return if to in self.proxydict: # neat warning dangerous code - if dp.head['to']: - dp.head['to'] = self.proxydict[to] + '&' + to + '&' + dp.head['to'] + if not ID == self.proxydict[to]: # check whether proxy is yourself + if dp.head.get('to'): + dp.head['to'] = self.proxydict[to] + '&' + to + '&' + dp.head['to'] + else: + dp.head['to'] = self.proxydict[to] + '&' + to else: - dp.head['to'] = self.proxydict[to] + '&' + to - - send_queue.put(dp) + if dp.head.get('to'): + dp.head['to'] = to + '&' + dp.head['to'] + else: + dp.head['to'] = to + self.wheel_queue.put(dp) return - print('To id %s has no connection now' % to, dp) + print('To id %s has no connection now %d...' % (to, dp.failed_times), dp) if dp.head.get('to'): - dp.head['id'] = to + '&' + dp.head['id'] + dp.head['to'] = to + '&' + dp.head['to'] else: - dp.head['id'] = to + dp.head['to'] = to self.wheel_queue.put(dp) return @@ -265,6 +273,10 @@ class Network_controller: # manage id and connection def start_wheel(self): while True: dp = self.wheel_queue.get() + dp.failed_times += 1 + if dp.failed_times > 39: + print('Datapack abandom', dp) + continue time.sleep(RETRYSLEEP) receive_queue.put(dp) @@ -435,7 +447,7 @@ class Connection: # bleow code are using to process datapack if dp.method == 'file': os.rename('tmp/' + dp.head['filename'], dp.head['filename']) - print('Received file %s' % dp.head['filename'], dp) + print('Received file %s from %s' % (dp.head['filename'], self.id), dp) send_queue.put(dp)