MySQL Binary Log connector. This is @osheroff's fork of @shyiko's project, probably the "official" version of it at this point, maintained with help from the Debezium devs.
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.25.0</version>
</dependency>
Initially the project was started as a fork of open-replicator, but it ended up as a complete rewrite. Key differences/features:
- automatic binlog filename/position | GTID resolution
- resumable disconnects
- pluggable failover strategies
- binlog_checksum=CRC32 support (for MySQL 5.6.2+ users)
- secure communication over TLS
- JMX-friendly
- real-time stats
- availability in Maven Central
- no third-party dependencies
- test suite over different versions of MySQL releases
If you are looking for something similar in other languages - check out siddontang/go-mysql (Go), noplay/python-mysql-replication (Python).
Or get the latest JAR(s) from here.
File binlogFile = ...
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
try {
    for (Event event; (event = reader.readEvent()) != null; ) {
        ...
    }
} finally {
    reader.close();
}
PREREQUISITES: Whichever user you plan to use for the BinaryLogClient, it MUST have the REPLICATION SLAVE privilege. Unless you specify binlogFilename/binlogPosition yourself (in which case automatic resolution won't kick in), you'll need REPLICATION CLIENT granted as well.
BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "password");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new EventListener() {
    @Override
    public void onEvent(Event event) {
        ...
    }
});
client.connect();
You can register a listener for onConnect/onCommunicationFailure/onEventDeserializationFailure/onDisconnect using client.registerLifecycleListener(...).
By default, BinaryLogClient starts from the current (at the time of connect) master binlog position. If you wish to kick off from a specific filename or position, use client.setBinlogFilename(filename) + client.setBinlogPosition(position).
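To resume from a known point after a restart, the filename and position have to be stored somewhere between runs. The following is a minimal sketch of such a checkpoint, not part of the library: BinlogCheckpoint and its "filename:position" encoding are hypothetical, and it assumes the values come from the client's getBinlogFilename()/getBinlogPosition() getters.

```java
/** Hypothetical checkpoint holder; the "filename:position" text format is an assumption of this sketch. */
final class BinlogCheckpoint {
    final String filename;
    final long position;

    BinlogCheckpoint(String filename, long position) {
        if (filename == null || filename.isEmpty()) {
            throw new IllegalArgumentException("filename must not be empty");
        }
        if (position < 0) {
            throw new IllegalArgumentException("position must not be negative");
        }
        this.filename = filename;
        this.position = position;
    }

    /** Encodes the checkpoint as a single line, e.g. "mysql-bin.000003:4". */
    String encode() {
        return filename + ":" + position;
    }

    /** Decodes a line produced by encode(); splits on the last ':' so the filename is kept whole. */
    static BinlogCheckpoint decode(String line) {
        int separator = line.lastIndexOf(':');
        if (separator < 0) {
            throw new IllegalArgumentException("not a checkpoint: " + line);
        }
        String filename = line.substring(0, separator);
        long position = Long.parseLong(line.substring(separator + 1));
        return new BinlogCheckpoint(filename, position);
    }
}
```

On startup, a decoded checkpoint would be passed to client.setBinlogFilename(...) and client.setBinlogPosition(...) before connecting. Where the encoded line is kept (file, database row, etc.) is left open here.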
client.connect() is blocking (meaning that the client will listen for events in the current thread). client.connect(timeout), on the other hand, spawns a separate thread.
The stock BinaryLogClient works out of the box with MariaDB, but there are two differences:
One, MariaDB's GTIDs are different. They're still strings but parse differently. Two, MariaDB can send ANNOTATE_ROWS events, which allow you to recover the SQL used to generate rows in row-based replication.
See https://mariadb.com/kb/en/annotate_rows_log_event/ and client.setUseSendAnnotateRowsEvent(true).
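To illustrate how the two GTID flavors differ in shape, here is a small sketch that tells them apart. GtidFlavor is a hypothetical helper, not the library's own parser. It handles a single GTID entry rather than a comma-separated set, and it assumes the usual textual forms: "uuid:interval[:interval...]" for MySQL and "domain-server-sequence" for MariaDB.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the library): classifies a single GTID string by its shape. */
final class GtidFlavor {
    // MySQL: "<source uuid>:<interval>[:<interval>...]", e.g. "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5"
    private static final Pattern MYSQL = Pattern.compile(
        "[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}(:\\d+(-\\d+)?)+");

    // MariaDB: "<domain id>-<server id>-<sequence number>", e.g. "0-1-100"
    private static final Pattern MARIADB = Pattern.compile("\\d+-\\d+-\\d+");

    /** Returns "mysql", "mariadb" or "unknown". */
    static String classify(String gtid) {
        if (MYSQL.matcher(gtid).matches()) {
            return "mysql";
        }
        if (MARIADB.matcher(gtid).matches()) {
            return "mariadb";
        }
        return "unknown";
    }

    /** Splits a MariaDB GTID into {domainId, serverId, sequence}. */
    static long[] parseMariaDb(String gtid) {
        if (!MARIADB.matcher(gtid).matches()) {
            throw new IllegalArgumentException("not a MariaDB GTID: " + gtid);
        }
        String[] parts = gtid.split("-");
        return new long[] {
            Long.parseUnsignedLong(parts[0]),
            Long.parseUnsignedLong(parts[1]),
            // The sequence number is an unsigned 64-bit value, so it is parsed as unsigned.
            Long.parseUnsignedLong(parts[2])
        };
    }
}
```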
You might need to control event deserialization for several reasons: you don't want to waste time deserializing events you won't need; there is no EventDataDeserializer defined for the event type you are interested in (or there is but it contains a bug); you want a certain type of event to be deserialized in a different way (perhaps *RowsEventData should contain the table name and not the id?); etc.
EventDeserializer eventDeserializer = new EventDeserializer();
// do not deserialize EXT_DELETE_ROWS event data, return it as a byte array
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
    new ByteArrayEventDataDeserializer());
// skip EXT_WRITE_ROWS event data altogether
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
    new NullEventDataDeserializer());
// use custom event data deserializer for EXT_DELETE_ROWS
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
    new EventDataDeserializer() {
        ...
    });
BinaryLogClient client = ...
client.setEventDeserializer(eventDeserializer);
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
BinaryLogClient binaryLogClient = ...
ObjectName objectName = new ObjectName("mysql.binlog:type=BinaryLogClient");
mBeanServer.registerMBean(binaryLogClient, objectName);
// following bean accumulates various BinaryLogClient stats
// (e.g. number of disconnects, skipped events)
BinaryLogClientStatistics stats = new BinaryLogClientStatistics(binaryLogClient);
ObjectName statsObjectName = new ObjectName("mysql.binlog:type=BinaryLogClientStatistics");
mBeanServer.registerMBean(stats, statsObjectName);
Introduced in 0.4.0.
TLSv1.1 & TLSv1.2 require JDK 7+. Prior to MySQL 5.7.10, MySQL supported only TLSv1 (see Secure Connection Protocols and Ciphers).
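Before debugging the server side, it can help to confirm which TLS protocol versions the JVM itself supports. The sketch below uses only the standard javax.net.ssl API. TlsSupport is a hypothetical helper name and is not part of the connector.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;

/** Hypothetical helper: lists the TLS/SSL protocol versions the running JVM supports. */
final class TlsSupport {
    static List<String> supportedProtocols() {
        try {
            // Supported protocols are a superset of the ones enabled by default.
            String[] protocols = SSLContext.getDefault().getSupportedSSLParameters().getProtocols();
            return Arrays.asList(protocols);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("default SSLContext is not available", e);
        }
    }
}
```

Printing TlsSupport.supportedProtocols() shows whether, for example, "TLSv1.2" is available on the JVM in use. A protocol being supported does not mean it is enabled by default, and the server still has to accept it.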
To check that the MySQL server is properly configured with SSL support, run
mysql -h host -u root -ptypeyourpasswordmaybe -e "show global variables like 'have_%ssl';"
("Value" should be "YES"). The state of the current session can be determined using \s ("SSL" should not be blank).
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword","truststore.password");
System.setProperty("javax.net.ssl.keyStore", "/path/to/keystore.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "keystore.password");
BinaryLogClient client = ...
client.setSSLMode(SSLMode.VERIFY_IDENTITY);
- data of numeric types (tinyint, etc.) is always returned signed(!), regardless of whether the column definition includes the "unsigned" keyword.
- data of var*/*text/*blob types is always returned as a byte array (for var* this is true starting from 1.0.0).
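If a column is known to be unsigned, the signed value can be reinterpreted on the consumer side. A minimal sketch follows. UnsignedValues is a hypothetical helper, and it assumes integer columns up to INT arrive as int and BIGINT as long; check this against the values you actually receive.

```java
import java.math.BigInteger;

/** Hypothetical helper: reinterprets signed values as the unsigned column values they encode. */
final class UnsignedValues {
    /** TINYINT UNSIGNED: keep the low 8 bits. */
    static int tinyint(int signed) {
        return signed & 0xFF;
    }

    /** SMALLINT UNSIGNED: keep the low 16 bits. */
    static int smallint(int signed) {
        return signed & 0xFFFF;
    }

    /** MEDIUMINT UNSIGNED: keep the low 24 bits. */
    static int mediumint(int signed) {
        return signed & 0xFFFFFF;
    }

    /** INT UNSIGNED: does not fit in an int, so widen to long. */
    static long integer(int signed) {
        return Integer.toUnsignedLong(signed);
    }

    /** BIGINT UNSIGNED: does not fit in a long, so use BigInteger. */
    static BigInteger bigint(long signed) {
        return new BigInteger(Long.toUnsignedString(signed));
    }
}
```

Whether a column is unsigned has to come from elsewhere (for example INFORMATION_SCHEMA.COLUMNS), since the row data alone does not say.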
Q. What does a typical transaction look like?
A. GTID event (if gtid_mode=ON) -> QUERY event with "BEGIN" as sql -> ... -> XID event | QUERY event with "COMMIT" or "ROLLBACK" as sql.
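The sequence above can be sketched as a tiny state machine that groups events into transactions. This is an illustration only: TransactionGrouper and its Kind enum are hypothetical stand-ins for the library's events. In a real listener, BEGIN/COMMIT/ROLLBACK would be recognized from the sql of QUERY events and XID from its own event type.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: counts row events per committed transaction in a simplified event stream. */
final class TransactionGrouper {
    /** Simplified stand-in for the event types and QUERY sql values involved. */
    enum Kind { GTID, BEGIN, ROWS, XID, COMMIT, ROLLBACK }

    /** Returns the number of ROWS events in each committed transaction, in stream order. */
    static List<Integer> committedRowEventCounts(List<Kind> events) {
        List<Integer> counts = new ArrayList<>();
        boolean inTransaction = false;
        int rows = 0;
        for (Kind kind : events) {
            switch (kind) {
                case BEGIN:
                    inTransaction = true;
                    rows = 0;
                    break;
                case ROWS:
                    if (inTransaction) {
                        rows++;
                    }
                    break;
                case XID:
                case COMMIT:
                    // Either event ends the transaction successfully.
                    if (inTransaction) {
                        counts.add(rows);
                    }
                    inTransaction = false;
                    break;
                case ROLLBACK:
                    // A rolled-back transaction is dropped.
                    inTransaction = false;
                    break;
                default:
                    // GTID precedes BEGIN and carries no rows.
                    break;
            }
        }
        return counts;
    }
}
```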
Q. EventData for inserted/updated/deleted rows has no information about table (except for some weird id). How do I make sense out of it?
A. Each WriteRowsEventData/UpdateRowsEventData/DeleteRowsEventData event is preceded by TableMapEventData, which contains the schema & table name. If for some reason you need to know column names (types, etc.), the easiest way is to
select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, COLUMN_DEFAULT, IS_NULLABLE,
DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, CHARACTER_OCTET_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE,
CHARACTER_SET_NAME, COLLATION_NAME from INFORMATION_SCHEMA.COLUMNS;
# see https://dev.mysql.com/doc/refman/5.6/en/columns-table.html for more information
(yes, binary log DOES NOT include that piece of information).
You can find JDBC snippet here.
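Since rows events only carry a table id, a listener typically remembers the most recent table map for each id. Below is a minimal sketch of such a cache. TableNameCache is a hypothetical helper, and it assumes the id, schema and table name are read from TableMapEventData (getTableId(), getDatabase(), getTable()) and that rows events expose the same id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: maps the numeric table id from rows events to "schema.table". */
final class TableNameCache {
    private final Map<Long, String> namesById = new HashMap<>();

    /** Call for every table map event; a later mapping for the same id overwrites the earlier one. */
    void onTableMap(long tableId, String database, String table) {
        namesById.put(tableId, database + "." + table);
    }

    /** Call for a rows event; empty if no table map has been seen for this id yet. */
    Optional<String> resolve(long tableId) {
        return Optional.ofNullable(namesById.get(tableId));
    }
}
```

Overwriting on every table map event is deliberate: an id is not a stable identifier for a table, so the latest mapping seen in the stream is the one to trust.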
There are two entry points: BinaryLogClient (which you can use to read binary logs from a MySQL server) and BinaryLogFileReader (for offline log processing). Both of them rely on EventDeserializer to deserialize the stream of events. Each Event consists of an EventHeader (containing, among other things, a reference to the EventType) and EventData. The aforementioned EventDeserializer has one EventHeaderDeserializer (EventHeaderV4Deserializer by default) and a collection of EventDataDeserializers. If there is no EventDataDeserializer registered for a particular type of Event, the default EventDataDeserializer kicks in (NullEventDataDeserializer).
For insight into the internals of MySQL, look here. The MySQL Client/Server Protocol and The Binary Log sections are particularly useful as reference documentation for the **.binlog.network and **.binlog.event packages.
Some of the OSS using / built on top of mysql-binlog-connector-java:
- apache/nifi An easy to use, powerful, and reliable system to process and distribute data.
- debezium A low latency data streaming platform for change data capture (CDC).
- zendesk/maxwell A MySQL-to-JSON Kafka producer.
- mavenlink/changestream A stream of changes for MySQL built on Akka.
- mardambey/mypipe MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka.
- ngocdaothanh/mydit MySQL to MongoDB data replicator.
- sharetribe/dumpr A Clojure library for live replicating data from a MySQL database.
- shyiko/rook Generic Change Data Capture (CDC) toolkit.
- streamsets/datacollector Continuous big data ingestion infrastructure.
- twingly/ecco MySQL replication binlog parser in JRuby.
- zzt93/syncer A tool to sync & manipulate data from MySQL/MongoDB to ES/Kafka/MySQL, which makes an 'Eventual Consistency' promise.
It's also used on a large scale in MailChimp. You can read about it here.
git clone https://github.com/shyiko/mysql-binlog-connector-java.git
cd mysql-binlog-connector-java
mvn # shows how to build, test, etc. project
Set up your settings.xml to have a "central" entry.
mvn deploy
In lieu of a formal styleguide, please take care to maintain the existing coding style.
Executing mvn checkstyle:check within the project directory should not produce any errors.
If you are willing to install vagrant (required by integration tests), it's highly recommended to check (with mvn clean verify) that there are no test failures before sending a pull request.
Additional tests for any new or changed functionality are also very welcome.