-
Notifications
You must be signed in to change notification settings - Fork 13
/
sample_server.rb
57 lines (46 loc) · 1.62 KB
/
sample_server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
require 'test/unit'
require 'pathname'
$LOAD_PATH.unshift("./lib")
$LOAD_PATH.unshift("./lib/fluent/plugin/thrift")
require 'thrift'
require 'flume_constants'
require 'flume_types'
require 'thrift_flume_event_server'
# Sample handler that logs every call it receives to standard output.
#
# An instance of this class is passed to ThriftFlumeEventServer::Processor
# further down in this script; the processor is presumably what invokes
# these methods when a client sends a request.
class FluentFlumeHandler
  # Prints the event and a breakdown of its timestamp, body and fields.
  #
  # @param evt [#timestamp, #body, #fieldss] the received event
  # @return [nil]
  def append(evt)
    $stdout.puts "call append(evt: #{evt})"
    $stdout.puts "evt: #{evt}"
    # Integer-divide the timestamp by 1000 (presumably ms -> s; confirm).
    scaled_timestamp = evt.timestamp.to_i / 1000
    $stdout.puts " timestamp: #{scaled_timestamp}"
    $stdout.puts " body: #{evt.body}"
    $stdout.puts " fieldss: #{evt.fieldss}"
  end

  # Prints a one-line trace of the call.
  #
  # @param evt [#to_s] the received raw event
  # @return [nil]
  def rawAppend(evt)
    $stdout.puts "call rawAppend(evt: #{evt})"
  end

  # Prints a one-line trace of the call and reports success.
  #
  # @param evt [#to_s] the received event
  # @return [Object] the value of EventStatus::OK
  def ackedAppend(evt)
    $stdout.puts "call ackedAppend(evt: #{evt})"
    EventStatus::OK
  end

  # Prints a one-line trace of the call.
  #
  # @return [nil]
  def close
    $stdout.puts "call close()"
  end
end
# Bootstrap: wire the logging handler into a Thrift server on localhost:56789
# and block serving requests.
handler = FluentFlumeHandler.new
processor = ThriftFlumeEventServer::Processor.new(handler)
transport = Thrift::ServerSocket.new('localhost', 56789)

# Buffered transport is used; the framed variant is kept here for reference.
# transport_factory = Thrift::FramedTransportFactory.new
transport_factory = Thrift::BufferedTransportFactory.new

protocol_factory = Thrift::BinaryProtocolFactory.new
# Override get_protocol on this single factory instance so that protocols are
# created with strict read and strict write both turned off. The two trailing
# positional arguments are (strict_read, strict_write), in that order; they are
# passed as plain values so no throwaway local variables are created.
protocol_factory.define_singleton_method(:get_protocol) do |trans|
  Thrift::BinaryProtocol.new(trans, false, false)
end

server = Thrift::SimpleServer.new(processor, transport, transport_factory, protocol_factory)
# Alternative server classes, with the original author's markers
# (presumably "ok" = worked, "ng" = did not work):
# ok # server = Thrift::ThreadedServer.new processor, transport, transport_factory, protocol_factory
# ok # server = Thrift::ThreadPoolServer.new processor, transport, transport_factory, protocol_factory
# ng # server = Thrift::NonblockingServer.new processor, transport, transport_factory, protocol_factory

puts "server start!"
server.serve