1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import pytest
import elasticsearch
import elasticsearch.helpers
from ..test_otel import setup_tracing
pytestmark = [
pytest.mark.skipif(
"TEST_WITH_OTEL" not in os.environ, reason="TEST_WITH_OTEL is not set"
),
pytest.mark.otel,
]
def test_otel_end_to_end(sync_client):
tracer, memory_exporter = setup_tracing()
sync_client._otel.tracer = tracer
resp = sync_client.search(index="logs-*", query={"match_all": {}})
assert resp.meta.status == 200
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
assert spans[0].name == "search"
expected_attributes = {
"http.request.method": "POST",
"db.system.name": "elasticsearch",
"db.operation.name": "search",
"db.operation.parameter.index": "logs-*",
}
# Assert expected atttributes are here, but allow other attributes too
# to make this test robust to elastic-transport changes
assert expected_attributes.items() <= spans[0].attributes.items()
@pytest.mark.parametrize(
"bulk_helper_name", ["bulk", "streaming_bulk", "parallel_bulk"]
)
def test_otel_bulk(sync_client, elasticsearch_url, bulk_helper_name):
tracer, memory_exporter = setup_tracing()
# Create a new client with our tracer
sync_client = sync_client.options()
sync_client._otel.tracer = tracer
# "Disable" options to keep our custom tracer
sync_client.options = lambda: sync_client
docs = [{"answer": x, "helper": bulk_helper_name, "_id": x} for x in range(10)]
bulk_function = getattr(elasticsearch.helpers, bulk_helper_name)
if bulk_helper_name == "bulk":
success, failed = bulk_function(
sync_client, docs, index="test-index", chunk_size=2, refresh=True
)
assert success, failed == (5, 0)
else:
for ok, resp in bulk_function(
sync_client, docs, index="test-index", chunk_size=2, refresh=True
):
assert ok is True
memory_exporter.shutdown()
assert 10 == sync_client.count(index="test-index")["count"]
assert {"answer": 4, "helper": bulk_helper_name} == sync_client.get(
index="test-index", id=4
)["_source"]
spans = list(memory_exporter.get_finished_spans())
parent_span = spans.pop()
assert parent_span.name == f"helpers.{bulk_helper_name}"
assert parent_span.attributes == {
"db.system.name": "elasticsearch",
"db.operation.name": f"helpers.{bulk_helper_name}",
"http.request.method": "null",
}
assert len(spans) == 5
for span in spans:
assert span.name == "bulk"
expected_attributes = {
"http.request.method": "PUT",
"db.system.name": "elasticsearch",
"db.operation.name": "bulk",
"db.operation.parameter.index": "test-index",
}
# Assert expected atttributes are here, but allow other attributes too
# to make this test robust to elastic-transport changes
assert expected_attributes.items() <= spans[0].attributes.items()
assert span.parent.trace_id == parent_span.context.trace_id
|