Merge pull request #98 from nlevitt/trough-dedup-bugs

WIP: trough dedup bug fix
This commit is contained in:
Noah Levitt 2018-07-19 11:17:19 -05:00 committed by GitHub
commit f32d5636a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 18 deletions

View File

@ -51,6 +51,7 @@ import gzip
import mock
import email.message
import socketserver
from concurrent import futures
try:
import http.server as http_server
@ -886,6 +887,57 @@ def test_dedup_buckets(https_daemon, http_daemon, warcprox_, archiving_proxies,
finally:
fh.close()
def test_dedup_bucket_concurrency(https_daemon, http_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls
revisits_before = warcprox_.proxy.stats_db.value(
'__all__', 'revisit', 'urls') or 0
# fire off 20 initial requests simultaneously-ish
with futures.ThreadPoolExecutor(max_workers=20) as pool:
for i in range(20):
url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 20)
assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before
# fire off 20 requests to the same urls but different buckets
# none should be deduped
with futures.ThreadPoolExecutor(max_workers=20) as pool:
for i in range(20):
url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
http_daemon.server_port, -i - 1)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 40)
assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before
# fire off 20 requests same as the initial requests, all should be deduped
with futures.ThreadPoolExecutor(max_workers=20) as pool:
for i in range(20):
url = 'http://localhost:%s/test_dedup_bucket_concurrency/%s' % (
http_daemon.server_port, i)
headers = {"Warcprox-Meta": json.dumps({
"warc-prefix":"test_dedup_buckets",
"dedup-bucket":"bucket_%s" % i})}
pool.submit(
requests.get, url, proxies=archiving_proxies, verify=False,
headers=headers)
wait(lambda: warcprox_.proxy.running_stats.urls - urls_before == 60)
assert warcprox_.proxy.stats_db.value('__all__', 'revisit', 'urls') == revisits_before + 20
def test_block_rules(http_daemon, https_daemon, warcprox_, archiving_proxies):
urls_before = warcprox_.proxy.running_stats.urls

View File

@ -148,6 +148,8 @@ class BasePostfetchProcessor(threading.Thread):
raise Exception('not implemented')
def _run(self):
threading.current_thread().name = '%s(tid=%s)' % (
threading.current_thread().name, gettid())
self.logger.info('%s starting up', self)
self._startup()
while not self.stop.is_set():

View File

@ -405,7 +405,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
recorded_url.payload_digest, self.options.base32)
if recorded_url.payload_digest else 'n/a')
self.logger.debug(
'filtered out digests (not loading dedup): %r', discards)
'len(batch)=%s len(discards)=%s buckets=%s',
len(batch), len(discards),
{bucket: len(buckets[bucket]) for bucket in buckets})
return buckets
def _build_key_index(self, batch):
@ -432,11 +434,12 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
fs = {}
with futures.ThreadPoolExecutor(max_workers=len(buckets)) as pool:
# send off the trough requests in parallel
key_indexes = {}
for bucket in buckets:
key_index = self._build_key_index(buckets[bucket])
key_indexes[bucket] = self._build_key_index(buckets[bucket])
future = pool.submit(
self.trough_dedup_db.batch_lookup,
key_index.keys(), bucket)
key_indexes[bucket].keys(), bucket)
fs[future] = bucket
# process results as they come back
@ -444,6 +447,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
for future in futures.as_completed(fs, timeout=20):
bucket = fs[future]
try:
key_index = key_indexes[bucket]
for entry in future.result():
for recorded_url in key_index[entry['digest_key']]:
recorded_url.dedup_info = entry
@ -459,8 +463,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
novel = sorted([
k for k in key_index.keys() if k not in dups])
self.logger.debug(
'bucket %s: dups=%r novel=%r',
bucket, dups, novel)
'bucket %s: dups(%s)=%r novel(%s)=%r',
bucket, len(dups), dups, len(novel), novel)
except futures.TimeoutError as e:
# the remaining threads actually keep running in this case,

View File

@ -204,13 +204,14 @@ class WarcWriter:
record.offset = offset
record.length = warc.f.tell() - offset
record.warc_filename = warc.finalname
self.logger.debug(
self.logger.trace(
'wrote warc record: warc_type=%s content_length=%s '
'url=%s warc=%s offset=%d',
record.get_header(warctools.WarcRecord.TYPE),
'digest=%s offset=%d warc=%s url=%s',
record.type,
record.get_header(warctools.WarcRecord.CONTENT_LENGTH),
record.get_header(warctools.WarcRecord.URL),
warc.path, record.offset)
record.get_header(b'WARC-Payload-Digest'),
record.offset, warc.path,
record.get_header(warctools.WarcRecord.URL))
return records

View File

@ -117,19 +117,18 @@ class WarcWriterProcessor(warcprox.BaseStandardPostfetchProcessor):
and self._filter_accepts(recorded_url))
def _log(self, recorded_url, records):
try:
payload_digest = records[0].get_header('WARC-Payload-Digest').decode("utf-8")
except:
payload_digest = "-"
# 2015-07-17T22:32:23.672Z 1 58 dns:www.dhss.delaware.gov P http://www.dhss.delaware.gov/dhss/ text/dns #045 20150717223214881+316 sha1:63UTPB7GTWIHAGIK3WWL76E57BBTJGAK http://www.dhss.delaware.gov/dhss/ - {"warcFileOffset":2964,"warcFilename":"ARCHIVEIT-1303-WEEKLY-JOB165158-20150717223222113-00000.warc.gz"}
type_ = records[0].type.decode("utf-8") if records else '-'
try:
payload_digest = records[0].get_header(b'WARC-Payload-Digest').decode('utf-8')
except:
payload_digest = '-'
type_ = records[0].type.decode('utf-8') if records else '-'
filename = records[0].warc_filename if records else '-'
offset = records[0].offset if records else '-'
self.logger.info(
"%s %s %s %s %s size=%s %s %s %s offset=%s",
'%s %s %s %s %s size=%s %s %s %s offset=%s',
recorded_url.client_ip, recorded_url.status,
recorded_url.method, recorded_url.url.decode("utf-8"),
recorded_url.method, recorded_url.url.decode('utf-8'),
recorded_url.mimetype, recorded_url.size, payload_digest,
type_, filename, offset)