本文介绍如何给tcp连接增加回调函数,和websocket一样,提供三个回调函数:OnOpen,OnClose,OnMessage。
更多请参考:我的自研网络框架 znet,欢迎Star与提Issue。
回调函数
- OnOpen: 连接建立成功后触发的回调函数。
- OnClose: 连接断开后触发的回调函数。
- OnMessage: 收到新消息时的回调函数。
实现
前面已经提到如何启动一个TCP服务,具体请点击:TCP编程。我们在此基础上去添加回调函数。
- Callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34package network
import "net"
type ConnectionHandler func(conn *net.TCPConn)
// Callback manage connection callback handlers.
type Callback struct {
open ConnectionHandler
close ConnectionHandler
request func(conn *net.TCPConn, msg []byte)
}
// triggerOpenEvent is called when the connection is established
func (callback *Callback) triggerOpenEvent(conn *net.TCPConn) {
if callback.open != nil {
callback.open(conn)
}
}
// triggerCloseEvent is called when the connection is closed
func (callback *Callback) triggerCloseEvent(conn *net.TCPConn) {
if callback.close != nil {
callback.close(conn)
}
}
// triggerRequestEvent is called when receive new message
func (callback *Callback) triggerRequestEvent(conn *net.TCPConn, msg []byte) {
if callback.request != nil {
callback.request(conn, msg)
}
}
Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129package network
import (
"bufio"
"log"
"net"
"runtime"
)
type Server struct {
options Options
callback *Callback
}
// Options 选项
type Options struct {
Accept int // 接收tcp的线程数
SendBuffer int // 发送数据的缓冲区长度
ReadBuffer int // 接收数据的缓冲区长度
OnOpen ConnectionHandler
OnClose ConnectionHandler
OnMessage func(conn *net.TCPConn, msg []byte)
}
type Option func(options *Options)
func NewServer(opts ...Option) *Server {
options := Options{
Accept: runtime.NumCPU(),
SendBuffer: 1024,
ReadBuffer: 1024,
}
for _, setter := range opts {
setter(&options)
}
callback := &Callback{
open: options.OnOpen,
close: options.OnClose,
request: options.OnMessage,
}
return &Server{options: options, callback: callback}
}
// Start 启动服务
func (s *Server) Start(bind string) {
var (
listener *net.TCPListener
addr *net.TCPAddr
err error
)
// 解析绑定地址
if addr, err = net.ResolveTCPAddr("tcp", bind); err != nil {
log.Printf("net.ResolveTCPAddr(tcp, %s) error(%v)", bind, err)
return
}
// 绑定服务
if listener, err = net.ListenTCP("tcp", addr); err != nil {
log.Printf("net.ListenTCP(tcp, %s) error(%v)", bind, err)
return
}
log.Printf("start tcp listen: %s", bind)
// 利用多核的优势去处理链接的创建
for i := 0; i < s.options.Accept; i++ {
go s.listen(listener)
}
}
// listen 监听
func (s *Server) listen(lis *net.TCPListener) {
var (
err error
)
for {
var conn *net.TCPConn
// 监听客户端的链接,完成三次握手,得到一个链接对象
if conn, err = lis.AcceptTCP(); err != nil {
// if listener close then return
log.Printf("listener.Accept(%s) error(%v)", lis.Addr().String(), err)
return
}
if err = conn.SetReadBuffer(s.options.ReadBuffer); err != nil {
log.Printf("conn.SetReadBuffer() error(%v)", err)
return
}
if err = conn.SetWriteBuffer(s.options.ReadBuffer); err != nil {
log.Printf("conn.SetWriteBuffer() error(%v)", err)
return
}
log.Printf("client new request,ip: %v", conn.RemoteAddr())
// 一个goroutine处理一个连接
go s.handle(conn)
}
}
func (s *Server) handle(conn *net.TCPConn) {
log.Printf("start handle client:%s", conn.RemoteAddr().String())
defer conn.Close() // 关闭链接
// 触发open回调
s.callback.triggerOpenEvent(conn)
reader := bufio.NewReader(conn)
for {
// 用一个4k的数组来接收数据
var buf [4096]byte
n, err := reader.Read(buf[:]) // 读取数据
if err != nil {
break
}
msg := buf[:n]
// 触发消息回调
s.callback.triggerRequestEvent(conn, msg)
}
// 触发close回调
s.callback.triggerCloseEvent(conn)
}示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// RunServer 启动服务端
func RunServer() {
server := NewServer(func(options *Options) {
options.OnOpen = func(conn *net.TCPConn) {
log.Println("welcome")
}
options.OnClose = func(conn *net.TCPConn) {
log.Println("goodbye")
}
options.OnMessage = func(conn *net.TCPConn, msg []byte) {
log.Println("received message: ", string(msg))
conn.Write([]byte("bar"))
}
})
server.Start(":9000")
select {}
}
// RunClient 启动客户端
func RunClient() {
client := NewClient()
if err := client.Connect("127.0.0.1:9000"); err != nil {
panic(err)
}
go func() {
for {
client.Receive()
}
}()
for {
client.Send([]byte("foo"))
time.Sleep(time.Second)
}
}
- 本文作者: Hongker
- 本文链接: https://hongker.github.io/2022/11/14/golang-tcp-callback/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!