MongoDB提供了auto-sharding 功能。因为其是auto-sharding,即mongodb通过mongos(一个自动分片模块,用于构建一个大规模的可扩展的数据库集群,这个集群可以并入动态增加的机器)自动建立一个水平扩展的数据库集群系统,将数据库分表存储在sharding的各个节点上。
一个mongodb集群包括一些shards(包括一些mongod进程),mongos路由进程,一个或多个config服务器
下面是一些相关词汇说明:
Shards : 每一个shard包括一个或多个服务和存储数据的mongod进程(mongod是MongoDB数据的核心进程)典型的每个shard开启多个服务来提高服务的可用性。这些服务/mongod进程在shard中组成一个复制集
Chunks: Chunk是一个来自特殊集合中的一个数据范围,(collection,minKey,maxKey)描叙一个chunk,它介于minKey和maxKey范围之间。例如chunks 的maxsize大小是100M,如果一个文件达到或超过这个范围时,会被切分到2个新的chunks中。当一个shard的数据过量时,chunks将会被迁移到其他的shards上。同样,chunks也可以迁移到其他的shards上
Config Servers : Config服务器存储着集群的metadata信息,包括每个服务器,每个shard的基本信息和chunk信息Config服务器主要存储的是chunk信息。每一个config服务器都复制了完整的chunk信息。
今天要介绍的源码主要是Mongos的主入口函数的执行流程,首先我们打开Mongos的项目(可通过打开源码db\db_10.sln加载所有项目),如下图:
注:如果要调试mongos,需要设置一个mongod进程和一个Config Server,形如:
d:\mongodb>bin>mongod –dbpath d:\mongodb\db\ –port 27012
d:\mongodb>bin>mongod –configsvr –dbpath d:\mongodb\db\ –port 27022
然后在vs2010中配置相应的boost路径信息及启动参数信息,如下图:
#p#
下面开始正文。首先打开mongos项目中的server.cpp文件,找到下面方法:
intmain(intargc,char*argv[]){ try{ return_main(argc,argv); } catch(DBException&e){ cout<<"uncaughtexceptioninmongosmain:"<<endl; cout<<e.toString()<<endl; } catch(std::exception&e){ cout<<"uncaughtexceptioninmongosmain:"<<endl; cout<<e.what()<<endl; } catch(...){ cout<<"uncaughtexceptioninmongosmain"<<endl; } return20; }
该方法是mongos的主函数,代码很简,它主要是try方式执行_main方法,下面是_main的执行流程:
int_main(intargc,char*argv[]){ staticStaticObserverstaticObserver; mongosCommand=argv[0]; //声明options信息描述对象 po::options_descriptionoptions("Generaloptions"); po::options_descriptionsharding_options("Shardingoptions"); po::options_descriptionhidden("Hiddenoptions"); po::positional_options_descriptionpositional; CmdLine::addGlobalOptions(options,hidden); //添加sharding选项描述信息 sharding_options.add_options() ("configdb",po::value(),"1or3commaseparatedconfigservers") ("test","justrununittests") ("upgrade","upgrademetadataversion") ("chunkSize",po::value(),"maximumamountofdataperchunk") ("ipv6","enableIPv6support(disabledbydefault)") ("jsonp","allowJSONPaccessviahttp(hassecurityimplications)") ; options.add(sharding_options); .....
在完成option描述信息的初始化操作之后,下面就开始对启动命令行参数进行分析和执行了,如下:
..... //parseoptions po::variables_mapparams; //对argc,argv进行分析并转换成params,以便下面使用 if(!CmdLine::store(argc,argv,options,hidden,positional,params)) return0; //Thedefaultvaluemayvarydependingoncompileoptions,butformongos //wewantdurabilitytobedisabled. cmdLine.dur=false; //如果是help if(params.count("help")){ cout<<options<<endl; return0; } //如果是版本信息 if(params.count("version")){ printShardingVersionInfo(); return0; } //如要设置chunkSize if(params.count("chunkSize")){ Chunk::MaxChunkSize=params["chunkSize"].as()*1024*1024; } ...... //必选项,设置configdb信息 if(!params.count("configdb")){ out()<<"error:noargsfor--configdb"<<endl; return4; } vectorconfigdbs; //对参数configdb进行分割(以','分割) splitStringDelim(params["configdb"].as(),&configdbs,','); //mongodb强制为1或3,具体原因不明 if(configdbs.size()!=1&&configdbs.size()!=3){ out()<<"needeither1or3configdbs"<<endl; return5; } //weeitherhaveaseetingwereallprocessareinlocalhostornoneis for(vector::const_iteratorit=configdbs.begin();it!=configdbs.end();++it){ try{ //根据地址参数实例化HostAndPort对象,如地址不合法则抛出异常 HostAndPortconfigAddr(*it); if(it==configdbs.begin()){ grid.setAllowLocalHost(configAddr.isLocalHost()); } //不允许在configdbs出现本地地址,注:如果configdb中全部为本地地址 //(实际用处不大)时不会执行下面if逻辑 if(configAddr.isLocalHost()!=grid.allowLocalHost()){ out()<<"cannotmixlocalhostandipaddressesinconfigdbs"<<endl; return10; } } catch(DBException&e){ out()<<"configdb:"<<e.what()<<endl; return9; } }
上面完成了对命令行参数分析之后,接下来mongos要加载绑定几个hook:
//setsomeglobalstate //添加对链接池hook的绑定(shardingConnectionHook对象引用),以最终调用其onHandedOut方法 pool.addHook(&shardingConnectionHook); //设置链接池名称 pool.setName("mongosconnectionpool"); //不设置“延迟kill游标” DBClientConnection::setLazyKillCursor(false); //设置当replicaSet配置修改时的hook对象(replicaSetChangey方法会更新链接对象信息 ReplicaSetMonitor::setConfigChangeHook(boost::bind(&ConfigServer::replicaSetChange,&configServer,_1));
上面的hook主要是在mongos主程序启动完成后,在运行期间执行一些数据操作时执行某些额外操作。从代码可以看出,mongos使用了链接池功能以提升获取链接的效率,具体实现机制我会在后绪章节中加以阐述。代码中的ReplicaSetMonitor类为一个维护和获取有效复制集的监视类,它提供了获取有效master,slave 的方法。完成这一步绑定后,接着mongos就会对config server信息进行初始化和升级操作了,如下:
//显示sharding版本信息 printShardingVersionInfo(); //实始化configServer if(!configServer.init(configdbs)){ cout<<"couldn'tresolveconfigdbaddress"<<endl; return7; } if(!configServer.ok(true)){ cout<<"configServerstartupcheckfailed"<<endl; return8; } //检查Config版本信息(必要时进行升级操作) intconfigError=configServer.checkConfigVersion(params.count("upgrade")); if(configError){ if(configError>0){ cout<<"upgradesuccess!"<<endl; } else{ cout<<"configservererror:"<<configError<<endl; } returnconfigError; } //重新设置configdb信息(包括shard中chunk的min,lastmod信息) configServer.reloadSettings();
***就是启动侦听服务,这里mongos启动了两个侦听服务器,一个是以线程方式启动,用于接收授权的用户操作信息,另一个则是普遍的循环侦听服务,用于侦听客户端message如下:
//初始化一些Signals信息,用于处理程序退出,中断等情况 init(); //以线程方式启动webserver,循环侦听授权访问的message信息,详见dbwebserver.cpp文件中allowed方法 boost::threadweb(boost::bind(&webServerThread,newNoAdminAccess()/*takesownership*/)); MessageServer::Optionsopts; opts.port=cmdLine.port; opts.ipList=cmdLine.bind_ip; start(opts);//启动message服务器,侦听客户端message dbexit(EXIT_CLEAN); return0;
到这里,main代码就介绍完了,但上面代码段中的start才是启动balancer来均衡各个shard间chunk的操作,所以我们接着再看一下该方法的实现:
voidstart(constMessageServer::Options&opts){ setThreadName("mongosMain");//设置线程名称 installChunkShardVersioning();//绑定chunkshard版本控制信息 balancer.go();//均衡shard中chunk(节点)信息,详情参见balance.cpp的run()方法 cursorCache.startTimeoutThread();//对空闲(过期)游标进行清除操作 log()<<"waitingforconnectionsonport"<<cmdLine.port<<endl; ShardedMessageHandlerhandler; MessageServer*server=createServer(opts,&handler);//构造server对象 server->setAsTimeTracker(); server->run();//启动message服务 }
好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍balancer的实现方式和操作流程。
原文链接:http://www.cnblogs.com/daizhj/archive/2011/05/16/2022041.html
【编辑推荐】
- Mongodb源码分析–内存文件映射(MMAP)走进MongoDB的世界 展开MongoDB的学习之旅浅析Mongodb源码之游标Cursor野心勃勃的NoSQL新贵 MongoDB应用实战MongoDB与CouchDB全方位对比