台灣點歌王爬蟲試作

在課堂上課時候老師突然說了一句:「如果大家無聊的話可以試著爬取這個網站的資料並分享出來」,這麼看起來老師也是個喜歡唱歌的人~


注意事項

  1. 此程式碼預設是爬取網站裡全部公司的歌曲(歌曲長度1至11以上)
  2. 一開始請先以小範圍開始爬取,過多的請求會造成長時間的等待和伺服器的壓力增加
  3. 函數multiprocessing()裡核心數為8,如果電腦邏輯處理器數低於8可能會報錯,請適當修改num變數
# 最終成品
# 單一進程獲取全部API資料
# 增加分割list
# 使用多線程獲取歌詞
# 增加資料庫
# 嘗試例外處理
# 小規模除錯,以公司查詢
# 多核心爬取測試

import requests
import json
import time
import pymysql
import threading
import multiprocessing as mp
from queue import Queue
from bs4 import BeautifulSoup

# 使用網頁ID爬取並補齊資料
def get_lyrics(songDetailID):
    get_html = 'https://song.corp.com.tw/mv.aspx?id='
    youtu = 'https://www.youtube.com/watch?v='
    url_link = get_html + songDetailID
    html = requests.get(url_link)
    html.encoding = 'utf-8'
    sp = BeautifulSoup(html.text, 'lxml')
    albumName = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_albumName_lg')
    if albumName != None:
        albumName = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_albumName_lg').text
    else:
        albumName = ''

    albumDate = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_albumDate_lg')
    if albumDate != None:
        albumDate = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_albumDate_lg').text
    else:
        albumDate = ''
    
    artistName = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_artistName_lg')
    if artistName != None:
        artistName = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_artistName_lg').text
    else:
        artistName = ''
    
    lyrics = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_lyrics_lg')
    if lyrics != None:
        lyrics = sp.find(id='ContentPlaceHolder1_ContentPlaceHolder1_lyrics_lg').text
    else:
        lyrics = ''
    
    youtu_value = sp.select_one('#ContentPlaceHolder1_ContentPlaceHolder1_youtubeID').get('value')
    if youtu_value != None:
        youtu_link = youtu + youtu_value
    else:
        youtu_link = ''
    
    fill = {'albumName':albumName, 'albumDate':albumDate, 'artistName':artistName, 'lyrics':lyrics, 'youtu_link':youtu_link}
    return fill
    
    

# 使用API獲取第一步的資料
def get_api():
    url = "https://song.corp.com.tw/api/song.aspx"
    my_payload = {
        'company':'錢櫃',
        'cusType':'', 
        'minId': 0,
        'oid':'' ,
        'lang':'國',
        'board': '',
        'keyword':'',
        'singer':'',
        'sex':'',
        'Len': 0,
        'songDate':'null'
    }

    '''
    # 參考用回復
    APIResponse = {
        'seq': 39, 
        'id': 62715, 
        'code': '62117', 
        'name': 'I', 
        'singer': '光良', 
        'lang': '國', 
        'sex': '男', 
        'company': '音圓', 
        'songDate': '000-00', 
        'subname': '', 
        'songDetailID': '58358', 
        'counter': 1063, 
        'dateSorter': 0, 
        'len': 1, 
        'artistIMG': 'https://i.kfs.io/artist/global/6656,0v5/fit/160x160.jpg'
        }
    '''
    company_list = ['音圓', '弘音', '金嗓', '瑞影', '點將家', '嘉揚', 
                    '音圓原廠', '音影', '美華', '金影', '音遊', '金嗓/投幣',
                    '大唐', '錢櫃', '好樂迪', '星據點', '銀櫃', '享溫馨', 'MV'
                    ]
    lang_list = ['台', '國', '日', '客', '粵', '英', '韓', '山', '兒']
    songData = []
    for l in company_list:
        my_payload['company'] = l
        for k in lang_list:
            my_payload['lang'] = k
            for j in range(1,12):
                my_payload['Len'] = j

                while True:
                    try:
                        html = requests.get(url, params=my_payload)
                        get_json = json.loads(html.text)
                    except:
                        print('get失敗!嘗試第二次get!錯誤payload為:',my_payload)
                        html = requests.get(url, params=my_payload)
                        get_json = json.loads(html.text)
                        print('二次get成功!')
                    
                    if len(get_json) <= 0:
                        break
                    
                    for i in get_json:
                        songData.append(i)
                        
                    my_payload['minId'] = i['seq']
                    print('抓取公司名:{}、{}語、歌曲名長度為{},目前共找到{}首歌'.format(my_payload['company'], k,j,len(songData)))

                my_payload['minId'] = 0

    print('API_get完成!')
    return songData

