如何基于S3搭建hadoop大数据集群,并整合Hive、Spark

写作本文的目的,一个是为了积累经验,一个是为了给同样需要基于S3搭建大数据平台的人提供一个参考。

本文不会带你手把手一步步操作,只会阐述核心的配置文件、组件的交互等等问题。

组件版本信息

java8

scala 2.12.15

hadoop-3.3.1

hive-3.1.2

spark-3.2.1

kyuubi-1.5.

es 7.5

zk 3.5.7

kafka 2.0.1

clickhouse 22..5

trino 414

Greenplum 6.18

hbase 2.3.7

Hadoop

Hadoop集群的文件系统往往运行在HDFS上,即Hadoop DFS,如果基于S3搭建,注意,首先NameNode和DataNode将不会启动,其次hive上创建的表都是外部表。

Hadoop核心组件

在搭建之前,有一些核心组件需要做一些了解。

  1. Hadoop分布式文件系统(HDFS):

    • 功能: HDFS是Hadoop的存储系统,设计用来存储大量数据,并提供高吞吐量的数据访问。它通过在多个服务器上分布存储数据来实现容错和高可用性。
    • 主要组件:
      • NameNode: 管理文件系统的元数据,如文件和目录的结构、文件的属性等。它不存储实际数据,而是维护文件系统树和整个文件系统的元数据。
      • DataNode: 在HDFS中存储实际数据。每个DataNode负责管理它所在节点上的存储。客户端直接与DataNode交互来读写数据,而NameNode则负责元数据操作。
  2. Yet Another Resource Negotiator (YARN):

    • 功能: YARN是Hadoop的资源管理和作业调度组件。它允许多个数据处理引擎(如MapReduce和Spark)有效地共享集群资源。
    • 主要组件:
      • ResourceManager: 负责整个系统的资源管理和作业调度。它包含两个主要组件:调度器(负责资源分配)和应用程序管理器(负责生命周期管理)。
      • NodeManager: 运行在集群的每个节点上,负责监控其资源(如CPU、内存)的使用情况,并向ResourceManager报告。
  3. MapReduce:

    • 功能: MapReduce是Hadoop的数据处理模型和执行环境。它将应用分为两个阶段:Map阶段和Reduce阶段,从而在分布式环境中高效地处理大数据集。
    • 主要组件:
      • JobTracker: 在早期版本的Hadoop中,JobTracker负责作业调度和资源分配。在YARN出现后,这些功能由YARN的ResourceManager接管。
      • TaskTracker: 运行在每个节点上,执行由JobTracker分配的任务。在YARN中,这些功能由NodeManager和应用程序的特定容器来处理。
  4. Hadoop Common:

    • 功能: 提供其他Hadoop模块所需的公共工具和库。这些工具和库支持Hadoop的其他组件,提供文件系统和OS级别的抽象,以及序列化、Java RPC(远程过程调用)和持久化的工具。

因此,实际在机器上运行jps,只会看见每个节点上的NodeManager和一台节点上的ResourceManager。

Hadoop核心配置

core-site.xml

在以下配置中,设置s3的地址,s3的secret以及key,s3的一些其他配置信息。

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- 配置文件的许可和说明 -->

<configuration>
    <!-- Hadoop文件系统的默认文件系统设置为S3 -->
    <property>
        <name>fs.defaultFS</name>
        <value>s3a://bucketname</value>
    </property>
    
    <!-- S3的访问密钥 -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>xxx</value>
    </property>

    <!-- S3服务的终端节点 -->
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://10.24.xx.xx:80</value>
    </property>

    <!-- 降级同步异常 -->
    <property>
        <name>fs.s3a.downgrade.syncable.exceptions</name>
        <value>true</value>
    </property>

    <!-- S3的秘密密钥 -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxxxx</value>
    </property>

    <!-- 是否启用S3连接的SSL加密 -->
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>

    <!-- 启用S3的路径风格访问 -->
    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property>

    <!-- S3的文件系统实现类 -->
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- 允许的代理用户(root)的主机 -->
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>

    <!-- 允许的代理用户(root)的组 -->
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
</configuration>

