The information field moves at a blistering pace, and web scraping sits right at its leading edge. Data protection, bot detection, conserving server resources, and the many clever tricks for prying data loose are all in constant competition; at any given moment someone is trying a new tactic in the tug-of-war between collecting data and preventing its collection, so these techniques go stale very quickly. Even so, the author offers plenty of sound ideas about programming and problem solving that give us something to rely on as the technology keeps churning.
This book is best read with some basic Python already in hand. I came to it after finishing Automate the Boring Stuff with Python: Practical Programming for Total Beginners by Al Sweigart, and found that the two books connect smoothly: most of the packages and modules introduced in Automate the Boring Stuff with Python continue to be used in Web Scraping with Python.
Chapter 1: Your First Scraper
Supplementary Notes
- At the command prompt, enter pip install virtualenv to download and install Virtualenv.
- Use a virtual environment at the command prompt to keep your installed libraries tidy:
# Create a scrapingEnv folder in the current working directory
# Going forward, I recommend standardizing on Anaconda virtual environments instead
virtualenv scrapingEnv
cd scrapingEnv
scripts\activate
deactivate
Requests and BeautifulSoup
- At the command prompt, enter pip install requests to download and install Requests.
- At the command prompt, enter pip install beautifulsoup4 to download and install BeautifulSoup.
- import requests, bs4
aRes = requests.get("http://pythonscraping.com/pages/page1.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
print(aBeaSou.select("h1"))
print(aBeaSou.h1)
Chapter 2: Advanced HTML Parsing
The .find() and .findAll() Methods of BeautifulSoup Objects
- .find() is equivalent to .findAll() with limit set to 1:
.find(tag, attributes, recursive, text, keywords)
.findAll(tag, attributes, recursive, text, limit, keywords)
tag: a tag-name string or a list of tag names, e.g. "span" or ["h1", "h2"]
attributes: a dict of attributes and the values to match, e.g. {"id": "text"}
recursive: True (the default) searches all of the tag's descendants; False examines only the top-level tags
text: matches on the tags' text content, e.g. text = "the prince"
keywords: keyword arguments of attribute and value to match, e.g. id = "text"
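A minimal sketch of the text, limit, and keyword parameters in action, run against the same warandpeace.html page used in the next example (the exact element IDs on that page are an assumption):
import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/warandpeace.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
# text: match tags by their text content; the result is a list of NavigableString objects
print(len(aBeaSou.findAll(text = "the prince")))
# limit: stop after the first two matches; .find() is simply findAll() with limit = 1
print(aBeaSou.findAll("span", {"class": "green"}, limit = 2))
# keywords: match an attribute directly (use class_ instead of class, which is a reserved word)
print(aBeaSou.find(id = "text") is not None)
print(len(aBeaSou.findAll("span", class_ = "green")))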
- import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/warandpeace.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
for i in aBeaSou.findAll("span", {"class": "green"}):
    print(i.get_text())
Navigating the Tree
- Dealing with children (.children) and descendants (.descendants):
# To iterate over only the direct children, use the .children attribute; to iterate over children and all deeper descendants, use the .descendants attribute
import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/page3.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
for i in aBeaSou.find("table", {"id": "giftList"}).children:
    print(i)
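For contrast, a small sketch (assuming the same page3.html table structure) that counts direct children versus all descendants of the gift table:
import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/page3.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
aTable = aBeaSou.find("table", {"id": "giftList"})
# .children yields only the direct children (the table rows and whitespace strings);
# .descendants also yields every nested tag and string inside those rows
print(len(list(aTable.children)))
print(len(list(aTable.descendants)))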
- Dealing with siblings (.next_siblings, .previous_siblings, .next_sibling, .previous_sibling):
import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/page3.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
for i in aBeaSou.find("table", {"id": "giftList"}).tr.next_siblings:
    print(i)
- Dealing with parents (.parents, .parent):
import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/page3.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
print(aBeaSou.find("img", {"src": "../img/gifts/img1.jpg"}).parent.previous_sibling.get_text())
Combining BeautifulSoup with Regular Expressions
- Regular expressions:
import re
aBox = "Hell"
aRegex = re.compile(r"(?!"+aBox+")o World")
aMatch = aRegex.search("Hello World")
print(aMatch.group())  # prints "o World"
- import requests, bs4, re
aRes = requests.get("http://www.pythonscraping.com/pages/page3.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
for i in aBeaSou.findAll("img", {"src": re.compile("\.\./img/gifts/img.*\.jpg")}):
print(i["src"])
Combining BeautifulSoup with Lambda Expressions
- Lambda expressions:
square = lambda a: a**2
print(square(2))  # prints 4
- When combined with findAll, the lambda is restricted to taking a single tag object as its argument and returning a Boolean:
import requests, bs4
aRes = requests.get("http://www.pythonscraping.com/pages/page3.html")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
for i in aBeaSou.findAll(lambda tag: len(tag.attrs) == 2):
    print(i)
    print("\n")
BeautifulSoup Is the Backbone of This Book
- Besides the BeautifulSoup object and Tag objects, bs4 also provides the NavigableString object, which represents the text inside a tag, and the Comment object, which represents the text inside an HTML comment tag (see the sketch below).
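A minimal self-contained sketch (parsing an inline HTML string rather than a live page) showing how these object types surface:
import bs4
aBeaSou = bs4.BeautifulSoup("<p>Hello<!-- a hidden note --></p>", "html.parser")
aParagraph = aBeaSou.p
# The tag's visible text is a NavigableString
print(type(aParagraph.contents[0]))
# The HTML comment is a Comment object (a subclass of NavigableString)
aComment = aParagraph.find(text = lambda t: isinstance(t, bs4.Comment))
print(type(aComment), aComment)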
Chapter 3: Starting to Crawl
Traversing a Single Domain
- import requests, bs4, re, random, datetime
# Pseudorandom numbers and the random seed
random.seed(datetime.datetime.now())
def getLinks(pageUrl):
    aRes = requests.get("http://en.wikipedia.org" + pageUrl)
    aRes.raise_for_status()
    aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
    # Links to Wikipedia articles
    return aBeaSou.find("div", {"id": "bodyContent"}).findAll("a", href = re.compile("^(/wiki/)((?!:).)*$"))
links = getLinks("/wiki/Keanu_Reeves")
while len(links) > 0:
    aRandomLink = links[random.randint(0, len(links) - 1)].attrs["href"]
    print(aRandomLink)
    links = getLinks(aRandomLink)
Crawling an Entire Site
- import requests, bs4, re
links = set()
def getLinks(pageUrl):
    global links
    aRes = requests.get("http://en.wikipedia.org" + pageUrl)
    aRes.raise_for_status()
    aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
    # The Wikipedia article title
    print(aBeaSou.h1.get_text())
    # The first paragraph of the Wikipedia article
    print(aBeaSou.find(id = "mw-content-text").p.get_text())
    for link in aBeaSou.findAll("a", href = re.compile("^(/wiki/)")):
        # Avoid crawling the same page twice; keep in mind that Python's default recursion
        # limit (the number of times a program may call itself recursively) is 1000
        if link.attrs["href"] not in links:
            aNewLink = link.attrs["href"]
            print("\n" + aNewLink)
            links.add(aNewLink)
            getLinks(aNewLink)
getLinks("")
Crawling External Domains
- import requests, bs4, re, random, datetime
random.seed(datetime.datetime.now())
def getInternalLinks(aBeaSou, addressPart):
    internalLinks = []
    for link in aBeaSou.findAll("a", href = re.compile("^(/|.*" + addressPart + ")")):
        if not link.attrs["href"] in internalLinks:
            internalLinks.append(link.attrs["href"])
    return internalLinks
def getExternalLinks(aBeaSou, addressPart):
    externalLinks = []
    for link in aBeaSou.findAll("a", href = re.compile("^(http|www)((?!" + addressPart + ").)*$")):
        if not link.attrs["href"] in externalLinks:
            externalLinks.append(link.attrs["href"])
    return externalLinks
# Use .replace() to strip http:// and https:// by replacing them with an empty string
def splitAddress(pageUrl):
    aMatch = re.compile(r"http://|https://").search(pageUrl)
    if aMatch == None:
        addressParts = pageUrl.split("/")
    else:
        addressParts = pageUrl.replace(aMatch.group(), "").split("/")
    return addressParts[0]
def getRandomExternalLink(pageUrl):
    aRes = requests.get(pageUrl)
    aRes.raise_for_status()
    aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
    externalLinks = getExternalLinks(aBeaSou, splitAddress(pageUrl))
    if len(externalLinks) == 0:
        internalLinks = getInternalLinks(aBeaSou, splitAddress(pageUrl))
        return getRandomExternalLink(internalLinks[random.randint(0, len(internalLinks) - 1)])
    else:
        return externalLinks[random.randint(0, len(externalLinks) - 1)]
def followExternalOnly(pageUrl):
    externalLink = getRandomExternalLink(pageUrl)
    print("Random external link is: " + externalLink)
    followExternalOnly(externalLink)
followExternalOnly("http://oreilly.com")
Anaconda
- After installing Anaconda, enter the following in the Anaconda Prompt or in the Command Prompt (for the latter, add Anaconda to PATH under "Edit the system environment variables", or check the add-to-PATH option during installation) to manage conda itself:
# Show the conda version
conda --version
# Update conda and the Anaconda distribution
conda update conda
conda update anaconda
- After installing Anaconda, enter the following in the Anaconda Prompt or Command Prompt to manage virtual environments:
# List all environments
conda info -e
conda info --envs
conda env list
# Create an environment
conda create -n <env-name> python=<python-version> <package-names>
conda create --name <env-name> python=<python-version> <package-names>
# Activate an environment
activate <env-name>
# Deactivate the current environment
deactivate
# Clone an environment
conda create -n <new-env-name> --clone <old-env-name>
# Delete an environment
conda remove -n <env-name> --all
conda env remove -n <env-name>
- After installing Anaconda, enter the following in the Anaconda Prompt or Command Prompt to manage Python:
# Show the Python version
python --version
# Check which Python versions can be installed
conda search python
# Update Python
conda update python
- After installing Anaconda, enter the following in the Anaconda Prompt or Command Prompt to manage packages:
# List all packages in an environment
conda list -n <env-name>
# Update all packages in an environment
conda update --all -n <env-name>
# Check which packages can be installed
conda search <package-name>
# Install a package
conda install -n <env-name> <package-name>
# Update a package
conda update -n <env-name> <package-name>
# Uninstall a package
conda remove -n <env-name> <package-name>
- All virtual environments are stored in the envs folder inside the Anaconda installation folder.
Scrapy
- Step 1: in the Anaconda Prompt or Command Prompt, create a virtual environment and install the Scrapy package:
conda create -n pythonScraping
activate pythonScraping
conda install scrapy
- Step 2: in the Anaconda Prompt or Command Prompt, create a project folder in the current working directory:
scrapy startproject wikiSpider
- Step 3.1: edit the code inside the project folder:
# Edit C:\Users\Timmy\wikiSpider\wikiSpider\items.py
from scrapy import Item, Field
# Each Scrapy Item object represents a single page on the site
class Article(Item):
    # Collect only the title of each page
    title = Field()
Step 3.2: create the code inside the project folder (example 1):
# Create C:\Users\Timmy\wikiSpider\wikiSpider\spiders\articleSpiderOne.py
from scrapy import Spider
from wikiSpider.items import Article
class ArticleSpider(Spider):
    name = "articleOne"
    allowed_domains = ["en.wikipedia.org"]
    start_urls = ["http://en.wikipedia.org/wiki/Main_Page", "http://en.wikipedia.org/wiki/Python_%28programming_language%29"]
    def parse(self, response):
        item = Article()
        title = response.xpath("//h1/text()")[0].extract()
        print("Title is: " + title)
        item["title"] = title
        return item
Step 3.2: create the code inside the project folder (example 2):
# Create C:\Users\Timmy\wikiSpider\wikiSpider\spiders\articleSpiderTwo.py
from scrapy.contrib.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from wikiSpider.items import Article
class ArticleSpider(CrawlSpider):
    name = "articleTwo"
    allowed_domains = ["en.wikipedia.org"]
    start_urls = ["http://en.wikipedia.org/wiki/Python_%28programming_language%29"]
    rules = [Rule(LinkExtractor(allow = ("(/wiki/)((?!:).)*$"),), callback = "parse_item", follow = True)]
    def parse_item(self, response):
        item = Article()
        title = response.xpath("//h1/text()")[0].extract()
        print("Title is: " + title)
        item["title"] = title
        return item
- Step 4: in the Anaconda Prompt or Command Prompt, run the project:
cd wikiSpider
# Call the scraper by the name defined via name = "articleOne" in articleSpiderOne.py
scrapy crawl articleOne
# Export the results into the project folder
scrapy crawl articleOne -o articles.csv -t csv
scrapy crawl articleOne -o articles.json -t json
scrapy crawl articleOne -o articles.xml -t xml
- Optional setting: on-screen logging. Scrapy has five log levels, from highest to lowest: CRITICAL, ERROR, WARNING, INFO, DEBUG:
# Edit C:\Users\Timmy\wikiSpider\wikiSpider\settings.py and add one line
LOG_LEVEL = "ERROR"
# In the Anaconda Prompt or Command Prompt, run the project; only log records at ERROR level or above are shown
scrapy crawl articleOne
- Optional setting: writing the log to a file (the same five levels apply, from highest to lowest: CRITICAL, ERROR, WARNING, INFO, DEBUG):
# In the Anaconda Prompt or Command Prompt, write the log to a file in the project folder
scrapy crawl articleOne -s LOG_FILE=wiki.log
Chapter 4: Using APIs
API
- Most modern APIs (Application Programming Interfaces) require some form of authentication before they can be used. Once authenticated, the most common response formats are XML (eXtensible Markup Language) and JSON (JavaScript Object Notation), with JSON becoming more prevalent than XML in recent years.
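A minimal sketch of consuming a JSON response with Requests, using the GitHub events endpoint that also appears below; the specific fields in each event object (such as "type") are assumptions about that API's payload:
import requests
aRes = requests.get("https://api.github.com/events")
aRes.raise_for_status()
# .json() parses the JSON body into Python lists and dicts
events = aRes.json()
print(type(events))
if len(events) > 0:
    # "type" is assumed to be one of the keys in each event object
    print(events[0].get("type"))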
- The vast majority of APIs can be fully driven with these four verbs:
import requests
# GET: request an object
aRes = requests.get("https://api.github.com/events")
# POST: send an object
bRes = requests.post("http://httpbin.org/post", data = {"key": "value"})
# PUT: update an object
cRes = requests.put("http://httpbin.org/put", data = {"key": "value"})
# DELETE: delete an object
dRes = requests.delete("http://httpbin.org/delete")
Combining Scrapers and APIs
- This example can be paired with Visualization: GeoChart for data visualization:
import requests, bs4, re, random, datetime, json
random.seed(datetime.datetime.now())
def getLinks(pageUrl):
    aRes = requests.get("http://en.wikipedia.org" + pageUrl)
    aRes.raise_for_status()
    aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
    return aBeaSou.find("div", {"id": "bodyContent"}).findAll("a", href = re.compile("^(/wiki/)((?!:).)*$"))
def getHistoryIPs(pageUrl):
    pageUrl = pageUrl.replace("/wiki/", "")
    historyUrl = "http://en.wikipedia.org/w/index.php?title=" + pageUrl + "&action=history"
    print("History url is: " + historyUrl)
    aRes = requests.get(historyUrl)
    aRes.raise_for_status()
    aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
    # Only grab links whose class is mw-anonuserlink; these contain IP addresses rather than usernames
    ipAddresses = aBeaSou.findAll("a", {"class": "mw-anonuserlink"})
    addressList = set()
    for ipAddress in ipAddresses:
        addressList.add(ipAddress.get_text())
    return addressList
def getCountry(historyIP):
    # This API endpoint is deprecated and will stop working on July 1st, 2018. For more information please visit: https://github.com/apilayer/freegeoip#readme
    aRes = requests.get("http://freegeoip.net/json/" + historyIP)
    aRes.raise_for_status()
    stringOfJsonData = aRes.text
    pythonValue = json.loads(stringOfJsonData)
    return pythonValue["country_code"]
links = getLinks("/wiki/Python_(programming_language)")
while len(links) > 0:
    for link in links:
        historyIPs = getHistoryIPs(link.attrs["href"])
        for historyIP in historyIPs:
            country = getCountry(historyIP)
            print(historyIP + " is from " + country)
    aNewLink = links[random.randint(0, len(links) - 1)].attrs["href"]
    links = getLinks(aNewLink)
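Because the freegeoip.net endpoint above has been retired, getCountry needs a replacement geolocation service. A possible substitute, sketched under the assumption that the free ip-api.com JSON endpoint is available and returns a countryCode field (check its current terms and rate limits before relying on it):
import requests
def getCountry(historyIP):
    # ip-api.com is assumed to expose http://ip-api.com/json/<ip> returning JSON with a "countryCode" field
    aRes = requests.get("http://ip-api.com/json/" + historyIP)
    aRes.raise_for_status()
    pythonValue = aRes.json()
    return pythonValue.get("countryCode", "unknown")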
Chapter 5: Storing Data
Downloading Files
- import requests, bs4, os
from urllib.request import urlretrieve
dirName = "pythonScraping"
os.makedirs(dirName, exist_ok = True)
aRes = requests.get("http://www.pythonscraping.com")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
# Download a single file
imageLocation = aBeaSou.find("a", {"id": "logo"}).find("img")["src"]
urlretrieve(imageLocation, os.path.join(dirName, "logo.jpg"))
# Download every file referenced by a src attribute
fileLocationList = aBeaSou.findAll(src = True)
for i in fileLocationList:
    aFileLocation = i["src"]
    try:
        print("Try to retrieve: " + aFileLocation)
        urlretrieve(aFileLocation, os.path.join(dirName, os.path.basename(aFileLocation)))
    except:
        print("Cannot retrieve: " + aFileLocation)
Combining Scrapers and CSV
- import requests, bs4, csv
aRes = requests.get("http://en.wikipedia.org/wiki/Comparison_of_text_editors")
aRes.raise_for_status()
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
rows = aBeaSou.find("table", {"class": "wikitable"}).findAll("tr")
aFile = open("filesEditors.csv", "w", newline = "", encoding = "utf-8")
aWriter = csv.writer(aFile)
for row in rows:
    csvRow = []
    for cell in row.findAll(["td", "th"]):
        csvRow.append(cell.get_text())
    aWriter.writerow(csvRow)
aFile.close()
MySQL, the Open-Source Relational Database Management System
- Download and install MySQL and its related tools:
When installing MySQL on Windows, choosing Server Only as the Setup Type avoids installing a lot of extra Microsoft software and libraries; if you choose Server Only, you must pick Server Computer during the Configure step.
Tools such as MySQL Workbench and phpMyAdmin make it easy to view, sort, and insert data quickly.
- Basic MySQL command-line operations: creating a database and a table:
-- Create a database
CREATE DATABASE testDB;
-- Select the database to use
USE testDB;
-- Create a table; a MySQL table cannot have zero columns. Each column has a name, a data type, and optional extra attributes, and the table's key is defined after the columns
CREATE TABLE testTable (id BIGINT(7) NOT NULL AUTO_INCREMENT, title VARCHAR(200), content VARCHAR(10000), created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id));
-- Show the table structure
DESCRIBE testTable;
- Basic MySQL command-line operations: querying, inserting, updating, and deleting table records:
-- Taking SELECT as the example, the order in which MySQL clauses are used:
-- SELECT → FROM → WHERE → GROUP BY → HAVING → ORDER BY → LIMIT
-- SELECT: query table records
SELECT * FROM testTable WHERE title = "testTitle" AND content = "testContent";
SELECT * FROM testTable WHERE id BETWEEN 1 AND 5;
SELECT * FROM testTable WHERE id NOT IN (1, 3, 5);
SELECT * FROM testTable ORDER BY title DESC LIMIT 5;
SELECT id, title AS newTitle FROM testTable WHERE content LIKE "%test%";
SELECT DISTINCT title FROM testTable;
-- INSERT: add table records
INSERT INTO testTable (title, content) VALUES ("testTitle", "testContent");
-- UPDATE: modify table records
UPDATE testTable SET title = "newTitle", content = "newContent" WHERE id = 1;
-- DELETE: remove table records
DELETE FROM testTable WHERE id = 1;
-- Create and drop an index
CREATE INDEX testIndex ON testTable (title, content(16));
ALTER TABLE testTable DROP INDEX testIndex;
- Basic MySQL command-line operations: aggregate functions:
SELECT Min(id) FROM testTable;
SELECT Max(id) FROM testTable;
SELECT Avg(id) FROM testTable;
SELECT Sum(id) FROM testTable;
SELECT Count(id) FROM testTable;
The PyMySQL Module
- At the command prompt, enter pip install PyMySQL to download and install PyMySQL.
- A single Connection object can own many Cursor objects; each cursor keeps track of its own state information:
import pymysql
aConn = pymysql.connect(host = "127.0.0.1", user = "root", passwd = "myPassword", port = 3306)
aCurs = aConn.cursor()
aCurs.execute("USE testDB")
# Fetch a row
aCurs.execute("SELECT * FROM testTable WHERE id = 1")
print(aCurs.fetchone())
# Number of rows returned by the query
aCurs.execute("SELECT * FROM testTable")
print(aCurs.rowcount)
# ID of the last inserted row
aCurs.execute("INSERT INTO testTable (title, content) VALUES (\"testTitle\", \"testContent\")")
aConn.commit()
print(aCurs.lastrowid)
aCurs.close()
aConn.close()
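A small sketch of two habits worth adopting on top of the above, assuming the same testDB/testTable schema: letting the cursor act as a context manager, and passing values as query parameters instead of formatting them into the SQL string:
import pymysql
aConn = pymysql.connect(host = "127.0.0.1", user = "root", passwd = "myPassword", port = 3306, db = "testDB")
# The cursor closes itself when the with-block ends
with aConn.cursor() as aCurs:
    # PyMySQL quotes and escapes the parameters, which guards against SQL injection
    aCurs.execute("INSERT INTO testTable (title, content) VALUES (%s, %s)", ("testTitle", "testContent"))
    aConn.commit()
    aCurs.execute("SELECT * FROM testTable WHERE title = %s", ("testTitle",))
    print(aCurs.fetchall())
aConn.close()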
Combining Scrapers and MySQL
- Storing Wikipedia data in MySQL: the MySQL configuration:
MySQL does not handle Unicode by default, so Unicode support has to be enabled by switching the database's and the table's default character set to utf8mb4 with the utf8mb4_unicode_ci collation.
ALTER DATABASE testDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
ALTER TABLE testTable CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
- Storing Wikipedia data in MySQL: the Python code:
import requests, bs4, random, datetime, pymysql, re
random.seed(datetime.datetime.now())
aConn = pymysql.connect(host = "127.0.0.1", user = "root", passwd = "myPassword", port = 3306, charset = "utf8")
aCurs = aConn.cursor()
aCurs.execute("USE testDB")
def store(title, content):
    # Pass the values as query parameters; PyMySQL quotes and escapes them itself
    aCurs.execute("INSERT INTO testTable (title, content) VALUES (%s, %s)", (title, content))
    aCurs.connection.commit()
def getLinks(pageUrl):
    aRes = requests.get("http://en.wikipedia.org" + pageUrl)
    aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
    title = aBeaSou.find("h1").get_text()
    content = aBeaSou.find("div", {"id": "mw-content-text"}).find("p").get_text()
    store(title, content)
    return aBeaSou.find("div", {"id": "bodyContent"}).findAll("a", href = re.compile("^(/wiki/)((?!:).)*$"))
links = getLinks("/wiki/Keanu_Reeves")
try:
    while len(links) > 0:
        aRandomLink = links[random.randint(0, len(links) - 1)].attrs["href"]
        print(aRandomLink)
        links = getLinks(aRandomLink)
finally:
    # No matter what happens, the cursor and the connection are closed before the program ends
    aCurs.close()
    aConn.close()
Chapter 6: Reading Documents
Reading Plain Text
- Python tries to interpret a document as ASCII, while browsers try to interpret it as ISO-8859-1; the sample document below (containing French and Cyrillic text) should instead be read as UTF-8:
# Method 1: the book author's way of reading the page, with urllib
from urllib.request import urlopen
aPage = urlopen("http://www.pythonscraping.com/pages/warandpeace/chapter1-ru.txt")
print(str(aPage.read(), "utf-8"))
# Method 2: my preferred way of reading the page, with Requests
import requests
aRes = requests.get("http://www.pythonscraping.com/pages/warandpeace/chapter1-ru.txt")
aRes.raise_for_status()
aRes.encoding = "utf-8"
print(aRes.text)
- When reading a web page, check the meta tag in the HTML content and use the encoding it declares:
# Method 1: the book author's way of reading the page, with urllib
from urllib.request import urlopen
import bs4
aPage = urlopen("http://en.wikipedia.org/wiki/Python_(programming_language)")
aBeaSou = bs4.BeautifulSoup(aPage, "html.parser")
content = aBeaSou.find("div", {"id": "mw-content-text"}).get_text()
content = bytes(content, "UTF-8")
content = content.decode("UTF-8")
print(content)
# Method 2: my preferred way of reading the page, with Requests
import requests, bs4
aRes = requests.get("http://en.wikipedia.org/wiki/Python_(programming_language)")
aRes.raise_for_status()
aRes.encoding = "utf-8"
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
content = aBeaSou.find("div", {"id": "mw-content-text"}).get_text()
print(content)
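Neither snippet above actually inspects the meta tag; both simply assume UTF-8. A small sketch of reading the declared charset first and then decoding with it (falling back to UTF-8 when no meta charset is present):
import requests, bs4
aRes = requests.get("http://en.wikipedia.org/wiki/Python_(programming_language)")
aRes.raise_for_status()
# Parse the raw bytes first so we can look for <meta charset="..."> ourselves
aBeaSou = bs4.BeautifulSoup(aRes.content, "html.parser")
aMeta = aBeaSou.find("meta", charset = True)
declaredEncoding = aMeta["charset"] if aMeta else "utf-8"
print("Declared encoding: " + declaredEncoding)
# Decode the page with the encoding the page itself declares
aRes.encoding = declaredEncoding
aBeaSou = bs4.BeautifulSoup(aRes.text, "html.parser")
print(aBeaSou.find("div", {"id": "mw-content-text"}).get_text()[:200])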
Reading CSV
- A better approach is to load the data as a string and wrap it in an object that Python can treat as a file, without ever actually saving the data to disk:
import requests, io, csv
aRes = requests.get("http://pythonscraping.com/files/MontyPythonAlbums.csv")
aRes.raise_for_status()
# csv.reader returns each row as a list object
aFile = io.StringIO(aRes.text)
aReader = csv.reader(aFile)
for i in aReader:
    print(i)
# csv.DictReader returns each row as a dict object, with the field names stored in .fieldnames
aFile = io.StringIO(aRes.text)
aDictReader = csv.DictReader(aFile)
print(aDictReader.fieldnames)
for i in aDictReader:
    print(i)
Reading PDF
- At the command prompt, enter pip install PDFMiner3K to download and install PDFMiner3K.
- At the command prompt, enter pip install PyPDF2 to download and install PyPDF2.
- # Method 1: the book author's way of reading the PDF, with PDFMiner
from urllib.request import urlopen
from io import StringIO
from pdfminer.pdfinterp import PDFResourceManager, process_pdf
from pdfminer.layout import LAParams
from pdfminer.converter import TextConverter
def readPDF(pdfFile):
    rsrcmgr = PDFResourceManager()
    retstr = StringIO()
    laparams = LAParams()
    device = TextConverter(rsrcmgr, retstr, laparams = laparams)
    process_pdf(rsrcmgr, device, pdfFile)
    device.close()
    content = retstr.getvalue()
    retstr.close()
    return content
pdfFile = urlopen("http://pythonscraping.com/pages/warandpeace/chapter1.pdf")
content = readPDF(pdfFile)
print(content)
# Method 2: my preferred way of reading the PDF, with PyPDF2
import requests, io, PyPDF2
aRes = requests.get("http://pythonscraping.com/pages/warandpeace/chapter1.pdf")
aRes.raise_for_status()
aFile = io.BytesIO(aRes.content)
aReader = PyPDF2.PdfFileReader(aFile)
for i in range(aReader.numPages):
    aPage = aReader.getPage(i)
    print(aPage.extractText())
Reading DOCX
- Every .docx file is Zip-compressed to save space; what gets unpacked is XML, and all of the document's text lives inside <w:t> tags:
import requests, bs4, io, zipfile
aRes = requests.get("http://pythonscraping.com/pages/AWordDocument.docx")
aRes.raise_for_status()
aFile = io.BytesIO(aRes.content)
aXml = zipfile.ZipFile(aFile)
xmlContent = aXml.read("word/document.xml")
aBeaSou = bs4.BeautifulSoup(xmlContent.decode("utf-8"), "lxml-xml")
for i in aBeaSou.findAll("w:t"):
    print(i.get_text())
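An alternative sketch, assuming the third-party python-docx package (pip install python-docx) is acceptable; it hides the Zip/XML details and exposes the document's paragraphs directly:
import requests, io
from docx import Document
aRes = requests.get("http://pythonscraping.com/pages/AWordDocument.docx")
aRes.raise_for_status()
aDoc = Document(io.BytesIO(aRes.content))
# Each Paragraph object exposes its visible text via .text
for aParagraph in aDoc.paragraphs:
    print(aParagraph.text)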