Python的subprocess程式庫在Python 3.5版新增了run()方法,它能擷取外部命令的輸出(stdout)或錯誤訊息(stderr),可替代原本的call()以及Popen()方法。
本文將介紹subpricess程式庫的下列兩個方法,並使用run()改寫YouTube影片下載(一)的原始碼。
- check_output():執行外部命令並傳回外部命令的輸出文字
- run():執行外部命令並儲存外部命令的輸出文字
使用check_output()執行外部命令並擷取輸出文字
check_output()的外部命令參數,預設採用字串列表格式傳入。透過它執行ping google.com命令的敘述如下,check_output()將傳回外部命令的輸出文字:
import subprocess
txt = subprocess.check_output(["ping", "google.com"])
print(txt.decode('cp950'))
傳回的位元組字串透過cp950 (big5)解碼還原成繁體中文字串;在Linux或macOS的終端機,請改成底下的命令,並使用utf-8解碼。
import subprocess
txt = subprocess.check_output(["ping", "-c 4", "google.com"])
print(txt.decode('utf-8'))
使用run()執行外部命令
Python官方文件建議使用run()取代check_output()。
run()其實是把Popen()重新包裝、簡化的方法。以擷取ping命令的輸出為例,跟Popen()方法一樣,外部命令預設採用字串列表格式,並透過stdout參數把外部命令的輸出導入Python程式:
import subprocess r = subprocess.run(["ping", "google.com"], stdout=subprocess.PIPE)
run()將傳回一個CompletedProcess類型物件,其中包含命令是否順利執行的returncode(直譯為「傳回代碼」)屬性,若returncode不是0,代表命令執行過程出現問題。例如,上面的指令執行後,變數r將包含如下內容:
CompletedProcess(args=['ping', 'google.com'], returncode=0, stdout=b'\r\nPing google.com [216.58.200.46] (\xa8\xcf\xa5\xce 32 \xa6\xec\xa4\xb8\xb2\xd5\xaa\xba\xb8\xea\xae\xc6):\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\r\n216.58.200.46 \xaa\xba Ping \xb2\xce\xadp\xb8\xea\xae\xc6:\r\n \xab\xca\xa5]: \xa4w\xb6\xc7\xb0e = 4\xa1A\xa4w\xa6\xac\xa8\xec = 4, \xa4w\xbf\xf2\xa5\xa2 = 0 (0% \xbf\xf2\xa5\xa2)\xa1A\r\n\xa4j\xac\xf9\xaa\xba\xa8\xd3\xa6^\xae\xc9\xb6\xa1 (\xb2@\xac\xed):\r\n \xb3\xcc\xa4p\xad\xc8 = 4ms\xa1A\xb3\xcc\xa4j\xad\xc8 = 4ms\xa1A\xa5\xad\xa7\xa1 = 4ms\r\n')
CompletedProcess物件的returncode屬性值為0,代表外部命令順利執行無誤。
外部命令輸出到終端機的文字,被轉存在CompletedProcess物件的stdout屬性,底下敘述採用Windows的“cp950”(big5)解碼stdout的位元組字串:
print(r.stdout.decode('cp950'))
它將顯示如下內容:
Ping google.com [216.58.200.46] (使用 32 位元組的資料): 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 216.58.200.46 的 Ping 統計資料: 封包: 已傳送 = 4,已收到 = 4, 已遺失 = 0 (0% 遺失), 大約的來回時間 (毫秒): 最小值 = 4ms,最大值 = 4ms,平均 = 4ms
確認外部命令是否正確執行的兩個方式
判斷returncode屬性值是否等於0,便能知道外部命令是否運作順利,例如:
import subprocess
r = subprocess.run(["ping", "google"], stdout=subprocess.PIPE)
if (r.returncode == 0):
print(r.stdout.decode('cp950'))
else:
print("命令出錯了!")
執行結果將顯示“命令出錯了!”,因為google的網址後面少了“.com”。
確認命令是否順利執行的第二個方式,是將run()的check(直譯為「檢查」)參數設定成True。如此,當命令出現問題,它將引發“subprocess.CalledProcessError”類型的例外。上一段程式碼可以用try…except改寫成:
import subprocess
try:
r = subprocess.run("ping google", shell=True, check=True)
print(r.stdout.decode('cp950'))
except subprocess.CalledProcessError:
print("命令出錯了!")
執行結果一樣是顯示“命令出錯了!”。
使用run()方法取代Popen()
底下是檢查已下載的YouTube影片是否包含聲音的自訂函式,使用run()取代Popen(),程式碼稍微簡短一些:
def check_media(filename):
# 取代subprocess.Popen()
out = subprocess.run(["ffprobe", filename],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if (out.stdout.decode('utf-8').rfind('Audio') == -1):
return -1 # 沒有聲音
else:
return 1
搜尋字串的find()和rfind()方法
Python的字串物件具有搜尋字串內容的find()和rfind()方法(r代表“reverse”,反向),它們都會傳回搜尋到的目標字串的位置編號(從0開始);傳回-1代表找不到。 find()從字串開頭搜尋,rfind()則是從字串結尾往前搜尋,以這個字串為例:
txt = ("老腦筋可以變成新腦筋,"
"新腦筋不學習就會變成老腦筋。")
find()方法將傳回1,代表在txt字串的位置1找到“腦筋”。
txt.find("腦筋")
rfind()方法將傳回22,代表在txt字串的位置22找到“腦筋”。
txt.rfind("腦筋")
從YouTube影片下載(二)貼文的ffprobe命令執行結果可看出,代表聲音串流的“Audio”文字位於訊息的末尾,因此改用rfind()從後面往前搜尋,可以節省一點時間。
使用run()方法改寫下載YouTube影片的原始碼
這是改寫後的版本:
import argparse
import os
import platform
from pytube import YouTube
import subprocess
args = {}
fileobj = {}
download_count = 1
def pyTube_folder():
sys = platform.system()
home = os.path.expanduser('~')
if sys == 'Windows':
folder = os.path.join(home, 'Videos', 'PyTube')
elif sys == 'Darwin':
folder = os.path.join(home, 'Movies', 'PyTube')
if not os.path.isdir(folder): # 若'PyTube'資料夾不存在…
os.mkdir(folder) # 則新增資料夾
return folder
def onProgress(stream, chunk, file_handle, remains):
total = stream.filesize
percent = (total-remains) / total * 100
print('下載中… {:05.2f}%'.format(percent), end='\r')
def video_res(yt):
res_set = set()
video_list = yt.streams.filter(type="video").all()
for v in video_list:
res_set.add(v.resolution)
return sorted(res_set, reverse=True, key=lambda s: int(s[:-1]))
def download_media(args):
try:
yt = YouTube(args.url, on_progress_callback=onProgress,
on_complete_callback=onComplete)
except:
print('下載影片時發生錯誤,請確認網路連線和YouTube網址無誤。')
return
filter = yt.streams.filter
if args.a: # 只下載聲音
target = filter(type="audio").first()
elif args.fhd:
target = filter(type="video", resolution="1080p").first()
elif args.hd:
target = filter(type="video", resolution="720p").first()
elif args.sd:
target = filter(type="video", resolution="480p").first()
else:
target = filter(type="video").first()
if target is None:
print('沒有您指定的解析度,可用的解析度如下:')
res_list = video_res(yt)
for i, res in enumerate(res_list):
print('{}) {}'.format(i+1, res))
val = input('請選擇(預設{}):'.format(res_list[0]))
try:
res = res_list[int(val)-1]
except:
res = res_list[0]
print('您選擇的是 {} 。'.format(res))
target = filter(type="video", resolution=res).first()
# 開始下載
target.download(output_path=pyTube_folder())
def check_media(filename):
# 取代subprocess.Popen()
out = subprocess.run(["ffprobe", filename],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if (out.stdout.decode('utf-8').rfind('Audio') == -1):
return -1 # 沒有聲音
else:
return 1
def merge_media():
temp_video = os.path.join(fileobj['dir'], 'temp_video.mp4')
temp_audio = os.path.join(fileobj['dir'], 'temp_audio.mp4')
temp_output = os.path.join(fileobj['dir'], 'output.mp4')
cmd = f'ffmpeg -i {temp_video} -i {temp_audio} \
-map 0:v -map 1:a -c copy -y {temp_output}'
try:
# 取代subprocess.call()
subprocess.run(cmd, shell=True)
# 視訊檔重新命名
os.rename(temp_output, os.path.join(fileobj['dir'], fileobj['name']))
os.remove(temp_audio)
os.remove(temp_video)
print('視訊和聲音合併完成')
except:
print('視訊和聲音合併失敗')
def onComplete(stream, file_handle):
global download_count, fileobj
fileobj['name'] = os.path.basename(file_handle.name)
fileobj['dir'] = os.path.dirname(file_handle.name)
print('\r')
if download_count == 1:
if check_media(file_handle.name) == -1:
print('此影片沒有聲音')
download_count += 1
try:
# 視訊檔重新命名
os.rename(file_handle.name, os.path.join(
fileobj['dir'], 'temp_video.mp4'))
except:
print('視訊檔重新命名失敗')
return
print('準備下載聲音檔')
vars(args)['a'] = True # 設定成a參數
download_media(args) # 下載聲音
else:
print('此影片有聲音,下載完畢!')
else:
try:
# 聲音檔重新命名
os.rename(file_handle.name, os.path.join(
fileobj['dir'], 'temp_audio.mp4'))
except:
print("聲音檔重新命名失敗")
# 合併聲音檔
merge_media()
def main():
global args
parser = argparse.ArgumentParser()
parser.add_argument("url", help="指定YouTube視訊網址")
parser.add_argument("-sd", action="store_true", help="選擇普通(480P)畫質")
parser.add_argument("-hd", action="store_true", help="選擇HD(720P)畫質")
parser.add_argument("-fhd", action="store_true", help="選擇Full HD(1080P)畫質")
parser.add_argument("-a", action="store_true", help="僅下載聲音")
args = parser.parse_args()
download_media(args)
if __name__ == '__main__':
main()
