台灣點歌王爬蟲試作
在課堂上課的時候,老師突然說了一句:「如果大家無聊的話可以試著爬取這個網站的資料並分享出來」,這麼看起來老師也是個喜歡唱歌的人~
注意事項:
- 此程式碼預設是爬取網站裡全部公司的歌曲(歌曲名稱長度1至11以上)
- 一開始請先以小範圍開始爬取,過多的請求會造成長時間的等待和伺服器的壓力增加
- 函數multiprocessing()裡核心數為8,如果電腦邏輯處理器數低於8可能會報錯,請適當修改num變數
# 最終成品
# 單一進程獲取全部API資料
# 增加分割list
# 使用多線程獲取歌詞
# 增加資料庫
# 嘗試例外處理
# 小規模除錯,以公司查詢
# 多核心爬取測試
import requests
import json
import time
import pymysql
import threading
import multiprocessing as mp
from queue import Queue
from bs4 import BeautifulSoup
# 使用網頁ID爬取並補齊資料
def get_lyrics(songDetailID):
    """Fetch album/artist/lyrics/YouTube details from one song's detail page.

    Parameters
    ----------
    songDetailID : str
        The site's song-detail page id (the ``id=`` query-string value).

    Returns
    -------
    dict
        Keys ``albumName``, ``albumDate``, ``artistName``, ``lyrics``,
        ``youtu_link``; any field missing from the page defaults to ``''``.
    """
    base_url = 'https://song.corp.com.tw/mv.aspx?id='
    youtube_prefix = 'https://www.youtube.com/watch?v='
    # Timeout so one stuck connection cannot hang a worker thread forever.
    html = requests.get(base_url + songDetailID, timeout=30)
    html.encoding = 'utf-8'
    sp = BeautifulSoup(html.text, 'lxml')

    def _text(element_id):
        # Single lookup per field (the original called sp.find twice per id);
        # returns '' when the element is absent from the page.
        tag = sp.find(id=element_id)
        return tag.text if tag is not None else ''

    albumName = _text('ContentPlaceHolder1_ContentPlaceHolder1_albumName_lg')
    albumDate = _text('ContentPlaceHolder1_ContentPlaceHolder1_albumDate_lg')
    artistName = _text('ContentPlaceHolder1_ContentPlaceHolder1_artistName_lg')
    lyrics = _text('ContentPlaceHolder1_ContentPlaceHolder1_lyrics_lg')

    # The original assumed the hidden youtubeID input always exists and
    # crashed with AttributeError when it did not; guard both a missing tag
    # and a missing 'value' attribute.
    youtu_tag = sp.select_one('#ContentPlaceHolder1_ContentPlaceHolder1_youtubeID')
    youtu_value = youtu_tag.get('value') if youtu_tag is not None else None
    youtu_link = youtube_prefix + youtu_value if youtu_value is not None else ''

    return {'albumName': albumName, 'albumDate': albumDate,
            'artistName': artistName, 'lyrics': lyrics,
            'youtu_link': youtu_link}
