From ba14480a2df03f19b8a88aa09127691023bdd36d Mon Sep 17 00:00:00 2001
From: Christian Clauss
Date: Wed, 12 Apr 2023 11:37:56 +0200
Subject: [PATCH 1/6] Delete .travis.yml

---
 .travis.yml | 65 -----------------------------------------------------
 1 file changed, 65 deletions(-)
 delete mode 100644 .travis.yml

diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index d188003..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,65 +0,0 @@
-os: linux
-dist: xenial
-language: python
-python:
-- 3.8
-- 3.7
-- 3.6
-- 3.5
-- pypy3.5
-- nightly
-
-jobs:
-  allow_failures:
-  - python: nightly
-
-addons:
-  apt:
-    packages:
-    - tor
-
-services:
-- docker
-
-before_install:
-- sudo service docker restart ; sleep 10 # https://github.com/travis-ci/travis-ci/issues/4778
-- docker network create --driver=bridge trough
-- docker run --detach --network=trough --hostname=rethinkdb --name=rethinkdb --publish=28015:28015 rethinkdb
-- docker run --detach --network=trough --hostname=hadoop --name=hadoop chalimartines/cdh5-pseudo-distributed
-- docker run --detach --network=trough --hostname=trough --name=trough --volume="$PWD/tests/run-trough.sh:/run-trough.sh" --publish=6111:6111 --publish=6112:6112 --publish=6222:6222 --publish=6444:6444 python:3.6 bash /run-trough.sh
-- cat /etc/hosts
-- echo | sudo tee -a /etc/hosts # travis-ci default doesn't end with a newline 🙄
-- echo 127.0.0.1 rethinkdb | sudo tee -a /etc/hosts
-- echo 127.0.0.1 hadoop | sudo tee -a /etc/hosts
-- echo 127.0.0.1 trough | sudo tee -a /etc/hosts
-- cat /etc/hosts
-- ping -c2 trough
-
-install:
-- pip install .[trough] pytest requests warcio mock
-
-before_script:
-- docker exec trough bash -c 'while ! test -e /tmp/trough-read.out ; do sleep 0.5 ; done' || true
-- docker logs --timestamps --details trough
-- ps ww -fHe
-- docker ps
-
-script:
-- py.test -v --tb=native tests
-- py.test -v --tb=native --rethinkdb-dedup-url=rethinkdb://localhost/test1/dedup tests
-- py.test -v --tb=native --rethinkdb-big-table-url=rethinkdb://localhost/test2/captures tests
-- py.test -v --tb=native --rethinkdb-trough-db-url=rethinkdb://localhost/trough_configuration tests
-
-after_script:
-- ps ww -fHe
-- docker exec trough cat /tmp/trough-write.out
-- docker exec trough cat /tmp/trough-segment-manager-server.out
-- docker exec trough cat /tmp/trough-segment-manager-local.out
-- docker exec trough cat /tmp/trough-sync-server.out
-- docker exec trough cat /tmp/trough-sync-local.out
-- docker exec trough cat /tmp/trough-read.out
-
-notifications:
-  slack:
-  - secure: UJzNe+kEJ8QhNxrdqObroisJAO2ipr+Sr2+u1e2euQdIkacyX+nZ88jSk6uDKniAemSfFDI8Ty5a7++2wSbE//Hr3jOSNOJMZLzockafzvIYrq9bP7V97j1gQ4u7liWd19VBnbf0pULuwEfy/n5PdOBR/TiPrgMuYjfZseV+alo=
-  - secure: S1SK52178uywcWLMO4S5POdjMv1MQjR061CKprjVn2d8x5RBbg8QZtumA6Xt+pByvJzh8vk+ITHCN57tcdi51yL6Z0QauXwxwzTsZmjrhxWOybAO2uOHliqQSDgxKcbXIqJKg7Yv19eLQYWDVJVGuwlMfVBS0hOHtTTpVuLuGuc=

From 1fd3b2c7a1fcc2b5a1b61ec1156812d05626dcdd Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Wed, 12 Apr 2023 11:44:01 -0700
Subject: [PATCH 2/6] =?UTF-8?q?update=20readme=20=E2=80=94=20rm=20travis?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.rst | 2 --
 1 file changed, 2 deletions(-)

diff --git a/README.rst b/README.rst
index 59951ae..087dde1 100644
--- a/README.rst
+++ b/README.rst
@@ -1,7 +1,5 @@
 Warcprox - WARC writing MITM HTTP/S proxy
 *****************************************
-.. image:: https://travis-ci.org/internetarchive/warcprox.svg?branch=master
-    :target: https://travis-ci.org/internetarchive/warcprox
 
 Warcprox is an HTTP proxy designed for web archiving applications. When used
 in parallel with `brozzler <https://github.com/internetarchive/brozzler>`_ it
From 507592041574f286a55c3b84d5ae5669606d1391 Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Wed, 21 Jun 2023 17:25:41 -0700
Subject: [PATCH 3/6] limit revisits mixin

---
 warcprox/dedup.py | 36 ++++++++++++++++++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 705b74a..289b252 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -1,7 +1,7 @@
 '''
 warcprox/dedup.py - identical payload digest deduplication using sqlite db
 
-Copyright (C) 2013-2021 Internet Archive
+Copyright (C) 2013-2023 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -61,6 +61,35 @@ class DedupableMixin(object):
         else:
             return recorded_url.response_recorder.payload_size() > self.min_binary_size
 
+class LimitRevisitsMixin(object):
+    """
+    Limit revisits recorded to one per revisit_key
+    """
+    def __init__(self, options=warcprox.Options()):
+        self.revisits = collections.defaultdict(set)
+        # TODO: define logger?
+
+    def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
+        if not hash:
+            hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
+        if not revisit_key:
+            if (
+                recorded_url.warcprox_meta
+                and "metadata" in recorded_url.warcprox_meta
+                and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
+            ):
+                revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
+            else:
+                revisit_key = 'all'
+
+        hash_plus_url = b''.join((hash, recorded_url.url))
+        if hash_plus_url in self.revisits[revisit_key]:
+            logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
+            return True
+        else:
+            self.revisits[revisit_key].add(hash_plus_url)
+            return False
+
 class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
     def __init__(self, dedup_db, options=warcprox.Options()):
         warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
@@ -398,7 +427,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
                 logging.warning(
                     'timed out saving dedup info to trough', exc_info=True)
 
-class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
+class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsMixin):
     logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")
 
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
@@ -488,6 +517,9 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor):
                 for entry in future.result():
                     for recorded_url in key_index[entry['digest_key']]:
                         recorded_url.dedup_info = entry
+                        if recorded_url.dedup_info:
+                            recorded_url.do_not_archive = \
+                                self.limit_revisits(recorded_url)
             except Exception as e:
                 # batch_lookup raised exception or something
                 logging.warning(
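
The idea in PATCH 3: record each payload digest concatenated with its URL in
an in-memory set, one set per revisit key (the AIT job id when present), and
report any repeat so the caller can mark it do_not_archive. A minimal sketch
of that bookkeeping, runnable outside warcprox; the function name and the
literal digests are illustrative, not part of warcprox:

    import collections

    revisits = collections.defaultdict(set)

    def seen_before(revisit_key, payload_digest, url):
        # one (digest + url) entry per revisit key; first sighting is recorded
        hash_plus_url = b''.join((payload_digest, url))
        if hash_plus_url in revisits[revisit_key]:
            return True
        revisits[revisit_key].add(hash_plus_url)
        return False

    assert not seen_before('job-1', b'sha1:2fd...', b'http://example.com/a')
    assert seen_before('job-1', b'sha1:2fd...', b'http://example.com/a')
    assert not seen_before('job-2', b'sha1:2fd...', b'http://example.com/a')

The sets grow for the life of the process, which is presumably why the next
patch moves the bookkeeping into Postgres.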
From 08f2903f14d015b06f62880c77c915da2638c84b Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Thu, 22 Jun 2023 19:29:53 -0700
Subject: [PATCH 4/6] LimitRevisitsPGMixin

---
 warcprox/dedup.py | 60 ++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 57 insertions(+), 3 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index 289b252..d257c58 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -67,7 +67,6 @@ class LimitRevisitsMixin(object):
     """
     def __init__(self, options=warcprox.Options()):
         self.revisits = collections.defaultdict(set)
-        # TODO: define logger?
 
     def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
         if not hash:
@@ -84,12 +83,65 @@ class LimitRevisitsMixin(object):
 
         hash_plus_url = b''.join((hash, recorded_url.url))
         if hash_plus_url in self.revisits[revisit_key]:
-            logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
+            logger.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
             return True
         else:
             self.revisits[revisit_key].add(hash_plus_url)
             return False
 
+class LimitRevisitsPGMixin(object):
+    """
+    Limit revisits recorded to one per revisit_key
+    """
+    def __init__(self, datasource, options=warcprox.Options()):
+        import psycopg2
+        from psycopg2 import extras # needed
+        self.datasource = datasource # "postgresql://user@db_host/db_name"
+        self.datatable = datatable # postgres table in db_name
+        # TODO here or elsewhere? verify partition table exists
+
+    def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
+        if not hash:
+            hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
+        if not revisit_key:
+            # use ait-job-id if available
+            if (
+                recorded_url.warcprox_meta
+                and "metadata" in recorded_url.warcprox_meta
+                and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
+            ):
+                revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
+            else:
+                revisit_key = 'all'
+        hash_plus_url = b''.join((hash, recorded_url.url))
+
+        query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
+
+        try:
+            conn = psycopg2.connect(self.datasource)
+        except Exception as e:
+            self.logger.warning("db connection failure: %s", e)
+            return False
+        cur = conn.cursor()
+        try:
+            cur.execute(query)
+        except Exception as e:
+            self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
+            return False
+        result = cur.fetchone()
+
+        if result[0]:
+            logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
+            return True
+        else:
+            query = "INSERT INTO {revisit_key} VALUES(revisit_key, hash_plus_url);"
+            try:
+                cur.execute(query)
+            except Exception as e:
+                self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
+
+            return False
+
 class DedupLoader(warcprox.BaseStandardPostfetchProcessor, DedupableMixin):
     def __init__(self, dedup_db, options=warcprox.Options()):
         warcprox.BaseStandardPostfetchProcessor.__init__(self, options=options)
@@ -427,11 +479,13 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
                 logging.warning(
                     'timed out saving dedup info to trough', exc_info=True)
 
-class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsMixin):
+class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMixin):
     logger = logging.getLogger("warcprox.dedup.BatchTroughLoader")
 
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
+        LimitRevisitsPGMixin.__init__(self, datasource="", options)
+        # TODO: get datasource
         self.trough_dedup_db = trough_dedup_db
 
     def _startup(self):
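
As committed, both queries in PATCH 4 interpolate Python values straight into
the SQL text, so the bytes value of hash_plus_url is rendered as its b'...'
repr and the statement cannot parse; the SELECT is also missing its closing
parenthesis (PATCH 6 restores it), and the INSERT is never committed. A
hedged sketch of the same check-then-insert using bound parameters, assuming
a table with columns (revisit_key text, hash_plus_url bytea) — the schema
never appears in these patches:

    import psycopg2

    def limit_revisits(datasource, datatable, revisit_key, hash_plus_url):
        # returns True if this digest+url pair was already recorded
        conn = psycopg2.connect(datasource)
        try:
            with conn, conn.cursor() as cur:  # commits on clean exit
                # identifiers can't be bound parameters, so datatable
                # must come from trusted configuration
                cur.execute(
                    f'SELECT exists(SELECT 1 FROM {datatable} '
                    'WHERE hash_plus_url = %s LIMIT 1)',
                    (hash_plus_url,))
                if cur.fetchone()[0]:
                    return True
                cur.execute(
                    f'INSERT INTO {datatable} VALUES (%s, %s)',
                    (revisit_key, hash_plus_url))
                return False
        finally:
            conn.close()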
d257c58..d9f2934 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -61,34 +61,6 @@ class DedupableMixin(object):
         else:
             return recorded_url.response_recorder.payload_size() > self.min_binary_size
 
-class LimitRevisitsMixin(object):
-    """
-    Limit revisits recorded to one per revisit_key
-    """
-    def __init__(self, options=warcprox.Options()):
-        self.revisits = collections.defaultdict(set)
-
-    def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
-        if not hash:
-            hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
-        if not revisit_key:
-            if (
-                recorded_url.warcprox_meta
-                and "metadata" in recorded_url.warcprox_meta
-                and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
-            ):
-                revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
-            else:
-                revisit_key = 'all'
-
-        hash_plus_url = b''.join((hash, recorded_url.url))
-        if hash_plus_url in self.revisits[revisit_key]:
-            logger.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
-            return True
-        else:
-            self.revisits[revisit_key].add(hash_plus_url)
-            return False
-
 class LimitRevisitsPGMixin(object):
     """
     Limit revisits recorded to one per revisit_key
@@ -97,12 +69,16 @@ class LimitRevisitsPGMixin(object):
         import psycopg2
         from psycopg2 import extras # needed
         self.datasource = datasource # "postgresql://user@db_host/db_name"
-        self.datatable = datatable # postgres table in db_name
-        # TODO here or elsewhere? verify partition table exists
+        self.datatable = 'crawled_urls' # postgres table in db_name
+
+    def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
+        if not hash_plus_url:
+            hash_plus_url = b''.join(
+                (warcprox.digest_str(recorded_url.payload_digest,
+                self.options.base32),
+                recorded_url.url)
+            )
 
-    def limit_revisits(self, recorded_url, hash=None, revisit_key=None):
-        if not hash:
-            hash = warcprox.digest_str(recorded_url.payload_digest, self.options.base32)
         if not revisit_key:
             # use ait-job-id if available
             if (
@@ -113,9 +89,67 @@ class LimitRevisitsPGMixin(object):
                 revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
             else:
                 revisit_key = 'all'
-        hash_plus_url = b''.join((hash, recorded_url.url))
 
-        query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
+        query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
+
+        try:
+            conn = psycopg2.connect(self.datasource)
+        except Exception as e:
+            self.logger.warning("db connection failure: %s", e)
+            return False
+        cur = conn.cursor()
+        try:
+            cur.execute(query)
+        except Exception as e:
+            self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
+            return False
+        result = cur.fetchone()
+
+        if result[0]:
+            logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
+            return True
+        else:
+            query = "INSERT INTO {self.datable} VALUES(revisit_key, hash_plus_url);"
+            try:
+                cur.execute(query)
+            except Exception as e:
+                self.logger.warning("exception inserting %s in %s: %s", hash_plus_url, revisit_key, e)
+
+            return False
+
+class LimitRecords(object):
+    """
+    Limit records to one per revisit_key, e.g., crawl id
+    """
+    def __init__(self, datasource, options=warcprox.Options()):
+        import psycopg2
+        from psycopg2 import extras # needed
+        self.datasource = datasource # "postgresql://user@db_host/db_name"
+        # self.datatable = revisit_key # set in limit_revisits method
+        # verify partition table exists
+        self.datatable = 'crawled_urls' # postgres table in db_name
+
+    def limit_records(self, buckets):
+        for bucket in buckets:
+            for recorded_url in bucket:
+                hash_plus_url = b''.join(
+                    (warcprox.digest_str(recorded_url.payload_digest,
+                    self.options.base32),
+                    recorded_url.url)
+                )
+
+                # use ait-job-id if available, or seed?
+                if (
+                    recorded_url.warcprox_meta
+                    and "metadata" in recorded_url.warcprox_meta
+                    and "ait-job-id" in recorded_url.warcprox_meta["metadata"]
+                ):
+                    revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
+                else:
+                    revisit_key = 'all'
+
+                # query = f"SELECT exists(SELECT 1 FROM {revisit_key} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
+                query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
 
         try:
             conn = psycopg2.connect(self.datasource)
@@ -498,7 +532,8 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix
         '''
         buckets = collections.defaultdict(list)
         discards = []
-        # for duplicate checks, see https://webarchive.jira.com/browse/WT-31
+        # current batch often contains new duplicates, so check for them here
+        # see https://webarchive.jira.com/browse/WT-31
         hash_plus_urls = set()
         for recorded_url in batch:
             if not recorded_url.payload_digest:
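
PATCH 5 leaves "verify partition table exists" as a comment, and neither it
nor the later patches show the crawled_urls table being created. A guess at
setup DDL consistent with how the queries use the table; the column names
and types are assumptions, not taken from warcprox or Archive-It:

    import psycopg2

    def ensure_table(datasource, datatable='crawled_urls'):
        # hypothetical DDL: the patches never define the table
        with psycopg2.connect(datasource) as conn, conn.cursor() as cur:
            cur.execute(
                f'CREATE TABLE IF NOT EXISTS {datatable} ('
                ' revisit_key text NOT NULL,'
                ' hash_plus_url bytea NOT NULL,'
                ' PRIMARY KEY (revisit_key, hash_plus_url))')

A primary key over both columns would also let the INSERT become
INSERT ... ON CONFLICT DO NOTHING, sidestepping the select-then-insert race
between concurrent warcprox instances.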
in db_name

From b3f7b09298e8da07a994d19a8e668c9ca3e545db Mon Sep 17 00:00:00 2001
From: Barbara Miller
Date: Tue, 27 Jun 2023 11:47:55 -0700
Subject: [PATCH 6/6] fixes for qa prototyping

---
 warcprox/dedup.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index d9f2934..27705c9 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -65,11 +65,11 @@ class LimitRevisitsPGMixin(object):
     """
     Limit revisits recorded to one per revisit_key
     """
-    def __init__(self, datasource, options=warcprox.Options()):
+    def __init__(self, options=warcprox.Options()):
         import psycopg2
-        from psycopg2 import extras # needed
-        self.datasource = datasource # "postgresql://user@db_host/db_name"
-        self.datatable = 'crawled_urls' # postgres table in db_name
+        from psycopg2 import extras # TODO: needed?
+        self.datasource = "postgresql://archiveit@db.qa-archive-it.org/archiveit" # "postgresql://user@db_host/db_name"
+        self.datatable = "crawl_revisits" # postgres table in db_name
 
     def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
         if not hash_plus_url:
@@ -78,7 +78,6 @@ class LimitRevisitsPGMixin(object):
                 self.options.base32),
                 recorded_url.url)
             )
-
         if not revisit_key:
             # use ait-job-id if available
             if (
@@ -90,7 +89,7 @@ class LimitRevisitsPGMixin(object):
                 revisit_key = recorded_url.warcprox_meta["metadata"]["ait-job-id"]
             else:
                 revisit_key = 'all'
-        query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1;"
+        query = f"SELECT exists(SELECT 1 FROM {self.datatable} WHERE hash_plus_url = {hash_plus_url} LIMIT 1);"
 
         try:
             conn = psycopg2.connect(self.datasource)
@@ -104,12 +103,14 @@ class LimitRevisitsPGMixin(object):
             self.logger.warning("exception querying for %s in %s: %s", hash_plus_url, revisit_key, e)
             return False
         result = cur.fetchone()
-        if result[0]:
+        logging.info("result[0]: %s", result[0])
+
+        if result[0] and result[0] == True:
             logging.info("skipping revisit for url %s and hash %s", recorded_url.url, hash)
             return True
         else:
-            query = "INSERT INTO {self.datable} VALUES(revisit_key, hash_plus_url);"
+            query = f"INSERT INTO {self.datatable} VALUES({revisit_key}, {hash_plus_url});"
             try:
                 cur.execute(query)
             except Exception as e:
@@ -518,8 +519,7 @@ class BatchTroughLoader(warcprox.BaseBatchPostfetchProcessor, LimitRevisitsPGMix
 
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
-        LimitRevisitsPGMixin.__init__(self, datasource="", options)
-        # TODO: get datasource
+        LimitRevisitsPGMixin.__init__(self, options)
         self.trough_dedup_db = trough_dedup_db
 
     def _startup(self):
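
Even after PATCH 6, a few rough edges remain in the final state of the code:
psycopg2 is imported locally in __init__ but referenced in limit_revisits,
where that name is not visible; a new connection is opened per lookup and
never closed; the INSERT is built by f-string interpolation and never
committed; and the skip-log line still references hash (now the Python
builtin) after PATCH 5 renamed the parameter to hash_plus_url. A consolidated
sketch of the mixin with those edges smoothed, under the same hardcoded
datasource and table — a sketch, not the upstream fix:

    import logging
    import psycopg2
    import warcprox

    class LimitRevisitsPGMixin(object):
        """Limit revisits recorded to one per revisit_key."""
        logger = logging.getLogger('warcprox.dedup.LimitRevisitsPGMixin')

        def __init__(self, options=warcprox.Options()):
            self.options = options
            self.datasource = 'postgresql://archiveit@db.qa-archive-it.org/archiveit'
            self.datatable = 'crawl_revisits'
            # one connection reused across lookups instead of one per call
            self._conn = psycopg2.connect(self.datasource)

        def limit_revisits(self, recorded_url, hash_plus_url=None, revisit_key=None):
            if not hash_plus_url:
                hash_plus_url = b''.join((
                    warcprox.digest_str(
                        recorded_url.payload_digest, self.options.base32),
                    recorded_url.url))
            if not revisit_key:
                # use ait-job-id if available
                meta = recorded_url.warcprox_meta or {}
                revisit_key = meta.get('metadata', {}).get('ait-job-id', 'all')
            try:
                with self._conn, self._conn.cursor() as cur:  # commits on exit
                    cur.execute(
                        f'SELECT exists(SELECT 1 FROM {self.datatable} '
                        'WHERE hash_plus_url = %s LIMIT 1)', (hash_plus_url,))
                    if cur.fetchone()[0]:
                        self.logger.info('skipping revisit %s', hash_plus_url)
                        return True
                    cur.execute(
                        f'INSERT INTO {self.datatable} VALUES (%s, %s)',
                        (revisit_key, hash_plus_url))
                    return False
            except Exception as e:
                self.logger.warning('postgres failure for %s: %s', hash_plus_url, e)
                return False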