Skip to content

Commit ace44ee

Browse files
committed
ch21 examples
1 parent 93bb440 commit ace44ee

16 files changed

+854
-0
lines changed

21-futures/demo_executor_map.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""
2+
Experiment with ``ThreadPoolExecutor.map``
3+
"""
4+
# tag::EXECUTOR_MAP[]
5+
from time import sleep, strftime
6+
from concurrent import futures
7+
8+
def display(*args): # <1>
9+
print(strftime('[%H:%M:%S]'), end=' ')
10+
print(*args)
11+
12+
def loiter(n): # <2>
13+
msg = '{}loiter({}): doing nothing for {}s...'
14+
display(msg.format('\t'*n, n, n))
15+
sleep(n)
16+
msg = '{}loiter({}): done.'
17+
display(msg.format('\t'*n, n))
18+
return n * 10 # <3>
19+
20+
def main():
21+
display('Script starting.')
22+
executor = futures.ThreadPoolExecutor(max_workers=3) # <4>
23+
results = executor.map(loiter, range(5)) # <5>
24+
display('results:', results) # <6>
25+
display('Waiting for individual results:')
26+
for i, result in enumerate(results): # <7>
27+
display('result {}: {}'.format(i, result))
28+
29+
if __name__ == '__main__':
30+
main()
31+
# end::EXECUTOR_MAP[]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
AD AE AF AG AL AM AO AR AT AU AZ BA BB BD BE BF BG BH BI BJ BN BO BR BS BT
2+
BW BY BZ CA CD CF CG CH CI CL CM CN CO CR CU CV CY CZ DE DJ DK DM DZ EC EE
3+
EG ER ES ET FI FJ FM FR GA GB GD GE GH GM GN GQ GR GT GW GY HN HR HT HU ID
4+
IE IL IN IQ IR IS IT JM JO JP KE KG KH KI KM KN KP KR KW KZ LA LB LC LI LK
5+
LR LS LT LU LV LY MA MC MD ME MG MH MK ML MM MN MR MT MU MV MW MX MY MZ NA
6+
NE NG NI NL NO NP NR NZ OM PA PE PG PH PK PL PT PW PY QA RO RS RU RW SA SB
7+
SC SD SE SG SI SK SL SM SN SO SR SS ST SV SY SZ TD TG TH TJ TL TM TN TO TR
8+
TT TV TW TZ UA UG US UY UZ VA VC VE VN VU WS YE ZA ZM ZW
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.gif

21-futures/getflags/flags.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#!/usr/bin/env python3
2+
3+
"""Download flags of top 20 countries by population
4+
5+
Sequential version
6+
7+
Sample runs (first with new domain, so no caching ever)::
8+
9+
$ ./flags.py
10+
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
11+
20 downloads in 26.21s
12+
$ ./flags.py
13+
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
14+
20 downloads in 14.57s
15+
16+
17+
"""
18+
19+
# tag::FLAGS_PY[]
20+
import os
21+
import time
22+
from typing import Callable
23+
24+
import requests # <1>
25+
26+
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
27+
'MX PH VN ET EG DE IR TR CD FR').split() # <2>
28+
29+
BASE_URL = 'http://fluentpython.com/data/flags' # <3>
30+
DEST_DIR = 'downloaded/' # <4>
31+
32+
def save_flag(img: bytes, filename: str) -> None: # <5>
33+
path = os.path.join(DEST_DIR, filename)
34+
with open(path, 'wb') as fp:
35+
fp.write(img)
36+
37+
def get_flag(cc: str) -> bytes: # <6>
38+
cc = cc.lower()
39+
url = f'{BASE_URL}/{cc}/{cc}.gif'
40+
resp = requests.get(url)
41+
return resp.content
42+
43+
def download_many(cc_list: list[str]) -> int: # <7>
44+
for cc in sorted(cc_list): # <8>
45+
image = get_flag(cc)
46+
print(cc, end=' ', flush=True) # <9>
47+
save_flag(image, cc.lower() + '.gif')
48+
49+
return len(cc_list)
50+
51+
def main(downloader: Callable[[list[str]], int]) -> None: # <10>
52+
t0 = time.perf_counter() # <11>
53+
count = downloader(POP20_CC)
54+
elapsed = time.perf_counter() - t0
55+
print(f'\n{count} downloads in {elapsed:.2f}s')
56+
57+
if __name__ == '__main__':
58+
main(download_many) # <12>
59+
# end::FLAGS_PY[]

21-futures/getflags/flags.zip

2.26 MB
Binary file not shown.
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#!/usr/bin/env python3
2+
3+
"""Download flags of countries (with error handling).
4+
5+
asyncio async/await version
6+
7+
"""
8+
# BEGIN FLAGS2_ASYNCIO_TOP
9+
import asyncio
10+
from collections import Counter
11+
12+
import aiohttp
13+
from aiohttp import web
14+
from aiohttp.http_exceptions import HttpProcessingError
15+
import tqdm # type: ignore
16+
17+
from flags2_common import main, HTTPStatus, Result, save_flag
18+
19+
# default set low to avoid errors from remote site, such as
20+
# 503 - Service Temporarily Unavailable
21+
DEFAULT_CONCUR_REQ = 5
22+
MAX_CONCUR_REQ = 1000
23+
24+
25+
class FetchError(Exception): # <1>
26+
def __init__(self, country_code):
27+
self.country_code = country_code
28+
29+
30+
async def get_flag(session, base_url, cc): # <2>
31+
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
32+
async with session.get(url) as resp:
33+
if resp.status == 200:
34+
return await resp.read()
35+
elif resp.status == 404:
36+
raise web.HTTPNotFound()
37+
else:
38+
raise HttpProcessingError(
39+
code=resp.status, message=resp.reason,
40+
headers=resp.headers)
41+
42+
43+
async def download_one(session, cc, base_url, semaphore, verbose): # <3>
44+
try:
45+
async with semaphore: # <4>
46+
image = await get_flag(session, base_url, cc) # <5>
47+
except web.HTTPNotFound: # <6>
48+
status = HTTPStatus.not_found
49+
msg = 'not found'
50+
except Exception as exc:
51+
raise FetchError(cc) from exc # <7>
52+
else:
53+
save_flag(image, cc.lower() + '.gif') # <8>
54+
status = HTTPStatus.ok
55+
msg = 'OK'
56+
57+
if verbose and msg:
58+
print(cc, msg)
59+
60+
return Result(status, cc)
61+
# END FLAGS2_ASYNCIO_TOP
62+
63+
# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
64+
async def downloader_coro(cc_list: list[str],
65+
base_url: str,
66+
verbose: bool,
67+
concur_req: int) -> Counter[HTTPStatus]: # <1>
68+
counter: Counter[HTTPStatus] = Counter()
69+
semaphore = asyncio.Semaphore(concur_req) # <2>
70+
async with aiohttp.ClientSession() as session: # <8>
71+
to_do = [download_one(session, cc, base_url, semaphore, verbose)
72+
for cc in sorted(cc_list)] # <3>
73+
74+
to_do_iter = asyncio.as_completed(to_do) # <4>
75+
if not verbose:
76+
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
77+
for future in to_do_iter: # <6>
78+
try:
79+
res = await future # <7>
80+
except FetchError as exc: # <8>
81+
country_code = exc.country_code # <9>
82+
try:
83+
if exc.__cause__ is None:
84+
error_msg = 'Unknown cause'
85+
else:
86+
error_msg = exc.__cause__.args[0] # <10>
87+
except IndexError:
88+
error_msg = exc.__cause__.__class__.__name__ # <11>
89+
if verbose and error_msg:
90+
msg = '*** Error for {}: {}'
91+
print(msg.format(country_code, error_msg))
92+
status = HTTPStatus.error
93+
else:
94+
status = res.status
95+
96+
counter[status] += 1 # <12>
97+
98+
return counter # <13>
99+
100+
101+
def download_many(cc_list: list[str],
102+
base_url: str,
103+
verbose: bool,
104+
concur_req: int) -> Counter[HTTPStatus]:
105+
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
106+
counts = asyncio.run(coro) # <14>
107+
108+
return counts
109+
110+
111+
if __name__ == '__main__':
112+
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
113+
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
"""Utilities for second set of flag examples.
2+
"""
3+
4+
import os
5+
import time
6+
import sys
7+
import string
8+
import argparse
9+
from collections import namedtuple, Counter
10+
from enum import Enum
11+
12+
13+
Result = namedtuple('Result', 'status data')
14+
15+
HTTPStatus = Enum('HTTPStatus', 'ok not_found error')
16+
17+
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
18+
'MX PH VN ET EG DE IR TR CD FR').split()
19+
20+
DEFAULT_CONCUR_REQ = 1
21+
MAX_CONCUR_REQ = 1
22+
23+
SERVERS = {
24+
'REMOTE': 'http://fluentpython.com/data/flags',
25+
'LOCAL': 'http://localhost:8000/flags',
26+
'DELAY': 'http://localhost:8001/flags',
27+
'ERROR': 'http://localhost:8002/flags',
28+
}
29+
DEFAULT_SERVER = 'LOCAL'
30+
31+
DEST_DIR = 'downloaded/'
32+
COUNTRY_CODES_FILE = 'country_codes.txt'
33+
34+
35+
def save_flag(img: bytes, filename: str) -> None:
36+
path = os.path.join(DEST_DIR, filename)
37+
with open(path, 'wb') as fp:
38+
fp.write(img)
39+
40+
41+
def initial_report(cc_list: list[str],
42+
actual_req: int,
43+
server_label: str) -> None:
44+
if len(cc_list) <= 10:
45+
cc_msg = ', '.join(cc_list)
46+
else:
47+
cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
48+
print('{} site: {}'.format(server_label, SERVERS[server_label]))
49+
msg = 'Searching for {} flag{}: {}'
50+
plural = 's' if len(cc_list) != 1 else ''
51+
print(msg.format(len(cc_list), plural, cc_msg))
52+
plural = 's' if actual_req != 1 else ''
53+
msg = '{} concurrent connection{} will be used.'
54+
print(msg.format(actual_req, plural))
55+
56+
57+
def final_report(cc_list: list[str],
58+
counter: Counter[HTTPStatus],
59+
start_time: float) -> None:
60+
elapsed = time.time() - start_time
61+
print('-' * 20)
62+
msg = '{} flag{} downloaded.'
63+
plural = 's' if counter[HTTPStatus.ok] != 1 else ''
64+
print(msg.format(counter[HTTPStatus.ok], plural))
65+
if counter[HTTPStatus.not_found]:
66+
print(counter[HTTPStatus.not_found], 'not found.')
67+
if counter[HTTPStatus.error]:
68+
plural = 's' if counter[HTTPStatus.error] != 1 else ''
69+
print('{} error{}.'.format(counter[HTTPStatus.error], plural))
70+
print('Elapsed time: {:.2f}s'.format(elapsed))
71+
72+
73+
def expand_cc_args(every_cc: bool,
74+
all_cc: bool,
75+
cc_args: list[str],
76+
limit: int) -> list[str]:
77+
codes: set[str] = set()
78+
A_Z = string.ascii_uppercase
79+
if every_cc:
80+
codes.update(a+b for a in A_Z for b in A_Z)
81+
elif all_cc:
82+
with open(COUNTRY_CODES_FILE) as fp:
83+
text = fp.read()
84+
codes.update(text.split())
85+
else:
86+
for cc in (c.upper() for c in cc_args):
87+
if len(cc) == 1 and cc in A_Z:
88+
codes.update(cc+c for c in A_Z)
89+
elif len(cc) == 2 and all(c in A_Z for c in cc):
90+
codes.add(cc)
91+
else:
92+
msg = 'each CC argument must be A to Z or AA to ZZ.'
93+
raise ValueError('*** Usage error: '+msg)
94+
return sorted(codes)[:limit]
95+
96+
97+
def process_args(default_concur_req):
98+
server_options = ', '.join(sorted(SERVERS))
99+
parser = argparse.ArgumentParser(
100+
description='Download flags for country codes. '
101+
'Default: top 20 countries by population.')
102+
parser.add_argument('cc', metavar='CC', nargs='*',
103+
help='country code or 1st letter (eg. B for BA...BZ)')
104+
parser.add_argument('-a', '--all', action='store_true',
105+
help='get all available flags (AD to ZW)')
106+
parser.add_argument('-e', '--every', action='store_true',
107+
help='get flags for every possible code (AA...ZZ)')
108+
parser.add_argument('-l', '--limit', metavar='N', type=int,
109+
help='limit to N first codes', default=sys.maxsize)
110+
parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
111+
default=default_concur_req,
112+
help=f'maximum concurrent requests (default={default_concur_req})')
113+
parser.add_argument('-s', '--server', metavar='LABEL',
114+
default=DEFAULT_SERVER,
115+
help=('Server to hit; one of ' +
116+
f'{server_options} (default={DEFAULT_SERVER})'))
117+
parser.add_argument('-v', '--verbose', action='store_true',
118+
help='output detailed progress info')
119+
args = parser.parse_args()
120+
if args.max_req < 1:
121+
print('*** Usage error: --max_req CONCURRENT must be >= 1')
122+
parser.print_usage()
123+
sys.exit(1)
124+
if args.limit < 1:
125+
print('*** Usage error: --limit N must be >= 1')
126+
parser.print_usage()
127+
sys.exit(1)
128+
args.server = args.server.upper()
129+
if args.server not in SERVERS:
130+
print('*** Usage error: --server LABEL must be one of',
131+
server_options)
132+
parser.print_usage()
133+
sys.exit(1)
134+
try:
135+
cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
136+
except ValueError as exc:
137+
print(exc.args[0])
138+
parser.print_usage()
139+
sys.exit(1)
140+
141+
if not cc_list:
142+
cc_list = sorted(POP20_CC)
143+
return args, cc_list
144+
145+
146+
def main(download_many, default_concur_req, max_concur_req):
147+
args, cc_list = process_args(default_concur_req)
148+
actual_req = min(args.max_req, max_concur_req, len(cc_list))
149+
initial_report(cc_list, actual_req, args.server)
150+
base_url = SERVERS[args.server]
151+
t0 = time.time()
152+
counter = download_many(cc_list, base_url, args.verbose, actual_req)
153+
assert sum(counter.values()) == len(cc_list), \
154+
'some downloads are unaccounted for'
155+
final_report(cc_list, counter, t0)

0 commit comments

Comments
 (0)