1. ホーム
  2. Hadoop

HDFSソースコード解析 --- デコミッション

2022-02-20 22:03:37

はじめに

DecommissionManagerは、データノードの停止を管理します。バックグラウンドの監視スレッドは、デコミッション中のデータノードのステータスを定期的にチェックします。

プロセスの起動

起動コマンド

sudo -u hdfs hdfs dfsadmin -refreshNodes

FSNamesystem.java

exitCode = refreshNodes();
/**
 * Handles {@code hdfs dfsadmin -refreshNodes}: re-reads the include/exclude
 * host files and re-evaluates every registered datanode against them.
 *
 * @throws IOException if the host files cannot be read
 */
void refreshNodes() throws IOException {
    // Load a fresh Configuration so on-disk changes to the host-file
    // paths are picked up.
    Configuration conf = new HdfsConfiguration();
    checkOperation(OperationCategory.UNCHECKED);
    // Only a superuser may refresh the node lists.
    checkSuperuserPrivilege();

    // Read the config; find the file paths configured for the include
    // (whitelist) and exclude lists.
    RPC.Server.hostsReader.updateFileNames(
        conf.get(DFSConfigKeys.DFS_HOSTS_WHITELIST, ""),
        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
    // Re-read the hostnames out of those files into memory.
    RPC.Server.hostsReader.refresh();
    // NOTE(review): the two calls below are fork-specific helpers; their
    // semantics are not visible here — confirm against the full source.
    reloadNeverdelete(conf);
    dir.permissionCheckpatch.refreshAclMap();
    // Re-check every registered datanode against the refreshed lists.
    getBlockManager().getDatanodeManager().refreshNodes(conf);
  }

/**
 * Re-reads the include and exclude host files into in-memory sets.
 * Each list is only swapped in when its configured file path is
 * non-empty, so an unconfigured list keeps its previous contents.
 *
 * @param hostNameString unused; kept for interface compatibility
 * @throws IOException if either host file cannot be read
 */
public synchronized void refresh(String hostNameString) throws IOException {
    LOG.info("Refreshing hosts (include/exclude) list");
    Set<String> newIncludes = new HashSet<String>();
    Set<String> newExcludes = new HashSet<String>();
    HashMap<String,ArrayList<String>> tmpvalid = new HashMap<String, ArrayList<String>>();
    boolean switchIncludes = false;
    boolean switchExcludes = false;
    if (!includesFile.isEmpty()) {
      readFileToSet("included", includesFile, newIncludes, tmpvalid);
      switchIncludes = true;
    }
    // BUGFIX: this condition previously re-tested includesFile, so an
    // empty includes path silently skipped re-reading the excludes file.
    if (!excludesFile.isEmpty()) {
      readFileToSet("excluded", excludesFile, newExcludes);
      switchExcludes = true;
    }

    if (switchIncludes) {
      // switch the new hosts that are to be included
      includes = newIncludes;
      validuserForHost = tmpvalid;
    }
    if (switchExcludes) {
      // switch the excluded hosts
      excludes = newExcludes;
    }
  }

HostsFileReader.java

refresh()は、includesとexcludesの2つの、ホスト名を格納するSet&lt;String&gt;をファイルからメモリに読み込む処理です。

/**
   * Rereads conf to get the hosts and exclude list file names.
   * Rereads the files to update the hosts and exclude lists.
   * It then checks whether any of the registered datanodes have
   * changed state as a result.
   */
  public void refreshNodes(final Configuration conf) throws IOException {
    // Re-read the host files first, outside the lock; only the datanode
    // re-evaluation below needs the BlockManager write lock.
    refreshHostsReader(conf);
    namesystem.getBMLock().writeLock(OpName.REFRESH_NODES);
    try {
      refreshDatanodes();
      // Recount datanode software versions, since the set of live
      // (included) nodes may have changed.
      countSoftwareVersions();
    } finally {
      namesystem.getBMLock().writeUnlock(OpName.REFRESH_NODES);
    }
  }

DatanodeManager.java

refreshNodes(conf)はrefreshDatanodesへ処理を委譲します。

/**
   * Re-evaluates each registered datanode against the current
   * include/exclude lists:
   * 1. Added to hosts --> no further work needed here.
   * 2. Removed from hosts --> mark AdminState as decommissioned.
   * 3. Added to exclude --> start decommissioning.
   * 4. Removed from exclude --> stop decommission.
   */
  private void refreshDatanodes() {
    for(DatanodeDescriptor node : datanodeMap.values()) {
      // Check if not include.
      if (!hostConfigManager.isIncluded(node)) {
        node.setDisallowed(true); // case 2.
      } else {
        long maintenanceExpireTimeInMS =
            hostConfigManager.getMaintenanceExpirationTimeInMS(node);
        if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
          // Maintenance window still open: (re)start maintenance mode.
          decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
        // determine if decommission is needed
        } else if (hostConfigManager.isExcluded(node)) {
          decomManager.startDecommission(node); // case 3.
        } else {
          decomManager.stopMaintenance(node);
          decomManager.stopDecommission(node); // case 4.
        }
      }
      // The node's upgrade domain may also have changed in the host file.
      node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
    }
  }

