Environment:

Hadoop cluster name: data-test

Hadoop node hostnames:

  1. hadoop01
  2. hadoop02
  3. hadoop03

Writing a Parquet file to HDFS


        Path file = new Path("hdfs:/test3.parquet");

        String schemaStr = "message schema {optional int64 id;optional binary idc_id;}";
        MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://data-test");
        configuration.set("dfs.nameservices","data-test");
        configuration.set("dfs.ha.namenodes.data-test","nn1,nn2,nn3");
        configuration.set("dfs.namenode.rpc-address.data-test.nn1","hadoop01:8020");
        configuration.set("dfs.namenode.rpc-address.data-test.nn2","hadoop02:8020");
        configuration.set("dfs.namenode.rpc-address.data-test.nn3","hadoop03:8020");

        // Important: without this setting the client treats the nameservice "data-test"
        // as a hostname and fails with an unknown-host error during name resolution.
        configuration.set("dfs.client.failover.proxy.provider.data-test","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(file).withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withConf(configuration)
                .withType(schema);


        ParquetWriter<Group> writer = builder.build();
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        long start = System.currentTimeMillis();
        for (long i = 0; i < 10000000L; i++) {
            // getSring() is a helper (not shown here) that generates a random string suffix; see the sketch below.
            writer.write(groupFactory.newGroup().append("id", 1000L).append("idc_id", "xxx" + getSring()));
            if (i % 100000 == 0) {
                LOGGER.info("written {} rows", i);
            }
        }
        writer.close();
        LOGGER.info("Elapsed: {} ms", (System.currentTimeMillis() - start));
        System.err.println("Write finished..");
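
The loop above relies on two members that are not shown in the original snippet: the LOGGER field and the getSring() helper. Judging by the imports listed at the end of the post (slf4j and java.util.UUID), they presumably look roughly like the sketch below; the class name ParquetHaDemo and the helper's body are assumptions, not the author's actual code.

    // Hypothetical members assumed by the write loop above (the original post only
    // shows the corresponding imports).
    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetHaDemo.class);

    // Returns a random string so every row gets a distinct idc_id value;
    // the author's real implementation is not shown.
    private static String getSring() {
        return UUID.randomUUID().toString();
    }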

Reading a Parquet file from HDFS


        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://data-test");
        configuration.set("dfs.nameservices","data-test");
        configuration.set("dfs.ha.namenodes.data-test","nn1,nn2,nn3");
        configuration.set("dfs.namenode.rpc-address.data-test.nn1","hadoop01:8020");
        configuration.set("dfs.namenode.rpc-address.data-test.nn2","hadoop02:8020");
        configuration.set("dfs.namenode.rpc-address.data-test.nn3","hadoop03:8020");
        configuration.set("dfs.client.failover.proxy.provider.data-test","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        Path file = new Path("hdfs:/test3.parquet");
        ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), file).withConf(configuration);

        ParquetReader<Group> reader = builder.build();
        SimpleGroup group = (SimpleGroup) reader.read();
        while (group != null) {
            System.out.println("schema:" + group.getType().toString());
            System.out.println("idc_id:" + group.getString(1, 0));
            group = (SimpleGroup) reader.read();
        }
        reader.close();
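
The example above reads every column back. If only some columns are needed, GroupReadSupport also honors a projection schema passed through the configuration under ReadSupport.PARQUET_READ_SCHEMA (org.apache.parquet.hadoop.api.ReadSupport), so the other columns are skipped. A small sketch reusing the file and configuration from above; the projected schema string is just an illustration:

    // Project only the idc_id column; GroupReadSupport reads this key from the
    // configuration and builds the requested read schema from it.
    configuration.set(ReadSupport.PARQUET_READ_SCHEMA,
            "message schema {optional binary idc_id;}");
    ParquetReader<Group> projected = ParquetReader
            .builder(new GroupReadSupport(), file)
            .withConf(configuration)
            .build();
    Group row;
    while ((row = projected.read()) != null) {
        System.out.println("idc_id:" + row.getString("idc_id", 0));
    }
    projected.close();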

Maven dependencies


        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
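
The slf4j-reload4j and slf4j-log4j12 exclusions above keep duplicate SLF4J bindings off the classpath; supply a single SLF4J binding of your own (logback-classic, for example) if you want the LOGGER output to show up.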

With this configuration you can read and write Parquet files on an HA-enabled Hadoop cluster.
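
Instead of hard-coding the HA properties in every program, the same Configuration can also be built from the cluster's own config files when they are available on the client machine. A minimal sketch, assuming the files were copied to /etc/hadoop/conf (the path is illustrative):

    // Alternative to the hard-coded properties used above: core-site.xml and
    // hdfs-site.xml from the cluster already contain the defaultFS and HA settings.
    Configuration configuration = new Configuration();
    configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    configuration.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));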

Java imports

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.UUID;