常用大数据组件HA客户端的创建(JAVA版)

文章目录

    • 常用大数据组件HA客户端的创建(JAVA版)
      • Elasticsearch client HA
      • ClickHouse Client Advanced HA
      • HBase Client HA
      • HDFS Client HA
      • Hive Client HA
      • Kafka Client HA
      • Yarn Client HA

在大数据组件采用集群化部署,并实现了HA之后,客户端与服务端建立链接时,也可以保证HA。

Elasticsearch client HA

gradle引入依赖

dependencies {// elasticsearch.client    implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:6.7.2"    implementation "org.elasticsearch.client:transport:6.7.2"    implementation "org.elasticsearch.client:elasticsearch-rest-client:6.7.2"    implementation "org.elasticsearch.client:elasticsearch-rest-client-sniffer:6.7.2"}

若ES集群在部署的时候,没有采用类似VIP负载均衡形式的时候。无论是在创建TransportClient还是RestClient时,为了增加首次创建客户端连接的成功性,需要尽可能的将集群中的ES节点一起传入。整个集群的端口需要统一。

ES客户端提供了一种嗅探器,其允许自动发现运行中ES集群中的节点。可以开启嗅探,除了在客户端启动时配置嗅探器外可以在失败时启用嗅探器,在每次失败后,节点列表都会立即更新,而不是在接下来的普通嗅探循环中更新。

/**     * @param ip          ES节点的ip,多个使用逗号分隔,例如:192.168.1.1,192.168.1.2,192.168.1.3     * @param port        Http端口号     * @param clusterName 集群名称     * @param username    若开启安全,可传入用户名     * @param password    若开启安全,可传入密码     */    private RestHighLevelClient initSecurityRestHighLevelClientByIp(String ip, Integer port, String username, String password) {        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();        credentialsProvider.setCredentials(AuthScope.ANY, new                UsernamePasswordCredentials(username, password));        String[] ips = ip.split(",");        HttpHost[] httpHosts = new HttpHost[ips.length];        for (int i = 0; i < ips.length; i++) {            httpHosts[i] = new HttpHost(ips[i], port);        }        // 开启嗅探        SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();                 RestHighLevelClient highLevelClient = new RestHighLevelClient(                RestClient.builder(httpHosts).setFailureListener(sniffOnFailureListener)                        .setHttpClientConfigCallback(httpClientBuilder -> {                            httpClientBuilder.disableAuthCaching();                            return                                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);                        }));        // 嗅探器可以通过setSniffIntervalMills(以毫秒为单位)更新一次节点列表,配置其在1min中后进行嗅探        Sniffer sniffer = Sniffer.builder(highLevelClient.getLowLevelClient())                .setSniffAfterFailureDelayMillis(60000).build();        sniffOnFailureListener.setSniffer(sniffer);        return highLevelClient;    }/**     * @param ip          ES节点的ip,多个使用逗号分隔,例如:192.168.1.1,192.168.1.2,192.168.1.3     * @param port        tcp端口号     * @param clusterName 集群名称     */    private TransportClient initTransportClientByIp(String ip, Integer port, String clusterName) {        TransportClient client = null;        try {            Settings settings = Settings.builder()                    .put("cluster.name", clusterName)                    .put("client.transport.sniff", true)                    .build();            final PreBuiltTransportClient transportClient = new PreBuiltTransportClient(settings);            String[] ips = ip.split(",");            TransportAddress[] transportAddresses = new TransportAddress[ips.length];            for (int i = 0; i < ips.length; i++) {                transportAddresses[i] = new TransportAddress(InetAddress.getByName(ip), port);            }            client = transportClient.addTransportAddresses(transportAddresses);        } catch (Exception e) {            LOGGER.error("初始化客户端异常", e);        }        return client;    }

ClickHouse Client Advanced HA

gradle引入依赖

dependencies {// clickhouse-jdbc    implementation("com.clickhouse:clickhouse-jdbc:0.4.0:http")}

JDBC驱动式访问CH,采用HTTP的访问方式,随机选择几个端点中的一个进行连接,当出现连接问题时,故障转移到另一个。通过这种方式实现客户端链接的高可用。

    public static Connection getConnection() throws SQLException, ClassNotFoundException, InstantiationException, IllegalAccessException {        // 加载CH驱动        Driver driver = (Driver) Class.forName("com.clickhouse.jdbc.ClickHouseDriver").newInstance();        DriverManager.registerDriver(driver);         Properties props = new Properties();        // 随机选择几个端点中的一个进行连接,当出现连接问题时,故障转移到另一个。通过这种方式实现客户端的高可用。        String url="jdbc:ch://(http://192.168.1.1:8123),(http://192.168.1.2:8123),(http://192.168.1.3:8123)/default?failover=1&load_balancing_policy=random";        return DriverManager.getConnection(url, props);    }

HBase Client HA

gradle引入依赖

dependencies {// hbase    implementation "org.apache.hbase:hbase-client:1.1.2"}

HBase客户端高可用是通过Zookeeper实现的。

 public static Connection getConnection() {        // 创建配置        Configuration config = HBaseConfiguration.create();        config.set("hbase.zookeeper.quorum", "192.168.1.1,192.168.1.2,192.168.1.3");        config.set("zookeeper.znode.parent", "/hbase-unsecure");        config.set("hbase.zookeeper.property.client.Port", "2181");        // 创建连接        Connection conn = null;        try {            UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hbase");            conn = ConnectionFactory.createConnection(config, User.create(userGroupInformation));        } catch (IOException e) {            LOGGER.error("HBaseClientHelper initConnect Exception.", e);        }        return conn;    }

HDFS Client HA

gradle引入依赖

dependencies {// hadoop-client    implementation("org.apache.hadoop:hadoop-client:2.7.3")}

hdfs需要的客户端就是hadoop client,hadoop的ha与hadoop集群内部的HA使用的相同的配置(来自core-site.xmlhdfs-site.xml),例如:

public static FileSystem fs = null;    public static void init() {        System.setProperty("HADOOP_USER_NAME", MonitorSupport.HADOOP_USER_NAME);        Configuration conf = new Configuration();        conf.set("fs.defaultFS", "hdfs://mycluster");        conf.set("dfs.nameservices", "mycluster");        conf.set("dfs.ha.namenodes.mycluster", "n1,n2");        conf.set("dfs.namenode.rpc-address.mycluster.n1", "192.168.1.1:8020");        conf.set("dfs.namenode.rpc-address.mycluster.n2", "192.168.1.2:8020");        conf.set("dfs.namenode.http-address.hadoopdemo.nn1", "192.168.1.1:50070");        conf.set("dfs.namenode.http-address.hadoopdemo.nn2", "192.168.1.2:50070");        conf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);        conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);        try {            fs = FileSystem.get(conf);        } catch (IOException e) {            LOGGER.error(" Failed to initialize the HDFS client. ", e);        }    }

Hive Client HA

gradle引入依赖

dependencies {// hadoop-client    implementation("org.apache.hadoop:hadoop-client:2.7.3")    // hive-jdbc    implementation "org.apache.hive:hive-jdbc:1.2.1"}

Hive 本身也是使用的Hadoop Client,所有需要引入hadoo-client的包。

Hive分为HiveServer2 高可用和Metastore 高可用。业务方访问Hive是通过HiveServer2进行的。HiveServer2的高可用是通过Zookeeper来实现高可用。

java客户端可以通过JDBC的方式来访问ZK上HiveServer2的HA的namespace,来实现高可用。

    public static Connection getConnection() throws SQLException, ClassNotFoundException, InstantiationException, IllegalAccessException {        // 加载hive驱动        Driver driver = (Driver) Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();        DriverManager.registerDriver(driver);         Properties props = new Properties();        props.put("user","hdfs");        // 随机选择几个端点中的一个进行连接,当出现连接问题时,故障转移到另一个。通过这种方式实现客户端的高可用。        String url="jdbc:hive2://192.168.1.1:2181,192.168.1.2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk";        return DriverManager.getConnection(url, props);    }

Kafka Client HA

gradle引入依赖

dependencies {// kafka    implementation "org.apache.kafka:kafka-clients:1.0.0"}

kafka是通过broker来创建连接的,为了保证客户端连接的高可用,可以多配置一些broker地址。无论是AdminClient,还是KafkaProducer和KafkaConsumer,效果是一样有的。

// AdminClientProperties PROPERTIES = new Properties();PROPERTIES.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");PROPERTIES.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 30000);AdminClient kafkaAdminClient = AdminClient.create(PROPERTIES);// KafkaConsumerProperties PROPERTIES = new Properties();PROPERTIES.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");PROPERTIES.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 30000);PROPERTIES.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());PROPERTIES.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());PROPERTIES.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerAnalysisGroup-1");PROPERTIES.put(ConsumerConfig.CLIENT_ID_CONFIG, "monitor-consumer-client-1");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PROPERTIES);// KafkaProducerProperties PROPERTIES = new Properties();PROPERTIES.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");PROPERTIES.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 30000);PROPERTIES.put(ProducerConfig.CLIENT_ID_CONFIG, "monitor-producer-client-1");PROPERTIES.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());PROPERTIES.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(PROPERTIES);

Yarn Client HA

gradle引入依赖

dependencies {// hadoop-client    implementation("org.apache.hadoop:hadoop-client:2.7.3")}

Yarn需要的客户端与yarn集群内部的HA使用的相同的配置(来自yarn-site.xml),例如:

    YarnClient client = YarnClient.createYarnClient();    Configuration conf = new Configuration();    conf.set("yarn.resourcemanager.ha.enabled", "true");    conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");    conf.set("yarn.resourcemanager.hostname.rm1", "192.168.1.1");    conf.set("yarn.resourcemanager.hostname.rm2", "192.168.1.2");    client.init(conf);    client.start();