ノードがオフラインで開始する必要があるかどうかを判断します(DatanodeManagerの初期化時に新しいDecommissionManagerオブジェクトが作成されます)。

@Override
public synchronized boolean isExcluded(DatanodeID dn) {
    // Delegate to the address-based overload using the datanode's
    // resolved socket address.
    return isExcluded(dn.getResolvedAddress());
}

// True when the address matches an entry in the exclude list.
private boolean isExcluded(InetSocketAddress address) {
    return excludes.match(address);
}

除外されているかどうかを判断する

/**
 * Manages node decommissioning: tracks the admin state of datanodes
 * that are being taken out of service.
 */
class DecommissionManager {
  static final Log LOG = LogFactory.getLog(DecommissionManager.class);
  
  // The namespace system this manager operates on.
  private final FSNamesystem fsnamesystem;
 
  DecommissionManager(FSNamesystem namesystem) {
    this.fsnamesystem = namesystem;
  }
 
  /* Periodically checks decommission status of in-progress nodes. */
  // Background monitor task (body elided in this excerpt).
  class Monitor implements Runnable {
    ...
  }
}

ソースコード解析

デコミッションマネージャ

@VisibleForTesting
  public void startDecommission(DatanodeDescriptor node) {
    if (!node.isDecommissionInProgress()) {
      if (!node.isAlive()) {
        LOG.info("Dead node {} is decommissioned immediately.", node);
        node.setDecommissioned();
      } else if (!node.isDecommissioned()) {
        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
          LOG.info("Starting decommission of {} {} with {} blocks", 
              node, storage, storage.numBlocks());
        }
        // Update DN stats maintained by HeartbeatManager
        hbManager.startDecommission(node);
        node.decommissioningStatus.setStartTime(now());
        // Add to pending nodes
        pendingNodes.add(node);
      }
    } else {
      LOG.trace("startDecommission: Node {} is already decommissioned in "
              + "progress, nothing to do.", node);
    }
  }

スタートデコミッション

try {
  processPendingNodes();
  check();
} finally {
  namesystem.getBMLock().writeUnlock(OpName.DECOMMISSION_MONITOR);
}

リスナーで、processPendingNodes() メソッドを呼び出して、デコミッションするデータノードを取得します。

// Moves queued datanodes into the tracked map until the queue drains or
// the concurrency cap is reached (a cap of 0 means unlimited).
private void processPendingNodes() {
  while (!pendingNodes.isEmpty()) {
    if (maxConcurrentTrackedNodes != 0
        && outOfServiceNodeBlocks.size() >= maxConcurrentTrackedNodes) {
      break;
    }
    outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
  }
}

poll()でpendingNodesから取り出し、outOfServiceNodeBlocksに格納する。

// Starts the decommission monitor: schedules the Monitor task at a fixed
// interval on the executor. (Surrounding setup elided in this excerpt.)
void activate(Configuration conf) {
  ...
    monitor = new Monitor(blocksPerInterval, 
        nodesPerInterval, maxConcurrentTrackedNodes);
    // Run the monitor every intervalSecs seconds, starting after one
    // interval has elapsed.
    executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
        TimeUnit.SECONDS);
  ...
}

起動

Decommissionリスナーを起動します。

