Skip to content

Commit

Permalink
Merge pull request #247 from Lyt99/feature/flow
Browse files Browse the repository at this point in the history
feat(flow): enhance flow ui
  • Loading branch information
BSWANG authored Apr 11, 2024
2 parents 3fcdda3 + 37b8a31 commit 9adcd2e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 43 deletions.
10 changes: 6 additions & 4 deletions pkg/controller/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (s *Server) GetFlowGraph(ctx *gin.Context) {

r := int(ts.Sub(fs).Seconds())

result, _, err := s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_bytes[%ds])", r), ts)
result, _, err := s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_bytes[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
Expand All @@ -272,28 +272,30 @@ func (s *Server) GetFlowGraph(ctx *gin.Context) {
}
g.SetEdgeBytesFromVector(vector)

result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_packets[%ds])", r), ts)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_packets[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.SetEdgePacketsFromVector(vector)

result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_packetloss_total[%ds])", r), ts)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_packetloss_total[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.AddNodesFromVector(vector)
g.SetEdgeDroppedFromVector(vector)

result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_tcpretrans_total[%ds])", r), ts)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_tcpretrans_total[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.AddNodesFromVector(vector)
g.SetEdgeRetransFromVector(vector)

jstr, err := g.ToJSON()
Expand Down
27 changes: 18 additions & 9 deletions pkg/controller/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,16 @@ func FromVector(m model.Vector) (*FlowGraph, error) {
g := NewFlowGraph()
for _, v := range m {
g.AddNodesFromSample(v)
g.AddEdge(createEdge(v))
}
return g, nil
}

func (g *FlowGraph) AddNodesFromVector(v model.Vector) {
for _, s := range v {
g.AddNodesFromSample(s)
}
}

