Skip to content

Commit c5e92c4

Browse files
authored
Update tcp_stats_bpf_test to work with qemu (#1308)
Summary: Update `tcp_stats_bpf_test` to work with qemu Relevant Issues: N/A Type of change: /kind bug Test Plan: Verified that the test ran successfully for our various test runners - [x] Running test multiple times within docker to verify that adding the `nc` server doesn't cause flakiness ([P376](https://phab.corp.pixielabs.ai/P376)). Also verified that each test is in its own network namespace so that the servers can't conflict with each other ([P378](https://phab.corp.pixielabs.ai/P378)) - [x] Running test within qemu passes ([P377](https://phab.corp.pixielabs.ai/P377)) - [x] Verified that when a test times out that the test prints the accumulated records found (otherwise it's difficult to see why the assertion failed) - [P379](https://phab.corp.pixielabs.ai/P379) Signed-off-by: Dom Del Nano <ddelnano@pixielabs.ai>
1 parent 0f657dd commit c5e92c4

3 files changed

Lines changed: 116 additions & 14 deletions

File tree

src/stirling/source_connectors/tcp_stats/tcp_stats_bpf_test.cc

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
#include "src/common/base/base.h"
2525
#include "src/common/exec/exec.h"
26+
#include "src/common/exec/subprocess.h"
2627
#include "src/common/testing/testing.h"
28+
#include "src/shared/types/column_wrapper.h"
2729
#include "src/shared/types/types.h"
2830
#include "src/stirling/core/data_table.h"
2931
#include "src/stirling/source_connectors/tcp_stats/tcp_stats_connector.h"
@@ -36,6 +38,16 @@ namespace stirling {
3638

3739
using ::px::stirling::testing::RecordBatchSizeIs;
3840
using ::px::stirling::testing::TcpTraceBPFTestFixture;
41+
using ::px::stirling::testing::WaitAndExpectRecords;
42+
using ::px::types::Int64Value;
43+
using ::px::types::StringValue;
44+
using tcp_stats::kTCPBytesReceivedIdx;
45+
using tcp_stats::kTCPBytesSentIdx;
46+
using tcp_stats::kTCPLocalAddrIdx;
47+
using tcp_stats::kTCPLocalPortIdx;
48+
using tcp_stats::kTCPRemoteAddrIdx;
49+
using tcp_stats::kTCPRemotePortIdx;
50+
using tcp_stats::kTCPRetransmitsIdx;
3951
using ::testing::ContainsRegex;
4052
using ::testing::HasSubstr;
4153

@@ -45,28 +57,68 @@ class TcpTraceTest : public TcpTraceBPFTestFixture {};
4557
// Test Scenarios
4658
//-----------------------------------------------------------------------------
4759

60+
std::vector<TcpStatsRecord> ToTcpStatsRecordVector(
61+
const std::vector<TaggedRecordBatch>& record_batches) {
62+
std::vector<TcpStatsRecord> result;
63+
64+
for (size_t i = 0; i < record_batches.size(); i++) {
65+
auto record_batch = record_batches[i].records;
66+
for (const types::SharedColumnWrapper& column_wrapper : record_batch) {
67+
for (size_t idx = 0; idx < column_wrapper->Size(); ++idx) {
68+
TcpStatsRecord r;
69+
r.local_addr = record_batch[kTCPLocalAddrIdx]->Get<StringValue>(idx);
70+
r.local_port = record_batch[kTCPLocalPortIdx]->Get<Int64Value>(idx).val;
71+
r.remote_addr = record_batch[kTCPRemoteAddrIdx]->Get<StringValue>(idx);
72+
r.remote_port = record_batch[kTCPRemotePortIdx]->Get<Int64Value>(idx).val;
73+
r.tx = record_batch[kTCPBytesSentIdx]->Get<Int64Value>(idx).val;
74+
r.rx = record_batch[kTCPBytesReceivedIdx]->Get<Int64Value>(idx).val;
75+
r.retransmits = record_batch[kTCPRetransmitsIdx]->Get<Int64Value>(idx).val;
76+
result.push_back(r);
77+
}
78+
}
79+
}
80+
81+
return result;
82+
}
83+
4884
TEST_F(TcpTraceTest, Capture) {
85+
SubProcess server_proc;
86+
std::thread server_thread([&] {
87+
std::vector<std::string> args = {
88+
"nc", "-l", "-p", "12345", "-s", "127.0.0.1", "-v",
89+
};
90+
ASSERT_OK(server_proc.Start(args, /* stderr_to_stdout */ true));
91+
});
92+
server_thread.detach();
93+
4994
StartTransferDataThread();
5095

5196
// Send "hello" to 127.0.0.1, total 6 bytes of data
52-
std::string cmd1 = "echo \"hello\" | nc 127.0.0.1 22";
97+
std::string cmd1 = "echo \"hello\" | nc -w1 127.0.0.1 12345 -v";
5398
ASSERT_OK(px::Exec(cmd1));
5499

55100
StopTransferDataThread();
56-
auto records = testing::ExtractToString(tcp_stats::kTCPStatsTable, tcp_stats_table_);
57-
58-
EXPECT_THAT(records, HasSubstr("remote_addr:[127.0.0.1] remote_port:[22] tx:[6]"));
59-
60-
StartTransferDataThread();
61-
62-
// Send "This is sample test" to 127.0.0.1, total 20 bytes of data
63-
std::string cmd2 = "echo \"This is sample test\" | nc 127.0.0.1 22";
64-
ASSERT_OK(px::Exec(cmd2));
65-
66-
StopTransferDataThread();
67-
auto records2 = testing::ExtractToString(tcp_stats::kTCPStatsTable, tcp_stats_table_);
68-
EXPECT_THAT(records2, HasSubstr("remote_addr:[127.0.0.1] remote_port:[22] tx:[20]"));
69101

102+
std::vector<TcpStatsRecord> expected = {
103+
{
104+
.remote_addr = "127.0.0.1",
105+
.remote_port = 12345,
106+
.tx = 6,
107+
.rx = 0,
108+
.retransmits = 0,
109+
},
110+
};
111+
112+
std::vector<TaggedRecordBatch> tablets = ConsumeRecords(TCPStatsConnector::kTCPStatsTableNum);
113+
testing::Timeout t(std::chrono::minutes{1});
114+
auto records = ToTcpStatsRecordVector(tablets);
115+
while (!testing::RecordsContains(records, expected) && !t.TimedOut()) {
116+
auto new_records = ToTcpStatsRecordVector(ConsumeRecords(TCPStatsConnector::kTCPStatsTableNum));
117+
records.insert(records.end(), new_records.begin(), new_records.end());
118+
std::this_thread::sleep_for(std::chrono::milliseconds{200});
119+
}
120+
121+
EXPECT_THAT(records, IsSupersetOf(expected));
70122
// TODO(RagalahariP): Explore options for testing retransmissions in a unit test case,
71123
// as retransmissions are blocking calls without known timeout value.
72124
}

src/stirling/testing/overloads.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <magic_enum.hpp>
2323

2424
#include "src/stirling/utils/monitor.h"
25+
#include "src/stirling/utils/tcp_stats.h"
2526

2627
namespace px {
2728
namespace stirling {
@@ -52,5 +53,17 @@ inline void PrintTo(const ProbeStatusRecord& r, std::ostream* os) {
5253
r.info);
5354
}
5455

56+
inline bool operator==(const TcpStatsRecord& a, const TcpStatsRecord& b) {
57+
return (a.remote_port == b.remote_port) && (a.remote_addr == b.remote_addr) && (a.tx == b.tx) &&
58+
(a.rx == b.rx) && (a.retransmits == b.retransmits);
59+
}
60+
61+
inline void PrintTo(const TcpStatsRecord& r, std::ostream* os) {
62+
*os << absl::Substitute(
63+
"TcpStatsRecord{remote_port: $0, remote_addr: $1, tx: $2, rx: "
64+
"$3, retransmits: $4, ",
65+
r.remote_port, r.remote_addr, r.tx, r.rx, r.retransmits);
66+
}
67+
5568
} // namespace stirling
5669
} // namespace px

src/stirling/utils/tcp_stats.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2018- The Pixie Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
*/
18+
19+
#pragma once
20+
21+
#include <string>
22+
23+
namespace px {
24+
namespace stirling {
25+
26+
struct TcpStatsRecord {
27+
std::string local_addr;
28+
int local_port;
29+
std::string remote_addr;
30+
int remote_port;
31+
int tx;
32+
int rx;
33+
int retransmits;
34+
};
35+
36+
} // namespace stirling
37+
} // namespace px

0 commit comments

Comments
 (0)