// Define a custom Monitor class to implement the Runnable interface
private class Monitor implements Runnable {
    /**
     * The maximum number of blocks to check per tick.
     */
    private final int numBlocksPerCheck;
    /**
     * The maximum number of nodes to check per tick.
     */
    private final int numNodesPerCheck;
    /**
     * The maximum number of nodes to track in outOfServiceNodeBlocks.
     * A value of 0 means no limit.
     */
    private final int maxConcurrentTrackedNodes;
    /**
     * The number of blocks that have been checked on this tick.
     */
    private int numBlocksChecked = 0;
    /**
     * The number of nodes that have been checked on this tick.
     * Used for testing.
     */
    private int numNodesChecked = 0;
    /**
     * The last datanode in outOfServiceNodeBlocks that we've processed
     */
    private DatanodeDescriptor iterkey = new DatanodeDescriptor(new 
        DatanodeID("", "", "", 0, 0, 0, 0, 0));

    // Captures the per-tick block/node check limits and the cap on
    // concurrently tracked nodes (0 = no limit).
    Monitor(int numBlocksPerCheck, int numNodesPerCheck, int 
        maxConcurrentTrackedNodes) {
      this.numBlocksPerCheck = numBlocksPerCheck;
      this.numNodesPerCheck = numNodesPerCheck;
      this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
    }

