from gevent import monkey; monkey.patch_all(thread=False)

import re
import json
import os

import webtest

from six.moves.urllib.parse import urlencode

from pywb.warcserver.index.cdxobject import CDXObject

from pywb.warcserver.test.testutils import BaseTestClass
from pywb.warcserver.warcserver import WarcServer


# ============================================================================
class TestCDXApp(BaseTestClass):
    """
    Tests for the ``/pywb/index`` CDX query endpoint served by ``WarcServer``.

    One ``webtest.TestApp`` wrapping a ``WarcServer`` built from the
    ``config_test.yaml`` file located next to this module is created per
    class and shared by every test through ``cls.testapp``.
    """

    @classmethod
    def setup_class(cls):
        """
        build the shared test application once for the whole class.
        """
        super(TestCDXApp, cls).setup_class()
        config_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                   'config_test.yaml')
        cls.testapp = webtest.TestApp(WarcServer(config_file=config_file))

    def query(self, url, is_error=False, **params):
        """
        issue a GET request against ``/pywb/index`` and return the response.

        :param url: value sent as the ``url`` query parameter.
        :param is_error: forwarded to webtest as ``expect_errors``; pass
            ``True`` when an error status is the expected outcome.
        :param params: any further query parameters. Sequence values are
            expanded into repeated parameters (``doseq``).
        """
        params['url'] = url
        query_string = urlencode(params, doseq=True)
        return self.testapp.get('/pywb/index?' + query_string,
                                expect_errors=is_error)

    def test_exact_url(self):
        """
        basic exact match, no filters, etc.
        """
        resp = self.query('http://www.iana.org/')

        assert resp.status_code == 200
        assert len(resp.text.splitlines()) == 3, resp.text

    def test_exact_url_json(self):
        """
        basic exact match with ``output=json``: every line parses as JSON.
        """
        resp = self.query('http://www.iana.org/', output='json')

        assert resp.status_code == 200
        lines = resp.text.splitlines()
        assert len(lines) == 3, resp.text
        # json.loads raises on any line that is not valid JSON
        assert len([json.loads(line) for line in lines]) == 3

    def test_exact_url_plain_text(self):
        """
        basic exact match with ``output=text``: plain text, no JSON blocks.
        """
        resp = self.query('http://www.iana.org/', output='text')

        assert resp.status_code == 200
        assert resp.content_type == 'text/plain'
        assert '{' not in resp.text

        lines = resp.text.splitlines()
        assert len(lines) == 3, resp.text

    def test_prefix_match(self):
        """
        prefix match test
        """
        resp = self.query('http://www.iana.org/', matchType='prefix')

        assert resp.status_code == 200

        # count results whose key (first field) is longer than the root key,
        # i.e. results for urls below the queried prefix
        root_key_len = len('org,iana)/')
        suburls = sum(1 for line in resp.text.splitlines()
                      if len(line.split(' ')[0]) > root_key_len)
        assert suburls > 0

    def test_filters_1(self):
        """
        filter cdxes by mimetype and filename field, exact match.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('mime:warc/revisit', 'filename:dupes.warc.gz'))

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) > 0

        for line in lines:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
            assert cdx['timestamp'] == '20140127171239'
            assert cdx['mime'] == 'warc/revisit'
            assert cdx['filename'] == 'dupes.warc.gz'

    def test_filters_2_no_fuzzy_no_match(self):
        """
        two filters, disable fuzzy matching
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('!mime:warc/revisit', 'filename:dupes.warc.gz'),
                          allowFuzzy='false')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) == 0

    def test_filters_3(self):
        """
        filter cdxes with negated (``!``) mimetype and filename filters.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          filter=('!mime:warc/revisit', '!filename:dupes.warc.gz'))

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        lines = resp.text.splitlines()
        assert len(lines) == 1

        for line in lines:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
            assert cdx['timestamp'] == '20140126200625'
            assert cdx['mime'] == 'text/css'
            assert cdx['filename'] == 'iana.warc.gz'

    def test_limit(self):
        """
        ``limit=1`` yields a single record; adding ``reverse=1`` yields a
        single record with a different (later) timestamp.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          limit='1')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        assert len(cdxes) == 1

        cdx = CDXObject(cdxes[0].encode('utf-8'))
        assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
        assert cdx['timestamp'] == '20140126200625'
        assert cdx['mime'] == 'text/css'

        resp = self.query('http://www.iana.org/_css/2013.1/screen.css',
                          limit='1', reverse='1')

        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()
        assert len(cdxes) == 1

        cdx = CDXObject(cdxes[0].encode('utf-8'))
        assert cdx['urlkey'] == 'org,iana)/_css/2013.1/screen.css'
        assert cdx['timestamp'] == '20140127171239'
        assert cdx['mime'] == 'warc/revisit'

    def test_fields(self):
        """
        retrieve subset of fields with ``fields`` parameter.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          fields='urlkey,timestamp,status')

        assert resp.status_code == 200

        cdxes = resp.text.splitlines()
        # guard against the loop below passing vacuously on an empty response
        assert len(cdxes) > 0, resp.text

        for line in cdxes:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['urlkey'] == 'org,iana)/_css/2013.1/print.css'
            assert re.match(r'\d{14}$', cdx['timestamp'])
            # group + anchor: the whole value must be 3 digits or a lone '-'
            assert re.match(r'(?:\d{3}|-)$', cdx['status'])

    def test_fields_json(self):
        """
        retrieve subset of fields with ``fields`` parameter, in json
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          fields='urlkey,timestamp,status',
                          output='json')

        assert resp.status_code == 200

        cdxes = resp.text.splitlines()
        # guard against the loop below passing vacuously on an empty response
        assert len(cdxes) > 0, resp.text

        for line in cdxes:
            fields = json.loads(line)
            assert len(fields) == 3, line
            assert fields['urlkey'] == 'org,iana)/_css/2013.1/print.css', line
            assert re.match(r'\d{14}$', fields['timestamp']), line
            # group + anchor: the whole value must be 3 digits or a lone '-'
            assert re.match(r'(?:\d{3}|-)$', fields['status']), line

    def test_fields_undefined(self):
        """
        server shall respond with Bad Request and name of undefined
        when ``fields`` parameter contains undefined name(s).
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,nosuchfield')

        # NOTE(review): this used to be a bare comparison without ``assert``
        # and was never enforced. If it fails, the server is not rejecting
        # undefined field names as the docstring requires.
        assert resp.status_code == 400

    def test_fields_undefined_json(self):
        """
        server shall respond with Bad Request and name of undefined
        when ``fields`` parameter contains undefined name(s), json output.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,nosuchfield',
                          output='json')

        # NOTE(review): this used to be a bare comparison without ``assert``
        # and was never enforced. If it fails, the server is not rejecting
        # undefined field names as the docstring requires.
        assert resp.status_code == 400

    def test_resolveRevisits(self):
        """
        with ``resolveRevisits=true``, server adds three fields pointing to
        the *original* capture.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          resolveRevisits='true')
        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        # digest -> (length, offset, filename) of records without orig.* data
        originals = {}
        for line in resp.text.splitlines():
            cdx = CDXObject(line.encode('utf-8'))
            assert len(cdx) == 16

            # orig.* fields are either all '-' or (int, int, filename)
            # check if orig.* fields are equals to corresponding fields
            # for the original capture.

            sha = cdx['digest']
            if cdx['orig.length'] == '-':
                assert cdx['orig.offset'] == '-' and cdx['orig.filename'] == '-'
                originals[sha] = (int(cdx['length']), int(cdx['offset']), cdx['filename'])
            else:
                # a missing original yields None, which fails the comparison
                orig = originals.get(sha)
                assert orig == (int(cdx['orig.length']), int(cdx['orig.offset']), cdx['orig.filename'])

    def test_resolveRevisits_orig_fields(self):
        """
        when resolveRevisits=true, extra three fields are named
        ``orig.length``, ``orig.offset`` and ``orig.filename``, respectively.
        it is possible to filter fields by these names.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          resolveRevisits='1',
                          fields='urlkey,orig.length,orig.offset,orig.filename')
        assert resp.status_code == 200
        assert resp.content_type == 'text/x-cdxj'

        cdxes = resp.text.splitlines()

        # first record is expected to carry no orig.* data
        first = CDXObject(cdxes[0].encode('utf-8'))
        assert first['orig.offset'] == '-'
        assert first['orig.length'] == '-'
        assert first['orig.filename'] == '-'

        # every following record is expected to point back to iana.warc.gz
        for line in cdxes[1:]:
            cdx = CDXObject(line.encode('utf-8'))
            assert cdx['orig.offset'] != '-'
            assert cdx['orig.length'] != '-'
            assert cdx['orig.filename'] == 'iana.warc.gz'

    def test_collapseTime_resolveRevisits_reverse(self):
        """
        ``collapseTime``, ``resolveRevisits`` and ``reverse`` combined:
        expects three records with non-increasing timestamps.
        """
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          collapseTime='11',
                          resolveRevisits='true',
                          reverse='true')

        assert resp.status_code == 200

        cdxes = [CDXObject(line) for line in resp.body.splitlines()]

        assert len(cdxes) == 3

        # timestamp is in descending order
        for current, following in zip(cdxes, cdxes[1:]):
            assert current['timestamp'] >= following['timestamp']

    def test_error_unknown_output_format(self):
        """test unknown output format in combination with a list of output fields"""
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,timestamp,status',
                          output='foo')
        assert resp.status_code == 400
        assert resp.json == {'message': 'output=foo not supported'}

    def test_error_unknown_match_type(self):
        """test unknown/unsupported matchType"""
        resp = self.query('http://www.iana.org/_css/2013.1/print.css',
                          is_error=True,
                          fields='urlkey,timestamp,status',
                          matchType='foo')
        assert resp.status_code == 400
        assert resp.json == {'message': 'Invalid match_type: foo'}