1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
--
-- (C) 2021 - ntop.org
--
local ts_schema = {}
local ts_common = require("ts_common")
-- NOTE: to get the actual retention period, multiply retention_dp * aggregation_dp * step
-- Per-step presets, keyed by the schema step converted to a string (this is
-- the key built with tostring(options.step) in ts_schema:new). Judging by the
-- resolution comments below, the step is expressed in seconds.
-- Each preset may carry:
--   retention: list of archives, from the finest to the coarsest resolution
--   hwpredict: row_count/period settings, expressed in data points
--              (presumably Holt-Winters prediction parameters; confirm
--              against the driver that consumes them)
ts_schema.supported_steps = {
["1"] = {
retention = {
-- aggregation_dp: number of raw points to aggregate
-- retention_dp: number of aggregated dp to store
{aggregation_dp = 1, retention_dp = 86400}, -- 1 second resolution: keep for 1 day
{aggregation_dp = 60, retention_dp = 43200}, -- 1 minute resolution: keep for 1 month
{aggregation_dp = 3600, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
}, hwpredict = {
row_count = 86400, -- keep 1 day prediction
period = 3600, -- assume 1 hour periodicity
}
},
["5"] = {
retention = {
-- aggregation_dp: number of raw points to aggregate
-- retention_dp: number of aggregated dp to store
{aggregation_dp = 1, retention_dp = 86400}, -- 5 second resolution: keep for 5 days (86400 points * 5 s)
{aggregation_dp = 12, retention_dp = 43200}, -- 1 minute resolution: keep for 1 month
{aggregation_dp = 720, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
}, hwpredict = {
-- NOTE(review): both values below are identical to the "1" step preset; with
-- a 5 second step they span 5x the time. Confirm whether this is intended.
row_count = 86400, -- 86400 points * 5 s: 5 days of prediction
period = 3600, -- 3600 points * 5 s: 5 hour periodicity
}
},
["60"] = {
retention = {
-- aggregation_dp: number of raw points to aggregate
-- retention_dp: number of aggregated dp to store
{aggregation_dp = 1, retention_dp = 1440}, -- 1 minute resolution: keep for 1 day
{aggregation_dp = 60, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
{aggregation_dp = 1440, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}, hwpredict = {
row_count = 10080, -- keep 1 week prediction
period = 1440, -- assume 1 day periodicity
}
},
["300"] = {
retention = {
-- aggregation_dp: number of raw points to aggregate
-- retention_dp: number of aggregated dp to store
{aggregation_dp = 1, retention_dp = 288}, -- 5 minute resolution: keep for 1 day
{aggregation_dp = 12, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
{aggregation_dp = 288, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}, hwpredict = {
row_count = 2016, -- keep 1 week prediction
period = 288, -- assume 1 day periodicity
}
},
-- The hourly step only defines retention: it has no hwpredict entry.
["3600"] = {
retention = {
-- aggregation_dp: number of raw points to aggregate
-- retention_dp: number of aggregated dp to store
{aggregation_dp = 1, retention_dp = 720}, -- 1 hour resolution: keep for 1 month
{aggregation_dp = 24, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}
}
}
--- Build a new schema object.
-- @param name schema name; stored in the object and used in error messages
-- @param options optional table of settings. `step` is mandatory;
--   `metrics_type` falls back to ts_common.metrics.counter. The table is
--   updated in place and kept by reference in the returned object.
--   options.is_critical_ts: if true, this timeseries should be written even
--   if ntop.isDeadlineApproaching()
-- @return the schema object, or nil (after logging) when `step` is missing
function ts_schema:new(name, options)
  options = options or {}
  options.metrics_type = options.metrics_type or ts_common.metrics.counter

  -- `step` is the only option without a default
  if not options.step then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "missing step option in schema " .. name)
    return nil
  end

  local schema = {
    name = name,
    options = options,
    -- _tags/_metrics keep the insertion order, tags/metrics act as lookup sets
    _tags = {},
    _metrics = {},
    tags = {},
    metrics = {},
  }

  -- Attach the preset (retention policy and other information) registered for
  -- this step, if any. Values are assigned by reference, not deep-copied.
  local preset = ts_schema.supported_steps[tostring(options.step)]

  if preset ~= nil then
    for field, value in pairs(preset) do
      schema[field] = value
    end
  end

  -- Method lookups on the instance fall back to `self`
  self.__index = self

  return setmetatable(schema, self)
end
--- Tell whether `name` can be used as a tag or metric name.
-- The only rejected name is "measurement". Rejection is silent; the
-- diagnostics below are disabled in the code:
--   traceError(TRACE_ERROR, TRACE_CONSOLE, string.format("Invalid tag/measurement name: \"%s\"", name))
--   tprint(debug.traceback())
-- @param name candidate tag/metric name
-- @return false when `name` is "measurement", true otherwise
local function validateTagMetric(name)
  return name ~= "measurement"
end
--- Register a tag on this schema.
-- Names rejected by validateTagMetric and names that are already registered
-- are silently ignored.
-- @param name tag name
function ts_schema:addTag(name)
  if validateTagMetric(name) and self.tags[name] == nil then
    -- ordered list first, then the lookup set
    local ordered = self._tags
    ordered[#ordered + 1] = name
    self.tags[name] = 1
  end
end
--- Register a metric on this schema.
-- Names rejected by validateTagMetric and names that are already registered
-- are silently ignored.
-- @param name metric name
function ts_schema:addMetric(name)
  if validateTagMetric(name) and self.metrics[name] == nil then
    -- ordered list first, then the lookup set
    local ordered = self._metrics
    ordered[#ordered + 1] = name
    self.metrics[name] = 1
  end
end
--- Check that every tag registered on the schema has a value in `tags`.
-- @param tags table indexed by tag name
-- @return true when no registered tag is missing; otherwise false followed by
--   the name of one missing tag (the first one met by pairs(), whose
--   traversal order is unspecified)
function ts_schema:allTagsDefined(tags)
  local missing

  for required in pairs(self.tags) do
    if tags[required] == nil then
      missing = required
      break
    end
  end

  if missing ~= nil then
    return false, missing
  end

  return true
end
--- Validate `tags` against the schema and strip the tags it does not know.
-- @param tags table indexed by tag name
-- @return a new table holding only the tags registered on the schema, or nil
--   (after logging) when a registered tag has no value in `tags`
function ts_schema:verifyTags(tags)
  local complete, missing = self:allTagsDefined(tags)

  if not complete then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "missing tag '" .. missing .. "' in schema " .. self.name)
    return nil
  end

  -- NOTE: additional tags are just ignored; they are not reported as errors
  local known = {}

  for tag_name, tag_value in pairs(tags) do
    if self.tags[tag_name] ~= nil then
      known[tag_name] = tag_value
    end
  end

  return known
end
--- Split a flat table into the schema tags and the schema metrics.
-- Every registered tag and metric must be present, no unknown key is
-- accepted, and an `ifid` tag must be among the resulting tags.
-- @param tags_and_metrics table indexed by tag/metric name
-- @return two tables (tags, metrics) on success; nil (after logging) on the
--   first violation found
function ts_schema:verifyTagsAndMetrics(tags_and_metrics)
  local tags, metrics = {}, {}

  -- 1. every registered tag must have a value
  for tag in pairs(self.tags) do
    local value = tags_and_metrics[tag]

    if value == nil then
      traceError(TRACE_ERROR, TRACE_CONSOLE, "Missing mandatory tag '" .. tag .. "' while using schema " .. self.name)
      return nil
    end

    tags[tag] = value
  end

  -- 2. every registered metric must have a value
  for metric in pairs(self.metrics) do
    local value = tags_and_metrics[metric]

    if value == nil then
      traceError(TRACE_ERROR, TRACE_CONSOLE, "Missing mandatory metric '" .. metric .. "' while using schema " .. self.name)
      return nil
    end

    metrics[metric] = value
  end

  -- 3. nothing outside of the registered tags/metrics is allowed
  for item in pairs(tags_and_metrics) do
    local is_known = (self.tags[item] ~= nil) or (self.metrics[item] ~= nil)

    if not is_known then
      traceError(TRACE_ERROR, TRACE_CONSOLE, "unknown tag/metric '" .. item .. "' in schema " .. self.name)
      return nil
    end
  end

  -- 4. NOTE: the ifid tag is required in order to identify all the ts of
  -- a given interface (also for the system interface). This is required in
  -- order to properly delete them from "Manage Data".
  if tags.ifid == nil then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "An 'ifid' tag is required in schema " .. self.name)
    return nil
  end

  return tags, metrics
end
--- Return the aggregation function to use for this schema.
-- The `aggregation_function` option is returned when it is set and is a key
-- of ts_common.aggregation; otherwise ts_common.aggregation.mean is returned.
-- NOTE(review): the first path returns the option itself (a key) while the
-- fallback returns a value of ts_common.aggregation; presumably keys and
-- values coincide there - confirm in ts_common.
-- @return the selected aggregation function identifier
function ts_schema:getAggregationFunction()
  local requested = self.options.aggregation_function

  -- unset or unknown: fall back to the mean
  if requested == nil or ts_common.aggregation[requested] == nil then
    return ts_common.aggregation.mean
  end

  return requested
end
return ts_schema
|