mariadb jdbc에서 aurora option을 사용하면, reader, writer endpoint를 따로 설정하지 않더라도 jdbc에서 처리해 주고 있습니다.
어떻게 처리하고 있는지에 대해 궁금증이 생겨서 분석을 해보았습니다.
writer end-point 만 등록해도 reader end-point에 모두 접근 가능
maria jdbc에서 connection 설정을 하면서, 현재 존재하는 end-point를 모두 가져와 등록합니다.
이때 UrlParser로 현재 HaMode인지 판단하여, Proxy를 구현합니다.
Driver
public Connection connect(final String url, final Properties props) throws SQLException {
// urlParser
UrlParser urlParser = UrlParser.parse(url, props);
if (urlParser == null || urlParser.getHostAddresses() == null) {
return null;
} else {
// bean 등록 시점에 connection 연결
return MariaDbConnection.newConnection(urlParser, null);
}
}
UrlParser
UrlParser.parse(url, prop)에서 application-database에 등록한 url 기준으로 설정 정보를 파싱합니다.
url
jdbc:mariadb:aurora://{{endPoint}}:{{port}}/common?serverTimezone=UTC&enabledTLSProtocols=TLSv1.2
HaMode
public enum HaMode {
AURORA,
REPLICATION,
SEQUENTIAL,
LOADBALANCE,
NONE
}
이때, url에 입력된 aurora로 HaMode.AURORA가 설정 됩니다.
MariaDbConnection
url 정보를 파싱한 후에, 새로운 커넥션을 등록하면서, proxy를 설정합니다.
public static MariaDbConnection newConnection(UrlParser urlParser, GlobalStateInfo globalInfo)
throws SQLException {
if (urlParser.getOptions().pool) {
return Pools.retrievePool(urlParser).getConnection();
}
// Utils.retrieveProxy 설정 proxy 설정
Protocol protocol = Utils.retrieveProxy(urlParser, globalInfo);
return new MariaDbConnection(protocol);
}
Utils
Proxy 설정 하는 부분을 보면 AURORA인 경우에는
- FailoverProxy
- AuroraListener
로 생성하고 있는 부분을 볼 수 있습니다.
public static Protocol retrieveProxy(final UrlParser urlParser, final GlobalStateInfo globalInfo)
throws SQLException {
final ReentrantLock lock = new ReentrantLock();
final LruTraceCache traceCache =
urlParser.getOptions().enablePacketDebug ? new LruTraceCache() : null;
Protocol protocol;
switch (urlParser.getHaMode()) {
case AURORA:
return getProxyLoggingIfNeeded(
urlParser,
(Protocol)
Proxy.newProxyInstance(
AuroraProtocol.class.getClassLoader(),
new Class[] {Protocol.class},
new FailoverProxy(
new AuroraListener(urlParser, globalInfo), lock, traceCache)));
FailoverProxy
생성자
public FailoverProxy(Listener listener, ReentrantLock lock, LruTraceCache traceCache)
throws SQLException {
this.lock = lock;
this.listener = listener;
this.listener.setProxy(this); // listener(AuroraListener)
this.traceCache = traceCache;
// connection 초기화
this.listener.initializeConnection();
}
- failoverProxy 생성 시 listener(AuroraListener)에 proxy 설정
- initializeConnection 커넥션 초기화
AuroraListener
public class AuroraListener extends MastersReplicasListener {
AuroraListener는 MastersReplicasListener를 상속받고 있으며, MastersReplicasListener 내 initializeConnection 커넥션 초기화 하는 부분이 있습니다.
MastersReplicasListener
public class MastersReplicasListener extends AbstractMastersReplicasListener {
// ...
@Override
public void initializeConnection() throws SQLException {
// AbstractMastersListener 의 initializeConnection 호출
super.initializeConnection();
try {
reconnectFailedConnection(new SearchFilter(true));
} catch (SQLException e) {
// initializeConnection failed
checkInitialConnection(e);
}
}
public abstract class AbstractMastersReplicasListener extends AbstractMastersListener {
private static final ConcurrentMap<HostAddress, Long> blacklist = new ConcurrentHashMap<>();
private static final ConnectionValidator connectionValidationLoop = new ConnectionValidator();
// ...
public void initializeConnection() throws SQLException {
long connectionTimeoutMillis =
TimeUnit.SECONDS.toMillis(urlParser.getOptions().validConnectionTimeout);
lastQueryNanos = System.nanoTime();
if (connectionTimeoutMillis > 0) {
connectionValidationLoop.addListener(this, connectionTimeoutMillis);
}
}
AbstractMastersListener의 initializeConnection 호출하여 connectionTimeout 설정 및 ConnectionValidator에 Listener 설정을 해주고 있습니다.
AuroraListener
AuroraListener에서 override한 reconnectFailedConnection 를 살펴보면, isInitialConnection 인 경우 do while 문에서 AuroraProtocol.loop(this, globalInfo, loopAddress, searchFilter);
를 호출하고 있습니다.
@Override
public void reconnectFailedConnection(SearchFilter initialSearchFilter) throws SQLException {
// ...
// isInitialConnection 인경우
if ((isMasterHostFail() || isSecondaryHostFail()) || searchFilter.isInitialConnection()) {
// while permit to avoid case when succeeded creating a new Master connection
// and ping master connection fail a few milliseconds after,
// resulting a masterConnection not initialized.
do {
AuroraProtocol.loop(this, globalInfo, loopAddress, searchFilter);
if (!searchFilter.isFailoverLoop()) {
try {
checkWaitingConnection();
} catch (ReconnectDuringTransactionException e) {
// don't throw an exception for this specific exception
}
}
} while (searchFilter.isInitialConnection()
&& !(masterProtocol != null
|| (urlParser.getOptions().allowMasterDownConnection && secondaryProtocol != null)));
}
// When reconnecting, search if replicas list has change since first initialisation
if (getCurrentProtocol() != null && !getCurrentProtocol().isClosed()) {
retrieveAllEndpointsAndSet(getCurrentProtocol());
}
// ...
AuroraProtocol loop
loop 로직 내에서 따라가다 보면 listener.retrieveAllEndpointsAndSet(protocol);를 호출하는 것을 볼 수 있습니다.
// ...
if (listener.isMasterHostFailReconnect() && protocol.isMasterConnection()) {
// Look for secondary when only known endpoint is the cluster endpoint
if (searchFilter.isFineIfFoundOnlyMaster()
&& listener.getUrlParser().getHostAddresses().size() <= 1
&& protocol.getHostAddress().equals(listener.getClusterHostAddress())) {
// 모든 endpoint 설정해주는 부분
listener.retrieveAllEndpointsAndSet(protocol);
if (listener.getUrlParser().getHostAddresses().size() > 1) {
// add newly discovered end-point to loop
loopAddresses.addAll(listener.getUrlParser().getHostAddresses());
// since there is more than one end point, reactivate connection to a read-only host
searchFilter = new SearchFilter(false);
}
}
if (foundMaster(listener, protocol, searchFilter)) {
return;
}
AuroraListener
retrieveAllEndpointsAndSet
getCurrentEndpointIdentifiers
로 현재 존재하는 모든 endpoints를 가져온 후 설정합니다.
public void retrieveAllEndpointsAndSet(Protocol protocol) throws SQLException {
// For a given cluster, same port for all endpoints and same end host address
if (clusterDnsSuffix != null) {
List<String> endpoints = getCurrentEndpointIdentifiers(protocol);
setUrlParserFromEndpoints(endpoints, protocol.getPort());
}
}
private List<String> getCurrentEndpointIdentifiers(Protocol protocol) throws SQLException {
List<String> endpoints = new ArrayList<>();
try {
proxy.lock.lock();
try {
// Deleted instance may remain in db for 24 hours so ignoring instances that have had no
// change
// for 3 minutes
Results results = new Results();
protocol.executeQuery(
false,
results,
"select server_id, session_id from information_schema.replica_host_status "
+ "where last_update_timestamp > now() - INTERVAL 3 MINUTE");
results.commandEnd();
ResultSet resultSet = results.getResultSet();
while (resultSet.next()) {
endpoints.add(resultSet.getString(1) + "." + clusterDnsSuffix);
}
// randomize order for distributed load-balancing
Collections.shuffle(endpoints);
} finally {
proxy.lock.unlock();
}
} catch (SQLException qe) {
logger.warning("SQL exception occurred: " + qe.getMessage());
if (protocol.getProxy().hasToHandleFailover(qe)) {
if (masterProtocol == null || masterProtocol.equals(protocol)) {
setMasterHostFail();
} else if (secondaryProtocol.equals(protocol)) {
setSecondaryHostFail();
}
addToBlacklist(protocol.getHostAddress());
reconnectFailedConnection(new SearchFilter(isMasterHostFail(), isSecondaryHostFail()));
}
}
return endpoints;
}
별도 connection 설정 없이 Transaction(readOnly = true) 로 어떻게 reader/writer 를 접근이 가능할까?
@Transactional(readOnly = true)
fun getMultiTenantTestWithReadTransaction(shopContext: ShopContext) =
queryService.getBySpaceId(shopContext).map { it.toResponse<PocInfoResponse>() }
@Transactional(readOnly = true)
인 경우 어떻게 connection 이 연결되는지 분석해보겠습니다.
위 이미지는 Spring Data의 @Transactional
이 붙은 함수가 실행되면서 거치는 객체(Bean)들입니다.
[ XNIO-1 task-1] o.s.o.j.JpaTransactionManager.getTransaction(370) : Creating new transaction with name [com.bmf.shop.adminapi.multitenant.MultiTenantPocService.getMultiTenantTestWithReadTransaction]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT,readOnly
[ XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doBegin(412) : Opened new EntityManager [SessionImpl(760331977<open>)] for JPA transaction
[ XNIO-1 task-1] c.b.s.c.t.MultiTenantConnectionProviderImpl.getConnection(39) : connection.catalog = shop_anna, connection.schema = null
[ XNIO-1 task-1] o.s.j.d.DataSourceUtils.prepareConnectionForTransaction(187) : Setting JDBC Connection [HikariProxyConnection@1615122059 wrapping org.mariadb.jdbc.MariaDbConnection@6a3f9e18] read-only
[ XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doBegin(440) : Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@4a38c260]
[ XNIO-1 task-1] o.h.SQL.logStatement(144) : select pocinfo0_.poc_info_id as poc_info1_41_, pocinfo0_.created_at as created_2_41_, pocinfo0_.space_id as space_id3_41_, pocinfo0_.updated_at as updated_4_41_, pocinfo0_.shop_id as shop_id5_41_ from poc_info pocinfo0_ where pocinfo0_.space_id=? order by pocinfo0_.poc_info_id desc limit ?
[ XNIO-1 task-1] o.s.o.j.JpaTransactionManager.processCommit(740) : Initiating transaction commit
[ XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doCommit(557) : Committing JPA transaction on EntityManager [SessionImpl(760331977<open>)]
[ XNIO-1 task-1] o.s.j.d.DataSourceUtils.resetConnectionAfterTransaction(251) : Resetting read-only flag of JDBC Connection [HikariProxyConnection@1615122059 wrapping org.mariadb.jdbc.MariaDbConnection@6a3f9e18]
[ XNIO-1 task-1] o.s.o.j.JpaTransactionManager.doCleanupAfterCompletion(648) : Closing JPA EntityManager [SessionImpl(760331977<open>)] after transaction
- JpaTransactionManager.getTransaction(370)
- JpaTransactionManager.doBegin(412)
- DataSourceUtils.prepareConnectionForTransaction(187)
- JpaTransactionManager.doBegin(440)
- JpaTransactionManager.processCommit(740)
- JpaTransactionManager.doCommit(557)
- DataSourceUtils.resetConnectionAfterTransaction(251)
- JpaTransactionManager.doCleanupAfterCompletion(648)
AbstractPlatformTransactionManager
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// JpaTransactionManager.doGetTransaction()
Object transaction = doGetTransaction();
// ...
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
// ...
}
getTransaction()
에서 doGetTransaction
이 호출되는데 이때 실제 DataSource가 되는 TransactionManager의 override된 doGetTransaction
이 호출됩니다. 즉, JpaTransactionManager의 doGetTransaction()
이 호출됩니다.
@Override
protected Object doGetTransaction() {
JpaTransactionObject txObject = new JpaTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
EntityManagerHolder emHolder = (EntityManagerHolder)
TransactionSynchronizationManager.getResource(obtainEntityManagerFactory());
if (emHolder != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found thread-bound EntityManager [" + emHolder.getEntityManager() +
"] for JPA transaction");
}
txObject.setEntityManagerHolder(emHolder, false);
}
if (getDataSource() != null) {
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.getResource(getDataSource());
txObject.setConnectionHolder(conHolder);
}
return txObject;
}
connection 을 가져온 후에 startTransaction 을 호출하여 반환하고 있습니다.
return startTransaction(def, transaction, debugEnabled, suspendedResources);
startTransaction
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// JpaTransactionManager.doBegin()
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
8번째 라인에서 JpaTransactionManager 의 doBegin()
이 호출됩니다.
JpaTransactionManager doBegin()
protected void doBegin(Object transaction, TransactionDefinition definition) {
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
// ...
try {
// ...
EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
// Delegate to JpaDialect for actual transaction begin.
int timeoutToUse = determineTimeout(definition);
Object transactionData = getJpaDialect().beginTransaction(em,
new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));
txObject.setTransactionData(transactionData);
txObject.setReadOnly(definition.isReadOnly());
// ...
doBegin()
내에서 실제로 transaction 시작하는 부분을 보면
Object transactionData = getJpaDialect().beginTransaction(em,
new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));
@Override
public Object beginTransaction(EntityManager entityManager, TransactionDefinition definition)
throws PersistenceException, SQLException, TransactionException {
SessionImplementor session = getSession(entityManager);
// ...
if (isolationLevelNeeded || definition.isReadOnly()) {
if (this.prepareConnection && ConnectionReleaseMode.ON_CLOSE.equals(
session.getJdbcCoordinator().getLogicalConnection().getConnectionHandlingMode().getReleaseMode())) {
// 여기서 connection 가져오고 있음.
preparedCon = session.connection();
// 이 부분에서 실제 커넥션에 readOnly 설정
previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(preparedCon, definition);
}
else if (isolationLevelNeeded) {
throw new InvalidIsolationLevelException(
"HibernateJpaDialect is not allowed to support custom isolation levels: " +
"make sure that its 'prepareConnection' flag is on (the default) and that the " +
"Hibernate connection release mode is set to ON_CLOSE.");
}
}
// Standard JPA transaction begin call for full JPA context setup...
entityManager.getTransaction().begin();
// ...
}
preparedCon = session.connection();
에서 커넥션을 가져오는 것을 확인할 수 있습니다.
이때, MultiTenantConnectionProviderImpl의 getConnection()
에서 설정한 커넥션 정보 기준으로 커넥션을 가져옵니다.
@Component
class MultiTenantConnectionProviderImpl(
private val dataSource: DataSource,
) : MultiTenantConnectionProvider, Logging {
@Throws(SQLException::class)
override fun getAnyConnection(): Connection {
return dataSource.connection
}
@Throws(SQLException::class)
override fun releaseAnyConnection(connection: Connection) {
connection.close()
}
@Throws(SQLException::class)
override fun getConnection(tenantIdentifier: String?): Connection {
val connection = anyConnection
try {
val dbName =
if (tenantIdentifier.isNullOrBlank()) throw CommonException(INVALID_TENANT_ID) else tenantIdentifier
connection.catalog = dbName
connection.schema = dbName
logger.debug("connection.catalog = ${connection.catalog}, connection.schema = ${connection.schema}")
} catch (e: Exception) {
releaseAnyConnection(connection)
throw e
}
return connection
}
커넥션을 가져온 후 DataSourceUtils.prepareConnectionForTransaction(preparedCon, definition);
을 호출하고 있습니다.
DataSourceUtils.prepareConnectionForTransaction
@Nullable
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
throws SQLException {
// Set read-only flag.
if (definition != null && definition.isReadOnly()) {
try {
if (debugEnabled) {
logger.debug("Setting JDBC Connection [" + con + "] read-only");
}
con.setReadOnly(true);
}
catch (SQLException | RuntimeException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
throw ex;
}
exToCheck = exToCheck.getCause();
}
// "read-only not supported" SQLException -> ignore, it's just a hint anyway
logger.debug("Could not set JDBC Connection read-only", ex);
}
}
// ...
11번째 라인에서 커넥션의 setReadOnly를 호출하는 것을 확인할 수 있습니다.
setReadOnly를 호출하게 되면, 커넥션에 설정된 mariadb jdbc의 FailoverProxy.invoke()
가 호출되게 됩니다.
FailoverProxy
public class FailoverProxy implements InvocationHandler {
// ...
private static final String METHOD_SET_READ_ONLY = "setReadonly";
// ...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
switch (methodName) {
case METHOD_SET_READ_ONLY:
this.listener.switchReadOnlyConnection((Boolean) args[0]);
return null;
// ...
FailoverProxy에서 setReadOnly METHOD_SET_READ_ONLY 에서 최초 커넥션 설정 시 등록한 AuroraListener의 switchReadOnlyConnection()
을 호출하는 것을 알 수 있습니다.
MasterReplicasListener switchReadOnlyConnection()
@Override
public void switchReadOnlyConnection(Boolean mustBeReadOnly) throws SQLException {
checkWaitingConnection();
// ...
if (currentReadOnlyAsked) {
if (currentProtocol == null) {
// switching to secondary connection
currentProtocol = this.secondaryProtocol;
} else if (currentProtocol.isMasterConnection()) {
// must change to replica connection
if (!isSecondaryHostFail()) {
try {
// switching to secondary connection
syncConnection(this.masterProtocol, this.secondaryProtocol);
currentProtocol = this.secondaryProtocol;
// current connection is now secondary
return;
} catch (SQLException e) {
// switching to secondary connection failed
if (setSecondaryHostFail()) {
addToBlacklist(secondaryProtocol.getHostAddress());
}
}
}
// stay on master connection, since replica connection is fail
FailoverLoop.addListener(this);
}
connectionProtocol이 null 이거나 master 와 연결되어 있으면, secondaryProtocol 을 설정해 주는 것을 확인할 수 있습니다.
'개발' 카테고리의 다른 글
Singleton Pattern (4) | 2024.09.23 |
---|---|
DDD - Aggregate (2) | 2024.09.22 |
DIP 의존 역전 원칙 (0) | 2024.09.21 |
DDD - 도메인 모델링 (0) | 2024.09.20 |
DDD - 도메인이란? (1) | 2024.09.19 |