yarn-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- ResourceManager的主机名 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop102</value>
    </property>

    <!-- ResourceManager的状态存储类,用于恢复和持久化ResourceManager的状态 -->
    <property>
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
    </property>

    <!-- 是否启用日志聚集功能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>

    <!-- NodeManager辅助服务,这里配置了MapReduce的shuffle服务 -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

    <!-- MapReduce shuffle服务的类路径 -->
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>

    <!-- MapReduce Map任务输出的压缩编解码器 -->
    <property>
        <name>mapred.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>

    <!-- YARN的调度器类,这里使用的是容量调度器 -->
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>

    <!-- 是否压缩MapReduce Map任务的输出 -->
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
    </property>

    <!-- 是否启用ResourceManager的恢复功能 -->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>

    <!-- 是否启用NodeManager的虚拟内存检查 -->
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>

    <!-- 设置ResourceManager绑定的主机地址 -->
    <property>
        <name>yarn.resourcemanager.bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 设置NodeManager绑定的主机地址 -->
    <property>
        <name>yarn.nodemanager.bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 设置时间线服务绑定的主机地址 -->
    <property>
        <name>yarn.timeline-service.bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 设置YARN应用程序的类路径 -->
    <property>
        <name>yarn.application.classpath</name>
        <value>
            /opt/module/hadoop-3.3.1/etc/hadoop,
            /opt/module/hadoop-3.3.1/share/hadoop/common/*,
            /opt/module/hadoop-3.3.1/share/hadoop/common/lib/*,
            /opt/module/hadoop-3.3.1/share/hadoop/hdfs/*,
            /opt/module/hadoop-3.3.1/share/hadoop/hdfs/lib/*,
            /opt/module/hadoop-3.3.1/share/hadoop/mapreduce/*,
            /opt/module/hadoop-3.3.1/share/hadoop/mapreduce/lib/*,
            /opt/module/hadoop-3.3.1/share/hadoop/yarn/*,
            /opt/module/hadoop-3.3.1/share/hadoop/yarn/lib/*
        </value>
    </property>

    <!-- 开启日志聚集功能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>

    <!-- 设置日志聚集服务器的URL -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://hadoop102:19888/jobhistory/logs</value>
    </property>

    <!-- 设置日志保留时间为7天 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property

hdfs-site.xml

可以不用设置hdfs,只需要在core-site中设置s3的信息即可。

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- 控制DataNode注册时是否检查其IP地址和主机名 -->
    <property>
        <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
        <value>false</value>
    </property>

    <!-- 设置是否启用HDFS的权限检查 -->
    <property>
        <name>dfs.permissions.enabled</name>
        <value>false</value>
    </property>

    <!-- 设置NameNode的RPC绑定地址 -->
    <property>
        <name>dfs.namenode.rpc-bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 设置NameNode的服务RPC绑定地址 -->
    <property>
        <name>dfs.namenode.servicerpc-bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 设置NameNode的HTTP服务绑定地址 -->
    <property>
        <name>dfs.namenode.http-bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 设置NameNode的HTTPS服务绑定地址 -->
    <property>
        <name>dfs.namenode.https-bind-host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 客户端是否使用DataNode的主机名来进行通信 -->
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>false</value>
    </property>

    <!-- DataNode是否使用自己的主机名进行通信 -->
    <property>
        <name>dfs.datanode.use.datanode.hostname</name>
        <value>false</value>
    </property>
</configuration>

capacity-scheduler.xml

yarn的容量调度器。注意这个很重要,yarn的任务跑在各自的队列中,每个队列有它的资源和任务分配,最好不要使用默认的default队列,有时候在启动kyuubi的时候本身会占用一部分对立资源。

<configuration>
    <!-- 最大应用程序数量,包括挂起和运行中的应用 -->
    <property>
        <name>yarn.scheduler.capacity.maximum-applications</name>
        <value>10000</value>
    </property>

    <!-- 用于运行应用主节点(AM)的最大资源比例 -->
    <property>
        <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
        <value>0.5</value>
    </property>

    <!-- 用于比较资源的资源计算器实现 -->
    <property>
        <name>yarn.scheduler.capacity.resource-calculator</name>
        <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    </property>

    <!-- 根队列下的子队列 -->
    <property>
        <name>yarn.scheduler.capacity.root.queues</name>
        <value>default,wa_wxqb_nsfocus</value>
    </property>

    <!-- 默认队列的目标容量 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.capacity</name>
        <value>40</value>
    </property>

    <!-- 默认队列的用户限制因子 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
        <value>1.0</value>
    </property>

    <!-- 默认队列的最大容量 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
        <value>40</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的资源额定容量 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.capacity</name>
        <value>60</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的用户资源限制因子 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.user-limit-factor</name>
        <value>1</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的资源最大容量 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.maximum-capacity</name>
        <value>80</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的状态 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.state</name>
        <value>RUNNING</value>
    </property>

    <!-- wa_wxqb_nsfocus队列提交作业的访问控制列表 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.acl_submit_applications</name>
        <value>*</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的管理员访问控制列表 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.acl_administer_queue</name>
        <value>*</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的应用最大优先级访问控制列表 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.acl_application_max_priority</name>
        <value>*</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的应用最大生命周期 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.maximum-applicationlifetime</name>
        <value>-1</value>
    </property>

    <!-- wa_wxqb_nsfocus队列的默认应用生命周期 -->
    <property>
        <name>yarn.scheduler.capacity.root.wa_wxqb_nsfocus.default-applicationlifetime</name>
        <value>-1</value>
    </property>

    <!-- 默认队列的状态 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.state</name>
        <value>RUNNING</value>
    </property>

    <!-- 默认队列提交作业的访问控制列表 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
        <value>*</value>
    </property>

    <!-- 默认队列的管理员访问控制列表 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
        <value>*</value>
    </property>

    <!-- 默认队列的应用最大优先级访问控制列表 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.acl_application_max_priority</name>
        <value>*</value>
    </property>

    <!-- 默认队列的应用最大生命周期 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.maximum-application-lifetime</name>
        <value>-1</value>
    </property>

    <!-- 默认队列的默认应用生命周期 -->
    <property>
        <name>yarn.scheduler.capacity.root.default.default-application-lifetime</name>
        <value>-1</value>
    </property>

    <!-- 节点局部性延迟设置 -->
    <property>
        <name>yarn.scheduler.capacity.node-locality-delay</name>
        <value>40</value>
    </property>

    <!-- 机架局部性额外延迟 -->
    <property>
        <name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
        <value>-1</value>
    </property>

    <!-- 队列映射设置 -->
    <property>
        <name>yarn.scheduler.capacity.queue-mappings</name>
        <value></value>
    </property>

    <!-- 是否允许队列映射覆盖用户指定的队列 -->
    <property>
        <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
        <value>false</value>
    </property>

    <!-- 单个节点心跳中允许的OFF_SWITCH分配数量 -->
    <property>
        <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
        <value>1</value>
    </property>

    <!-- 应用失败快速响应设置 -->
    <property>
        <name>yarn.scheduler.capacity.application.fail-fast</name>
        <value>false</value>
    </property>

    <!-- 工作流优先级映射 -->
    <property>
        <name>yarn.scheduler.capacity.workflow-priority-mappings</name>
        <value></value>
    </property>

    <!-- 是否允许工作流优先级映射覆盖用户指定的优先级 -->
    <property>
        <name>yarn.scheduler.capacity.workflow-priority-mappings-override.enable</name>
        <value>false</value>
    </property>
</configuration>

mapred-site.xml

<configuration>
    <!-- 设置MapReduce框架的名称为YARN -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <!-- 设置MapReduce应用主节点(AM)的环境变量 -->
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.3.1</value>
    </property>

    <!-- 设置Map任务的环境变量 -->
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.3.1</value>
    </property>

    <!-- 设置Reduce任务的环境变量 -->
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.3.1</value>
    </property>

    <!-- 配置作业历史服务器的地址 -->
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop102:10020</value>
    </property>

    <!-- 配置作业历史服务器Web界面的地址 -->
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop102:19888</value>
    </property>

    <!-- 设置YARN应用程序的类路径 -->
    <property>
        <name>yarn.application.classpath</name>
        <value>
            /opt/module/hadoop-3.3.1/etc/hadoop,
            /opt/module/hadoop-3.3.1/share/hadoop/common/*,
            /opt/module/hadoop-3.3.1/share/hadoop/common/lib/*,
            /opt/module/hadoop-3.3.1/share/hadoop/hdfs/*,
            /opt/module/hadoop-3.3.1/share/hadoop/hdfs/lib/*,
            /opt/module/hadoop-3.3.1/share/hadoop/mapreduce/*,
            /opt/module/hadoop-3.3.1/share/hadoop/mapreduce/lib/*,
            /opt/module/hadoop-3.3.1/share/hadoop/yarn/*,
            /opt/module/hadoop-3.3.1/share/hadoop/yarn/lib/*
        </value>
    </property>
</configuration>

share

在share目录下,将tools下的aws包,拷贝进common。

cp share/hadoop/tools/lib/*aws* share/hadoop/common/lib/

解决Resource changed on src filesystem问题。

在使用s3作为存储的时候,其他任何问题都可以想办法解决,唯独这个问题,你要下载、修改,重新编译hadoop-yarn-common包的源代码。

错误提示是这样的:

java.io.IOException: Resource s3a://yarn/user/root/DistributedShell/application_1641533299713_0002/ExecScript.sh changed on src filesystem (expected 1641534006000, was 1641534011000
    at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:273)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:223)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

这个错误的报错位置在 hadoop-yarn-common 包下的 org.apache.hadoop.yarn.util.FSDownload 中:

private void verifyAndCopy(Path destination)
    throws IOException, YarnException {
  final Path sCopy;
  try {
    sCopy = resource.getResource().toPath();
  } catch (URISyntaxException e) {
    throw new IOException("Invalid resource", e);
  }
  FileSystem sourceFs = sCopy.getFileSystem(conf);
  FileStatus sStat = sourceFs.getFileStatus(sCopy);
  if (sStat.getModificationTime() != resource.getTimestamp()) {
    throw new IOException("Resource " + sCopy + " changed on src filesystem" +
        " - expected: " +
        "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
        ", was: " +
        "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
        ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"");
  }
  if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
    if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
      throw new IOException("Resource " + sCopy +
          " is not publicly accessible and as such cannot be part of the" +
          " public cache.");
    }
  }

  downloadAndUnpack(sCopy, destination);
}

对于 DistributedShell 任务来讲,它会在运行时更改 ExecScript 文件为 ExecScript.shExecScript 是从 distributed-shell 文件复制而来)。在 S3 的实现中,更改文件名会造成时间戳被修改(HDFS 则不会更改)。关于这一点的详细解释可以查看官网的相关内容。以下截取 hadoop-yarn-applications-distributedshell 中的部分源码:

// The container for the eventual shell commands needs its own local
// resources too.
// In this scenario, if a shell script is specified, we need to have it
// copied and made available to the container.
if (!scriptPath.isEmpty()) {
  Path renamedScriptPath = null;
  if (Shell.WINDOWS) {
    renamedScriptPath = new Path(scriptPath + ".bat");
  } else {
    renamedScriptPath = new Path(scriptPath + ".sh");
  }

  try {
    // rename the script file based on the underlying OS syntax.
    renameScriptFile(renamedScriptPath);
  } catch (Exception e) {
    LOG.error(
        "Not able to add suffix (.bat/.sh) to the shell script filename",
        e);
    // We know we cannot continue launching the container
    // so we should release it.
    numCompletedContainers.incrementAndGet();
    numFailedContainers.incrementAndGet();
    return;
  }

  URL yarnUrl = null;
  try {
    yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
  } catch (URISyntaxException e) {
    LOG.error("Error when trying to use shell script path specified"
        + " in env, path=" + renamedScriptPath, e);
    // A failure scenario on bad input such as invalid shell script path
    // We know we cannot continue launching the container
    // so we should release it.
    // TODO
    numCompletedContainers.incrementAndGet();
    numFailedContainers.incrementAndGet();
    return;
  }
  LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
    LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
    shellScriptPathLen, shellScriptPathTimestamp);
  localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
      EXEC_SHELL_STRING_PATH, shellRsrc);
  shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
}

解决方法:

去官网下载src包,使用idea打开hadoop-yarn-common项目,全局搜索关键字verifyAndCopy,或搜索文件FSDownload.java,修改代码如下:将错误修改为警告。

private void verifyAndCopy(Path destination)
    throws IOException, YarnException {
  final Path sCopy;
  try {
    sCopy = resource.getResource().toPath();
  } catch (URISyntaxException e) {
    throw new IOException("Invalid resource", e);
  }
  FileSystem sourceFs = sCopy.getFileSystem(conf);
  FileStatus sStat = sourceFs.getFileStatus(sCopy);
  if (sStat.getModificationTime() != resource.getTimestamp()) {
    /*
    throw new IOException("Resource " + sCopy + " changed on src filesystem" +
        " - expected: " +
        "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
        ", was: " +
        "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
        ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"");
    */
    LOG.warn("Resource " + sCopy + " changed on src filesystem" +
            " - expected: " +
            "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
            ", was: " +
            "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
            ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"" +
            ". Stop showing exception here, use a warning instead.");
  }
  if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
    if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
      throw new IOException("Resource " + sCopy +
          " is not publicly accessible and as such cannot be part of the" +
          " public cache.");
    }
  }

  downloadAndUnpack(sCopy, destination);
}

