PerfectHadoop: MapReduce Application 归并映射应用程序
该项目实现了Hadoop 归并映射应用程序 MapReduce Application Master REST API 的Swift类封装:
MapReduceApplication(): 获取当前服务器上特定归并映射应用程序状态。
注意事项
⚠️本函数库为试验性模块,可能随时会有更新和调整。 ⚠️
连接到Hadoop映射归并应用程序服务器
为了能够连接到当前活动的Hadoop 映射归并应用程序,请首先用有效的 应用程序代码 初始化一个MapReduceApplication() 对象,并且需要包含必要参数:
// 该连接可能只有基本操作权限 let app = MapReduceApplication(applicationId: "application_12345678_xxxxxx", host: "mapReducer.somedomain.com")
或者您还可以:
// 如果有需要,增加用户名,用于执行更多有权限要求的操作 let app = MapReduceApplication(applicationId: "application_12345678_xxxxxx", host: "mapReducer.somedomain.com", user: "your user name")
用户身份认证
如果服务器需要使用 Kerberos 身份验证,请在代码中加入:
// 将连接设置为KRB5认证模式 let app = MapReduceApplication(applicationId: "application_12345678_xxxxxx", host: "mapReducer.somedomain.com", user: "your user name", auth: .krb5)
MapReduceApplication 映射归并应用程序对象属性
| 变量 | 类型 | 描述 |
|---|---|---|
| applicationId | String | 应用程序编码。 必选项 |
| service | String | 网站请求协议 - http 或 https |
| host | String | 映射归并应用程序所在主机名称或者ip地址。 |
| port | Int | 映射归并应用程序所在主机使用的端口,默认为8088 |
| auth | Authorization | 认证方法,.off(不认证)或者 .krb5(Kerberos V5版本验证方法)。默认是 .off,即不验证。 |
| proxyUser | String | 代理服务器用户名,如果适用的话 |
| apibase | String | 只有 当目标url应用程序界面接口api路径不是 /ws/v1/mapreduce 的时候,请填写该项目。 |
| timeout | Int | 超时(秒)。即访问服务器需要在该时间段内完成请求,否则网络会话过程自动断开并返回。如果设置为零则表示永远不超时(持续等待直到服务器返回) |
查看基本信息
调用 checkInfo()方法可以获得当前映射归并主应用程序的基本运行情况,返回结果是一个 MapReduceApplication.Info 数据结构:
guard let inf = try app.checkInfo() else {
// 出错了
}
print(inf.startedOn)
print(inf.hadoopVersion)
print(inf.hadoopBuildVersion)
print(inf.hadoopVersionBuiltOn)
MapReduceApplication.Info 数据结构
| 变量 | 类型 | 描述 |
|---|---|---|
| appId | Int | 应用程序编码 |
| startedOn | Int | 应用程序启动时间(Unix纪元,毫秒) |
| name | string | 应用程序名称 |
| user | string | 启动该程序的用户名 |
| elapsedTime | long | 自该应用程序启动后持续的时间(毫秒) |
MapReduce Job 映射归并作业
调用 checkJobs() 能够返回该应用程序的一个作业数组。
作业数组指的是目前该应用程序所执行的映射归并已完成的作业清单。返回的清单目前并未包含所有参数。最简单的形式就是直接调用 checkJobs() 而且不用包含任何参数,意味着无条件返回所有可以查询到的所有作业:
let jobs = try app.checkJobs()
jobs.forEach { j in
print(j.id)
print(j.name)
print(j.queue)
print(j.state)
}
如果执行成功,checkJobs() 将返回一系列作业对象,如以下数据结构所述:
作业对象 Job 数据结构
| 变量 | 类型 | 描述 |
|---|---|---|
| id | String | 作业编码 |
| name | String | 作业名称 |
| queue | String | 作业所在的队列名称 |
| user | String | 用户名 |
| state | String | 作业状态 - 有效值包括:NEW(新建)、INITED(初始化完成)、RUNNING(正在运行)、SUCCEEDED(执行成功)、FAILED(失败)、KILL_WAIT(关闭/终止等待)、KILLED(已经关闭/终止完成)、ERROR(出错) |
| startTime | Int | 作业启动时间(Unix纪元,毫秒) |
| finishTime | Int | 作业结束时间(Unix纪元,毫秒) |
| elapsedTime | Int | 作业自启动以来的持续时间(毫秒) |
| mapsTotal | Int | 总映射数 |
| mapsCompleted | Int | 已完成的映射总数 |
| reducesTotal | Int | 总归并数 |
| reducesCompleted | Int | 已完成的归并总数 |
| diagnostics | String | 诊断信息 |
| uberized | Bool | 当前作业是否是一个uber类型的作业——完全在应用程序主机上运行 |
| mapsPending | Int | 等待执行的映射数量 |
| mapsRunning | Int | 正在执行的映射数量 |
| reducesPending | Int | 等待执行的归并任务数量 |
| reducesRunning | Int | 正在执行的归并任务数量 |
| failedReduceAttempts | Int | 归并尝试失败的数量 |
| killedReduceAttempts | Int | 已经终止的归并尝试数量 |
| successfulReduceAttempts | Int | 成功完成的归并尝试数量 |
| failedMapAttempts | Int | 映射尝试失败的总数量 |
| killedMapAttempts | Int | 映射尝试被关闭/终止的总数量 |
| successfulMapAttempts | Int | 成功完成的映射尝试数量 |
| acls | [ACL] | 安全访问控制列表(ACL)集合 |
访问控制列表对象 ACL 数据结构
| 变量 | 类型 | 描述 |
|---|---|---|
| name | String | 安全访问列表名称 |
| value | String | 安全访问列表项的值 |
检查特定作业(Job)
可以通过作业代码(Job ID)检查特定作业信息:
let job = try app.checkJob(jobId: "job_1484231633049_0005")
详见 作业(Job)数据结构。
作业尝试 JobAttempt
以下命令能够获得关于某个作业的尝试列表信息:
guard let attempts = try app.checkJobAttempts(jobId: "job_1484231633049_0005") else {
// 出错了
}
attempts.forEach { attempt in
print(attempt.id)
print(attempt.containerId)
print(attempt.nodeHttpAddress)
print(attempt.nodeId)
print(attempt.startTime)
}
JobAttempt 数据结构
| 变量 | 类型 | 描述 |
|---|---|---|
| id | String | 作业尝试 (Job Attempt) 编码 |
| nodeId | String | 该尝试执行时所在的节点编号 |
| nodeHttpAddress | String | 该尝试执行时所在的节点地址 |
| logsLink | String | 作业尝试的日志超链接 |
| containerId | String | 该尝试执行时所在的容器编号 |
| startTime | Long | 该作业尝试启动的时间(Unix纪元,单位是毫秒) |
作业指标统计(JobCounter)
使用作业指标函数库,您可以获得关于该作业的所有指标信息。
guard let js = try app.checkJobCounters(jobId: "job_1484231633049_0005") else {
// 出错了
}//end guard
js.counterGroup.forEach{ group in
print(group.counterGroupName)
group.counters.forEach { counter in
print(counter.name)
print(counter.mapCounterValue)
print(counter.reduceCounterValue)
print(counter.totalCounterValue)
}//next counter
}//next group
JobCounter Object
| 变量 | 类型 | 描述 |
|---|---|---|
| id | String | 作业编码 |
| counterGroup | [CounterGroup] | 指标清单分组集合 |
CounterGroup 指标分组对象
| 变量 | 类型 | 描述 |
|---|---|---|
| counterGroupName | string | 该组指标的名称 |
| counter | [Counter] | 指标对象构成的数组 |
Counter 指标对象
| 变量 | 类型 | 描述 |
|---|---|---|
| name | String | 指标名称 |
| reduceCounterValue | Int | 归并任务计数 |
| mapCounterValue | Int | 映射任务计数 |
| totalCounterValue | Int | 所有任务总数统计 |
查看作业配置
函数 checkJobConfig() 可以用来检查作业配置情况:
gurard let config = try app.checkJobConfig(jobId: "job_1484231633049_0005") else {
/// 出错了
}
// print the configuration path
print(config.path)
// check properties of configuration file
for p in config.property {
print(p.name)
print(p.value)
print(p.source)
}
JobConfig 作业配置对象
作业配置对象包含了该作业的资源使用信息。
| 变量 | 类型 | 描述 |
|---|---|---|
| path | String | 作业配置文件的路径 |
| property | [Property] | 属性数组 |
Property 属性对象
| 变量 | 类型 | 描述 |
|---|---|---|
| name | String | 配置项名称 |
| value | String | 配置项取值 |
| source | String | 配置对象所在路径。如果不止一个路径,则意味着占用资源的历史,清单末尾是最新的内容。 |
作业子任务(Tasks of A Job)
方法 checkJobTasks() 用于获取特定作业的子任务信息:
// 获得特定作业的所有子任务信息
let tasks = try app.checkJobTasks(jobId: "job_1484231633049_0005")
// print properties of each task
for t in tasks {
print(t.progress)
print(t.elapsedTime)
print(t.state)
print(t.startTime)
print(t.id)
print(t.type)
print(t.successfulAttempt)
print(t.finishTime)
}//next t
checkJobTasks() 函数参数
调用 checkJobTasks() 总是需要一个作业编码 jobId ,但是也可以指定第二个附加参数,即子任务类型用于进一步过滤查询结果:
- jobId: 作业编码
- taskType: 可选的子任务类型,即
.MAP映射任务或者.REDUCE归并任务
JobTask Object
函数checkJobTasks()用于返回一个作业子任务数组 JobTask 对象,包括下列属性:
| 变量 | 类型 | 描述 |
|---|---|---|
| id | string | 作业子任务编码 |
| state | String | 作业子任务状态 - 有效值包括: NEW(新建)、SCHEDULED(已列入计划)、RUNNING(运行)、SUCCEEDED(成功)、FAILED(失败)、KILL_WAIT(等待终止/关闭)、KILLED(已经终止/关闭) |
| type | String | 作业子任务类型 - MAP(映射)或者 REDUCE(归并) |
| successfulAttempt | String | 最后一次成功尝试的作业子任务编码 |
| progress | Double | 作业子任务执行程度,百分比 |
| startTime | Int | 作业子任务启动时间(毫秒,Unix纪元)。如果还没有启动,则值为-1。 |
| finishTime | Int | 作业子任务结束时间(毫秒,Unix纪元)。 |
| elapsedTime | Int | 应用程序自启动至今所耗时间,单位是毫秒 |
查询特定的作业子任务 JobTask
如果已经获取了有效的 jobId 作业编码和 jobTaskId 作业子任务编码,那么可以使用函数 checkJobTask() 检查特定的作业子任务:
guard let task = try app.checkJobTask(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0") else {
// 出错了
}
print(task.progress)
该函数 checkJobTask 返回值为一个 JobTask 作业子任务 对象,如上所述。
作业子任务统计指标
方法 checkJobTaskCounters() 用于返回特定作业子任务的统计指标信息,如以下范例所示:
guard let js = try app.checkJobTaskCounters(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0") else {
// 出错了
}
// 打印作业子任务指标编码
print(js.id)
// 检查每个统计指标编组
js.taskCounterGroup.forEach{ group in
print(group.counterGroupName)
// 打印每个分组中的详细指标信息
group.counters.forEach { counter in
print(counter.name)
print(counter.value)
}
}
JobTaskCounter 作业子任务指标对象
| 变量 | 类型 | 描述 |
|---|---|---|
| id | String | 作业编码 |
| taskCounterGroup | [CounterGroup] | 分组对象集合,详见 CounterGroup 作业指标分组 |
Task Attempts 作业子任务尝试
函数 checkJobTaskAttempts() 用于检查特定作业子任务的尝试信息。
let jobTaskAttempts = try app.checkJobTaskAttempts(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0")
for attempt in jobTaskAttempts {
print(attempt.id)
print(attempt.diagnostics)
print(attempt.assignedContainerId)
print(attempt.rack)
print(attempt.state)
print(attempt.progress)
}//next
一旦成功调用 checkJobTaskAttempts() 会返回一个 TaskAttempt 作业子任务尝试对象,如下所述:
TaskAttempt 作业子任务尝试对象
作业子任务尝试对象 TaskAttempt 包括下列属性:
| 变量 | 类型 | 描述 |
|---|---|---|
| id | String | 作业子任务编码 |
| rack | String | 所在机架信息 |
| state | TaskAttempt.State | 作业子任务尝试状态——有效值包括: NEW(新建)、UNASSIGNED(未指派)、 ASSIGNED(已指派)、RUNNING(运行)、COMMITPENDING(等待提交)、SUCCESSCONTAINERCLEANUP(成功并准备清理容器)、 SUCCEEDED(成功)、FAILCONTAINERCLEANUP(清理容器失败)、FAILTASKCLEANUP(作业子任务清理失败)、FAILED(失败)、KILLCONTAINERCLEANUP(终止清理容器)、KILLTASK_CLEANUP、(终止作业子任务清理)、KILLED(已终止) |
| type | TaskType | 作业子任务类型 - MAP 映射 或者 REDUCE 归并 |
| assignedContainerId | String | 该作业子任务尝试所在的容器编码 |
| nodeHttpAddress | String | 该作业子任务尝试坐在的http服务器节点地址 |
| diagnostics | String | 诊断信息 |
| progress | Double | 该作业子任务尝试的执行进度,百分比 |
| startTime | Int | 该作业子任务尝试启动时间(毫秒,Unix纪元) |
| finishTime | Int | 该作业子任务尝试结束时间(毫秒,Unix纪元) |
| elapsedTime | Int | 自该作业自任务尝试开始运行时所消耗时间(毫秒) |
另外,对于 reduce (归并)作业子任务尝试来说,还可以包括下列属性:
| 变量 | 类型 | 描述 |
|---|---|---|
| shuffleFinishTime | Int | 重新排序结束时间(毫秒,Unix纪元) |
| mergeFinishTime | Int | 合并工作结束时间(毫秒,Unix纪元) |
| elapsedShuffleTime | Int | 重新排序所耗时间(从归并子任务启动开始后到排序结束所用的时间) |
| elapsedMergeTime | Int | 合并操作所用时间(从排序结束到合并完成总共的毫秒数) |
| elapsedReduceTime | Int | 整个归并阶段所耗时间(从合并结束到整个归并子任务结束的总时间) |
查询特定的 TaskAttempt 作业子任务尝试
如果作业子任务尝试编码有效,则可以执行checkJobTaskAttempt()查询该作业子任务的尝试信息,如下所述:
guard let jobTaskAttempts = try app.checkJobTaskAttempt(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0", "attempt_1326381300833_2_2_m_0_0") else {
// 出错了
}//end guard
print(attempt.id)
print(attempt.diagnostics)
print(attempt.assignedContainerId)
print(attempt.rack)
print(attempt.state)
print(attempt.progress)
函数checkJobTaskAttempt() 调用后会返回一个“TaskAttempt 作业子任务尝试对象” 如上文所述。
Attempt State Control 作业尝试状态控制
目前Hadoop 映射归并主程序具有一个试验性质的尝试状态控制。
检查尝试
如果需要查看尝试状态,请调用 checkJobTaskAttemptState() 方法:
guard let state = try checkJobTaskAttemptState(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0", "attempt_1326381300833_2_2_m_0_0") else {
// 出错了
}//end guard
print(state)
终止尝试
调用 killTaskAttempt() 方法可以终止尝试:
try killTaskAttempt(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0", "attempt_1326381300833_2_2_m_0_0")
作业子任务尝试指标统计
方法 checkJobTaskAttemptCounters() 可以获得关于特定作业子任务尝试的详细指标统计,如下文所述:
guard let counters = try app.checkJobTaskAttemptCounters(jobId: "job_1484231633049_0005", taskId: "task_1326381300833_2_2_m_0", "attempt_1326381300833_2_2_m_0_0") else {
// 出错了
}
// 打印指标计数编码
print(counters.id)
// 遍历所有指标分组
for group in counters.taskAttemptCounterGroup {
// 打印分组名称
print(group.counterGroupName)
// 打印该分组下的所有详细指标
for counter in group.counters {
print(counter.name)
print(counter.value)
}//next counter
}//next group
}
JobTaskAttemptCounters 作业子任务尝试统计指标对象
| 变量 | 类型 | 描述 |
|---|---|---|
| id | String | 作业编号 |
| taskAttemptcounterGroup | [CounterGroup] | 一个统计指标分组对象集合,详见CounterGroup 统计指标分组 |