#!/usr/bin/env python3
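"""Simple web crawler built on asyncio and aiohttp.

Starting from a root URL given on the command line, the crawler follows
every href link that stays under that root, fetching up to ``maxtasks``
pages concurrently and printing progress as it goes.

Usage:
    crawl.py [--iocp] <root-url>
"""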
import asyncio
import logging
import re
import signal
import sys
import urllib.parse

import aiohttp


class Crawler:

    def __init__(self, rooturl, loop, maxtasks=100):
        self.rooturl = rooturl
        self.loop = loop
        self.todo = set()    # URLs discovered but not yet fetched
        self.busy = set()    # URLs currently being fetched
        self.done = {}       # URL -> True (fetched) / False (error)
        self.tasks = set()   # live worker tasks
        self.sem = asyncio.Semaphore(maxtasks)  # caps concurrent fetches
        # The client session keeps cookies between requests and reuses the
        # connection pool; it is created in run(), once the loop is running.
        self.session = None

    async def run(self):
        # Create the session here so that it binds to the running event loop.
        self.session = aiohttp.ClientSession()

        asyncio.ensure_future(self.addurls([(self.rooturl, '')]))  # Set initial work.
        await asyncio.sleep(1)
        while self.busy:  # poll until no fetch is in flight
            await asyncio.sleep(1)

        await self.session.close()
        self.loop.stop()

    async def addurls(self, urls):
        for url, parenturl in urls:
            url = urllib.parse.urljoin(parenturl, url)
            url, _frag = urllib.parse.urldefrag(url)
            if (url.startswith(self.rooturl) and
                    url not in self.busy and
                    url not in self.done and
                    url not in self.todo):
                self.todo.add(url)
                await self.sem.acquire()
                task = asyncio.ensure_future(self.process(url))
                task.add_done_callback(lambda t: self.sem.release())
                task.add_done_callback(self.tasks.remove)
                self.tasks.add(task)

    async def process(self, url):
        print('processing:', url)

        self.todo.remove(url)
        self.busy.add(url)
        try:
            resp = await self.session.get(url)
        except Exception as exc:
            print('...', url, 'has error', repr(str(exc)))
            self.done[url] = False
        else:
            if (resp.status == 200 and
                    'text/html' in resp.headers.get('content-type', '')):
                data = (await resp.read()).decode('utf-8', 'replace')
                urls = re.findall(r'(?i)href=["\']?([^\s"\'<>]+)', data)
                asyncio.ensure_future(self.addurls([(u, url) for u in urls]))
            resp.close()
            self.done[url] = True

        self.busy.remove(url)
        print(len(self.done), 'completed tasks,', len(self.tasks),
              'still pending, todo', len(self.todo))


def main():
    if len(sys.argv) < 2:
        print('Usage: crawl.py [--iocp] <root-url>', file=sys.stderr)
        sys.exit(1)

    loop = asyncio.get_event_loop()

    c = Crawler(sys.argv[1], loop)
    loop.create_task(c.run())

    try:
        loop.add_signal_handler(signal.SIGINT, loop.stop)
    except (NotImplementedError, RuntimeError):
        pass  # signal handlers are unavailable on some platforms/loops

    loop.run_forever()
    print('todo:', len(c.todo))
    print('busy:', len(c.busy))
    print('done:', len(c.done), '; ok:', sum(c.done.values()))
    print('tasks:', len(c.tasks))


if __name__ == '__main__':
    if '--iocp' in sys.argv:
        # Use the IOCP-based proactor event loop on Windows.
        from asyncio import events, windows_events
        sys.argv.remove('--iocp')
        logging.info('using iocp')
        el = windows_events.ProactorEventLoop()
        events.set_event_loop(el)

    main()