forked from AlexStocks/getty
-
Notifications
You must be signed in to change notification settings - Fork 0
/
getty.go
191 lines (165 loc) · 5.72 KB
/
getty.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/******************************************************
# DESC : getty interface
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:20
# FILE : getty.go
******************************************************/
package getty
import (
"compress/flate"
"errors"
"net"
"time"
)
// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
// If there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// then getty will close the new session.
type NewSessionCallback func(Session) error
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
// Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
// When receiving a tcp network streaming segment, there are 4 cases as following:
// case 1: a error found in the streaming segment;
// case 2: can not unmarshal a pkg from the streaming segment;
// case 3: just unmarshal a pkg from the streaming segment;
// case 4: unmarshal more than one pkg from the streaming segment;
//
// The return value is (nil, 0, error) as case 1.
// The return value is (nil, 0, nil) as case 2.
// The return value is (pkg, pkgLen, nil) as case 3.
// The handleTcpPackage may invoke func Read many times as case 4.
Read(Session, []byte) (interface{}, int, error)
}
// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) error
}
// tcp package handler interface
type ReadWriter interface {
Reader
Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error
// invoked when session closed.
OnClose(Session)
// invoked when got error.
OnError(Session, error)
// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)
// invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
//
// If ur logic processing in this func will take a long time, u should start a goroutine
// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
// can do the logic processing in other asynchronous way.
// !!!In short, ur OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}
/////////////////////////////////////////
// compress
/////////////////////////////////////////
type CompressType int
const (
CompressNone CompressType = flate.NoCompression // 0
CompressZip = flate.DefaultCompression // -1
CompressBestSpeed = flate.BestSpeed // 1
CompressBestCompression = flate.BestCompression // 9
CompressHuffman = flate.HuffmanOnly // -2
CompressSnappy = 10
)
/////////////////////////////////////////
// connection interfacke
/////////////////////////////////////////
type Connection interface {
ID() uint32
SetCompressType(CompressType)
LocalAddr() string
RemoteAddr() string
incReadPkgNum()
incWritePkgNum()
// update session's active time
UpdateActive()
// get session's active time
GetActive() time.Time
readTimeout() time.Duration
// SetReadTimeout sets deadline for the future read calls.
SetReadTimeout(time.Duration)
writeTimeout() time.Duration
// SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout(time.Duration)
Write(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
// set related session
setSession(Session)
}
/////////////////////////////////////////
// Session interfacke
/////////////////////////////////////////
var (
ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked")
ErrNullPeerAddr = errors.New("peer address is nil")
)
type Session interface {
Connection
Reset()
Conn() net.Conn
Stat() string
IsClosed() bool
// get endpoint type
EndPoint() EndPoint
SetMaxMsgLen(int)
SetName(string)
SetEventListener(EventListener)
SetPkgHandler(ReadWriter)
SetReader(Reader)
SetWriter(Writer)
SetCronPeriod(int)
SetRQLen(int)
SetWQLen(int)
SetWaitTime(time.Duration)
SetTaskPool(*TaskPool)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
RemoveAttribute(interface{})
// the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext.
WritePkg(pkg interface{}, timeout time.Duration) error
WriteBytes([]byte) error
WriteBytesArray(...[]byte) error
Close()
}
/////////////////////////////////////////
// EndPoint interfacke
/////////////////////////////////////////
type EndPoint interface {
// get endpoint type
EndPointType() EndPointType
// run event loop and serves client request.
RunEventLoop(newSession NewSessionCallback)
// check the endpoint has been closed
IsClosed() bool
// close the endpoint and free its resource
Close()
}
type Client interface {
EndPoint
}
type Server interface {
EndPoint
// get the network listener
Listener() net.Listener
}