# Query with large result sets

This sample notebook demonstrates how to query large amounts of data using the Azure Monitor Query client library.

Due to Log Analytics [service limits](https://learn.microsoft.com/azure/azure-monitor/service-limits#la-query-api), it may not always be possible to retrieve all the expected data in a single query. For example, the number of rows or the size of the data returned may exceed the stated limits. One way to work around these limits is to split a query into multiple smaller queries that each cover a different time range.
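As a simple illustration of splitting by time range, the following sketch divides one time span into contiguous windows of equal length. The function `split_time_range` is hypothetical and not part of the Azure Monitor Query library. Equal-width windows do not adapt to how densely the data is distributed, which is why this notebook picks window boundaries from the data itself.

```python
from datetime import datetime, timedelta, timezone


def split_time_range(start: datetime, end: datetime, n_chunks: int) -> list[tuple[datetime, datetime]]:
    """Split the span from `start` to `end` into `n_chunks` contiguous windows of (nearly) equal length.

    Hypothetical helper for illustration only.
    """
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    step = (end - start) / n_chunks  # timedelta / int -> timedelta
    windows = []
    for i in range(n_chunks):
        window_start = start + step * i
        # Use the exact end for the last window so rounding never drops any time.
        window_end = end if i == n_chunks - 1 else start + step * (i + 1)
        windows.append((window_start, window_end))
    return windows


# Example: one week split into daily windows.
end = datetime(2023, 1, 8, tzinfo=timezone.utc)
for window_start, window_end in split_time_range(end - timedelta(days=7), end, 7):
    print(window_start.isoformat(), "->", window_end.isoformat())
```

Adjacent windows here share a boundary instant; the notebook's fetch step avoids double-counting by ending each query one microsecond before the next one starts.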

In this notebook, you will first query the data in your Log Analytics workspace to determine time ranges that split the data retrieval into multiple smaller queries, each within the service limits. Then, you will execute the smaller queries asynchronously and write the results to separate files that can be used for further processing or analysis. Finally, the notebook shows how to export the data to an [Azure Data Lake Storage (ADLS)](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-introduction) account.

**Disclaimer**: This approach of splitting data retrieval into multiple smaller queries is suitable when:
   1. Dealing with a few GB or a few million records per hour. For larger data sets, [exporting](https://learn.microsoft.com/azure/azure-monitor/logs/logs-data-export) is recommended.
   2. The data retrieval query only uses the simple data retrieval operators outlined [here](https://learn.microsoft.com/azure/azure-monitor/logs/basic-logs-query?tabs=portal-1#kql-language-limits).

## Getting started

For this notebook, it is assumed that you have an existing Azure subscription and an active [Log Analytics workspace](https://learn.microsoft.com/azure/azure-monitor/logs/log-analytics-workspace-overview) that contains at least one table populated with data.

Start by installing the Azure Monitor Query and Azure Identity client libraries along with the `pandas` data analysis library.

In [None]:
import sys

!{sys.executable} -m pip install --upgrade azure-monitor-query azure-identity pandas


### Setup

An authenticated client is required to query data from Log Analytics. The following code shows how to create a `LogsQueryClient` using the `DefaultAzureCredential`. Note that an async credential and client are used.

In [None]:
from azure.identity.aio import DefaultAzureCredential
from azure.monitor.query.aio import LogsQueryClient

credential = DefaultAzureCredential()
client = LogsQueryClient(credential)


#### Set Log Analytics workspace ID

Set the `LOGS_WORKSPACE_ID` variable below to the ID of your Log Analytics workspace.

In [None]:
LOGS_WORKSPACE_ID = "<workspace_id>"


#### Define helper functions

To stay within the service limits, the strategy is to query data in smaller chunks based on a time column (e.g. `TimeGenerated`). The following helper functions query your data to find suitable start and end times for these batch queries.

- The `get_batch_endpoints_by_row_count` function returns a list of times to use as query time-span boundaries so that the number of rows returned by each query stays below the specified row limit.
- The `get_batch_endpoints_by_byte_size` function returns a list of times to use as query time-span boundaries so that the size of the data returned by each query stays below the specified byte limit.
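Both helpers use the same KQL pattern: sort rows newest first, assign each row a batch number from a running total, and keep the earliest timestamp per batch. The pure-Python sketch below mirrors the row-count variant on an in-memory list so the logic can be checked without a workspace. `batch_endpoints_by_row_count` is a hypothetical stand-in, and it assumes KQL's integer division for `row_cumsum(1) / max_rows_per_query`.

```python
from datetime import datetime, timedelta


def batch_endpoints_by_row_count(timestamps, end_time, max_rows_per_query):
    """Pure-Python emulation of the KQL in get_batch_endpoints_by_row_count.

    Mirrors: sort by time desc | extend batch_num = row_cumsum(1) / max_rows
             | summarize endpoint = min(time) by batch_num | sort by batch_num asc
    """
    ordered = sorted(timestamps, reverse=True)  # sort by TimeGenerated desc
    batch_min = {}
    for cumsum, ts in enumerate(ordered, start=1):  # row_cumsum(1) starts at 1
        batch_num = cumsum // max_rows_per_query  # integer division, as in KQL for long / long
        batch_min[batch_num] = min(ts, batch_min.get(batch_num, ts))
    # Prepend the overall end time, just like the notebook helper does.
    return [end_time] + [batch_min[b] for b in sorted(batch_min)]


# Example: ten hourly rows, at most four rows per query.
base = datetime(2024, 1, 1)
timestamps = [base + timedelta(hours=h) for h in range(10)]
print(batch_endpoints_by_row_count(timestamps, base + timedelta(hours=12), max_rows_per_query=4))
```

Each consecutive pair of endpoints bounds one query. In this example the windows hold 3, 4, and 3 rows; because `row_cumsum(1)` starts at 1, the first batch holds one row fewer than the limit.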

In [None]:
from datetime import datetime, timedelta

import pandas as pd

from azure.core.exceptions import HttpResponseError
from azure.monitor.query import LogsQueryPartialResult, LogsQueryStatus


async def get_batch_endpoints_by_row_count(
    query: str,
    end_time: datetime,
    days_back: int,
    max_rows_per_query: int = int(1e5),
    time_col: str = "TimeGenerated",
):
    """
    Determine the timestamp endpoints for each chunked query
    such that the number of rows returned by each query is (approximately) `max_rows_per_query`
    """

    # This query will assign a batch number to each row depending on the maximum number of rows per query.
    # Then the earliest timestamp for each batch number is used for each query endpoint.
    find_batch_endpoints_query = f"""
        {query}
        | sort by {time_col} desc
        | extend batch_num = row_cumsum(1) / {max_rows_per_query}
        | summarize endpoint=min({time_col}) by batch_num
        | sort by batch_num asc
        | project endpoint
    """

    start_time = end_time - timedelta(days=days_back)
    try:
        response = await client.query_workspace(
            workspace_id=LOGS_WORKSPACE_ID,
            query=find_batch_endpoints_query,
            timespan=(start_time, end_time),
        )
    except HttpResponseError as e:
        print("Error batching endpoints by row count")
        raise e

    if response.status == LogsQueryStatus.PARTIAL:
        raise Exception(f"Error batching endpoints by row count: {response.partial_error}")

    batch_endpoints = [end_time]
    batch_endpoints += [row[0] for row in response.tables[0].rows]
    return batch_endpoints


async def get_batch_endpoints_by_byte_size(
    query: str,
    end_time: datetime,
    days_back: int,
    max_bytes_per_query: int = 100 * 1024 * 1024, # 100 MiB
    time_col: str = "TimeGenerated",
):
    """
    Determine the timestamp endpoints for each chunked query such that
    the size of the data returned is less than `max_bytes_per_query`.
    """

    # This query will assign a batch number to each row depending on the estimated data size.
    # Then the earliest timestamp for each batch number is used for each query endpoint.
    find_batch_endpoints_query = f"""
        {query}
        | sort by {time_col} desc
        | extend batch_num = row_cumsum(estimate_data_size(*)) / {max_bytes_per_query}
        | summarize endpoint=min({time_col}) by batch_num
        | sort by batch_num asc
        | project endpoint
    """

    start_time = end_time - timedelta(days=days_back)
    try:
        response = await client.query_workspace(
            workspace_id=LOGS_WORKSPACE_ID,
            query=find_batch_endpoints_query,
            timespan=(start_time, end_time)
        )
    except HttpResponseError as e:
        print("Error batching endpoints by byte size")
        raise e

    if response.status == LogsQueryStatus.PARTIAL:
        raise Exception(f"Error batching endpoints by byte size: {response.partial_error}")

    batch_endpoints = [end_time]
    batch_endpoints += [row[0] for row in response.tables[0].rows]
    return batch_endpoints


Next, define a function that asynchronously executes a given query over the time range between a given start time and end time. This function calls the `query_workspace` method of the `LogsQueryClient`. The Azure Monitor Query library automatically retries on connection-related errors and server errors (for example, 500, 503, and 504 status codes). See the [Azure Core configuration documentation](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/core/azure-core#configurations) for more information on configuring retries.

In [None]:
async def execute_query(
    query: str,
    start_time: datetime,
    end_time: datetime,
    *,
    query_id: str = "",
    correlation_request_id: str = "",
):
    """
    Asynchronously execute the given query, restricted to the given time range, and parse the API response.

    :param str query: The query to execute.
    :param datetime start_time: The start of the time range to query.
    :param datetime end_time: The end of the time range to query.
    :keyword str query_id: Optional identifier for the query, used for printing.
    :keyword str correlation_request_id: Optional correlation ID to use in the query headers for tracing.
    """
    headers = {}
    if correlation_request_id:
        headers["x-ms-correlation-request-id"] = correlation_request_id

    try:
        response = await client.query_workspace(
            workspace_id=LOGS_WORKSPACE_ID,
            query=query,
            timespan=(start_time, end_time),
            server_timeout=360,
            include_statistics=False, # Can be used for debugging.
            headers=headers,
            retry_on_methods=["POST"]
        )
    except HttpResponseError as e:
        print(f"Error when attempting query {query_id} (query time span: {start_time} to {end_time}):\n\t", e)
        return []

    if response.status == LogsQueryStatus.SUCCESS:
        print(f"Query {query_id} successful (query time span: {start_time} to {end_time}). Row count: {len(response.tables[0].rows)}")
        return response.tables[0]
    else:
        # This will be a LogsQueryPartialResult.
        error = response.partial_error
        print(f"Partial results returned for query {query_id} (query time span: {start_time} to {end_time}):\n\t", error)
        # Guard against a partial result that contains no tables.
        return response.partial_data[0] if response.partial_data else []


## Query data

With the helper functions defined, you can now query the data in chunks that won't hit the row count and data size service limits.

### Set variables

Before running the queries, configure the following variables.

- `QUERY` - KQL query to run. Change the table name and specify any required columns and filters as needed. When constructing this query, the recommendation is to use the [reduced set of KQL operators](https://learn.microsoft.com/azure/azure-monitor/logs/basic-logs-query?tabs=portal-1#kql-language-limits), which are optimized for data retrieval. To get all rows and columns, just set `QUERY = <name-of-table>`.
- `END_TIME` - End of the time range to query over.
- `DAYS_BACK` - The number of days to go back from the end time. For example, if `END_TIME = datetime.now()` and `DAYS_BACK = 7`, the query will return data from the last 7 days. Note that fetched data is (initially) stored in memory on your system, so you may run into memory limitations if the query returns a large amount of data. If this happens, consider querying the data in time segments. For example, instead of querying 365 days of data at once, query 100 days of data at a time by setting `END_TIME` and `DAYS_BACK` appropriately and re-running the notebook from this cell onwards for each segment. Because the fetch step deletes existing files in `OUTPUT_DIRECTORY` whose names start with `OUTPUT_FILE_PREFIX`, use a different prefix or directory for each segment.
- `MAX_ROWS_PER_QUERY` - The maximum number of rows returned by a single query. This defaults to 90% of the service limit of 500,000 rows to leave some headroom. This limit may sometimes be exceeded if many entries share the same timestamp.
- `MAX_BYTES_PER_QUERY` - The maximum size in bytes of the data returned by a single query. This defaults to 60% of the service limit of 100 MiB to leave some headroom.
- `MAX_CONCURRENT_QUERIES` - The max number of concurrent queries to run at once. This is defaulted to 5. Reducing this can help avoid errors due to rate limits.
- `OUTPUT_DIRECTORY` - The directory where the query results will be stored. This is defaulted to "./query_results".
- `OUTPUT_FILE_PREFIX` - The prefix of each output file name. This is defaulted to "query_results".
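When querying in time segments as described for `DAYS_BACK`, the `(END_TIME, DAYS_BACK)` pair for each run can be computed up front. The sketch below is a hypothetical helper (`plan_segments` is not part of any library) that walks backwards from an overall end time. Adjacent segments share a boundary instant, so a row stamped exactly on a boundary could be fetched twice.

```python
from datetime import datetime, timedelta, timezone


def plan_segments(overall_end: datetime, total_days: int, segment_days: int):
    """Return (END_TIME, DAYS_BACK) pairs that together cover `total_days` before `overall_end`.

    Hypothetical helper: each pair is meant for one run of the notebook.
    """
    if segment_days < 1:
        raise ValueError("segment_days must be at least 1")
    segments = []
    end = overall_end
    remaining = total_days
    while remaining > 0:
        days_back = min(segment_days, remaining)
        segments.append((end, days_back))
        end = end - timedelta(days=days_back)  # next segment ends where this one starts
        remaining -= days_back
    return segments


# Example: 365 days in 100-day segments (the last segment covers the remaining 65 days).
# Change OUTPUT_FILE_PREFIX between runs so earlier segments' files are not cleared.
for end_time, days_back in plan_segments(datetime.now().astimezone(), total_days=365, segment_days=100):
    print(f"END_TIME = {end_time.isoformat()}, DAYS_BACK = {days_back}")
```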

In [None]:
# EDIT THIS VALUE WITH YOUR QUERY.
# If necessary, add a KQL `project` operator or any filtering operators to limit the number of rows returned.
QUERY = "AppRequests"

# Use the current time in the system's local timezone as the end time.
END_TIME = datetime.now().astimezone()

# If you want to use a different end time, uncomment the following line and adjust as needed.
# END_TIME = datetime.strptime("2023-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")

DAYS_BACK = 90

MAX_ROWS_PER_QUERY_SERVICE_LIMIT = int(5e5)  # 500K
MAX_ROWS_PER_QUERY = int(MAX_ROWS_PER_QUERY_SERVICE_LIMIT * 0.9)

MAX_BYTES_PER_QUERY_SERVICE_LIMIT = 100 * 1024 * 1024
MAX_BYTES_PER_QUERY = int(MAX_BYTES_PER_QUERY_SERVICE_LIMIT * 0.6) # 64 MB of compressed data is the limit. This ensures we stay under that.

MAX_CONCURRENT_QUERIES = 5

OUTPUT_DIRECTORY = "./query_results"
OUTPUT_FILE_PREFIX = "query_results"


### Estimate data and costs (optional)

Before running the chunked queries, it may be prudent to estimate the size of the data, especially if you plan to export the query results to another service. The following cell defines a helper function that estimates the number of rows and the size of the data.

In [None]:
async def estimate_data_size(query: str, end_time: datetime, days_back: int):
    query = f"{query} | summarize n_rows = count(), estimate_data_size = sum(estimate_data_size(*))"
    start_time = end_time - timedelta(days=days_back)
    response = await client.query_workspace(
        workspace_id=LOGS_WORKSPACE_ID,
        query=query,
        timespan=(start_time, end_time),
    )

    if response.status == LogsQueryStatus.PARTIAL:
        raise Exception(f"Error estimating data size: {response.partial_error}")

    columns = response.tables[0].columns
    rows = response.tables[0].rows
    df = pd.DataFrame(data=rows, columns=columns)
    return df


Now, run the following cell to estimate the size of the data that the queries will return. Note that this is only an estimate, and the actual size may vary slightly. You can use this information with the Azure storage [pricing calculator](https://azure.microsoft.com/pricing/calculator/?service=storage) to estimate the costs of your storage setup. If you use Azure Data Lake Storage Gen2, see [Azure Data Lake Storage pricing](https://azure.microsoft.com/pricing/details/storage/data-lake/) for full billing details.
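Turning the size estimate into a rough storage cost is simple arithmetic. The sketch below is a hypothetical helper; `price_per_gb_month` is a placeholder you would look up for your region, redundancy option, and access tier. It ignores transaction and other operation charges, and the JSON written to storage may differ in size from the `estimate_data_size` figure.

```python
def estimate_storage_cost(size_bytes: float, price_per_gb_month: float, months: int = 1) -> float:
    """Rough storage cost: decimal GB x price per GB-month x months.

    `price_per_gb_month` is a placeholder, not a real Azure rate.
    """
    size_gb = size_bytes / 1000 ** 3  # decimal GB, matching the estimate_data_size_GB column
    return size_gb * price_per_gb_month * months


# Example with made-up numbers: 250 GB kept for 12 months at a placeholder rate of 0.02 per GB-month.
print(estimate_storage_cost(250e9, price_per_gb_month=0.02, months=12))
```

In the notebook, the first argument could come from `data_size_df["estimate_data_size"].iloc[0]`.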

In [None]:
data_size_df = await estimate_data_size(QUERY, END_TIME, DAYS_BACK)
data_size_df["estimate_data_size_MB"] = data_size_df["estimate_data_size"] / (1000 ** 2)
data_size_df["estimate_data_size_GB"] = data_size_df["estimate_data_size_MB"] / 1000
data_size_df


### Fetch log data

Use the helper functions to create an async wrapper function that will query the data in chunks using the variables defined above.

In order to be memory efficient, it's better to process data as it is returned from the queries instead of storing all the data in memory. Inside `fetch_logs` below, there is a section of code where you can add code to process the data on the fly. 

By default, all the queried data will be written to binary pickle files in the directory defined by `OUTPUT_DIRECTORY`. Each file will contain the results of a single query, and each file name will be prefixed with the value of `OUTPUT_FILE_PREFIX`.

There is also a section inside `process_logs` where custom code for mutating the data in each chunk's DataFrame can be added.
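The fetch cell limits concurrency with a module-level `asyncio.Semaphore` sized by `MAX_CONCURRENT_QUERIES` and handles each chunk as soon as its query returns. The self-contained sketch below shows the same pattern with simulated queries (`fake_query` and `main` are hypothetical) and records the peak number of queries in flight, which the semaphore keeps at or below `max_concurrent`.

```python
import asyncio
import random


async def fake_query(query_id: int, semaphore: asyncio.Semaphore, stats: dict) -> int:
    """Stand-in for execute_query: sleeps briefly and returns a fake row count."""
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(random.uniform(0.01, 0.05))  # simulated network latency
        stats["active"] -= 1
        # A real version would process or write the chunk here, then let it go out of scope.
        return query_id * 10


async def main(n_queries: int = 12, max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)
    stats = {"active": 0, "peak": 0}
    counts = await asyncio.gather(*(fake_query(i, semaphore, stats) for i in range(n_queries)))
    return sum(counts), stats["peak"]
```

In the notebook, run it with `total, peak = await main()`; in a plain script, use `asyncio.run(main())`.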

In [None]:
import asyncio
import os

# Limit the number of concurrent queries.
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)

async def fetch_logs(query: str, start_time: datetime, end_time: datetime, query_id: str, correlation_request_id: str):
    async with semaphore:
        response = await execute_query(query, start_time, end_time, query_id=query_id, correlation_request_id=correlation_request_id)
        # Do some post-processing on the response and get the final DataFrame.
        df = await process_logs(response)
        if df is not None and not df.empty:
            # Can do something with the DataFrame here. For example, write to a file, insert into a database, etc.

            # ADD YOUR CUSTOM CODE HERE. Remember to remove or comment out the following line as needed.
            write_to_file(df, query_id)

            # Return the number of rows in the DataFrame.
            return len(df)
        return 0


async def process_logs(response):
    if response:
        df = pd.DataFrame(data=response.rows, columns=response.columns)

        # Can modify/mutate the DataFrame here.
        return df
    return None


def write_to_file(df, query_id):
    path = os.path.join(OUTPUT_DIRECTORY, f"{OUTPUT_FILE_PREFIX}_{query_id}.pkl")
    df.to_pickle(path)


In [None]:
import uuid


async def run():
    # Below, we combine the endpoints retrieved from both endpoint methods to ensure that the number of rows
    # and the size of the data returned are both within the limits.
    # Worst case performance is double the theoretical minimum number of queries necessary.
    print("Calculating batch endpoints...")
    row_batch_endpoints = await get_batch_endpoints_by_row_count(QUERY, END_TIME, days_back=DAYS_BACK, max_rows_per_query=MAX_ROWS_PER_QUERY)
    byte_batch_endpoints = await get_batch_endpoints_by_byte_size(QUERY, END_TIME, days_back=DAYS_BACK, max_bytes_per_query=MAX_BYTES_PER_QUERY)
    batch_endpoints = sorted(set(byte_batch_endpoints + row_batch_endpoints), reverse=True)

    print("Clearing output directory...")
    if os.path.exists(OUTPUT_DIRECTORY):
        for filename in os.listdir(OUTPUT_DIRECTORY):
            if filename.startswith(OUTPUT_FILE_PREFIX):
                os.remove(os.path.join(OUTPUT_DIRECTORY, filename))
    else:
        os.makedirs(OUTPUT_DIRECTORY)

    if len(batch_endpoints) == 1:
        print(f"No data with time generated earlier than {batch_endpoints[0]} was found in the queried data. "
              "Verify that the query is correct and that the data exists in your specified time range.")
        print(f"Will attempt to query the data with the start and end time both set to {batch_endpoints[0]}. This may fail if "
              "the data exceeds API limits. Another field to split on in the query may be necessary.")
        batch_endpoints.append(batch_endpoints[0])

    queries = []
    end_time = batch_endpoints[0]
    correlation_request_id = str(uuid.uuid4())

    print(f"Querying {len(batch_endpoints) - 1} time ranges, from {batch_endpoints[-1]} to {end_time}")
    print(f"Correlation request ID: {correlation_request_id}")


    for i in range(1, len(batch_endpoints)):
        start_time = batch_endpoints[i]
        queries.append(fetch_logs(QUERY, start_time, end_time, query_id=str(i), correlation_request_id=correlation_request_id))
        end_time = start_time - timedelta(microseconds=1) # Subtract 1 microsecond to avoid overlap between queries.

    counts = await asyncio.gather(*queries)
    # Return total number of rows retrieved across all queries.
    return sum(counts)
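To see how `run` turns the two endpoint lists into query time ranges, the sketch below reproduces that step on plain `datetime` values. `build_query_ranges` is a hypothetical name used only for illustration. Taking the union of both lists means each range lies inside one row-count batch and one byte-size batch, so both limits hold, at the cost of possibly running more queries.

```python
from datetime import datetime, timedelta


def build_query_ranges(row_endpoints, byte_endpoints):
    """Merge two endpoint lists and build non-overlapping (start, end) ranges, newest first.

    Mirrors the range construction in `run`: endpoints are de-duplicated, sorted in
    descending order, and each range ends 1 microsecond before the previous range starts.
    """
    endpoints = sorted(set(row_endpoints + byte_endpoints), reverse=True)
    if len(endpoints) == 1:
        endpoints.append(endpoints[0])  # no data found: query a zero-length range
    ranges = []
    end_time = endpoints[0]
    for start_time in endpoints[1:]:
        ranges.append((start_time, end_time))
        end_time = start_time - timedelta(microseconds=1)
    return ranges


# Example: the row-count and byte-size helpers disagree on the middle boundary.
base = datetime(2024, 1, 1)
rows = [base + timedelta(hours=h) for h in (12, 9, 3)]
sizes = [base + timedelta(hours=h) for h in (12, 6, 3)]
for start, end in build_query_ranges(rows, sizes):
    print(start, "->", end)
```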


Now, go ahead and run the following cell to fetch the data. Note that this may take some time depending on the size of the data and the number of queries that need to be run.

In [None]:
count = await run()
print(f"Retrieved {count} rows")


One of the output files can be inspected to see what the data looks like. The following cell outputs the first 30 rows of the first output file it finds.

In [None]:
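# Inspect some of the data. Start with an empty DataFrame so this cell still runs if no files exist.
df = pd.DataFrame()
for file in sorted(os.listdir(OUTPUT_DIRECTORY)):
    if file.startswith(OUTPUT_FILE_PREFIX):
        path = os.path.join(OUTPUT_DIRECTORY, file)
        df = pd.read_pickle(path)
        break

df.head(30)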


#### (Optional) Combine data into single DataFrame

If system memory permits, the data can be combined into a single `DataFrame` for more comprehensive analysis. Uncomment the line calling `combine_all_files_to_df` and run the cell.

In [None]:
full_data_df = pd.DataFrame()


def combine_all_files_to_df():
    """Read all output files and combine them into a single DataFrame."""
    global full_data_df
    frames = [
        pd.read_pickle(os.path.join(OUTPUT_DIRECTORY, file))
        for file in sorted(os.listdir(OUTPUT_DIRECTORY))
        if file.startswith(OUTPUT_FILE_PREFIX)
    ]
    if frames:
        # Concatenate once instead of inside the loop, which avoids repeatedly copying the growing DataFrame.
        full_data_df = pd.concat(frames, ignore_index=True)

# Combine all files into a single DataFrame.
# combine_all_files_to_df()

# Inspect the combined DataFrame.
print(f"Shape of combined DataFrame: {full_data_df.shape}")
full_data_df.head(30)


## Optional: Export data to Azure Data Lake Storage (ADLS)

If desired, the data queried from your Log Analytics workspace can be exported to an [Azure Data Lake Storage (ADLS)](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-introduction) account. This can be useful for storing the data for longer periods of time or for using it in other applications. To do this, you need the `azure-storage-file-datalake` Python package, which uses the [ADLS Gen2 REST API](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-directory-file-acl-python) under the hood.

### Setup

First, ensure you have the required package installed:

In [None]:
import sys

!{sys.executable} -m pip install --upgrade azure-storage-file-datalake


Now, input your storage [connection string](https://learn.microsoft.com/azure/storage/common/storage-account-keys-manage) below and instantiate the ADLS service client.

In [None]:
from azure.storage.filedatalake import DataLakeServiceClient

AZURE_STORAGE_CONNECTION_STRING = "<your connection string>"

try:
    adls_service_client = DataLakeServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)
except Exception as e:
    print(e)



Next, define a helper function that interacts with the ADLS storage account to which the queried data will be exported.
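The helper serializes each DataFrame with `df.to_json(orient="records", lines=True, date_format="iso")`, which produces JSON Lines: one JSON object per row, one row per line, with timestamps as ISO 8601 strings. The stdlib sketch below shows that shape; `records_to_json_lines` is hypothetical, and pandas formats timestamps slightly differently (for example, with millisecond precision), so the output is illustrative rather than byte-identical.

```python
import json
from datetime import datetime, timezone


def records_to_json_lines(records: list[dict]) -> str:
    """Serialize records as JSON Lines: one JSON object per line, datetimes as ISO 8601 strings."""
    def default(value):
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Unsupported type: {type(value).__name__}")

    return "\n".join(json.dumps(record, default=default) for record in records)


text = records_to_json_lines([
    {"Name": "GET /", "TimeGenerated": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"Name": "POST /login", "TimeGenerated": None},
])
print(text)

# Reading the file back is one json.loads call per line.
rows = [json.loads(line) for line in text.splitlines()]
```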

In [None]:
def upload_df_to_adls_path(
    df: pd.DataFrame,
    adls_dirname: str,
    adls_filename: str,
    container_name: str,
):
    """
    Upload a pandas DataFrame to the specified ADLS path as a single JSON file.
    """
    json_data = df.to_json(orient="records", lines=True, date_format="iso")
    file_system_client = adls_service_client.get_file_system_client(file_system=container_name)

    try:
        file_system_client.create_directory(adls_dirname)
    except Exception as e:
        print(e)

    try:
        directory_client = file_system_client.get_directory_client(adls_dirname)
        file_client = directory_client.get_file_client(adls_filename)
        file_client.upload_data(json_data, overwrite=True)
    except Exception as e:
        print(e)


### Upload data

After configuring the variables below, run the following cell to upload the data. The `upload_df_to_adls_path` function defined above is called on each data file created previously.

To be more memory efficient, you can instead call `upload_df_to_adls_path` on each chunk's DataFrame from within the `fetch_logs` function above.

In [None]:
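# Name of the storage container. This must already exist.
CONTAINER_NAME = "<container_name>"

# Name of the directory to write to. This will be created if it does not exist.
DIRECTORY_NAME = "monitor-log-dump"

# Prefix for the uploaded file names. Each chunk is written to its own JSON file
# (for example, "monitor-log-dump_1.json") so that chunks do not overwrite each other.
FILENAME_PREFIX = "monitor-log-dump"


for file in sorted(os.listdir(OUTPUT_DIRECTORY)):
    if file.startswith(OUTPUT_FILE_PREFIX):
        path = os.path.join(OUTPUT_DIRECTORY, file)
        df = pd.read_pickle(path)
        # Reuse the chunk's query ID suffix (e.g. "_1" from "query_results_1.pkl") in the uploaded name.
        chunk_suffix = os.path.splitext(file)[0][len(OUTPUT_FILE_PREFIX):]
        adls_filename = f"{FILENAME_PREFIX}{chunk_suffix}.json"
        upload_df_to_adls_path(df, DIRECTORY_NAME, adls_filename, CONTAINER_NAME)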


## Conclusion

In this notebook, you learned how to query data from a Log Analytics workspace in chunks to avoid hitting the service limits. You also learned how to export the data to an Azure Data Lake Storage (ADLS) account.