Analyzing MariaDB Connector/J 2.x

by just다해 2024. 9. 25.

With the aurora option in MariaDB JDBC, the driver handles the reader and writer endpoints itself, even if you do not configure them separately.

I was curious how it does this, so I dug into the source.

Registering only the writer endpoint gives access to every reader endpoint

While setting up the connection, MariaDB JDBC fetches every endpoint that currently exists and registers them.

It uses UrlParser to determine which HaMode is in effect and builds a Proxy accordingly.

Driver

  public Connection connect(final String url, final Properties props) throws SQLException {
    // urlParser
    UrlParser urlParser = UrlParser.parse(url, props);
    if (urlParser == null || urlParser.getHostAddresses() == null) {
      return null;
    } else {
      // the connection is established when the bean is registered
      return MariaDbConnection.newConnection(urlParser, null);
    }
  }

UrlParser

UrlParser.parse(url, prop) parses the configuration from the url registered in application-database.

url

jdbc:mariadb:aurora://{{endPoint}}:{{port}}/common?serverTimezone=UTC&enabledTLSProtocols=TLSv1.2

HaMode

public enum HaMode {
  AURORA,
  REPLICATION,
  SEQUENTIAL,
  LOADBALANCE,
  NONE
}

Here, the aurora token in the url causes HaMode.AURORA to be selected.
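To make the mapping from url to HaMode concrete, here is a minimal sketch. It is not the driver's UrlParser: the class HaModeSketch and the method parseHaMode are hypothetical names, and the sketch assumes the mode is simply the token between "jdbc:mariadb:" and "//".

```java
import java.util.Locale;

// Hypothetical sketch, not the driver's UrlParser.
class HaModeSketch {
  enum HaMode { AURORA, REPLICATION, SEQUENTIAL, LOADBALANCE, NONE }

  static HaMode parseHaMode(String url) {
    String prefix = "jdbc:mariadb:";
    if (!url.startsWith(prefix)) {
      throw new IllegalArgumentException("not a MariaDB JDBC url: " + url);
    }
    int slashes = url.indexOf("//");
    if (slashes < 0) {
      throw new IllegalArgumentException("missing '//' in url: " + url);
    }
    // The text between the prefix and "//" is either empty or "<mode>:".
    String token = url.substring(prefix.length(), slashes);
    if (token.isEmpty()) {
      return HaMode.NONE;
    }
    if (token.endsWith(":")) {
      token = token.substring(0, token.length() - 1);
    }
    // valueOf throws IllegalArgumentException for an unknown mode.
    return HaMode.valueOf(token.toUpperCase(Locale.ROOT));
  }
}
```

With this sketch, a url starting with jdbc:mariadb:aurora:// yields AURORA, and a plain jdbc:mariadb:// url yields NONE.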

MariaDbConnection

After the url is parsed, a new connection is created and the proxy is set up along with it.

public static MariaDbConnection newConnection(UrlParser urlParser, GlobalStateInfo globalInfo)
      throws SQLException {
    if (urlParser.getOptions().pool) {
      return Pools.retrievePool(urlParser).getConnection();
    }

    // Utils.retrieveProxy sets up the proxy
    Protocol protocol = Utils.retrieveProxy(urlParser, globalInfo);
    return new MariaDbConnection(protocol);
  }

Utils

In the proxy setup, the AURORA branch builds the proxy from these two classes:

  • FailoverProxy
  • AuroraListener

  public static Protocol retrieveProxy(final UrlParser urlParser, final GlobalStateInfo globalInfo)
      throws SQLException {
    final ReentrantLock lock = new ReentrantLock();
    final LruTraceCache traceCache =
        urlParser.getOptions().enablePacketDebug ? new LruTraceCache() : null;
    Protocol protocol;
    switch (urlParser.getHaMode()) {
      case AURORA:
        return getProxyLoggingIfNeeded(
            urlParser,
            (Protocol)
                Proxy.newProxyInstance(
                    AuroraProtocol.class.getClassLoader(),
                    new Class[] {Protocol.class},
                    new FailoverProxy(
                        new AuroraListener(urlParser, globalInfo), lock, traceCache)));

FailoverProxy

Constructor

  public FailoverProxy(Listener listener, ReentrantLock lock, LruTraceCache traceCache)
      throws SQLException {
    this.lock = lock;
    this.listener = listener;
    this.listener.setProxy(this); // listener(AuroraListener)
    this.traceCache = traceCache;
    // initialize the connection
    this.listener.initializeConnection();
  }

  • When FailoverProxy is created, it registers itself as the proxy on the listener (AuroraListener)
  • initializeConnection then initializes the connection

AuroraListener

public class AuroraListener extends MastersReplicasListener {

AuroraListener extends MastersReplicasListener, and the connection initialization lives in MastersReplicasListener.initializeConnection.

MastersReplicasListener

public class MastersReplicasListener extends AbstractMastersReplicasListener {
  // ...
  @Override
  public void initializeConnection() throws SQLException {
    // calls initializeConnection of AbstractMastersListener
    super.initializeConnection();
    try {
      reconnectFailedConnection(new SearchFilter(true));
    } catch (SQLException e) {
      // initializeConnection failed
      checkInitialConnection(e);
    }
  }

public abstract class AbstractMastersReplicasListener extends AbstractMastersListener {
  private static final ConcurrentMap<HostAddress, Long> blacklist = new ConcurrentHashMap<>();
  private static final ConnectionValidator connectionValidationLoop = new ConnectionValidator();
  // ...
  public void initializeConnection() throws SQLException {
    long connectionTimeoutMillis =
        TimeUnit.SECONDS.toMillis(urlParser.getOptions().validConnectionTimeout);
    lastQueryNanos = System.nanoTime();
    if (connectionTimeoutMillis > 0) {
      connectionValidationLoop.addListener(this, connectionTimeoutMillis);
    }
  }

This calls AbstractMastersListener's initializeConnection, which derives the validation timeout from validConnectionTimeout and, if it is positive, registers the listener with the ConnectionValidator.

AuroraListener

AuroraListener overrides reconnectFailedConnection. When isInitialConnection is true, it calls AuroraProtocol.loop(this, globalInfo, loopAddress, searchFilter); inside a do-while loop.

  @Override
  public void reconnectFailedConnection(SearchFilter initialSearchFilter) throws SQLException {
    // ...
    // when this is the initial connection
    if ((isMasterHostFail() || isSecondaryHostFail()) || searchFilter.isInitialConnection()) {
      // while permit to avoid case when succeeded creating a new Master connection
      // and ping master connection fail a few milliseconds after,
      // resulting a masterConnection not initialized.
      do {
        AuroraProtocol.loop(this, globalInfo, loopAddress, searchFilter);
        if (!searchFilter.isFailoverLoop()) {
          try {
            checkWaitingConnection();
          } catch (ReconnectDuringTransactionException e) {
            // don't throw an exception for this specific exception
          }
        }
      } while (searchFilter.isInitialConnection()
          && !(masterProtocol != null
              || (urlParser.getOptions().allowMasterDownConnection && secondaryProtocol != null)));
    }

    // When reconnecting, search if replicas list has change since first initialisation
    if (getCurrentProtocol() != null && !getCurrentProtocol().isClosed()) {
      retrieveAllEndpointsAndSet(getCurrentProtocol());
    }
  // ...

AuroraProtocol loop

Following the loop logic, you can see it call listener.retrieveAllEndpointsAndSet(protocol);.

// ...
        if (listener.isMasterHostFailReconnect() && protocol.isMasterConnection()) {
          // Look for secondary when only known endpoint is the cluster endpoint
          if (searchFilter.isFineIfFoundOnlyMaster()
              && listener.getUrlParser().getHostAddresses().size() <= 1
              && protocol.getHostAddress().equals(listener.getClusterHostAddress())) {

            // this is where all endpoints get registered
            listener.retrieveAllEndpointsAndSet(protocol);

            if (listener.getUrlParser().getHostAddresses().size() > 1) {
              // add newly discovered end-point to loop
              loopAddresses.addAll(listener.getUrlParser().getHostAddresses());
              // since there is more than one end point, reactivate connection to a read-only host
              searchFilter = new SearchFilter(false);
            }
          }

          if (foundMaster(listener, protocol, searchFilter)) {
            return;
          }

AuroraListener

retrieveAllEndpointsAndSet

getCurrentEndpointIdentifiers fetches every endpoint that currently exists, and setUrlParserFromEndpoints then registers them.

  public void retrieveAllEndpointsAndSet(Protocol protocol) throws SQLException {
    // For a given cluster, same port for all endpoints and same end host address
    if (clusterDnsSuffix != null) {
      List<String> endpoints = getCurrentEndpointIdentifiers(protocol);
      setUrlParserFromEndpoints(endpoints, protocol.getPort());
    }
  }

  private List<String> getCurrentEndpointIdentifiers(Protocol protocol) throws SQLException {
    List<String> endpoints = new ArrayList<>();
    try {
      proxy.lock.lock();
      try {
        // Deleted instance may remain in db for 24 hours so ignoring instances that have had no
        // change
        // for 3 minutes
        Results results = new Results();
        protocol.executeQuery(
            false,
            results,
            "select server_id, session_id from information_schema.replica_host_status "
                + "where last_update_timestamp > now() - INTERVAL 3 MINUTE");
        results.commandEnd();
        ResultSet resultSet = results.getResultSet();

        while (resultSet.next()) {
          endpoints.add(resultSet.getString(1) + "." + clusterDnsSuffix);
        }

        // randomize order for distributed load-balancing
        Collections.shuffle(endpoints);

      } finally {
        proxy.lock.unlock();
      }
    } catch (SQLException qe) {
      logger.warning("SQL exception occurred: " + qe.getMessage());
      if (protocol.getProxy().hasToHandleFailover(qe)) {
        if (masterProtocol == null || masterProtocol.equals(protocol)) {
          setMasterHostFail();
        } else if (secondaryProtocol.equals(protocol)) {
          setSecondaryHostFail();
        }
        addToBlacklist(protocol.getHostAddress());
        reconnectFailedConnection(new SearchFilter(isMasterHostFail(), isSecondaryHostFail()));
      }
    }

    return endpoints;
  }
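
The way instance host names are assembled from server_id and clusterDnsSuffix can be sketched as follows. This is an illustration only: AuroraEndpointSketch, its methods, and the regular expression are assumptions about the shape of an Aurora cluster endpoint (<name>.cluster-<id>.<region>.rds.amazonaws.com), not the pattern the driver actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; the regex is an assumed endpoint shape, not the driver's own pattern.
class AuroraEndpointSketch {
  private static final Pattern CLUSTER_ENDPOINT =
      Pattern.compile(
          "(.+)\\.cluster-(ro-)?([a-z0-9]+\\.[a-z0-9-]+\\.rds\\.amazonaws\\.com)",
          Pattern.CASE_INSENSITIVE);

  /** Returns the DNS suffix shared by all instances, or null if host is not a cluster endpoint. */
  static String clusterDnsSuffix(String host) {
    Matcher matcher = CLUSTER_ENDPOINT.matcher(host);
    return matcher.matches() ? matcher.group(3) : null;
  }

  /** Builds one host name per server_id, mirroring server_id + "." + clusterDnsSuffix. */
  static List<String> instanceEndpoints(List<String> serverIds, String dnsSuffix) {
    List<String> endpoints = new ArrayList<>();
    for (String serverId : serverIds) {
      endpoints.add(serverId + "." + dnsSuffix);
    }
    return endpoints;
  }
}
```

When the configured host is not a cluster endpoint, the sketch returns null for the suffix, which corresponds to the clusterDnsSuffix != null guard in retrieveAllEndpointsAndSet.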

How does @Transactional(readOnly = true) reach the reader/writer without any separate connection setup?

    @Transactional(readOnly = true)
    fun getMultiTenantTestWithReadTransaction(shopContext: ShopContext) =
        queryService.getBySpaceId(shopContext).map { it.toResponse<PocInfoResponse>() }

Let's look at how the connection is obtained when @Transactional(readOnly = true) is used.

The log below shows the objects (beans) that a function annotated with Spring's @Transactional passes through as it runs.

[  XNIO-1 task-1] o.s.o.j.JpaTransactionManager.getTransaction(370) : Creating new transaction with name [com.bmf.shop.adminapi.multitenant.MultiTenantPocService.getMultiTenantTestWithReadTransaction]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT,readOnly
[  XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doBegin(412) : Opened new EntityManager [SessionImpl(760331977<open>)] for JPA transaction
[  XNIO-1 task-1] c.b.s.c.t.MultiTenantConnectionProviderImpl.getConnection(39) : connection.catalog = shop_anna, connection.schema = null
[  XNIO-1 task-1] o.s.j.d.DataSourceUtils.prepareConnectionForTransaction(187) : Setting JDBC Connection [HikariProxyConnection@1615122059 wrapping org.mariadb.jdbc.MariaDbConnection@6a3f9e18] read-only
[  XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doBegin(440) : Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@4a38c260]
[  XNIO-1 task-1] o.h.SQL.logStatement(144) : select pocinfo0_.poc_info_id as poc_info1_41_, pocinfo0_.created_at as created_2_41_, pocinfo0_.space_id as space_id3_41_, pocinfo0_.updated_at as updated_4_41_, pocinfo0_.shop_id as shop_id5_41_ from poc_info pocinfo0_ where pocinfo0_.space_id=? order by pocinfo0_.poc_info_id desc limit ?
[  XNIO-1 task-1] o.s.o.j.JpaTransactionManager.processCommit(740) : Initiating transaction commit
[  XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doCommit(557) : Committing JPA transaction on EntityManager [SessionImpl(760331977<open>)]
[  XNIO-1 task-1] o.s.j.d.DataSourceUtils.resetConnectionAfterTransaction(251) : Resetting read-only flag of JDBC Connection [HikariProxyConnection@1615122059 wrapping org.mariadb.jdbc.MariaDbConnection@6a3f9e18]
[  XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doCleanupAfterCompletion(648) : Closing JPA EntityManager [SessionImpl(760331977<open>)] after transaction

In order, the calls are:

  1. JpaTransactionManager.getTransaction(370)
  2. JpaTransactionManager.doBegin(412)
  3. DataSourceUtils.prepareConnectionForTransaction(187)
  4. JpaTransactionManager.doBegin(440)
  5. JpaTransactionManager.processCommit(740)
  6. JpaTransactionManager.doCommit(557)
  7. DataSourceUtils.resetConnectionAfterTransaction(251)
  8. JpaTransactionManager.doCleanupAfterCompletion(648)

AbstractPlatformTransactionManager

    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        // JpaTransactionManager.doGetTransaction()
        Object transaction = doGetTransaction();

        // ...

        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        // ...
    }

getTransaction() calls doGetTransaction, which dispatches to the override in the concrete TransactionManager in use. Here that is JpaTransactionManager.doGetTransaction().

    @Override
    protected Object doGetTransaction() {
        JpaTransactionObject txObject = new JpaTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());

        EntityManagerHolder emHolder = (EntityManagerHolder)
                TransactionSynchronizationManager.getResource(obtainEntityManagerFactory());
        if (emHolder != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Found thread-bound EntityManager [" + emHolder.getEntityManager() +
                        "] for JPA transaction");
            }
            txObject.setEntityManagerHolder(emHolder, false);
        }

        if (getDataSource() != null) {
            ConnectionHolder conHolder = (ConnectionHolder)
                    TransactionSynchronizationManager.getResource(getDataSource());
            txObject.setConnectionHolder(conHolder);
        }

        return txObject;
    }
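
doGetTransaction relies on resources that are bound to the current thread. As a rough illustration of that idea (not Spring's TransactionSynchronizationManager itself), a thread-bound registry could look like the sketch below. The class ThreadBoundResourcesSketch and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-thread resource registry.
class ThreadBoundResourcesSketch {
  // Each thread gets its own map, so lookups never see another thread's resources.
  private static final ThreadLocal<Map<Object, Object>> RESOURCES =
      ThreadLocal.withInitial(HashMap::new);

  static void bindResource(Object key, Object value) {
    if (RESOURCES.get().putIfAbsent(key, value) != null) {
      throw new IllegalStateException("already bound for key " + key);
    }
  }

  /** Returns the resource bound to the current thread, or null if none is bound. */
  static Object getResource(Object key) {
    return RESOURCES.get().get(key);
  }

  static Object unbindResource(Object key) {
    return RESOURCES.get().remove(key);
  }
}
```

A null result is the normal case at the start of a new transaction, which is why the excerpt above passes conHolder to setConnectionHolder without checking it.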

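Once the transaction object has been built (including any ConnectionHolder already bound to the thread), getTransaction calls startTransaction and returns its result.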

return startTransaction(def, transaction, debugEnabled, suspendedResources);

startTransaction

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
            boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // JpaTransactionManager.doBegin()
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }

On the 8th line of the snippet, doBegin() dispatches to JpaTransactionManager.doBegin().
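
The dispatch from the abstract manager to the JPA-specific hooks is the template method pattern. A minimal sketch of that shape, with hypothetical class names and none of Spring's real logic, might be:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this only mirrors the shape of the Spring classes above.
class TemplateMethodSketch {

  abstract static class AbstractManager {
    // final: the workflow is fixed here, subclasses only supply the hooks
    final String getTransaction(boolean readOnly) {
      Object tx = doGetTransaction();
      doBegin(tx, readOnly);
      return tx + (readOnly ? ":readOnly" : ":readWrite");
    }

    protected abstract Object doGetTransaction();

    protected abstract void doBegin(Object tx, boolean readOnly);
  }

  static class JpaLikeManager extends AbstractManager {
    // Records the order in which the hooks were invoked.
    final List<String> calls = new ArrayList<>();

    @Override
    protected Object doGetTransaction() {
      calls.add("doGetTransaction");
      return "jpaTx";
    }

    @Override
    protected void doBegin(Object tx, boolean readOnly) {
      calls.add("doBegin(readOnly=" + readOnly + ")");
    }
  }
}
```

Calling getTransaction on a JpaLikeManager runs the subclass hooks in the order fixed by the base class, which is the same relationship AbstractPlatformTransactionManager has with JpaTransactionManager.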

JpaTransactionManager doBegin()

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        JpaTransactionObject txObject = (JpaTransactionObject) transaction;

        // ...
        try {
            // ...
            EntityManager em = txObject.getEntityManagerHolder().getEntityManager();

            // Delegate to JpaDialect for actual transaction begin.
            int timeoutToUse = determineTimeout(definition);
            Object transactionData = getJpaDialect().beginTransaction(em,
                    new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));
            txObject.setTransactionData(transactionData);
            txObject.setReadOnly(definition.isReadOnly());

        // ...

Inside doBegin(), the part that actually begins the transaction is:

Object transactionData = getJpaDialect().beginTransaction(em,
                    new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));

HibernateJpaDialect beginTransaction()

    @Override
    public Object beginTransaction(EntityManager entityManager, TransactionDefinition definition)
            throws PersistenceException, SQLException, TransactionException {

        SessionImplementor session = getSession(entityManager);

        // ...
        if (isolationLevelNeeded || definition.isReadOnly()) {
            if (this.prepareConnection && ConnectionReleaseMode.ON_CLOSE.equals(
                    session.getJdbcCoordinator().getLogicalConnection().getConnectionHandlingMode().getReleaseMode())) {
                // the connection is obtained here
                preparedCon = session.connection();
                // readOnly is applied to the actual connection here
                previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(preparedCon, definition);
            }
            else if (isolationLevelNeeded) {
                throw new InvalidIsolationLevelException(
                        "HibernateJpaDialect is not allowed to support custom isolation levels: " +
                        "make sure that its 'prepareConnection' flag is on (the default) and that the " +
                        "Hibernate connection release mode is set to ON_CLOSE.");
            }
        }

        // Standard JPA transaction begin call for full JPA context setup...
        entityManager.getTransaction().begin();

        // ...
    }

preparedCon = session.connection(); is where the connection is obtained.

At this point the connection is obtained according to what MultiTenantConnectionProviderImpl.getConnection() sets up.

@Component
class MultiTenantConnectionProviderImpl(
    private val dataSource: DataSource,
) : MultiTenantConnectionProvider, Logging {

    @Throws(SQLException::class)
    override fun getAnyConnection(): Connection {
        return dataSource.connection
    }

    @Throws(SQLException::class)
    override fun releaseAnyConnection(connection: Connection) {
        connection.close()
    }

    @Throws(SQLException::class)
    override fun getConnection(tenantIdentifier: String?): Connection {
        val connection = anyConnection
        try {
            val dbName =
                if (tenantIdentifier.isNullOrBlank()) throw CommonException(INVALID_TENANT_ID) else tenantIdentifier
            connection.catalog = dbName
            connection.schema = dbName

            logger.debug("connection.catalog = ${connection.catalog}, connection.schema = ${connection.schema}")
        } catch (e: Exception) {
            releaseAnyConnection(connection)
            throw e
        }
        return connection
    }

    // ...
}

After the connection is obtained, DataSourceUtils.prepareConnectionForTransaction(preparedCon, definition); is called.

DataSourceUtils.prepareConnectionForTransaction

    @Nullable
    public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
            throws SQLException {

        // Set read-only flag.
        if (definition != null && definition.isReadOnly()) {
            try {
                if (debugEnabled) {
                    logger.debug("Setting JDBC Connection [" + con + "] read-only");
                }
                con.setReadOnly(true);
            }
            catch (SQLException | RuntimeException ex) {
                Throwable exToCheck = ex;
                while (exToCheck != null) {
                    if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
                        // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
                        throw ex;
                    }
                    exToCheck = exToCheck.getCause();
                }
                // "read-only not supported" SQLException -> ignore, it's just a hint anyway
                logger.debug("Could not set JDBC Connection read-only", ex);
            }
        }
    // ...

On the 11th line of the snippet, setReadOnly is called on the connection.

Calling setReadOnly ends up in FailoverProxy.invoke() of MariaDB JDBC, the handler behind the proxy that was attached to the connection.

FailoverProxy

public class FailoverProxy implements InvocationHandler {
  // ...
  private static final String METHOD_SET_READ_ONLY = "setReadonly";

  // ...

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    switch (methodName) {
      case METHOD_SET_READ_ONLY:
        this.listener.switchReadOnlyConnection((Boolean) args[0]);
        return null;
  // ...

In FailoverProxy, the METHOD_SET_READ_ONLY case calls switchReadOnlyConnection() on the AuroraListener that was registered when the connection was first set up.
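
The interception itself is plain java.lang.reflect.Proxy. The sketch below shows the mechanism under simplified assumptions: the Protocol interface here has only two methods, and Listener, newProxy, and the host names are hypothetical stand-ins rather than the driver's classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical sketch of routing a method call by name through a dynamic proxy.
class ReadOnlyProxySketch {
  interface Protocol {
    void setReadonly(boolean readOnly);

    String currentHost();
  }

  static class Listener {
    private final String master;
    private final String replica;
    private boolean readOnly;

    Listener(String master, String replica) {
      this.master = master;
      this.replica = replica;
    }

    void switchReadOnlyConnection(boolean mustBeReadOnly) {
      this.readOnly = mustBeReadOnly;
    }

    String currentHost() {
      return readOnly ? replica : master;
    }
  }

  static Protocol newProxy(Listener listener) {
    InvocationHandler handler =
        (proxy, method, args) -> {
          switch (method.getName()) {
            case "setReadonly":
              // Intercepted: the listener decides which connection to use.
              listener.switchReadOnlyConnection((Boolean) args[0]);
              return null;
            case "currentHost":
              return listener.currentHost();
            default:
              // Sketch only: Object methods such as toString are not handled.
              throw new UnsupportedOperationException(method.getName());
          }
        };
    return (Protocol)
        Proxy.newProxyInstance(
            Protocol.class.getClassLoader(), new Class<?>[] {Protocol.class}, handler);
  }
}
```

Every call on the returned Protocol goes through the handler first, so a single setReadonly(true) is enough to redirect later calls to the replica.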

MastersReplicasListener switchReadOnlyConnection()

  @Override
  public void switchReadOnlyConnection(Boolean mustBeReadOnly) throws SQLException {
    checkWaitingConnection();

    // ...
        if (currentReadOnlyAsked) {
          if (currentProtocol == null) {
            // switching to secondary connection
            currentProtocol = this.secondaryProtocol;
          } else if (currentProtocol.isMasterConnection()) {
            // must change to replica connection
            if (!isSecondaryHostFail()) {
              try {
                // switching to secondary connection
                syncConnection(this.masterProtocol, this.secondaryProtocol);
                currentProtocol = this.secondaryProtocol;
                // current connection is now secondary
                return;
              } catch (SQLException e) {
                // switching to secondary connection failed
                if (setSecondaryHostFail()) {
                  addToBlacklist(secondaryProtocol.getHostAddress());
                }
              }
            }
            // stay on master connection, since replica connection is fail
            FailoverLoop.addListener(this);
          }

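When read-only is requested and currentProtocol is null or connected to the master, it is switched to secondaryProtocol. If the replica is marked as failed, the connection stays on the master.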
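
The read-only branch shown above boils down to a small decision. Here is a simplified sketch of it, with hypothetical names (ProtocolSwitchSketch, Target). The write branch, which sends everything back to the master, is an assumption, since that part of the method is elided in the excerpt.

```java
// Hypothetical sketch of the master/secondary decision; not the driver's implementation.
class ProtocolSwitchSketch {
  enum Target { MASTER, SECONDARY }

  private Target current; // null until a connection has been chosen
  private final boolean secondaryHostFail;

  ProtocolSwitchSketch(boolean secondaryHostFail) {
    this.secondaryHostFail = secondaryHostFail;
  }

  Target switchReadOnly(boolean mustBeReadOnly) {
    if (mustBeReadOnly) {
      if (current == null) {
        // No current connection yet: go straight to the secondary.
        current = Target.SECONDARY;
      } else if (current == Target.MASTER && !secondaryHostFail) {
        // On the master with a healthy replica: switch to the secondary.
        current = Target.SECONDARY;
      }
      // Otherwise stay where we are (already on the secondary, or the replica is failed).
    } else {
      // Assumption: non-read-only work always goes to the master.
      current = Target.MASTER;
    }
    return current;
  }
}
```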
