1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
|
--
-- (C) 2014-22 - ntop.org
--
local dirs = ntop.getDirs()
package.path = dirs.installdir .. "/scripts/lua/modules/?.lua;" .. package.path
require "template"
require "lua_utils"
local db_debug = false
local db_utils = {}
-- ########################################################
local function iptonumber(str)
local num = 0
for elem in str:gmatch("%d+") do
num = num * 256 + assert(tonumber(elem))
end
return num
end
-- ########################################################
function expandIpV4Network(net)
local address, prefix = splitNetworkPrefix(net)
if(prefix == nil or prefix > 32 or prefix < 0) then prefix = 32 end
local num_hosts = 2^(32-prefix)
local addr = iptonumber(address)
for i=1,32-prefix do
addr = clearbit(addr, bit(i))
end
-- Uses floor function to make sure numbers are returned as integers
local addr_lowest = math.floor(addr)
local addr_highest = math.floor(addr + num_hosts - 1)
local res = { addr_lowest, addr_highest }
return(res)
end
-- ########################################################
local function flowsTableName(version, force_raw)
if tblname_prefs == nil then
tblname_prefs = ntop.getPrefs()
end
local tblname = "flowsv"..version
-- return "flowsv"..version -- FIXX: remove this line when ready
return tblname
end
-- ########################################################
local function allowedNetworksIter(version)
if version ~= 4 and version ~= 6 then
return function() return nil end
end
local an = ntop.getAllowedNetworks()..","
local spl = an:split(",")
local pos = 0
return function()
while pos < #spl do
pos = pos + 1
local net = spl[pos]
local v6_net = not isIPv4Network(net)
if (v6_net and version == 6) or (not v6_net and version == 4) then
return net
end
end
end
end
-- ########################################################
function allowedNetworksRestrictions()
for net in allowedNetworksIter(6) do
if net:match('[1-9a-fA-F]') then
return true
end
end
for net in allowedNetworksIter(4) do
local expanded = expandIpV4Network(net)
if (expanded[1] > 0 or expanded[2] < 2^32 - 1) then
return true
end
end
return false
end
-- ########################################################
local function allowedNetworksFilter(follow, version, filter_src, filter_dst)
local an = ntop.getAllowedNetworks()..","
local v4_expanded = {}
local expanded_src = {}
local expanded_dst = {}
for net in allowedNetworksIter(version) do
-- tprint({net=net, match=net:match('[1-9]'), version=version})
if version == 6 and net:match('[1-9a-fA-F]') then
-- no results when we've a non-zero ipv6 allowed network
return follow.." AND 1 = 0 "
end
if version == 4 then
local expanded = expandIpV4Network(net)
local filter = {}
if filter_src and (expanded[1] > 0 or expanded[2] < 2^32 - 1) then
expanded_src[#expanded_src + 1] = string.format(" (IP_SRC_ADDR >= %d AND IP_SRC_ADDR <= %d) ", expanded[1], expanded[2])
end
if filter_dst and (expanded[1] > 0 or expanded[2] < 2^32 - 1) then
expanded_dst[#expanded_dst + 1] = string.format(" (IP_DST_ADDR >= %d AND IP_DST_ADDR <= %d) ", expanded[1], expanded[2])
end
end
end
local res = "1 = 1"
if version == 4 then
-- multiple allowed networks must be evaluated in OR
local r = {}
if table.len(expanded_src) > 0 then
r[#r + 1] = string.format(" (%s) ", table.concat(expanded_src, " OR "))
end
if table.len(expanded_dst) > 0 then
r[#r + 1] = string.format(" (%s) ", table.concat(expanded_dst, " OR "))
end
if table.len(r) > 0 then
res = table.concat(r, " AND ")
end
elseif version == 6 then
-- TODO: implement filtering for ipv6 networks
end
return string.format("%s AND (%s)", follow, res)
end
-- ########################################################
function getInterfaceTopFlows(interface_id, version, host_or_profile, peer, l7proto, l4proto, port, vlan, profile, info, epoch_begin, epoch_end, offset, max_num_flows, sort_column, sort_order)
-- CONVERT(UNCOMPRESS(JSON) USING 'utf8') AS JSON
if(version == 4) then
sql = "select INET_NTOA(IP_SRC_ADDR) AS IP_SRC_ADDR,INET_NTOA(IP_DST_ADDR) AS IP_DST_ADDR"
else
sql = "select IP_SRC_ADDR, IP_DST_ADDR"
end
follow = " ,L4_SRC_PORT,L4_DST_PORT,VLAN_ID,PROTOCOL,FIRST_SWITCHED,LAST_SWITCHED,PACKETS,IN_BYTES + OUT_BYTES as BYTES,IN_BYTES,OUT_BYTES,idx,L7_PROTO,INFO"
if ntop.isPro() then follow = follow..",PROFILE" end
follow = follow.." from "..flowsTableName(version, true --[[ force raw flows --]]).." where FIRST_SWITCHED <= "..epoch_end.." and FIRST_SWITCHED >= "..epoch_begin
if((l7proto ~= "") and (l7proto ~= "-1")) then follow = follow .." AND L7_PROTO="..l7proto end
if((l4proto ~= "") and (l4proto ~= "-1")) then follow = follow .." AND PROTOCOL="..l4proto end
if(port ~= "") then follow = follow .." AND (L4_SRC_PORT="..port.." OR L4_DST_PORT="..port..")" end
if((vlan ~= nil) and (vlan ~= "")) then follow = follow .." AND VLAN_ID="..vlan end
if((profile ~= nil) and (profile ~= "")) then follow = follow .." AND PROFILE='"..profile.."'" end
if(info ~= "") then follow = follow .." AND (INFO LIKE '%"..info.."%')" end
follow = follow.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='')"
follow = follow.." AND (INTERFACE_ID='"..tonumber(interface_id).."')"
if host_or_profile ~= nil and host_or_profile ~= "" and string.starts(host_or_profile, 'profile:') then
host_or_profile = string.gsub(host_or_profile, 'profile:', '')
follow = follow .. " AND (PROFILE='"..host_or_profile.."') "
elseif host_or_profile ~= nil and host_or_profile ~= "" then
if(version == 4) then
rsp = expandIpV4Network(host_or_profile)
follow = follow .." AND ((IP_SRC_ADDR>="..rsp[1].." AND IP_SRC_ADDR <= "..rsp[2]..")"
follow = follow .." OR (IP_DST_ADDR>="..rsp[1].." AND IP_DST_ADDR <= "..rsp[2].."))"
if peer ~= nil and peer ~= "" then
rsp = expandIpV4Network(peer)
follow = follow .." AND ((IP_SRC_ADDR>="..rsp[1].." AND IP_SRC_ADDR <= "..rsp[2]..")"
follow = follow .." OR (IP_DST_ADDR>="..rsp[1].." AND IP_DST_ADDR <= "..rsp[2].."))"
end
else
follow = follow .." AND (IP_SRC_ADDR='"..host_or_profile.."' OR IP_DST_ADDR='"..host_or_profile.."')"
if peer ~= nil and peer ~= "" then
follow = follow .." AND (IP_SRC_ADDR='"..peer.."' OR IP_DST_ADDR='"..peer.."')"
end
end
end
follow = allowedNetworksFilter(follow, version, true, true)
follow = follow .." order by "..sort_column.." "..sort_order.." limit "..max_num_flows.." OFFSET "..offset
sql = sql .. follow
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql, false) -- do not limit the maximum number of flows
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function getFlowInfo(interface_id, version, flow_idx)
version = tonumber(version)
if(version == 4) then
sql = "select INET_NTOA(IP_SRC_ADDR) AS IP_SRC_ADDR,INET_NTOA(IP_DST_ADDR) AS IP_DST_ADDR"
else
sql = "select IP_SRC_ADDR, IP_DST_ADDR"
end
follow = " ,L4_SRC_PORT,L4_DST_PORT,VLAN_ID,PROTOCOL,FIRST_SWITCHED,LAST_SWITCHED,PACKETS,IN_BYTES, OUT_BYTES, IN_BYTES + OUT_BYTES as BYTES,idx,L7_PROTO,INFO,CONVERT(UNCOMPRESS(JSON) USING 'utf8') AS JSON from flowsv"..version
follow = follow.." where idx="..flow_idx
sql = sql .. follow
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function getNumFlows(interface_id, version, host, protocol, port, l7proto, info, vlan, profile, epoch_begin, epoch_end, force_raw_flows, enforce_allowed_networks)
if(version == nil) then version = 4 end
if(info == "") then info = nil end
if(l7proto == "") then l7proto = nil end
if(protocol == "") then protocol = nil end
if l7proto ~= "" and l7proto ~= nil then
if(not(isnumber(l7proto))) then
local id
l7proto = string.gsub(l7proto, "%.rrd", "")
if(string.ends(l7proto, ".rrd")) then l7proto = string.sub(l7proto, 1, -5) end
id = interface.getnDPIProtoId(l7proto)
if(id ~= -1) then
l7proto = id
title = "Top "..l7proto.." Flows"
else
l7proto = ""
end
end
end
sql = "select COUNT(*) AS TOT_FLOWS, SUM(IN_BYTES + OUT_BYTES) AS TOT_BYTES, SUM(PACKETS) AS TOT_PACKETS FROM "..flowsTableName(version, force_raw_flows --[[force count from raw flows --]]).." where FIRST_SWITCHED <= "..epoch_end.." and FIRST_SWITCHED >= "..epoch_begin
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='')"
sql = sql.." AND (INTERFACE_ID='"..tonumber(interface_id).."')"
if((l7proto ~= nil) and (l7proto ~= "")) then sql = sql .." AND L7_PROTO="..l7proto end
if((protocol ~= nil) and (protocol ~= "")) then sql = sql .." AND PROTOCOL="..protocol end
if((vlan ~= nil) and (vlan ~= "")) then sql = sql .." AND VLAN_ID="..vlan end
if((profile ~= nil) and (profile ~= "")) then sql = sql .." AND PROFILE='"..profile.."'" end
if(info ~= nil) then sql = sql .." AND (INFO LIKE '%"..info.."%')" end
if((port ~= nil) and (port ~= "")) then sql = sql .." AND (L4_SRC_PORT="..port.." OR L4_DST_PORT="..port..")" end
if((host ~= nil) and (host ~= "")) then
if(version == 4) then
local ip_range = expandIpV4Network(host)
local ip_lowest = ip_range[1]
local ip_highest = ip_range[2]
if ip_lowest == ip_highest then
sql = sql .." AND (IP_SRC_ADDR='"..ip_highest.."' OR IP_DST_ADDR='"..ip_highest.."')"
else
sql = sql .." AND ((IP_SRC_ADDR>='"..ip_lowest.."' AND IP_SRC_ADDR<='"..ip_highest.."')"
sql = sql .." OR (IP_DST_ADDR>='"..ip_lowest.."' AND IP_DST_ADDR<='"..ip_highest.."'))"
end
else
sql = sql .." AND (IP_SRC_ADDR='"..host.."' OR IP_DST_ADDR='"..host.."')"
end
end
if enforce_allowed_networks then
sql = allowedNetworksFilter(sql, version, true, true)
end
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function getOverallTopTalkersSELECT_FROM_WHERE_clause(src_or_dst, v4_or_v6, epoch_begin, epoch_end, ifid, l4proto, port, vlan, profile)
local sql = ""
local sql_bytes_packets = "PACKETS as packets, "
local exclude_same_src_dst = false
if src_or_dst == "IP_DST_ADDR" then
exclude_same_src_dst = true
-- if this is a destination address, we account it INGRESS traffic
sql_bytes_packets = sql_bytes_packets .. "OUT_BYTES as bytes_sent, IN_BYTES as bytes_rcvd, "
elseif src_or_dst == "IP_SRC_ADDR" then
-- if this is a source address, we account the traffic as EGRESS
sql_bytes_packets = sql_bytes_packets .. " IN_BYTES as bytes_sent, OUT_BYTES as bytes_rcvd, "
else
return nil -- make sure to exit early if no valid data has been passed
end
if v4_or_v6 == 6 then
sql = " SELECT NULL addrv4, "..src_or_dst.." addrv6, "
sql = sql..sql_bytes_packets
sql = sql.."FIRST_SWITCHED, LAST_SWITCHED FROM "..flowsTableName(6).." "
elseif v4_or_v6 == 4 then -- ipv4
sql = " SELECT "..src_or_dst.." addrv4, NULL addrv6, "
sql = sql..sql_bytes_packets
sql = sql.."FIRST_SWITCHED, LAST_SWITCHED FROM "..flowsTableName(4).." "
else
sql = ""
end
sql = sql.." WHERE FIRST_SWITCHED <= "..epoch_end.." and FIRST_SWITCHED >= "..epoch_begin
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='') "
sql = sql.." AND (INTERFACE_ID='"..tonumber(ifid).."') "
if(exclude_same_src_dst == true) then
-- avoid double counting flows that have source == destination
sql = sql.." AND IP_SRC_ADDR <> IP_DST_ADDR"
end
if((l4proto ~= nil) and (l4proto ~= "") and (l4proto ~= "-1")) then
sql = sql .." AND PROTOCOL="..l4proto
end
if((port ~= nil) and (port ~= "")) then
sql = sql .." AND (L4_SRC_PORT="..port.." OR L4_DST_PORT="..port..")"
end
if((vlan ~= nil) and (vlan ~= "")) then
sql = sql .." AND VLAN_ID="..vlan
end
if((profile ~= nil) and (profile ~= "")) then
sql = sql .." AND PROFILE='"..profile.."'"
end
if src_or_dst == "IP_DST_ADDR" then
sql = allowedNetworksFilter(sql, v4_or_v6, false, true)
elseif src_or_dst == "IP_SRC_ADDR" then
sql = allowedNetworksFilter(sql, v4_or_v6, true, false)
end
return sql..'\n'
end
-- ########################################################
function getOverallTopTalkers(interface_id, l4proto, port, vlan, profile, info, epoch_begin, epoch_end, sort_column, sort_order, offset, limit)
-- retrieves top talkers in the given time range
if(info == "") then info = nil end
-- AGGREGATE AND CRUNCH DATA
sql = "select CASE WHEN addrv4 IS NOT NULL THEN INET_NTOA(addrv4) ELSE addrv6 END addr, "
sql = sql.."SUM(bytes_sent + bytes_rcvd) tot_bytes, SUM(packets) tot_packets, "
sql = sql.."SUM(bytes_sent) bytes_sent, "
sql = sql.."SUM(bytes_rcvd) bytes_rcvd, "
sql = sql.."count(*) tot_flows "
-- sql = sql.." (sum(LAST_SWITCHED) - sum(FIRST_SWITCHED)) / count(*) as avg_flow_duration "
sql = sql.." FROM "
sql = sql.."("
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_SRC_ADDR', 4, epoch_begin, epoch_end, interface_id, l4proto, port, vlan, profile)
sql = sql.." UNION ALL "
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_DST_ADDR', 4, epoch_begin, epoch_end, interface_id, l4proto, port, vlan, profile)
sql = sql.." UNION ALL "
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_SRC_ADDR', 6, epoch_begin, epoch_end, interface_id, l4proto, port, vlan, profile)
sql = sql.." UNION ALL "
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_DST_ADDR', 6, epoch_begin, epoch_end, interface_id, l4proto, port, vlan, profile)
sql = sql..") talkers"
sql = sql.." group by addr "
-- ORDER
local order_by_column = "tot_bytes" -- defaults to tot_bytes
if sort_column == "column_packets" or sort_column == "packets" or sort_column == "tot_packets" then
order_by_column = "tot_packets"
elseif sort_column == "column_bytes_sent" or sort_column == "bytes_sent" then
order_by_column = "bytes_sent"
elseif sort_column == "column_bytes_rcvd" or sort_column == "bytes_rcvd" then
order_by_column = "bytes_rcvd"
elseif sort_column == "column_flows" or sort_column == "flows" or sort_column == "tot_flows" then
order_by_column = "tot_flows"
-- elseif sort_column == "column_avg_flow_duration" or sort_column == "avg_flow_duration" then
-- order_by_column = "avg_flow_duration"
end
local order_by_order = "desc"
if sort_order == "asc" then order_by_order = "asc" end
sql = sql.." order by "..order_by_column.." "..order_by_order.." "
-- SLICE
local slice_offset = 0
local slice_limit = 100
if tonumber(offset) >= 0 then slice_offset = offset end
if tonumber(limit) > 0 then slice_limit = limit end
sql = sql.."limit "..slice_offset..","..slice_limit.." "
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function getHostTopTalkers(interface_id, host, l7_proto_id, l4_proto_id, port, vlan, profile, info, epoch_begin, epoch_end, sort_column, sort_order, offset, limit)
-- obtains host top talkers, possibly restricting the range only to l7_proto_id
if host == nil or host == "" then return {} end
local version = 4
if isIPv6(host) then version = 6 end
if(info == "") then info = nil end
sql = " SELECT addr, "
sql = sql.."sum(peer_bytes_sent + peer_bytes_rcvd) as tot_bytes, sum(peer_packets) as tot_packets, "
sql = sql.."sum(peer_bytes_sent) as bytes_sent, "
sql = sql.."sum(peer_bytes_rcvd) as bytes_rcvd, "
sql = sql.."count(*) as flows "
-- sql = sql.." (sum(LAST_SWITCHED) - sum(FIRST_SWITCHED)) / count(*) as avg_flow_duration "
sql = sql .. "FROM ( SELECT PACKETS as peer_packets, "
if(version == 4) then
sql = sql.." CASE WHEN IP_SRC_ADDR = INET_ATON('"..host.."') THEN INET_NTOA(IP_DST_ADDR) ELSE INET_NTOA(IP_SRC_ADDR) END addr, "
-- when the selected host is the source, we consider its peer that is a destination an thus RECEIVES bytes and packets
-- similarly, when the selected host is the destination, we consider its peer as a source that SENDS bytes and packets
sql = sql.." CASE WHEN IP_SRC_ADDR = INET_ATON('"..host.."') THEN OUT_BYTES ELSE IN_BYTES END peer_bytes_sent, "
sql = sql.." CASE WHEN IP_SRC_ADDR = INET_ATON('"..host.."') THEN IN_BYTES ELSE OUT_BYTES END peer_bytes_rcvd, "
else
sql = sql.." CASE WHEN IP_SRC_ADDR = '"..host.."' THEN IP_DST_ADDR ELSE IP_SRC_ADDR END addr, "
sql = sql.." CASE WHEN IP_SRC_ADDR = '"..host.."' THEN OUT_BYTES ELSE IN_BYTES END peer_bytes_sent, "
sql = sql.." CASE WHEN IP_SRC_ADDR = '"..host.."' THEN IN_BYTES ELSE OUT_BYTES END peer_bytes_rcvd, "
end
sql = sql.." FIRST_SWITCHED, LAST_SWITCHED "
sql = sql.." FROM "..flowsTableName(version)
sql = sql.." WHERE FIRST_SWITCHED <= "..epoch_end.." and FIRST_SWITCHED >= "..epoch_begin
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='')"
sql = sql.." AND (INTERFACE_ID='"..tonumber(interface_id).."')"
if((port ~= nil) and (port ~= "")) then
sql = sql .." AND (L4_SRC_PORT="..port.." OR L4_DST_PORT="..port..")"
end
if((vlan ~= nil) and (vlan ~= "")) then
sql = sql .." AND VLAN_ID="..vlan
end
if((profile ~= nil) and (profile ~= "")) then
sql = sql .." AND PROFILE='"..profile.."'"
end
if(info ~= nil) then
sql = sql .." AND (INFO LIKE '%"..info.."%')"
end
if l7_proto_id and l7_proto_id ~="" then sql = sql.." AND L7_PROTO = "..tonumber(l7_proto_id) end
if l4_proto_id and l4_proto_id ~="" then sql = sql.." AND PROTOCOL = "..tonumber(l4_proto_id) end
if(version == 4) then
sql = sql .." AND (IP_SRC_ADDR=INET_ATON('"..host.."') OR IP_DST_ADDR=INET_ATON('"..host.."'))"
else
sql = sql .." AND (IP_SRC_ADDR='"..host.."' OR IP_DST_ADDR='"..host.."')"
end
sql = allowedNetworksFilter(sql, version, true, true)
sql = sql..") peers"
-- we don't care about the order so we group by least and greatest
sql = sql.." group by addr "
-- ORDER
local order_by_column = "tot_bytes" -- defaults to tot_bytes
if sort_column == "column_packets" or sort_column == "packets" or sort_column == "tot_packets" then
order_by_column = "tot_packets"
elseif sort_column == "column_bytes" or sort_column == "bytes" or sort_column == "tot_bytes" then
order_by_column = "tot_bytes"
elseif sort_column == "column_bytes_sent" or sort_column == "bytes_sent" then
order_by_column = "bytes_sent"
elseif sort_column == "column_bytes_rcvd" or sort_column == "bytes_rcvd" then
order_by_column = "bytes_rcvd"
elseif sort_column == "column_flows" or sort_column == "flows" or sort_column == "tot_flows" then
order_by_column = "flows"
-- elseif sort_column == "column_avg_flow_duration" or sort_column == "avg_flow_duration" then
-- order_by_column = "avg_flow_duration"
end
local order_by_order = "desc"
if sort_order == "asc" then order_by_order = "asc" end
sql = sql.." order by "..order_by_column.." "..order_by_order.." "
-- SLICE
local slice_offset = 0
local slice_limit = 100
if tonumber(offset) >= 0 then slice_offset = offset end
if tonumber(limit) > 0 then slice_limit = limit end
sql = sql.."limit "..slice_offset..","..slice_limit.." "
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function getAppTopTalkersSELECT_FROM_WHERE_clause(src_or_dst, v4_or_v6, epoch_begin, epoch_end, ifid, l7_proto_id, l4_proto_id, port, vlan, profile)
local sql = ""
local sql_bytes_packets = "PACKETS as packets, "
local exclude_same_src_dst = false
if src_or_dst == "IP_DST_ADDR" then
exclude_same_src_dst = true
-- if this is a destination address, we account it INGRESS traffic
sql_bytes_packets = sql_bytes_packets .. "OUT_BYTES as bytes_sent, IN_BYTES as bytes_rcvd, "
elseif src_or_dst == "IP_SRC_ADDR" then
-- if this is a source address, we account the traffic as EGRESS
sql_bytes_packets = sql_bytes_packets .. " IN_BYTES as bytes_sent, OUT_BYTES as bytes_rcvd, "
else
return nil -- make sure to exit early if no valid data has been passed
end
if v4_or_v6 == 6 then
sql = " SELECT NULL addrv4, "..src_or_dst.." addrv6, "
sql = sql..sql_bytes_packets
sql = sql.."FIRST_SWITCHED, LAST_SWITCHED FROM "..flowsTableName(6).." "
elseif v4_or_v6 == 4 then -- ipv4
sql = " SELECT "..src_or_dst.." addrv4, NULL addrv6, "
sql = sql..sql_bytes_packets
sql = sql.."FIRST_SWITCHED, LAST_SWITCHED FROM "..flowsTableName(4).." "
else
sql = ""
end
sql = sql.." WHERE FIRST_SWITCHED <= "..epoch_end.." and FIRST_SWITCHED >= "..epoch_begin
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='') "
sql = sql.." AND (INTERFACE_ID='"..tonumber(ifid).."') "
sql = sql.." AND L7_PROTO = "..tonumber(l7_proto_id)
if(exclude_same_src_dst == true) then
-- avoid double counting flows that have source == destination
sql = sql.." AND IP_SRC_ADDR <> IP_DST_ADDR"
end
if((l4_proto_id ~= nil) and (l4_proto_id ~= "") and (l4_proto_id ~= "-1")) then
sql = sql .." AND PROTOCOL="..l4_proto_id
end
if((port ~= nil) and (port ~= "")) then
sql = sql .." AND (L4_SRC_PORT="..port.." OR L4_DST_PORT="..port..")"
end
if((vlan ~= nil) and (vlan ~= "")) then
sql = sql .." AND VLAN_ID="..vlan
end
if((profile ~= nil) and (profile ~= "")) then
sql = sql .." AND PROFILE='"..profile.."'"
end
if src_or_dst == "IP_DST_ADDR" then
sql = allowedNetworksFilter(sql, v4_or_v6, false, true)
elseif src_or_dst == "IP_SRC_ADDR" then
sql = allowedNetworksFilter(sql, v4_or_v6, true, false)
end
return sql..'\n'
end
-- ########################################################
function getAppTopTalkers(interface_id, l7_proto_id, l4_proto_id, port, vlan, profile, info, epoch_begin, epoch_end, sort_column, sort_order, offset, limit)
-- retrieves top talkers in the given time range
if(info == "") then info = nil end
-- AGGREGATE AND CRUNCH DATA
sql = "select CASE WHEN addrv4 IS NOT NULL THEN INET_NTOA(addrv4) ELSE addrv6 END addr, "
sql = sql.."SUM(bytes_sent + bytes_rcvd) tot_bytes, SUM(packets) tot_packets, "
sql = sql.."SUM(bytes_sent) bytes_sent, "
sql = sql.."SUM(bytes_rcvd) bytes_rcvd, "
sql = sql.."count(*) tot_flows "
-- sql = sql.." (sum(LAST_SWITCHED) - sum(FIRST_SWITCHED)) / count(*) as avg_flow_duration "
sql = sql.." FROM "
sql = sql.."("
sql = sql..getAppTopTalkersSELECT_FROM_WHERE_clause('IP_SRC_ADDR', 4, epoch_begin, epoch_end, interface_id, l7_proto_id, l4_proto_id, port, vlan, profile)
sql = sql.." UNION ALL "
sql = sql..getAppTopTalkersSELECT_FROM_WHERE_clause('IP_DST_ADDR', 4, epoch_begin, epoch_end, interface_id, l7_proto_id, l4_proto_id, port, vlan, profile)
sql = sql.." UNION ALL "
sql = sql..getAppTopTalkersSELECT_FROM_WHERE_clause('IP_SRC_ADDR', 6, epoch_begin, epoch_end, interface_id, l7_proto_id, l4_proto_id, port, vlan, profile)
sql = sql.." UNION ALL "
sql = sql..getAppTopTalkersSELECT_FROM_WHERE_clause('IP_DST_ADDR', 6, epoch_begin, epoch_end, interface_id, l7_proto_id, l4_proto_id, port, vlan, profile)
sql = sql..") talkers"
sql = sql.." group by addr "
-- ORDER
local order_by_column = "tot_bytes" -- defaults to tot_bytes
if sort_column == "column_packets" or sort_column == "packets" or sort_column == "tot_packets" then
order_by_column = "tot_packets"
elseif sort_column == "column_bytes_sent" or sort_column == "bytes_sent" then
order_by_column = "bytes_sent"
elseif sort_column == "column_bytes_rcvd" or sort_column == "bytes_rcvd" then
order_by_column = "bytes_rcvd"
elseif sort_column == "column_flows" or sort_column == "flows" or sort_column == "tot_flows" then
order_by_column = "tot_flows"
-- elseif sort_column == "column_avg_flow_duration" or sort_column == "avg_flow_duration" then
-- order_by_column = "avg_flow_duration"
end
local order_by_order = "desc"
if sort_order == "asc" then order_by_order = "asc" end
sql = sql.." order by "..order_by_column.." "..order_by_order.." "
-- SLICE
local slice_offset = 0
local slice_limit = 100
if tonumber(offset) >= 0 then slice_offset = offset end
if tonumber(limit) > 0 then slice_limit = limit end
sql = sql.."limit "..slice_offset..","..slice_limit.." "
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function getTopApplications(interface_id, peer1, peer2, l7_proto_id, l4_proto_id, port, vlan, profile, info, epoch_begin, epoch_end, sort_column, sort_order, offset, limit)
-- tprint({n="getTopApplications", peer1=peer1, peer2=peer2})
-- if both peers are nil, top applications are overall in the time range
-- if peer1 is nil and peer2 is not nil, then top apps are for peer1
-- if peer2 is nil and peer1 is not nil, then top apps are for peer2
-- if both peer2 and peer2 are not nil, then top apps are computed between peer1 and peer2
-- sort_column and sort_order are used to sort the results
-- offset and limit are used to paginate the results
local version = 4
if peer1 and peer1 ~= "" and isIPv6(peer1) then version = 6
elseif peer2 and peer2 ~= "" and isIPv6(peer2) then version = 6 end
if(info == "") then info = nil end
sql = " SELECT L7_PROTO application, "
sql = sql.."sum(IN_BYTES + OUT_BYTES) as tot_bytes, sum(PACKETS) as tot_packets, count(*) as tot_flows "
-- sql = sql.." (sum(LAST_SWITCHED) - sum(FIRST_SWITCHED)) / count(*) as avg_flow_duration "
sql = sql.." FROM "..flowsTableName(version)
sql = sql.." WHERE FIRST_SWITCHED <= "..epoch_end.." and FIRST_SWITCHED >= "..epoch_begin
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='')"
sql = sql.." AND (INTERFACE_ID='"..tonumber(interface_id).."')"
if((port ~= nil) and (port ~= "")) then
sql = sql .." AND (L4_SRC_PORT="..port.." OR L4_DST_PORT="..port..")"
end
if((vlan ~= nil) and (vlan ~= "")) then
sql = sql .." AND VLAN_ID="..vlan
end
if((profile ~= nil) and (profile ~= "")) then
sql = sql .." AND PROFILE='"..profile.."'"
end
if l7_proto_id and l7_proto_id ~="" then sql = sql.." AND L7_PROTO = "..tonumber(l7_proto_id) end
if l4_proto_id and l4_proto_id ~="" then sql = sql.." AND PROTOCOL = "..tonumber(l4_proto_id) end
if(info ~= nil) then sql = sql .." AND (INFO LIKE '%"..info.."%')" end
if peer1 and peer1 ~= "" then
if(version == 4) then
sql = sql .." AND (IP_SRC_ADDR=INET_ATON('"..peer1.."') OR IP_DST_ADDR=INET_ATON('"..peer1.."')) "
else
sql = sql .." AND (IP_SRC_ADDR='"..peer1.."' OR IP_DST_ADDR='"..peer1.."') "
end
end
if peer2 and peer2 ~= "" then
if(version == 4) then
sql = sql .." AND (IP_SRC_ADDR=INET_ATON('"..peer2.."') OR IP_DST_ADDR=INET_ATON('"..peer2.."'))"
else
sql = sql .." AND (IP_SRC_ADDR='"..peer2.."' OR IP_DST_ADDR='"..peer2.."')"
end
end
sql = sql.." group by L7_PROTO "
-- ORDER
local order_by_column = "tot_bytes" -- defaults to tot_bytes
if sort_column == "column_packets" or sort_column == "packets" or sort_column == "tot_packets" then
order_by_column = "tot_packets"
end
if sort_column == "column_flows" or sort_column == "flows" or sort_column == "tot_flows" then
order_by_column = "tot_flows"
end
-- if sort_column == "column_avg_flow_duration" or sort_column == "avg_flow_duration" then
-- order_by_column = "avg_flow_duration"
--end
local order_by_order = "desc"
if sort_order == "asc" then order_by_order = "asc" end
sql = sql.." order by "..order_by_column.." "..order_by_order.." "
-- SLICE
local slice_offset = 0
local slice_limit = 100
if tonumber(offset) >= 0 then slice_offset = offset end
if tonumber(limit) > 0 then slice_limit = limit end
sql = sql.."limit "..slice_offset..","..slice_limit.." "
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return {}
elseif res == nil then
return {}
else
return(res)
end
end
-- ########################################################
function db_utils.clickhouseDeleteOldPartitions(mysql_retention)
local day_aligned_retention = mysql_retention - (mysql_retention % 86400)
-- Create a string that identifies the PARTITIONs name of the most recent partition that will be deleted
local retention_yyyymmdd = os.date("%Y%m%d", day_aligned_retention)
-- Deletion is done directly on partitions (Clickhouse database has daily partitions)
-- Query the partitions that need to be deleted. Convert YYYYMMDD strings into integers so that
-- only relevant partitions can be queried and deleted
-- The last condition > 999999 prevents old partitions created as YYYMM to be deleted
local partitions_q = string.format("SELECT DISTINCT database, table, toUInt32(partition) drop_part FROM system.parts WHERE active AND database='%s' AND drop_part <= %u AND drop_part > 999999", ntop.getPrefs().mysql_dbname or 'ntopng', retention_yyyymmdd)
local partitions_res = interface.execSQLQuery(partitions_q)
if(partitions_res ~= nil) then
-- Iterate queried partitions and delete them (nil is returned if there is nothing to delete)
for _, partition_info in ipairs(partitions_res) do
local delete_partition_q = string.format("ALTER TABLE %s.%s DROP PARTITION '%s'",
partition_info["database"], partition_info["table"], partition_info["drop_part"])
local delete_partition_res = interface.execSQLQuery(delete_partition_q)
end
end
end
-- ########################################################
local function _harvest_expired_mysql_flows(ifname, mysql_retention, verbose)
local dbtables = {"flowsv4", "flowsv6"}
if tonumber(ifname) == nil then
ifname = getInterfaceId(ifname)
end
for _, tb in pairs(dbtables) do
local sql = "DELETE FROM "..tb.." where FIRST_SWITCHED < "..mysql_retention
sql = sql.." AND (INTERFACE_ID = "..ifname..")"
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."' OR NTOPNG_INSTANCE_NAME IS NULL OR NTOPNG_INSTANCE_NAME='')"
interface.execSQLQuery(sql)
if(verbose) then io.write(sql.."\n") end
end
end
-- ########################################################
function db_utils.harverstExpiredMySQLFlows(ifname, mysql_retention, verbose)
if not ntop.isClickHouseEnabled() then
return _harvest_expired_mysql_flows(ifname, mysql_retention, verbose)
end
end
return db_utils
|