# 使用API獲取第一步的資料
def get_api():
    """Walk the song.corp.com.tw search API and collect every song record.

    Iterates every company x language x title-length (1..11) combination,
    paging through results via the ``minId`` cursor until the API returns an
    empty page.

    Returns
    -------
    list[dict]
        Raw API records (see the reference response below for the shape).
    """
    url = "https://song.corp.com.tw/api/song.aspx"
    my_payload = {
        'company': '錢櫃',
        'cusType': '',
        'minId': 0,
        'oid': '',
        'lang': '國',
        'board': '',
        'keyword': '',
        'singer': '',
        'sex': '',
        'Len': 0,
        'songDate': 'null'
    }
    '''
    # 參考用回復 (reference response)
    APIResponse = {
        'seq': 39,
        'id': 62715,
        'code': '62117',
        'name': 'I',
        'singer': '光良',
        'lang': '國',
        'sex': '男',
        'company': '音圓',
        'songDate': '000-00',
        'subname': '',
        'songDetailID': '58358',
        'counter': 1063,
        'dateSorter': 0,
        'len': 1,
        'artistIMG': 'https://i.kfs.io/artist/global/6656,0v5/fit/160x160.jpg'
    }
    '''
    company_list = ['音圓', '弘音', '金嗓', '瑞影', '點將家', '嘉揚',
                    '音圓原廠', '音影', '美華', '金影', '音遊', '金嗓/投幣',
                    '大唐', '錢櫃', '好樂迪', '星據點', '銀櫃', '享溫馨', 'MV'
                    ]
    lang_list = ['台', '國', '日', '客', '粵', '英', '韓', '山', '兒']

    def _fetch_page(payload):
        # One page fetch with a single retry. Unlike the original bare
        # except, this only catches request/JSON failures — Ctrl-C still
        # interrupts the crawl.
        try:
            html = requests.get(url, params=payload, timeout=30)
            return json.loads(html.text)
        except (requests.RequestException, ValueError):
            print('get失敗!嘗試第二次get!錯誤payload為:', payload)
            html = requests.get(url, params=payload, timeout=30)
            get_json = json.loads(html.text)
            print('二次get成功!')
            return get_json

    songData = []
    for company in company_list:
        my_payload['company'] = company
        for k in lang_list:
            my_payload['lang'] = k
            for j in range(1, 12):
                my_payload['Len'] = j
                # Page through this combination: advance the minId cursor to
                # the last record's seq until an empty page comes back.
                while True:
                    get_json = _fetch_page(my_payload)
                    if len(get_json) <= 0:
                        break
                    for record in get_json:
                        songData.append(record)
                        my_payload['minId'] = record['seq']
                print('抓取公司名:{}、{}語、歌曲名長度為{},目前共找到{}首歌'.format(my_payload['company'], k,j,len(songData)))
                my_payload['minId'] = 0
    print('API_get完成!')
    return songData
# 分割字串並把資料分配給不同核心運算
def split(songData, num):
    """Partition songData round-robin into num sublists.

    Item 0 goes to bucket 0, item 1 to bucket 1, ..., wrapping around, so
    the buckets end up as evenly sized as possible.

    Parameters
    ----------
    songData : list
        Records to distribute.
    num : int
        Number of buckets (one per worker process/thread).

    Returns
    -------
    list[list]
        Exactly num sublists covering all of songData.
    """
    buckets = [[] for _ in range(num)]
    for position, record in enumerate(songData):
        buckets[position % num].append(record)
    return buckets
# 補齊資料,完善get到的API資料
def fill_up(song_2dlist_index, q):
    """Enrich each API record in place with its lyric-page details.

    Runs in a worker thread; prints rough 30/60/90% progress milestones and
    puts the finished list on ``q`` when done.

    Parameters
    ----------
    song_2dlist_index : list[dict]
        API records to enrich (mutated in place via ``dict.update``).
    q : queue.Queue
        Receives the enriched list once every record is processed.
    """
    song_len = len(song_2dlist_index)
    count = 0
    count_30 = True
    count_60 = True
    count_90 = True
    for record in song_2dlist_index:
        try:
            record.update(get_lyrics(record['songDetailID']))
        except Exception:
            # Single retry on any scrape failure, as before — but no longer
            # a bare except, so KeyboardInterrupt/SystemExit still propagate.
            # If the retry fails too, the exception is raised to the caller.
            print('補齊資料錯誤一次!嘗試二次連線!資料為:', record)
            record.update(get_lyrics(record['songDetailID']))
            print('二次連線成功!')
        count = count + 1
        # One-shot milestone prints; highest threshold checked first so at
        # most one message fires per record.
        if count > (song_len * 0.9) and count_90:
            print('某線程已完成90%')
            count_90 = False
        elif count > (song_len * 0.6) and count_60:
            print('某線程已完成60%')
            count_60 = False
        elif count > (song_len * 0.3) and count_30:
            print('某線程已完成30%')
            count_30 = False
    print('某線程完成')
    q.put(song_2dlist_index)