# 分割字串並把資料分配給不同核心運算
def split(songData,num):
    song_2dlist = []
    for i in range(num):
        song_2dlist.append([])
    
    list_index = -1
    for j in songData:
        if list_index < (len(song_2dlist) - 1):
            list_index = list_index + 1
        else:
            list_index = 0

        song_2dlist[list_index].append(j)
    
    return song_2dlist

# 補齊資料,完善get到的API資料
def fill_up(song_2dlist_index, q):

    song_len = len(song_2dlist_index)
    count = 0
    count_30 = True
    count_60 = True
    count_90 = True
    for i in song_2dlist_index:
        try:
            fill = get_lyrics(i['songDetailID'])
            i.update(fill)
        except:
            print('補齊資料錯誤一次!嘗試二次連線!資料為:',i)
            fill = get_lyrics(i['songDetailID'])
            i.update(fill)
            print('二次連線成功!')
        

        count = count + 1

        if count > (song_len*0.9) and count_90:
            print('某線程已完成90%')
            count_90 = False
        elif count > (song_len*0.6) and count_60:
            print('某線程已完成60%')
            count_60 = False
        elif count > (song_len*0.3) and count_30:
            print('某線程已完成30%')
            count_30 = False
    print('某線程完成')
    q.put(song_2dlist_index)


# 使用多線程增加爬蟲速度
def multithreading(song_2dlist_8p,qq):
    q = Queue()
    threads = []
    num = 8
    song_2dlist_8t = split(song_2dlist_8p, num)

    for i in range(num):
        threads.append(threading.Thread(target=fill_up, args=(song_2dlist_8t[i], q)))
        threads[i].start()
    
    for thread in threads:
        thread.join()

    results_t = []
    for _ in range(num):
        results_t.append(q.get())
    
    qq.put(results_t)
    

# 使用多進程增加效率
def multiprocessing(songData):
    qq = mp.Queue()
    processlist = []
    num = 8
    song_2dlist_8p = split(songData, num)

    for i in range(num):
        processlist.append(mp.Process(target=multithreading, args=(song_2dlist_8p[i],qq)))
        processlist[i].start()
    
    for process in processlist:
        process.join
    
    
    results = []
    for _ in range(num):
        results.append(qq.get())
    print('results:type',type(results), 'results:len', len(results))
    return results
    



# 將處理好的資料存進資料庫
def my_sql(results_index):

    con = pymysql.connect(host='localhost', port=3306, user='root', passwd='xxxxxx',
                            charset='utf8', db='testdb'
                            )
    
    with con.cursor() as cursor:
        sql = '''
        INSERT INTO song (id, artistIMG, code, company, counter, dateSorter, lang, len, name, seq, sex, singer, songDate, songDetailID, subname, albumName, albumDate, artistName, lyrics, youtu_link) VALUES
        '''
        for i in results_index:
            tuple_count = (i['id'], i['artistIMG'], i['code'], i['company'], i['counter'], i['dateSorter'], i['lang'], 
                            i['len'], i['name'], i['seq'], i['sex'], i['singer'], i['songDate'], i['songDetailID'], 
                            i['subname'], i['albumName'], i['albumDate'], i['artistName'], i['lyrics'], i['youtu_link']
                            )
            sql = sql + str(tuple_count) + ',\n'
        sql = sql[:-2] + ';'
        #sql = sql.replace("'None'", "NULL").replace("None", "NULL")
        cursor.execute(sql)
        con.commit()
    con.close()
    print('資料庫連線完成!')


if __name__ == '__main__':
    t = time.time()
    songData = get_api()
    results = multiprocessing(songData)
    for i in results:
        for j in i:
            my_sql(j)
    print('耗時為:', (time.time() - t), '秒')

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *