From 5631eaced1d175d0f733e4a9441c0af5c0510609 Mon Sep 17 00:00:00 2001
From: Vangelis Banos <vangelis@archive.org>
Date: Tue, 23 Jan 2018 23:16:35 +0000
Subject: [PATCH 1/2] Parallelize CDX Server dedup queries

---
 warcprox/dedup.py | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index cb65408..d5ff7c8 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -212,6 +212,9 @@ class CdxServerDedup(DedupDb):
         if options.cdxserver_dedup_cookies:
             self.cookies = options.cdxserver_dedup_cookies
 
+    def loader(self, *args, **kwargs):
+        return CdxServerDedupLoader(self, self.options)
+
     def start(self):
         pass
 
@@ -265,6 +268,33 @@ class CdxServerDedup(DedupDb):
         """
         pass
 
+class CdxServerDedupLoader(warcprox.BaseBatchPostfetchProcessor):
+    def __init__(self, cdx_dedup, options=warcprox.Options()):
+        warcprox.BaseBatchPostfetchProcessor.__init__(self, options)
+        self.pool = futures.ThreadPoolExecutor(max_workers=50)
+        self.batch = set()
+        self.cdx_dedup = cdx_dedup
+
+    def _get_process_put(self):
+        recorded_url = self.inq.get(block=True, timeout=0.5)
+        self.batch.add(recorded_url)
+        self.pool.submit(self._process_url, recorded_url)
+
+    def _process_url(self, recorded_url):
+        try:
+            digest_key = warcprox.digest_str(recorded_url.payload_digest,
+                                             self.options.base32)
+            dedup_info = self.cdx_dedup.lookup(digest_key, recorded_url.url)
+            if dedup_info:
+                recorded_url.dedup_info = dedup_info
+        except ValueError as exc:
+            self.logger.error('CdxServerDedupLoader _process_url failed for url=%s %s',
+                              recorded_url.url, exc)
+        finally:
+            self.batch.remove(recorded_url)
+            if self.outq:
+                self.outq.put(recorded_url)
+
 class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
     def __init__(self, trough_dedup_db, options=warcprox.Options()):
         warcprox.BaseBatchPostfetchProcessor.__init__(self, options)

From 824c19414248c2376ccdac570ea5345d188869a8 Mon Sep 17 00:00:00 2001
From: Noah Levitt <nlevitt@archive.org>
Date: Wed, 24 Jan 2018 16:07:45 -0800
Subject: [PATCH 2/2] make plugin api more flexible

---
 README.rst             | 48 ++++++++++++++++++++++++++----------------
 setup.py               |  4 ++--
 tests/test_warcprox.py | 16 +++++++++++---
 warcprox/controller.py | 24 ++++++++++++++-------
 warcprox/dedup.py      |  2 +-
 warcprox/main.py       |  8 ++-----
 6 files changed, 64 insertions(+), 38 deletions(-)

diff --git a/README.rst b/README.rst
index de7988c..397b930 100644
--- a/README.rst
+++ b/README.rst
@@ -34,20 +34,34 @@ get the warning when you visit each new site. But worse, any embedded
 https content on a different server will simply fail to load, because
 the browser will reject the certificate without telling you.
 
+Plugins
+~~~~~~~
+
+Warcprox supports a limited notion of plugins by way of the `--plugin` command
+line argument. Plugin classes are loaded from the regular python module search
+path. They will be instantiated with one argument, a `warcprox.Options`, which
+holds the values of all the command line arguments. Legacy plugins with
+constructors that take no arguments are also supported. Plugins should either
+have a method `notify(self, recorded_url, records)` or should subclass
+`warcprox.BasePostfetchProcessor`. More than one plugin can be configured by
+specifying `--plugin` multiples times.
+
+XXX example?
+
 Usage
 ~~~~~
 
 ::
 
     usage: warcprox [-h] [-p PORT] [-b ADDRESS] [-c CACERT]
-                    [--certs-dir CERTS_DIR] [-d DIRECTORY] [-z] [-n PREFIX]
+                    [--certs-dir CERTS_DIR] [-d DIRECTORY]
+                    [--warc-filename WARC_FILENAME] [-z] [-n PREFIX]
                     [-s ROLLOVER_SIZE]
                     [--rollover-idle-time ROLLOVER_IDLE_TIME]
                     [-g DIGEST_ALGORITHM] [--base32]
                     [--method-filter HTTP_METHOD]
                     [--stats-db-file STATS_DB_FILE | --rethinkdb-stats-url RETHINKDB_STATS_URL]
                     [-P PLAYBACK_PORT]
-                    [--playback-index-db-file PLAYBACK_INDEX_DB_FILE]
                     [-j DEDUP_DB_FILE | --rethinkdb-dedup-url RETHINKDB_DEDUP_URL | --rethinkdb-big-table-url RETHINKDB_BIG_TABLE_URL | --rethinkdb-trough-db-url RETHINKDB_TROUGH_DB_URL | --cdxserver-dedup CDXSERVER_DEDUP]
                     [--rethinkdb-services-url RETHINKDB_SERVICES_URL]
                     [--onion-tor-socks-proxy ONION_TOR_SOCKS_PROXY]
@@ -63,13 +77,19 @@ Usage
                             address to listen on (default: localhost)
       -c CACERT, --cacert CACERT
                             CA certificate file; if file does not exist, it
-                            will be created (default: ./ayutla.local-warcprox-
-                            ca.pem)
+                            will be created (default:
+                            ./ayutla.monkeybrains.net-warcprox-ca.pem)
       --certs-dir CERTS_DIR
                             where to store and load generated certificates
-                            (default: ./ayutla.local-warcprox-ca)
+                            (default: ./ayutla.monkeybrains.net-warcprox-ca)
       -d DIRECTORY, --dir DIRECTORY
                             where to write warcs (default: ./warcs)
+      --warc-filename WARC_FILENAME
+                            define custom WARC filename with variables
+                            {prefix}, {timestamp14}, {timestamp17},
+                            {serialno}, {randomtoken}, {hostname},
+                            {shorthostname} (default:
+                            {prefix}-{timestamp17}-{serialno}-{randomtoken})
       -z, --gzip            write gzip-compressed warc records
       -n PREFIX, --prefix PREFIX
                             default WARC filename prefix (default: WARCPROX)
@@ -81,8 +101,8 @@ Usage
                             (so that Friday's last open WARC doesn't sit there
                             all weekend waiting for more data) (default: None)
       -g DIGEST_ALGORITHM, --digest-algorithm DIGEST_ALGORITHM
-                            digest algorithm, one of sha256, sha224, sha512,
-                            sha384, md5, sha1 (default: sha1)
+                            digest algorithm, one of sha384, sha224, md5,
+                            sha256, sha512, sha1 (default: sha1)
       --base32              write digests in Base32 instead of hex
       --method-filter HTTP_METHOD
                             only record requests with the given http method(s)
@@ -98,10 +118,6 @@ Usage
       -P PLAYBACK_PORT, --playback-port PLAYBACK_PORT
                             port to listen on for instant playback (default:
                             None)
-      --playback-index-db-file PLAYBACK_INDEX_DB_FILE
-                            playback index database file (only used if
-                            --playback-port is specified) (default:
-                            ./warcprox-playback-index.db)
       -j DEDUP_DB_FILE, --dedup-db-file DEDUP_DB_FILE
                             persistent deduplication database file; empty
                             string or /dev/null disables deduplication
@@ -138,12 +154,8 @@ Usage
       --plugin PLUGIN_CLASS
                             Qualified name of plugin class, e.g.
                             "mypkg.mymod.MyClass". May be used multiple times
-                            to register multiple plugins. Plugin classes are
-                            loaded from the regular python module search path.
-                            They will be instantiated with no arguments and
-                            must have a method `notify(self, recorded_url,
-                            records)` which will be called for each url, after
-                            warc records have been written. (default: None)
+                            to register multiple plugins. See README.rst for
+                            more information. (default: None)
       --version             show program's version number and exit
       -v, --verbose
       --trace
@@ -156,7 +168,7 @@ Warcprox is a derivative work of pymiproxy, which is GPL. Thus warcprox is also
 GPL.
 
 * Copyright (C) 2012 Cygnos Corporation
-* Copyright (C) 2013-2017 Internet Archive
+* Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
diff --git a/setup.py b/setup.py
index f345120..9b61523 100755
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 '''
 setup.py - setuptools installation configuration for warcprox
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2016 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -52,7 +52,7 @@ except:
 
 setuptools.setup(
         name='warcprox',
-        version='2.4b1.dev143',
+        version='2.4b1.dev144',
         description='WARC writing MITM HTTP/S proxy',
         url='https://github.com/internetarchive/warcprox',
         author='Noah Levitt',
diff --git a/tests/test_warcprox.py b/tests/test_warcprox.py
index d091542..886902f 100755
--- a/tests/test_warcprox.py
+++ b/tests/test_warcprox.py
@@ -3,7 +3,7 @@
 '''
 tests/test_warcprox.py - automated tests for warcprox
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -1396,7 +1396,10 @@ def test_controller_with_defaults():
     assert wwt.writer_pool.default_warc_writer.record_builder.digest_algorithm == 'sha1'
 
 def test_load_plugin():
-    options = warcprox.Options(port=0, plugins=['warcprox.stats.RunningStats'])
+    options = warcprox.Options(port=0, plugins=[
+        'warcprox.stats.RunningStats',
+        'warcprox.BaseStandardPostfetchProcessor',
+        'warcprox.BaseBatchPostfetchProcessor',])
     controller = warcprox.controller.WarcproxController(options)
     assert isinstance(
             controller._postfetch_chain[-1],
@@ -1404,11 +1407,18 @@ def test_load_plugin():
     assert isinstance(
             controller._postfetch_chain[-1].listener,
             warcprox.stats.RunningStats)
+
     assert isinstance(
             controller._postfetch_chain[-2],
+            warcprox.BaseBatchPostfetchProcessor)
+    assert isinstance(
+            controller._postfetch_chain[-3],
+            warcprox.BaseStandardPostfetchProcessor)
+    assert isinstance(
+            controller._postfetch_chain[-4],
             warcprox.ListenerPostfetchProcessor)
     assert isinstance(
-            controller._postfetch_chain[-2].listener,
+            controller._postfetch_chain[-4].listener,
             warcprox.stats.RunningStats)
 
 def test_choose_a_port_for_me(warcprox_):
diff --git a/warcprox/controller.py b/warcprox/controller.py
index fe9960a..85fa42d 100644
--- a/warcprox/controller.py
+++ b/warcprox/controller.py
@@ -92,13 +92,17 @@ class Factory:
             return None
 
     @staticmethod
-    def plugin(qualname):
+    def plugin(qualname, options):
         try:
             (module_name, class_name) = qualname.rsplit('.', 1)
             module_ = importlib.import_module(module_name)
             class_ = getattr(module_, class_name)
-            plugin = class_()
-            plugin.notify  # make sure it has this method
+            try: # new plugins take `options` argument
+                plugin = class_(options)
+            except: # backward-compatibility
+                plugin = class_()
+            # check that this is either a listener or a batch processor
+            assert hasattr(plugin, 'notify') ^ hasattr(plugin, '_startup')
             return plugin
         except Exception as e:
             logging.fatal('problem with plugin class %r: %s', qualname, e)
@@ -197,15 +201,19 @@ class WarcproxController(object):
                     warcprox.ListenerPostfetchProcessor(
                         crawl_logger, self.options))
 
+        for qualname in self.options.plugins or []:
+            plugin = Factory.plugin(qualname, self.options)
+            if hasattr(plugin, 'notify'):
+                self._postfetch_chain.append(
+                        warcprox.ListenerPostfetchProcessor(
+                            plugin, self.options))
+            else:
+                self._postfetch_chain.append(plugin)
+
         self._postfetch_chain.append(
                 warcprox.ListenerPostfetchProcessor(
                     self.proxy.running_stats, self.options))
 
-        for qualname in self.options.plugins or []:
-            plugin = Factory.plugin(qualname)
-            self._postfetch_chain.append(
-                    warcprox.ListenerPostfetchProcessor(plugin, self.options))
-
         # chain them all up
         self._postfetch_chain[0].inq = inq
         for i in range(1, len(self._postfetch_chain)):
diff --git a/warcprox/dedup.py b/warcprox/dedup.py
index d5ff7c8..8b27874 100644
--- a/warcprox/dedup.py
+++ b/warcprox/dedup.py
@@ -302,7 +302,7 @@ class BatchTroughStorer(warcprox.BaseBatchPostfetchProcessor):
 
     def _filter_and_bucketize(self, batch):
         '''
-        Returns `{bucket: [recorded_url, ...]}`, excluding urls that should
+        Returns `{bucket: [recorded_url, ...]}`, excluding urls that should not
         have dedup info stored.
         '''
         buckets = collections.defaultdict(list)
diff --git a/warcprox/main.py b/warcprox/main.py
index 1f270a1..f663d0d 100644
--- a/warcprox/main.py
+++ b/warcprox/main.py
@@ -4,7 +4,7 @@
 warcprox/main.py - entrypoint for warcprox executable, parses command line
 arguments, initializes components, starts controller, handles signals
 
-Copyright (C) 2013-2017 Internet Archive
+Copyright (C) 2013-2018 Internet Archive
 
 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -172,11 +172,7 @@ def _build_arg_parser(prog='warcprox'):
             action='append', help=(
                 'Qualified name of plugin class, e.g. "mypkg.mymod.MyClass". '
                 'May be used multiple times to register multiple plugins. '
-                'Plugin classes are loaded from the regular python module '
-                'search path. They will be instantiated with no arguments and '
-                'must have a method `notify(self, recorded_url, records)` '
-                'which will be called for each url, after warc records have '
-                'been written.'))
+                'See README.rst for more information.'))
     arg_parser.add_argument('--version', action='version',
             version="warcprox {}".format(warcprox.__version__))
     arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')