# Detect anomalies in Azure Monitor log data using machine learning techniques 

This notebook demonstrates how to use the [`azure-monitor-query`](https://pypi.org/project/azure-monitor-query/) library to retrieve Azure Monitor log data and train machine learning models to detect anomalies. The [scikit-learn](https://scikit-learn.org/stable/) library is used to train two regression models on historical data; the better-performing model is then used to predict new values and identify anomalies.

1. [**Getting Started**](#getting-started) - Install dependencies, and define helper functions and constants.
2. [**Query and visualize data**](#query-and-visualize) - Explore data from a Log Analytics workspace.
3. [**Analyze data using machine learning techniques**](#analyze-data)
   * [**Prepare data for model training**](#prepare-data) - Select the data types to analyze and extract timestamp features.
   * [**Train and test regression models**](#train-regression-models) - Train a linear regression model and a gradient boosting regression model on historical data.
   * [**Predict new values and identify anomalies**](#identify-anomalies) - Score new data, or predict new values, using one of the trained models to identify anomalies.
4. [**Ingest anomalies (optional)**](#ingest-anomalies) - Upload detected anomalies into a custom table in your Log Analytics workspace for further analysis.

<a id='getting-started'></a>

## 1. Getting started

Let's start by installing the Azure Monitor Query, Azure Identity, and Azure Monitor Ingestion client libraries, along with the `pandas` data analysis library, the `plotly` visualization library, and the `scikit-learn` machine learning library.

In [None]:
import sys

!{sys.executable} -m pip install --upgrade azure-monitor-query azure-identity azure-monitor-ingestion

!{sys.executable} -m pip install --upgrade pandas numpy plotly scikit-learn nbformat

### Setup

Some initial setup is needed before we can run the sample code.

#### Set Log Analytics workspace ID

Set the `LOGS_WORKSPACE_ID` variable below to the ID of your Log Analytics workspace. Currently, it is set to use the [Azure Monitor Demo workspace](https://portal.azure.com/#blade/Microsoft_Azure_Monitoring_Logs/DemoLogsBlade), but it is recommended to use your own workspace if available.

In [None]:
LOGS_WORKSPACE_ID = "DEMO_WORKSPACE"

#### Create LogsQueryClient

An authenticated client is needed to query Azure Monitor Logs. The following code shows how to create a `LogsQueryClient` using `DefaultAzureCredential`.

Note that `LogsQueryClient` typically supports only Azure Active Directory (Azure AD) token credentials. However, we can pass in a custom authentication policy to enable the use of API keys, which allows the client to query the [demo workspace](https://learn.microsoft.com/azure/azure-monitor/logs/api/access-api#authenticate-with-a-demo-api-key). The availability of and access to this demo workspace are subject to change, so we recommend using your own Log Analytics workspace.

In [None]:
from azure.core.credentials import AzureKeyCredential
from azure.core.pipeline.policies import AzureKeyCredentialPolicy
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

if LOGS_WORKSPACE_ID == "DEMO_WORKSPACE":
    credential = AzureKeyCredential("DEMO_KEY")
    header_name = "X-Api-Key"
    authentication_policy = AzureKeyCredentialPolicy(name=header_name, credential=credential)
else:
    credential = DefaultAzureCredential()
    authentication_policy = None

logs_query_client = LogsQueryClient(credential, authentication_policy=authentication_policy)

#### Define helper functions

Next, we'll define some helper functions that will be used throughout the notebook.

- `query_logs_workspace` - Queries the Log Analytics workspace for a given query and returns the results as a `pandas` DataFrame.
- `display_graph` - Given a `pandas` DataFrame, displays a `plotly` line graph showing hourly usage for various data types over time.
- `display_options` - Sets `pandas` display options so that DataFrames are shown compactly.

In [None]:
import pandas as pd
import plotly.express as px

from azure.monitor.query import LogsQueryStatus
from azure.core.exceptions import HttpResponseError


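def query_logs_workspace(query):
    """Run a KQL query against the workspace and return the result table as a DataFrame."""
    try:
        response = logs_query_client.query_workspace(LOGS_WORKSPACE_ID, query, timespan=None)
    except HttpResponseError as err:
        print("Query failed:")
        print(err)
        # Return an empty DataFrame instead of failing later with an unbound variable
        return pd.DataFrame()

    if response.status == LogsQueryStatus.SUCCESS:
        data = response.tables
    else:
        # Partial results: report the error but keep whatever data was returned
        print(response.partial_error)
        data = response.partial_data

    if not data:
        return pd.DataFrame()
    # The queries in this notebook return a single table
    table = data[0]
    return pd.DataFrame(data=table.rows, columns=table.columns)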


def display_graph(df, title):
    df = df.sort_values(by="TimeGenerated")
    graph = px.line(df, x='TimeGenerated', y="ActualUsage", color='DataType', title=title)
    graph.show()


# Set pandas display options so that DataFrames render compactly
def display_options():
    display = pd.options.display
    display.max_columns = 10
    display.max_rows = 10
    display.max_colwidth = 300
    display.width = None

display_options()


<a id='query-and-visualize'></a>

## 2. Query and visualize data

Let's explore the data in the Log Analytics workspace by running the following query on the [Usage](https://learn.microsoft.com/azure/azure-monitor/reference/tables/usage) table, which this notebook assumes exists in the workspace.

This query calculates how much data, in megabytes (MB), was ingested into each table (data type) in the Log Analytics workspace each hour over the past week.
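To make the aggregation concrete, here is a minimal `pandas` sketch of what `summarize ActualUsage=sum(Quantity) by TimeGenerated=bin(TimeGenerated, 1h), DataType` computes. The rows below are hypothetical sample data, not real `Usage` records: each timestamp is floored to its hour, and quantities are summed per hour and data type.

```python
import pandas as pd

# Hypothetical rows shaped like the Usage table (Quantity is in MB)
usage = pd.DataFrame({
    "TimeGenerated": pd.to_datetime([
        "2023-03-19 10:05", "2023-03-19 10:40", "2023-03-19 11:15", "2023-03-19 10:20",
    ]),
    "DataType": ["Perf", "Perf", "Perf", "AzureDiagnostics"],
    "Quantity": [1.5, 2.0, 0.5, 4.0],
})

# Floor each timestamp to the hour (like bin(TimeGenerated, 1h)), then sum per hour and data type
hourly = (
    usage.assign(TimeGenerated=usage["TimeGenerated"].dt.floor("h"))
    .groupby(["TimeGenerated", "DataType"], as_index=False)["Quantity"]
    .sum()
    .rename(columns={"Quantity": "ActualUsage"})
)
print(hourly)
```

The two `Perf` records between 10:00 and 11:00 collapse into a single hourly row, which is the granularity the models in this notebook work with.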

In [None]:
TABLE = "Usage"

QUERY = f"""
let starttime = 7d; // Start date for the time series, counting back from the current date
let endtime = 0d; // today
{TABLE} | project TimeGenerated, DataType, Quantity
| where TimeGenerated between (ago(starttime)..ago(endtime))
| summarize ActualUsage=sum(Quantity) by TimeGenerated=bin(TimeGenerated, 1h), DataType
"""

df = query_logs_workspace(QUERY)
display(df)

Now, let's view the data as a graph using the helper function we defined above.

In [None]:
display_graph(df, "All Data Types - last week usage")

<a id='analyze-data'></a>

## 3. Analyze data using machine learning techniques

<a id='prepare-data'></a>
### Prepare data for model training

After exploring the available data, let's use a subset of it for model training. We will choose a few of the data types to train our model on (defined in `data_types` below). 

In [None]:
# Insert the selected data types for analysis here. For simplicity, we picked six that looked most interesting during data exploration.
data_types = ["ContainerLog", "AzureNetworkAnalytics_CL", "StorageBlobLogs", "AzureDiagnostics", "Perf", "AVSSyslog"]

# Get all available data types that have data.
available_data_types = df["DataType"].unique()

# Filter out data types that are not available in the data.
data_types = [data_type for data_type in data_types if data_type in available_data_types]

if data_types:
    print(f"Selected data types for analysis: {data_types}")
else:
    raise SystemExit("None of the selected data types have data. Please select data types that have data.")

# Returns usage query for selected data types for given time range
def get_selected_datatypes(data_types, start, end):
    data_types_string = ",".join([f"'{data_type}'" for data_type in data_types])
    query = (
        f"let starttime = {start}d; "
        f"let endtime = {end}d; "
        "Usage | project TimeGenerated, DataType, Quantity "
        "| where TimeGenerated between (ago(starttime)..ago(endtime)) "
        f"| where DataType in ({data_types_string}) "
        "| summarize ActualUsage=sum(Quantity) by TimeGenerated=bin(TimeGenerated, 1h), DataType"
    )
    return query

# Query the three weeks that precede the most recent week (from 28 days ago to 7 days ago).
# Feel free to change the start and end values (in days before today).
start = 28
end = 7

query = get_selected_datatypes(data_types, start, end)
my_data = query_logs_workspace(query)
display(my_data)

if my_data.empty:
    raise SystemExit("No data found for training. Please select data types that have data.")


In [None]:
display_graph(my_data, "Selected Data Types - Historical Data Usage (3 weeks)")

Let's now expand the timestamp information in the TimeGenerated field into separate columns for year, month, day, and hour using [`DatetimeIndex`](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#time-date-components) from `pandas`. This will allow us to use the timestamp information as features in our model.
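As a quick illustration, the sketch below applies the same `DatetimeIndex` component extraction to two hypothetical timestamps, so you can see what the new feature columns contain.

```python
import pandas as pd

# Two hypothetical hourly timestamps, in the same shape as the query output
sample = pd.DataFrame({"TimeGenerated": pd.to_datetime(["2023-03-19T19:00:00Z", "2023-03-20T07:00:00Z"])})

# Extract each timestamp component into its own numeric column
timestamps = pd.DatetimeIndex(sample["TimeGenerated"])
sample["Year"] = timestamps.year
sample["Month"] = timestamps.month
sample["Day"] = timestamps.day
sample["Hour"] = timestamps.hour
print(sample)
```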

In [None]:
my_data['Year'] = pd.DatetimeIndex(my_data['TimeGenerated']).year
my_data['Month'] = pd.DatetimeIndex(my_data['TimeGenerated']).month
my_data['Day'] = pd.DatetimeIndex(my_data['TimeGenerated']).day
my_data['Hour'] = pd.DatetimeIndex(my_data['TimeGenerated']).hour

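Finally, define the X and Y variables for training the model. X contains the features (the data type and the timestamp components), and Y contains the target (hourly data usage in megabytes). Because the time-series cross-validator used below splits the rows in order, the rows are first sorted by time.

In [None]:
# Sort chronologically so that time-based cross-validation trains on earlier rows and tests on later rows
my_data = my_data.sort_values(by="TimeGenerated", kind="stable").reset_index(drop=True)

Y = my_data['ActualUsage']
X = my_data[['DataType', 'Year', 'Month', 'Day', 'Hour']]

display(X)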

<a id="train-regression-models"></a>

### Train and test regression models on historical data

Now that we have our data prepared, let's experiment with two different regression models and use cross-validation to check which one more closely predicts held-out data.

#### Define cross validator

Before we train, we'll define a cross-validator using [`TimeSeriesSplit`](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.TimeSeriesSplit.html#sklearn.model_selection.TimeSeriesSplit) from `scikit-learn`.  The `evaluate` function defined below will use this cross-validator to evaluate the performance of the models we train.
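A minimal sketch of how `TimeSeriesSplit` partitions rows, assuming eight time-ordered samples and `n_splits=3` (chosen only to keep the output short; the notebook uses the default of five splits). Each fold trains on earlier rows and tests on the rows that immediately follow, which only makes sense when the rows are in chronological order.

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

# Eight time-ordered samples, for example eight consecutive hours
X_demo = np.arange(8).reshape(-1, 1)

splits = list(TimeSeriesSplit(n_splits=3).split(X_demo))
for fold, (train_idx, test_idx) in enumerate(splits):
    # The training window grows with each fold and always ends before the test window
    print(f"fold {fold}: train={train_idx.tolist()} test={test_idx.tolist()}")
```

Unlike shuffled k-fold cross-validation, this never lets a model "see the future" during evaluation.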


In [None]:
from sklearn.model_selection import cross_validate
from sklearn.model_selection import TimeSeriesSplit

ts_cv = TimeSeriesSplit()

def evaluate(model, X, Y, cv):
    cv_results = cross_validate(
        model,
        X,
        Y,
        cv=cv,
        scoring=["neg_mean_absolute_error", "neg_root_mean_squared_error"],
    )
    mae = -cv_results["test_neg_mean_absolute_error"]
    rmse = -cv_results["test_neg_root_mean_squared_error"]
    print(
        f"Mean Absolute Error:     {mae.mean():.3f} +/- {mae.std():.3f}\n"
        f"Root Mean Squared Error: {rmse.mean():.3f} +/- {rmse.std():.3f}"
    )


#### Train and evaluate a linear regression model

First, let's train a linear regression model.

Here, we first apply some transformations to the input data:

* One-hot encode the categorical `DataType` feature using [`OneHotEncoder`](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html#sklearn.preprocessing.OneHotEncoder). This is how we represent data types numerically in our model.
* Scale the remaining numerical features (`Year`, `Month`, `Day`, and `Hour`) to the 0-1 range using `MinMaxScaler`.

Then, we train the model using an extension of linear regression called [ridge regression](https://en.wikipedia.org/wiki/Ridge_regression), which uses L2 [regularization](https://en.wikipedia.org/wiki/Regularization_(mathematics)) to reduce overfitting. `RidgeCV` selects the regularization strength, alpha, from the candidate `alphas` using built-in cross-validation.
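For reference, the ridge objective as documented for scikit-learn's ridge estimators is shown below, where $X$ is the transformed feature matrix, $y$ the target, $w$ the coefficient vector, and $\alpha$ the regularization strength picked from `alphas`. Larger values of $\alpha$ shrink the coefficients more strongly toward zero.

$$
\hat{w} = \arg\min_{w} \; \lVert y - Xw \rVert_2^2 + \alpha \lVert w \rVert_2^2
$$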

Finally, we evaluate the model using the cross-validator defined above.

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import RidgeCV
import numpy as np


categorical_columns = ["DataType"]

one_hot_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)

# Get 25 alpha values between 10^-6 and 10^6
alphas = np.logspace(-6, 6, 25)
ridge_linear_pipeline = make_pipeline(
    ColumnTransformer(
        transformers=[
            ("categorical", one_hot_encoder, categorical_columns),
        ],
        remainder=MinMaxScaler(),
    ),
    RidgeCV(alphas=alphas),
)

ridge_linear_pipeline.fit(X, Y)

print("Score of Linear Regression:")
evaluate(ridge_linear_pipeline, X, Y, cv=ts_cv)

#### Train and evaluate a gradient boosting regression model

Next, let's train a gradient boosting regression model using [`HistGradientBoostingRegressor`](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.HistGradientBoostingRegressor.html#sklearn.ensemble.HistGradientBoostingRegressor) from `scikit-learn`. The categorical `DataType` feature is encoded as integers with [`OrdinalEncoder`](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OrdinalEncoder.html#sklearn.preprocessing.OrdinalEncoder), and the regressor is told to treat that encoded column as categorical.
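A minimal sketch of how `OrdinalEncoder` behaves when `categories` is passed explicitly, as the pipeline below does with `data_types`. The category list here is hypothetical. The integer codes follow the order of the list rather than alphabetical order, and a value outside the list raises an error by default (`handle_unknown="error"`).

```python
from sklearn.preprocessing import OrdinalEncoder

# Fixing the category order makes each code equal to the value's position in the list
encoder = OrdinalEncoder(categories=[["ContainerLog", "Perf", "AzureDiagnostics"]])
codes = encoder.fit_transform([["Perf"], ["AzureDiagnostics"], ["ContainerLog"]])
print(codes.ravel())
```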

In [None]:
from sklearn.preprocessing import OrdinalEncoder
from sklearn.ensemble import HistGradientBoostingRegressor


ordinal_encoder = OrdinalEncoder(categories=[data_types])

gradient_boosting_pipeline = make_pipeline(
    ColumnTransformer(
        transformers=[
            ("categorical", ordinal_encoder, categorical_columns),
        ],
        remainder="passthrough",
    ),
    HistGradientBoostingRegressor(
        # Column 0 of the transformed input is the ordinal-encoded DataType
        categorical_features=[0],
    ),
)

gradient_boosting_pipeline.fit(X, Y)
print("Score of Gradient Boosting Regression:")
evaluate(gradient_boosting_pipeline, X, Y, cv=ts_cv)

Take a look at the error metrics for both models. Which model performs better?

For this dataset, the gradient boosting regression model typically has lower error metrics than the linear regression model, so we'll use it to predict new values and identify anomalies.

#### Save the model

First, save the model to a file with `joblib` (which uses Python's pickle format) so that we can reload it later.
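The sketch below shows the save/load round trip on a tiny stand-in model trained on hypothetical data, writing to a temporary directory instead of `./myModel.pkl`. Keep in mind that loading a pickle can execute arbitrary code, so only load model files you trust, and ideally load them with the same `scikit-learn` version that saved them.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.linear_model import LinearRegression

# A tiny stand-in model fitted on hypothetical data
X_demo = np.array([[0.0], [1.0], [2.0], [3.0]])
y_demo = np.array([1.0, 3.0, 5.0, 7.0])
model = LinearRegression().fit(X_demo, y_demo)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "demo_model.pkl")
    joblib.dump(model, path)      # serialize the fitted model to disk
    restored = joblib.load(path)  # read it back into a new object

# The restored model should make the same predictions as the original
same = np.allclose(model.predict(X_demo), restored.predict(X_demo))
print(same)
```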

In [None]:
import joblib

# Save the model as a pickle file
filename = './myModel.pkl'
joblib.dump(gradient_boosting_pipeline, filename)

<a id="identify-anomalies"></a>

### Predict new values and identify anomalies

Now that we have a trained model, let's use it to predict new values and identify anomalies. Let's start by querying ingestion information for the selected data types over the past week.

In [None]:
# Query the past week (from 7 days ago until now).
start = 7
end = 0

query = get_selected_datatypes(data_types, start, end)
new_data = query_logs_workspace(query)

new_data['Year'] = pd.DatetimeIndex(new_data['TimeGenerated']).year
new_data['Month'] = pd.DatetimeIndex(new_data['TimeGenerated']).month
new_data['Day'] = pd.DatetimeIndex(new_data['TimeGenerated']).day
new_data['Hour'] = pd.DatetimeIndex(new_data['TimeGenerated']).hour
display(new_data)

Visualize the data in a graph:

In [None]:
display_graph(new_data, "Selected Data Types - New Data Usage (1 week)")

Next, load the model from the pickle file and use it to predict (score) values for the latest data.

In [None]:
X_new = new_data[['DataType', 'Year', 'Month', 'Day', 'Hour']]

# Load the model from the file and score the new data
loaded_model = joblib.load(filename)
predictions_new = loaded_model.predict(X_new)
new_data["PredictedUsage"] = predictions_new
display(new_data)


From the displayed DataFrame, you should see an additional column called "PredictedUsage" which contains the predicted usage values.

#### Identify ingestion anomalies

Let's now try to identify anomalies. There are multiple approaches to identifying anomalies; for this sample, we'll use a method called [Tukey's fences](https://en.wikipedia.org/wiki/Outlier#Tukey%27s_fences).

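Note: The KQL [series_decompose_anomalies](https://learn.microsoft.com/azure/data-explorer/kusto/query/series-decompose-anomaliesfunction) function also uses Tukey's fences to detect anomalies. By default, it uses a variant that replaces the 25th and 75th percentiles with the 10th and 90th percentiles, which is also the variant the helper functions below use.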
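A minimal sketch of Tukey's fences on hypothetical residuals, comparing the classic 25th/75th percentile fences with the wider 10th/90th percentile variant. The helper name `tukey_fences` is illustrative only; both versions use the conventional multiplier `k = 1.5`.

```python
import numpy as np


def tukey_fences(values, low_pct=25, high_pct=75, k=1.5):
    """Hypothetical helper: return (lower, upper) fences; values outside them are outliers."""
    low, high = np.percentile(values, [low_pct, high_pct])
    spread = high - low
    return low - k * spread, high + k * spread


residuals = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 100], dtype=float)

# Classic Tukey's fences: 25th/75th percentiles (the interquartile range)
lower, upper = tukey_fences(residuals)
classic_outliers = residuals[(residuals < lower) | (residuals > upper)]

# Wider variant based on the 10th/90th percentiles
lower_10_90, upper_10_90 = tukey_fences(residuals, low_pct=10, high_pct=90)
wide_outliers = residuals[(residuals < lower_10_90) | (residuals > upper_10_90)]
print(classic_outliers, wide_outliers)
```

The 10th/90th variant produces wider fences, so it flags fewer points as anomalies; here both versions flag only the extreme value.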

#### Define helper functions

Let's define a couple of helper functions that will help us identify anomalies. Together, they add a column called `Anomalies` to a DataFrame, where `1` indicates a positive anomaly (actual usage above the predicted value), `-1` indicates a negative anomaly, and `0` indicates no anomaly, and then return only the anomalous rows.

In [None]:
def outlier_range(data_column):
    """Return the (lower, upper) Tukey fences, using the 10th and 90th percentiles."""
    q_low, q_high = np.percentile(data_column, [10, 90])
    spread = q_high - q_low
    lower_bound = q_low - (1.5 * spread)
    upper_bound = q_high + (1.5 * spread)
    return lower_bound, upper_bound

def outlier_update_data_frame(df):
    """Label each row's residual (1, -1, or 0) and return only the anomalous rows."""
    # Work on a copy so that we never write into a slice of the caller's DataFrame
    df = df.copy()
    lower_bound, upper_bound = outlier_range(df['Residual'])
    outside = (df['Residual'] < lower_bound) | (df['Residual'] > upper_bound)

    df['Anomalies'] = 0
    df.loc[outside & (df['Residual'] < 0), 'Anomalies'] = -1
    df.loc[outside & (df['Residual'] >= 0), 'Anomalies'] = 1

    return df[df['Anomalies'] != 0]

Run the helper functions on the DataFrame to identify anomalies in the new data:

In [None]:

new_data["Residual"] = new_data["ActualUsage"] - new_data["PredictedUsage"]

# Apply the fences to each data type separately, because each type has its own residual distribution.
# groupby always yields DataFrames (even for a single row) and keeps DataType as a column.
type_anomalies = [outlier_update_data_frame(group) for _, group in new_data.groupby("DataType")]
anomalies_df = pd.concat(type_anomalies, ignore_index=True) if type_anomalies else pd.DataFrame()

print(f"{len(anomalies_df)} anomalies detected")
display(anomalies_df)

<a id="ingest-anomalies"></a>

## 4. Ingest anomalies (optional)

Optionally, we can upload detected anomalies to a custom table in a Log Analytics workspace. This can be useful for further analysis or visualization.

To send data to your Log Analytics workspace, you need a registered Azure Active Directory application, a custom table, a data collection endpoint (DCE), and a data collection rule (DCR). You also need to give the Azure AD application permission to upload data through the data collection rule (the tutorial below assigns it the **Monitoring Metrics Publisher** role on the DCR).

Use the following tutorial for specifics on creating the prerequisites: [Tutorial: Send data to Azure Monitor Logs with Logs ingestion API (Azure portal)](https://learn.microsoft.com/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).

When creating the table for the custom logs, use the JSON file created in the following cell when asked to upload a sample JSON file:

In [None]:
import json

sample_data = [{
    "TimeGenerated": "2023-03-19T19:56:43.7447391Z",
    "ActualUsage": 40.1,
    "PredictedUsage": 45.1,
    "Anomalies": -1,
    "DataType": "AzureDiagnostics"
}]

with open("data_sample.json", "w") as file:
    json.dump(sample_data, file)


Then use the following in the `Transformation Editor`:

`source | extend AnomalyTimeGenerated = todatetime(TimeGenerated) | extend TimeGenerated = now()`

This will add a transformation so that `AnomalyTimeGenerated` indicates the time when the anomaly was detected and `TimeGenerated` indicates the time when the anomaly was uploaded to the custom table.
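The anomalies DataFrame also carries working columns such as `Residual`, `Year`, `Month`, `Day`, and `Hour`, which are not part of the sample schema. If you want the upload payload to contain only the five columns declared in `data_sample.json`, a minimal sketch follows. `UPLOAD_COLUMNS` and `to_upload_records` are hypothetical names, and the one-row DataFrame is made-up data.

```python
import json

import pandas as pd

# Columns declared in data_sample.json (the custom table's schema)
UPLOAD_COLUMNS = ["TimeGenerated", "ActualUsage", "PredictedUsage", "Anomalies", "DataType"]


def to_upload_records(df):
    """Hypothetical helper: keep only the schema columns and convert rows to JSON-ready dicts."""
    subset = df[UPLOAD_COLUMNS]
    # ISO 8601 timestamps match the format used in the sample file
    return json.loads(subset.to_json(orient="records", date_format="iso"))


demo = pd.DataFrame({
    "TimeGenerated": pd.to_datetime(["2023-03-19T19:00:00Z"]),
    "ActualUsage": [40.1],
    "PredictedUsage": [45.1],
    "Residual": [-5.0],
    "Anomalies": [-1],
    "DataType": ["AzureDiagnostics"],
})
records = to_upload_records(demo)
print(records)
```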

### Define constants

Define constants for your Azure AD application and DCR/DCE information.

In [None]:
AZURE_TENANT_ID = "<Tenant ID>"  # ID of the tenant where the data collection endpoint resides
AZURE_CLIENT_ID = "<Application ID>"  # Application ID to which you granted permissions on your data collection rule
AZURE_CLIENT_SECRET = "<Client secret>"  # Secret created for the application

LOGS_DCR_STREAM_NAME = "<Custom stream name>" # Name of the custom stream from the data collection rule (e.g. "Custom-DetectedAnomalies_CL")
LOGS_DCR_RULE_ID = "<Data collection rule immutableId>" # immutableId of your data collection rule (Can be found in the JSON View of the data collection rule overview page)
DATA_COLLECTION_ENDPOINT =  "<Logs ingestion URL of your endpoint>" # URL that looks like this: https://xxxx.ingest.monitor.azure.com

### Ingest the data

After creating the table and data collection rule, you can use the following code to ingest the data into the custom table.

**Note:** After creating the table, it can take up to 15 minutes for the table to be available for ingestion through the DCR stream.

In [None]:
from azure.core.exceptions import HttpResponseError
from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient


credential = ClientSecretCredential(
    tenant_id=AZURE_TENANT_ID,
    client_id=AZURE_CLIENT_ID,
    client_secret=AZURE_CLIENT_SECRET
)

client = LogsIngestionClient(endpoint=DATA_COLLECTION_ENDPOINT, credential=credential, logging_enable=True)

body = json.loads(anomalies_df.to_json(orient='records', date_format='iso'))

try:
    # upload() returns None; failures are raised as HttpResponseError
    client.upload(rule_id=LOGS_DCR_RULE_ID, stream_name=LOGS_DCR_STREAM_NAME, logs=body)
    print("Upload request accepted")
except HttpResponseError as e:
    print(f"Upload failed: {e}")

