Skip to content

Commit

Permalink
fix: support log file based positioning replication (to #155)
Browse files Browse the repository at this point in the history
  • Loading branch information
VWagen1989 committed Nov 14, 2024
1 parent 3e9e0e0 commit 8369977
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 68 deletions.
5 changes: 2 additions & 3 deletions binlogreplication/binlog_position_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
)

const binlogPositionDirectory = ".replica"
const mysqlFlavor = "MySQL56"
const defaultChannelName = ""

// binlogPositionStore manages loading and saving data to the binlog position metadata table. This provides
Expand All @@ -47,7 +46,7 @@ type binlogPositionStore struct {
// Currently only the default binlog channel ("") is supported.
// If no position is stored, this method returns a zero mysql.Position and a nil error.
// If any errors are encountered, a nil mysql.Position and an error are returned.
func (store *binlogPositionStore) Load(ctx *sql.Context, engine *gms.Engine) (pos replication.Position, err error) {
func (store *binlogPositionStore) Load(flavor string, ctx *sql.Context, engine *gms.Engine) (pos replication.Position, err error) {
store.mu.Lock()
defer store.mu.Unlock()

Expand All @@ -62,7 +61,7 @@ func (store *binlogPositionStore) Load(ctx *sql.Context, engine *gms.Engine) (po
// Strip off the "MySQL56/" prefix
positionString = strings.TrimPrefix(positionString, "MySQL56/")

return replication.ParsePosition(mysqlFlavor, positionString)
return replication.ParsePosition(flavor, positionString)
}

// Save persists the specified |position| to disk.
Expand Down
112 changes: 96 additions & 16 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -53,6 +54,9 @@ const (
ERFatalReplicaError = 13117
)

// Match any strings starting with "OFF" (case insensitive)
var gtidModeOffRegex = regexp.MustCompile(`(?i)^OFF$`)

// binlogReplicaApplier represents the process that applies updates from a binlog connection.
//
// This type is NOT used concurrently – there is currently only one single applier process running to process binlog
Expand Down Expand Up @@ -116,6 +120,29 @@ func (a *binlogReplicaApplier) IsRunning() bool {
return a.running.Load()
}

// check the GTID_MODE on "conn", return false if it's 'OFF', otherwise return true.
func checkGtidModeEnabled(conn *mysql.Conn) (bool, error) {
qr, err := conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true)
if err != nil {
return false, fmt.Errorf("error checking GTID_MODE: %v", err)
}
if len(qr.Rows) == 0 {
return false, fmt.Errorf("no rows returned when checking GTID_MODE")
}
gtidMode := string(qr.Rows[0][0].Raw())
return !gtidModeOffRegex.MatchString(gtidMode), nil
}

// This function will connect to the MySQL server and check the GTID_MODE.
func connAndCheckGtidModeEnabled(ctx *sql.Context, params mysql.ConnParams) (bool, error) {
conn, err := mysql.Connect(ctx, &params)
if err != nil {
return false, err
}
defer conn.Close()
return checkGtidModeEnabled(conn)
}

// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
// and retrying if errors are encountered.
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) {
Expand All @@ -130,6 +157,8 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

var conn *mysql.Conn
var err error
gtidModeEnabled := false
flavorName := ""
for connectionAttempts := uint64(0); ; connectionAttempts++ {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)

Expand Down Expand Up @@ -157,6 +186,18 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
ConnectTimeoutMs: 4_000,
}

gtidModeEnabled, err = connAndCheckGtidModeEnabled(ctx, connParams)
if err != nil {
return nil, err
}

if !gtidModeEnabled {
flavorName = replication.FilePosFlavorID
} else {
flavorName = replication.Mysql56FlavorID
}
connParams.Flavor = flavorName

conn, err = mysql.Connect(ctx, &connParams)
if err != nil {
logrus.Warnf("failed connection attempt to source (%s): %s",
Expand Down Expand Up @@ -184,7 +225,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

// Request binlog events to start
// TODO: This should also have retry logic
err = a.startReplicationEventStream(ctx, conn)
err = a.startReplicationEventStream(ctx, conn, gtidModeEnabled, flavorName)
if err != nil {
return nil, err
}
Expand All @@ -196,17 +237,10 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
return conn, nil
}

// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin
// sending binlog events.
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn) error {
serverId, err := loadReplicaServerId()
func (a *binlogReplicaApplier) initializedGtidPosition(ctx *sql.Context, positionStore *binlogPositionStore, flavorName string) (replication.Position, error) {
position, err := positionStore.Load(flavorName, ctx, a.engine)
if err != nil {
return err
}

position, err := positionStore.Load(ctx, a.engine)
if err != nil {
return err
return replication.Position{}, err
}

if position.IsZero() {
Expand All @@ -227,9 +261,9 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
gtidPurged = gtidPurged[1:]
}

purged, err := replication.ParsePosition(mysqlFlavor, gtidPurged)
purged, err := replication.ParsePosition(flavorName, gtidPurged)
if err != nil {
return err
return replication.Position{}, err
}
position = purged
}
Expand All @@ -248,11 +282,57 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
position = replication.Position{GTIDSet: gtid.GTIDSet()}
}

return position, nil
}

