-
Notifications
You must be signed in to change notification settings - Fork 0
/
m3u8_downloader.py
115 lines (95 loc) · 3.91 KB
/
m3u8_downloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# coding: utf-8
"""
Author: Guo Yang <guoyang@webmail.hzau.edu.cn>
Version: 0.1.2
"""
import os
import shutil
import sys

import m3u8
import requests
def check_path(_path):
    """Return a usable download folder, creating it when it does not exist.

    An existing empty folder is returned as-is.  When the folder already
    contains files the user is asked whether to keep using it; answering
    anything other than '1' prompts for a different path, which is then
    checked the same way.

    Args:
        _path: Folder path entered by the user (relative or absolute).

    Returns:
        The path of the folder that was accepted or created.

    Raises:
        OSError: If the folder cannot be created or listed (for example
            when the path points at an existing regular file).
    """
    while True:
        # A missing folder is created regardless of whether the path is
        # relative or absolute; listing it first would raise.
        if not os.path.isdir(_path):
            os.makedirs(_path)
            return _path

        # check if download path is empty
        if not os.listdir(_path):
            return _path

        print('>>>[-] folder not empty, continue?')
        flag = input('>>>[*] Yes:1 No:2 \n>>>[+] : ')
        if flag == '1':
            return _path

        try:
            _path = input('>>>[+] input download path:\n>>>[+] : ')
        except Exception as e:
            # No replacement path could be read: report it and stop.
            print(e)
            sys.exit(0)
def _segment_url(url_static, file):
    """Build the download URL for one playlist entry."""
    # An entry that is already a full URL must not be glued onto the prefix.
    if file.startswith(('http://', 'https://')):
        return file
    return url_static + file


def _fetch_segment(url, target, timeout):
    """Download one URL into the file `target`; raise on HTTP or I/O errors."""
    # 'Connection': 'close' is sent so finished requests do not keep ports occupied
    response = requests.get(url, headers={'Connection': 'close'}, timeout=timeout)
    # Treat 4xx/5xx answers as failures instead of saving the error page as video
    response.raise_for_status()
    with open(target, 'wb') as movie_content:
        movie_content.write(response.content)


def _show_progress(done, total):
    """Redraw the 50-character progress bar on the current terminal line."""
    filled = round(done * 50 / total)
    sys.stdout.write('\r>>>[+] progress: [%s%s] %d/%d files done'
                     % ('|' * filled, ' ' * (50 - filled), done, total))
    sys.stdout.flush()


def m3u8_video_download(m3u8_url, url_static, _path='/Users/simonguo/Downloads/m3u8download', timeout=60):
    """Download every file listed in an .m3u8 playlist and join them into full.ts.

    Each entry is saved in `_path` as '<n>.ts', where n is the entry's
    1-based position in the playlist, so the numbering stays aligned with
    the playlist even when some requests fail.  Failed requests are retried
    once and, on success, stored under the same numbered name.  Finally all
    available pieces are concatenated in order into '<_path>/full.ts',
    replacing any previous full.ts.

    The process working directory is left untouched; all files are
    addressed through `_path`, which is created if it does not exist.

    Args:
        m3u8_url: Location of the playlist, passed to `m3u8.load`.
        url_static: URL prefix prepended to each playlist entry
            (should end with '/').
        _path: Folder that receives the .ts files and full.ts.
        timeout: Per-request timeout in seconds handed to `requests.get`.

    Returns:
        None.
    """
    m3u8_obj = m3u8.load(m3u8_url)
    full_web_url = [_segment_url(url_static, file) for file in m3u8_obj.files]
    total = len(full_web_url)

    os.makedirs(_path, exist_ok=True)
    print('>>>[+] downloading...')
    print('-' * 60)

    # First pass: remember (number, url) of every entry that could not be fetched.
    error_get = []
    done = 0
    for number, _url in enumerate(full_web_url, start=1):
        target = os.path.join(_path, '%d.ts' % number)
        try:
            _fetch_segment(_url, target, timeout)
        except (requests.RequestException, OSError):
            error_get.append((number, _url))
            continue
        done += 1
        _show_progress(done, total)

    # Second pass: retry each failed entry once, under its own number.
    missing = set()
    if error_get:
        print()  # finish the progress-bar line before the warning
        print('Warning: Total of %d request(s) failed, Retrying...' % len(error_get))
        print('-' * 60)
        for number, _url_err in error_get:
            target = os.path.join(_path, '%d.ts' % number)
            try:
                _fetch_segment(_url_err, target, timeout)
            except (requests.RequestException, OSError) as e:
                print('Error: ', _url_err, 'retry failed...log: ', e)
                missing.add(number)
    # all file good, output msg
    else:
        print('\n>>>[+] Download successfully!!!')

    # join .ts files in playlist order, skipping entries that never downloaded
    with open(os.path.join(_path, 'full.ts'), 'wb') as joined:
        for number in range(1, total + 1):
            if number in missing:
                continue
            with open(os.path.join(_path, '%d.ts' % number), 'rb') as part:
                shutil.copyfileobj(part, joined)
    if missing:
        print('Warning: full.ts is incomplete, %d file(s) could not be downloaded'
              % len(missing))
if __name__ == '__main__':
    # Interactive entry point: ask for the playlist location, the URL prefix
    # of the .ts files and the target folder, then start the download.
    try:
        _m3u8_url = input('>>>[+] input .m3u8 URL (absolute path on computer or full URL link on Internet):\n>>>[+] : ')
        _url_static = input('>>>[+] input static URL for .ts files on server (end with \'/\'):\n>>>[+] : ')
        _download_path = input('>>>[+] input download path (default path is \'default\'): \n>>>[+] : ')
        if _download_path != 'default':
            # A user-supplied folder is validated (and created if needed) first.
            _download_path_checked = check_path(_download_path)
            m3u8_video_download(_m3u8_url, _url_static, _download_path_checked)
        else:
            # The literal answer 'default' selects the function's built-in folder.
            m3u8_video_download(_m3u8_url, _url_static)
    except Exception as e:
        # Script boundary: report any failure as a message, not a traceback.
        print(e)