# 使用多線程增加爬蟲速度
def multithreading(song_2dlist_8p, qq):
    """Spread one process's share of songs across 8 scraper threads.

    Splits ``song_2dlist_8p`` into 8 chunks, enriches each chunk in its own
    thread via ``fill_up``, then puts the list of 8 enriched chunks on the
    inter-process queue ``qq``.

    Parameters
    ----------
    song_2dlist_8p : list[dict]
        This process's slice of the song records.
    qq : multiprocessing.Queue
        Receives a list of 8 enriched sublists when all threads finish.
    """
    thread_count = 8
    result_queue = Queue()
    chunks = split(song_2dlist_8p, thread_count)
    workers = [threading.Thread(target=fill_up, args=(chunk, result_queue))
               for chunk in chunks]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # Every thread put exactly one result, so drain exactly thread_count.
    qq.put([result_queue.get() for _ in range(thread_count)])
# 使用多進程增加效率
def multiprocessing(songData):
    """Fan the full song list out to 8 worker processes for enrichment.

    Each process runs ``multithreading`` (8 threads each, so 64 concurrent
    scrapers total). NOTE: ``num`` is hard-coded to 8 — lower it on machines
    with fewer logical processors (see the file header).

    Parameters
    ----------
    songData : list[dict]
        All records collected by ``get_api``.

    Returns
    -------
    list
        One entry per process; each entry is a list of 8 enriched sublists.
    """
    qq = mp.Queue()
    num = 8
    song_2dlist_8p = split(songData, num)
    processlist = []
    for chunk in song_2dlist_8p:
        process = mp.Process(target=multithreading, args=(chunk, qq))
        process.start()
        processlist.append(process)
    # Drain the queue BEFORE joining: a child blocks on its queue put until
    # the data is consumed, so joining first can deadlock. (The original
    # `process.join` was also missing its parentheses and never joined.)
    results = []
    for _ in range(num):
        results.append(qq.get())
    for process in processlist:
        process.join()
    print('results:type', type(results), 'results:len', len(results))
    return results
# 將處理好的資料存進資料庫
def my_sql(results_index):
    """Bulk-insert enriched song records into the local MySQL ``song`` table.

    Uses a parameterized ``executemany`` instead of string-building the SQL:
    the original ``str(tuple)`` approach broke on quotes/backslashes inside
    lyrics or titles and was an SQL-injection vector for scraped content.

    Parameters
    ----------
    results_index : list[dict]
        Fully enriched records (API fields plus the lyric-page fields).
    """
    columns = ('id', 'artistIMG', 'code', 'company', 'counter', 'dateSorter',
               'lang', 'len', 'name', 'seq', 'sex', 'singer', 'songDate',
               'songDetailID', 'subname', 'albumName', 'albumDate',
               'artistName', 'lyrics', 'youtu_link')
    sql = 'INSERT INTO song ({}) VALUES ({})'.format(
        ', '.join(columns), ', '.join(['%s'] * len(columns)))
    rows = [tuple(record[col] for col in columns) for record in results_index]
    con = pymysql.connect(host='localhost', port=3306, user='root',
                          passwd='xxxxxx', charset='utf8', db='testdb')
    try:
        with con.cursor() as cursor:
            cursor.executemany(sql, rows)
        con.commit()
    finally:
        # Close even when the insert raises, so connections are not leaked.
        con.close()
    print('資料庫連線完成!')
if __name__ == '__main__':
    # Pipeline: API crawl -> parallel lyric scraping -> MySQL insert.
    start = time.time()
    all_songs = get_api()
    enriched = multiprocessing(all_songs)
    # `enriched` nests per-process then per-thread results; insert each
    # thread's batch separately.
    for process_result in enriched:
        for thread_batch in process_result:
            my_sql(thread_batch)
    print('耗時為:', (time.time() - start), '秒')