diff --git a/config.py b/config.py index cfd8354..70cd0bf 100644 --- a/config.py +++ b/config.py @@ -67,6 +67,8 @@ def create_floder(path): _create_floder(flordpath) def _create_floder(path): + if not path: + return pathlist = list(os.path.split(path)) pathlist.pop() flordpath = '/'.join(pathlist) diff --git a/mswp.py b/mswp.py index 572447a..7c0a30b 100644 --- a/mswp.py +++ b/mswp.py @@ -90,7 +90,7 @@ class Datapack: def reply(self): - ndp = copy.copy(self) + ndp = copy.deepcopy(self) ndp.app = ndp.head['from'] ndp.method = 'reply' if not self.head['id'] == ID: # net package diff --git a/plugins/ffmpeg.py b/plugins/ffmpeg.py index b24a510..99fd6fa 100644 --- a/plugins/ffmpeg.py +++ b/plugins/ffmpeg.py @@ -1,6 +1,9 @@ import threading import copy import os +import queue +import time +import subprocess from mswp import Datapack from forwarder import receive_queues, send_queue from config import msw_queue, _create_floder @@ -19,49 +22,77 @@ class Ffmpeg_controller: self.ffmpeg_type = None self.status = 0 self.server = None + self.conver_task_queue = queue.Queue() + self.org_filename = None + + self.conver_task_thread = threading.Thread(target=self.conver_task_func, args=()) + self.conver_task_thread.start() self.mainloop() def mainloop(self): - _create_floder('resources/ffmpeg_tmp') - _create_floder('resources/ffmpeg_finished') + _create_floder('res/ffmpeg_tmp') + _create_floder('res/ffmpeg_finished') while True: dp = receive_queue.get() if dp.method == 'post' and dp.body == b'start': # config ffmpeg is server or client + self.org_filename = dp.head['filename'] ndp = dp.reply() ndp.body = 'Spliting file %s' % dp.head['filename'] ndp.body = ndp.body.encode() send_queue.put(ndp) cmd = 'ffmpeg -i ' + dp.head['filename'] + ' -c copy -f segment -segment_time 20 \ - -reset_timestamps 1 -y resources/ffmpeg_tmp/' + '%d' + '.mp4' + -reset_timestamps 1 -y res/ffmpeg_tmp/' + '%d' + '.mp4' os.system(cmd) self.run_as_server() + # concat all file + filelist = os.listdir('res/ffmpeg_finished') + if 'filelist.txt' in filelist: + filelist.remove('filelist.txt') + with open('res/ffmpeg_finished/filelist.txt', 'w') as f: + for file in filelist: + f.write('file \'%s\'\n' % file) + object_filename = self.org_filename[:-4] + '.mkv' + subprocess.check_output('ffmpeg -f concat -i res/ffmpeg_finished/filelist.txt \ + -c copy -y ' + object_filename, shell=True) + + print('All process finished at ' + object_filename) elif dp.method == 'post' and dp.body == b'enable': # clinet mode self.status = 1 self.server = dp.head['server'] self.conver_func() - elif dp.method == 'get': + elif dp.method == 'post' and dp.body == b'status': + result = 'ffmpeg not working' + ndp = dp.reply() + ndp.body = result.encode() + + send_queue.put(ndp) + + elif dp.method == 'get': # let other client disable ndp = dp.reply() ndp.method = 'post' ndp.body = b'disable' print('let %s disabled' % dp.head['id']) + send_queue.put(ndp) + + def run_as_server(self): - _padding_to_convert = os.listdir('resources/ffmpeg_tmp') + _padding_to_convert = os.listdir('res/ffmpeg_tmp') padding_to_convert = [] for file in _padding_to_convert: - file = 'resources/ffmpeg_tmp/' + file + file = 'res/ffmpeg_tmp/' + file padding_to_convert.append(file) - already_in_convert = {} # flag: filename + already_in_convert = [] finished_convert = [] # outputfilename while True: @@ -71,7 +102,8 @@ class Ffmpeg_controller: result = '' result += 'padding_to_convert ' + str(padding_to_convert) + '\n' result += 'already_in_convert ' + str(already_in_convert) + '\n' - result += 'finished_convert ' + str(finished_convert) + result += 'finished_convert ' + str(finished_convert) + '\n' + result += 'conver_task_queue size ' + str(self.conver_task_queue.qsize()) ndp = dp.reply() ndp.body = result.encode() send_queue.put(ndp) @@ -79,51 +111,79 @@ class Ffmpeg_controller: elif dp.method == 'get': if padding_to_convert: filename = padding_to_convert.pop(0) - already_in_convert[dp.head['flag']] = filename - elif already_in_convert: - key, filename = get_one_from_dict(already_in_convert) - already_in_convert[dp.head['flag']] = filename - del(already_in_convert[key]) - else: - print('woring') - continue - - ndp = dp.reply() - ndp.method = 'file' - ndp.head['filename'] = filename - print('%s get %s to convert' % (dp.head['id'], filename), dp) + already_in_convert.append(filename) - send_queue.put(ndp) + print('%s get %s to convert' % (dp.head['id'], filename), dp) + + ndp = dp.reply() + ndp.method = 'file' + ndp.head['filename'] = filename + + send_queue.put(ndp) + + else: + if not already_in_convert: # finished + break + else: # waiting for final convert + ndp = dp.reply() + ndp.method = 'post' + ndp.body = b'disable' elif dp.method == 'file': - os.remove(already_in_convert[dp.head['flag']]) - del(already_in_convert[dp.head['flag']]) - finished_convert.append(dp.head['filename']) - if not padding_to_convert and not already_in_convert: - print('convert finished') - return - + old_filename = dp.head['old_filename'] + filename = dp.head['filename'] - def conver_func(self): - while self.status: + os.remove(old_filename) + already_in_convert.remove(old_filename) + finished_convert.append(filename) + + if not padding_to_convert and not already_in_convert: # final process + break + + print('Mapreduce finished') + + + + + def conver_func(self): # run as client + for _ in range(2): self.send_request() - + + while self.status or not self.conver_task_queue.empty(): dp = receive_queue.get() + if dp.method == 'post' and dp.body == b'disable': self.status = 0 + + elif dp.method == 'post' and dp.body == b'status': + result = 'Working as client, queue size: %s' % str(self.conver_task_queue.qsize()) + + ndp = dp.reply() + ndp.body = result.encode() + send_queue.put(ndp) elif dp.method == 'file': - filename = dp.head['filename'] - output_filename = filename[:-4] + '.mkv' - output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished') - os.system('ffmpeg -i ' + filename + ' -c:a libopus -ab 64k \ - -c:v libx265 -s 1280x720 -y ' + output_filename) - - ndp = dp.reply() - ndp.head['filename'] = output_filename - ndp.method = 'file' - send_queue.put(ndp) + self.conver_task_queue.put(dp) + + def conver_task_func(self): + while True: + dp = self.conver_task_queue.get() + + filename = dp.head['filename'] + output_filename = filename[:-4] + '.mkv' + output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished') + os.system('ffmpeg -i ' + filename + ' -c:a libopus -ab 64k \ + -c:v libx265 -s 1280x720 -y ' + output_filename) + + ndp = dp.reply() + ndp.head['filename'] = output_filename + ndp.head['old_filename'] = filename + ndp.method = 'file' + send_queue.put(ndp) + + self.send_request() + def send_request(self): dp = Datapack(head={'from': __name__}) diff --git a/plugins/input.py b/plugins/input.py index 6f4d7b3..ee880fb 100644 --- a/plugins/input.py +++ b/plugins/input.py @@ -35,6 +35,12 @@ def _main(): if raw_data == 'exit': msw_queue.put(1) break + if raw_data == 'update': + raw_data = 'update:compress;update_to:*' + if raw_data == '1': + raw_data = 'ffmpeg:start;filename:res/test.mp4' + if raw_data == '2': + raw_data = 'ffmpeg:enable;to:*,server:miku' if raw_data[:6] == '(file)': # like "(file)log: filename.exe" raw_data = raw_data[6:] diff --git a/plugins/net.py b/plugins/net.py index 0d2d5d2..8b3687d 100644 --- a/plugins/net.py +++ b/plugins/net.py @@ -191,6 +191,10 @@ class Network_controller: # manage id and connection self.process_command(dp) continue + if not dp.head.get('to'): + print('You got a no head datapack') + print(str(dp.head)) + to_str = dp.head['to'] to_list = to_str.split('&') to = to_list.pop(0) @@ -202,7 +206,9 @@ class Network_controller: # manage id and connection for id in self.id_dict: connection = self.id_dict[id][0] connection.sendall(dp) - + elif not to: + print('not to', dp) + else: self.send_to_id(to, dp) @@ -224,6 +230,10 @@ class Network_controller: # manage id and connection return print('To id %s has no connection now' % to, dp) + if dp.head.get('to'): + dp.head['id'] = to + '&' + dp.head['id'] + else: + dp.head['id'] = to self.wheel_queue.put(dp) return @@ -377,7 +387,7 @@ class Connection: except Exception as e: print('Decode head failed %s: %s' % (type(e), str(e))) print(self.buff) - continue + break length = int(dp.head.get('length')) still_need = length @@ -487,7 +497,8 @@ class Connection: self.conn.sendall(data) except Exception as e: print('Failed to send file %s %s: %s' % (dp.head['filename'], type(e), str(e)), dp) - continue + self.netowrk_controller.wheel_queue.put(dp) + break print('Send file %s finished' % dp.head['filename'], dp) diff --git a/plugins/update.py b/plugins/update.py index 8dfdecd..f08d201 100644 --- a/plugins/update.py +++ b/plugins/update.py @@ -8,7 +8,7 @@ receive_queue = receive_queues[__name__] remove_file_list = ['__init__.py', 'addrlist.txt', 'config.json', 'logger.log', 'update.tar.xz'] -remove_dir_list = ['.git', '.idea', '__pycache__', 'resources', 'tmp'] +remove_dir_list = ['.git', '.idea', '__pycache__', 'resources', 'tmp', 'res'] def main(): @@ -33,7 +33,7 @@ def main(): ndp = Datapack(head={'from':__name__}) ndp.method = 'file' ndp.app = 'update' - ndp.head['filename'] = 'resources/update.tar.xz' + ndp.head['filename'] = 'res/update.tar.xz' ndp.head['to'] = to send_queue.put(ndp) @@ -59,7 +59,7 @@ class Compresser: self.compress_files(self.filelist) def compress_files(self, filelist): - with tarfile.open('resources/update.tar.xz', 'w:xz') as f: + with tarfile.open('res/update.tar.xz', 'w:xz') as f: for name in filelist: f.add(name)