// another method like "initializedGtidPosition" to get the current log file based position
func (a *binlogReplicaApplier) initializedLogFilePosition(ctx *sql.Context, positionStore *binlogPositionStore, flavorName string) (replication.Position, error) {
position, err := positionStore.Load(flavorName, ctx, a.engine)
if err != nil {
return replication.Position{}, err
}

if position.IsZero() {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)
if err != nil {
return replication.Position{}, err
}
filePosGtid := replication.FilePosGTID{
File: replicaSourceInfo.SourceLogFile,
Pos: uint32(replicaSourceInfo.SourceLogPos),
}
position = replication.Position{GTIDSet: filePosGtid}
}

return position, nil
}

// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin
// sending binlog events.
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidModeEnabled bool, flavorName string) error {
serverId, err := loadReplicaServerId()
if err != nil {
return err
}

var position replication.Position
if gtidModeEnabled {
position, err = a.initializedGtidPosition(ctx, positionStore, flavorName)
if err != nil {
return err
}
if err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": position.GTIDSet.String()}); err != nil {
ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error())
}
} else {
position, err = a.initializedLogFilePosition(ctx, positionStore, flavorName)
if err != nil {
return err
}
}

a.currentPosition = position
a.pendingPosition = position
if err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": a.currentPosition.GTIDSet.String()}); err != nil {
ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error())
}

