MongoDB提供了auto-sharding 功能。因為其是auto-sharding,即mongodb通過mongos(一個自動分片模塊,用于構建一個大規模的可擴展的數據庫集群,這個集群可以并入動態增加的機器)自動建立一個水平擴展的數據庫集群系統,將數據庫分表存儲在sharding的各個節點上。
一個mongodb集群包括一些shards(包括一些mongod進程),mongos路由進程,一個或多個config服務器。 下面是一些相關詞匯說明: Shards : 每一個shard包括一個或多個服務和存儲數據的mongod進程(mongod是MongoDB數據的核心進程)。典型的每個shard開啟多個服務來提高服務的可用性。這些服務/mongod進程在shard中組成一個復制集。 Chunks: Chunk是一個來自特定集合中的一個數據范圍,(collection,minKey,maxKey)描述一個chunk,它介于minKey和 maxKey范圍之間。例如chunks 的maxsize大小是100M,如果一個文件達到或超過這個范圍時,會被切分到2個新的chunks中。當一個shard的數據過量時,chunks將會被遷移到其他的shards上。同樣,chunks也可以遷移到其他的shards上。 Config Servers : Config服務器存儲著集群的metadata信息,包括每個服務器,每個shard的基本信息和chunk信息。Config服務器主要存儲的是chunk信息。每一個config服務器都復制了完整的chunk信息。 今天要介紹的源碼主要是Mongos的主入口函數的執行流程,首先我們打開Mongos的項目(可通過打開源碼db\db_10.sln加載所有項目),如下圖: 注:如果要調試mongos,需要設置一個mongod進程和一個Config Server,形如: d:\mongodb>bin>mongod --dbpath d:\mongodb\db\ --port 27012 d:\mongodb>bin>mongod --configsvr --dbpath d:\mongodb\db\ --port 27022
然后在vs2010中配置相應的boost路徑信息及啟動參數信息,如下圖:
下面開始正文。首先打開mongos項目中的server.cpp文件,找到下面方法: int main(int argc, char* argv[]) {
try { return _main(argc, argv); } catch(DBException& e) { cout << "uncaught exception in mongos main:" << endl; cout << e.toString() << endl; } catch(std::exception& e) { cout << "uncaught exception in mongos main:" << endl; cout << e.what() << endl; } catch(...) { cout << "uncaught exception in mongos main" << endl; } return 20; }
int _main(int argc, char* argv[]) {
static StaticObserver staticObserver; mongosCommand = argv[0]; //聲明options信息描述對象 po::options_description options("General options"); po::options_description sharding_options("Sharding options"); po::options_description hidden("Hidden options"); po::positional_options_description positional; CmdLine::addGlobalOptions( options , hidden ); //添加sharding選項描述信息 sharding_options.add_options() ( "configdb" , po::value<string>() , "1 or 3 comma separated config servers" ) ( "test" , "just run unit tests" ) ( "upgrade" , "upgrade meta data version" ) ( "chunkSize" , po::value<int>(), "maximum amount of data per chunk" ) ( "ipv6", "enable IPv6 support (disabled by default)" ) ( "jsonp","allow JSONP access via http (has security implications)" ) ; options.add(sharding_options); .....
.....
// parse options po::variables_map params; //對argc,argv進行分析并轉換成params,以便下面使用 if ( ! CmdLine::store( argc , argv , options , hidden , positional , params ) ) return 0; // The default value may vary depending on compile options, but for mongos // we want durability to be disabled. cmdLine.dur = false; //如果是help if ( params.count( "help" ) ) { cout << options << endl; return 0; } //如果是版本信息 if ( params.count( "version" ) ) { printShardingVersionInfo(); return 0; } //如要設置chunkSize if ( params.count( "chunkSize" ) ) { Chunk::MaxChunkSize = params["chunkSize"].as<int>() * 1024 * 1024; } ...... //必選項,設置configdb信息 if ( ! params.count( "configdb" ) ) { out() << "error: no args for --configdb" << endl; return 4; } vector<string> configdbs; //對參數configdb進行分割 (以','分割 ) splitStringDelim( params["configdb"].as<string>() , &configdbs , ',' ); //mongodb強制為1或3,具體原因不明 if ( configdbs.size() != 1 && configdbs.size() != 3 ) { out() << "need either 1 or 3 configdbs" << endl; return 5; } // we either have a seeting were all process are in localhost or none is for ( vector<string>::const_iterator it = configdbs.begin() ; it != configdbs.end() ; ++it ) { try { // 根據地址參數實例化HostAndPort對象,如地址不合法則拋出異常 HostAndPort configAddr( *it ); if ( it == configdbs.begin() ) { grid.setAllowLocalHost( configAddr.isLocalHost() ); } //不允許在configdbs中混用本地地址和非本地地址,注:如果configdb中全部為本地地址 //(實際用處不大)時不會執行下面if邏輯 if ( configAddr.isLocalHost() != grid.allowLocalHost() ) { out() << "cannot mix localhost and ip addresses in configdbs" << endl; return 10; } } catch ( DBException& e) { out() << "configdb: " << e.what() << endl; return 9; } }
// set some global state
//添加對鏈接池hook的綁定(shardingConnectionHook對象引用),以最終調用其onHandedOut方法 pool.addHook( &shardingConnectionHook ); //設置鏈接池名稱 pool.setName( "mongos connectionpool" ); //不設置“延遲kill游標” DBClientConnection::setLazyKillCursor( false ); //設置當replicaSet配置修改時的hook對象(replicaSetChange方法會更新鏈接對象信息) ReplicaSetMonitor::setConfigChangeHook( boost::bind( &ConfigServer::replicaSetChange , &configServer , _1 ) ); //顯示sharding版本信息
printShardingVersionInfo(); //初始化configServer if ( ! configServer.init( configdbs ) ) { cout << "couldn't resolve config db address" << endl; return 7; } if ( ! configServer.ok( true ) ) { cout << "configServer startup check failed" << endl; return 8; } //檢查Config版本信息(必要時進行升級操作) int configError = configServer.checkConfigVersion( params.count( "upgrade" ) ); if ( configError ) { if ( configError > 0 ) { cout << "upgrade success!" << endl; } else { cout << "config server error: " << configError << endl; } return configError; } //重新設置config db信息(包括shard中chunk的min,lastmod信息) configServer.reloadSettings();
//初始化一些Signals信息,用于處理程序退出,中斷等情況
init(); //以線程方式啟動webserver,循環偵聽授權訪問的 message信息,詳見dbwebserver.cpp文件中allowed方法 boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) ); MessageServer::Options opts; opts.port = cmdLine.port; opts.ipList = cmdLine.bind_ip; start(opts);//啟動message服務器,偵聽客戶端message dbexit( EXIT_CLEAN ); return 0; 到這里,main代碼就介紹完了,但上面代碼段中的start才是啟動balancer來均衡各個shard間chunk的操作,所以我們接著再看一下該方法的實現: void start( const MessageServer::Options& opts ) {
setThreadName( "mongosMain" );//設置線程名稱 installChunkShardVersioning();//綁定chunk shard版本控制信息 balancer.go();//均衡shard 中chunk(節點)信息,詳情參見 balance.cpp的run()方法 cursorCache.startTimeoutThread();//對空閑(過期)游標進行清除操作 log() << "waiting for connections on port " << cmdLine.port << endl; ShardedMessageHandler handler; MessageServer * server = createServer( opts , &handler );//構造server對象 server->setAsTimeTracker(); server->run();//啟動message服務 }
|
|