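1 Horizontal Database Sharding
Recently I have been working on an IM system. The old system did not anticipate that the user base would grow so large, so the database has become a severe performance bottleneck. We urgently need to shard the database to reduce the number of users per database, spread the load, and ultimately scale the database horizontally.
Horizontal sharding uses the user id as the sharding key: all of a user's data lives in the same database, and every database has the same table structure. To make database access transparent to developers, the sharding framework needs to solve two problems:
1. Inserting, querying, and updating data when the method parameters include a user id
2. Querying data when the method parameters do not include a user id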
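2 User ID
Define the table that holds usernames and passwords as the user table; the user id is the unique integer identifier in that table. If users have only one kind of username, the id can be the hash of the username, and the user table is itself sharded. If users can log in in several ways, for example by email or by mobile phone number, the user id should be an auto-increment column in the user table, and the user table should not be sharded; it can then be split out into a separate database, called the authentication database. Our project falls into the latter case.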
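3 Solution
3.1 Overview
A simple service is a DAO. Each domain has one simple service, and simple services may not depend on each other. A complex service may depend on several simple services but may not access the database directly; all of its database operations must go through simple services.
Hibernate is the database access layer, combined with Spring AOP method interception. The simple service proxy implements the same interface as the simple service. Each simple service has two instances: one references a sessionFactory that obtains database connections dynamically, and the other references the Hibernate Shards sessionFactory.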
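3.2 Method parameters include a user id
Spring AOP intercepts every method of the simple service proxy. If the first parameter of the method is a userid, the userid is stored in the current thread and the simple service instance that references the dynamic-connection sessionFactory is selected. When a database connection is obtained, the connection is chosen according to the userid held by the current thread. The flow is as follows: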
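The per-thread userid storage described above can be sketched with a ThreadLocal. This is a minimal illustration only: the post references a UseridContextHolder class but does not show its source, so the class name UseridHolderSketch and its body are assumptions.

```java
// Hypothetical sketch: the real UseridContextHolder is not shown in this post.
final class UseridHolderSketch {
    // One slot per thread, so concurrent requests never see each other's userid.
    private static final ThreadLocal<Long> USERID = new ThreadLocal<>();

    private UseridHolderSketch() {
    }

    static void setUserid(Long userid) {
        USERID.set(userid);
    }

    static Long getUserid() {
        return USERID.get();
    }

    // Must be called in a finally block so pooled threads do not leak a stale userid.
    static void removeUserid() {
        USERID.remove();
    }
}
```

The interceptor would set the value before calling the service and clear it in a finally block, so the routing data source can read it when a connection is requested.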
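3.3 Method parameters do not include a user id
Spring AOP intercepts every method of the simple service proxy. If the first parameter of the method is not a userid, the simple service instance that references the Hibernate Shards sessionFactory is selected; it traverses all databases and returns the aggregated data. Only reads are allowed in this case, not writes. The flow is as follows: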
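4 Implementation
4.1 Simple service proxy
For each simple service, a proxy object is generated with a JDK dynamic proxy; complex services depend on the proxy object.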
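As a rough illustration of the proxy idea, the sketch below builds a JDK dynamic proxy that forwards each call to one of two instances depending on whether the first argument is a Long. All names here (UserLookupSketch, TaggedUserLookup, RoutingProxySketch) are made up for the example; the project itself wires the proxy through Spring, and its interceptor also accepts UseridAble arguments, which this sketch leaves out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical service interface used only for this illustration.
interface UserLookupSketch {
    String findByUserid(Long userid);

    String findByCity(String city);
}

// Trivial implementation that tags each result with the instance it came from.
final class TaggedUserLookup implements UserLookupSketch {
    private final String tag;

    TaggedUserLookup(String tag) {
        this.tag = tag;
    }

    public String findByUserid(Long userid) {
        return tag + ":user:" + userid;
    }

    public String findByCity(String city) {
        return tag + ":city:" + city;
    }
}

final class RoutingProxySketch {
    private RoutingProxySketch() {
    }

    // Returns a proxy that picks the target instance per call.
    static <T> T create(Class<T> iface, T dynamicInstance, T shardsInstance) {
        InvocationHandler handler = (p, method, args) -> {
            // A Long first argument is treated as the userid (assumption of this sketch).
            boolean hasUserid = args != null && args.length > 0 && args[0] instanceof Long;
            Object target = hasUserid ? dynamicInstance : shardsInstance;
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                // Rethrow the service's own exception instead of the reflection wrapper.
                throw e.getCause();
            }
        };
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
        return iface.cast(proxy);
    }
}
```

Callers only ever see the interface, so a complex service does not need to know which of the two underlying instances handled the call.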
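4.2 Instantiation
Annotating a simple service class with @DetachDbService produces three instances (implemented by the framework):
1. The simple service proxy instance
2. The simple service instance that references the dynamic-connection sessionFactory
3. The simple service instance that references the Hibernate Shards sessionFactory
4.3 Method parameters
To fetch data from one specific database, the first parameter must be of type Long or UseridAble, from which the userid is obtained.
4.4 Mapping userid to database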
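Option | Advantages | Disadvantages
Split by id range | Partial migration is possible | Uneven data distribution
Modulo | Even data distribution | Migrated data volume is 1/(n+1); load cannot be allocated by server performance
Store the database mapping in the authentication database | Flexible; partial migration is possible | The mapping must be fetched from the database or a cache before each query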
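Overall, modulo is the best option. However, servers may differ in performance and we want to make full use of their resources, so weights are added on top of the modulo. For example, with two database servers weighted 1:2, the user id is taken modulo 3: a remainder of 0 maps to the first server, and remainders of 1 and 2 map to the second.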
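The weighted modulo rule can be sketched as a lookup table from remainder to database index, built the same way the weight maps are filled in the key code of section 5. The class name WeightedModuloSketch and the zero-based database indexes are assumptions made for this example; the project's ISelectDBService implementation is not shown in the post.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of weighted modulo routing; not the project's ISelectDBService.
final class WeightedModuloSketch {
    // Maps each remainder (slot) to a zero-based database index.
    private final Map<Integer, Integer> slotToDb = new HashMap<>();
    private final int totalWeight;

    WeightedModuloSketch(int... weights) {
        int total = 0;
        for (int db = 0; db < weights.length; db++) {
            // A database with weight w owns w consecutive slots.
            for (int j = 0; j < weights[db]; j++) {
                slotToDb.put(total + j, db);
            }
            total += weights[db];
        }
        if (total <= 0) {
            throw new IllegalArgumentException("total weight must be positive");
        }
        this.totalWeight = total;
    }

    int selectDb(long userid) {
        // floorMod keeps the slot non-negative even for a negative id.
        int slot = (int) Math.floorMod(userid, (long) totalWeight);
        return slotToDb.get(slot);
    }
}
```

With weights 1:2, `new WeightedModuloSketch(1, 2)` sends remainder 0 to database 0 and remainders 1 and 2 to database 1, matching the example above.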
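4.5 Exact Pagination
Hibernate Shards cannot query just one database or a subset of them, and its pagination first pulls all matching rows from every database into memory and only then pages them, so its pagination is unusable here.
First use Hibernate Shards to query each database for the number of matching rows and a database marker (the smallest user id in the queried table). Sort the results by marker, which guarantees that the same query visits the databases in the same order on every page and therefore pages exactly. From this result, compute the segment to fetch from each database, then use the dynamic database connection to visit the databases in the sorted order; skip any database whose segment is 0 and return as soon as the page is full.
For example, with 3 databases and a query for users located in Shenzhen, Hibernate Shards returns the following: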
Database | Total Shenzhen users | Smallest Shenzhen user id
DB1 | 7 | 2
DB2 | 5 | 1
DB3 | 30 | 3
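Sorted by smallest user id, the order is DB2, DB1, DB3.
Assume 10 records per page:
Page 1 takes 5 records from DB2 and the first 5 from DB1; DB3 is not queried.
Page 2 takes the last 2 records from DB1 and the first 8 from DB3; DB2 is not queried.
Page 3 takes records 9 through 18 from DB3; DB1 and DB2 are not queried.
… …
Drawback: results cannot be sorted exactly.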
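The segment calculation can be sketched as follows. This is an illustration under stated assumptions, not the project's code: ShardPageSketch, ShardCount, and Segment are hypothetical names, offsets are zero-based, and the per-database counts are assumed to have been collected already.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the per-database segment calculation.
final class ShardPageSketch {
    static final class ShardCount {
        final String db;
        final long count;
        final long minUserid;

        ShardCount(String db, long count, long minUserid) {
            this.db = db;
            this.count = count;
            this.minUserid = minUserid;
        }
    }

    static final class Segment {
        final String db;
        final long offset; // zero-based offset inside this database
        final long limit;

        Segment(String db, long offset, long limit) {
            this.db = db;
            this.offset = offset;
            this.limit = limit;
        }
    }

    private ShardPageSketch() {
    }

    // page is 1-based; databases that contribute nothing are left out of the plan.
    static List<Segment> plan(List<ShardCount> counts, int page, int pageSize) {
        List<ShardCount> ordered = new ArrayList<>(counts);
        // Stable visiting order: smallest matching user id first.
        ordered.sort(Comparator.comparingLong((ShardCount c) -> c.minUserid));
        long skip = (long) (page - 1) * pageSize;
        long need = pageSize;
        List<Segment> plan = new ArrayList<>();
        for (ShardCount c : ordered) {
            if (need == 0) {
                break;
            }
            if (skip >= c.count) {
                skip -= c.count; // this database lies entirely before the page
                continue;
            }
            long take = Math.min(c.count - skip, need);
            plan.add(new Segment(c.db, skip, take));
            need -= take;
            skip = 0;
        }
        return plan;
    }
}
```

For the Shenzhen example, page 2 yields DB1 with offset 5 and limit 2, then DB3 with offset 0 and limit 8; each segment maps directly onto an offset/limit query against that one database.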
5 Key Code
package com.konceptusa.infinet.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.beans.factory.annotation.Autowire;

/**
 * Annotation that marks a simple service class for instantiation by the framework.
 * @author Jwin
 *
 */
@Retention(RetentionPolicy.RUNTIME)
@Target( { ElementType.TYPE })
@Documented
public @interface DetachDbService
{
    boolean lazy() default false;
    Autowire autoWire() default Autowire.BY_NAME;
    String init() default "";
    String destroy() default "";
}
package com.konceptusa.infinet.annotation.handler;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;
import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;
import com.konceptusa.infinet.annotation.DetachDbService;

/**
 * Registers three beans with Spring: the simple service proxy instance, the simple service
 * instance that references the dynamic database connection, and the simple service instance
 * that references Hibernate Shards.
 * @author Jwin
 *
 */
public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler<DetachDbService>
{
    private final static String SESSIONFACTORYNAME = "sessionFactory";
    public final static String DYNAMIC_POSTFIX = "Dynamic";
    public final static String SHARDS_POSTFIX = "Shards";
    private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";
    private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);

    public Class annotation()
    {
        return DetachDbService.class;
    }

    @Override
    protected void handle(DetachDbService s, Class target)
    {
        String name = target.getSimpleName();
        if (!name.endsWith("ServiceImpl"))
        {
            throw new IllegalConfigException(target.getName()
                    + " is not a service bean. A service bean's class name must end with 'ServiceImpl'");
        }
        name = getBeanName(name);
        String dynamicName = name + DYNAMIC_POSTFIX;
        String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;
        // Create the simple service instance that obtains database connections dynamically
        createBean(s, target, dynamicName, dynamicSessionFactory);
        String shardsName = name + SHARDS_POSTFIX;
        String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;
        // Create the simple service instance that queries all databases
        createBean(s, target, shardsName, shardsFactory);
        // Create the simple service proxy
        RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);
        MutablePropertyValues mpv = new MutablePropertyValues();
        mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName));
        List<String> interceptorNamesList = new ArrayList<String>();
        interceptorNamesList.add(DETACHDBINTERCEPTOR);
        mpv.addPropertyValue("interceptorNames", interceptorNamesList);
        definition.setPropertyValues(mpv);
        registerBeanDefinition(name, definition);
    }

    private void createBean(DetachDbService s, Class target, String name, String sessionFactory)
    {
        RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);
        MutablePropertyValues mpv = new MutablePropertyValues();
        mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));
        beanDefinition.setPropertyValues(mpv);
        registerBeanDefinition(name, beanDefinition);
    }

    private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)
    {
        RootBeanDefinition definition = new RootBeanDefinition();
        definition.setAbstract(false);
        definition.setBeanClass(target);
        definition.setSingleton(true);
        definition.setLazyInit(s.lazy());
        definition.setAutowireCandidate(true);
        definition.setAutowireMode(s.autoWire().value());
        if (!"".equals(s.init()))
        {
            definition.setInitMethodName(s.init().trim());
        }
        if (!"".equals(s.destroy()))
        {
            definition.setDestroyMethodName(s.destroy().trim());
        }
        if (LOG.isDebugEnabled())
        {
            LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");
        }
        SpringAnnotationUtils.readProperties(target, definition);
        return definition;
    }

    // Turns "XxxServiceImpl" into the bean name "xxxService"
    private String getBeanName(String name)
    {
        name = name.substring(0, name.length() - "Impl".length());
        name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
        return name;
    }
}
package com.konceptusa.infinet.detach.aop;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.MethodInvoker;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.core.support.ObjectFactory;
import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
import com.konceptusa.infinet.detach.UseridAble;
import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;
import com.konceptusa.infinet.detach.datasource.UseridContextHolder;

/**
 * Interceptor behind the sharded simple service proxy.
 * @author Jwin
 *
 */
public class DetachDBInterceptor implements MethodInterceptor
{
    private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);

    public Object invoke(MethodInvocation invoke) throws Throwable
    {
        int len = invoke.getArguments().length;
        Long id = null;
        // The userid, if any, is taken from the first argument only
        if (len >= 1)
        {
            Object arg = invoke.getArguments()[0];
            if (arg instanceof UseridAble)
            {
                UseridAble useridAble = (UseridAble) arg;
                id = useridAble.getUserid();
            }
            else if (arg instanceof Long)
            {
                id = (Long) arg;
            }
        }
        if (id != null)
        {
            // Bind the userid to the current thread for the duration of the call
            UseridContextHolder.setUserid(id);
            try
            {
                return invoke(invoke, id);
            }
            finally
            {
                UseridContextHolder.removeUserid();
            }
        }
        else
        {
            return invoke(invoke, id);
        }
    }

    private Object invoke(MethodInvocation invoke, Long id) throws Throwable
    {
        // Derive the simple class name from the default toString() form "package.ClassName@hash"
        String str = invoke.getThis().toString();
        int start = str.lastIndexOf(".");
        int end = str.lastIndexOf("@");
        String className = str.substring(start + 1, end);
        String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;
        // With neither a userid nor an explicit data source id, fall back to the Hibernate Shards instance
        if (id == null && DataSourceIdContextHolder.getDataSourceId() == null)
        {
            postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;
        }
        String serviceName = className.substring(0, 1).toLowerCase()
                + className.substring(1, className.length() - "Impl".length()) + postFix;
        if (LOG.isDebugEnabled())
            LOG.debug("select service " + serviceName + " for userid = " + id);
        Object service = ObjectFactory.getManagedObject(serviceName);
        if (service == null)
        {
            throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");
        }
        // Call the same method, with the same arguments, on the selected instance
        MethodInvoker invoker = new MethodInvoker();
        invoker.setArguments(invoke.getArguments());
        invoker.setTargetObject(service);
        invoker.setTargetMethod(invoke.getMethod().getName());
        invoker.prepare();
        return invoker.invoke();
    }
}
package com.konceptusa.infinet.detach.datasource;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.Assert;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.konceptusa.infinet.detach.service.ISelectDBService;

/**
 * Base class for obtaining database connections dynamically.
 * @author Jwin
 *
 */
public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource
{
    private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);
    public final static int defaultDataSourceId = -1;
    protected MultiHibernateProperties multiHibernateProperties;
    protected ISelectDBService selectDBService;
    private String newWeights;
    private String oldWeights;
    private Map<Integer, DataSource> dataSourceMap = new HashMap<Integer, DataSource>();

    public void setSelectDBService(ISelectDBService selectDBService)
    {
        this.selectDBService = selectDBService;
    }

    public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)
    {
        this.multiHibernateProperties = multiHibernateProperties;
    }

    // Routes to a data source based on the userid bound to the current thread
    @Override
    protected Object determineCurrentLookupKey()
    {
        Long id = UseridContextHolder.getUserid();
        return selectDBService.selectDb(id);
    }

    @Override
    public void afterPropertiesSet()
    {
        LOG.info("init dynamic datasource start");
        Assert.notNull(multiHibernateProperties);
        Assert.notNull(selectDBService);
        List<Properties> properties = multiHibernateProperties.getShardProperties();
        Assert.notEmpty(properties);
        int dataSourceCount = 0;
        for (Properties p : properties)
        {
            dataSourceCount++;
            createDataSource(dataSourceMap, p);
        }
        createDefaultDataSource(dataSourceMap);
        selectDBService.setDefaultDataSourceId(defaultDataSourceId);
        selectDBService.setDataSourceCount(dataSourceCount);
        setTargetDataSources(dataSourceMap);
        setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));
        initWeight(dataSourceCount);
        super.afterPropertiesSet();
        LOG.info("init dynamic datasource success");
    }

    public void initWeight(int dataSourceCount)
    {
        Map<Integer, Integer> oldWeightMap = new HashMap<Integer, Integer>();
        Map<Integer, Integer> newWeightMap = new HashMap<Integer, Integer>();
        int totalNewWeight = fillWeightMap("newWeights", newWeights, dataSourceCount, newWeightMap);
        int totalOldWeight = fillWeightMap("oldWeights", oldWeights, dataSourceCount, oldWeightMap);
        if (LOG.isInfoEnabled())
            LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);
        selectDBService.setTotalNewWeight(totalNewWeight);
        selectDBService.setNewWeightIdMap(newWeightMap);
        selectDBService.setTotalOldWeight(totalOldWeight);
        selectDBService.setOldWeightIdMap(oldWeightMap);
    }

    // Fills weightMap (remainder -> data source index) from a ";"-separated weight string and
    // returns the total weight. Without a weight string, every data source gets weight 1.
    private int fillWeightMap(String name, String weightConfig, int dataSourceCount, Map<Integer, Integer> weightMap)
    {
        int totalWeight = 0;
        if (weightConfig != null)
        {
            if (LOG.isInfoEnabled())
                LOG.info(name + " " + weightConfig);
            String[] weights = StringUtils.split(weightConfig, ";");
            if (weights.length > dataSourceCount)
            {
                throw new IllegalConfigException(name + "'s length [" + weights.length
                        + "] can't be more than dataSourceCount[" + dataSourceCount + "]");
            }
            for (int i = 0; i < weights.length; i++)
            {
                int w = Integer.parseInt(weights[i]);
                for (int j = 0; j < w; j++)
                {
                    weightMap.put(totalWeight + j, i);
                }
                totalWeight += w;
            }
        }
        else
        {
            totalWeight = dataSourceCount;
            for (int i = 0; i < dataSourceCount; i++)
            {
                weightMap.put(i, i);
            }
        }
        return totalWeight;
    }

    protected abstract void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p);

    protected abstract void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap);

    public void setNewWeights(String newWeights)
    {
        this.newWeights = newWeights;
    }

    public void setOldWeights(String oldWeights)
    {
        this.oldWeights = oldWeights;
    }

    public Map<Integer, DataSource> getDataSourceMap()
    {
        return dataSourceMap;
    }
}
package com.konceptusa.infinet.detach.datasource;

import java.beans.PropertyVetoException;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.mchange.v2.c3p0.ComboPooledDataSource;

/**
 * Dynamic connection provider backed by the c3p0 connection pool.
 * @author Jwin
 *
 */
public class DynamicC3p0DataSource extends AbstractDynamicDataSource
{
    private final static Log LOG = LogFactory.getLog(DynamicC3p0DataSource.class);
    private int initialSize = 1;
    private int maxActive = 1;
    private int minActive = 1;
    private int maxIdleTime = 30;
    private String automaticTestTable = "Test";
    private int acquireIncrement = 3;
    private int maxStatements = 100;
    private int maxStatementsPerConnection = 3;
    private int numHelperThreads = 3;
    private int idleConnectionTestPeriod = 30;

    // The default data source is an in-memory HSQLDB registered under defaultDataSourceId
    @Override
    protected void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap)
    {
        ComboPooledDataSource dataSource = new ComboPooledDataSource();
        dataSource.setUser("sa");
        dataSource.setPassword("");
        dataSource.setJdbcUrl("jdbc:hsqldb:mem:" + getClass().getSimpleName().toLowerCase());
        setDriverClass(dataSource, "org.hsqldb.jdbcDriver");
        applyPoolSettings(dataSource);
        dataSourceMap.put(defaultDataSourceId, dataSource);
    }

    // Creates one pooled data source per shard and registers it under its shard id
    @Override
    protected void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p)
    {
        ComboPooledDataSource dataSource = new ComboPooledDataSource();
        dataSource.setJdbcUrl(p.getProperty(MultiHibernateProperties.connectionUrlKey));
        LOG.info("init datasource url " + dataSource.getJdbcUrl());
        dataSource.setUser(p.getProperty(MultiHibernateProperties.connectionUsernameKey));
        dataSource.setPassword(p.getProperty(MultiHibernateProperties.connectionPasswordKey));
        setDriverClass(dataSource, p.getProperty(MultiHibernateProperties.connectionDriverClassKey));
        applyPoolSettings(dataSource);
        String id = p.getProperty(MultiHibernateProperties.shardIdKey);
        dataSourceMap.put(Integer.parseInt(id), dataSource);
    }

    private void setDriverClass(ComboPooledDataSource dataSource, String driverClass)
    {
        try
        {
            dataSource.setDriverClass(driverClass);
        }
        catch (PropertyVetoException e)
        {
            throw new IllegalConfigException(e);
        }
    }

    // Pool settings shared by the default data source and every shard data source
    private void applyPoolSettings(ComboPooledDataSource dataSource)
    {
        dataSource.setInitialPoolSize(initialSize);
        dataSource.setMaxPoolSize(maxActive);
        dataSource.setMinPoolSize(minActive);
        dataSource.setMaxIdleTime(maxIdleTime);
        dataSource.setAcquireIncrement(acquireIncrement);
        dataSource.setNumHelperThreads(numHelperThreads);
        dataSource.setAutomaticTestTable(automaticTestTable);
        dataSource.setMaxStatements(maxStatements);
        dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
        dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
    }

    public void setInitialSize(int initialSize)
    {
        this.initialSize = initialSize;
    }

    public void setMaxActive(int maxActive)
    {
        this.maxActive = maxActive;
    }

    public void setMaxIdleTime(int maxIdle)
    {
        this.maxIdleTime = maxIdle;
    }

    public void setAcquireIncrement(int acquireIncrement)
    {
        this.acquireIncrement = acquireIncrement;
    }

    public void setMaxStatements(int maxStatements)
    {
        this.maxStatements = maxStatements;
    }

    public void setMaxStatementsPerConnection(int maxStatementsPerConnection)
    {
        this.maxStatementsPerConnection = maxStatementsPerConnection;
    }

    public void setNumHelperThreads(int numHelperThreads)
    {
        this.numHelperThreads = numHelperThreads;
    }

    public void setAutomaticTestTable(String automaticTestTable)
    {
        this.automaticTestTable = automaticTestTable;
    }

    public void setMinActive(int minActive)
    {
        this.minActive = minActive;
    }

    public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod)
    {
        this.idleConnectionTestPeriod = idleConnectionTestPeriod;
    }
}
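Before the paging base class, it may help to look at the window arithmetic from section 4.5 in isolation. The sketch below is not part of the framework: `ShardPageWindow`, `Slice` and `plan` are hypothetical names introduced only for illustration. It assumes the per-database row counts have already been fetched and sorted by their identifier (the minimum user id), and it only computes which rows to read from which database. Databases that contribute nothing to the page are skipped.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the original framework. */
final class ShardPageWindow {

    /** One query against one database: skip `first` rows, then read at most `max` rows. */
    static final class Slice {
        final int shardIndex;
        final int first;
        final int max;

        Slice(int shardIndex, int first, int max) {
            this.shardIndex = shardIndex;
            this.first = first;
            this.max = max;
        }

        @Override
        public String toString() {
            return "shard " + shardIndex + ": first=" + first + ", max=" + max;
        }
    }

    /**
     * @param counts   matching rows per database, already in the agreed (sorted) order
     * @param start    global index of the first row of the page
     * @param pageSize number of rows in the page
     */
    static List<Slice> plan(long[] counts, long start, int pageSize) {
        List<Slice> slices = new ArrayList<>();
        long end = start + pageSize; // global end index, exclusive
        long before = 0;             // rows held by the databases already visited
        for (int i = 0; i < counts.length && before < end; i++) {
            long after = before + counts[i];
            if (after > start) {
                // Clip the global window [start, end) to this database's range [before, after).
                long first = Math.max(start, before) - before;
                long last = Math.min(end, after) - before;
                if (last > first) {
                    slices.add(new Slice(i, (int) first, (int) (last - first)));
                }
            }
            before = after;
        }
        return slices;
    }

    public static void main(String[] args) {
        // Three databases holding 5, 3 and 4 matching rows; fetch global rows 6..9.
        for (Slice s : plan(new long[] {5, 3, 4}, 6, 4)) {
            System.out.println(s);
        }
    }
}
```

With counts of 5, 3 and 4 and a page starting at row 6 with size 4, the first database is skipped entirely, the second contributes two rows starting at its local row 1, and the third contributes its first two rows. Because the databases are always visited in the same sorted order, the same query sees the same global row numbering on every page.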
package com.konceptusa.infinet.imsupport.detach;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.core.dao.HibernateQueryListCallback;
import com.konceptusa.framework.core.dao.hql.Hql;
import com.konceptusa.framework.core.service.BaseServiceSupport;
import com.konceptusa.framework.core.service.Page;
import com.konceptusa.framework.core.support.ObjectFactory;
import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
import com.konceptusa.infinet.detach.CountId;
import com.konceptusa.infinet.detach.CountIdComparetor;
import com.konceptusa.infinet.detach.MagrateAble;
import com.konceptusa.infinet.detach.QueryListAble;
import com.konceptusa.infinet.detach.datasource.UseridContextHolder;

/**
 * Combined queries across multiple databases; parent class of the simple services.
 * @author Jwin
 *
 * @param <T>
 */
@Transactional(readOnly=true, rollbackFor = Exception.class)
public abstract class BaseServiceSupportForMulti<T> extends BaseServiceSupport<T> implements QueryListAble<T>,MagrateAble<T>
{
    private final static Log LOG = LogFactory.getLog(BaseServiceSupportForMulti.class);
    @Override
    protected int findCountByHql(Hql hql)
    {
        // The count query yields one result per database; add them up for the overall total.
        List<Long> countList = (List<Long>) getHibernateTemplate().execute(
                new HibernateQueryListCallback(new Hql("select count(*) "
                        + hql.getHql(), hql.getCache(), hql.getParameters())));
        Long counts = 0L;
        for(Long count : countList)
        {
            counts += count;
        }
        return counts.intValue();
    }

    @Transactional(readOnly=true, rollbackFor = Exception.class,propagation=Propagation.NOT_SUPPORTED)
    public List<T> queryList(Hql hql, int from, int offset)
    {
        return queryListByHql(hql, from, offset);
    }

    public List<CountId> queryCount(Hql hql)
    {
        // Each row is [count(*), min(userid)] for one database.
        List<Object[]> list = queryListByHql(hql);
        List<CountId> countList = new ArrayList<CountId>(list.size());
        for(Object[] l : list)
        {
            // min(userid) is null when a database has no matching rows; skip it.
            if(l[1] != null)
            {
                CountId count = new CountId((Long) l[1],(Long)l[0]);
                countList.add(count);
            }
        }
        // Sort so that every page of the same query visits the databases in the same order.
        Collections.sort(countList, new CountIdComparetor());
        return countList;
    }
    protected String getBeanName(String name)
    {
        // Drop the trailing "Impl" and lower-case the first letter, e.g. "FooServiceImpl" -> "fooService".
        name = name.substring(0, name.length() - "Impl".length());
        name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
        return name;
    }

    protected Page queryPageByHql(Hql hql,String useridName, int start, int offset)
    {
        // Per database, count the matching rows and take the smallest user id as that database's identifier.
        Hql countHql = new Hql("select count(*),min(" + useridName + ") "
                + hql.getHql(), hql.getCache(), hql.getParameters());
        return queryPageByHql(countHql, hql, start, offset);
    }
    // First fetch each database's row count and identifier, sort by identifier, then walk the
    // databases in that order to collect the page, returning as soon as the page is full.
    // Note: "offset" here is the page size, not a row offset.
    private Page queryPageByHql(Hql countHql,Hql listHql,int start, int offset)
    {
        QueryListAble<T> serviceShards = getShardsService();
        QueryListAble<T> serviceDynamic = getDynamicService();
        List<CountId> countList = serviceShards.queryCount(countHql);
        // running total of rows in the databases visited so far, including the current one
        int totalCount = 0;
        // end offset (exclusive) across all databases
        int end = start + offset;
        // start offset relative to the current database
        int startRelative = -1;
        List<T> queryList = new ArrayList<T>(offset);
        for(CountId count : countList)
        {
            totalCount += count.getCount();
            // the page starts after the last row of this database; move on to the next one
            // (<= rather than < avoids an empty query when the page starts exactly at a database boundary)
            if(totalCount <= start)
            {
                continue;
            }
            // first database whose running total exceeds the start offset
            if(startRelative == -1)
            {
                startRelative = count.getCount().intValue() - (totalCount - start);
            }
            else
            {
                // every later database is read from its first row
                startRelative = 0;
            }
            int relativeCount = totalCount - end;
            if(relativeCount >= 0)
            {
                // the page ends inside this database: read the remainder and stop
                UseridContextHolder.setUserid(count.getId());
                try
                {
                    // number of rows to take from the current database
                    int offsetRelative = count.getCount().intValue() - relativeCount - startRelative;
                    LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
                    queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
                }finally
                {
                    UseridContextHolder.removeUserid();
                }
                break;
            }
            // the page continues past this database: read everything from startRelative to its end
            UseridContextHolder.setUserid(count.getId());
            try
            {
                // number of rows to take from the current database
                // (the original used totalCount here, which over-counts once earlier databases have been skipped)
                int offsetRelative = count.getCount().intValue() - startRelative;
                LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
                queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
            } finally
            {
                UseridContextHolder.removeUserid();
            }
        }
        // the loop may have stopped early, so recompute the total across all databases
        totalCount = 0;
        for(CountId count : countList)
        {
            totalCount += count.getCount();
        }
        return new Page<T>(totalCount, queryList);
    }

    protected Page queryPageByHql(String hqlstr,String useridName, int start, int offset,Object ... values)
    {
        Hql listHql = Hql.createIndexHql(