Python的subprocess程式庫在Python 3.5版新增了run()方法,它能擷取外部命令的輸出(stdout)或錯誤訊息(stderr),可替代原本的call()以及Popen()方法。
本文將介紹subpricess程式庫的下列兩個方法,並使用run()改寫YouTube影片下載(一)的原始碼。
- check_output():執行外部命令並傳回外部命令的輸出文字
- run():執行外部命令並儲存外部命令的輸出文字
使用check_output()執行外部命令並擷取輸出文字
check_output()的外部命令參數,預設採用字串列表格式傳入。透過它執行ping google.com命令的敘述如下,check_output()將傳回外部命令的輸出文字:
import subprocess txt = subprocess.check_output(["ping", "google.com"]) print(txt.decode('cp950'))
傳回的位元組字串透過cp950 (big5)解碼還原成繁體中文字串;在Linux或macOS的終端機,請改成底下的命令,並使用utf-8解碼。
import subprocess txt = subprocess.check_output(["ping", "-c 4", "google.com"]) print(txt.decode('utf-8'))
使用run()執行外部命令
Python官方文件建議使用run()取代check_output()。
run()其實是把Popen()重新包裝、簡化的方法。以擷取ping命令的輸出為例,跟Popen()方法一樣,外部命令預設採用字串列表格式,並透過stdout參數把外部命令的輸出導入Python程式:
import subprocess r = subprocess.run(["ping", "google.com"], stdout=subprocess.PIPE)
run()將傳回一個CompletedProcess類型物件,其中包含命令是否順利執行的returncode(直譯為「傳回代碼」)屬性,若returncode不是0,代表命令執行過程出現問題。例如,上面的指令執行後,變數r將包含如下內容:
CompletedProcess(args=['ping', 'google.com'], returncode=0, stdout=b'\r\nPing google.com [216.58.200.46] (\xa8\xcf\xa5\xce 32 \xa6\xec\xa4\xb8\xb2\xd5\xaa\xba\xb8\xea\xae\xc6):\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\r\n216.58.200.46 \xaa\xba Ping \xb2\xce\xadp\xb8\xea\xae\xc6:\r\n \xab\xca\xa5]: \xa4w\xb6\xc7\xb0e = 4\xa1A\xa4w\xa6\xac\xa8\xec = 4, \xa4w\xbf\xf2\xa5\xa2 = 0 (0% \xbf\xf2\xa5\xa2)\xa1A\r\n\xa4j\xac\xf9\xaa\xba\xa8\xd3\xa6^\xae\xc9\xb6\xa1 (\xb2@\xac\xed):\r\n \xb3\xcc\xa4p\xad\xc8 = 4ms\xa1A\xb3\xcc\xa4j\xad\xc8 = 4ms\xa1A\xa5\xad\xa7\xa1 = 4ms\r\n')
CompletedProcess物件的returncode屬性值為0,代表外部命令順利執行無誤。
外部命令輸出到終端機的文字,被轉存在CompletedProcess物件的stdout屬性,底下敘述採用Windows的“cp950”(big5)解碼stdout的位元組字串:
print(r.stdout.decode('cp950'))
它將顯示如下內容:
Ping google.com [216.58.200.46] (使用 32 位元組的資料): 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56 216.58.200.46 的 Ping 統計資料: 封包: 已傳送 = 4,已收到 = 4, 已遺失 = 0 (0% 遺失), 大約的來回時間 (毫秒): 最小值 = 4ms,最大值 = 4ms,平均 = 4ms
確認外部命令是否正確執行的兩個方式
判斷returncode屬性值是否等於0,便能知道外部命令是否運作順利,例如:
import subprocess r = subprocess.run(["ping", "google"], stdout=subprocess.PIPE) if (r.returncode == 0): print(r.stdout.decode('cp950')) else: print("命令出錯了!")
執行結果將顯示“命令出錯了!”,因為google的網址後面少了“.com”。
確認命令是否順利執行的第二個方式,是將run()的check(直譯為「檢查」)參數設定成True。如此,當命令出現問題,它將引發“subprocess.CalledProcessError”類型的例外。上一段程式碼可以用try…except改寫成:
import subprocess try: r = subprocess.run("ping google", shell=True, check=True) print(r.stdout.decode('cp950')) except subprocess.CalledProcessError: print("命令出錯了!")
執行結果一樣是顯示“命令出錯了!”。
使用run()方法取代Popen()
底下是檢查已下載的YouTube影片是否包含聲音的自訂函式,使用run()取代Popen(),程式碼稍微簡短一些:
def check_media(filename): # 取代subprocess.Popen() out = subprocess.run(["ffprobe", filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if (out.stdout.decode('utf-8').rfind('Audio') == -1): return -1 # 沒有聲音 else: return 1
搜尋字串的find()和rfind()方法
Python的字串物件具有搜尋字串內容的find()和rfind()方法(r代表“reverse”,反向),它們都會傳回搜尋到的目標字串的位置編號(從0開始);傳回-1代表找不到。 find()從字串開頭搜尋,rfind()則是從字串結尾往前搜尋,以這個字串為例:
txt = ("老腦筋可以變成新腦筋," "新腦筋不學習就會變成老腦筋。")
find()方法將傳回1,代表在txt字串的位置1找到“腦筋”。
txt.find("腦筋")
rfind()方法將傳回22,代表在txt字串的位置22找到“腦筋”。
txt.rfind("腦筋")
從YouTube影片下載(二)貼文的ffprobe命令執行結果可看出,代表聲音串流的“Audio”文字位於訊息的末尾,因此改用rfind()從後面往前搜尋,可以節省一點時間。
使用run()方法改寫下載YouTube影片的原始碼
這是改寫後的版本:
import argparse import os import platform from pytube import YouTube import subprocess args = {} fileobj = {} download_count = 1 def pyTube_folder(): sys = platform.system() home = os.path.expanduser('~') if sys == 'Windows': folder = os.path.join(home, 'Videos', 'PyTube') elif sys == 'Darwin': folder = os.path.join(home, 'Movies', 'PyTube') if not os.path.isdir(folder): # 若'PyTube'資料夾不存在… os.mkdir(folder) # 則新增資料夾 return folder def onProgress(stream, chunk, file_handle, remains): total = stream.filesize percent = (total-remains) / total * 100 print('下載中… {:05.2f}%'.format(percent), end='\r') def video_res(yt): res_set = set() video_list = yt.streams.filter(type="video").all() for v in video_list: res_set.add(v.resolution) return sorted(res_set, reverse=True, key=lambda s: int(s[:-1])) def download_media(args): try: yt = YouTube(args.url, on_progress_callback=onProgress, on_complete_callback=onComplete) except: print('下載影片時發生錯誤,請確認網路連線和YouTube網址無誤。') return filter = yt.streams.filter if args.a: # 只下載聲音 target = filter(type="audio").first() elif args.fhd: target = filter(type="video", resolution="1080p").first() elif args.hd: target = filter(type="video", resolution="720p").first() elif args.sd: target = filter(type="video", resolution="480p").first() else: target = filter(type="video").first() if target is None: print('沒有您指定的解析度,可用的解析度如下:') res_list = video_res(yt) for i, res in enumerate(res_list): print('{}) {}'.format(i+1, res)) val = input('請選擇(預設{}):'.format(res_list[0])) try: res = res_list[int(val)-1] except: res = res_list[0] print('您選擇的是 {} 。'.format(res)) target = filter(type="video", resolution=res).first() # 開始下載 target.download(output_path=pyTube_folder()) def check_media(filename): # 取代subprocess.Popen() out = subprocess.run(["ffprobe", filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if (out.stdout.decode('utf-8').rfind('Audio') == -1): return -1 # 沒有聲音 else: return 1 def merge_media(): temp_video = os.path.join(fileobj['dir'], 'temp_video.mp4') temp_audio = os.path.join(fileobj['dir'], 'temp_audio.mp4') temp_output = os.path.join(fileobj['dir'], 'output.mp4') cmd = f'ffmpeg -i {temp_video} -i {temp_audio} \ -map 0:v -map 1:a -c copy -y {temp_output}' try: # 取代subprocess.call() subprocess.run(cmd, shell=True) # 視訊檔重新命名 os.rename(temp_output, os.path.join(fileobj['dir'], fileobj['name'])) os.remove(temp_audio) os.remove(temp_video) print('視訊和聲音合併完成') except: print('視訊和聲音合併失敗') def onComplete(stream, file_handle): global download_count, fileobj fileobj['name'] = os.path.basename(file_handle.name) fileobj['dir'] = os.path.dirname(file_handle.name) print('\r') if download_count == 1: if check_media(file_handle.name) == -1: print('此影片沒有聲音') download_count += 1 try: # 視訊檔重新命名 os.rename(file_handle.name, os.path.join( fileobj['dir'], 'temp_video.mp4')) except: print('視訊檔重新命名失敗') return print('準備下載聲音檔') vars(args)['a'] = True # 設定成a參數 download_media(args) # 下載聲音 else: print('此影片有聲音,下載完畢!') else: try: # 聲音檔重新命名 os.rename(file_handle.name, os.path.join( fileobj['dir'], 'temp_audio.mp4')) except: print("聲音檔重新命名失敗") # 合併聲音檔 merge_media() def main(): global args parser = argparse.ArgumentParser() parser.add_argument("url", help="指定YouTube視訊網址") parser.add_argument("-sd", action="store_true", help="選擇普通(480P)畫質") parser.add_argument("-hd", action="store_true", help="選擇HD(720P)畫質") parser.add_argument("-fhd", action="store_true", help="選擇Full HD(1080P)畫質") parser.add_argument("-a", action="store_true", help="僅下載聲音") args = parser.parse_args() download_media(args) if __name__ == '__main__': main()