ffmpeg process download same time, deepcopy...
This commit is contained in:
@@ -67,6 +67,8 @@ def create_floder(path):
|
||||
_create_floder(flordpath)
|
||||
|
||||
def _create_floder(path):
|
||||
if not path:
|
||||
return
|
||||
pathlist = list(os.path.split(path))
|
||||
pathlist.pop()
|
||||
flordpath = '/'.join(pathlist)
|
||||
|
||||
2
mswp.py
2
mswp.py
@@ -90,7 +90,7 @@ class Datapack:
|
||||
|
||||
|
||||
def reply(self):
|
||||
ndp = copy.copy(self)
|
||||
ndp = copy.deepcopy(self)
|
||||
ndp.app = ndp.head['from']
|
||||
ndp.method = 'reply'
|
||||
if not self.head['id'] == ID: # net package
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import threading
|
||||
import copy
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
import subprocess
|
||||
from mswp import Datapack
|
||||
from forwarder import receive_queues, send_queue
|
||||
from config import msw_queue, _create_floder
|
||||
@@ -19,49 +22,77 @@ class Ffmpeg_controller:
|
||||
self.ffmpeg_type = None
|
||||
self.status = 0
|
||||
self.server = None
|
||||
self.conver_task_queue = queue.Queue()
|
||||
self.org_filename = None
|
||||
|
||||
self.conver_task_thread = threading.Thread(target=self.conver_task_func, args=())
|
||||
self.conver_task_thread.start()
|
||||
|
||||
self.mainloop()
|
||||
|
||||
|
||||
def mainloop(self):
|
||||
_create_floder('resources/ffmpeg_tmp')
|
||||
_create_floder('resources/ffmpeg_finished')
|
||||
_create_floder('res/ffmpeg_tmp')
|
||||
_create_floder('res/ffmpeg_finished')
|
||||
|
||||
while True:
|
||||
dp = receive_queue.get()
|
||||
|
||||
if dp.method == 'post' and dp.body == b'start': # config ffmpeg is server or client
|
||||
self.org_filename = dp.head['filename']
|
||||
ndp = dp.reply()
|
||||
ndp.body = 'Spliting file %s' % dp.head['filename']
|
||||
ndp.body = ndp.body.encode()
|
||||
send_queue.put(ndp)
|
||||
|
||||
cmd = 'ffmpeg -i ' + dp.head['filename'] + ' -c copy -f segment -segment_time 20 \
|
||||
-reset_timestamps 1 -y resources/ffmpeg_tmp/' + '%d' + '.mp4'
|
||||
-reset_timestamps 1 -y res/ffmpeg_tmp/' + '%d' + '.mp4'
|
||||
|
||||
os.system(cmd)
|
||||
|
||||
self.run_as_server()
|
||||
|
||||
# concat all file
|
||||
filelist = os.listdir('res/ffmpeg_finished')
|
||||
if 'filelist.txt' in filelist:
|
||||
filelist.remove('filelist.txt')
|
||||
with open('res/ffmpeg_finished/filelist.txt', 'w') as f:
|
||||
for file in filelist:
|
||||
f.write('file \'%s\'\n' % file)
|
||||
object_filename = self.org_filename[:-4] + '.mkv'
|
||||
subprocess.check_output('ffmpeg -f concat -i res/ffmpeg_finished/filelist.txt \
|
||||
-c copy -y ' + object_filename, shell=True)
|
||||
|
||||
print('All process finished at ' + object_filename)
|
||||
|
||||
elif dp.method == 'post' and dp.body == b'enable': # clinet mode
|
||||
self.status = 1
|
||||
self.server = dp.head['server']
|
||||
self.conver_func()
|
||||
|
||||
elif dp.method == 'get':
|
||||
elif dp.method == 'post' and dp.body == b'status':
|
||||
result = 'ffmpeg not working'
|
||||
ndp = dp.reply()
|
||||
ndp.body = result.encode()
|
||||
|
||||
send_queue.put(ndp)
|
||||
|
||||
elif dp.method == 'get': # let other client disable
|
||||
ndp = dp.reply()
|
||||
ndp.method = 'post'
|
||||
ndp.body = b'disable'
|
||||
print('let %s disabled' % dp.head['id'])
|
||||
|
||||
send_queue.put(ndp)
|
||||
|
||||
|
||||
def run_as_server(self):
|
||||
_padding_to_convert = os.listdir('resources/ffmpeg_tmp')
|
||||
_padding_to_convert = os.listdir('res/ffmpeg_tmp')
|
||||
padding_to_convert = []
|
||||
for file in _padding_to_convert:
|
||||
file = 'resources/ffmpeg_tmp/' + file
|
||||
file = 'res/ffmpeg_tmp/' + file
|
||||
padding_to_convert.append(file)
|
||||
already_in_convert = {} # flag: filename
|
||||
already_in_convert = []
|
||||
finished_convert = [] # outputfilename
|
||||
|
||||
while True:
|
||||
@@ -71,7 +102,8 @@ class Ffmpeg_controller:
|
||||
result = ''
|
||||
result += 'padding_to_convert ' + str(padding_to_convert) + '\n'
|
||||
result += 'already_in_convert ' + str(already_in_convert) + '\n'
|
||||
result += 'finished_convert ' + str(finished_convert)
|
||||
result += 'finished_convert ' + str(finished_convert) + '\n'
|
||||
result += 'conver_task_queue size ' + str(self.conver_task_queue.qsize())
|
||||
ndp = dp.reply()
|
||||
ndp.body = result.encode()
|
||||
send_queue.put(ndp)
|
||||
@@ -79,51 +111,79 @@ class Ffmpeg_controller:
|
||||
elif dp.method == 'get':
|
||||
if padding_to_convert:
|
||||
filename = padding_to_convert.pop(0)
|
||||
already_in_convert[dp.head['flag']] = filename
|
||||
elif already_in_convert:
|
||||
key, filename = get_one_from_dict(already_in_convert)
|
||||
already_in_convert[dp.head['flag']] = filename
|
||||
del(already_in_convert[key])
|
||||
else:
|
||||
print('woring')
|
||||
continue
|
||||
|
||||
ndp = dp.reply()
|
||||
ndp.method = 'file'
|
||||
ndp.head['filename'] = filename
|
||||
print('%s get %s to convert' % (dp.head['id'], filename), dp)
|
||||
already_in_convert.append(filename)
|
||||
|
||||
send_queue.put(ndp)
|
||||
print('%s get %s to convert' % (dp.head['id'], filename), dp)
|
||||
|
||||
ndp = dp.reply()
|
||||
ndp.method = 'file'
|
||||
ndp.head['filename'] = filename
|
||||
|
||||
send_queue.put(ndp)
|
||||
|
||||
else:
|
||||
if not already_in_convert: # finished
|
||||
break
|
||||
else: # waiting for final convert
|
||||
ndp = dp.reply()
|
||||
ndp.method = 'post'
|
||||
ndp.body = b'disable'
|
||||
|
||||
elif dp.method == 'file':
|
||||
os.remove(already_in_convert[dp.head['flag']])
|
||||
del(already_in_convert[dp.head['flag']])
|
||||
finished_convert.append(dp.head['filename'])
|
||||
if not padding_to_convert and not already_in_convert:
|
||||
print('convert finished')
|
||||
return
|
||||
|
||||
old_filename = dp.head['old_filename']
|
||||
filename = dp.head['filename']
|
||||
|
||||
def conver_func(self):
|
||||
while self.status:
|
||||
os.remove(old_filename)
|
||||
already_in_convert.remove(old_filename)
|
||||
finished_convert.append(filename)
|
||||
|
||||
if not padding_to_convert and not already_in_convert: # final process
|
||||
break
|
||||
|
||||
print('Mapreduce finished')
|
||||
|
||||
|
||||
|
||||
|
||||
def conver_func(self): # run as client
|
||||
for _ in range(2):
|
||||
self.send_request()
|
||||
|
||||
|
||||
while self.status or not self.conver_task_queue.empty():
|
||||
dp = receive_queue.get()
|
||||
|
||||
if dp.method == 'post' and dp.body == b'disable':
|
||||
self.status = 0
|
||||
|
||||
elif dp.method == 'post' and dp.body == b'status':
|
||||
result = 'Working as client, queue size: %s' % str(self.conver_task_queue.qsize())
|
||||
|
||||
ndp = dp.reply()
|
||||
ndp.body = result.encode()
|
||||
send_queue.put(ndp)
|
||||
|
||||
elif dp.method == 'file':
|
||||
filename = dp.head['filename']
|
||||
output_filename = filename[:-4] + '.mkv'
|
||||
output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished')
|
||||
os.system('ffmpeg -i ' + filename + ' -c:a libopus -ab 64k \
|
||||
-c:v libx265 -s 1280x720 -y ' + output_filename)
|
||||
|
||||
ndp = dp.reply()
|
||||
ndp.head['filename'] = output_filename
|
||||
ndp.method = 'file'
|
||||
send_queue.put(ndp)
|
||||
self.conver_task_queue.put(dp)
|
||||
|
||||
|
||||
def conver_task_func(self):
|
||||
while True:
|
||||
dp = self.conver_task_queue.get()
|
||||
|
||||
filename = dp.head['filename']
|
||||
output_filename = filename[:-4] + '.mkv'
|
||||
output_filename = output_filename.replace('ffmpeg_tmp', 'ffmpeg_finished')
|
||||
os.system('ffmpeg -i ' + filename + ' -c:a libopus -ab 64k \
|
||||
-c:v libx265 -s 1280x720 -y ' + output_filename)
|
||||
|
||||
ndp = dp.reply()
|
||||
ndp.head['filename'] = output_filename
|
||||
ndp.head['old_filename'] = filename
|
||||
ndp.method = 'file'
|
||||
send_queue.put(ndp)
|
||||
|
||||
self.send_request()
|
||||
|
||||
|
||||
def send_request(self):
|
||||
dp = Datapack(head={'from': __name__})
|
||||
|
||||
@@ -35,6 +35,12 @@ def _main():
|
||||
if raw_data == 'exit':
|
||||
msw_queue.put(1)
|
||||
break
|
||||
if raw_data == 'update':
|
||||
raw_data = 'update:compress;update_to:*'
|
||||
if raw_data == '1':
|
||||
raw_data = 'ffmpeg:start;filename:res/test.mp4'
|
||||
if raw_data == '2':
|
||||
raw_data = 'ffmpeg:enable;to:*,server:miku'
|
||||
|
||||
if raw_data[:6] == '(file)': # like "(file)log: filename.exe"
|
||||
raw_data = raw_data[6:]
|
||||
|
||||
@@ -191,6 +191,10 @@ class Network_controller: # manage id and connection
|
||||
self.process_command(dp)
|
||||
continue
|
||||
|
||||
if not dp.head.get('to'):
|
||||
print('You got a no head datapack')
|
||||
print(str(dp.head))
|
||||
|
||||
to_str = dp.head['to']
|
||||
to_list = to_str.split('&')
|
||||
to = to_list.pop(0)
|
||||
@@ -202,7 +206,9 @@ class Network_controller: # manage id and connection
|
||||
for id in self.id_dict:
|
||||
connection = self.id_dict[id][0]
|
||||
connection.sendall(dp)
|
||||
|
||||
elif not to:
|
||||
print('not to', dp)
|
||||
|
||||
else:
|
||||
self.send_to_id(to, dp)
|
||||
|
||||
@@ -224,6 +230,10 @@ class Network_controller: # manage id and connection
|
||||
return
|
||||
|
||||
print('To id %s has no connection now' % to, dp)
|
||||
if dp.head.get('to'):
|
||||
dp.head['id'] = to + '&' + dp.head['id']
|
||||
else:
|
||||
dp.head['id'] = to
|
||||
self.wheel_queue.put(dp)
|
||||
return
|
||||
|
||||
@@ -377,7 +387,7 @@ class Connection:
|
||||
except Exception as e:
|
||||
print('Decode head failed %s: %s' % (type(e), str(e)))
|
||||
print(self.buff)
|
||||
continue
|
||||
break
|
||||
|
||||
length = int(dp.head.get('length'))
|
||||
still_need = length
|
||||
@@ -487,7 +497,8 @@ class Connection:
|
||||
self.conn.sendall(data)
|
||||
except Exception as e:
|
||||
print('Failed to send file %s %s: %s' % (dp.head['filename'], type(e), str(e)), dp)
|
||||
continue
|
||||
self.netowrk_controller.wheel_queue.put(dp)
|
||||
break
|
||||
print('Send file %s finished' % dp.head['filename'], dp)
|
||||
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ receive_queue = receive_queues[__name__]
|
||||
|
||||
|
||||
remove_file_list = ['__init__.py', 'addrlist.txt', 'config.json', 'logger.log', 'update.tar.xz']
|
||||
remove_dir_list = ['.git', '.idea', '__pycache__', 'resources', 'tmp']
|
||||
remove_dir_list = ['.git', '.idea', '__pycache__', 'resources', 'tmp', 'res']
|
||||
|
||||
|
||||
def main():
|
||||
@@ -33,7 +33,7 @@ def main():
|
||||
ndp = Datapack(head={'from':__name__})
|
||||
ndp.method = 'file'
|
||||
ndp.app = 'update'
|
||||
ndp.head['filename'] = 'resources/update.tar.xz'
|
||||
ndp.head['filename'] = 'res/update.tar.xz'
|
||||
ndp.head['to'] = to
|
||||
|
||||
send_queue.put(ndp)
|
||||
@@ -59,7 +59,7 @@ class Compresser:
|
||||
self.compress_files(self.filelist)
|
||||
|
||||
def compress_files(self, filelist):
|
||||
with tarfile.open('resources/update.tar.xz', 'w:xz') as f:
|
||||
with tarfile.open('res/update.tar.xz', 'w:xz') as f:
|
||||
for name in filelist:
|
||||
f.add(name)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user