Socket 套接字编程-TCP
接下来,我将实现一个基于 TCP 的 Socket 套接字编程示例。这个示例将包括一个简单的服务器和客户端,服务器将监听特定端口,客户端将连接到该端口并发送消息。
基础通信框架设计
tcp.hpp
这是一个简单的 TCP 网络编程工具头文件,封装了基本的 TCP socket 操作,包括创建 socket 、绑定端口、监听连接和建立连接等功能。代码使用了 C++ 风格封装,同时底层调用 Linux 系统socket API。
文件结构:
- 头文件保护宏:防止重复包含
- 必要的系统头文件引入
- 错误码枚举定义
- 线程本地存储的地址缓冲区
tcp命名空间包含所有功能函数
详细注释:
#ifndef _TCP_HPP_
#define _TCP_HPP_ 1
// 引入必要的头文件
#include <iostream>
#include <sys/socket.h> // socket相关系统调用
#include <netinet/in.h> // 互联网地址族
#include <arpa/inet.h> // 地址转换函数
#include <cstring> // bzero等字符串操作
#include "log.hpp" // 自定义日志头文件
// 错误码枚举定义
enum
{
socket_error = 1, // socket创建失败
bind_error, // 绑定端口失败
listen_error, // 监听失败
connect_error, // 连接失败
};
// 线程本地存储的地址缓冲区,用于地址转换
inline thread_local char addr_buffer[1024];
// TCP相关功能封装命名空间
namespace tcp
{
// 创建TCP socket
int Socket()
{
// 创建IPv4的TCP socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
perror("socket"); // 打印错误信息
exit(socket_error); // 退出程序并返回错误码
}
return sockfd; // 返回socket文件描述符
}
// 绑定socket到指定端口
void Bind(int sockfd, int port)
{
struct sockaddr_in local; // 定义IPv4地址结构
bzero(&local, sizeof(local)); // 清空结构体
local.sin_family = AF_INET; // 设置地址族为IPv4
local.sin_addr.s_addr = INADDR_ANY; // 监听所有网络接口
local.sin_port = htons(port); // 将端口号转换为网络字节序
// 绑定socket到地址
int n = bind(sockfd, (struct sockaddr *)(&local), sizeof(local));
if (n != 0)
{
perror("bind");
exit(bind_error);
}
}
// 从sockaddr_in结构体提取IP地址和端口号
void GetAddrAndPort(struct sockaddr_in &addr_in, std::string &addr, uint16_t &port)
{
port = ntohs(addr_in.sin_port); // 网络字节序转主机字节序
// 将IP地址从二进制转换为点分十进制字符串
inet_ntop(AF_INET, &addr_in.sin_addr, addr_buffer, sizeof(addr_buffer) - 1);
addr = addr_buffer; // 将结果存入输出参数
}
// 开始监听socket连接
void Listen(int listen_sock, int backlog = 10)
{
// 设置监听队列长度为backlog(默认10)
int n = listen(listen_sock, backlog);
if (n == -1)
{
perror("listen");
exit(listen_error);
}
}
// 连接到指定服务器
void Connect(int sockfd, sockaddr_in &server)
{
// 尝试建立TCP连接
int n = connect(sockfd, (struct sockaddr *)&server, sizeof(server));
if(n == -1)
{
perror("connect");
exit(connect_error);
}
}
};
#endif
功能说明:
- Socket() - 创建
TCP socket文件描述符 - Bind() - 将
socket绑定到指定端口 - Listen() - 开始监听连接请求
- Connect() - 作为客户端连接到服务器
- GetAddrAndPort() - 辅助函数,用于从地址结构中提取可读的IP和端口
tcp_server.hpp
这是一个简单的TCP服务器实现,基于之前封装的tcp.hpp基础功能。它提供了完整的服务器工作流程,包括初始化、监听和客户端请求处理。
#ifndef _TCP_SERVER_HPP_
#define _TCP_SERVER_HPP_ 1
#include "tcp.hpp" // 引入之前封装的TCP基础功能
#include "log.hpp" // 引入日志功能
const u_int16_t default_port = 8888; // 默认监听端口
// TCP服务器类封装
class TcpServer
{
public:
// 构造函数,可指定端口(默认使用default_port)
TcpServer(u_int16_t port = default_port)
: port_(port)
{
}
// 初始化服务器
void Init()
{
listen_sockfd_ = tcp::Socket(); // 创建监听socket
lg(Info, "listening sock create success, sockfd: %d", listen_sockfd_);
tcp::Bind(listen_sockfd_, port_); // 绑定端口
lg(Info, "listening sock bind success");
tcp::Listen(listen_sockfd_); // 开始监听
}
// 启动服务器主循环
void Start()
{
while (true) // 无限循环接受客户端连接
{
struct sockaddr_in client; // 客户端地址结构
socklen_t len = sizeof(client);
// 接受客户端连接
int client_sockfd = accept(listen_sockfd_, (sockaddr *)&client, &len);
if (client_sockfd == -1)
{
lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
}
// 获取客户端地址和端口信息
std::string client_addr;
u_int16_t client_port;
tcp::GetAddrAndPort(client, client_addr, client_port);
lg(Info, "accept a new client [%s: %d]", client_addr.c_str(), client_port);
// 处理客户端请求
char buffer[1024];
Server(client_sockfd, buffer, sizeof(buffer), client_addr, client_port);
}
}
// 处理客户端连接的函数
void Server(const int &client_sockfd, char *buffer, size_t len,
const std::string &client_addr = "*", const u_int16_t client_port = -1)
{
// 判断是否需要打印客户端信息
bool print_inf = !(client_addr == "*" || client_port == -1);
while (true) // 与客户端的通信循环
{
// 接收客户端数据
ssize_t n = recv(client_sockfd, buffer, len - 1, 0);
if (n == -1) // 接收错误
{
if (print_inf)
lg(Error, "accept error, [%d]: %s", errno, strerror(errno));
}
else if (n == 0) // 客户端断开连接
{
if (print_inf)
lg(Info, "client [%s: %d] quit", client_addr.c_str(), client_port);
close(client_sockfd);
break;
}
else // 正常接收数据
{
buffer[n] = 0; // 添加字符串结束符
if (print_inf)
lg(Info, "server get a message from client [%s: %d]: %s",
client_addr.c_str(), client_port, buffer);
// 回显数据给客户端
ssize_t m = send(client_sockfd, buffer, n, 0);
if (m == -1)
{
lg(Error, "send error, [%d]: %s", errno, strerror(errno));
}
}
}
}
private:
u_int16_t port_; // 服务器监听端口
int listen_sockfd_; // 监听socket文件描述符
};
#endif
主要组件:
TcpServer类:封装了TCP服务器的核心功能- 构造函数:可以指定监听端口,默认使用
8888 Init()方法:初始化服务器,创建socket、绑定端口并开始监听Start()方法:启动服务器主循环,接受客户端连接Server()方法:处理单个客户端连接
- 构造函数:可以指定监听端口,默认使用
成员变量:
port_:服务器监听端口listen_sockfd_:监听socket的文件描述符
工作流程:
- 创建
TcpServer实例时指定端口(可选) - 调用
Init()初始化服务器 - 调用
Start()启动服务器主循环- 使用
accept()接受客户端连接 - 为每个客户端创建连接并调用
Server()处理
- 使用
- 在
Server()方法中:- 使用
recv()接收客户端数据 - 使用
send()回显数据给客户端 - 检测连接断开情况并关闭
socket
- 使用
功能特点:
- 使用
lg宏记录日志信息 - 支持显示客户端地址和端口
- 简单的回显服务器功能
- 基本的错误处理和日志记录
功能测试
下面这两段代码利用上面的头文件分别实现了客户端和服务端:
客户端代码:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include "tcp.hpp" // 引入自定义的TCP封装库
using namespace std;
int main(int argc, char *argv[])
{
// 检查命令行参数是否正确(需要服务器IP和端口两个参数)
if (argc != 3)
{
cerr << "Usage: " << argv[0] << " server_ip server_port" << endl;
return 1;
}
// 从命令行参数获取服务器IP和端口
string server_ip = argv[1];
uint16_t server_port = stoi(argv[2]); // 将端口字符串转换为整数
// 创建客户端socket
int sockfd = tcp::Socket();
// 初始化服务器地址结构
struct sockaddr_in server;
bzero(&server, sizeof(server)); // 清空结构体
server.sin_port = htons(server_port); // 设置端口(转换为网络字节序)
server.sin_family = AF_INET; // 使用IPv4协议
server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // 设置服务器IP地址
// 连接到服务器
tcp::Connect(sockfd, server);
// 主循环:与服务器交互
while (true)
{
string inbuffer;
cout << "Please Enter# "; // 提示用户输入
getline(cin, inbuffer); // 获取用户输入
// 发送数据到服务器
ssize_t n = send(sockfd, inbuffer.c_str(), inbuffer.size(), 0);
if (n > 0) // 发送成功
{
char buffer[1024]; // 接收缓冲区
// 接收服务器回应(这里定义但不使用temp,因为TCP不需要关心发送方信息)
struct sockaddr_in temp;
socklen_t len = sizeof(temp);
// 接收服务器返回的数据
ssize_t m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (m > 0) // 成功接收到数据
{
buffer[m] = 0; // 添加字符串结束符
cout << "server echo# " << buffer << endl; // 打印服务器回应
}
else if (m == 0) // 服务器关闭连接
{
printf("server [%s: %d] quit", server_ip.c_str(), server_port);
break; // 退出循环
}
else // 接收出错
{
perror("recv");
}
}
else // 发送失败
{
perror("send");
}
}
// 关闭socket
close(sockfd);
return 0;
}
服务端代码:
#include <iostream>
#include <memory> // 用于智能指针
#include "tcp_server.hpp" // 引入自定义的TCP服务器类
using namespace std;
int main(int argc, char *argv[])
{
int port; // 声明端口变量
// 处理命令行参数
if(argc == 1) // 如果没有参数
{
port = 8080; // 使用默认端口8080
}
else if(argc == 2) // 如果有一个参数
{
port = stoi(argv[1]); // 将参数转换为整数作为端口
}
else // 如果参数过多
{
// 显示用法说明
cerr << "Usage: " << argv[0] << "(default port: 8080)" << endl
<< "OR" << endl
<< argv[0] << " port" << endl;
return 1; // 非正常退出
}
// 使用unique_ptr智能指针创建TcpServer实例
// 这样可以自动管理内存,避免内存泄漏
unique_ptr<TcpServer> us(new TcpServer(port));
// 初始化服务器
us->Init();
// 启动服务器主循环
us->Start();
return 0; // 程序正常退出
}
经测试,客户端与服务端可以正常通信:
# 客户端
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./tcp_client.o 127.0.0.1 7777
Please Enter# hello!
server echo# hello!
Please Enter# I am a human
server echo# I am a human
Please Enter# bye!
server echo# bye!
Please Enter# ^C
# 服务端
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./main.o 7777
[Info][2025-7-29 22:11:38] listening sock create success, sockfd: 3
[Info][2025-7-29 22:11:38] listening sock bind success
[Info][2025-7-29 22:11:41] accept a new client [127.0.0.1: 59872]
[Info][2025-7-29 22:11:46] server get a message from client [127.0.0.1: 59872]: hello!
[Info][2025-7-29 22:11:54] server get a message from client [127.0.0.1: 59872]: I am a human
[Info][2025-7-29 22:12:5] server get a message from client [127.0.0.1: 59872]: bye!
[Info][2025-7-29 22:12:8] client [127.0.0.1: 59872] quit
利用线程池处理批量连接
线程池代码如下:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_ 1; // 防止头文件重复包含
#include <iostream>
#include <pthread.h> // POSIX线程库
#include <unordered_map> // 哈希表,用于存储线程信息
#include <queue> // 任务队列
const size_t default_cap = 20; // 默认线程池容量
// 自动加解锁的RAII封装类
class LockGuard
{
public:
// 构造函数自动加锁
explicit LockGuard(pthread_mutex_t &mtx)
: mtx_(mtx)
{
pthread_mutex_lock(&mtx_);
}
// 析构函数自动解锁
~LockGuard()
{
pthread_mutex_unlock(&mtx_);
}
// 禁用拷贝构造和赋值操作
LockGuard(const LockGuard &) = delete;
LockGuard &operator=(const LockGuard &) = delete;
private:
pthread_mutex_t &mtx_; // 引用管理的互斥锁
};
// 线程池模板类
template <class T>
class ThreadPool
{
private:
// 检查任务队列是否为空
bool IsEmptyQueue()
{
return tasks_.empty();
}
// 从任务队列头部取出一个任务
T Pop()
{
T task = tasks_.front();
tasks_.pop();
return task;
}
private:
// 线程数据传递结构体
struct thread_data
{
std::string name; // 线程名称
ThreadPool<T> *tp; // 所属线程池指针
};
public:
// 获取线程池单例(线程安全)
static ThreadPool<T> &GetInstance()
{
// C++11后静态局部变量初始化是线程安全的
static ThreadPool<T> tp;
return tp;
}
// 启动线程池
void Start()
{
running_ = true; // 设置运行标志
// 创建指定数量的工作线程
for (int i = 1; i <= cap_; ++i)
{
std::string name = "thread-" + std::to_string(i); // 生成线程名
pthread_t tid;
// 创建线程数据
thread_data *td = new thread_data{
name : name,
tp : this
};
// 创建线程
pthread_create(&tid, nullptr, ThreadHandler, (void *)(td));
// 记录线程信息
threads_[tid] = name;
}
}
// 向线程池添加任务
void Push(const T &task)
{
{
LockGuard lg(mtx_); // 加锁保护
tasks_.push(task); // 任务入队
}
// 先释放锁后唤醒,提高并发效率
pthread_cond_signal(&cond_); // 唤醒一个等待线程
}
// 结束线程池
void End()
{
{
LockGuard lg(mtx_);
running_ = false; // 设置停止标志
}
// 广播唤醒所有等待线程
pthread_cond_broadcast(&cond_);
// 等待所有线程结束
for (auto &[tid, name] : threads_)
{
pthread_join(tid, nullptr);
}
threads_.clear(); // 清空线程记录
}
// 线程处理函数(静态成员函数)
static void *ThreadHandler(void *args)
{
// 获取线程数据
thread_data *td = static_cast<thread_data *>(args);
ThreadPool<T> *tp = td->tp;
std::string thread_name = td->name;
T task;
while (true)
{
{
LockGuard lg(tp->mtx_); // 加锁
// 等待条件:任务队列不为空或线程池停止
while (tp->IsEmptyQueue())
{
if (tp->running_)
pthread_cond_wait(&tp->cond_, &tp->mtx_); // 等待任务
else
{
delete td; // 清理线程数据
return nullptr; // 线程退出
}
}
// 获取任务
task = tp->Pop();
} // 自动解锁
// 执行任务
task();
// 输出任务结果(假设T类型有GetResult方法)
printf("%s: %s\n", thread_name.c_str(), task.GetResult().c_str());
}
delete td; // 理论上不会执行到这里
return nullptr;
}
private:
// 私有构造函数(单例模式)
ThreadPool(size_t cap = default_cap)
: cap_(cap), // 线程容量
running_(false) // 初始状态为未运行
{
pthread_mutex_init(&mtx_, nullptr); // 初始化互斥锁
pthread_cond_init(&cond_, nullptr); // 初始化条件变量
}
// 析构函数
~ThreadPool()
{
if (running_)
End(); // 如果还在运行,先停止
pthread_mutex_destroy(&mtx_); // 销毁互斥锁
pthread_cond_destroy(&cond_); // 销毁条件变量
}
// 禁用拷贝构造和赋值操作
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
private:
std::unordered_map<pthread_t, std::string> threads_; // 线程ID与名称映射
size_t cap_; // 线程池容量
std::queue<T> tasks_; // 任务队列
pthread_mutex_t mtx_; // 保护任务队列的互斥锁
pthread_cond_t cond_; // 任务通知条件变量
bool running_; // 线程池运行标志
};
// 静态成员初始化
template <class T>
pthread_mutex_t lock_ = PTHREAD_MUTEX_INITIALIZER;
#endif
如果对线程池实现不清楚的,可以跳转查看:线程池实现
我们需要给线程池传递一个任务对象,因此需要定义一个任务类:
#ifndef _TASK_HPP_
#define _TASK_HPP_ 1
#include <iostream>
#include <sys/socket.h> // socket相关系统调用
#include <stdlib.h> // 标准库函数
#include "log.hpp" // 自定义日志功能
// 线程本地存储的缓冲区,每个线程独立一份,避免竞争
thread_local char buffer[1024];
/**
* @brief 任务处理类,封装客户端请求处理逻辑
*
* 用于处理单个客户端的通信任务,支持函数对象调用方式
*/
class Task
{
public:
/**
* @brief 构造函数
* @param client_sockfd 客户端socket文件描述符
* @param client_addr 客户端IP地址(默认"*"表示不指定)
* @param client_port 客户端端口号(默认-1表示不指定)
*/
Task(int client_sockfd = -1, std::string client_addr = "*", u_int16_t client_port = -1)
: client_sockfd_(client_sockfd),
client_addr_(client_addr),
client_port_(client_port){}
/**
* @brief 处理客户端请求的核心方法
*
* 1. 接收客户端消息
* 2. 记录日志
* 3. 回显消息给客户端
* 4. 关闭连接
*/
void Server()
{
// 判断是否需要打印客户端信息(当有具体客户端信息时需要打印)
bool print_inf = !(client_addr_ == "*" || client_port_ == -1);
// 接收客户端数据
ssize_t n = recv(client_sockfd_, buffer, sizeof(buffer) - 1, 0);
if (n == -1) // 接收出错
{
if (print_inf)
lg(Error, "accept error, [%d]: %s", errno, strerror(errno));
}
else if (n == 0) // 客户端关闭连接
{
if (print_inf)
lg(Info, "client [%s: %d] quit", client_addr_.c_str(), client_port_);
}
else // 正常接收数据
{
buffer[n] = 0; // 添加字符串结束符
if (print_inf)
lg(Info, "server get a message from client [%s: %d]: %s",
client_addr_.c_str(), client_port_, buffer);
// 回显数据给客户端
ssize_t m = send(client_sockfd_, buffer, n, 0);
if (m == -1) // 发送失败
{
lg(Error, "send error, [%d]: %s", errno, strerror(errno));
}
}
// 关闭客户端连接
close(client_sockfd_);
}
/**
* @brief 函数对象运算符重载
*
* 使Task对象可以像函数一样被调用,便于线程池等场景使用
*/
void operator()()
{
Server();
}
private:
int client_sockfd_; // 客户端socket文件描述符
std::string client_addr_; // 客户端IP地址
u_int16_t client_port_; // 客户端端口号
};
#endif
而后,我们只需要将客户的信息传递到任务类中,将该任务传递到线程池中即可:
void Start()
{
while (true)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int client_sockfd = accept(listen_sockfd_, (sockaddr *)&client, &len);
if (client_sockfd == -1)
{
lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
}
std::string client_addr;
u_int16_t client_port;
tcp::GetAddrAndPort(client, client_addr, client_port);
lg(Info, "accept a new client [%s: %d]", client_addr.c_str(), client_port);
char buffer[1024];
/* 被修改部分 */
// Server(client_sockfd, buffer, sizeof(buffer), client_addr, client_port);
Task t(client_sockfd, client_addr, client_port);
ThreadPool<Task>::GetInstance().Push(t);
/* END */
}
}
客户端中,我们需要将套接字创建和连接服务端放到循环内部,从而使得每次发送信息都会建立一个新的连接:
while (true)
{
// 创建套接字并连接服务器(移到循环内部)
int sockfd = tcp::Socket();
tcp::Connect(sockfd, server);
...
}
运行后,我们会发现客户端每次发送信息都会建立一个新的连接,利用线程池,避免了一个用户一个线程从而使得资源耗尽的情况。线程池会将资源均匀分给所有的在线用户。
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./main.o 7777
[Info][2025-7-29 22:51:24] listening sock create success, sockfd: 3
[Info][2025-7-29 22:51:24] listening sock bind success
[Info][2025-7-29 22:51:27] accept a new client [127.0.0.1: 38302]
[Info][2025-7-29 22:51:28] server get a message from client [127.0.0.1: 38302]: 123
[Info][2025-7-29 22:51:28] accept a new client [127.0.0.1: 38308]
[Info][2025-7-29 22:51:29] server get a message from client [127.0.0.1: 38308]: 412
[Info][2025-7-29 22:51:29] accept a new client [127.0.0.1: 33648]
[Info][2025-7-29 22:51:30] server get a message from client [127.0.0.1: 33648]: 2342
[Info][2025-7-29 22:51:30] accept a new client [127.0.0.1: 33654]
[Info][2025-7-29 22:51:31] server get a message from client [127.0.0.1: 33654]: 12412
客户端添加重连机制
我们将 Connect 函数修改为循环检测,给予用户5次机会,每隔2秒重新连接一次,当次数耗尽则终止客户端进程:
// 有5次重连机会,每次间隔一秒
int cnt = 5;
while(connect(sockfd, (struct sockaddr *)&server, sizeof(server)) == -1)
{
// 进来了说明 connect 连接失败,我们尝试重连
if(cnt == 0){
printf("Sorry, I am unable to connect to the designated server\n");
return 1;
}
--cnt;
printf("Try to reconnect..., %d chances remaining\n", cnt);
sleep(1);
}
效果如下:
# 服务端
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./main.o 7777
[Info][2025-7-29 23:37:22] listening sock create success, sockfd: 3
[Info][2025-7-29 23:37:22] listening sock bind success
[Info][2025-7-29 23:37:24] accept a new client [127.0.0.1: 44352]
[Info][2025-7-29 23:37:26] server get a message from client [127.0.0.1: 44352]: 123
[Info][2025-7-29 23:37:26] accept a new client [127.0.0.1: 44360]
^C # 服务端故意断开连接
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./main.o 7777
[Info][2025-7-29 23:37:32] listening sock create success, sockfd: 3
[Info][2025-7-29 23:37:32] listening sock bind success
[Info][2025-7-29 23:37:32] accept a new client [127.0.0.1: 52752]
# 客户端
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./tcp_client.o 127.0.0.1 7777
Please Enter# 123
server echo# 123
Please Enter# 124
server [127.0.0.1: 7777] quit
Try to reconnect..., 4 chances remaining
Try to reconnect..., 3 chances remaining
Try to reconnect..., 2 chances remaining
# 重连成功
Please Enter#
封装成简单的聊天服务
1.客户端修改
封装成聊天服务需要做的是将收到的信号广播出去,那么,首先我们需要保证客户端的发送数据和接收数据两个行为隔离,这样才能保证用户在发送数据的同时可以接收其他客户端的数据。
我们通过创建一个线程来实现任务分离,线程通过客户端创建的套接字接收数据,又因为客户端发送一条消息后就会关闭该套接字,因为每次发送完数据后需要回收线程并在下一个循环再创建一个线程:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <thread>
#include "tcp.hpp"
using namespace std;
/**
* 处理服务器响应的线程函数
* @param sockfd 客户端套接字描述符
* @param server_ip 服务器IP地址
* @param server_port 服务器端口号
*/
void Handler(const int sockfd, const std::string server_ip, const u_int16_t server_port)
{
while (true)
{
char buffer[1024]; // 接收缓冲区
struct sockaddr_in temp; // 临时存储对端地址(实际未使用)
socklen_t len = sizeof(temp);
// 从服务器接收数据(不关心发送方信息)
ssize_t m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (m > 0) // 成功接收到数据
{
buffer[m] = 0; // 添加字符串结束符
cout << "server echo# " << buffer << endl; // 打印服务器响应
}
else if (m == 0) // 连接已关闭
{
printf("server [%s: %d] quit\n", server_ip.c_str(), server_port);
break;
}
else // 接收出错
{
break;
}
}
}
int main(int argc, char *argv[])
{
// 检查命令行参数
if (argc != 3)
{
cerr << "Usage: " << argv[0] << " server_ip server_port" << endl;
return 1;
}
// 解析命令行参数
string server_ip = argv[1];
uint16_t server_port = stoi(argv[2]);
// 初始化服务器地址结构
struct sockaddr_in server;
bzero(&server, sizeof(server)); // 清空结构体
server.sin_port = htons(server_port); // 设置端口(网络字节序)
server.sin_family = AF_INET; // IPv4地址族
server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // 设置IP地址
// 主循环: 允许用户多次发送请求
while (true)
{
// 创建TCP套接字
int sockfd = tcp::Socket();
// 创建接收线程(在连接前创建以便及时接收响应)
thread receiver(Handler, sockfd, server_ip, server_port);
// 连接重试机制(最多尝试5次)
int cnt = 5;
// 尝试连接服务器
while (connect(sockfd, (struct sockaddr *)&server, sizeof(server)) == -1)
{
// 连接失败,进行重试
if (cnt == 0) // 重试次数用尽
{
printf("Sorry, I am unable to connect to the designated server\n");
return 1;
}
--cnt;
printf("Try to reconnect..., %d chances remaining\n", cnt);
sleep(2); // 等待2秒后重试
}
// 获取用户输入
string inbuffer;
cout << "Please Enter# ";
getline(cin, inbuffer);
// 发送数据到服务器
ssize_t n = send(sockfd, inbuffer.c_str(), inbuffer.size(), 0);
if (n == -1)
{
perror("send");
}
// 关闭套接字并等待接收线程结束
close(sockfd);
receiver.join();
}
return 0;
}
2.服务端修改
服务端需要在 TcpServer 类当中添加两个函数,一个用于广播数据,另一个用于删除用户,我们利用 unordered_set 来存储所有用户的信息:
// tcpserver.hpp
#ifndef _TCP_SERVER_HPP_
#define _TCP_SERVER_HPP_ 1
// 包含必要的头文件
#include "tcp.hpp" // TCP基础操作封装
#include "log.hpp" // 日志模块
#include "thread_pool.hpp" // 线程池实现
#include "task.hpp" // 任务类定义
#include <unordered_set> // 哈希集合
#include <mutex> // 互斥锁
// 默认端口号定义
const u_int16_t default_port = 8888;
/**
* TCP服务器类
* 实现多客户端连接管理和消息广播功能
*/
class TcpServer
{
public:
/**
* 构造函数
* @param port 服务器监听端口,默认为default_port
*/
TcpServer(u_int16_t port = default_port)
: port_(port)
{}
/**
* 初始化服务器
* 1. 创建监听套接字
* 2. 绑定端口
* 3. 开始监听
* 4. 启动线程池
*/
void Init()
{
listen_sockfd_ = tcp::Socket(); // 创建监听套接字
lg(Info, "listening sock create success, sockfd: %d", listen_sockfd_);
tcp::Bind(listen_sockfd_, port_); // 绑定端口
lg(Info, "listening sock bind success");
tcp::Listen(listen_sockfd_); // 开始监听
ThreadPool<Task>::GetInstance().Start(); // 启动线程池
}
/**
* 启动服务器主循环
* 1. 接受客户端连接
* 2. 记录客户端信息
* 3. 创建任务并提交到线程池
*/
void Start()
{
while (true) // 主循环
{
struct sockaddr_in client; // 客户端地址信息
socklen_t len = sizeof(client);
// 接受客户端连接
int client_sockfd = accept(listen_sockfd_, (sockaddr *)&client, &len);
if (client_sockfd == -1) // 接受连接失败
{
lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
continue;
}
// 线程安全地添加客户端套接字到集合
mtx_.lock();
client_sockfds_.insert(client_sockfd);
mtx_.unlock();
// 获取客户端地址和端口信息
std::string client_addr;
u_int16_t client_port;
tcp::GetAddrAndPort(client, client_addr, client_port);
lg(Info, "accept a new client [%s: %d]", client_addr.c_str(), client_port);
// 创建任务对象并提交到线程池
Task t(client_sockfd,
std::bind(&TcpServer::broadcast, this, std::placeholders::_1), // 绑定广播回调
std::bind(&TcpServer::delete_client, this, std::placeholders::_1), // 绑定删除客户端回调
client_addr,
client_port);
ThreadPool<Task>::GetInstance().Push(t); // 提交任务
}
}
/**
* 广播消息给所有客户端
* @param buffer 要广播的消息内容
*/
void broadcast(std::string buffer)
{
mtx_.lock(); // 加锁保护客户端集合
for(auto sockfd: client_sockfds_)
{
send(sockfd, buffer.c_str(), buffer.size(), 0); // 发送消息
}
mtx_.unlock(); // 解锁
}
/**
* 从客户端集合中删除指定客户端
* @param sockfd 要删除的客户端套接字
*/
void delete_client(int sockfd)
{
mtx_.lock(); // 加锁保护客户端集合
client_sockfds_.erase(sockfd); // 删除客户端
mtx_.unlock(); // 解锁
}
private:
u_int16_t port_; // 服务器监听端口
int listen_sockfd_; // 监听套接字描述符
std::unordered_set<int> client_sockfds_; // 已连接客户端套接字集合
std::mutex mtx_; // 保护client_sockfds_的互斥锁
};
#endif
利用包装器将广播函数和删除用户函数传递给 Task 类使用
// task.hpp
#ifndef _TASK_HPP_
#define _TASK_HPP_ 1
#include <iostream>
#include <sys/socket.h> // 套接字相关
#include <stdlib.h>
#include <functional> // 函数对象
#include "log.hpp" // 日志模块
// 线程本地存储的缓冲区,每个线程独立一份
thread_local char buffer[1024];
// 定义函数对象类型
using func_t = std::function<void(std::string)>; // 广播消息的函数类型
using func2_t = std::function<void(int)>; // 删除客户端的函数类型
/**
* 任务类 - 处理客户端请求的单元
* 封装了客户端通信逻辑和回调机制
*/
class Task
{
public:
/**
* 构造函数
* @param client_sockfd 客户端套接字描述符,默认-1
* @param broadcast 广播消息的回调函数,默认nullptr
* @param delete_client 删除客户端的回调函数,默认nullptr
* @param client_addr 客户端地址,默认"*"
* @param client_port 客户端端口,默认-1
*/
Task(int client_sockfd = -1,
func_t broadcast = nullptr,
func2_t delete_client = nullptr,
std::string client_addr = "*",
u_int16_t client_port = -1)
: client_sockfd_(client_sockfd),
broadcast_(broadcast),
delete_client_(delete_client),
client_addr_(client_addr),
client_port_(client_port) {}
/**
* 服务器端处理客户端请求的核心方法
* 1. 接收客户端消息
* 2. 处理接收结果
* 3. 调用广播回调
* 4. 关闭连接并清理客户端
*/
void Server()
{
// 判断是否需要打印客户端信息
bool print_inf = !(client_addr_ == "*" || client_port_ == -1);
// 接收客户端数据
ssize_t n = recv(client_sockfd_, buffer, sizeof(buffer) - 1, 0);
if (n == -1) // 接收错误
{
if (print_inf)
lg(Error, "accept error, [%d]: %s", errno, strerror(errno));
}
else if (n == 0) // 客户端关闭连接
{
if (print_inf)
lg(Info, "client [%s: %d] quit", client_addr_.c_str(), client_port_);
}
else // 成功接收数据
{
buffer[n] = 0; // 添加字符串结束符
if (print_inf)
lg(Info, "server get a message from client [%s: %d]: %s",
client_addr_.c_str(), client_port_, buffer);
// 构造响应消息
std::string outbuffer = "[" + client_addr_ + ": " + std::to_string(client_port_) + "]:" + buffer;
// 调用广播回调函数
broadcast_(outbuffer);
}
// 关闭客户端连接
close(client_sockfd_);
// 调用删除客户端回调
delete_client_(client_sockfd_);
}
/**
* 函数调用运算符重载
* 使Task对象可以像函数一样被调用
*/
void operator()()
{
Server();
}
private:
int client_sockfd_; // 客户端套接字描述符
std::string client_addr_; // 客户端IP地址
u_int16_t client_port_; // 客户端端口号
func_t broadcast_; // 广播消息的回调函数
func2_t delete_client_; // 删除客户端的回调函数
};
#endif
终于,我们的聊天服务大功告成,接下来让我们测试一下通信能力如何:
# 服务端
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./main.o 7777
[Info][2025-7-30 1:1:47] listening sock create success, sockfd: 3
[Info][2025-7-30 1:1:47] listening sock bind success
[Info][2025-7-30 1:1:59] accept a new client [127.0.0.1: 40718]
[Info][2025-7-30 1:2:7] accept a new client [127.0.0.1: 40732]
[Info][2025-7-30 1:2:23] server get a message from client [127.0.0.1: 40718]: hello, how are you?
[Info][2025-7-30 1:2:23] accept a new client [127.0.0.1: 46856]
[Info][2025-7-30 1:2:44] server get a message from client [127.0.0.1: 40732]: I am fine, thank you!
[Info][2025-7-30 1:2:44] accept a new client [127.0.0.1: 59586]
# 客户端1
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./tcp_client.o 127.0.0.1 7777
Please Enter# hello, how are you?
server echo# [127.0.0.1: 40718]$hello, how are you?
Please Enter# server echo# [127.0.0.1: 40732]$I am fine, thank you!
# 客户端2
╭─ljx@VM-16-15-debian ~/linux_review/tcp
╰─➤ ./tcp_client.o 127.0.0.1 7777
Please Enter# server echo# [127.0.0.1: 40718]$hello, how are you?
I am fine, thank you!
server echo# [127.0.0.1: 40732]$I am fine, thank you!
Please Enter#
因为消息都打印在同一个会话上,难免会出现信息混乱,若想要实现信息不混乱,可以选择将数据分离到两个会话窗口中,这里就不做扩展了
感谢各位的阅读,希望对你有帮助!有问题欢迎在下面评论区讨论!