// Clear out the format description in case we're reconnecting, so that we don't use the old format description
// to interpret any event messages before we receive the new format description from the new stream.
Expand Down
14 changes: 14 additions & 0 deletions binlogreplication/binlog_replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context
return err
}
replicaSourceInfo.ConnectRetryCount = uint64(intValue)
case "SOURCE_LOG_FILE":
value, err := getOptionValueAsString(option)
if err != nil {
return err
}
replicaSourceInfo.SourceLogFile = value
case "SOURCE_LOG_POS":
intValue, err := getOptionValueAsInt(option)
if err != nil {
return err
}
replicaSourceInfo.SourceLogPos = uint64(intValue)
case "SOURCE_AUTO_POSITION":
intValue, err := getOptionValueAsInt(option)
if err != nil {
Expand Down Expand Up @@ -352,6 +364,8 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr
copy.SourceUser = replicaSourceInfo.User
copy.SourceHost = replicaSourceInfo.Host
copy.SourcePort = uint(replicaSourceInfo.Port)
copy.SourceLogFile = replicaSourceInfo.SourceLogFile
copy.SourceLogPos = replicaSourceInfo.SourceLogPos
copy.SourceServerUuid = replicaSourceInfo.Uuid
copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval
copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount
Expand Down
31 changes: 11 additions & 20 deletions devtools/replica-setup/checker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ check_server_params() {
# Check for each parameter and validate their values
binlog_format=$(echo "$result" | grep -i "binlog_format" | awk '{print $2}')
enforce_gtid_consistency=$(echo "$result" | grep -i "enforce_gtid_consistency" | awk '{print $2}')
gtid_mode=$(echo "$result" | grep -i "gtid_mode" | awk '{print $2}')
gtid_strict_mode=$(echo "$result" | grep -i "gtid_strict_mode" | awk '{print $2}')
gtid_mode=$(echo "$result" | grep -i "gtid_mode" | awk '{print $2}' | tr '[:lower:]' '[:upper:]')
gtid_strict_mode=$(echo "$result" | grep -i "gtid_strict_mode" | awk '{print $2}' | tr '[:lower:]' '[:upper:]')
log_bin=$(echo "$result" | grep -i "log_bin" | awk '{print $2}')

# Validate binlog_format
Expand All @@ -39,24 +39,15 @@ check_server_params() {
fi

# MariaDB use gtid_strict_mode instead of gtid_mode
if [[ -z "$gtid_strict_mode" ]]; then
# Validate enforce_gtid_consistency (for MySQL)
if [[ "$enforce_gtid_consistency" != "ON" ]]; then
echo "Error: enforce_gtid_consistency is not set to 'ON', it is set to '$enforce_gtid_consistency'."
return 1
fi

# Validate gtid_mode (for MySQL)
if [[ "$gtid_mode" != "ON" ]]; then
echo "Error: gtid_mode is not set to 'ON', it is set to '$gtid_mode'."
return 1
fi
else
# Validate gtid_strict_mode (for MariaDB)
if [[ "$gtid_strict_mode" != "ON" ]]; then
echo "Error: gtid_strict_mode is not set to 'ON', it is set to '$gtid_strict_mode'."
return 1
fi
if [[ "$gtid_strict_mode" == "OFF" || "${gtid_mode}" =~ ^OFF ]]; then
LOG_POS_MODE="ON"
echo "LOG_POS_MODE is set to $LOG_POS_MODE"
fi

# If gtid_strict_mode is empty, check gtid_mode. If it's not OFF, then enforce_gtid_consistency must be ON
if [[ -z "$gtid_strict_mode" && $LOG_POS_MODE == "OFF" && "$enforce_gtid_consistency" != "ON" ]]; then
echo "Error: gtid_mode is not set to 'OFF', it is set to '$gtid_mode'. enforce_gtid_consistency must be 'ON'."
return 1
fi

# Validate log_bin
Expand Down
1 change: 1 addition & 0 deletions devtools/replica-setup/replica_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ MYDUCK_USER=${MYDUCK_USER:-root}
MYDUCK_PASSWORD=${MYDUCK_PASSWORD:-}
MYDUCK_SERVER_ID=${MYDUCK_SERVER_ID:-2}
MYDUCK_IN_DOCKER=${MYDUCK_IN_DOCKER:-false}
LOG_POS_MODE="OFF"

while [[ $# -gt 0 ]]; do
case $1 in
Expand Down
39 changes: 30 additions & 9 deletions devtools/replica-setup/snapshot.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,39 @@ echo "Thread count set to: $THREAD_COUNT"

echo "Copying data from MySQL to MyDuck..."
# Run mysqlsh command and capture the output
output=$(mysqlsh -h${MYSQL_HOST} -P${MYSQL_PORT} -u${MYSQL_USER} -p${MYSQL_PASSWORD} -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" --users false --consistent false --ignore-existing-objects true --handle-grant-errors ignore --threads $THREAD_COUNT --bytesPerChunk 256M --ignore-version true)
output=$(mysqlsh --host=${MYSQL_HOST} --port=${MYSQL_PORT} --user=${MYSQL_USER} --password=${MYSQL_PASSWORD} -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" --users false --consistent false --ignore-existing-objects true --handle-grant-errors ignore --threads $THREAD_COUNT --bytesPerChunk 256M --ignore-version true)

# Extract the EXECUTED_GTID_SET using grep and awk
EXECUTED_GTID_SET=$(echo "$output" | grep -i "EXECUTED_GTID_SET" | awk '{print $2}')
if [[ $LOG_POS_MODE == "OFF" ]]; then
# Extract the EXECUTED_GTID_SET from this output:
# Executed_GTID_set: 369107a6-a0a5-11ef-a255-0242ac110008:1-10
EXECUTED_GTID_SET=$(echo "$output" | grep -i "EXECUTED_GTID_SET" | awk '{print $2}')

# Check if EXECUTED_GTID_SET is empty
if [ -z "$EXECUTED_GTID_SET" ]; then
echo "EXECUTED_GTID_SET is empty, exiting."
exit 1
# Check if EXECUTED_GTID_SET is empty
if [ -z "$EXECUTED_GTID_SET" ]; then
echo "EXECUTED_GTID_SET is empty, exiting."
exit 1
fi

# If not empty, print the extracted GTID set
echo "EXECUTED_GTID_SET: $EXECUTED_GTID_SET"
else
# Extract the BINLOG_FILE and BINLOG_POS from this output:
# Binlog_file: binlog.000002
# Binlog_position: 3763
# Executed_GTID_set: ''
BINLOG_FILE=$(echo "$output" | grep -i "Binlog_file" | awk '{print $2}')
BINLOG_POS=$(echo "$output" | grep -i "Binlog_position" | awk '{print $2}')

# Check if BINLOG_FILE and BINLOG_POS are empty
if [ -z "$BINLOG_FILE" ] || [ -z "$BINLOG_POS" ]; then
echo "BINLOG_FILE or BINLOG_POS is empty, exiting."
exit 1
fi

# If not empty, print the extracted BINLOG_FILE and BINLOG_POS
echo "BINLOG_FILE: $BINLOG_FILE"
echo "BINLOG_POS: $BINLOG_POS"
fi

# If not empty, print the extracted GTID set
echo "EXECUTED_GTID_SET: $EXECUTED_GTID_SET"

echo "Snapshot completed successfully."
18 changes: 16 additions & 2 deletions devtools/replica-setup/start_replication.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ else
fi

# Use the EXECUTED_GTID_SET variable from the previous steps
if [ ! -z "$EXECUTED_GTID_SET" ]; then
if [ $LOG_POS_MODE == "OFF" ] && [ ! -z "$EXECUTED_GTID_SET" ]; then
mysqlsh --sql --host=${MYDUCK_HOST} --port=${MYDUCK_PORT} --user=root --no-password <<EOF
SET GLOBAL gtid_purged = "${EXECUTED_GTID_SET}";
EOF
fi

# Connect to MySQL and execute the replication configuration commands
mysqlsh --sql --host=${MYDUCK_HOST} --port=${MYDUCK_PORT} --user=root --no-password <<EOF
if [ $LOG_POS_MODE == "OFF" ]; then
mysqlsh --sql --host=${MYDUCK_HOST} --port=${MYDUCK_PORT} --user=root --no-password <<EOF
CHANGE REPLICATION SOURCE TO
SOURCE_HOST='${MYSQL_HOST_FOR_REPLICA}',
SOURCE_PORT=${MYSQL_PORT},
Expand All @@ -34,6 +35,19 @@ CHANGE REPLICATION SOURCE TO
;
START REPLICA;
EOF
else
mysqlsh --sql --host=${MYDUCK_HOST} --port=${MYDUCK_PORT} --user=root --no-password <<EOF
CHANGE REPLICATION SOURCE TO
SOURCE_HOST='${MYSQL_HOST_FOR_REPLICA}',
SOURCE_PORT=${MYSQL_PORT},
SOURCE_USER='${MYSQL_USER}',
SOURCE_PASSWORD='${MYSQL_PASSWORD}',
SOURCE_LOG_FILE='${BINLOG_FILE}',
SOURCE_LOG_POS=${BINLOG_POS}
;
START REPLICA;
EOF
fi

# Check if the commands were successful
if [ $? -ne 0 ]; then
Expand Down
Loading

0 comments on commit 8369977

Please sign in to comment.