Hadoop DataNode Startup: A Source Code Walkthrough

Adding Dependencies

To compile and run Hadoop's DataNode component, add the following dependencies to the project's pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
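
The DataNode Class

The DataNode class is a core component of the Hadoop Distributed File System (HDFS): it stores the data blocks of the file system. A deployment can contain one or more DataNode instances. A DataNode communicates with the NameNode to report its storage state and to respond to the NameNode's instructions, such as deleting or replicating blocks. A DataNode also interacts with client code and with other DataNodes.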

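The DataNode Entry Point

The DataNode's entry point is its main method. It first checks whether the command-line arguments contain a help request, then calls secureMain, which creates the DataNode instance.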

public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    DataNode datanode = createDataNode(args, null, resources);
    ... ...
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}
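
The control flow of this entry point (answer a help request, otherwise start the service and map any failure to a non-zero exit code) can be tried out in isolation. The sketch below is not Hadoop code: LauncherSketch, isHelpRequest, and run are hypothetical names, and it assumes the help flags are -h and --help. It returns the exit code instead of terminating the JVM so the flow is easy to exercise.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a daemon entry point; not part of Hadoop.
class LauncherSketch {
  static final String USAGE = "Usage: launcher [-h | --help]";

  // Assumption: a help request is the flag -h or --help anywhere in args.
  static boolean isHelpRequest(String[] args) {
    List<String> list = Arrays.asList(args);
    return list.contains("-h") || list.contains("--help");
  }

  // Returns the exit code rather than calling System.exit.
  static int run(String[] args, Runnable service) {
    if (isHelpRequest(args)) {
      System.out.println(USAGE);
      return 0;
    }
    try {
      service.run(); // plays the role of createDataNode(...) and what follows
      return 0;
    } catch (Throwable t) {
      System.err.println("Exception in run: " + t);
      return 1; // plays the role of terminate(1, e)
    }
  }

  public static void main(String[] args) {
    int code = run(args, () -> System.out.println("service started"));
    System.out.println("exit code = " + code);
  }
}
```

Catching Throwable at the outermost level, as secureMain does, means even an Error raised during startup is logged before the process exits.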
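
Instantiating the DataNode

The createDataNode method instantiates the DataNode object and then starts its daemon threads. Instantiation passes through instantiateDataNode and makeInstance and ends in the DataNode constructor.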

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Instantiate the DN
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the DN daemon threads
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  ... ...
  return makeInstance(dataLocations, conf, resources);
}

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  ... ...
  return new DataNode(conf, locations, storageLocationChecker, resources);
}
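
The DataNode Constructor

The constructor initializes the DataNode's main fields and calls startDataNode to bring up the required components. If startup fails with an IOException, it shuts the DataNode down and rethrows the exception.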

DataNode(final Configuration conf,
    final List<StorageLocation> dataDirs,
    final StorageLocationChecker storageLocationChecker,
    final SecureResources resources) throws IOException {
  super(conf);
  ... ...
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
    // Start the DN
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ... ...
}
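
The try/catch in this constructor is a cleanup-on-failure pattern: whatever startup managed to open is released, and the caller still sees the original exception. A minimal sketch of the same pattern follows. NodeSketch, its event log, and the failOnStart switch are hypothetical and exist only to make the behavior observable.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical class that starts its components inside the constructor.
class NodeSketch {
  private final List<String> events;

  NodeSketch(List<String> events, boolean failOnStart) throws IOException {
    this.events = events;
    try {
      start(failOnStart);
    } catch (IOException e) {
      shutdown(); // release whatever start() opened before it failed
      throw e;    // the caller still receives the original failure
    }
  }

  private void start(boolean fail) throws IOException {
    events.add("storage opened");
    if (fail) {
      throw new IOException("simulated startup failure");
    }
    events.add("servers started");
  }

  private void shutdown() {
    events.add("shutdown");
  }
}
```

Because the constructor throws, the caller never obtains a reference to the half-built object, which is why the cleanup has to happen inside the constructor itself.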
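
The DataNode Startup Sequence

The startDataNode method initializes the DataNode's key components: the data storage object (DataStorage), MXBean registration, the DataXceiver server, the HTTP server, the RPC server, and the BlockPoolManager.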

void startDataNode(List<StorageLocation> dataDirectories,
    SecureResources resources) throws IOException {
  ... ...
  // Create the data storage object
  storage = new DataStorage();

  // global DN settings
  registerMXBean();
  // Initialize the DataXceiver server
  initDataXceiver();
  // Start the HTTP server
  startInfoServer();

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  // Initialize the RPC server
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);
  // Create the BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  // Heartbeat management
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}
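
Initializing DataXceiverServer

The initDataXceiver method creates the DataXceiverServer, the service a DataNode uses to receive data sent by clients and by other DataNodes, and wraps it in a daemon thread. The excerpt below shows only the creation of that thread, not the point where it is started.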

private void initDataXceiver() throws IOException {
  // dataXceiverServer is the service the DN uses to receive data
  // sent by clients and by other DNs
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty
  ... ...
}
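
To make the "server wrapped in a daemon thread" idea concrete, here is a small sketch using only the JDK. ReceiverSketch is a hypothetical stand-in for the role DataXceiverServer plays: it accepts connections on a loopback port chosen by the OS and echoes one byte back. It uses plain java.lang.Thread rather than Hadoop's Daemon class, and it marks the thread itself as a daemon rather than the thread group.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical receiver; not Hadoop's DataXceiverServer.
class ReceiverSketch implements Runnable {
  private final ServerSocket server;

  ReceiverSketch() throws IOException {
    // Port 0 asks the OS for any free port on the loopback interface.
    server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
  }

  int port() {
    return server.getLocalPort();
  }

  @Override
  public void run() {
    while (!server.isClosed()) {
      try (Socket peer = server.accept()) {
        OutputStream out = peer.getOutputStream();
        out.write(peer.getInputStream().read()); // echo one byte back
        out.flush();
      } catch (IOException e) {
        // accept() fails once the socket is closed; the loop then exits
      }
    }
  }

  void close() throws IOException {
    server.close();
  }

  // Wraps the receiver in a daemon thread that belongs to the given group.
  static Thread startDaemon(ThreadGroup group, ReceiverSketch receiver) {
    Thread t = new Thread(group, receiver, "receiver");
    t.setDaemon(true);
    t.start();
    return t;
  }

  // Starts a receiver, sends one byte to it, and returns the echoed byte.
  static int roundTrip(int value) {
    try {
      ReceiverSketch receiver = new ReceiverSketch();
      startDaemon(new ThreadGroup("receivers"), receiver);
      try (Socket s = new Socket(InetAddress.getLoopbackAddress(), receiver.port())) {
        s.getOutputStream().write(value);
        s.getOutputStream().flush();
        return s.getInputStream().read();
      } finally {
        receiver.close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A daemon thread does not keep the JVM alive on its own, so a process built this way can exit without first stopping the accept loop.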
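
Initializing the HTTP Server

The startInfoServer method creates and starts the HTTP server that exposes information about the DataNode, then records the HTTP and HTTPS ports the server is actually listening on.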

private void startInfoServer()
    throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}
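
Reading the port back from the running server, instead of trusting the configured value, matters whenever the server is allowed to pick its own port. The sketch below shows that idea with the JDK's built-in com.sun.net.httpserver.HttpServer rather than Hadoop's DatanodeHttpServer. InfoServerSketch and startAndGetPort are hypothetical names.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

// Hypothetical example; uses the JDK HTTP server, not Hadoop's.
class InfoServerSketch {
  // Starts an HTTP server on an OS-chosen port and returns the port that
  // was actually bound.
  static int startAndGetPort() {
    try {
      HttpServer server = HttpServer.create(
          new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
      server.start();
      // The requested port was 0, so only the bound address knows the result.
      int infoPort = server.getAddress().getPort();
      server.stop(0);
      return infoPort;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```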

The DatanodeHttpServer Constructor

The DatanodeHttpServer constructor builds the HTTP server instance through HttpServer2.Builder.

public DatanodeHttpServer(final Configuration conf,
    final DataNode datanode,
    final ServerSocketChannel externalHttpChannel)
    throws IOException {
  ... ...
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
      .addEndpoint(URI.create("http://localhost:" + proxyPort))
      .setFindPort(true);
  ... ...
}

Initializing the DataNode RPC Server

The initIpcServer method reads the IPC address from the configuration, then builds the RPC server that clients connect to.

private void initIpcServer() throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  ... ...
  ipcServer = new RPC.Builder(getConf())
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT))
      .setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager)
      .build();
  ... ...
}
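
The first step above turns a trimmed "host:port" configuration string into a socket address. As a rough illustration of that step only, the sketch below does the parsing with the JDK alone. AddressSketch.parse is a hypothetical helper, not the implementation of NetUtils.createSocketAddr, and the sample value used in the usage note is only an example.

```java
import java.net.InetSocketAddress;

// Hypothetical helper; not Hadoop's NetUtils.
class AddressSketch {
  // Parses "host:port" (surrounding whitespace allowed) into an
  // unresolved socket address.
  static InetSocketAddress parse(String target) {
    String trimmed = target.trim();
    int colon = trimmed.lastIndexOf(':');
    if (colon <= 0 || colon == trimmed.length() - 1) {
      throw new IllegalArgumentException("Expected host:port, got: " + target);
    }
    String host = trimmed.substring(0, colon);
    int port = Integer.parseInt(trimmed.substring(colon + 1));
    // createUnresolved skips the DNS lookup and validates the port range.
    return InetSocketAddress.createUnresolved(host, port);
  }
}
```

For example, AddressSketch.parse(" 0.0.0.0:9867 ") yields an address whose getHostString() is 0.0.0.0 and whose getPort() is 9867.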
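
Registering the DataNode with the NameNode

A DataNode has to register itself with the NameNode so that the NameNode can track and manage every DataNode in the cluster. The path starts in refreshNamenodes, which creates a BPOfferService for each new nameservice and then starts all of them.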

void refreshNamenodes(Configuration conf)
    throws IOException {
  ... ...
  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  ... ...
  synchronized (this) {
    ... ...
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      for (String nsToAdd : toAdd) {
        ... ...
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }
  ... ...
}

protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  // Create the services that correspond to the number of NameNodes
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}
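
Step 3 above boils down to a set difference: nameservices that are configured but have no running service yet get a new one. The sketch below reproduces just that step with plain collections. NameserviceSketch and its nested Service class are hypothetical stand-ins for BlockPoolManager and BPOfferService, and the refresh and removal steps elided in the excerpt are not modeled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical manager; models only the "start new nameservices" step.
class NameserviceSketch {
  // Hypothetical stand-in for one per-nameservice service object.
  static final class Service {
    final String nameserviceId;
    final List<String> nnAddrs;

    Service(String nameserviceId, List<String> nnAddrs) {
      this.nameserviceId = nameserviceId;
      this.nnAddrs = nnAddrs;
    }
  }

  private final Map<String, Service> byNameserviceId = new LinkedHashMap<>();

  // Creates a service for every configured nameservice that has none yet
  // and returns the ids that were added.
  synchronized Set<String> refresh(Map<String, List<String>> configured) {
    Set<String> toAdd = new TreeSet<>(configured.keySet());
    toAdd.removeAll(byNameserviceId.keySet());
    for (String ns : toAdd) {
      byNameserviceId.put(ns, new Service(ns, configured.get(ns)));
    }
    return toAdd;
  }

  synchronized Service get(String nameserviceId) {
    return byNameserviceId.get(nameserviceId);
  }
}
```

Calling refresh a second time with the same configuration adds nothing, which is what makes the operation safe to repeat.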
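
Starting All Services

The startAll method starts every BPOfferService. Each BPOfferService in turn starts its BPServiceActor instances, and each actor runs in its own daemon thread.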

// BlockPoolManager
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              // Start the service
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    ... ...
  }
}

// BPOfferService
void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}

// BPServiceActor
void start() {
  ... ...
  bpThread = new Thread(this);
  bpThread.setDaemon(true); // needed for JUnit testing
  // This starts a thread, so look at this class's run method next
  bpThread.start();

  if (lifelineSender != null) {
    lifelineSender.start();
  }
}
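
The fan-out described above, one daemon thread per actor, can be sketched with the JDK alone. ActorSketch is a hypothetical stand-in for an actor. It uses a CountDownLatch only so that the caller can observe that every thread actually ran; nothing in the excerpt suggests the real code uses a latch.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical actor: one instance, and one thread, per NameNode address.
class ActorSketch implements Runnable {
  private final String nnAddr;
  private final CountDownLatch started;
  private Thread thread;

  ActorSketch(String nnAddr, CountDownLatch started) {
    this.nnAddr = nnAddr;
    this.started = started;
  }

  void start() {
    thread = new Thread(this, "actor-" + nnAddr);
    thread.setDaemon(true);
    thread.start(); // execution continues in run() on the new thread
  }

  @Override
  public void run() {
    started.countDown(); // a real actor would handshake and heartbeat here
  }

  // Starts one actor per address and reports whether all of them ran
  // within five seconds.
  static boolean startAll(List<String> nnAddrs) {
    CountDownLatch started = new CountDownLatch(nnAddrs.size());
    for (String addr : nnAddrs) {
      new ActorSketch(addr, started).start();
    }
    try {
      return started.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```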
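
The Service Thread's run Method

The service thread first connects to the NameNode and registers, retrying after 5 seconds whenever initialization fails and a retry is still allowed. Once registered, it enters the loop that sends heartbeats.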

public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // Register with the NN
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          // Registration failed; retry after 5 s
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }
    ... ...
    while (shouldRun()) {
      try {
        // Send heartbeats
        offerService();
      } catch (Exception ex) {
        ... ...
      }
    }
  ... ...
}

private void connectToNNAndHandshake() throws IOException {
  // get NN proxy: obtain the RPC client object for the NN
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(this, nsInfo);

  /* set thread name again to include NamespaceInfo when it's available. */
  this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

  // Register
  register(nsInfo);
}
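
The first loop in run is a retry-with-fixed-delay loop around the handshake. The sketch below isolates that loop. HandshakeRetrySketch, the Handshake interface, and the maxAttempts limit are assumptions made for illustration; the excerpt decides whether to retry by calling shouldRetryInit, whose logic is not shown.

```java
import java.io.IOException;

// Hypothetical retry helper; the attempt limit replaces shouldRetryInit().
class HandshakeRetrySketch {
  interface Handshake {
    void connect() throws IOException;
  }

  // Tries the handshake up to maxAttempts times, sleeping between failed
  // attempts. Returns the attempt number that succeeded, or -1 on failure.
  static int connectWithRetry(Handshake handshake, int maxAttempts,
      long sleepMillis) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        handshake.connect();
        return attempt;
      } catch (IOException e) {
        System.err.println("Initialization failed: " + e.getMessage());
        if (attempt == maxAttempts) {
          break; // no retries left
        }
        try {
          Thread.sleep(sleepMillis); // the excerpt waits 5000 ms here
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return -1;
        }
      }
    }
    return -1;
  }
}
```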
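
Sending the Registration to the NameNode

Registration has two parts: building the registration information, and sending it to the NameNode over RPC. The call is retried after 1 second when the NameNode has just restarted (EOFException) or is busy (SocketTimeoutException).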

void register(NamespaceInfo nsInfo) throws IOException {
  // Build the registration information
  DatanodeRegistration newBpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // Send the registration to the NN (the DN calls the interface
      // method; it executes on the NN)
      newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
      newBpRegistration.setNamespaceInfo(nsInfo);
      bpRegistration = newBpRegistration;
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }
  ... ...
}
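
Unlike the handshake loop, this loop retries only on two specific exception types and lets every other IOException propagate to the caller. The sketch below shows that selective retry in isolation. RegisterRetrySketch, RegisterCall, and the String result are hypothetical, and an interrupted sleep is surfaced as an InterruptedIOException, which is a choice made for this sketch rather than what the excerpt does.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper that retries only on transient connection errors.
class RegisterRetrySketch {
  interface RegisterCall {
    String register() throws IOException;
  }

  // Repeats the call while shouldRun is true. Returns the first answer,
  // or null if shouldRun turned false before any call succeeded.
  static String registerUntilAnswered(RegisterCall call,
      BooleanSupplier shouldRun, long sleepMillis) throws IOException {
    while (shouldRun.getAsBoolean()) {
      try {
        return call.register();
      } catch (EOFException | SocketTimeoutException e) {
        // Transient: the server may have just restarted or may be busy.
        System.err.println("Problem connecting to server: " + e.getMessage());
        try {
          Thread.sleep(sleepMillis); // the excerpt waits 1000 ms here
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("interrupted while waiting to retry");
        }
      }
      // Any other IOException is not caught and reaches the caller.
    }
    return null;
  }
}
```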
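
How the NameNode Handles Registration

When the NameNode receives a DataNode's registration request, the call passes from the RPC server through the namesystem to the code that records the DataNode and adds it to heartbeat management.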

// NameNodeRpcServer
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // Register the DN
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}

// FSNamesystem
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    blockManager.registerDatanode(nodeReg);
  } finally {
    writeUnlock("registerDatanode");
  }
}

// DatanodeManager
public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
  ... ...
  // register new datanode
  addDatanode(nodeDescr);
  blockManager.getBlockReportLeaseManager().register(nodeDescr);
  // also treat the registration message as a heartbeat
  // no need to update its timestamp
  // because it is done when the descriptor is created
  // Add the DN to heartbeat management
  heartbeatManager.addDatanode(nodeDescr);
  heartbeatManager.updateDnStat(nodeDescr);
  incrementVersionCount(nodeReg.getSoftwareVersion());
  startAdminOperationIfNecessary(nodeDescr);
  success = true;
  ... ...
}

void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized(this) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);
  resolveUpgradeDomain(node);
  ... ...
}
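
The one-liner in addDatanode relies on Map.put returning the entry it replaced: that old entry is removed from the host index before the new one is added, so the two indexes never disagree. The sketch below demonstrates the idea with plain JDK maps. RegistrySketch and Node are hypothetical, and the sketch synchronizes the whole method for simplicity, which is broader than the synchronized block shown in the excerpt.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry that keeps a by-UUID map and a by-host index in sync.
class RegistrySketch {
  static final class Node {
    final String uuid;
    final String host;

    Node(String uuid, String host) {
      this.uuid = uuid;
      this.host = host;
    }
  }

  private final Map<String, Node> byUuid = new HashMap<>();
  private final Map<String, Set<Node>> byHost = new HashMap<>();

  synchronized void add(Node node) {
    // put() returns the entry it replaced, or null if the UUID is new.
    Node previous = byUuid.put(node.uuid, node);
    if (previous != null) {
      removeFromHostIndex(previous);
    }
    byHost.computeIfAbsent(node.host, h -> new HashSet<>()).add(node);
  }

  private void removeFromHostIndex(Node node) {
    Set<Node> nodes = byHost.get(node.host);
    if (nodes != null) {
      nodes.remove(node);
      if (nodes.isEmpty()) {
        byHost.remove(node.host);
      }
    }
  }

  synchronized int countOnHost(String host) {
    Set<Node> nodes = byHost.get(host);
    return nodes == null ? 0 : nodes.size();
  }
}
```

If a node with the same UUID registers again from a different host, the stale entry disappears from the old host's set instead of lingering there.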