func (g *FlowGraph) AddNodesFromSample(v *model.Sample) {
ip := string(v.Metric["src"])
t := string(v.Metric["src_type"])
Expand Down Expand Up @@ -130,36 +135,40 @@ func (g *FlowGraph) AddEdge(e Edge) {
func (g *FlowGraph) SetEdgeBytesFromVector(m model.Vector) {
for _, v := range m {
id := getEdgeID(v)
if _, ok := g.Edges[id]; ok {
g.Edges[id].Bytes = int(v.Value)
if _, ok := g.Edges[id]; !ok {
g.AddEdge(createEdge(v))
}
g.Edges[id].Bytes = int(v.Value)
}
}

func (g *FlowGraph) SetEdgePacketsFromVector(m model.Vector) {
for _, v := range m {
id := getEdgeID(v)
if _, ok := g.Edges[id]; ok {
g.Edges[id].Packets = int(v.Value)
if _, ok := g.Edges[id]; !ok {
g.AddEdge(createEdge(v))
}
g.Edges[id].Packets = int(v.Value)
}
}

func (g *FlowGraph) SetEdgeDroppedFromVector(m model.Vector) {
for _, v := range m {
id := getEdgeID(v)
if _, ok := g.Edges[id]; ok {
g.Edges[id].Dropped = int(v.Value)
if _, ok := g.Edges[id]; !ok {
g.AddEdge(createEdge(v))
}
g.Edges[id].Dropped = int(v.Value)
}
}

func (g *FlowGraph) SetEdgeRetransFromVector(m model.Vector) {
for _, v := range m {
id := getEdgeID(v)
if _, ok := g.Edges[id]; ok {
g.Edges[id].Retrans = int(v.Value)
if _, ok := g.Edges[id]; !ok {
g.AddEdge(createEdge(v))
}
g.Edges[id].Retrans = int(v.Value)
}
}

Expand Down
1 change: 1 addition & 0 deletions webui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"build-plugin-ice-i18n": "^0.2.2",
"d3": "^7.8.5",
"d3-force": "^3.0.0",
"ipaddr.js": "^2.1.0",
"moment": "^2.28.0",
"react": "^18.2.0",
"react-d3-force-layout": "^1.0.1",
Expand Down
62 changes: 47 additions & 15 deletions webui/src/pages/monitoring/flow/components/FlowGraph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { forceManyBody, forceLink } from 'd3-force';
import styles from './index.module.css'
import { clamp } from "@/utils";
import exp from "constants";
import ipaddr from "ipaddr.js";

interface Node {
id: string
Expand Down Expand Up @@ -36,6 +37,21 @@ interface GroupInfo {
position: [number, number]
}

const subnets = {
'Intranet': [
ipaddr.parseCIDR('10.0.0.0/8'),
ipaddr.parseCIDR('172.16.0.0/12'),
ipaddr.parseCIDR('192.168.0.0/16'),
ipaddr.parseCIDR('fd80::/8')
],
'Shared Address Space': [
ipaddr.parseCIDR('100.64.0.0/10')
],
'Link Local Address': [
ipaddr.parseCIDR('169.254.0.0/16')
]
}

const parsePodName = (n: any): {
group: string
groupType: string
Expand All @@ -45,13 +61,13 @@ const parsePodName = (n: any): {
if (/^.+-([a-z0-9]{5,10})-[a-z0-9]{5,10}$/.test(n.name)) {
return {
group: `${n.namespace}/${parts.slice(0, parts.length - 2).join('-')}`,
groupType: 'deployment',
groupType: 'Deployment',
}
}
if (/^.+-[a-z0-9]{5,10}$/.test(n.name)) {
return {
group: `${n.namespace}/${parts.slice(0, parts.length - 1).join('-')}`,
groupType: 'daemonset',
groupType: 'DaemonSet',
}
}

Expand All @@ -61,6 +77,17 @@ const parsePodName = (n: any): {
}
}

const toExternalGroupData = (n: any): any => {
const ip = ipaddr.parse(n.id);
const group = ipaddr.subnetMatch(ip, subnets, 'Internet')

return {
group: group,
type: 'Endpoint',
groupType: 'Endpoint',
}
}

const getGroupData = (n: any): any => {
switch (n.type) {
case 'pod':
Expand All @@ -74,13 +101,12 @@ const getGroupData = (n: any): any => {
}
case 'external':
return {
group: 'External',
groupType: 'endpoint',
...toExternalGroupData(n)
}
default:
return {
group: 'Unknown',
groupType: 'endpoint',
groupType: 'Endpoint',
}
}
}
Expand Down Expand Up @@ -147,7 +173,7 @@ const toGroupedGraphData = (data: any, expandedGroups: GroupInfo[]): GraphData =
type: 'group',
groupType: withGroup.find(n => n.group === g).groupType,
links: [],
// nodes: withGroup.filter(n => n.group === g),
nodes: withGroup.filter(n => n.group === g),
}
});

Expand Down Expand Up @@ -249,7 +275,7 @@ const drawNode = (node, ctx: CanvasRenderingContext2D, globalScale, highlight, h
}

const drawLink = (link, ctx, globalScale) => {
const SPEED = link.edges.length * 0.2;
const SPEED = link.edges.reduce((a, b) => a + b.packets, 0);
const PARTICLE_SIZE = 2 / globalScale;
const start = link.source;
const end = link.target;
Expand All @@ -259,7 +285,7 @@ const drawLink = (link, ctx, globalScale) => {
diff.x /= length;
diff.y /= length;

const mod = clamp(1000 * 1 / SPEED, 500, 5000);
const mod = clamp(5000 * SPEED, 500, 5000);

// add random offsets to particles
if (!link.offset) {
Expand Down Expand Up @@ -293,17 +319,23 @@ const drawLink = (link, ctx, globalScale) => {
const nodeLabel = (n) => {
const label = [
`Name: ${n.name}`,
`Type: ${n.type === 'group' ? n.groupType : n.type}`
`Type: ${n.type === 'group' ? n.groupType : n.type}`,
]

if (n.type === 'group') {
label.push(`Endpoint count: ${n.nodes.length}`)
}

label.push(`(Click to ${n.type === 'group' ? "expand" : "collapse"} nodes)`)
return label.join("</br>")
}

const linkLabel = (l) => {
const label = [
`Send Packets: ${l.edges.reduce((a, b) => a + b.packets, 0)}`,
`Send Byte(s): ${l.edges.reduce((a, b) => a + b.bytes, 0)}`,
`Dropped Packet(s): ${l.edges.reduce((a, b) => a + b.dropped, 0)}`,
`Retransmitted Packet(s): ${l.edges.reduce((a, b) => a + b.retrans, 0)}`
`Packet(s) Sent: ${l.edges.reduce((a, b) => a + b.packets, 0)}`,
`Byte(s) Sent: ${l.edges.reduce((a, b) => a + b.bytes, 0)}`,
`Packet(s) Dropped: ${l.edges.reduce((a, b) => a + b.dropped, 0)}`,
`Packet(s) Retransmitted: ${l.edges.reduce((a, b) => a + b.retrans, 0)}`
]
return label.join("</br>")
}
Expand All @@ -319,7 +351,7 @@ const FlowGraphD3: React.FC<FlowGraphProps> = (props: FlowGraphProps): JSX.Eleme

fg.d3Force('charge', forceManyBody()
.strength(node => {
return node.type === 'virtual' ? 0 : -15;
return node.type === 'virtual' ? 0 : -30;
}));

fg.d3Force('link', forceLink().id(d => d.id)
Expand Down Expand Up @@ -406,7 +438,7 @@ const FlowGraphD3: React.FC<FlowGraphProps> = (props: FlowGraphProps): JSX.Eleme
onNodeClick={(n) => setExpanded(n.type === 'group' ? [{ name: n.group, position: [n.x, n.y] }, ...expanded] : expanded.filter(g => g.name !== n.group))}
nodeLabel={nodeLabel}
linkLabel={linkLabel}
onEngineStop={() => {fgRef.current.zoomToFit(1000)}}
onEngineStop={() => { fgRef.current.zoomToFit(1000) }}
cooldownTime={3000}
nodeAutoColorBy={'group'}
>
Expand Down
9 changes: 5 additions & 4 deletions webui/src/pages/monitoring/flow/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const filterFlowData = (data: FlowData, namespaces: string[], nodes: string[], s
nodeSet.add(i.id);
});

const filteredEdge = data.edges.filter((edge: any) => {
return nodeSet.has(edge.src) && nodeSet.has(edge.dst)
let filteredEdge = data.edges.filter((edge: any) => {
return edge.src !== edge.dst && nodeSet.has(edge.src) && nodeSet.has(edge.dst)
});

if (!showSeparate) {
Expand All @@ -43,6 +43,7 @@ const filterFlowData = (data: FlowData, namespaces: string[], nodes: string[], s
filteredNode = filteredNode.filter(n => {
return s.has(n.id)
})
console.log(filteredNode)
}

filteredNode.sort((a, b) => {
Expand All @@ -63,7 +64,7 @@ export default function FlowDashboard() {
const [data, setData] = useState({ nodes: [], edges: [] });
const [selectedNamespaces, setSelectedNamespaces] = useState<string[]>([]);
const [loading, setLoading] = useState(false);
const [time, setTime] = useState<Dayjs[] | null>(null);
const [time, setTime] = useState<Dayjs[]>([dayjs().subtract(15, 'minute'), dayjs()]);

const getFlowData = () => {
const [from, to] = time || [dayjs().subtract(15, 'minute'), dayjs()];
Expand Down Expand Up @@ -108,7 +109,7 @@ export default function FlowDashboard() {
<Card.Content style={{ paddingLeft: 0 }}>
<Box direction="row" className={styles.contentBox}>
<span className={styles.optionLabel}>Time Range</span>
<DatePicker2.RangePicker placeholder={['Start Time', 'End Time']} showTime onChange={v => setTime(v)} />
<DatePicker2.RangePicker placeholder={['Start Time', 'End Time']} showTime value={time} onChange={v => setTime(v)} />
</Box>
<Box className={styles.contentBox} direction='row'>
<span className={styles.optionLabel}>Namespaces</span>
Expand Down
13 changes: 2 additions & 11 deletions webui/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3035,7 +3035,7 @@ d3-fetch@3:
d3-quadtree "1 - 3"
d3-timer "1 - 3"

d3-force@3:
d3-force@3, d3-force@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/d3-force/-/d3-force-3.0.0.tgz#3e2ba1a61e70888fe3d9194e30d6d14eece155c4"
integrity sha512-zxV/SsA+U4yte8051P4ECydjD/S+qeYtnaIyAs9tgHCqfguma/aAQDjo85A9Z6EKhBirHRJHXIgJUlffT4wdLg==
Expand Down Expand Up @@ -3063,15 +3063,6 @@ d3-force@^2.0.1, d3-force@^2.1.1:
d3-quadtree "1 - 2"
d3-timer "1 - 2"

d3-force@^3.0.0:
version "3.0.0"
resolved "https://registry.npmmirror.com/d3-force/-/d3-force-3.0.0.tgz#3e2ba1a61e70888fe3d9194e30d6d14eece155c4"
integrity sha512-zxV/SsA+U4yte8051P4ECydjD/S+qeYtnaIyAs9tgHCqfguma/aAQDjo85A9Z6EKhBirHRJHXIgJUlffT4wdLg==
dependencies:
d3-dispatch "1 - 3"
d3-quadtree "1 - 3"
d3-timer "1 - 3"

"d3-format@1 - 3", d3-format@3:
version "3.1.0"
resolved "https://registry.yarnpkg.com/d3-format/-/d3-format-3.1.0.tgz#9260e23a28ea5cb109e93b21a06e24e2ebd55641"
Expand Down Expand Up @@ -4850,7 +4841,7 @@ ipaddr.js@1.9.1:
resolved "https://registry.npmmirror.com/ipaddr.js/-/ipaddr.js-1.9.1.tgz#bff38543eeb8984825079ff3a2a8e6cbd46781b3"
integrity sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==

ipaddr.js@^2.0.1:
ipaddr.js@^2.0.1, ipaddr.js@^2.1.0:
version "2.1.0"
resolved "https://registry.npmmirror.com/ipaddr.js/-/ipaddr.js-2.1.0.tgz#2119bc447ff8c257753b196fc5f1ce08a4cdf39f"
integrity sha512-LlbxQ7xKzfBusov6UMi4MFpEg0m+mAm9xyNGEduwXMEDuf4WfzB/RZwMVYEd7IKGvh4IUkEXYxtAVu9T3OelJQ==
Expand Down

0 comments on commit 9adcd2e

Please sign in to comment.