Skip to content

Commit ce5b6bb

Browse files
Phillip Kuznetsovcopybaranaut
authored andcommitted
Add connection_throughput_stats to pxviews
Summary: Add function that computes the throughput of all network connections inside each pod of the cluster. Test Plan: Replace equivalent code in px/cluster and verify that it runs. Reviewers: jamesbartlett, vihang, michelle Reviewed By: vihang Signed-off-by: Phillip Kuznetsov <pkuznetsov@pixielabs.ai> Differential Revision: https://phab.corp.pixielabs.ai/D11961 GitOrigin-RevId: 9461163
1 parent b630db1 commit ce5b6bb

1 file changed

Lines changed: 59 additions & 0 deletions

File tree

src/carnot/planner/pxl_lib/pxviews.pxl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,4 +479,63 @@ def stacktraces(start_time, end_time):
479479
'node_num_cpus',
480480
]]
481481

482+
483+
def connection_throughput_stats(start_time, end_time):
484+
''' Compute the throughput of all network connections inside each pod ofthe cluster.
485+
486+
@start_time Starting time of the data to examine.
487+
@end_time Ending time of the data to examine.
488+
'''
489+
df = px.DataFrame(table='conn_stats', start_time=start_time, end_time=end_time)
490+
491+
df.pod_id = df.ctx['pod_id']
492+
493+
# Find min/max bytes transferred over the selected time window per pod.
494+
# Each connection is identified by the tuple: (upid, remote_addr, and remote_port, ssl, protocol).
495+
df = df.groupby(['upid', 'pod_id', 'trace_role', 'remote_addr', 'remote_port', 'ssl', 'protocol']).agg(
496+
bytes_recv_min=('bytes_recv', px.min),
497+
bytes_recv_max=('bytes_recv', px.max),
498+
bytes_sent_min=('bytes_sent', px.min),
499+
bytes_sent_max=('bytes_sent', px.max),
500+
time_max=('time_', px.max),
501+
time_min=('time_', px.min),
502+
)
503+
# Calculate bytes transferred over the time window
504+
df.bytes_sent = df.bytes_sent_max - df.bytes_sent_min
505+
df.bytes_recv = df.bytes_recv_max - df.bytes_recv_min
506+
507+
df = df.groupby(['pod_id', 'trace_role', 'remote_addr']).agg(
508+
bytes_recv=('bytes_recv', px.sum),
509+
bytes_sent=('bytes_sent', px.sum),
510+
time_max=('time_max', px.max),
511+
)
512+
513+
# Get RX/TX stats for the server side connections.
514+
server_df = df[df.trace_role == 2]
515+
server_df.rx_server = server_df.bytes_recv
516+
server_df.tx_server = server_df.bytes_sent
517+
server_df = server_df[['time_max', 'pod_id', 'rx_server', 'tx_server', 'remote_addr']]
518+
519+
# Get RX/TX stats for the client side connections.
520+
client_df = df[df.trace_role == 1]
521+
client_df.rx_client = client_df.bytes_recv
522+
client_df.tx_client = client_df.bytes_sent
523+
client_df = client_df[['time_max', 'pod_id', 'rx_client', 'tx_client', 'remote_addr']]
524+
525+
# Create a dataframe that contains both server-side and client-side RX/TX stats.
526+
df = server_df.merge(client_df,
527+
how='outer',
528+
left_on=['pod_id', 'remote_addr'],
529+
right_on=['pod_id', 'remote_addr'],
530+
suffixes=['', '_x'])
531+
532+
df.time_ = px.select(px.greaterThanEqual(df.time_max, df.time_max_x), df.time_max, df.time_max_x)
533+
df.pod_id = px.select(df.pod_id != '', df.pod_id, df.pod_id_x)
534+
df.remote_addr = px.select(df.remote_addr != '', df.remote_addr, df.remote_addr_x)
535+
536+
df.inbound_conn_throughput = df.rx_server + df.tx_server
537+
df.outbound_conn_throughput = df.tx_client + df.rx_client
538+
539+
return df[['time_', 'remote_addr', 'pod_id', 'inbound_conn_throughput', 'outbound_conn_throughput']]
540+
482541
)"

0 commit comments

Comments
 (0)