YouTube影片下載(四):透過subprocess.run()執行外部命令

Python的subprocess程式庫在Python 3.5版新增了run()方法,它能擷取外部命令的輸出(stdout)或錯誤訊息(stderr),可替代原本的call()以及Popen()方法。

本文將介紹subpricess程式庫的下列兩個方法,並使用run()改寫YouTube影片下載(一)的原始碼。

  • check_output():執行外部命令並傳回外部命令的輸出文字
  • run():執行外部命令並儲存外部命令的輸出文字

使用check_output()執行外部命令並擷取輸出文字

check_output()的外部命令參數,預設採用字串列表格式傳入。透過它執行ping google.com命令的敘述如下,check_output()將傳回外部命令的輸出文字:

import subprocess

txt = subprocess.check_output(["ping", "google.com"])
print(txt.decode('cp950'))

傳回的位元組字串透過cp950 (big5)解碼還原成繁體中文字串;在Linux或macOS的終端機,請改成底下的命令,並使用utf-8解碼。

import subprocess

txt = subprocess.check_output(["ping", "-c 4", "google.com"])
print(txt.decode('utf-8'))

使用run()執行外部命令

Python官方文件建議使用run()取代check_output()。

run()其實是把Popen()重新包裝、簡化的方法。以擷取ping命令的輸出為例,跟Popen()方法一樣,外部命令預設採用字串列表格式,並透過stdout參數把外部命令的輸出導入Python程式

import subprocess

r = subprocess.run(["ping", "google.com"], stdout=subprocess.PIPE)

run()將傳回一個CompletedProcess類型物件,其中包含命令是否順利執行的returncode(直譯為「傳回代碼」)屬性,若returncode不是0,代表命令執行過程出現問題。例如,上面的指令執行後,變數r將包含如下內容:

CompletedProcess(args=['ping', 'google.com'], returncode=0, stdout=b'\r\nPing google.com [216.58.200.46] (\xa8\xcf\xa5\xce 32 \xa6\xec\xa4\xb8\xb2\xd5\xaa\xba\xb8\xea\xae\xc6):\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\xa6^\xc2\xd0\xa6\xdb 216.58.200.46: \xa6\xec\xa4\xb8\xb2\xd5=32 \xae\xc9\xb6\xa1=4ms TTL=56\r\n\r\n216.58.200.46 \xaa\xba Ping \xb2\xce\xadp\xb8\xea\xae\xc6:\r\n \xab\xca\xa5]: \xa4w\xb6\xc7\xb0e = 4\xa1A\xa4w\xa6\xac\xa8\xec = 4, \xa4w\xbf\xf2\xa5\xa2 = 0 (0% \xbf\xf2\xa5\xa2)\xa1A\r\n\xa4j\xac\xf9\xaa\xba\xa8\xd3\xa6^\xae\xc9\xb6\xa1 (\xb2@\xac\xed):\r\n \xb3\xcc\xa4p\xad\xc8 = 4ms\xa1A\xb3\xcc\xa4j\xad\xc8 = 4ms\xa1A\xa5\xad\xa7\xa1 = 4ms\r\n')

CompletedProcess物件的returncode屬性值為0,代表外部命令順利執行無誤。

外部命令輸出到終端機的文字,被轉存在CompletedProcess物件的stdout屬性,底下敘述採用Windows的“cp950”(big5)解碼stdout的位元組字串:

print(r.stdout.decode('cp950'))

它將顯示如下內容:

Ping google.com [216.58.200.46] (使用 32 位元組的資料):
回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56
回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56
回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56
回覆自 216.58.200.46: 位元組=32 時間=4ms TTL=56

216.58.200.46 的 Ping 統計資料:
封包: 已傳送 = 4,已收到 = 4, 已遺失 = 0 (0% 遺失), 
大約的來回時間 (毫秒):
最小值 = 4ms,最大值 = 4ms,平均 = 4ms

確認外部命令是否正確執行的兩個方式

判斷returncode屬性值是否等於0,便能知道外部命令是否運作順利,例如:

import subprocess

r = subprocess.run(["ping", "google"], stdout=subprocess.PIPE)

if (r.returncode == 0):
    print(r.stdout.decode('cp950'))
else:
    print("命令出錯了!")

執行結果將顯示“命令出錯了!”,因為google的網址後面少了“.com”。

確認命令是否順利執行的第二個方式,是將run()的check(直譯為「檢查」)參數設定成True。如此,當命令出現問題,它將引發“subprocess.CalledProcessError”類型的例外。上一段程式碼可以用try…except改寫成:

import subprocess

try:
    r = subprocess.run("ping google", shell=True, check=True)
    print(r.stdout.decode('cp950'))
except subprocess.CalledProcessError:
    print("命令出錯了!")

執行結果一樣是顯示“命令出錯了!”。

使用run()方法取代Popen()

底下是檢查已下載的YouTube影片是否包含聲音的自訂函式,使用run()取代Popen(),程式碼稍微簡短一些:

def check_media(filename):
    # 取代subprocess.Popen()
    out = subprocess.run(["ffprobe", filename],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if (out.stdout.decode('utf-8').rfind('Audio') == -1):
        return -1  # 沒有聲音
    else:
        return 1

搜尋字串的find()和rfind()方法

Python的字串物件具有搜尋字串內容的find()rfind()方法(r代表“reverse”,反向),它們都會傳回搜尋到的目標字串的位置編號(從0開始);傳回-1代表找不到。 find()從字串開頭搜尋,rfind()則是從字串結尾往前搜尋,以這個字串為例:

txt = ("老腦筋可以變成新腦筋,"
      "新腦筋不學習就會變成老腦筋。")

find()方法將傳回1,代表在txt字串的位置1找到“腦筋”。

txt.find("腦筋")

rfind()方法將傳回22,代表在txt字串的位置22找到“腦筋”。

txt.rfind("腦筋")

YouTube影片下載(二)貼文的ffprobe命令執行結果可看出,代表聲音串流的“Audio”文字位於訊息的末尾,因此改用rfind()從後面往前搜尋,可以節省一點時間。

使用run()方法改寫下載YouTube影片的原始碼

這是改寫後的版本:

import argparse
import os
import platform
from pytube import YouTube
import subprocess

args = {}
fileobj = {}
download_count = 1


def pyTube_folder():
    sys = platform.system()
    home = os.path.expanduser('~')

    if sys == 'Windows':
        folder = os.path.join(home, 'Videos', 'PyTube')
    elif sys == 'Darwin':
        folder = os.path.join(home, 'Movies', 'PyTube')

    if not os.path.isdir(folder):  # 若'PyTube'資料夾不存在…
        os.mkdir(folder)        # 則新增資料夾

    return folder


def onProgress(stream, chunk, file_handle, remains):
    total = stream.filesize
    percent = (total-remains) / total * 100
    print('下載中… {:05.2f}%'.format(percent), end='\r')


def video_res(yt):
    res_set = set()
    video_list = yt.streams.filter(type="video").all()

    for v in video_list:
        res_set.add(v.resolution)

    return sorted(res_set, reverse=True, key=lambda s: int(s[:-1]))


def download_media(args):
    try:
        yt = YouTube(args.url, on_progress_callback=onProgress,
                     on_complete_callback=onComplete)
    except:
        print('下載影片時發生錯誤,請確認網路連線和YouTube網址無誤。')
        return

    filter = yt.streams.filter

    if args.a:  # 只下載聲音
        target = filter(type="audio").first()
    elif args.fhd:
        target = filter(type="video", resolution="1080p").first()
    elif args.hd:
        target = filter(type="video", resolution="720p").first()
    elif args.sd:
        target = filter(type="video", resolution="480p").first()
    else:
        target = filter(type="video").first()

    if target is None:
        print('沒有您指定的解析度,可用的解析度如下:')
        res_list = video_res(yt)

        for i, res in enumerate(res_list):
            print('{}) {}'.format(i+1, res))

        val = input('請選擇(預設{}):'.format(res_list[0]))

        try:
            res = res_list[int(val)-1]
        except:
            res = res_list[0]

        print('您選擇的是 {} 。'.format(res))
        target = filter(type="video", resolution=res).first()

    # 開始下載
    target.download(output_path=pyTube_folder())


def check_media(filename):
    # 取代subprocess.Popen()
    out = subprocess.run(["ffprobe", filename],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if (out.stdout.decode('utf-8').rfind('Audio') == -1):
        return -1  # 沒有聲音
    else:
        return 1


def merge_media():
    temp_video = os.path.join(fileobj['dir'], 'temp_video.mp4')
    temp_audio = os.path.join(fileobj['dir'], 'temp_audio.mp4')
    temp_output = os.path.join(fileobj['dir'], 'output.mp4')

    cmd = f'ffmpeg -i {temp_video} -i {temp_audio} \
        -map 0:v -map 1:a -c copy -y {temp_output}'
    try:
        # 取代subprocess.call()
        subprocess.run(cmd, shell=True)
        # 視訊檔重新命名
        os.rename(temp_output, os.path.join(fileobj['dir'], fileobj['name']))
        os.remove(temp_audio)
        os.remove(temp_video)
        print('視訊和聲音合併完成')
    except:
        print('視訊和聲音合併失敗')


def onComplete(stream, file_handle):
    global download_count, fileobj
    fileobj['name'] = os.path.basename(file_handle.name)
    fileobj['dir'] = os.path.dirname(file_handle.name)
    print('\r')

    if download_count == 1:
        if check_media(file_handle.name) == -1:
            print('此影片沒有聲音')
            download_count += 1
            try:
                # 視訊檔重新命名
                os.rename(file_handle.name, os.path.join(
                    fileobj['dir'], 'temp_video.mp4'))
            except:
                print('視訊檔重新命名失敗')
                return

            print('準備下載聲音檔')
            vars(args)['a'] = True  # 設定成a參數
            download_media(args)    # 下載聲音
        else:
            print('此影片有聲音,下載完畢!')
    else:
        try:
            # 聲音檔重新命名
            os.rename(file_handle.name, os.path.join(
                fileobj['dir'], 'temp_audio.mp4'))
        except:
            print("聲音檔重新命名失敗")
        # 合併聲音檔
        merge_media()


def main():
    global args
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="指定YouTube視訊網址")
    parser.add_argument("-sd", action="store_true", help="選擇普通(480P)畫質")
    parser.add_argument("-hd", action="store_true", help="選擇HD(720P)畫質")
    parser.add_argument("-fhd", action="store_true", help="選擇Full HD(1080P)畫質")
    parser.add_argument("-a", action="store_true", help="僅下載聲音")

    args = parser.parse_args()
    download_media(args)


if __name__ == '__main__':
    main()
Posts created 483

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

Related Posts

Begin typing your search term above and press enter to search. Press ESC to cancel.

Back To Top