HDFS Source Code Analysis --- Decommission
2022-02-20 22:03:37
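Introduction
DecommissionManager manages the decommissioning of DataNodes. A background monitor thread periodically checks the status of the DataNodes that are being decommissioned.
Starting the process
Start command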
sudo -u hdfs hdfs dfsadmin -refreshNodes
FSNamesystem.java
exitCode = refreshNodes();
void refreshNodes() throws IOException {
Configuration conf = new HdfsConfiguration();
checkOperation(OperationCategory.UNCHECKED);
checkSuperuserPrivilege();
// read the xml file, find the file path corresponding to DFS_HOSTS_EXCLUDE
RPC.Server.hostsReader.updateFileNames(
conf.get(DFSConfigKeys.DFS_HOSTS_WHITELIST, ""),
conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
// Read the host names from those files
RPC.Server.hostsReader.refresh();
reloadNeverdelete(conf);
dir.permissionCheckpatch.refreshAclMap();
// refresh the datanodes
getBlockManager().getDatanodeManager().refreshNodes(conf);
}
public synchronized void refresh(String hostNameString) throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
HashMap<String,ArrayList<String>> tmpvalid = new HashMap<String, ArrayList<String>>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (!includesFile.isEmpty()) {
readFileToSet("included", includesFile, newIncludes, tmpvalid);
switchIncludes = true;
}
if (!excludesFile.isEmpty()) {
readFileToSet("excluded", excludesFile, newExcludes);
switchExcludes = true;
}
if (switchIncludes) {
// switch the new hosts that are to be included
includes = newIncludes;
validuserForHost = tmpvalid;
}
if (switchExcludes) {
// switch the excluded hosts
excludes = newExcludes;
}
}
HostsFileReader.java
refresh() loads the two sets of host names, includes and excludes, from their files into in-memory Set<String> collections.
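The build-then-swap pattern used by refresh() can be sketched in isolation. This is a minimal illustration, not the Hadoop class: HostListsSketch, parse, and the rule of skipping blank lines and lines starting with '#' are assumptions made for the example, and the input is a list of lines rather than a file.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the hosts file reader; names are illustrative only.
class HostListsSketch {
    private Set<String> includes = new HashSet<>();
    private Set<String> excludes = new HashSet<>();

    // Collect host names from the given lines.
    // Assumption: blank lines and lines starting with '#' are ignored,
    // and several host names may share one line, separated by whitespace.
    static Set<String> parse(List<String> lines) {
        Set<String> hosts = new HashSet<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            hosts.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return hosts;
    }

    // Build both new sets completely, then replace the old references.
    synchronized void refresh(List<String> includeLines, List<String> excludeLines) {
        Set<String> newIncludes = parse(includeLines);
        Set<String> newExcludes = parse(excludeLines);
        includes = newIncludes;
        excludes = newExcludes;
    }

    synchronized Set<String> getIncludes() {
        return includes;
    }

    synchronized Set<String> getExcludes() {
        return excludes;
    }
}
```

Filling temporary sets first means a failed read leaves the previous lists untouched, which is presumably why the original code only assigns includes and excludes at the end.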
/**
* Rereads conf to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
* checks if any of the hosts have changed states:
*/
public void refreshNodes(final Configuration conf) throws IOException {
refreshHostsReader(conf);
namesystem.getBMLock().writeLock(OpName.REFRESH_NODES);
try {
refreshDatanodes();
countSoftwareVersions();
} finally {
namesystem.getBMLock().writeUnlock(OpName.REFRESH_NODES);
}
}
DatanodeManager.java
refreshNodes(conf) then hands off to refreshDatanodes().
/**
* 1. Added to hosts --> no further work needed here.
* 2. Removed from hosts --> mark AdminState as decommissioned.
* 3. Added to exclude --> start decommissioning.
* 4. Removed from exclude --> stop decommission.
*/
private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!hostConfigManager.isIncluded(node)) {
node.setDisallowed(true); // case 2.
} else {
long maintenanceExpireTimeInMS =
hostConfigManager.getMaintenanceExpirationTimeInMS(node);
if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
// determine if decommission is needed
} else if (hostConfigManager.isExcluded(node)) {
decomManager.startDecommission(node); // case 3.
} else {
decomManager.stopMaintenance(node);
decomManager.stopDecommission(node); // case 4.
}
}
node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
}
}
This decides whether decommissioning of a node needs to start (a new DecommissionManager object is created when DatanodeManager is initialized).
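The branch order in refreshDatanodes() matters: the include check comes first, then maintenance, then exclude. A minimal sketch of that ordering as a pure function follows. The enum and method names are hypothetical and exist only for this illustration; the real method acts on DatanodeDescriptor objects instead of returning a value.

```java
// Hypothetical names; this only mirrors the order of the if/else chain.
class RefreshDecisionSketch {
    enum Action { DISALLOW, START_MAINTENANCE, START_DECOMMISSION, STOP_ALL }

    static Action decide(boolean included, boolean maintenanceNotExpired, boolean excluded) {
        if (!included) {
            return Action.DISALLOW;            // case 2: removed from hosts
        }
        if (maintenanceNotExpired) {
            return Action.START_MAINTENANCE;   // maintenance wins over exclude
        }
        if (excluded) {
            return Action.START_DECOMMISSION;  // case 3: added to exclude
        }
        return Action.STOP_ALL;                // case 4: stop maintenance and decommission
    }
}
```

For example, decide(true, false, true) yields START_DECOMMISSION, while a node missing from the include list is disallowed regardless of the other two flags.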
@Override
public synchronized boolean isExcluded(DatanodeID dn) {
return isExcluded(dn.getResolvedAddress());
}
private boolean isExcluded(InetSocketAddress address) {
return excludes.match(address);
}
Determining whether a node is excluded
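The excludes.match(address) call is not shown in the listing. One plausible way to implement such a match is sketched below, under the assumption that an entry with port 0 acts as a wildcard for every port on that host. ExcludeSetSketch and its methods are hypothetical, and hosts are plain strings here to keep the example free of DNS lookups.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical exclude set keyed by host, with the ports listed for each host.
class ExcludeSetSketch {
    private final Map<String, Set<Integer>> portsByHost = new HashMap<>();

    void add(String host, int port) {
        portsByHost.computeIfAbsent(host, h -> new HashSet<>()).add(port);
    }

    // Assumption: port 0 in an entry matches any port on that host.
    boolean match(String host, int port) {
        Set<Integer> ports = portsByHost.get(host);
        if (ports == null) {
            return false;
        }
        return ports.contains(0) || ports.contains(port);
    }
}
```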
/**
* Manage node decommissioning.
*/
class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
// namespace system
private final FSNamesystem fsnamesystem;
DecommissionManager(FSNamesystem namesystem) {
this.fsnamesystem = namesystem;
}
/* Periodically check decommission status. */
// Monitor method
class Monitor implements Runnable {
...
}
}
Source code analysis
Decommission manager
@VisibleForTesting
public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress()) {
if (!node.isAlive()) {
LOG.info("Dead node {} is decommissioned immediately.", node);
node.setDecommissioned();
} else if (!node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Starting decommission of {} {} with {} blocks",
node, storage, storage.numBlocks());
}
// Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node);
node.decommissioningStatus.setStartTime(now());
// Add to pending nodes
pendingNodes.add(node);
}
} else {
LOG.trace("startDecommission: Node {} is already decommissioned in "
+ "progress, nothing to do.", node);
}
}
Starting decommission
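The state handling in startDecommission() can be reduced to a small model. The sketch below uses a hypothetical Node class and AdminState enum, and it assumes that the heartbeat manager call is what moves a live node into the in-progress state, which the listing itself does not show.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the transitions; not the Hadoop classes.
class StartDecommissionSketch {
    enum AdminState { NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED }

    static class Node {
        final String name;
        final boolean alive;
        AdminState state = AdminState.NORMAL;

        Node(String name, boolean alive) {
            this.name = name;
            this.alive = alive;
        }
    }

    final Queue<Node> pendingNodes = new ArrayDeque<>();

    void startDecommission(Node node) {
        if (node.state == AdminState.DECOMMISSION_INPROGRESS) {
            return; // already in progress, nothing to do
        }
        if (!node.alive) {
            // A dead node cannot serve as a replication source, so it is
            // marked decommissioned immediately.
            node.state = AdminState.DECOMMISSIONED;
        } else if (node.state != AdminState.DECOMMISSIONED) {
            // Assumption: stands in for hbManager.startDecommission(node).
            node.state = AdminState.DECOMMISSION_INPROGRESS;
            pendingNodes.add(node); // picked up later by the monitor
        }
    }
}
```

Calling startDecommission twice on the same live node queues it only once, because the second call sees the in-progress state and returns early.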
try {
processPendingNodes();
check();
} finally {
namesystem.getBMLock().writeUnlock(OpName.DECOMMISSION_MONITOR);
}
In the monitor, processPendingNodes() is called to pick up the DataNodes that are to be decommissioned.
private void processPendingNodes() {
while (!pendingNodes.isEmpty() &&
(maxConcurrentTrackedNodes == 0 ||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
}
}
poll() takes nodes off pendingNodes and stores them in outOfServiceNodeBlocks.
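The effect of maxConcurrentTrackedNodes is easiest to see with a small model. The sketch below keeps the same loop condition but uses strings for nodes and blocks; the class name and the use of a TreeMap are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical model: node names stand in for DatanodeDescriptor.
class PendingNodesSketch {
    final Queue<String> pendingNodes = new ArrayDeque<>();
    // A null value marks a node that has not been scanned yet.
    final TreeMap<String, List<String>> outOfServiceNodeBlocks = new TreeMap<>();
    final int maxConcurrentTrackedNodes;

    PendingNodesSketch(int maxConcurrentTrackedNodes) {
        this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
    }

    // Move nodes from the queue into the tracked map until the limit is hit.
    // A limit of 0 means no limit.
    void processPendingNodes() {
        while (!pendingNodes.isEmpty()
                && (maxConcurrentTrackedNodes == 0
                    || outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
            outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
        }
    }
}
```

With five queued nodes and a limit of 2, one call tracks two nodes and leaves three in the queue; they are admitted on later ticks as tracked nodes finish and are removed from the map.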
void activate(Configuration conf) {
...
monitor = new Monitor(blocksPerInterval,
nodesPerInterval, maxConcurrentTrackedNodes);
executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
TimeUnit.SECONDS);
...
}
Startup
This starts the decommission monitor.
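The scheduling call in activate() uses the standard java.util.concurrent API. A minimal sketch of the same pattern follows, with a counter standing in for the Monitor and millisecond intervals so that it finishes quickly. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of running a periodic task at a fixed rate.
class MonitorScheduleSketch {
    // Schedules a task at a fixed rate and waits until it has run `ticks`
    // times. Returns true if that happened within the timeout.
    static boolean runTicks(int ticks, long intervalMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(ticks);
        try {
            // Same shape as the original call: initial delay and period are equal.
            executor.scheduleAtFixedRate(latch::countDown,
                    intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One design point worth knowing: if a scheduled task throws, scheduleAtFixedRate suppresses all later executions, so a long-lived monitor normally needs to guard its run() body.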
// Define a custom Monitor class to implement the Runnable interface
private class Monitor implements Runnable {
/**
* The maximum number of blocks to check per tick.
*/
private final int numBlocksPerCheck;
/**
* The maximum number of nodes to check per tick.
*/
private final int numNodesPerCheck;
/**
* The maximum number of nodes to track in outOfServiceNodeBlocks.
* A value of 0 means no limit.
*/
private final int maxConcurrentTrackedNodes;
/**
* The number of blocks that have been checked on this tick.
*/
private int numBlocksChecked = 0;
/**
* The number of nodes that have been checked on this tick. Used for
* testing.
*/
private int numNodesChecked = 0;
/**
* The last datanode in outOfServiceNodeBlocks that we've processed
*/
private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
DatanodeID("", "", "", 0, 0, 0, 0, 0));
Monitor(int numBlocksPerCheck, int numNodesPerCheck, int
maxConcurrentTrackedNodes) {
this.numBlocksPerCheck = numBlocksPerCheck;
this.numNodesPerCheck = numNodesPerCheck;
this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
}
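private boolean exceededNumBlocksPerCheck() {
return numBlocksChecked >= numBlocksPerCheck;
}
...
private void check() {
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>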
it = new CyclicIteration<>(outOfServiceNodeBlocks,
iterkey).iterator();
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
while (it.hasNext()
&& !exceededNumBlocksPerCheck()
&& !exceededNumNodesPerCheck()) {
numNodesChecked++;
final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
entry = it.next();
final DatanodeDescriptor dn = entry.getKey();
// find all the blocks of the datanode
AbstractList<BlockInfo> blocks = entry.getValue();
boolean fullScan = false;
if (dn.isMaintenance()) {
// TODO HDFS-9390 make sure blocks are minimally replicated
// before transitioning the node to IN_MAINTENANCE state.
// If maintenance expires, stop tracking it.
if (dn.maintenanceExpired()) {
stopMaintenance(dn);
toRemove.add(dn);
}
continue;
}
// First scan of this datanode: find every insufficiently replicated
// block and queue it for replication.
// A null block list means the node has not been scanned yet.
if (blocks == null) {
LOG.debug("Newly-added node {}, doing full scan to find " +
"insufficiently-replicated blocks.", dn);
// return the list of under-replicated blocks on the datanode
blocks = handleInsufficientlyReplicated(dn);
// Record the block list for this out-of-service node
outOfServiceNodeBlocks.put(dn, blocks);
fullScan = true;
} else {
// Already scanned: just re-check the list of insufficiently replicated blocks.
LOG.debug("Processing decommission-in-progress node {}", dn);
pruneSufficientlyReplicated(dn, blocks);
}
// No blocks left to replicate, so the datanode may be ready to be decommissioned
if (blocks.size() == 0) {
if (!fullScan) {
// If we didn't just do a full scan, need to re-check with the
// full block map.
//
// We've replicated all the known insufficiently replicated
// blocks. Re-check with the full block map before finally
// marking the datanode as decommissioned
LOG.debug("Node {} has finished replicating current set of "
+ "blocks, checking with the full block map.", dn);
blocks = handleInsufficientlyReplicated(dn);
outOfServiceNodeBlocks.put(dn, blocks);
}
final boolean isHealthy =
blockManager.isNodeHealthyForDecommission(dn);
if (blocks.size() == 0 && isHealthy) {
setDecommissioned(dn);
toRemove.add(dn);
LOG.debug("Node {} is sufficiently replicated and healthy, "
+ "marked as decommissioned.", dn);
} else {
if (LOG.isDebugEnabled()) {
StringBuilder b = new StringBuilder("Node {} ");
if (isHealthy) {
b.append("is ");
} else {
b.append("isn't ");
}
b.append("healthy and still needs to replicate {} more blocks," +
" decommissioning is still in progress.");
LOG.debug(b.toString(), dn, blocks.size());
}
}
} else {
LOG.debug("Node {} still has {} blocks to replicate "
+ "before it is a candidate to finish decommissioning.",
dn, blocks.size());
}
iterkey = dn;
}
Monitor
Resident monitoring thread
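The monitor resumes each tick from iterkey, the last node it processed, and stops once a per-tick limit is reached. That round-robin behaviour can be sketched with a plain NavigableMap. The class below is a hypothetical illustration and does not reproduce the CyclicIteration class used in the listing; it assumes iteration starts just after the given key and wraps around.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical sketch of cyclic, rate-limited iteration over tracked nodes.
class CyclicScanSketch {
    // Keys in cyclic order: first those after startKey, then wrapping
    // around to the keys up to and including startKey.
    static <K, V> List<K> cyclicOrder(NavigableMap<K, V> map, K startKey) {
        List<K> order = new ArrayList<>(map.tailMap(startKey, false).keySet());
        order.addAll(map.headMap(startKey, true).keySet());
        return order;
    }

    // Keys visited in one tick, capped at maxPerTick. The last element
    // would become the next iterKey.
    static <K, V> List<K> tick(NavigableMap<K, V> map, K iterKey, int maxPerTick) {
        List<K> order = cyclicOrder(map, iterKey);
        int count = Math.min(maxPerTick, order.size());
        return new ArrayList<>(order.subList(0, count));
    }
}
```

With tracked keys a, b, c, d and a limit of 2 per tick, the first tick (starting from an empty key) visits a and b, and the next tick, starting from b, visits c and d. No node is starved even when a single tick cannot cover the whole map.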