    private void check() {
      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
          it = new CyclicIteration<>(outOfServiceNodeBlocks,
              iterkey).iterator();
      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();

      while (it.hasNext()
          && !exceededNumBlocksPerCheck()
          && !exceededNumNodesPerCheck()) {
        numNodesChecked++;
        final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
            entry = it.next();
        final DatanodeDescriptor dn = entry.getKey();
        // find all the blocks of the datanode
        AbstractList<BlockInfo> blocks = entry.getValue();
        boolean fullScan = false;
        if (dn.isMaintenance()) {
          // TODO HDFS-9390 make sure blocks are minimally replicated
          // before transitioning the node to IN_MAINTENANCE state.

          // If maintenance expires, stop tracking it.
          if (dn.maintenanceExpired()) {
            stopMaintenance(dn);
            toRemove.add(dn);
          }
          continue;
        }
        
        // This is the first time the datanode is scanned, and all the blocks with insufficient copies need to be found and added to the queue for replication.
        // The blocks condition here is the list of under-copied blocks.
        if (blocks == null) {
          LOG.debug("Newly-added node {}, doing full scan to find " +
              "insufficiently-replicated blocks.", dn);
          // return the list of under-replicated blocks on the datanode
          blocks = handleInsufficientlyReplicated(dn);
          // Add the Map of the node-blocklist that exited the service
          outOfServiceNodeBlocks.put(dn, blocks);
          fullScan = true;
        } else {
          // Already scanned this datanode, then just scan the list of blocks with insufficient copies again.
          LOG.debug("Processing decommission-in-progress node {}", dn);
          pruneSufficientlyReplicated(dn, blocks);
        }
        // A datanode has no more blocks, which means it can be deactivated
        if (blocks.size() == 0) {
          if (!fullScan) {
            // If we didn't just do a full scan, need to re-check with the 
            // full block map.
            //
            // We've replicated all the known insufficiently replicated blocks.
            // Re-check with the full block map before finally 
            // marking the datanode as decommissioned 
            LOG.debug("Node {} has finished replicating current set of "
                + "blocks, checking with the full block map.", dn);
            blocks = handleInsufficientlyReplicated(dn);
            outOfServiceNodeBlocks.put(dn, blocks);
          }
          final boolean isHealthy =
              blockManager.isNodeHealthyForDecommission(dn);
          if (blocks.size() == 0 && isHealthy) {
            setDecommissioned(dn);
            toRemove.add(dn);
            LOG.debug("Node {} is sufficiently replicated and healthy, "
                + "marked as decommissioned.", dn);
          } else {
            if (LOG.isDebugEnabled()) {
              StringBuilder b = new StringBuilder("Node {} ");
              if (isHealthy) {
                b.append("is ");
              } else {
                b.append("isn't ");
              }
              b.append("healthy and still needs to replicate {} more blocks," +
                  " decommissioning is still in progress.");
              LOG.debug(b.toString(), dn, blocks.size());
            }
          }
        } else {
          LOG.debug("Node {} still has {} blocks to replicate "
                  + "before it is a candidate to finish decommissioning.",
              dn, blocks.size());
        }
        iterkey = dn;
      }

モニター

常駐監視スレッド

// Define a custom Monitor class to implement the Runnable interface
private class Monitor implements Runnable {
    /**
     * The maximum number of blocks to check per tick.
     */
    private final int numBlocksPerCheck;
    /**
     * The maximum number of nodes to check per tick.
     */
    private final int numNodesPerCheck;
    /**
     * The maximum number of nodes to track in outOfServiceNodeBlocks.
     * A value of 0 means no limit.
     */
    private final int maxConcurrentTrackedNodes;
    /**
     * The number of blocks that have been checked on this tick.
     */
    private int numBlocksChecked = 0;
    /**
     * The number of nodes that have been checked on this tick.
     * Used for testing.
     */
    private int numNodesChecked = 0;
    /**
     * The last datanode in outOfServiceNodeBlocks that we've processed
     */
    private DatanodeDescriptor iterkey = new DatanodeDescriptor(new 
        DatanodeID("", "", "", 0, 0, 0, 0, 0));

    // Captures the per-tick block/node check limits and the cap on
    // concurrently tracked nodes (0 = no limit).
    Monitor(int numBlocksPerCheck, int numNodesPerCheck, int 
        maxConcurrentTrackedNodes) {
      this.numBlocksPerCheck = numBlocksPerCheck;
      this.numNodesPerCheck = numNodesPerCheck;
      this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
    }

    private void check() {
      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
          it = new CyclicIteration<>(outOfServiceNodeBlocks,
              iterkey).iterator();
      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();

      while (it.hasNext()
          && !exceededNumBlocksPerCheck()
          && !exceededNumNodesPerCheck()) {
        numNodesChecked++;
        final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
            entry = it.next();
        final DatanodeDescriptor dn = entry.getKey();
        // find all the blocks of the datanode
        AbstractList<BlockInfo> blocks = entry.getValue();
        boolean fullScan = false;
        if (dn.isMaintenance()) {
          // TODO HDFS-9390 make sure blocks are minimally replicated
          // before transitioning the node to IN_MAINTENANCE state.

          // If maintenance expires, stop tracking it.
          if (dn.maintenanceExpired()) {
            stopMaintenance(dn);
            toRemove.add(dn);
          }
          continue;
        }
        
        // This is the first time the datanode is scanned, and all the blocks with insufficient copies need to be found and added to the queue for replication.
        // The blocks condition here is the list of under-copied blocks.
        if (blocks == null) {
          LOG.debug("Newly-added node {}, doing full scan to find " +
              "insufficiently-replicated blocks.", dn);
          // return the list of under-replicated blocks on the datanode
          blocks = handleInsufficientlyReplicated(dn);
          // Add the Map of the node-blocklist that exited the service
          outOfServiceNodeBlocks.put(dn, blocks);
          fullScan = true;
        } else {
          // Already scanned this datanode, then just scan the list of blocks with insufficient copies again.
          LOG.debug("Processing decommission-in-progress node {}", dn);
          pruneSufficientlyReplicated(dn, blocks);
        }
        // A datanode has no more blocks, which means it can be deactivated
        if (blocks.size() == 0) {
          if (!fullScan) {
            // If we didn't just do a full scan, need to re-check with the 
            // full block map.
            //
            // We've replicated all the known insufficiently replicated blocks.
            // Re-check with the full block map before finally 
            // marking the datanode as decommissioned 
            LOG.debug("Node {} has finished replicating current set of "
                + "blocks, checking with the full block map.", dn);
            blocks = handleInsufficientlyReplicated(dn);
            outOfServiceNodeBlocks.put(dn, blocks);
          }
          final boolean isHealthy =
              blockManager.isNodeHealthyForDecommission(dn);
          if (blocks.size() == 0 && isHealthy) {
            setDecommissioned(dn);
            toRemove.add(dn);
            LOG.debug("Node {} is sufficiently replicated and healthy, "
                + "marked as decommissioned.", dn);
          } else {
            if (LOG.isDebugEnabled()) {
              StringBuilder b = new StringBuilder("Node {} ");
              if (isHealthy) {
                b.append("is ");
              } else {
                b.append("isn't ");
              }
              b.append("healthy and still needs to replicate {} more blocks," +
                  " decommissioning is still in progress.");
              LOG.debug(b.toString(), dn, blocks.size());
            }
          }
        } else {
          LOG.debug("Node {} still has {} blocks to replicate "
                  + "before it is a candidate to finish decommissioning.",
              dn, blocks.size());
        }
        iterkey = dn;
      }