PerfectHadoop: YARN Resource Manager
This project provides a Swift wrapper of the YARN Resource Manager REST API:

`YARNResourceManager()`: access to cluster information of YARN, including the cluster and its metrics, the scheduler, application submission, etc.
Connect to YARN Resource Manager
To connect to your Hadoop YARN Resource Manager with Perfect, initialize a `YARNResourceManager()` object with the necessary parameters:
```swift
// this connection can perform some basic operations
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088)
```
or connect to the Hadoop YARN Resource Manager with a valid user name:
```swift
// add a user name if needed
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088, user: "your user name")
```
Authentication
If you use Kerberos for authentication, set `auth` to `.krb5`:
```swift
// set auth to kerberos
let yarn = YARNResourceManager(host: "yarn.somehadoopdomain.com", port: 8088, user: "username", auth: .krb5)
```
Parameters of YARNResourceManager Object
Item | Data Type | Description |
---|---|---|
service | String | the service protocol of web request - http / https |
host | String | the hostname or ip address of the Hadoop YARN Resource Manager |
port | Int | the port of the YARN host; default is 8088 |
auth | Authorization | .off or .krb5; default value is .off |
proxyUser | String | proxy user, if applicable |
apibase | String | use this parameter ONLY if the target server uses an API route other than /ws/v1/cluster |
timeout | Int | timeout in seconds, zero means never timeout during transfer |
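The `service`, `host`, `port` and `apibase` parameters combine into the REST endpoint URLs the wrapper talks to. A minimal sketch, assuming the standard YARN REST layout in which each resource (e.g. `metrics`, `nodes`, `apps`) sits directly under `apibase`; `endpointURL` is a hypothetical helper for illustration, not part of PerfectHadoop:

```swift
import Foundation

// Hypothetical helper (not part of PerfectHadoop): shows how service, host,
// port and apibase combine into a YARN REST endpoint URL.
func endpointURL(service: String = "http",
                 host: String,
                 port: Int = 8088,
                 apibase: String = "/ws/v1/cluster",
                 resource: String) -> URL? {
  var components = URLComponents()
  components.scheme = service
  components.host = host
  components.port = port
  // e.g. "/ws/v1/cluster" + "/" + "metrics"
  components.path = apibase + "/" + resource
  return components.url
}

if let url = endpointURL(host: "yarn.somehadoopdomain.com", resource: "metrics") {
  print(url.absoluteString)
}
```

Changing `apibase` only matters when a gateway or proxy exposes the Resource Manager under a different route.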
Get General Information
Call `checkClusterInfo()` to get general information about a YARN Resource Manager in the form of a `ClusterInfo` structure:
```swift
guard let i = try yarn.checkClusterInfo() else {
  print("unable to check cluster info")
  return
}//end guard
print(i.startedOn)
print(i.state)
print(i.hadoopVersion)
print(i.resourceManagerVersion)
```
`checkClusterInfo()` returns a `ClusterInfo` object, as described below:
ClusterInfo Object
Item | Data Type | Description |
---|---|---|
id | Int | The cluster id |
startedOn | Int | The time the cluster started (in ms since epoch) |
state | String | The ResourceManager state - valid values are: NOTINITED, INITED, STARTED, STOPPED |
haState | String | The ResourceManager HA state - valid values are: INITIALIZING, ACTIVE, STANDBY, STOPPED |
resourceManagerVersion | String | Version of the ResourceManager |
resourceManagerBuildVersion | String | ResourceManager build string with build version, user, and checksum |
resourceManagerVersionBuiltOn | String | Timestamp when ResourceManager was built (in ms since epoch) |
hadoopVersion | String | Version of hadoop common |
hadoopBuildVersion | String | Hadoop common build string with build version, user, and checksum |
hadoopVersionBuiltOn | String | Timestamp when hadoop common was built (in ms since epoch) |
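The YARN REST API returns this information as JSON wrapped in a `clusterInfo` object. A minimal sketch of decoding such a response with `Codable` and converting the millisecond `startedOn` value into a `Date`; `ClusterInfoSketch`, `ClusterInfoResponse` and the sample payload are made up for illustration and are not PerfectHadoop's own types:

```swift
import Foundation

// Hypothetical mirror of a subset of the ClusterInfo table, for illustration
// only; PerfectHadoop's own ClusterInfo type performs the real decoding.
struct ClusterInfoSketch: Decodable {
  let id: Int
  let startedOn: Int            // ms since epoch
  let state: String
  let haState: String
  let resourceManagerVersion: String
  let hadoopVersion: String

  // Convert the millisecond timestamp into a Foundation Date.
  var startDate: Date {
    return Date(timeIntervalSince1970: Double(startedOn) / 1000)
  }
}

// The REST response wraps the fields in a "clusterInfo" object.
struct ClusterInfoResponse: Decodable {
  let clusterInfo: ClusterInfoSketch
}

// Made-up sample payload.
let sample = """
{"clusterInfo": {"id": 1324053971963, "startedOn": 1324053971963,
 "state": "STARTED", "haState": "ACTIVE",
 "resourceManagerVersion": "2.7.3", "hadoopVersion": "2.7.3"}}
"""

do {
  let info = try JSONDecoder().decode(ClusterInfoResponse.self, from: Data(sample.utf8)).clusterInfo
  print(info.state, info.startDate)
} catch {
  print("decoding failed: \(error)")
}
```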
Cluster Metrics
Method `checkClusterMetrics()` returns detailed metrics of the cluster:
```swift
guard let m = try yarn.checkClusterMetrics() else {
  // something went wrong, must return
  return
}
print(m.availableMB)
print(m.availableVirtualCores)
print(m.allocatedVirtualCores)
print(m.totalMB)
```
`checkClusterMetrics()` returns a `ClusterMetrics` object, as listed below:
ClusterMetrics Object
Item | Data Type | Description |
---|---|---|
appsSubmitted | Int | The number of applications submitted |
appsCompleted | Int | The number of applications completed |
appsPending | Int | The number of applications pending |
appsRunning | Int | The number of applications running |
appsFailed | Int | The number of applications failed |
appsKilled | Int | The number of applications killed |
reservedMB | Int | The amount of memory reserved in MB |
availableMB | Int | The amount of memory available in MB |
allocatedMB | Int | The amount of memory allocated in MB |
totalMB | Int | The amount of total memory in MB |
reservedVirtualCores | Int | The number of reserved virtual cores |
availableVirtualCores | Int | The number of available virtual cores |
allocatedVirtualCores | Int | The number of allocated virtual cores |
totalVirtualCores | Int | The total number of virtual cores |
containersAllocated | Int | The number of containers allocated |
containersReserved | Int | The number of containers reserved |
containersPending | Int | The number of containers pending |
totalNodes | Int | The total number of nodes |
activeNodes | Int | The number of active nodes |
lostNodes | Int | The number of lost nodes |
unhealthyNodes | Int | The number of unhealthy nodes |
decommissionedNodes | Int | The number of nodes decommissioned |
rebootedNodes | Int | The number of nodes rebooted |
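The `allocated*` and `total*` fields above are enough to derive cluster utilization. A minimal sketch with a hypothetical helper (not part of PerfectHadoop) that works on plain integers, so it applies equally to memory (MB) and virtual cores:

```swift
// Hypothetical helper, not part of PerfectHadoop: percentage of a resource
// that is allocated, computed from an allocated/total pair of ClusterMetrics fields.
func utilization(allocated: Int, total: Int) -> Double? {
  // a cluster with no registered capacity has no meaningful utilization
  guard total > 0 else { return nil }
  return Double(allocated) / Double(total) * 100
}

// e.g. with `m` from the example above:
// let memoryPct = utilization(allocated: m.allocatedMB, total: m.totalMB)
// let vcorePct = utilization(allocated: m.allocatedVirtualCores, total: m.totalVirtualCores)
```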
Cluster Scheduler
Method `checkSchedulerInfo()` returns detailed information about the scheduler:
```swift
guard let sch = try yarn.checkSchedulerInfo() else {
  // something went wrong, must return
  return
}
print(sch.capacity)
print(sch.maxCapacity)
print(sch.queueName)
print(sch.queues.count)
```
`checkSchedulerInfo()` returns a `SchedulerInfo` structure, as listed below:
SchedulerInfo Object
Item | Data Type | Description |
---|---|---|
availNodeCapacity | Int | The available node capacity |
capacity | Double | Configured queue capacity in percentage relative to its parent queue |
maxCapacity | Double | Max capacity of the queue |
maxQueueMemoryCapacity | Int | Configured maximum queue capacity in percentage relative to its parent queue |
minQueueMemoryCapacity | Int | Minimum queue memory capacity |
numContainers | Int | The number of containers |
numNodes | Int | The total number of nodes |
qstate | QState | State of the queue - valid values are: STOPPED, RUNNING |
queueName | String | Name of the queue |
queues | [Queue] | A collection of queue resources |
rootQueue | FairQueue | A collection of root queue resources |
totalNodeCapacity | Int | The total node capacity |
type | String | Scheduler type - capacityScheduler |
usedCapacity | Double | Used queue capacity in percentage |
usedNodeCapacity | Int | The used node capacity |
FairQueue Object
Item | Data Type | Description |
---|---|---|
maxApps | Int | The maximum number of applications the queue can have |
minResources | ResourcesUsed | The configured minimum resources that are guaranteed to the queue |
maxResources | ResourcesUsed | The configured maximum resources that are allowed to the queue |
usedResources | ResourcesUsed | The sum of resources allocated to containers within the queue |
fairResources | ResourcesUsed | The queue’s fair share of resources |
clusterResources | ResourcesUsed | The capacity of the cluster |
queueName | String | The name of the queue |
schedulingPolicy | String | The name of the scheduling policy used by the queue |
childQueues | FairQueue | A collection of sub-queue information. Omitted if the queue has no childQueues. |
type | String | type of the queue - fairSchedulerLeafQueueInfo |
numActiveApps | Int | The number of active applications in this queue |
numPendingApps | Int | The number of pending applications in this queue |
Queue Object
Item | Data Type | Description |
---|---|---|
absoluteCapacity | Double | Absolute capacity percentage this queue can use of entire cluster |
absoluteMaxCapacity | Double | Absolute maximum capacity percentage this queue can use of the entire cluster |
absoluteUsedCapacity | Double | Absolute used capacity percentage this queue is using of the entire cluster |
capacity | Double | Configured queue capacity in percentage relative to its parent queue |
maxActiveApplications | Int | The maximum number of active applications this queue can have |
maxActiveApplicationsPerUser | Int | The maximum number of active applications per user this queue can have |
maxApplications | Int | The maximum number of applications this queue can have |
maxApplicationsPerUser | Int | The maximum number of applications per user this queue can have |
maxCapacity | Double | Configured maximum queue capacity in percentage relative to its parent queue |
numActiveApplications | Int | The number of active applications in this queue |
numApplications | Int | The number of applications currently in the queue |
numContainers | Int | The number of containers being used |
numPendingApplications | Int | The number of pending applications in this queue |
queueName | String | The name of the queue |
queues | [Queue] | A collection of sub-queue information. Omitted if the queue has no sub-queues. |
resourcesUsed | ResourcesUsed | The total amount of resources used by this queue |
state | String | The state of the queue |
type | String | type of the queue - capacitySchedulerLeafQueueInfo |
usedCapacity | Double | Used queue capacity in percentage |
usedResources | String | A string describing the current resources used by the queue |
userLimit | Int | The minimum user limit percent set in the configuration |
userLimitFactor | Double | The user limit factor set in the configuration |
users | [User] | A collection of user objects containing resources used; see the User Object below |
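Because each `Queue` can hold further sub-queues in its `queues` field, the capacity scheduler's hierarchy is a tree. A minimal sketch of a depth-first walk that lists every leaf queue by its full path; `QueueSketch` is a hypothetical, simplified stand-in for the `Queue` object, not PerfectHadoop's type:

```swift
// Hypothetical, simplified stand-in for the Queue object (not PerfectHadoop's type):
// only the fields needed to walk the tree are modeled.
struct QueueSketch {
  let queueName: String
  let queues: [QueueSketch]   // empty when the queue has no sub-queues
}

// Depth-first walk returning the full path of every leaf queue
// (a queue without sub-queues), e.g. "root.a.a1".
func leafQueuePaths(_ queue: QueueSketch, prefix: String = "") -> [String] {
  let path = prefix.isEmpty ? queue.queueName : prefix + "." + queue.queueName
  if queue.queues.isEmpty { return [path] }
  return queue.queues.flatMap { leafQueuePaths($0, prefix: path) }
}
```

The same recursion would apply to `sch.queues`, assuming each returned `Queue` exposes its children through `queues` as the table describes.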
User Object
Item | Data Type | Description |
---|---|---|
username | String | The username of the user using the resources |
resourcesUsed | ResourcesUsed | The amount of resources used by the user in this queue, see definition below |
numActiveApplications | Int | The number of active applications for this user in this queue |
numPendingApplications | Int | The number of pending applications for this user in this queue |
ResourcesUsed Object
Item | Data Type | Description |
---|---|---|
memory | Int | The amount of memory (in MB) |
vCores | Int | The number of virtual cores |
Cluster Nodes
Check All Nodes
Method `checkClusterNodes()` returns an array of information about the nodes of the cluster.
```swift
let nodes = try yarn.checkClusterNodes()
nodes.forEach { node in
  print(node.rack)
  print(node.availableVirtualCores)
  print(node.availMemoryMB)
  print(node.healthReport)
  print(node.healthStatus)
  print(node.id)
  print(node.lastHealthUpdate)
  print(node.nodeHostName)
  print(node.nodeHTTPAddress)
}//next node
```
`checkClusterNodes()` returns an array of `Node` objects, as described below:
Node Object
Item | Data Type | Description |
---|---|---|
rack | String | The rack location of this node |
state | String | State of the node - valid values are: NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED |
id | String | The node id |
nodeHostName | String | The host name of the node |
nodeHTTPAddress | String | The node's HTTP address |
healthStatus | String | The health status of the node - Healthy or Unhealthy |
healthReport | String | A detailed health report |
lastHealthUpdate | Int | The last time the node reported its health (in ms since epoch) |
usedMemoryMB | Int | The total amount of memory currently used on the node (in MB) |
availMemoryMB | Int | The total amount of memory currently available on the node (in MB) |
usedVirtualCores | Int | The total number of vCores currently used on the node |
availableVirtualCores | Int | The total number of vCores available on the node |
numContainers | Int | The total number of containers currently running on the node |
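A typical use of the node list is to estimate how much capacity is actually usable, ignoring nodes that are lost, unhealthy or decommissioned. A minimal sketch using a hypothetical `NodeSketch` stand-in (not PerfectHadoop's `Node` type) with only the fields involved:

```swift
// Hypothetical stand-in for the Node object, modeling only the fields used here.
struct NodeSketch {
  let id: String
  let state: String          // NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED
  let availMemoryMB: Int
}

// Sum the memory still available on nodes that are currently RUNNING.
func availableMemoryOnRunningNodes(_ nodes: [NodeSketch]) -> Int {
  return nodes
    .filter { $0.state == "RUNNING" }
    .reduce(0) { $0 + $1.availMemoryMB }
}
```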
Check A Node
Method `checkClusterNode()` returns detailed information about a single node.
```swift
guard let n = try yarn.checkClusterNode(id: "host.domain.com:8041") else {
  // something went wrong, must return
  return
}
print(n.healthStatus)
```
Applications on Cluster
Check All Applications
Method `checkApps()` returns an array of `APP` structures.
```swift
let apps = try yarn.checkApps()
// alternatively, filter the applications you want by setting query parameters:
// let apps = try yarn.checkApps(states: [APP.State.FINISHED, APP.State.RUNNING], finalStatus: APP.FinalStatus.SUCCEEDED)
apps.forEach { a in
  print(a.allocatedMB)
  print(a.allocatedVCores)
  print(a.amContainerLogs)
  print(a.amHostHttpAddress)
  print(a.amNodeLabelExpression)
  print(a.amRPCAddress)
  print(a.applicationPriority)
  print(a.applicationTags)
}
```
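The filtered call resembles the query parameters of the YARN REST `apps` resource, where several states are passed as one comma-separated `states` value alongside `finalStatus`. A hypothetical sketch of building such a query with `URLComponents`; whether PerfectHadoop builds its request exactly this way is an assumption, and `appsQueryURL` is illustrative only:

```swift
import Foundation

// Hypothetical: build the query for GET {apibase}/apps with optional filters.
func appsQueryURL(base: String, states: [String], finalStatus: String?) -> URL? {
  guard var components = URLComponents(string: base + "/apps") else { return nil }
  var items: [URLQueryItem] = []
  if !states.isEmpty {
    // several states are joined into one comma-separated value
    items.append(URLQueryItem(name: "states", value: states.joined(separator: ",")))
  }
  if let status = finalStatus {
    items.append(URLQueryItem(name: "finalStatus", value: status))
  }
  // no filters: leave the query out entirely
  components.queryItems = items.isEmpty ? nil : items
  return components.url
}
```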
`checkApps()` returns an array of `APP` objects, as described below:
APP Object
Item | Data Type | Description |
---|---|---|
id | String | The application id |
user | String | The user who started the application |
name | String | The application name |
applicationType | String | The application type |
queue | String | The queue the application was submitted to |
state | String | The application state according to the ResourceManager - valid values are members of the YarnApplicationState enum: NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED |
finalStatus | String | The final status of the application if finished - reported by the application itself - valid values are: UNDEFINED, SUCCEEDED, FAILED, KILLED |
progress | Double | The progress of the application as a percent |
trackingUI | String | Where the tracking url is currently pointing - History (for history server) or ApplicationMaster |
trackingUrl | String | The web URL that can be used to track the application |
diagnostics | String | Detailed diagnostics information |
clusterId | Int | The cluster id |
startedTime | Int | The time in which application started (in ms since epoch) |
finishedTime | Int | The time in which the application finished (in ms since epoch) |
elapsedTime | Int | The elapsed time since the application started (in ms) |
amContainerLogs | String | The URL of the application master container logs |
amHostHttpAddress | String | The node's HTTP address of the application master |
amRPCAddress | String | The RPC address of the application master |
allocatedMB | Int | The sum of memory in MB allocated to the application’s running containers |
allocatedVCores | Int | The sum of virtual cores allocated to the application’s running containers |
runningContainers | Int | The number of containers currently running for the application |
memorySeconds | Int | The amount of memory the application has allocated (megabyte-seconds) |
vcoreSeconds | Int | The amount of CPU resources the application has allocated (virtual core-seconds) |
unmanagedApplication | Bool | Whether the application is unmanaged |
applicationPriority | Int | Priority of the submitted application |
appNodeLabelExpression | String | Node Label expression which is used to identify the nodes on which application’s containers are expected to run by default. |
amNodeLabelExpression | String | Node Label expression which is used to identify the node on which application’s AM container is expected to run. |
Check A Specific Application
Method `checkApp()` returns a specific `APP` object.
```swift
let a = try yarn.checkApp(id: "application_1484231633049_0025")
print(a.allocatedMB)
print(a.allocatedVCores)
print(a.amContainerLogs)
print(a.amHostHttpAddress)
print(a.amNodeLabelExpression)
print(a.amRPCAddress)
print(a.applicationPriority)
print(a.applicationTags)
```
The return value is an `APP` structure; see the APP Object above for details.
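YARN application ids follow the pattern `application_<clusterTimestamp>_<sequence>`, as in `application_1484231633049_0025`. A small hypothetical parser (not part of PerfectHadoop) can validate or split an id before it is passed to `checkApp(id:)`:

```swift
// Hypothetical helper: split "application_<clusterTimestamp>_<sequence>".
// Returns nil for anything that does not match that shape.
func parseApplicationId(_ id: String) -> (clusterTimestamp: Int, sequence: Int)? {
  let parts = id.split(separator: "_")
  guard parts.count == 3, parts[0] == "application",
        let ts = Int(parts[1]), let seq = Int(parts[2]) else { return nil }
  return (ts, seq)
}
```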
Request A New Application
Method `newApplication()` requests a new application id from the Resource Manager.
```swift
guard let a = try yarn.newApplication() else {
  // cannot create a new application, must return
  return
}
print(a.id)
```
`newApplication()` returns a `NewApplication` object, as described below:
NewApplication Object
Item | Data Type | Description |
---|---|---|
id | String | The newly created application id |
maximumResourceCapability | ResourcesUsed | The maximum resource capabilities available on this cluster |
ResourcesUsed Object
The `NewApplication` object contains a `ResourcesUsed` object, as described below:
Item | Data Type | Description |
---|---|---|
memory | Int | The maximum memory available for a container |
vCores | Int | The maximum number of cores available for a container |
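Since `maximumResourceCapability` is the largest container the cluster will grant, a caller might clamp its desired application master resources to it before filling in the submission. A minimal sketch with a hypothetical helper operating on plain integers (the names are illustrative, not PerfectHadoop API):

```swift
// Hypothetical helper: clamp a desired container size to the cluster maximum
// reported in NewApplication.maximumResourceCapability.
func clampRequest(memory: Int, vCores: Int,
                  maxMemory: Int, maxVCores: Int) -> (memory: Int, vCores: Int) {
  return (min(memory, maxMemory), min(vCores, maxVCores))
}
```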
Submit An Application
Method `submit()` submits an application, using the application id obtained from `newApplication()`.
⚠️Note⚠️ Detailed configuration of a YARN MapReduce application is beyond the scope of this document; for more information, see Hadoop MapReduce Next Generation - Writing YARN Applications https://wiki.apache.org/hadoop/WritingYarnApps
```swift
// create an empty application and fill in the blanks
let sum = SubmitApplication()
// *MUST* set the application id
sum.id = "application_1484231633049_0025"
// set the application name
sum.name = "test"
// describe a resource that should be localized for the application
let local = LocalResource(resource: "hdfs://localhost:9000/user/rockywei/DistributedShell/demo-app/AppMaster.jar", type: .FILE, visibility: .APPLICATION, size: 43004, timestamp: 1405452071209)
// wrap the resource into an Entries collection
let localResources = Entries([Entry(key: "AppMaster.jar", value: local)])
// *MUST* fill in the MapReduce command
let commands = Commands("/hdp/bin/hadoop jar /hdp/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha1.jar grep input output 'dfs[a-z.]+'")
// set up the environment as needed
let environments = Entries([
  Entry(key: "DISTRIBUTEDSHELLSCRIPTTIMESTAMP", value: "1405459400754"),
  Entry(key: "CLASSPATH", value: "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"),
  Entry(key: "DISTRIBUTEDSHELLSCRIPTLEN", value: 6),
  Entry(key: "DISTRIBUTEDSHELLSCRIPTLOCATION", value: "hdfs://localhost:9000/user/rockywei/demo-app/shellCommands")
])
// specify the container info
sum.amContainerSpec = AmContainerSpec(localResources: localResources, environment: environments, commands: commands)
// set other information as needed
sum.unmanagedAM = false
sum.maxAppAttempts = 2
sum.resource = ResourceRequest(memory: 1024, vCores: 1)
sum.type = "MapReduce"
sum.keepContainersAcrossApplicationAttempts = false
// set the log aggregation context
sum.logAggregationContext = LogAggregationContext(
  logIncludePattern: "file1",
  logExcludePattern: "file2",
  rolledLogIncludePattern: "file3",
  rolledLogExcludePattern: "file4",
  logAggregationPolicyClassName: "org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy",
  logAggregationPolicyParameters: "")
// set the failures validity interval (in ms)
sum.attemptFailuresValidityInterval = 3600000
// set the reservation id, if applicable
sum.reservationId = "reservation_1454114874_1"
sum.amBlackListingRequests = AmBlackListingRequests(amBlackListingEnabled: true, disableFailureThreshold: 0.01)
try yarn.submit(application: sum)
```
SubmitApplication Object
The `SubmitApplication` class contains quite a few subtypes, as described below:
Item | Data Type | Description |
---|---|---|
id | String | The application id |
name | String | The application name |
queue | String | The name of the queue to which the application should be submitted |
priority | Int | The priority of the application |
amContainerSpec | AmContainerSpec | The application master container launch context, described below |
unmanagedAM | Bool | Is the application using an unmanaged application master |
maxAppAttempts | Int | The max number of attempts for this application |
resource | ResourceRequest | The resources the application master requires, described below |
type | String | The application type (MapReduce, Pig, Hive, etc.) |
keepContainersAcrossApplicationAttempts | Bool | Whether YARN should keep the containers used by this application instead of destroying them |
tags | [String] | List of application tags |
logAggregationContext | LogAggregationContext | Represents all of the information needed by the NodeManager to handle the logs for this application |
attemptFailuresValidityInterval | Int | Attempt failures that happen outside this validity interval (in ms) are not counted toward the failure count |
reservationId | String | Represents the unique id of the corresponding reserved resource allocation in the scheduler |
amBlackListingRequests | AmBlackListingRequests | Contains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold” |
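On the wire, the upstream YARN REST submission body uses hyphenated keys such as `application-id`, `application-name`, `application-type` and `max-app-attempts`. A hypothetical `Encodable` sketch covering only those four fields shows how camel-case properties can map to them through `CodingKeys`; PerfectHadoop's real encoding of `SubmitApplication` is not shown here and may differ:

```swift
import Foundation

// Hypothetical subset of SubmitApplication, for illustrating the key mapping only.
struct SubmitSketch: Encodable {
  var id: String
  var name: String
  var type: String
  var maxAppAttempts: Int

  // map Swift property names to the hyphenated JSON keys
  enum CodingKeys: String, CodingKey {
    case id = "application-id"
    case name = "application-name"
    case type = "application-type"
    case maxAppAttempts = "max-app-attempts"
  }
}

func submissionBody(_ app: SubmitSketch) throws -> Data {
  let encoder = JSONEncoder()
  encoder.outputFormatting = .sortedKeys  // stable key order for logging and diffs
  return try encoder.encode(app)
}
```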
AmContainerSpec Object
Item | Data Type | Description |
---|---|---|
localResources | Entries | Object describing the resources that need to be localized, described below |
environment | Entries | Environment variables for your containers, specified as key value pairs |
commands | Commands | The commands for launching your container, in the order in which they should be executed |
serviceData | Entries | Application specific service data; key is the name of the auxiliary service, value is base-64 encoding of the data you wish to pass |
credentials | Credentials | The credentials required for your application to run, described below |
applicationAcls | Entries | ACLs for your application; the key can be “VIEWAPP” or “MODIFYAPP”, the value is the list of users with the permissions |
LocalResource Object
Item | Data Type | Description |
---|---|---|
resource | String | Location of the resource to be localized |
type | ResourceType | Type of the resource; options are “ARCHIVE”, “FILE”, and “PATTERN” |
visibility | Visibility | Visibility of the resource to be localized; options are “PUBLIC”, “PRIVATE”, and “APPLICATION” |
size | Int | Size of the resource to be localized |
timestamp | Int | Timestamp of the resource to be localized |
Commands Object
Currently the `Commands` object has only one string field, named `command`.
⚠️NOTE⚠️ According to the Hadoop 3.0 alpha documentation, commands should be an array of command strings executed in sequence; however, the example given in the Hadoop 3.0 alpha documentation shows only one command. This feature is experimental and subject to change in the future.
Credentials Object
Item | Data Type | Description |
---|---|---|
tokens | [String:String] | Tokens that you wish to pass to your application, specified as key-value pairs. The key is an identifier for the token and the value is the token (which should be obtained using the respective web services) |
secrets | [String:String] | Secrets that you wish to use in your application, specified as key-value pairs. The key is an identifier and the value is the base-64 encoding of the secret |
ResourceRequest Object
Item | Data Type | Description |
---|---|---|
memory | Int | Memory required for each container |
vCores | Int | Virtual cores required for each container |
Entries Object
The `Entries` object has only one field, `entry`, which is an array of `Entry` or `EncryptedEntry` objects. Each `Entry` has two elements: a `String` `key` and a `value` of type `Any`.
The difference between `Entry` and `EncryptedEntry` is that the value of an `EncryptedEntry` is encoded in base64 before submission, provided that the value is either a `String` or a `[UInt8]`.
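To illustrate what that encoding step amounts to, here is a hypothetical helper (not PerfectHadoop's implementation) that base64-encodes a `String` or `[UInt8]` value and declines other types. Note that base64 is an encoding, not encryption, so the value is not protected in transit by this step alone:

```swift
import Foundation

// Hypothetical illustration of the base64 step described above:
// String and [UInt8] values are encoded; any other type returns nil.
func base64Value(_ value: Any) -> String? {
  switch value {
  case let s as String:
    return Data(s.utf8).base64EncodedString()
  case let bytes as [UInt8]:
    return Data(bytes).base64EncodedString()
  default:
    return nil
  }
}
```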
AmBlackListingRequests Object
Item | Data Type | Description |
---|---|---|
amBlackListingEnabled | Bool | Whether AM Blacklisting is enabled |
disableFailureThreshold | Float | AM Blacklisting disable failure threshold |
LogAggregationContext Object
Item | Data Type | Description |
---|---|---|
logIncludePattern | String | The log files which match the defined include pattern will be uploaded when the application finishes |
logExcludePattern | String | The log files which match the defined exclude pattern will not be uploaded when the application finishes |
rolledLogIncludePattern | String | The log files which match the defined include pattern will be aggregated in a rolling fashion |
rolledLogExcludePattern | String | The log files which match the defined exclude pattern will not be aggregated in a rolling fashion |
logAggregationPolicyClassName | String | The policy which will be used by NodeManager to aggregate the logs |
logAggregationPolicyParameters | String | The parameters passed to the policy class |
Application State Control
Method `getApplicationStatus()` returns the current state of an application.
```swift
guard let state = try yarn.getApplicationStatus(id: "application_1484231633049_0025") else {
  // something went wrong, must return
  return
}
print(state)
```
Method `setApplicationStatus()` sets the state of an application to a designated one.
```swift
try yarn.setApplicationStatus(id: "application_1484231633049_0025", state: .KILLED)
```
Valid states include:
NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
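In the upstream YARN REST documentation, an application's state is changed with a PUT to its `state` resource whose JSON body carries the target state, and killing the application (`KILLED`) is the documented use. A hypothetical sketch of that body for illustration only; `setApplicationStatus()` issues the request itself, and `AppStateSketch`, `StateBody` and `killRequestBody` are not PerfectHadoop names:

```swift
import Foundation

// The YarnApplicationState values listed above.
enum AppStateSketch: String, Encodable {
  case NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
}

// Hypothetical body for PUT {apibase}/apps/{id}/state
struct StateBody: Encodable {
  let state: AppStateSketch   // encoded as its raw string value
}

func killRequestBody() throws -> String {
  let data = try JSONEncoder().encode(StateBody(state: .KILLED))
  return String(decoding: data, as: UTF8.self)
}
```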
Application Queue Control
Method `getApplicationQueue()` returns the name of the queue an application currently belongs to.
```swift
guard let queue = try yarn.getApplicationQueue(id: "application_1484231633049_0025") else {
  // something went wrong, must return
  return
}
print(queue)
```
Method `setApplicationQueue()` moves an application to a designated queue.
```swift
try yarn.setApplicationQueue(id: "application_1484231633049_0025", queue: "a1a")
```
Application Priority Control
Method `getApplicationPriority()` returns the current priority of an application.
```swift
guard let priority = try yarn.getApplicationPriority(id: "application_1484231633049_0025") else {
  // something went wrong, must return
  return
}
print(priority)
```
Currently the returned priority is an integer, such as `0`.
Method `setApplicationPriority()` sets the priority of an application to a designated value.
```swift
try yarn.setApplicationPriority(id: "application_1484231633049_0025", priority: 1)
```
Application Attempts
Method `checkAppAttempts()` returns an array of the attempts of an application.
```swift
guard let attempts = try yarn.checkAppAttempts(id: "application_1484231633049_0025") else {
  // something went wrong, must return
  return
}
attempts.forEach { attempt in
  print(attempt.containerId)
  print(attempt.id)
  print(attempt.nodeHttpAddress)
  print(attempt.nodeId)
  print(attempt.startTime)
}//next
```
`checkAppAttempts()` returns an array of `AppAttempt` objects, as described below:
AppAttempt Object
Item | Data Type | Description |
---|---|---|
id | String | The app attempt id |
nodeId | String | The node id of the node the attempt ran on |
nodeHttpAddress | String | The node http address of the node the attempt ran on |
logsLink | String | The http link to the app attempt logs |
containerId | String | The id of the container for the app attempt |
startTime | Int | The start time of the attempt (in ms since epoch) |
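When an application has been retried, the current attempt is usually the one that started last. A minimal sketch using a hypothetical `AttemptSketch` stand-in (not PerfectHadoop's `AppAttempt` type) that picks the attempt with the largest `startTime`:

```swift
// Hypothetical stand-in for the AppAttempt object (fields from the table above).
struct AttemptSketch {
  let id: String
  let containerId: String
  let startTime: Int   // ms since epoch
}

// The most recent attempt is the one with the largest startTime;
// returns nil for an empty array.
func latestAttempt(_ attempts: [AttemptSketch]) -> AttemptSketch? {
  return attempts.max { $0.startTime < $1.startTime }
}
```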
Application Statistics
Method `checkAppStatistics()` returns application statistics as an array, with one count per application state and type.
```swift
let sta = try yarn.checkAppStatistics(states: [APP.State.FINISHED, APP.State.RUNNING])
sta.forEach { s in
  print(s.count)
  print(s.state)
  print(s.type)
}//next s
```
`checkAppStatistics()` allows the user to perform a query with two additional parameters:
Parameter | Data Type | Description |
---|---|---|
states | [APP.State] | States of the applications. If states is not provided, the API will enumerate all application states and return the counts of them. |
applicationTypes | [String] | Types of the applications. If applicationTypes is not provided, the API will count the applications of any application type. In this case, the response shows * to indicate any application type. Note that currently at most one application type is supported, such as applicationTypes: ["MapReduce"] |
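Because only one application type is accepted for now, a caller may want to reject a longer list before sending the request. A hypothetical sketch of building the corresponding `appstatistics` query with that guard; `appStatisticsURL` and `StatisticsQueryError` are illustrative names, not PerfectHadoop API:

```swift
import Foundation

enum StatisticsQueryError: Error {
  case tooManyApplicationTypes
}

// Hypothetical: build GET {apibase}/appstatistics with optional filters.
func appStatisticsURL(base: String, states: [String], applicationTypes: [String]) throws -> URL? {
  // only one application type is currently supported
  guard applicationTypes.count <= 1 else { throw StatisticsQueryError.tooManyApplicationTypes }
  guard var components = URLComponents(string: base + "/appstatistics") else { return nil }
  var items: [URLQueryItem] = []
  if !states.isEmpty {
    items.append(URLQueryItem(name: "states", value: states.joined(separator: ",")))
  }
  if let type = applicationTypes.first {
    items.append(URLQueryItem(name: "applicationTypes", value: type))
  }
  components.queryItems = items.isEmpty ? nil : items
  return components.url
}
```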