重新编译之后,将其替换进hadoop的share common目录下。

如果编译时出现版本问题,尝试修改pom.xml文件:

  <properties>
+   <java.version>1.8</java.version>
+   <java.releaseVersion>8</java.releaseVersion>
  </properties>

  <profiles>
    <profile>
      <id>java9</id>
      <activation>
        <jdk>[9,)</jdk>
      </activation>
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
+             <release>${java.releaseVersion}</release>
              <excludes>
                <exclude>org/apache/hadoop/yarn/webapp/hamlet/**</exclude>
              </excludes>
              <testExcludes>
                <exclude>org/apache/hadoop/yarn/webapp/hamlet/**</exclude>
              </testExcludes>
            </configuration>
          </plugin>
          <plugin>
            <artifactId>maven-javadoc-plugin</artifactId>
            <configuration>
              <excludePackageNames>org.apache.hadoop.yarn.webapp.hamlet</excludePackageNames>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>

Hive

hive-site.xml

要配置hive和hadoop交互,首先将上面的core-site.xml文件移动至hive的conf目录下,并编辑hive-site.xml文件。

hive的site-xml文件主要是配置metastore的连接信息等。

<configuration>
    <!-- 配置连接到Hive元数据存储所使用的数据库连接URL -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
    </property>

    <!-- 指定JDBC驱动,这里使用MySQL的JDBC驱动 -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <!-- 数据库连接的用户名 -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- 数据库连接的密码 -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>

    <!-- 设置Hive查询日志的存储位置 -->
    <property>
        <name>hive.querylog.location</name>
        <value>/hive/tmp</value>
    </property>

    <!-- 设置Hive执行本地作业的临时目录 -->
    <property>
        <name>hive.exec.local.scratchdir</name>
        <value>/hive/tmp</value>
    </property>

    <!-- 设置Hive下载资源的目录 -->
    <property>
        <name>hive.downloaded.resources.dir</name>
        <value>/hive/tmp</value>
    </property>

    <!-- 是否允许Hive服务以提交查询的用户身份执行查询 -->
    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>

    <!-- 设置Hive的动态分区模式为非严格模式 -->
    <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
    </property>

    <!-- 配置Hive的额外JAR路径 -->
    <property>
        <name>hive.aux.jars.path</name>
        <value>file:///opt/module/hive-3.1.2/lib/hive-contrib-3.1.2.jar</value>
    </property>
</configuration>

hive-env.sh

编辑hive-env.sh

export HIVE_AUX_JARS_PATH=$HIVE_HOME/auxlib

auxlib

在hive的安装目录下创建auxlib文件夹,需要在该文件夹下引入一些第三方的包。

例如aws的包,为了与s3交互。

hive-contrib的包,为了在导入数据时让hive支持多字符分隔。

image-20240105143611929

Spark

hive底层的引擎默认时mr,也可以选择tez,在生产环境中我们一般使用spark。在代码开发中,也是使用Spark框架作为批处理,使用Flink或者SparkStreaming作为流处理引擎。

将hive-site.xml拷贝进spark的conf目录下。

spark-default.conf

在此处配置spark日志目录,yarn的历史日志目录,以及spark本身的ui端口。

spark.eventLog.enabled true
spark.eventLog.dir s3a://bucketname/directory
spark.yarn.historyServer.address hadoop102:18080
spark.history.ui.port 18080

spark-env.sh

在此处配置yarn的配置目录,并将spark日志配置设置为环境变量。

YARN_CONF_DIR=/opt/module/hadoop-3.3.1/etc/hadoop

export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=s3a://bucketname/directory
-Dspark.history.retainedApplications=30"

jars

在jars目录下,主要要拷贝进去aws相关的jar包。例如:

aws-java-sdk-bundle-1.11.901.jar
hadoop-aws-3.3.1.jar
kyuubi-hive-jdbc-shaded-1.8.0.jar

注意有时候要注释掉guaua包。

guava-14.0.1.jar.bak

引入kyuubi是为了便于让其整个多个数据源。

Kyuubi

简介

Kyuubi 是一个在 Apache Spark 上构建的开源分布式多用户服务,用于提供对 Spark 的直接交互式访问。它是基于 Apache Hive 的 Thrift JDBC/ODBC Server(也称为 HiveServer2)开发的,并专门针对大数据查询优化和性能改进。

Kyuubi 的主要功能和特点包括:

  1. 多用户支持:

    • Kyuubi 支持多个用户同时访问,能够处理多个查询请求,提供类似于传统数据库的并发访问能力。
  2. 易于集成:

    • 它可以很容易地与现有的 SQL 工具集成,如 BI 工具、SQL 客户端等,因为它支持标准的 JDBC/ODBC 接口。
  3. 性能优化:

    • 为了提高性能和资源利用率,Kyuubi 对 Spark 作业执行进行优化,包括连接管理、会话缓存等。
  4. 安全性:

    • 提供了安全的访问机制,支持多种认证方式,并能与 Kerberos 集成,确保数据和访问的安全。
  5. 简化Spark的使用:

    • Kyuubi 使得用户无需深入了解 Spark 的内部工作机制,就能通过 SQL 语句轻松地提交和运行 Spark 作业。
  6. 云原生和容器化支持:

    • 它支持在云环境和容器化平台(如 Kubernetes)上运行,使得部署和扩展更加灵活和方便。

Kyuubi 的设计目标是使得使用 Apache Spark 的体验更加类似于使用传统的关系型数据库,通过简化接口和提高性能,使其更加适合于企业级的数据分析和处理需求。

kyuubi-env.sh

在其中指定一些环境和参数。

export JAVA_HOME=$JAVA_HOME
export SPARK_HOME=$SPARK_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
export KYUUBI_JAVA_OPTS="-Xmx10g -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCondCardMark -XX:MaxDirectMemorySize=1024m  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./logs -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:./logs/kyuubi-server-gc-%t.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=5M -XX:NewRatio=3 -XX:MetaspaceSize=512m"
export KYUUBI_BEELINE_OPTS="-Xmx10g -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCondCardMark"

注意我们在spark的jars目录下已经引入和aws包和kyuui的jdbc包,如果没引入,提交作业会出现错误。

项目开发

在项目开发时,我只提供一个pom.xml文件作为参考,其中的版本一定要兼容,否则会引发错误。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.nsfocus</groupId>
  <artifactId>wxqb_event_tag_produce</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>wxqb eventlog tag produce</description>
  <inceptionYear>2010</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.15</scala.version>
  </properties>

  <!--
    <repositories>
      <repository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
      </repository>
    </repositories>

    <pluginRepositories>
      <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
      </pluginRepository>
    </pluginRepositories>
  -->
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest</artifactId>
      <version>1.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.2.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>3.3.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle -->
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-bundle</artifactId>
      <version>1.11.901</version>
    </dependency>


    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.37</version> <!-- 使用适当的版本 -->
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kyuubi/kyuubi-hive-jdbc-shaded -->
    <dependency>
      <groupId>org.apache.kyuubi</groupId>
      <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
      <version>1.8.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.6.0</version>
    </dependency>

    <!-- Jackson 核心库 -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.12.3</version>
    </dependency>

    <!-- Jackson 数据绑定库 -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.12.3</version>
    </dependency>

    <!-- Jackson 注解库 -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.12.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.12</artifactId>
      <version>2.12.3</version>
    </dependency>


    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
      <version>2.3.8</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>7</source>
          <target>7</target>
        </configuration>
      </plugin>
      <!-- any other plugins -->
      <!--      <plugin>-->
      <!--        <artifactId>maven-assembly-plugin</artifactId>-->
      <!--        <executions>-->
      <!--          <execution>-->
      <!--            <phase>package</phase>-->
      <!--            <goals>-->
      <!--              <goal>single</goal>-->
      <!--            </goals>-->
      <!--          </execution>-->
      <!--        </executions>-->
      <!--        <configuration>-->
      <!--          <descriptorRefs>-->
      <!--            <descriptorRef>jar-with-dependencies</descriptorRef>-->
      <!--          </descriptorRefs>-->
      <!--        </configuration>-->
      <!--      </plugin>-->
    </plugins>
  </build>
</project>

   转载规则


《如何基于S3搭建hadoop大数据集群,并整合Hive、Spark》Pandy 采用 知识共享署名-非商业性使用 4.0 国际许可协议 进行许可。
 上一篇
春风又绿江南岸,明月何时照我还? 春风又绿江南岸,明月何时照我还?
昨天立春了。2024年的2月4日,农历腊月26。 看到朋友那边的花开了,想起了那首诗, 折花逢驿使,寄与陇头人。江南无所有,聊赠一枝春。 我在客户处,出师不利,但是心态很好,因为快要到春天了。 春天,四季新始,应该有个好的精神面貌。 北京最
2024-02-05
下一篇 
年会不能停 年会不能停
昨天跨年之夜和朋友吃了个饭,看了一部电影叫做《年会不能停》。是大鹏演的一部带有喜剧意味的关于职场裁员的片子。 我恍惚之间好像在这部片子中看到了我的名字??潘迪杨? 总体感觉这部片子挺有想法。关于理想、关于现实、关于生活,是关于在大城市打拼的
2024-01-01
  目录