#encoding: utf-8

from __future__ import with_statement
import copy
from datetime import datetime, timedelta

import pytest

from whoosh import analysis, fields, index, qparser, query, searching, scoring
from whoosh.codec.whoosh3 import W3Codec
from whoosh.compat import b, u, text_type
from whoosh.compat import xrange, permutations, izip_longest
from whoosh.filedb.filestore import RamStorage
from whoosh.util.testing import TempIndex


def make_index():
    s = fields.Schema(key=fields.ID(stored=True),
                      name=fields.TEXT,
                      value=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(s)

    w = ix.writer()
    w.add_document(key=u("A"), name=u("Yellow brown"),
                   value=u("Blue red green render purple?"))
    w.add_document(key=u("B"), name=u("Alpha beta"),
                   value=u("Gamma delta epsilon omega."))
    w.add_document(key=u("C"), name=u("One two"),
                   value=u("Three rendered four five."))
    w.add_document(key=u("D"), name=u("Quick went"),
                   value=u("Every red town."))
    w.add_document(key=u("E"), name=u("Yellow uptown"),
                   value=u("Interest rendering outer photo!"))
    w.commit()

    return ix


def _get_keys(stored_fields):
    return sorted([d.get("key") for d in stored_fields])


def _docs(q, s):
    return _get_keys([s.stored_fields(docnum) for docnum
                           in q.docs(s)])


def _run_query(q, target):
    ix = make_index()
    with ix.searcher() as s:
        assert target == _docs(q, s)


def test_empty_index():
    schema = fields.Schema(key=fields.ID(stored=True), value=fields.TEXT)
    st = RamStorage()
    with pytest.raises(index.EmptyIndexError):
        st.open_index(schema=schema)


def test_docs_method():
    ix = make_index()
    with ix.searcher() as s:
        assert _get_keys(s.documents(name="yellow")) == ["A", "E"]
        assert _get_keys(s.documents(value="red")) == ["A", "D"]
        assert _get_keys(s.documents()) == ["A", "B", "C", "D", "E"]


def test_term():
    _run_query(query.Term("name", u("yellow")), [u("A"), u("E")])
    _run_query(query.Term("value", u("zeta")), [])
    _run_query(query.Term("value", u("red")), [u("A"), u("D")])


def test_require():
    _run_query(query.Require(query.Term("value", u("red")),
                             query.Term("name", u("yellow"))),
               [u("A")])


def test_and():
    _run_query(query.And([query.Term("value", u("red")),
                          query.Term("name", u("yellow"))]),
               [u("A")])
    # Missing
    _run_query(query.And([query.Term("value", u("ochre")),
                          query.Term("name", u("glonk"))]),
               [])


def test_or():
    _run_query(query.Or([query.Term("value", u("red")),
                         query.Term("name", u("yellow"))]),
               [u("A"), u("D"), u("E")])
    # Missing
    _run_query(query.Or([query.Term("value", u("ochre")),
                         query.Term("name", u("glonk"))]),
               [])
    _run_query(query.Or([]), [])


def test_ors():
    domain = u("alfa bravo charlie delta").split()
    s = fields.Schema(num=fields.STORED, text=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(s)
    with ix.writer() as w:
        for i, ls in enumerate(permutations(domain)):
            w.add_document(num=i, text=" ".join(ls))

    with ix.searcher() as s:
        qs = [query.Term("text", word) for word in domain]
        for i in xrange(1, len(domain)):
            q = query.Or(qs[:i])
            r1 = [(hit.docnum, hit.score) for hit in s.search(q, limit=None)]

            q.binary_matcher = True
            r2 = [(hit.docnum, hit.score) for hit in s.search(q, limit=None)]

            for item1, item2 in izip_longest(r1, r2):
                assert item1[0] == item2[0]
                assert item1[1] == item2[1]


def test_not():
    _run_query(query.And([query.Or([query.Term("value", u("red")),
                                    query.Term("name", u("yellow"))]),
                          query.Not(query.Term("name", u("quick")))]),
               [u("A"), u("E")])


def test_topnot():
    _run_query(query.Not(query.Term("value", "red")), [u("B"), "C", "E"])
    _run_query(query.Not(query.Term("name", "yellow")), [u("B"), u("C"),
                                                         u("D")])


def test_andnot():
    _run_query(query.AndNot(query.Term("name", u("yellow")),
                            query.Term("value", u("purple"))),
               [u("E")])


def test_andnot2():
    schema = fields.Schema(a=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(a=u("bravo"))
    w.add_document(a=u("echo"))
    w.add_document(a=u("juliet"))
    w.commit()
    w = ix.writer()
    w.add_document(a=u("kilo"))
    w.add_document(a=u("foxtrot"))
    w.add_document(a=u("charlie"))
    w.commit(merge=False)
    w = ix.writer()
    w.delete_by_term("a", u("echo"))
    w.add_document(a=u("alfa"))
    w.add_document(a=u("india"))
    w.add_document(a=u("delta"))
    w.commit(merge=False)

    with ix.searcher() as s:
        q = query.TermRange("a", u("bravo"), u("k"))
        qr = [hit["a"] for hit in s.search(q)]
        assert " ".join(sorted(qr)) == "bravo charlie delta foxtrot india juliet"

        oq = query.Or([query.Term("a", "bravo"), query.Term("a", "delta")])
        oqr = [hit["a"] for hit in s.search(oq)]
        assert " ".join(sorted(oqr)) == "bravo delta"

        anq = query.AndNot(q, oq)

        m = anq.matcher(s)
        r = s.search(anq)
        assert list(anq.docs(s)) == sorted(hit.docnum for hit in r)
        assert " ".join(sorted(hit["a"] for hit in r)) == "charlie foxtrot india juliet"


def test_variations():
    _run_query(query.Variations("value", u("render")),
               [u("A"), u("C"), u("E")])


def test_wildcard():
    _run_query(query.Or([query.Wildcard('value', u('*red*')),
                         query.Wildcard('name', u('*yellow*'))]),
               [u("A"), u("C"), u("D"), u("E")])
    # Missing
    _run_query(query.Wildcard('value', 'glonk*'), [])


def test_not2():
    schema = fields.Schema(name=fields.ID(stored=True), value=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(name=u("a"), value=u("alfa bravo charlie delta echo"))
    writer.add_document(name=u("b"),
                        value=u("bravo charlie delta echo foxtrot"))
    writer.add_document(name=u("c"),
                        value=u("charlie delta echo foxtrot golf"))
    writer.add_document(name=u("d"), value=u("delta echo golf hotel india"))
    writer.add_document(name=u("e"), value=u("echo golf hotel india juliet"))
    writer.commit()

    with ix.searcher() as s:
        p = qparser.QueryParser("value", None)
        results = s.search(p.parse("echo NOT golf"))
        assert sorted([d["name"] for d in results]) == ["a", "b"]

        results = s.search(p.parse("echo NOT bravo"))
        assert sorted([d["name"] for d in results]) == ["c", "d", "e"]

    ix.delete_by_term("value", u("bravo"))

    with ix.searcher() as s:
        results = s.search(p.parse("echo NOT charlie"))
        assert sorted([d["name"] for d in results]) == ["d", "e"]

#    def test_or_minmatch():
#        schema = fields.Schema(k=fields.STORED, v=fields.TEXT)
#        st = RamStorage()
#        ix = st.create_index(schema)
#
#        w = ix.writer()
#        w.add_document(k=1, v=u("alfa bravo charlie delta echo"))
#        w.add_document(k=2, v=u("bravo charlie delta echo foxtrot"))
#        w.add_document(k=3, v=u("charlie delta echo foxtrot golf"))
#        w.add_document(k=4, v=u("delta echo foxtrot golf hotel"))
#        w.add_document(k=5, v=u("echo foxtrot golf hotel india"))
#        w.add_document(k=6, v=u("foxtrot golf hotel india juliet"))
#        w.commit()
#
#        s = ix.searcher()
#        q = Or([Term("v", "echo"), Term("v", "foxtrot")], minmatch=2)
#        r = s.search(q)
#        assert sorted(d["k"] for d in r), [2, 3, 4, 5])


def test_range():
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(id=u("A"), content=u("alfa bravo charlie delta echo"))
    w.add_document(id=u("B"), content=u("bravo charlie delta echo foxtrot"))
    w.add_document(id=u("C"), content=u("charlie delta echo foxtrot golf"))
    w.add_document(id=u("D"), content=u("delta echo foxtrot golf hotel"))
    w.add_document(id=u("E"), content=u("echo foxtrot golf hotel india"))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("content", schema)

        q = qp.parse(u("charlie [delta TO foxtrot]"))
        assert q.__class__ == query.And
        assert q[0].__class__ == query.Term
        assert q[1].__class__ == query.TermRange
        assert q[1].start == "delta"
        assert q[1].end == "foxtrot"
        assert not q[1].startexcl
        assert not q[1].endexcl
        ids = sorted([d['id'] for d in s.search(q)])
        assert ids == [u('A'), u('B'), u('C')]

        q = qp.parse(u("foxtrot {echo TO hotel]"))
        assert q.__class__ == query.And
        assert q[0].__class__ == query.Term
        assert q[1].__class__ == query.TermRange
        assert q[1].start == "echo"
        assert q[1].end == "hotel"
        assert q[1].startexcl
        assert not q[1].endexcl
        ids = sorted([d['id'] for d in s.search(q)])
        assert ids == [u('B'), u('C'), u('D'), u('E')]

        q = qp.parse(u("{bravo TO delta}"))
        assert q.__class__ == query.TermRange
        assert q.start == "bravo"
        assert q.end == "delta"
        assert q.startexcl
        assert q.endexcl
        ids = sorted([d['id'] for d in s.search(q)])
        assert ids == [u('A'), u('B'), u('C')]

        # Shouldn't match anything
        q = qp.parse(u("[1 to 10]"))
        assert q.__class__ == query.TermRange
        assert len(s.search(q)) == 0


def test_range_clusiveness():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    for letter in u("abcdefg"):
        w.add_document(id=letter)
    w.commit()

    with ix.searcher() as s:
        def check(startexcl, endexcl, string):
            q = query.TermRange("id", "b", "f", startexcl, endexcl)
            r = "".join(sorted(d['id'] for d in s.search(q)))
            assert r == string

        check(False, False, "bcdef")
        check(True, False, "cdef")
        check(True, True, "cde")
        check(False, True, "bcde")


def test_open_ranges():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    for letter in u("abcdefg"):
        w.add_document(id=letter)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        def check(qstring, result):
            q = qp.parse(qstring)
            r = "".join(sorted([d['id'] for d in s.search(q)]))
            assert r == result

        check(u("[b TO]"), "bcdefg")
        check(u("[TO e]"), "abcde")
        check(u("[b TO d]"), "bcd")
        check(u("{b TO]"), "cdefg")
        check(u("[TO e}"), "abcd")
        check(u("{b TO d}"), "c")


def test_open_numeric_ranges():
    domain = range(0, 1000, 7)

    schema = fields.Schema(num=fields.NUMERIC(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    for i in domain:
        w.add_document(num=i)
    w.commit()

    qp = qparser.QueryParser("num", schema)
    with ix.searcher() as s:
        q = qp.parse("[100 to]")
        r = [hit["num"] for hit in s.search(q, limit=None)]
        assert r == [n for n in domain if n >= 100]

        q = qp.parse("[to 500]")
        r = [hit["num"] for hit in s.search(q, limit=None)]
        assert r == [n for n in domain if n <= 500]


def test_open_date_ranges():
    basedate = datetime(2011, 1, 24, 6, 25, 0, 0)
    domain = [basedate + timedelta(days=n) for n in xrange(-20, 20)]

    schema = fields.Schema(date=fields.DATETIME(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    for d in domain:
        w.add_document(date=d)
    w.commit()

    with ix.searcher() as s:
        # Without date parser
        qp = qparser.QueryParser("date", schema)
        q = qp.parse("[2011-01-10 to]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d >= datetime(2011, 1, 10, 6, 25)]
        assert r == target

        q = qp.parse("[to 2011-01-30]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d <= datetime(2011, 1, 30, 6, 25)]
        assert r == target

        # With date parser
        from whoosh.qparser.dateparse import DateParserPlugin
        qp.add_plugin(DateParserPlugin(basedate))

        q = qp.parse("[10 jan 2011 to]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d >= datetime(2011, 1, 10, 6, 25)]
        assert r == target

        q = qp.parse("[to 30 jan 2011]")
        r = [hit["date"] for hit in s.search(q, limit=None)]
        assert len(r) > 0
        target = [d for d in domain if d <= datetime(2011, 1, 30, 6, 25)]
        assert r == target


def test_negated_unlimited_ranges():
    # Whoosh should treat u("[to]") as if it was "*"
    schema = fields.Schema(id=fields.ID(stored=True), num=fields.NUMERIC,
                           date=fields.DATETIME)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    from string import ascii_letters
    domain = text_type(ascii_letters)

    dt = datetime.now()
    for i, letter in enumerate(domain):
        w.add_document(id=letter, num=i, date=dt + timedelta(days=i))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        nq = qp.parse(u("NOT [to]"))
        assert nq.__class__ == query.Not
        q = nq.query
        assert q.__class__ == query.Every
        assert "".join(h["id"] for h in s.search(q, limit=None)) == domain
        assert not list(nq.docs(s))

        nq = qp.parse(u("NOT num:[to]"))
        assert nq.__class__ == query.Not
        q = nq.query
        assert q.__class__ == query.NumericRange
        assert q.start is None
        assert q.end is None
        assert "".join(h["id"] for h in s.search(q, limit=None)) == domain
        assert not list(nq.docs(s))

        nq = qp.parse(u("NOT date:[to]"))
        assert nq.__class__ == query.Not
        q = nq.query
        assert q.__class__ == query.Every
        assert "".join(h["id"] for h in s.search(q, limit=None)) == domain
        assert not list(nq.docs(s))


def test_keyword_or():
    schema = fields.Schema(a=fields.ID(stored=True), b=fields.KEYWORD)
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(a=u("First"), b=u("ccc ddd"))
    w.add_document(a=u("Second"), b=u("aaa ddd"))
    w.add_document(a=u("Third"), b=u("ccc eee"))
    w.commit()

    qp = qparser.QueryParser("b", schema)
    with ix.searcher() as s:
        qr = qp.parse(u("b:ccc OR b:eee"))
        assert qr.__class__ == query.Or
        r = s.search(qr)
        assert len(r) == 2
        assert r[0]["a"] == "Third"
        assert r[1]["a"] == "First"


def test_merged():
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT)
    with TempIndex(schema) as ix:
        with ix.writer() as w:
            w.add_document(id=u("alfa"), content=u("alfa"))
            w.add_document(id=u("bravo"), content=u("bravo"))

        with ix.searcher() as s:
            r = s.search(query.Term("content", u("bravo")))
            assert len(r) == 1
            assert r[0]["id"] == "bravo"

        with ix.writer() as w:
            w.add_document(id=u("charlie"), content=u("charlie"))
            w.optimize = True

        assert len(ix._segments()) == 1

        with ix.searcher() as s:
            r = s.search(query.Term("content", u("bravo")))
            assert len(r) == 1
            assert r[0]["id"] == "bravo"


def test_multireader():
    sc = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(sc)
    w = ix.writer()
    w.add_document(id=u("alfa"), content=u("alfa"))
    w.add_document(id=u("bravo"), content=u("bravo"))
    w.add_document(id=u("charlie"), content=u("charlie"))
    w.add_document(id=u("delta"), content=u("delta"))
    w.add_document(id=u("echo"), content=u("echo"))
    w.add_document(id=u("foxtrot"), content=u("foxtrot"))
    w.add_document(id=u("golf"), content=u("golf"))
    w.add_document(id=u("hotel"), content=u("hotel"))
    w.add_document(id=u("india"), content=u("india"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Term("content", u("bravo")))
        assert len(r) == 1
        assert r[0]["id"] == "bravo"

    w = ix.writer()
    w.add_document(id=u("juliet"), content=u("juliet"))
    w.add_document(id=u("kilo"), content=u("kilo"))
    w.add_document(id=u("lima"), content=u("lima"))
    w.add_document(id=u("mike"), content=u("mike"))
    w.add_document(id=u("november"), content=u("november"))
    w.add_document(id=u("oscar"), content=u("oscar"))
    w.add_document(id=u("papa"), content=u("papa"))
    w.add_document(id=u("quebec"), content=u("quebec"))
    w.add_document(id=u("romeo"), content=u("romeo"))
    w.commit()
    assert len(ix._segments()) == 2

    #r = ix.reader()
    #assert r.__class__.__name__ == "MultiReader"
    #pr = r.postings("content", u("bravo"))

    with ix.searcher() as s:
        r = s.search(query.Term("content", u("bravo")))
        assert len(r) == 1
        assert r[0]["id"] == "bravo"


def test_posting_phrase():
    schema = fields.Schema(name=fields.ID(stored=True), value=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(name=u("A"),
                        value=u("Little Miss Muffet sat on a tuffet"))
    writer.add_document(name=u("B"), value=u("Miss Little Muffet tuffet"))
    writer.add_document(name=u("C"), value=u("Miss Little Muffet tuffet sat"))
    writer.add_document(name=u("D"),
                        value=u("Gibberish blonk falunk miss muffet sat " +
                                "tuffet garbonzo"))
    writer.add_document(name=u("E"), value=u("Blah blah blah pancakes"))
    writer.commit()

    with ix.searcher() as s:
        def names(results):
            return sorted([fields['name'] for fields in results])

        q = query.Phrase("value", [u("little"), u("miss"), u("muffet"),
                                   u("sat"), u("tuffet")])
        m = q.matcher(s)
        assert m.__class__.__name__ == "SpanNear2Matcher"

        r = s.search(q)
        assert names(r) == ["A"]
        assert len(r) == 1

        q = query.Phrase("value", [u("miss"), u("muffet"), u("sat"),
                                   u("tuffet")])
        assert names(s.search(q)) == ["A", "D"]

        q = query.Phrase("value", [u("falunk"), u("gibberish")])
        r = s.search(q)
        assert not names(r)
        assert len(r) == 0

        q = query.Phrase("value", [u("gibberish"), u("falunk")], slop=2)
        assert names(s.search(q)) == ["D"]

        q = query.Phrase("value", [u("blah")] * 4)
        assert not names(s.search(q))  # blah blah blah blah

        q = query.Phrase("value", [u("blah")] * 3)
        m = q.matcher(s)
        assert names(s.search(q)) == ["E"]


def test_phrase_score():
    schema = fields.Schema(name=fields.ID(stored=True), value=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(name=u("A"),
                        value=u("Little Miss Muffet sat on a tuffet"))
    writer.add_document(name=u("D"),
                        value=u("Gibberish blonk falunk miss muffet sat " +
                                "tuffet garbonzo"))
    writer.add_document(name=u("E"), value=u("Blah blah blah pancakes"))
    writer.add_document(name=u("F"),
                        value=u("Little miss muffet little miss muffet"))
    writer.commit()

    with ix.searcher() as s:
        q = query.Phrase("value", [u("little"), u("miss"), u("muffet")])
        m = q.matcher(s)
        assert m.id() == 0
        score1 = m.weight()
        assert score1 > 0
        m.next()
        assert m.id() == 3
        assert m.weight() > score1


def test_stop_phrase():
    schema = fields.Schema(title=fields.TEXT(stored=True))
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(title=u("Richard of York"))
    writer.add_document(title=u("Lily the Pink"))
    writer.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("title", schema)
        q = qp.parse(u("richard of york"))
        assert q.__unicode__() == "(title:richard AND title:york)"
        assert len(s.search(q)) == 1
        #q = qp.parse(u("lily the pink"))
        #assert len(s.search(q)), 1)
        assert len(s.find("title", u("lily the pink"))) == 1


def test_phrase_order():
    tfield = fields.TEXT(stored=True, analyzer=analysis.SimpleAnalyzer())
    schema = fields.Schema(text=tfield)
    storage = RamStorage()
    ix = storage.create_index(schema)

    writer = ix.writer()
    for ls in permutations(["ape", "bay", "can", "day"], 4):
        writer.add_document(text=u(" ").join(ls))
    writer.commit()

    with ix.searcher() as s:
        def result(q):
            r = s.search(q, limit=None, sortedby=None)
            return sorted([d['text'] for d in r])

        q = query.Phrase("text", ["bay", "can", "day"])
        assert result(q) == [u('ape bay can day'), u('bay can day ape')]


def test_phrase_sameword():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    storage = RamStorage()
    ix = storage.create_index(schema)

    writer = ix.writer()
    writer.add_document(id=1, text=u("The film Linda Linda Linda is good"))
    writer.add_document(id=2, text=u("The model Linda Evangelista is pretty"))
    writer.commit()

    with ix.searcher() as s:
        r = s.search(query.Phrase("text", ["linda", "linda", "linda"]),
                     limit=None)
        assert len(r) == 1
        assert r[0]["id"] == 1


def test_phrase_multi():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)

    domain = u("alfa bravo charlie delta echo").split()
    w = None
    for i, ls in enumerate(permutations(domain)):
        if w is None:
            w = ix.writer()
        w.add_document(id=i, text=u(" ").join(ls))
        if not i % 30:
            w.commit()
            w = None
    if w is not None:
        w.commit()

    with ix.searcher() as s:
        q = query.Phrase("text", ["alfa", "bravo"])
        _ = s.search(q)


def test_missing_field_scoring():
    schema = fields.Schema(name=fields.TEXT(stored=True),
                           hobbies=fields.TEXT(stored=True))
    with TempIndex(schema) as ix:
        with ix.writer() as w:
            w.add_document(name=u('Frank'), hobbies=u('baseball, basketball'))

        with ix.reader() as r:
            assert r.field_length("hobbies") == 2
            assert r.field_length("name") == 1

        with ix.writer() as w:
            w.add_document(name=u('Jonny'))

        with ix.searcher() as s:
            assert s.field_length("hobbies") == 2
            assert s.field_length("name") == 2

            parser = qparser.MultifieldParser(['name', 'hobbies'], schema)
            q = parser.parse(u("baseball"))
            result = s.search(q)
            assert len(result) == 1


def test_search_fieldname_underscores():
    s = fields.Schema(my_name=fields.ID(stored=True), my_value=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(s)

    w = ix.writer()
    w.add_document(my_name=u("Green"), my_value=u("It's not easy being green"))
    w.add_document(my_name=u("Red"),
                   my_value=u("Hopping mad like a playground ball"))
    w.commit()

    qp = qparser.QueryParser("my_value", schema=s)
    with ix.searcher() as s:
        r = s.search(qp.parse(u("my_name:Green")))
        assert r[0]['my_name'] == "Green"


def test_short_prefix():
    s = fields.Schema(name=fields.ID, value=fields.TEXT)
    qp = qparser.QueryParser("value", schema=s)
    q = qp.parse(u("s*"))
    assert q.__class__.__name__ == "Prefix"
    assert q.text == "s"


def test_weighting():
    from whoosh.scoring import Weighting, BaseScorer

    schema = fields.Schema(id=fields.ID(stored=True),
                           n_comments=fields.STORED)
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), n_comments=5)
    w.add_document(id=u("2"), n_comments=12)
    w.add_document(id=u("3"), n_comments=2)
    w.add_document(id=u("4"), n_comments=7)
    w.commit()

    # Fake Weighting implementation
    class CommentWeighting(Weighting):
        def scorer(self, searcher, fieldname, text, qf=1):
            return self.CommentScorer(searcher.stored_fields)

        class CommentScorer(BaseScorer):
            def __init__(self, stored_fields):
                self.stored_fields = stored_fields

            def score(self, matcher):
                sf = self.stored_fields(matcher.id())
                ncomments = sf.get("n_comments", 0)
                return ncomments

    with ix.searcher(weighting=CommentWeighting()) as s:
        q = query.TermRange("id", u("1"), u("4"), constantscore=False)

        r = s.search(q)
        ids = [fs["id"] for fs in r]
        assert ids == ["2", "4", "1", "3"]


def test_dismax():
    schema = fields.Schema(id=fields.STORED,
                           f1=fields.TEXT, f2=fields.TEXT, f3=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f1=u("alfa bravo charlie delta"),
                   f2=u("alfa alfa alfa"),
                   f3=u("alfa echo foxtrot hotel india"))
    w.commit()

    with ix.searcher(weighting=scoring.Frequency()) as s:
        assert list(s.documents(f1="alfa")) == [{"id": 1}]
        assert list(s.documents(f2="alfa")) == [{"id": 1}]
        assert list(s.documents(f3="alfa")) == [{"id": 1}]

        qs = [query.Term("f1", "alfa"), query.Term("f2", "alfa"),
              query.Term("f3", "alfa")]
        dm = query.DisjunctionMax(qs)
        r = s.search(dm)
        assert r.score(0) == 3.0


def test_deleted_wildcard():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(id=u("alfa"))
    w.add_document(id=u("bravo"))
    w.add_document(id=u("charlie"))
    w.add_document(id=u("delta"))
    w.add_document(id=u("echo"))
    w.add_document(id=u("foxtrot"))
    w.commit()

    w = ix.writer()
    w.delete_by_term("id", "bravo")
    w.delete_by_term("id", "delta")
    w.delete_by_term("id", "echo")
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Every("id"))
        assert sorted([d['id'] for d in r]) == ["alfa", "charlie", "foxtrot"]


def test_missing_wildcard():
    schema = fields.Schema(id=fields.ID(stored=True), f1=fields.TEXT,
                           f2=fields.TEXT)
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), f1=u("alfa"), f2=u("apple"))
    w.add_document(id=u("2"), f1=u("bravo"))
    w.add_document(id=u("3"), f1=u("charlie"), f2=u("candy"))
    w.add_document(id=u("4"), f2=u("donut"))
    w.add_document(id=u("5"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Every("id"))
        assert sorted([d['id'] for d in r]) == ["1", "2", "3", "4", "5"]

        r = s.search(query.Every("f1"))
        assert sorted([d['id'] for d in r]) == ["1", "2", "3"]

        r = s.search(query.Every("f2"))
        assert sorted([d['id'] for d in r]) == ["1", "3", "4"]


def test_finalweighting():
    from whoosh.scoring import Frequency

    schema = fields.Schema(id=fields.ID(stored=True),
                           summary=fields.TEXT,
                           n_comments=fields.STORED)
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), summary=u("alfa bravo"), n_comments=5)
    w.add_document(id=u("2"), summary=u("alfa"), n_comments=12)
    w.add_document(id=u("3"), summary=u("bravo"), n_comments=2)
    w.add_document(id=u("4"), summary=u("bravo bravo"), n_comments=7)
    w.commit()

    class CommentWeighting(Frequency):
        use_final = True

        def final(self, searcher, docnum, score):
            ncomments = searcher.stored_fields(docnum).get("n_comments", 0)
            return ncomments

    with ix.searcher(weighting=CommentWeighting()) as s:
        q = qparser.QueryParser("summary", None).parse("alfa OR bravo")
        r = s.search(q)
        ids = [fs["id"] for fs in r]
        assert ["2", "4", "1", "3"] == ids


def test_outofdate():
    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"))
    w.add_document(id=u("2"))
    w.commit()

    s = ix.searcher()
    assert s.up_to_date()

    w = ix.writer()
    w.add_document(id=u("3"))
    w.add_document(id=u("4"))

    assert s.up_to_date()
    w.commit()
    assert not s.up_to_date()

    s = s.refresh()
    assert s.up_to_date()
    s.close()


def test_find_missing():
    schema = fields.Schema(id=fields.ID, text=fields.KEYWORD(stored=True))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), text=u("alfa"))
    w.add_document(id=u("2"), text=u("bravo"))
    w.add_document(text=u("charlie"))
    w.add_document(id=u("4"), text=u("delta"))
    w.add_document(text=u("echo"))
    w.add_document(id=u("6"), text=u("foxtrot"))
    w.add_document(text=u("golf"))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("text", schema)
        q = qp.parse(u("NOT id:*"))
        r = s.search(q, limit=None)
        assert list(h["text"] for h in r) == ["charlie", "echo", "golf"]


def test_ngram_phrase():
    f = fields.NGRAM(minsize=2, maxsize=2, phrase=True)
    schema = fields.Schema(text=f, path=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    writer = ix.writer()
    writer.add_document(text=u('\u9AD8\u6821\u307E\u3067\u306F\u6771\u4EAC'
                               '\u3067\u3001\u5927\u5B66\u304B\u3089\u306F'
                               '\u4EAC\u5927\u3067\u3059\u3002'),
                        path=u('sample'))
    writer.commit()

    with ix.searcher() as s:
        p = qparser.QueryParser("text", schema)

        q = p.parse(u('\u6771\u4EAC\u5927\u5B66'))
        assert len(s.search(q)) == 1

        q = p.parse(u('"\u6771\u4EAC\u5927\u5B66"'))
        assert len(s.search(q)) == 0

        q = p.parse(u('"\u306F\u6771\u4EAC\u3067"'))
        assert len(s.search(q)) == 1


def test_ordered():
    domain = u("alfa bravo charlie delta echo foxtrot").split(" ")

    schema = fields.Schema(f=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    writer = ix.writer()
    for ls in permutations(domain):
        writer.add_document(f=u(" ").join(ls))
    writer.commit()

    with ix.searcher() as s:
        q = query.Ordered([query.Term("f", u("alfa")),
                           query.Term("f", u("charlie")),
                           query.Term("f", u("echo"))])
        r = s.search(q)
        for hit in r:
            ls = hit["f"].split()
            assert "alfa" in ls
            assert "charlie" in ls
            assert "echo" in ls
            a = ls.index("alfa")
            c = ls.index("charlie")
            e = ls.index("echo")
            assert a < c and c < e, repr(ls)


def test_otherwise():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f=u("alfa one two"))
    w.add_document(id=2, f=u("alfa three four"))
    w.add_document(id=3, f=u("bravo four five"))
    w.add_document(id=4, f=u("bravo six seven"))
    w.commit()

    with ix.searcher() as s:
        q = query.Otherwise(query.Term("f", u("alfa")),
                            query.Term("f", u("six")))
        assert [d["id"] for d in s.search(q)] == [1, 2]

        q = query.Otherwise(query.Term("f", u("tango")),
                            query.Term("f", u("four")))
        assert [d["id"] for d in s.search(q)] == [2, 3]

        q = query.Otherwise(query.Term("f", u("tango")),
                            query.Term("f", u("nine")))
        assert [d["id"] for d in s.search(q)] == []


def test_fuzzyterm():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f=u("alfa bravo charlie delta"))
    w.add_document(id=2, f=u("bravo charlie delta echo"))
    w.add_document(id=3, f=u("charlie delta echo foxtrot"))
    w.add_document(id=4, f=u("delta echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        q = query.FuzzyTerm("f", "brave")
        assert [d["id"] for d in s.search(q)] == [1, 2]


def test_fuzzyterm2():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT(spelling=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, f=u("alfa bravo charlie delta"))
    w.add_document(id=2, f=u("bravo charlie delta echo"))
    w.add_document(id=3, f=u("charlie delta echo foxtrot"))
    w.add_document(id=4, f=u("delta echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        assert list(s.reader().terms_within("f", u("brave"), 1)) == ["bravo"]
        q = query.FuzzyTerm("f", "brave")
        assert [d["id"] for d in s.search(q)] == [1, 2]


def test_multireader_not():
    schema = fields.Schema(id=fields.STORED, f=fields.TEXT)

    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=0, f=u("alfa bravo chralie"))
    w.add_document(id=1, f=u("bravo chralie delta"))
    w.add_document(id=2, f=u("charlie delta echo"))
    w.add_document(id=3, f=u("delta echo foxtrot"))
    w.add_document(id=4, f=u("echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        q = query.And([query.Term("f", "delta"),
                       query.Not(query.Term("f", "delta"))])
        r = s.search(q)
        assert len(r) == 0

    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=5, f=u("alfa bravo chralie"))
    w.add_document(id=6, f=u("bravo chralie delta"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=7, f=u("charlie delta echo"))
    w.add_document(id=8, f=u("delta echo foxtrot"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=9, f=u("echo foxtrot golf"))
    w.add_document(id=10, f=u("foxtrot golf delta"))
    w.commit(merge=False)
    assert len(ix._segments()) > 1

    with ix.searcher() as s:
        q = query.And([query.Term("f", "delta"),
                       query.Not(query.Term("f", "delta"))])
        r = s.search(q)
        assert len(r) == 0


def test_boost_phrase():
    schema = fields.Schema(title=fields.TEXT(field_boost=5.0, stored=True),
                           text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    domain = u("alfa bravo charlie delta").split()
    w = ix.writer()
    for ls in permutations(domain):
        t = u(" ").join(ls)
        w.add_document(title=t, text=t)
    w.commit()

    q = query.Or([query.Term("title", u("alfa")),
                  query.Term("title", u("bravo")),
                  query.Phrase("text", [u("bravo"), u("charlie"), u("delta")])
                  ])

    def boost_phrases(q):
        if isinstance(q, query.Phrase):
            q.boost *= 1000.0
            return q
        else:
            return q.apply(boost_phrases)
    q = boost_phrases(q)

    with ix.searcher() as s:
        r = s.search(q, limit=None)
        for hit in r:
            if "bravo charlie delta" in hit["title"]:
                assert hit.score > 100.0


def test_filter():
    schema = fields.Schema(id=fields.STORED, path=fields.ID, text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, path=u("/a/1"), text=u("alfa bravo charlie"))
    w.add_document(id=2, path=u("/b/1"), text=u("bravo charlie delta"))
    w.add_document(id=3, path=u("/c/1"), text=u("charlie delta echo"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=4, path=u("/a/2"), text=u("delta echo alfa"))
    w.add_document(id=5, path=u("/b/2"), text=u("echo alfa bravo"))
    w.add_document(id=6, path=u("/c/2"), text=u("alfa bravo charlie"))
    w.commit(merge=False)
    w = ix.writer()
    w.add_document(id=7, path=u("/a/3"), text=u("bravo charlie delta"))
    w.add_document(id=8, path=u("/b/3"), text=u("charlie delta echo"))
    w.add_document(id=9, path=u("/c/3"), text=u("delta echo alfa"))
    w.commit(merge=False)

    with ix.searcher() as s:
        fq = query.Or([query.Prefix("path", "/a"),
                       query.Prefix("path", "/b")])
        r = s.search(query.Term("text", "alfa"), filter=fq)
        assert [d["id"] for d in r] == [1, 4, 5]

        r = s.search(query.Term("text", "bravo"), filter=fq)
        assert [d["id"] for d in r] == [1, 2, 5, 7, ]


def test_fieldboost():
    schema = fields.Schema(id=fields.STORED, a=fields.TEXT, b=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=0, a=u("alfa bravo charlie"), b=u("echo foxtrot india"))
    w.add_document(id=1, a=u("delta bravo charlie"), b=u("alfa alfa alfa"))
    w.add_document(id=2, a=u("alfa alfa alfa"), b=u("echo foxtrot india"))
    w.add_document(id=3, a=u("alfa sierra romeo"), b=u("alfa tango echo"))
    w.add_document(id=4, a=u("bravo charlie delta"), b=u("alfa foxtrot india"))
    w.add_document(id=5, a=u("alfa alfa echo"), b=u("tango tango tango"))
    w.add_document(id=6, a=u("alfa bravo echo"), b=u("alfa alfa tango"))
    w.commit()

    def field_booster(fieldname, factor=2.0):
        "Returns a function which will boost the given field in a query tree"
        def booster_fn(obj):
            if obj.is_leaf() and obj.field() == fieldname:
                obj = copy.deepcopy(obj)
                obj.boost *= factor
                return obj
            else:
                return obj
        return booster_fn

    with ix.searcher() as s:
        q = query.Or([query.Term("a", u("alfa")),
                      query.Term("b", u("alfa"))])
        q = q.accept(field_booster("a", 100.0))
        assert text_type(q) == text_type("(a:alfa^100.0 OR b:alfa)")
        r = s.search(q)
        assert [hit["id"] for hit in r] == [2, 5, 6, 3, 0, 1, 4]


def test_andmaybe_quality():
    schema = fields.Schema(id=fields.STORED, title=fields.TEXT(stored=True),
                           year=fields.NUMERIC)
    ix = RamStorage().create_index(schema)

    domain = [(u('Alpha Bravo Charlie Delta'), 2000),
              (u('Echo Bravo Foxtrot'), 2000), (u('Bravo Golf Hotel'), 2002),
              (u('Bravo India'), 2002), (u('Juliet Kilo Bravo'), 2004),
              (u('Lima Bravo Mike'), 2004)]
    w = ix.writer()
    for title, year in domain:
        w.add_document(title=title, year=year)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("title", ix.schema)
        q = qp.parse(u("title:bravo ANDMAYBE year:2004"))

        titles = [hit["title"] for hit in s.search(q, limit=None)[:2]]
        assert "Juliet Kilo Bravo" in titles

        titles = [hit["title"] for hit in s.search(q, limit=2)]
        assert "Juliet Kilo Bravo" in titles


def test_collect_limit():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id="a", text=u("alfa bravo charlie delta echo"))
    w.add_document(id="b", text=u("bravo charlie delta echo foxtrot"))
    w.add_document(id="c", text=u("charlie delta echo foxtrot golf"))
    w.add_document(id="d", text=u("delta echo foxtrot golf hotel"))
    w.add_document(id="e", text=u("echo foxtrot golf hotel india"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Term("text", u("golf")), limit=10)
        assert len(r) == 3
        count = 0
        for _ in r:
            count += 1
        assert count == 3

    w = ix.writer()
    w.add_document(id="f", text=u("foxtrot golf hotel india juliet"))
    w.add_document(id="g", text=u("golf hotel india juliet kilo"))
    w.add_document(id="h", text=u("hotel india juliet kilo lima"))
    w.add_document(id="i", text=u("india juliet kilo lima mike"))
    w.add_document(id="j", text=u("juliet kilo lima mike november"))
    w.commit(merge=False)

    with ix.searcher() as s:
        r = s.search(query.Term("text", u("golf")), limit=20)
        assert len(r) == 5
        count = 0
        for _ in r:
            count += 1
        assert count == 5


def test_scorer():
    schema = fields.Schema(key=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(key=u("alfa alfa alfa"))
    w.add_document(key=u("alfa alfa alfa alfa"))
    w.add_document(key=u("alfa alfa"))
    w.commit()
    w = ix.writer()
    w.add_document(key=u("alfa alfa alfa alfa alfa alfa"))
    w.add_document(key=u("alfa"))
    w.add_document(key=u("alfa alfa alfa alfa alfa"))
    w.commit(merge=False)

#    dw = scoring.DebugModel()
#    s = ix.searcher(weighting=dw)
#    r = s.search(query.Term("key", "alfa"))
#    log = dw.log
#    assert log, [('key', 'alfa', 0, 3.0, 3),
#                       ('key', 'alfa', 1, 4.0, 4),
#                       ('key', 'alfa', 2, 2.0, 2),
#                       ('key', 'alfa', 0, 6.0, 6),
#                       ('key', 'alfa', 1, 1.0, 1),
#                       ('key', 'alfa', 2, 5.0, 5)])


def test_pos_scorer():
    ana = analysis.SimpleAnalyzer()
    schema = fields.Schema(id=fields.STORED, key=fields.TEXT(analyzer=ana))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=0, key=u("0 0 1 0 0 0"))
    w.add_document(id=1, key=u("0 0 0 1 0 0"))
    w.add_document(id=2, key=u("0 1 0 0 0 0"))
    w.commit()
    w = ix.writer()
    w.add_document(id=3, key=u("0 0 0 0 0 1"))
    w.add_document(id=4, key=u("1 0 0 0 0 0"))
    w.add_document(id=5, key=u("0 0 0 0 1 0"))
    w.commit(merge=False)

    def pos_score_fn(searcher, fieldname, text, matcher):
        poses = matcher.value_as("positions")
        return 1.0 / (poses[0] + 1)
    pos_weighting = scoring.FunctionWeighting(pos_score_fn)

    s = ix.searcher(weighting=pos_weighting)
    r = s.search(query.Term("key", "1"))
    assert [hit["id"] for hit in r] == [4, 2, 0, 1, 5, 3]


# def test_too_many_prefix_positions():
#     schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
#     ix = RamStorage().create_index(schema)
#     with ix.writer() as w:
#         for i in xrange(200):
#             text = u("a%s" % i)
#             w.add_document(id=i, text=text)
#
#     q = query.Prefix("text", u("a"))
#     q.TOO_MANY_CLAUSES = 100
#
#     with ix.searcher() as s:
#         m = q.matcher(s)
#         assert m.supports("positions")
#         items = list(m.items_as("positions"))
#         assert [(i, [0]) for i in xrange(200)] == items


def test_collapse():
    from whoosh import collectors

    # id, text, size, tag
    domain = [("a", "blah blah blah", 5, "x"),
              ("b", "blah", 3, "y"),
              ("c", "blah blah blah blah", 2, "z"),
              ("d", "blah blah", 4, "x"),
              ("e", "bloop", 1, "-"),
              ("f", "blah blah blah blah blah", 6, "x"),
              ("g", "blah", 8, "w"),
              ("h", "blah blah", 7, "=")]

    schema = fields.Schema(id=fields.STORED, text=fields.TEXT,
                           size=fields.NUMERIC,
                           tag=fields.KEYWORD(sortable=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        for id, text, size, tag in domain:
            w.add_document(id=u(id), text=u(text), size=size, tag=u(tag))

    with ix.searcher() as s:
        q = query.Term("text", "blah")
        r = s.search(q, limit=None)
        assert " ".join(hit["id"] for hit in r) == "f c a d h b g"

        col = s.collector(limit=3)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h"

        col = s.collector(limit=None)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h b g"

        r = s.search(query.Every(), sortedby="size")
        assert " ".join(hit["id"] for hit in r) == "e c b d a f h g"

        col = s.collector(sortedby="size")
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(query.Every(), col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "e c b d h g"


def test_collapse_nocolumn():
    from whoosh import collectors

    # id, text, size, tag
    domain = [("a", "blah blah blah", 5, "x"),
              ("b", "blah", 3, "y"),
              ("c", "blah blah blah blah", 2, "z"),
              ("d", "blah blah", 4, "x"),
              ("e", "bloop", 1, "-"),
              ("f", "blah blah blah blah blah", 6, "x"),
              ("g", "blah", 8, "w"),
              ("h", "blah blah", 7, "=")]

    schema = fields.Schema(id=fields.STORED, text=fields.TEXT,
                           size=fields.NUMERIC,
                           tag=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for id, text, size, tag in domain:
            w.add_document(id=u(id), text=u(text), size=size, tag=u(tag))

    with ix.searcher() as s:
        q = query.Term("text", "blah")
        r = s.search(q, limit=None)
        assert " ".join(hit["id"] for hit in r) == "f c a d h b g"

        col = s.collector(limit=3)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h"

        col = s.collector(limit=None)
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(q, col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "f c h b g"

        r = s.search(query.Every(), sortedby="size")
        assert " ".join(hit["id"] for hit in r) == "e c b d a f h g"

        col = s.collector(sortedby="size")
        col = collectors.CollapseCollector(col, "tag")
        s.search_with_collector(query.Every(), col)
        r = col.results()
        assert " ".join(hit["id"] for hit in r) == "e c b d h g"


def test_collapse_length():
    domain = u("alfa apple agnostic aplomb arc "
               "bravo big braid beer "
               "charlie crouch car "
               "delta dog "
               "echo "
               "foxtrot fold flip "
               "golf gym goop"
               ).split()

    schema = fields.Schema(key=fields.ID(sortable=True),
                           word=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        for word in domain:
            w.add_document(key=word[0], word=word)

    with ix.searcher() as s:
        q = query.Every()

        def check(r):
            words = " ".join(hit["word"] for hit in r)
            assert words == "alfa bravo charlie delta echo foxtrot golf"
            assert r.scored_length() == 7
            assert len(r) == 7

        r = s.search(q, collapse="key", collapse_limit=1, limit=None)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=50)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=10)
        check(r)


def test_collapse_length_nocolumn():
    domain = u("alfa apple agnostic aplomb arc "
               "bravo big braid beer "
               "charlie crouch car "
               "delta dog "
               "echo "
               "foxtrot fold flip "
               "golf gym goop"
               ).split()

    schema = fields.Schema(key=fields.ID(),
                           word=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for word in domain:
            w.add_document(key=word[0], word=word)

    with ix.searcher() as s:
        q = query.Every()

        def check(r):
            words = " ".join(hit["word"] for hit in r)
            assert words == "alfa bravo charlie delta echo foxtrot golf"
            assert r.scored_length() == 7
            assert len(r) == 7

        r = s.search(q, collapse="key", collapse_limit=1, limit=None)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=50)
        check(r)

        r = s.search(q, collapse="key", collapse_limit=1, limit=10)
        check(r)


def test_collapse_order():
    from whoosh import sorting

    schema = fields.Schema(id=fields.STORED,
                           price=fields.NUMERIC(sortable=True),
                           rating=fields.NUMERIC(sortable=True),
                           tag=fields.ID(sortable=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        w.add_document(id="a", price=10, rating=1, tag=u("x"))
        w.add_document(id="b", price=80, rating=3, tag=u("y"))
        w.add_document(id="c", price=60, rating=1, tag=u("z"))
        w.add_document(id="d", price=30, rating=2)
        w.add_document(id="e", price=50, rating=3, tag=u("x"))
        w.add_document(id="f", price=20, rating=1, tag=u("y"))
        w.add_document(id="g", price=50, rating=2, tag=u("z"))
        w.add_document(id="h", price=90, rating=5)
        w.add_document(id="i", price=50, rating=5, tag=u("x"))
        w.add_document(id="j", price=40, rating=1, tag=u("y"))
        w.add_document(id="k", price=50, rating=4, tag=u("z"))
        w.add_document(id="l", price=70, rating=2)

    with ix.searcher() as s:
        def check(kwargs, target):
            r = s.search(query.Every(), limit=None, **kwargs)
            assert " ".join(hit["id"] for hit in r) == target

        price = sorting.FieldFacet("price", reverse=True)
        rating = sorting.FieldFacet("rating", reverse=True)
        tag = sorting.FieldFacet("tag")

        check(dict(sortedby=price), "h b l c e g i k j d f a")
        check(dict(sortedby=price, collapse=tag), "h b l c e d")
        check(dict(sortedby=price, collapse=tag, collapse_order=rating),
              "h b l i k d")


def test_collapse_order_nocolumn():
    from whoosh import sorting

    schema = fields.Schema(id=fields.STORED,
                           price=fields.NUMERIC(),
                           rating=fields.NUMERIC(),
                           tag=fields.ID())
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id="a", price=10, rating=1, tag=u("x"))
        w.add_document(id="b", price=80, rating=3, tag=u("y"))
        w.add_document(id="c", price=60, rating=1, tag=u("z"))
        w.add_document(id="d", price=30, rating=2)
        w.add_document(id="e", price=50, rating=3, tag=u("x"))
        w.add_document(id="f", price=20, rating=1, tag=u("y"))
        w.add_document(id="g", price=50, rating=2, tag=u("z"))
        w.add_document(id="h", price=90, rating=5)
        w.add_document(id="i", price=50, rating=5, tag=u("x"))
        w.add_document(id="j", price=40, rating=1, tag=u("y"))
        w.add_document(id="k", price=50, rating=4, tag=u("z"))
        w.add_document(id="l", price=70, rating=2)

    with ix.searcher() as s:
        def check(kwargs, target):
            r = s.search(query.Every(), limit=None, **kwargs)
            assert " ".join(hit["id"] for hit in r) == target

        price = sorting.FieldFacet("price", reverse=True)
        rating = sorting.FieldFacet("rating", reverse=True)
        tag = sorting.FieldFacet("tag")

        check(dict(sortedby=price), "h b l c e g i k j d f a")
        check(dict(sortedby=price, collapse=tag), "h b l c e d")
        check(dict(sortedby=price, collapse=tag, collapse_order=rating),
              "h b l i k d")


def test_coord():
    from whoosh.matching import CoordMatcher

    schema = fields.Schema(id=fields.STORED, hits=fields.STORED,
                           tags=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id=0, hits=0, tags=u("blah blah blah blah"))
        w.add_document(id=1, hits=0, tags=u("echo echo blah blah"))
        w.add_document(id=2, hits=1, tags=u("bravo charlie delta echo"))
        w.add_document(id=3, hits=2, tags=u("charlie delta echo foxtrot"))
        w.add_document(id=4, hits=3, tags=u("delta echo foxtrot golf"))
        w.add_document(id=5, hits=3, tags=u("echo foxtrot golf hotel"))
        w.add_document(id=6, hits=2, tags=u("foxtrot golf hotel india"))
        w.add_document(id=7, hits=1, tags=u("golf hotel india juliet"))
        w.add_document(id=8, hits=0, tags=u("foxtrot foxtrot foo foo"))
        w.add_document(id=9, hits=0, tags=u("foo foo foo foo"))

    og = qparser.OrGroup.factory(0.99)
    qp = qparser.QueryParser("tags", schema, group=og)
    q = qp.parse("golf foxtrot echo")
    assert q.__class__ == query.Or
    assert q.scale == 0.99

    with ix.searcher() as s:
        m = q.matcher(s)
        assert type(m) == CoordMatcher

        r = s.search(q, optimize=False)
        assert [hit["id"] for hit in r] == [4, 5, 3, 6, 1, 8, 2, 7]


def test_keyword_search():
    schema = fields.Schema(tags=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(tags=u("keyword1 keyword2 keyword3 keyword4 keyword5"))

    with ix.searcher() as s:
        r = s.search_page(query.Term("tags", "keyword3"), 1)
        assert r


def test_groupedby_with_terms():
    schema = fields.Schema(content=fields.TEXT, organism=fields.ID)
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(organism=u("mus"), content=u("IPFSTD1 IPFSTD_kdwq134 Kaminski-all Study00:00:00"))
        w.add_document(organism=u("mus"), content=u("IPFSTD1 IPFSTD_kdwq134 Kaminski-all Study"))
        w.add_document(organism=u("hs"), content=u("This is the first document we've added!"))

    with ix.searcher() as s:
        q = qparser.QueryParser("content", schema=ix.schema).parse(u("IPFSTD1"))
        r = s.search(q, groupedby=["organism"], terms=True)
        assert len(r) == 2
        assert r.groups("organism") == {"mus": [1, 0]}
        assert r.has_matched_terms()
        assert r.matched_terms() == set([('content', b('ipfstd1'))])


def test_score_length():
    schema = fields.Schema(a=fields.TEXT, b=fields.TEXT)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(a=u("alfa bravo charlie"))
        w.add_document(b=u("delta echo foxtrot"))
        w.add_document(a=u("golf hotel india"))

    with ix.writer() as w:
        w.merge = False
        w.add_document(b=u("juliet kilo lima"))
        # In the second segment, there is an "a" field here, but in the
        # corresponding document in the first segment, the field doesn't exist,
        # so if the scorer is getting segment offsets wrong, scoring this
        # document will error
        w.add_document(a=u("mike november oskar"))
        w.add_document(b=u("papa quebec romeo"))

    with ix.searcher() as s:
        assert not s.is_atomic()
        p = s.postings("a", "mike")
        while p.is_active():
            docnum = p.id()
            score = p.score()
            p.next()


def test_terms_with_filter():
    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(text=u("alfa bravo charlie delta"))
        w.add_document(text=u("bravo charlie delta echo"))
        w.add_document(text=u("charlie delta echo foxtrot"))
        w.add_document(text=u("delta echo foxtrot golf"))
        w.add_document(text=u("echo foxtrot golf hotel"))
        w.add_document(text=u("foxtrot golf hotel alfa"))
        w.add_document(text=u("golf hotel alfa bravo"))
        w.add_document(text=u("hotel alfa bravo charlie"))

    with ix.searcher() as s:
        workingset = set([1, 2, 3])
        q = query.Term("text", u("foxtrot"))
        r = s.search_page(q, pagenum=1, pagelen=5, terms=True,
                          filter=workingset)

        assert r.scored_length() == 2
        assert [hit.docnum for hit in r] == [2, 3]


def test_terms_to_bytes():
    schema = fields.Schema(a=fields.TEXT, b=fields.NUMERIC, id=fields.STORED)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(id=0, a=u("alfa bravo"), b=100)
        w.add_document(id=1, a=u("bravo charlie"), b=200)
        w.add_document(id=2, a=u("charlie delta"), b=100)
        w.add_document(id=3, a=u("delta echo"), b=200)

    with ix.searcher() as s:
        t1 = query.Term("b", 200)
        t2 = query.Term("a", "bravo")
        q = query.And([t1, t2])
        r = s.search(q)
        assert [hit["id"] for hit in r] == [1]


def test_issue_334():
    schema = fields.Schema(
        kind=fields.ID(stored=True),
        name=fields.ID(stored=True),
        returns=fields.ID(stored=True),
    )
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:

        with w.group():
            w.add_document(kind=u('class'), name=u('Index'))
            w.add_document(kind=u('method'), name=u('add document'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('add reader'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('close'),
                           returns=u('void'))
        with w.group():
            w.add_document(kind=u('class'), name=u('Accumulator'))
            w.add_document(kind=u('method'), name=u('add'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('get result'),
                           returns=u('number'))
        with w.group():
            w.add_document(kind=u('class'), name=u('Calculator'))
            w.add_document(kind=u('method'), name=u('add'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('add all'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('add some'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('multiply'),
                           returns=u('number'))
            w.add_document(kind=u('method'), name=u('close'),
                           returns=u('void'))
        with w.group():
            w.add_document(kind=u('class'), name=u('Deleter'))
            w.add_document(kind=u('method'), name=u('add'),
                           returns=u('void'))
            w.add_document(kind=u('method'), name=u('delete'),
                           returns=u('void'))

    with ix.searcher() as s:
        pq = query.Term('kind', 'class')
        cq = query.Term('name', 'Calculator')

        q = query.NestedChildren(pq, cq) & query.Term('returns', 'void')
        r = s.search(q)
        assert len(r) == 1
        assert r[0]["name"] == u("close")


def test_find_decimals():
    from decimal import Decimal

    schema = fields.Schema(name=fields.KEYWORD(stored=True),
                           num=fields.NUMERIC(Decimal, decimal_places=5))
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(name=u("alfa"), num=Decimal("1.5"))
        w.add_document(name=u("bravo"), num=Decimal("2.1"))
        w.add_document(name=u("charlie"), num=Decimal("5.3"))
        w.add_document(name=u("delta"), num=Decimal(3))
        w.add_document(name=u("echo"), num=Decimal("3.00001"))
        w.add_document(name=u("foxtrot"), num=Decimal("3"))

    qp = qparser.QueryParser("name", ix.schema)
    q = qp.parse("num:3.0")
    assert isinstance(q, query.Term)

    with ix.searcher() as s:
        r = s.search(q)
        names = " ".join(sorted(hit["name"] for hit in r))
        assert names == "delta foxtrot"


def test_limit_scores():
    domain = u"alfa bravo charlie delta echo foxtrot golf".split()

    schema = fields.Schema(desc=fields.TEXT, parent=fields.KEYWORD(stored=True))
    with TempIndex(schema) as ix:
        with ix.writer() as w:
            count = 0
            for words in permutations(domain, 4):
                count += 1
                w.add_document(desc=u" ".join(words), parent=text_type(count))

        with ix.searcher() as s:
            q = query.And([
                query.Term("desc", u"delta", boost=30.0),
                query.Term("parent", u"545")
            ])
            r = s.search(q, limit=500)
            assert r.scored_length() == 1
            limited_score = r[0].score

            r = s.search(q, limit=None)
            assert r.scored_length() == 1
            unlimited_score = r[0].score

            assert limited_score == unlimited_score


def test_function_weighting():
    def pos_score_fn(searcher, fieldname, text, matcher):
        spans = matcher.spans()
        return 1.0 / (spans[0].start + 1)

    pos_weighting = scoring.FunctionWeighting(pos_score_fn)

    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)

    with TempIndex(schema) as ix:
        with ix.writer() as w:
            w.add_document(id="a", text=u"aa bb")
            w.add_document(id="b", text=u"bb aa bb")
            w.add_document(id="c", text=u"bb bb aa bb")
            w.add_document(id="d", text=u"bb bb bb aa bb")
            w.add_document(id="e", text=u"bb bb bb bb aa bb")
            w.add_document(id="f", text=u"bb bb bb bb bb aa bb")

        with ix.writer() as w:
            w.add_document(id="g", text=u"aa bb")
            w.add_document(id="h", text=u"bb aa bb")
            w.add_document(id="i", text=u"bb bb aa bb")
            w.add_document(id="j", text=u"bb bb bb aa bb")
            w.add_document(id="k", text=u"bb bb bb bb aa bb")
            w.add_document(id="l", text=u"bb bb bb bb bb aa bb")

        with ix.writer() as w:
            w.add_document(id="m", text=u"aa bb")
            w.add_document(id="n", text=u"bb aa bb")
            w.add_document(id="o", text=u"bb bb aa bb")
            w.add_document(id="p", text=u"bb bb bb aa bb")
            w.add_document(id="q", text=u"bb bb bb bb aa bb")
            w.add_document(id="r", text=u"bb bb bb bb bb aa bb")

        with ix.writer() as w:
            w.add_document(id="s", text=u"aa bb")
            w.add_document(id="t", text=u"bb aa bb")
            w.add_document(id="u", text=u"bb bb aa bb")
            w.add_document(id="v", text=u"bb bb bb aa bb")
            w.add_document(id="w", text=u"bb bb bb bb aa bb")
            w.add_document(id="x", text=u"bb bb bb bb bb aa bb")

        with ix.searcher(weighting=pos_weighting) as s:
            q = query.Term("text", "aa")
            m = q.matcher(s, s.context())
            assert not m.supports_block_quality()

            r = s.search(q, limit=5)
            ids = "".join(([hit["id"] for hit in r]))
            assert ids == "agmsb"

            q = query.Or([query.Term("text", "aa"), query.Term("text", "bb")],
                         scale=2.0)
            m = q.matcher(s, s.context())
            assert not m.supports_block_quality()





