PerfectHadoop: YARN Resource Manager

This project provides a Swift wrapper for the YARN Resource Manager REST API:

  • YARNResourceManager(): access to YARN cluster information, including the cluster and its metrics, the scheduler, application submission, etc.

Connect to YARN Resource Manager

To connect to your Hadoop YARN Resource Manager with Perfect, initialize a YARNResourceManager() object with the required parameters:

// this connection may be enough for some basic operations
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088)

or connect to the Hadoop YARN Resource Manager with a valid user name:

// add a user name if needed
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088, user: "your user name")

Authentication

To authenticate with Kerberos, use the code below:

// set auth to Kerberos
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088, user: "username", auth: .krb5)

Parameters of YARNResourceManager Object

Item Data Type Description
service String the service protocol of web request - http / https
host String the hostname or ip address of the Hadoop YARN Resource Manager
port Int the port of yarn host, default is 8088
auth Authorization .off or .krb5. Default value is .off
proxyUser String proxy user, if applicable
apibase String use this parameter ONLY if the target server uses an API route other than /ws/v1/cluster
timeout Int timeout in seconds; zero means the transfer never times out
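As a rough illustration of how the service, host, port, and apibase parameters relate to each other, the sketch below composes a REST base URL from them. The helper makeClusterURL is hypothetical and is not part of PerfectHadoop; how the library builds its URLs internally is an assumption here.

```swift
import Foundation

// Hypothetical helper (not a PerfectHadoop API): composes the REST base URL
// from the documented parameters, using the documented defaults.
func makeClusterURL(service: String = "http",
                    host: String,
                    port: Int = 8088,
                    apibase: String = "/ws/v1/cluster") -> String {
    // Make sure the API path starts with exactly one leading slash.
    let path = apibase.hasPrefix("/") ? apibase : "/" + apibase
    return "\(service)://\(host):\(port)\(path)"
}

print(makeClusterURL(host: "yarn.somehadoopdomain.com"))
```

Only a non-default apibase needs to be passed explicitly; the other defaults mirror the table above.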

Get General Information

Call checkClusterInfo() to get general information about a YARN Resource Manager in the form of a ClusterInfo structure:

guard let i = try yarn.checkClusterInfo() else {
    print("unable to check cluster info")
    return
}//end guard
print(i.startedOn)
print(i.state)
print(i.hadoopVersion)
print(i.resourceManagerVersion)

Once called, checkClusterInfo() returns a ClusterInfo object, as described below:

ClusterInfo Object

Item Data Type Description
id Int The cluster id
startedOn Int The time the cluster started (in ms since epoch)
state String The ResourceManager state - valid values are: NOTINITED, INITED, STARTED, STOPPED
haState String The ResourceManager HA state - valid values are: INITIALIZING, ACTIVE, STANDBY, STOPPED
resourceManagerVersion String Version of the ResourceManager
resourceManagerBuildVersion String ResourceManager build string with build version, user, and checksum
resourceManagerVersionBuiltOn String Timestamp when ResourceManager was built (in ms since epoch)
hadoopVersion String Version of hadoop common
hadoopBuildVersion String Hadoop common build string with build version, user, and checksum
hadoopVersionBuiltOn String Timestamp when hadoop common was built (in ms since epoch)
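Fields such as startedOn are expressed in milliseconds since the epoch, while Foundation's Date works in seconds. A minimal sketch of the conversion follows; the helper name and the sample value are illustrative only and do not come from a real cluster.

```swift
import Foundation

// Converts a millisecond epoch timestamp (the unit used by startedOn) to a Date.
func date(fromEpochMilliseconds ms: Int) -> Date {
    return Date(timeIntervalSince1970: Double(ms) / 1000.0)
}

// Sample value for illustration, not real cluster output.
let sampleStartedOn = 1484231633049
print(date(fromEpochMilliseconds: sampleStartedOn))
```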

Cluster Metrics

Method checkClusterMetrics() returns a detailed metrics structure of the cluster.

guard let m = try yarn.checkClusterMetrics() else {
    // something wrong, must return
    return
}
print(m.availableMB)
print(m.availableVirtualCores)
print(m.allocatedVirtualCores)
print(m.totalMB)

Once called, checkClusterMetrics() returns a ClusterMetrics object, as listed below:

ClusterMetrics Object

Item Data Type Description
appsSubmitted Int The number of applications submitted
appsCompleted Int The number of applications completed
appsPending Int The number of applications pending
appsRunning Int The number of applications running
appsFailed Int The number of applications failed
appsKilled Int The number of applications killed
reservedMB Int The amount of memory reserved in MB
availableMB Int The amount of memory available in MB
allocatedMB Int The amount of memory allocated in MB
totalMB Int The amount of total memory in MB
reservedVirtualCores Int The number of reserved virtual cores
availableVirtualCores Int The number of available virtual cores
allocatedVirtualCores Int The number of allocated virtual cores
totalVirtualCores Int The total number of virtual cores
containersAllocated Int The number of containers allocated
containersReserved Int The number of containers reserved
containersPending Int The number of containers pending
totalNodes Int The total number of nodes
activeNodes Int The number of active nodes
lostNodes Int The number of lost nodes
unhealthyNodes Int The number of unhealthy nodes
decommissionedNodes Int The number of nodes decommissioned
rebootedNodes Int The number of nodes rebooted
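The metrics are raw counters, so derived figures such as memory utilization have to be computed by the caller. The sketch below uses a local stand-in structure (MetricsSample, a hypothetical name) with two of the fields listed above, rather than the real ClusterMetrics type.

```swift
// Local stand-in for illustration; field names follow the ClusterMetrics table.
struct MetricsSample {
    let allocatedMB: Int
    let totalMB: Int
}

// Returns part / whole as a percentage, or 0 when whole is not positive.
func percentage(of part: Int, in whole: Int) -> Double {
    guard whole > 0 else { return 0 }
    return Double(part) / Double(whole) * 100
}

// Sample numbers, not real cluster output.
let sampleMetrics = MetricsSample(allocatedMB: 2048, totalMB: 8192)
print(percentage(of: sampleMetrics.allocatedMB, in: sampleMetrics.totalMB))
```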

Cluster Scheduler

Method checkSchedulerInfo() returns a detailed info structure of the scheduler.

guard let sch = try yarn.checkSchedulerInfo() else {
    // something wrong, must return
    return
}
print(sch.capacity)
print(sch.maxCapacity)
print(sch.queueName)
print(sch.queues.count)

Once done, checkSchedulerInfo() returns a SchedulerInfo structure, as listed below:

SchedulerInfo Object

Item Data Type Description
availNodeCapacity Int The available node capacity
capacity Double Configured queue capacity in percentage relative to its parent queue
maxCapacity Double Max capacity of the queue
maxQueueMemoryCapacity Int Maximum queue memory capacity
minQueueMemoryCapacity Int Minimum queue memory capacity
numContainers Int The number of containers
numNodes Int The total number of nodes
qstate QState State of the queue - valid values are: STOPPED, RUNNING
queueName String Name of the queue
queues [Queue] A collection of queue resources
rootQueue FairQueue A collection of root queue resources
totalNodeCapacity Int The total node capacity
type String Scheduler type - capacityScheduler
usedCapacity Double Used queue capacity in percentage
usedNodeCapacity Int The used node capacity

FairQueue Object

Item Data Type Description
maxApps Int The maximum number of applications the queue can have
minResources ResourcesUsed The configured minimum resources that are guaranteed to the queue
maxResources ResourcesUsed The configured maximum resources that are allowed to the queue
usedResources ResourcesUsed The sum of resources allocated to containers within the queue
fairResources ResourcesUsed The queue’s fair share of resources
clusterResources ResourcesUsed The capacity of the cluster
queueName String The name of the queue
schedulingPolicy String The name of the scheduling policy used by the queue
childQueues FairQueue A collection of sub-queue information. Omitted if the queue has no childQueues.
type String type of the queue - fairSchedulerLeafQueueInfo
numActiveApps Int The number of active applications in this queue
numPendingApps Int The number of pending applications in this queue

Queue Object

Item Data Type Description
absoluteCapacity Double Absolute capacity percentage this queue can use of entire cluster
absoluteMaxCapacity Double Absolute maximum capacity percentage this queue can use of the entire cluster
absoluteUsedCapacity Double Absolute used capacity percentage this queue is using of the entire cluster
capacity Double Configured queue capacity in percentage relative to its parent queue
maxActiveApplications Int The maximum number of active applications this queue can have
maxActiveApplicationsPerUser Int The maximum number of active applications per user this queue can have
maxApplications Int The maximum number of applications this queue can have
maxApplicationsPerUser Int The maximum number of applications per user this queue can have
maxCapacity Double Configured maximum queue capacity in percentage relative to its parent queue
numActiveApplications Int The number of active applications in this queue
numApplications Int The number of applications currently in the queue
numContainers Int The number of containers being used
numPendingApplications Int The number of pending applications in this queue
queueName String The name of the queue
queues [Queue] A collection of sub-queue information. Omitted if the queue has no sub-queues.
resourcesUsed ResourcesUsed The total amount of resources used by this queue
state String The state of the queue
type String type of the queue - capacitySchedulerLeafQueueInfo
usedCapacity Double Used queue capacity in percentage
usedResources String A string describing the current resources used by the queue
userLimit Int The minimum user limit percent set in the configuration
userLimitFactor Double The user limit factor set in the configuration
users [User] A collection of user objects containing resources used, see below:

User Object

Item Data Type Description
username String The username of the user using the resources
resourcesUsed ResourcesUsed The amount of resources used by the user in this queue, see definition below
numActiveApplications Int The number of active applications for this user in this queue
numPendingApplications Int The number of pending applications for this user in this queue

ResourcesUsed Object

Item Data Type Description
memory Int The amount of memory (in MB)
vCores Int The number of virtual cores
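Because a Queue can contain sub-queues in its queues field, walking the scheduler information is naturally recursive. The sketch below collects the names of all leaf queues. QueueNode is a local stand-in with only two of the documented fields, and the queue names are made up for illustration.

```swift
// Local stand-in for illustration; mirrors the queueName and queues fields.
struct QueueNode {
    let queueName: String
    let queues: [QueueNode]
}

// Depth-first walk that returns the names of queues without sub-queues.
func leafQueueNames(of queue: QueueNode) -> [String] {
    if queue.queues.isEmpty {
        return [queue.queueName]
    }
    return queue.queues.flatMap { leafQueueNames(of: $0) }
}

// Hypothetical queue hierarchy.
let sampleRoot = QueueNode(queueName: "root", queues: [
    QueueNode(queueName: "a", queues: [
        QueueNode(queueName: "a1", queues: []),
        QueueNode(queueName: "a2", queues: [])
    ]),
    QueueNode(queueName: "b", queues: [])
])
print(leafQueueNames(of: sampleRoot))
```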

Cluster Nodes

Check All Nodes

Method checkClusterNodes() returns an array of node info structures for the cluster.

let nodes = try yarn.checkClusterNodes()
nodes.forEach { node in
    print(node.rack)
    print(node.availableVirtualCores)
    print(node.availMemoryMB)
    print(node.healthReport)
    print(node.healthStatus)
    print(node.id)
    print(node.lastHealthUpdate)
    print(node.nodeHostName)
    print(node.nodeHTTPAddress)
}

Once done, checkClusterNodes() returns an array of Node objects, as described below:

Node Object

Item Data Type Description
rack String The rack location of this node
state String State of the node - valid values are: NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED
id String The node id
nodeHostName String The host name of the node
nodeHTTPAddress String The node's HTTP address
healthStatus String The health status of the node - Healthy or Unhealthy
healthReport String A detailed health report
lastHealthUpdate Int The last time the node reported its health (in ms since epoch)
usedMemoryMB Int The total amount of memory currently used on the node (in MB)
availMemoryMB Int The total amount of memory currently available on the node (in MB)
usedVirtualCores Int The total number of vCores currently used on the node
availableVirtualCores Int The total number of vCores available on the node
numContainers Int The total number of containers currently running on the node
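A common use of the node list is to total the memory still available on nodes that are actually running. The sketch below shows one way to do that with a local stand-in type (NodeSample, hypothetical) that carries three of the fields from the table above.

```swift
// Local stand-in for illustration; field names follow the Node table.
struct NodeSample {
    let id: String
    let state: String
    let availMemoryMB: Int
}

// Sums availMemoryMB over nodes whose state is RUNNING.
func availableMemoryMB(onRunning nodes: [NodeSample]) -> Int {
    return nodes
        .filter { $0.state == "RUNNING" }
        .reduce(0) { $0 + $1.availMemoryMB }
}

// Sample data, not real cluster output.
let sampleNodes = [
    NodeSample(id: "host1.domain.com:8041", state: "RUNNING", availMemoryMB: 4096),
    NodeSample(id: "host2.domain.com:8041", state: "LOST", availMemoryMB: 8192),
    NodeSample(id: "host3.domain.com:8041", state: "RUNNING", availMemoryMB: 1024)
]
print(availableMemoryMB(onRunning: sampleNodes))
```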

Check A Node

Method checkClusterNode() returns a detailed info structure of a single node.

guard let n = try yarn.checkClusterNode(id: "host.domain.com:8041") else {
    // something wrong, must return
    return
}

Applications on Cluster

Check All Applications

Method checkApps() returns an array of APP structures.

```swift
let apps = try yarn.checkApps()
// alternatively, you can filter the APPs you want by setting query parameters:
// let apps = try yarn.checkApps(states: [APP.State.FINISHED, APP.State.RUNNING], finalStatus: APP.FinalStatus.SUCCEEDED)

apps.forEach { a in
    print(a.allocatedMB)
    print(a.allocatedVCores)
    print(a.amContainerLogs)
    print(a.amHostHttpAddress)
    print(a.amNodeLabelExpression)
    print(a.amRPCAddress)
    print(a.applicationPriority)
    print(a.applicationTags)
}
```
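For orientation, the Hadoop REST API accepts states and finalStatus as query parameters on its applications endpoint. The sketch below shows how such filters could be turned into a query string. The helper appsQuery is hypothetical, and whether PerfectHadoop encodes its filters exactly this way is an assumption.

```swift
// Hypothetical helper (not a PerfectHadoop API): builds a query string from
// optional filters. States are joined with commas into one parameter.
func appsQuery(states: [String] = [], finalStatus: String? = nil) -> String {
    var items: [String] = []
    if !states.isEmpty {
        items.append("states=" + states.joined(separator: ","))
    }
    if let status = finalStatus {
        items.append("finalStatus=" + status)
    }
    // No filters means no query string at all.
    return items.isEmpty ? "" : "?" + items.joined(separator: "&")
}

print(appsQuery(states: ["FINISHED", "RUNNING"], finalStatus: "SUCCEEDED"))
```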

Once done, checkApps() returns an array of APP objects, as described below:

APP Object

Item Data Type Description
id String The application id
user String The user who started the application
name String The application name
applicationType String The application type
queue String The queue the application was submitted to
state String The application state according to the ResourceManager - valid values are members of the YarnApplicationState enum: NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
finalStatus String The final status of the application if finished - reported by the application itself - valid values are: UNDEFINED, SUCCEEDED, FAILED, KILLED
progress Double The progress of the application as a percent
trackingUI String Where the tracking url is currently pointing - History (for history server) or ApplicationMaster
trackingUrl String The web URL that can be used to track the application
diagnostics String Detailed diagnostics information
clusterId Int The cluster id
startedTime Int The time in which application started (in ms since epoch)
finishedTime Int The time in which the application finished (in ms since epoch)
elapsedTime Int The elapsed time since the application started (in ms)
amContainerLogs String The URL of the application master container logs
amHostHttpAddress String The node's HTTP address of the application master
amRPCAddress String The RPC address of the application master
allocatedMB Int The sum of memory in MB allocated to the application’s running containers
allocatedVCores Int The sum of virtual cores allocated to the application’s running containers
runningContainers Int The number of containers currently running for the application
memorySeconds Int The amount of memory the application has allocated (megabyte-seconds)
vcoreSeconds Int The amount of CPU resources the application has allocated (virtual core-seconds)
unmanagedApplication Bool Is the application unmanaged.
applicationPriority Int priority of the submitted application
appNodeLabelExpression String Node Label expression which is used to identify the nodes on which application’s containers are expected to run by default.
amNodeLabelExpression String Node Label expression which is used to identify the node on which application’s AM container is expected to run.

Check A Specific Application

Method checkApp() returns a specific APP Object.

let a = try yarn.checkApp(id: "application_1484231633049_0025")
print(a.allocatedMB)
print(a.allocatedVCores)
print(a.amContainerLogs)
print(a.amHostHttpAddress)
print(a.amNodeLabelExpression)
print(a.amRPCAddress)
print(a.applicationPriority)
print(a.applicationTags)

The return value is an APP structure; see the APP object above for details.

Apply A New Application

Method newApplication() returns a new application handle.

guard let a = try yarn.newApplication() else {
    // cannot create a new application, must return
    return
}

Once completed, the newApplication() method will return a NewApplication object, as described below:

NewApplication Object

Item Data Type Description
id String The newly created application id
maximumResourceCapability ResourcesUsed The maximum resource capabilities available on this cluster

ResourcesUsed Object

The NewApplication object contains a ResourcesUsed object, as described below:

Item Data Type Description
memory Int The maximum memory available for a container
vCores Int The maximum number of cores available for a container
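Since maximumResourceCapability describes the largest container the cluster can offer, it can be used to sanity-check a resource request before submitting it. The sketch below uses a local stand-in type (Capability, hypothetical) and made-up numbers.

```swift
// Local stand-in for illustration; mirrors the memory and vCores fields.
struct Capability {
    let memory: Int
    let vCores: Int
}

// A request fits when neither dimension exceeds the cluster maximum.
func fits(_ request: Capability, within maximum: Capability) -> Bool {
    return request.memory <= maximum.memory && request.vCores <= maximum.vCores
}

// Sample values, not real cluster output.
let sampleMaximum = Capability(memory: 8192, vCores: 4)
print(fits(Capability(memory: 1024, vCores: 1), within: sampleMaximum))
```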

Submit Modification to An Application

Method submit() submits modifications to a specific application.

⚠️Note⚠️ Detailed configuration of a YARN MapReduce application is beyond the scope of this document. For more information, see Hadoop MapReduce Next Generation - Writing YARN Applications: https://wiki.apache.org/hadoop/WritingYarnApps

// create an empty application to fill in the blanks
let sum = SubmitApplication()

// *MUST* set the application id
sum.id =  "application_1484231633049_0025"
// set the application name
sum.name = "test"

// describe a local resource to be localized
let local = LocalResource(resource: "hdfs://localhost:9000/user/rockywei/DistributedShell/demo-app/AppMaster.jar", type: .FILE, visibility: .APPLICATION, size: 43004, timestamp: 1405452071209)

// assign the resource into an array
let localResources = Entries([Entry(key:"AppMaster.jar", value: local)])

// *MUST* fill in the field of map reduce command
let commands = Commands("/hdp/bin/hadoop jar /hdp/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha1.jar grep input output 'dfs[a-z.]+'")

// set up the environment as needed
let environments = Entries([Entry(key:"DISTRIBUTEDSHELLSCRIPTTIMESTAMP", value: "1405459400754"), Entry(key:"CLASSPATH", value:"{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"), Entry(key:"DISTRIBUTEDSHELLSCRIPTLEN", value:6), Entry(key:"DISTRIBUTEDSHELLSCRIPTLOCATION", value: "hdfs://localhost:9000/user/rockywei/demo-app/shellCommands")])

// specify the container info
sum.amContainerSpec = AmContainerSpec(localResources: localResources, environment: environments, commands: commands)

// set other information as needed
sum.unmanagedAM = false
sum.maxAppAttempts = 2
sum.resource = ResourceRequest(memory: 1024, vCores: 1)
sum.type = "MapReduce"
sum.keepContainersAcrossApplicationAttempts = false

// set the log context
sum.logAggregationContext = LogAggregationContext(logIncludePattern: "file1", logExcludePattern: "file2", rolledLogIncludePattern: "file3", rolledLogExcludePattern: "file4", logAggregationPolicyClassName: "org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy", logAggregationPolicyParameters: "")

// set the failures validity interval
sum.attemptFailuresValidityInterval = 3600000

// set the reservationId, if applicable
sum.reservationId = "reservation_1454114874_1"
sum.amBlackListingRequests = AmBlackListingRequests(amBlackListingEnabled: true, disableFailureThreshold: 0.01)

try yarn.submit(application: sum)

SubmitApplication Object

SubmitApplication class contains quite a few sub types, as described below:

Item Data Type Description
id String The application id
name String The application name
queue String The name of the queue to which the application should be submitted
priority Int The priority of the application
amContainerSpec AmContainerSpec The application master container launch context, described below
unmanagedAM Bool Is the application using an unmanaged application master
maxAppAttempts Int The max number of attempts for this application
resource ResourceRequest The resources the application master requires, described below
type String The application type (MapReduce, Pig, Hive, etc.)
keepContainersAcrossApplicationAttempts Bool Should YARN keep the containers used by this application instead of destroying them
tags [String] List of application tags, please see the request examples on how to specify the tags
logAggregationContext LogAggregationContext Represents all of the information needed by the NodeManager to handle the logs for this application
attemptFailuresValidityInterval Int Attempt failures that happen outside the validity interval are not counted toward the failure count
reservationId String The unique id of the corresponding reserved resource allocation in the scheduler
amBlackListingRequests AmBlackListingRequests Contains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold”

AmContainerSpec Object

Item Data Type Description
localResources Entries Object describing the resources that need to be localized, described below
environment Entries Environment variables for your containers, specified as key value pairs
commands Commands The commands for launching your container, in the order in which they should be executed
serviceData Entries Application specific service data; key is the name of the auxiliary service, value is base-64 encoding of the data you wish to pass
credentials Credentials The credentials required for your application to run, described below
applicationAcls Entries ACLs for your application; the key can be “VIEWAPP” or “MODIFYAPP”, the value is the list of users with the permissions

LocalResource Object

Item Data Type Description
resource String Location of the resource to be localized
type ResourceType Type of the resource; options are “ARCHIVE”, “FILE”, and “PATTERN”
visibility Visibility Visibility of the resource to be localized; options are “PUBLIC”, “PRIVATE”, and “APPLICATION”
size Int Size of the resource to be localized
timestamp Int Timestamp of the resource to be localized

Commands Object

Currently, the Commands object has only one string field, named command. ⚠️NOTE⚠️ According to the Hadoop 3.0 alpha documentation, commands should be an array of command strings executed in sequence; however, the example given in the Hadoop 3.0 alpha documentation shows only one command. This feature is experimental and subject to change in the future.

Credentials Object

Item Data Type Description
tokens [String:String] Tokens that you wish to pass to your application, specified as key-value pairs. The key is an identifier for the token and the value is the token (which should be obtained using the respective web services)
secrets [String:String] Secrets that you wish to use in your application, specified as key-value pairs. The key is an identifier and the value is the base-64 encoding of the secret

ResourceRequest Object

Item Data Type Description
memory Int Memory required for each container
vCores Int Virtual cores required for each container

Entries Object

The Entries object has only one field, entry, which is an array of Entry or EncryptedEntry objects. Each Entry has two elements: a String key and a value of type Any.

The difference between Entry and EncryptedEntry is that the value of an EncryptedEntry is encoded in base64 before submission, provided that the value is of type String or [UInt8].
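To make the base64 step concrete, the sketch below encodes a value of type Any when it holds a String or a [UInt8], and returns nil otherwise. The function base64Value is a hypothetical illustration built on Foundation's Data; it is not the library's own implementation, and how EncryptedEntry treats other value types is not covered here.

```swift
import Foundation

// Hypothetical helper: base64-encodes String or [UInt8] values, nil otherwise.
func base64Value(_ value: Any) -> String? {
    if let text = value as? String {
        // Strings are encoded from their UTF-8 bytes.
        return Data(text.utf8).base64EncodedString()
    }
    if let bytes = value as? [UInt8] {
        return Data(bytes).base64EncodedString()
    }
    return nil
}

print(base64Value("hello") ?? "unsupported type")
```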

AmBlackListingRequests Object

Item Data Type Description
amBlackListingEnabled Bool Whether AM Blacklisting is enabled
disableFailureThreshold Float AM Blacklisting disable failure threshold

LogAggregationContext Object

Item Data Type Description
logIncludePattern String The log files which match the defined include pattern will be uploaded when the application finishes
logExcludePattern String The log files which match the defined exclude pattern will not be uploaded when the application finishes
rolledLogIncludePattern String The log files which match the defined include pattern will be aggregated in a rolling fashion
rolledLogExcludePattern String The log files which match the defined exclude pattern will not be aggregated in a rolling fashion
logAggregationPolicyClassName String The policy which will be used by NodeManager to aggregate the logs
logAggregationPolicyParameters String The parameters passed to the policy class

Application State Control

Method getApplicationStatus() returns the current state of an application.

guard let state = try yarn.getApplicationStatus(id: "application_1484231633049_0025") else {
    // something wrong, must return
    return
}
print(state)

Method setApplicationStatus() sets the state of an application to a designated one.

try yarn.setApplicationStatus(id:  "application_1484231633049_0025", state: .KILLED)

Valid states include:

NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
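When polling an application, it is often useful to know whether a state is final. The sketch below models the states listed above as a local enum (YarnAppState, a hypothetical name, not the library's APP.State type) and marks FINISHED, FAILED, and KILLED as terminal.

```swift
// Local enum for illustration; the cases are the states listed above.
enum YarnAppState: String, CaseIterable {
    case NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED

    // An application in a terminal state will not change state again.
    var isTerminal: Bool {
        switch self {
        case .FINISHED, .FAILED, .KILLED:
            return true
        default:
            return false
        }
    }
}

if let state = YarnAppState(rawValue: "RUNNING") {
    print("\(state.rawValue) terminal: \(state.isTerminal)")
}
```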

Application Queue Control

Method getApplicationQueue() returns the current queue name of an application.

guard let queue = try yarn.getApplicationQueue(id: "application_1484231633049_0025") else {
    // something wrong, must return
    return
}
print(queue)

Method setApplicationQueue() moves an application to a designated queue.

try yarn.setApplicationQueue(id:  "application_1484231633049_0025", queue:"a1a")

Application Priority Control

Method getApplicationPriority() returns the current priority of an application.

guard let priority = try yarn.getApplicationPriority(id: "application_1484231633049_0025") else {
    // something wrong, must return
    return
}
print(priority)

Currently, the returned priority is an integer, such as 0.

Method setApplicationPriority() sets the priority of an application to a designated one.

try yarn.setApplicationPriority(id:  "application_1484231633049_0025", priority: 1)

Application Attempts

Method checkAppAttempts() returns an array of AppAttempt objects for an application.

guard let attempts = try yarn.checkAppAttempts(id: "application_1484231633049_0025") else {
    // something wrong, must return
    return
}
attempts.forEach { attempt in
    print(attempt.containerId)
    print(attempt.id)
    print(attempt.nodeHttpAddress)
    print(attempt.nodeId)
    print(attempt.startTime)
}//next

Once done, checkAppAttempts() returns an array of AppAttempt objects, as described below:

AppAttempt Object

Item Data Type Description
id String The app attempt id
nodeId String The node id of the node the attempt ran on
nodeHttpAddress String The node http address of the node the attempt ran on
logsLink String The http link to the app attempt logs
containerId String The id of the container for the app attempt
startTime Int The start time of the attempt (in ms since epoch)
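An application may have several attempts, and the one with the greatest startTime is the most recent. The sketch below picks it out using a local stand-in type (AttemptSample, hypothetical) and made-up attempt ids.

```swift
// Local stand-in for illustration; mirrors the id and startTime fields.
struct AttemptSample {
    let id: String
    let startTime: Int
}

// Returns the attempt with the largest startTime, or nil for an empty array.
func latestAttempt(in attempts: [AttemptSample]) -> AttemptSample? {
    return attempts.max(by: { $0.startTime < $1.startTime })
}

// Sample data, not real cluster output.
let sampleAttempts = [
    AttemptSample(id: "appattempt_1484231633049_0025_000001", startTime: 1484231700000),
    AttemptSample(id: "appattempt_1484231633049_0025_000002", startTime: 1484231900000)
]
print(latestAttempt(in: sampleAttempts)?.id ?? "no attempts")
```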

Application Statistics

Method checkAppStatistics() returns an array of statistic items for applications.

let sta = try yarn.checkAppStatistics(states: [APP.State.FINISHED, APP.State.RUNNING])
sta.forEach { s in
    print(s.count)
    print(s.state)
    print(s.type)
}//next s

checkAppStatistics() accepts two optional query parameters:

Parameter Data Type Description
states [APP.State] states of the applications. If states is not provided, the API enumerates all application states and returns a count for each.
applicationTypes [String] types of the applications. If applicationTypes is not provided, the API counts applications of any type. In this case, the response shows * to indicate any application type. Note that at most one application type is currently supported, such as applicationTypes: ["MapReduce"]
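Each returned item carries a state, a type, and a count, so totals per state can be computed on the client side. The sketch below does this with a local stand-in type (StatSample, hypothetical) and made-up counts.

```swift
// Local stand-in for illustration; mirrors the state, type and count fields.
struct StatSample {
    let state: String
    let type: String
    let count: Int
}

// Adds up the counts of all items that share the same state.
func totalsByState(_ items: [StatSample]) -> [String: Int] {
    var totals: [String: Int] = [:]
    for item in items {
        totals[item.state, default: 0] += item.count
    }
    return totals
}

// Sample data, not real cluster output.
let sampleStats = [
    StatSample(state: "FINISHED", type: "*", count: 12),
    StatSample(state: "RUNNING", type: "*", count: 3),
    StatSample(state: "FINISHED", type: "*", count: 1)
]
print(totalsByState(sampleStats))
```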