728x90
반응형
SMALL
YouTube 리뷰 수집용 웹 크롤러 만들기
from bs4 import BeautifulSoup
from selenium import webdriver
import time
import sys
import math
import re
import numpy
import pandas as pd
import xlwt
import random
import os
# 사용자에게 필요한 정보 얻기
query_txt = input("youtube에서 검색할 주제 키워드 입력(예:내셔널지오그래픽): ")
cnt = int(input('위 주제로 댓글을 크롤링할 youtube 영상은 몇 건?: '))
reple_cnt = int(input('각 동영상에서 추출할 댓글은 몇 건?: '))
f_dir = input("크롤링 결과를 저장할 폴더명(예:c:\data\): ")
print("\n")
print("요청한 정보로 데이터를 수집중")
# 저장될 파일위치와 이름 지정
now = time.localtime()
s = '%04d-%02d-%02d-%02d-%02d-%02d' % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)
os.makedirs(f_dir+s+'-'+query_txt)
os.chdir(f_dir+s+'-'+query_txt)
ff_name = f_dir+s+'-'+query_txt+'\\'+s+'-'+query_txt+'.txt'
fc_name = f_dir+s+'-'+query_txt+'\\'+s+'-'+query_txt+'.csv'
fx_name = f_dir+s+'-'+query_txt+'\\'+s+'-'+query_txt+'.xls'
s_time = time.time()
# 크롬 드라이버를 사용하여 웹 브라우저 실행 후 영상 검색
path = "C:\Temp\chromedriver_240\chromedriver.exe"
driver = webdriver.Chrome(path)
driver.get("https://www.youtube.com")
time.sleep(5)
element = driver.find_element_by_name("search_query")
element.send_keys(query_txt)
element.submit()
time.sleep(2)
# 화면을 이동하여 영상 목록 출력
# 검색 결과가 첫 화면에 20개
# 사용자가 입력한 동영상 개수가 20개가 넘어갈 경우 스크롤다운해서 갯수를 먼저 만든 후 URL을 추출하여 URL 완성해야 함
def scroll_down(driver):
driver.execute_script("window.scrollBy(0, 3000): ") # 한 페이지 20개씩 출력값
time.sleep(2)
if reple_cnt < 20:
page_cnt = 1
else:
page_cnt = math.ceil(reply_cnt / 20)
if cnt > 20:
i = 1
while(i <= page_cnt):
print("화면을 %s 회 아래로 스크롤" %i)
scroll_down(driver)
time.sleep(1)
i += 1
# 검색된 youtube 영상의 URL 추출
time.sleep(2)
html = driver.page_source
soup1 = BeautifulSoup(html, 'html.parser')
count = 0
item = []
print("\n")
for i in soup1.find_all('a', 'yt-simple-endpoint style-scope ytd-video-renderer'):
item.append(i['href'])
count += 1
if count == cnt:
break
print('검색된 총 %s 건 동영상의 댓글 추출' %cnt)
print("\n")
# 비트맵 이미지 아이콘을 위한 대체 딕셔너리 생성
bmp_map = dict.fromkeys(range(0x10000, sys.maxunicode + 1), 0xfffd)
# 영상의 댓글 출력
# 영상이 여러 개일 경우 앞 단계에서 추출된 영상의 URL을 하나씩 수행하여 댓글을 수집
# 수집된 전체 URL 중에서 사용자가 입력한 수만큼의 동영상의 댓글을 수집
# 먼저 전체 URL을 완성 -> 동영상 페이지 열기 -> 댓글 수집 -> 다음 동영상 순으로 진행
full_url = []
url_cnt = 0
for x in range(0, len(item)):
url_cnt += 1
url = 'https://www.youtube.com/' + item[x]
full_url.append(url)
if url_cnt == cnt:
break
play_cnt = 1
for y in range(0, len(full_url)):
driver.get(full_url[y])
time.sleep(2)
print("\n")
print("\n")
print("%s번째 동영상의 정보를 수집" %play_cnt)
i = 1
while(i <= page_cnt):
scroll_down(driver)
time.sleep(1)
i += 1
html = driver.page_source
soup2 = BeautifulSoup(html, 'html.parser')
t_count_0 = soup2.select('#count')
t_count_1 = t_count_0[1].get_text()
t_count_2 = t_count_1.replace(".", "")
t_count_3 = re.search("\d+", t_count_2)
t_count_4 = int(t_count_3.group())
t_title_1 = soup2.select('#info-contents')
t_title_2 = t_title_1[0].find('h1').get_text()
t_title_3 = t_title_2.translate(bmp_map).replace("\n", "")
t_view_1 = t_count_0[1].get_text().replace("\n", "")
t_view_2 = t_view_1.replace(".", "")
t_view_3 = re.search("\d+", t_view_2)
t_view_4 = int(t_view_3.group())
print("=" *80)
print("%s번째 동영상의 조회수는 %s 회이고 수집할 댓글은 총 %s개" %(play.cnt, t_count_4, t_view_4))
print("%s번째 동영상의 제목은 %s" %(play_cnt, t_title_3))
print("=" *80)
print("\n")
print("%s번째 동영상에서 댓글 수집을 시작" %play_cnt)
print("댓글의 개수가 많아서 대기")
print("\n")
# 화면을 스크롤해서 요청된 댓글 수만큼 아래로 이동 후 댓글 수집
# page_cnt = math.ceil(t_view_4 / 19) # 각 동영상의 전체 댓글을 수집할 경우 이 코드 사용
i = 1
while(i <= page_cnt + 1):
print("%s번째 페이지의 댓글을 수집중" %i)
scroll_down(driver)
time.sleep(0.5)
i += 1
url2 = []
reple2 = []
reple3 = []
reple4 = []
writer2 = []
wdate2 = []
time.sleep(2)
html = driver.page_source
soup3 = BeautifulSoup(html, 'html.parser')
count = 0
d2 = 0
reple_result = soup3.select('#comments > #selections > #contents')
for a in range(0, reple_cnt + 1):
count += 1
f = open(ff_name, 'a', encoding = 'UTF-8')
f.write("\n")
f.write("-----------------------------------------------" + "\n")
# 댓글 작성자
try:
writer = reple_result[0].select("#header-author > #author-text")[a].get_text().replace("\n", "").strip()
except IndexError:
break
else:
print("\n")
print("%s번째 영상의 %s번째 댓글" %(play_cnt.count))
f.write("%s번째 영상의 %s번째 댓글" %(play_cnt.count) + "\n")
print("-" *70)
# youtube URL 주소
print("URL 주소: ", full_url[y])
f.write("youtube URL 주소: " + full_url[y] + "\n")
url2.append(full_url[y])
print("댓글 작성자명: ", writer)
f.write("댓글 작성자명: ", + writer + "\n")
writer2.append(writer)
# 댓글 내용
reple1 = reple_result[0].select('#content-text')[a].get_text().replace("\n", "")
reple2 = reple1.translate(bmp_map).replaece("\n", "")
print("댓글 내용: ", reple2)
f.write("댓글 내용: " + reple2 + "\n")
reple3.append(reple2)
f.close()
if count == reple_cnt:
break
time.sleep(2)
play_cnt += 1
# 엑셀 형태로 저장
youtube_reple = pd.DataFrame()
youtube_reple['URL 주소'] = url2
youtube_reple['댓글 작성자명'] = pd.Series(writer2)
youtube_reple['댓글 내용'] = pd.Series(reple3)
# csv 형태로 저장
youtube_reple.to_csv(fc_name.encoding="utf-8-sig", index=True)
# 엑셀 형태로 저장
youtube_reple.to_excel(fx_name, index=True)
e_time = time.time()
t_time = e_time - s_time
# txt 파일에 크롤링 요약 정보 저장
orig_stdout = sys.stdout
f = open(ff_name, 'a', encoding='UTF-8')
sys.stdout = f
print("\n")
print("=" *50)
print("총 소요시간은 %s초," %round(t_time, 1))
print("총 저장 건수는 %s건" %(count*cnt))
print("=" *50)
sys.stdout = orig_stdout
f.close()
# 요약 정보를 화면에 출력
print("\n")
print("=" *80)
print("요청된 총 %s건 동영상 리뷰 중에서 실제 크롤링 된 리뷰수는 각 %s건" %/(cnt, count))
print("총 소요시간은 %초" %round(t_time, 1))
print("파일 저장 완료 : txt 파일명 : %s" %ff_name)
print("파일 저장 완료 : csv 파일명 : %s" %fc_name)
print("파일 저장 완료 : xls 파일명 : %s" %fx_name)
print("=" *80)
driver.close()
728x90
반응형
LIST
'App Programming > Web Crawler' 카테고리의 다른 글
[Web Crawler] 아마존 닷컴 베스트셀러 상품 정보 수집용 웹 크롤러 만들기 (0) | 2022.03.03 |
---|---|
[Web Crawler] 구글 이미지 다운로드용 웹 크롤러 만들기 (0) | 2022.03.03 |
[Web Crawler] 데이터 수집용 웹 크롤러 만들기 (2) (0) | 2022.02.16 |
[Web Crawler] 데이터 수집용 웹 크롤러 만들기 (1) (0) | 2022.02.16 |
[Web Crawler] 특정 게시글의 상세 내용 수집하기 (0) | 2022.02.16 |