Apache Flume 数据采集的配置与优化秘籍


想到就做到
想到就做到 2024-10-10 11:03:26 3494 赞同 0 反对 0
分类: 资源 标签: 运维 信创知识
ApacheFlume数据采集的配置与优化秘籍

大数据的价值如璀璨明珠,而高效的数据采集则是开启这一宝藏的关键钥匙。Apache Flume 作为一款强大的分布式数据采集系统,在大数据舞台上扮演着举足轻重的角色。

一、Apache Flume 简介
1.1 什么是 Apache Flume

Apache Flume 是一个高度分布式、可靠且高可用的服务,专为高效收集、聚合和移动大量日志数据而生。其灵活的架构赋予了它从各种数据源(包括文件、网络端口、数据库等)采集数据的能力,并能将数据顺畅地传输至多种目标存储系统(如 HDFS、NoSQL 数据库、消息队列等)。

1.2 Apache Flume 的特点

分布式架构:Flume 以分布式架构为基石,可在多个节点上并行运行,轻松应对大规模数据采集的艰巨挑战。无论数据规模如何庞大,它都能高效地进行数据收集,确保数据的完整性与及时性。例如,在一个大型电商平台的日志数据采集项目中,Flume 的分布式架构使得海量的用户行为日志能够被迅速采集和处理,为后续的数据分析提供了坚实基础。
高可靠性:通过严谨的事务机制,Flume 为数据的可靠传输保驾护航。即使在网络故障或节点故障的困境中,它也能自动重试,坚决不让数据丢失。以金融数据采集为例,在一个金融机构的大数据项目中,Flume 确保了每一笔交易数据都能准确无误地被采集和传输,为金融分析提供了可靠的数据支撑。
灵活性强:支持丰富多样的数据源和数据输出方式,可根据不同的业务需求进行灵活配置。无论是从文件、网络端口、数据库,还是其他数据源采集数据,Flume 都能游刃有余地应对。同时,它可以将数据输出到 HDFS、NoSQL 数据库、消息队列等多种目标存储系统,满足各种数据存储需求。例如,在一个社交媒体平台的数据分析项目中,Flume 可以根据不同的数据类型和分析需求,灵活地选择数据源和输出方式,为平台的运营决策提供有力支持。
1.3 Apache Flume 的工作原理

Flume 主要由 Agent、Source、Channel 和 Sink 组成。

Source:作为数据采集的先锋,负责从数据源采集数据。常见的 Source 类型有文件 Source、网络端口 Source、Avro Source 等。例如,文件 Source 能够监控一个指定的文件,当文件中有新的数据写入时,它会自动读取并将数据发送到 Channel。
Channel:如同数据的临时中转站,用于存储从 Source 采集到的数据。Flume 提供了多种类型的 Channel,如内存 Channel、文件 Channel、JDBC Channel 等。内存 Channel 速度飞快,但可能因内存不足而导致数据丢失;文件 Channel 可靠性高,但性能相对较低。在实际应用中,需根据具体需求精心选择合适的 Channel 类型。
Sink:数据传输的终点,将 Channel 中的数据输出到目标存储系统。常见的 Sink 类型有 HDFS Sink、NoSQL Sink、Avro Sink 等。例如,HDFS Sink 可以将数据写入到 Hadoop Distributed File System(HDFS)中,为后续的大数据分析搭建起坚实的数据平台。
Agent:是 Flume 的基本运行单元,由一个或多个 Source、Channel 和 Sink 组成。一个 Agent 可以负责从一个特定的数据源采集数据,并将数据输出到一个或多个目标存储系统。
二、Apache Flume 的安装与部署
2.1 安装前准备

确保系统已安装 Java 环境,Flume 是基于 Java 开发的,需要 Java 运行时环境支持。

Apache Flume 可以从其官方网站下载,下载地址为(如图):https://archive.apache.org/dist/flume/。这里我们以 1.9.0 版本为例进行安装。

/storage/22/article_pic/20241010/1728529406670743feae331.png


2.2 安装步骤

2.2.1 解压安装包
将下载的 Flume 安装包解压到指定目录,例如:

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/

2.2.2 配置环境变量
在系统的环境变量文件中(如 .bash_profile 或 .zshrc)添加 Flume 的安装路径,以便在任何目录下都能方便地运行 Flume 命令。

export FLUME_HOME=/opt/apache-flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin

保存文件后,使环境变量生效。

2.2.3 验证安装
在命令行中输入 flume-ng version,如果能正确显示 Flume 的版本信息,则说明安装成功。

2.3 部署 Flume Agent

2.3.1 创建配置文件
根据实际需求创建 Flume 的配置文件,例如 flume-conf.properties,配置文件中包含 Agent 的名称、Source、Channel 和 Sink 的配置信息。
2.3.2 启动 Agent
在命令行中使用以下命令启动 Flume Agent:

flume-ng agent -n agentName -c conf -f /path/to/flume-conf.properties
1
其中,agentName 是配置文件中定义的 Agent 名称,conf 是 Flume 的配置目录,/path/to/flume-conf.properties 是配置文件的路径。

三、Apache Flume 的配置
3.1 配置文件结构

Flume 的配置文件通常以 .conf 为扩展名,其结构清晰,易于理解和修改。配置文件主要包括 Agent 名称、Source、Channel 和 Sink 的配置信息。

以下是一个简单的 Flume 配置文件示例:

# 定义 Agent 名称
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# 配置 Source
agent1.sources.source1.type = file
agent1.sources.source1.channels = channel1
agent1.sources.source1.file = /path/to/logfile.log

# 配置 Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000

# 配置 Sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hdfs.path = /flume/data
agent1.sinks.sink1.hdfs.fileType = DataStream


在这个配置文件中,我们定义了一个名为 agent1 的 Agent,它从一个指定的文件中采集数据,将数据存储在内存 Channel 中,然后将数据输出到 HDFS 中。

3.2 常见配置参数

Source 相关参数:
type:指定 Source 的类型,如上例中的 file 表示文件 Source。
file:对于文件 Source,指定要监控的文件路径。
interceptors:可以配置拦截器,对采集到的数据进行预处理。例如,可以使用时间戳拦截器为数据添加时间戳。
Channel 相关参数:
type:指定 Channel 的类型,如上例中的 memory 表示内存 Channel。
capacity:Channel 的容量,即可以存储的数据条数。
transactionCapacity:一次事务中可以处理的数据条数。
Sink 相关参数:
type:指定 Sink 的类型,如上例中的 hdfs 表示 HDFS Sink。
channel:指定要使用的 Channel。
hdfs.path:输出到 HDFS 的路径。
hdfs.fileType:指定输出文件的类型,如 DataStream 表示普通文本文件。
四、Apache Flume 的高级功能
4.1 自定义拦截器

Flume 允许用户自定义拦截器,以满足特定的数据处理需求。拦截器可以在数据从 Source 传输到 Channel 的过程中对数据进行预处理。

例如,假设我们需要过滤掉特定关键词的数据,可以实现一个自定义的拦截器:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class KeywordFilterInterceptor implements Interceptor {

private String keywordToFilter;

public KeywordFilterInterceptor(String keywordToFilter) {
this.keywordToFilter = keywordToFilter;
}

@Override
public void initialize() {}

@Override
public Event intercept(Event event) {
String eventData = new String(event.getBody());
if (!eventData.contains(keywordToFilter)) {
return event;
}
return null;
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> filteredEvents = new ArrayList<>();
for (Event event : events) {
Event filteredEvent = intercept(event);
if (filteredEvent!= null) {
filteredEvents.add(filteredEvent);
}
}
return filteredEvents;
}

@Override
public void close() {}

public static class Builder implements Interceptor.Builder {

private String keywordToFilter;

@Override
public Interceptor build() {
return new KeywordFilterInterceptor(keywordToFilter);
}

@Override
public void configure(Context context) {
keywordToFilter = context.getString("keywordToFilter", "defaultKeyword");
}
}
}


在 Flume 配置文件中使用自定义拦截器:

agent.sources.source1.interceptors = i1
agent.sources.source1.interceptors.i1.type = com.example.KeywordFilterInterceptor$Builder
agent.sources.source1.interceptors.i1.keywordToFilter = specificKeyword

自定义拦截器的应用场景非常广泛。比如在网络安全领域,可以通过拦截器对网络流量数据进行分析,过滤掉潜在的恶意攻击流量;在电商数据分析中,可以根据特定的用户行为模式进行数据筛选,为精准营销提供更有价值的数据。

4.2 多 Agent 级联

在复杂的大数据采集场景中,可以使用多个 Flume Agent 进行级联,以实现更灵活的数据传输和处理。

例如,第一个 Agent 从数据源采集数据,然后将数据传输给第二个 Agent,第二个 Agent 可以对数据进行进一步处理后再输出到目标存储系统。

# Agent 1 configuration
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

agent1.sources.source1.type = file
agent1.sources.source1.channels = channel1
agent1.sources.source1.file = /path/to/source/logs

agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hostname = agent2-host
agent1.sinks.sink1.port = 4141

# Agent 2 configuration
agent2.sources = source2
agent2.channels = channel2
agent2.sinks = sink2

agent2.sources.source2.type = avro
agent2.sources.source2.channels = channel2
agent2.sources.source2.bind = agent2-host
agent2.sources.source2.port = 4141

agent2.sinks.sink2.type = hdfs
agent2.sinks.sink2.channel = channel2
agent2.sinks.sink2.hdfs.path = /flume/processed/data


多 Agent 级联可以实现数据的分布式处理和负载均衡。例如在一个大型企业的大数据架构中,可以在不同的部门或地理位置部署多个 Agent,将数据逐步汇总和处理,提高系统的可扩展性和可靠性。

五、Apache Flume 的优化
5.1 性能优化

性能优化是确保 Flume 在大数据采集过程中高效运行的关键。以下是一些性能优化的策略:

调整 Channel 参数:

根据数据量和系统资源情况,合理调整 Channel 的容量和事务容量。如果数据量较大,可以适当增加容量以减少数据积压。例如,在一个高流量的日志采集项目中,将内存 Channel 的容量从默认的 10000 增加到 50000,可以更好地应对大量数据的临时存储需求。同时,要密切关注系统内存使用情况,避免因容量设置过大而导致内存不足的问题。如果内存资源紧张,可以考虑使用文件 Channel,它将数据存储在磁盘上,虽然性能相对较低,但可以提供更大的存储容量。
在调整事务容量时,需要综合考虑数据传输的效率和系统资源的消耗。如果事务容量设置过小,可能会导致频繁的事务提交,增加系统开销;如果事务容量设置过大,可能会导致数据处理延迟增加。可以通过实际测试和监控来确定最佳的事务容量值。
选择合适的 Sink:

不同的 Sink 类型在性能上有所差异。例如,HDFS Sink 在写入大量小文件时可能会导致性能下降,可以考虑使用 SequenceFile 等方式进行优化。SequenceFile 可以将多个小文件合并成一个大文件,减少文件数量,提高写入性能。同时,可以调整 HDFS Sink 的参数,如 block size、replication factor 等,以进一步优化性能。
对于高并发的场景,可以选择使用 Kafka Sink 将数据发送到消息队列,然后再进行后续处理。Kafka 具有高吞吐量和低延迟的特点,可以很好地应对高并发的数据采集需求。在配置 Kafka Sink 时,可以调整参数如 batch size、linger.ms 等,以优化数据的发送效率。
使用拦截器:

拦截器可以在数据采集过程中对数据进行预处理,减少后续处理的负担。例如,可以使用正则表达式拦截器过滤掉不需要的数据,或者使用时间戳拦截器为数据添加时间戳。在一个电商数据采集项目中,使用正则表达式拦截器过滤掉无效的用户行为数据,只保留有价值的数据进行传输和存储,大大减少了数据处理的工作量,提高了系统性能。
可以根据实际需求组合使用多个拦截器,实现更复杂的数据预处理功能。同时,要注意拦截器的性能开销,避免因拦截器处理过于复杂而影响数据采集的效率。
5.2 可靠性优化

可靠性是大数据采集的重要考量因素,确保数据的准确传输和存储至关重要。

配置多个 Channel 和 Sink:

可以配置多个 Channel 和 Sink,实现数据的冗余存储和备份。如果一个 Channel 或 Sink 出现故障,数据可以通过其他通道进行传输,确保数据的可靠性。例如,在一个金融数据监控项目中,配置两个文件 Channel 和两个 HDFS Sink,当一个 Channel 或 Sink 出现故障时,数据可以自动切换到另一个通道进行传输和存储,保证了金融交易数据的安全可靠。
可以使用 Flume 的负载均衡和故障转移机制,实现 Channel 和 Sink 的动态分配和切换。同时,要定期对多个 Channel 和 Sink 进行监控和维护,确保它们的正常运行。
设置监控和报警:

通过设置监控指标,如数据采集速率、Channel 使用率等,可以及时发现系统中的问题。可以使用 Flume 的监控工具或者第三方监控软件来实时监测 Flume 的运行状态。例如,使用 Grafana 等监控工具,通过配置 Flume 的 JMX 指标,可以直观地查看 Flume 的运行状态和性能指标。
同时,可以配置报警机制,当出现异常情况时及时通知管理员进行处理。例如,当数据采集速率下降到一定程度或者 Channel 使用率超过一定阈值时,发送邮件或短信通知管理员,以便及时采取措施解决问题。可以使用 Nagios、Zabbix 等监控软件来实现报警功能。
六、经典案例分析
6.1 电商数据采集

在一个电商平台中,需要采集用户的浏览记录、购买记录等数据进行分析。可以使用 Flume 从电商平台的日志文件中采集数据,将数据存储在 HDFS 中进行后续的大数据分析。

以下是一个电商数据采集的 Flume 配置示例:

# 定义 Agent 名称
agent2.sources = source2
agent2.channels = channel2
agent2.sinks = sink2

# 配置 Source
agent2.sources.source2.type = exec
agent2.sources.source2.command = tail -F /path/to/ecommerce/logs/*.log
agent2.sources.source2.channels = channel2

# 配置 Channel
agent2.channels.channel2.type = file
agent2.channels.channel2.checkpointDir = /flume/checkpoints/channel2
agent2.channels.channel2.dataDirs = /flume/data/channel2

# 配置 Sink
agent2.sinks.sink2.type = hdfs
agent2.sinks.sink2.channel = channel2
agent2.sinks.sink2.hdfs.path = /flume/ecommerce/data
agent2.sinks.sink2.hdfs.fileType = DataStream


在这个配置中,我们使用 exec Source 从电商平台的日志文件中采集数据,将数据存储在文件 Channel 中,然后将数据输出到 HDFS 中进行存储。

6.2 金融数据监控

在金融领域,需要实时监控交易数据,以便及时发现异常情况。可以使用 Flume 从金融交易系统的数据库中采集数据,将数据发送到实时分析平台进行处理。

以下是一个金融数据监控的 Flume 配置示例:

# 定义 Agent 名称
agent3.sources = source3
agent3.channels = channel3
agent3.sinks = sink3

# 配置 Source
agent3.sources.source3.type = jdbc
agent3.sources.source3.url = jdbc:mysql://localhost:3306/finance_db
agent3.sources.source3.user = username
agent3.sources.source3.password = password
agent3.sources.source3.query = SELECT * FROM transactions WHERE timestamp > :lastFetchTime
agent3.sources.source3.runQueryDelay = 60000
agent3.sources.source3.channels = channel3

# 配置 Channel
agent3.channels.channel3.type = memory
agent3.channels.channel3.capacity = 10000
agent3.channels.channel3.transactionCapacity = 1000

# 配置 Sink
agent3.sinks.sink3.type = avro
agent3.sinks.sink3.channel = channel3
agent3.sinks.sink3.hostname = realtime_analysis_server
agent3.sinks.sink3.port = 4141


在这个配置中,我们使用 jdbc Source 从金融数据库中采集交易数据,将数据存储在内存 Channel 中,然后将数据通过 Avro Sink 发送到实时分析服务器进行处理。

在金融数据监控场景中,Flume 的高可靠性和实时性至关重要。通过合理配置多个 Channel 和 Sink,以及设置监控和报警机制,可以确保金融交易数据的安全和及时处理。例如,当某个 Channel 出现故障时,系统可以自动切换到备用 Channel,保证数据的不间断传输。同时,通过实时监控数据采集速率和 Channel 使用率等指标,可以及时发现潜在的问题,并采取相应的措施进行处理,确保金融数据监控的稳定性和准确性。

七、Flume 在大规模分布式系统中的应用
7.1 大规模分布式部署策略

在大规模分布式系统中,Flume 的部署需要考虑多方面因素。可以采用分层部署的方式,将不同类型的数据源分配到不同的 Agent 进行采集,然后通过多级 Agent 的级联,将数据逐步汇总到中心存储系统。

例如,在一个拥有多个数据中心的企业中,可以在每个数据中心部署一组 Flume Agent,负责采集本地的数据。然后,这些 Agent 将数据传输到区域中心的 Agent,区域中心的 Agent 再将数据传输到总部的中心存储系统。这样的分层部署方式可以有效地分散数据采集的压力,提高系统的可扩展性和可靠性。

7.2 性能优化与挑战

在大规模分布式环境下,Flume 面临着一些性能挑战。数据量的巨大增长可能导致 Channel 的容量和事务容量需要不断调整,以避免数据积压。同时,选择合适的 Sink 类型和参数也变得更加关键。例如,对于大规模的分布式文件系统,可能需要调整 HDFS Sink 的参数,以提高写入性能和数据的可靠性。

此外,网络延迟和带宽限制也可能影响数据传输的效率。可以通过优化网络配置、使用压缩技术等方式来减少网络传输的压力。同时,合理配置 Flume 的负载均衡和故障转移机制,可以在网络出现问题时保证数据的可靠传输。

八、Flume 与其他大数据工具集成的深入分析
8.1 Flume 与 Spark 的集成优势与应用场景

Flume 与 Spark 的集成可以实现实时数据分析。Spark Streaming 具有高吞吐量和低延迟的特点,与 Flume 的高效数据采集能力相结合,可以快速处理实时流入的数据。

例如,在物联网数据分析中,可以使用 Flume 采集传感器数据,然后将数据实时传输给 Spark Streaming 进行实时分析。通过这种集成方式,可以及时发现设备的异常情况,为设备维护和管理提供决策支持。

8.2 Flume 与 Flink 的集成特点与性能优势

Flume 与 Flink 的集成也具有很多优势。Flink 提供了丰富的数据分析功能和强大的流处理能力,与 Flume 的数据采集功能相结合,可以实现复杂的大数据处理任务。

例如,在金融风险监测中,可以使用 Flume 采集交易数据,然后将数据传输给 Flink 进行实时风险评估。Flink 的精确一次处理语义可以确保数据的准确性和可靠性,为金融机构提供可靠的风险监测服务。

九、实际操作中的常见问题及解决方法
9.1 配置文件冲突

在实际应用中,可能会出现配置文件冲突的情况。这可能是由于多个 Flume Agent 的配置文件中存在相同的参数设置,或者与其他系统的配置文件冲突。

解决方法:仔细检查配置文件,确保参数设置的唯一性。可以使用命名规范来区分不同的 Agent 和配置参数。同时,可以使用版本控制工具来管理配置文件,以便在出现问题时能够快速回溯和修复。

9.2 数据传输中断

数据传输中断可能是由于网络故障、Source 或 Sink 出现问题等原因引起的。

解决方法:首先,检查网络连接是否正常。如果网络出现问题,及时修复网络故障。其次,检查 Source 和 Sink 的状态,确保它们正常运行。可以通过查看 Flume 的日志文件来获取更多的错误信息。如果是 Source 或 Sink 的配置问题,可以根据错误信息进行调整。

9.3 与特定数据库集成时的问题

当与特定数据库集成时,可能会出现兼容性问题或者性能问题。

解决方法:对于兼容性问题,需要确保 Flume 的数据库 Source 或 Sink 与数据库的版本和驱动程序兼容。可以查看 Flume 的官方文档和数据库的文档,了解支持的版本和配置方法。对于性能问题,可以调整数据库的连接参数、查询语句等,以提高数据采集的效率。

十、性能测试与调优指标
10.1 性能测试工具与方法

可以使用一些性能测试工具来评估 Flume 的性能。例如,可以使用 Apache JMeter 来模拟数据源,向 Flume 发送大量的数据,然后观察 Flume 的数据采集速率、延迟等指标。

在进行性能测试时,需要注意测试环境的真实性和可重复性。可以使用与实际生产环境相似的配置和数据量进行测试,以便获得更准确的性能评估结果。

10.2 调优指标与目标

性能调优的指标包括数据采集速率、延迟、系统资源利用率等。调优的目标是在满足业务需求的前提下,提高系统的性能和可靠性,同时降低系统资源的消耗。

例如,可以通过调整 Channel 的容量和事务容量、选择合适的 Sink 类型和参数等方式来提高数据采集速率。同时,可以通过监控系统资源的使用情况,如内存、CPU、网络带宽等,来优化系统的资源利用率。

结束语:
Apache Flume 作为一款强大的数据采集工具,在大数据领域中发挥着重要的作用。通过合理的安装、配置和优化,以及利用其高级功能和与其他大数据工具的集成,可以实现高效、可靠的数据采集和处理,为大数据分析提供坚实的数据基础。在实际应用中,我们需要根据具体的业务需求和数据特点,选择合适的配置参数和优化策略,充分发挥 Flume 的优势。
————————————————

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/atgfg/article/details/142501438

 

免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://
fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!

评价 0 条
想到就做到L3
粉丝 0 资源 5 + 关注 私信
最近热门资源
分享如何统信UOS系统在屏蔽mysql显性的用户名称以及密码  612
分享免费开源高速下载器  577
分享如何在银河麒麟高级服务器操作系统V10SP3中需要启用内核审计功能。  572
通过shell脚本在统信UOS/麒麟系统中安装nginx  504
分享如何查看网卡中断的数量  422
分享查询网卡所在PCI插槽链路能力及当前链路状态  420
麒麟系统进行内存清理  413
统信UOS常见问题小总结  411
麒麟系统资源下载合集(适配各类cpu)  409
winrar绿色无广告版分享  393
最近下载排行榜
分享如何统信UOS系统在屏蔽mysql显性的用户名称以及密码 0
分享免费开源高速下载器 0
分享如何在银河麒麟高级服务器操作系统V10SP3中需要启用内核审计功能。 0
通过shell脚本在统信UOS/麒麟系统中安装nginx 0
分享如何查看网卡中断的数量 0
分享查询网卡所在PCI插槽链路能力及当前链路状态 0
麒麟系统进行内存清理 0
统信UOS常见问题小总结 0
麒麟系统资源下载合集(适配各类cpu) 0
winrar绿色无广告版分享 0
作者收入月榜
1

prtyaa 收益395.97元

2

zlj141319 收益228.47元

3

IT-feng 收益214.92元

4

1843880570 收益214.2元

5

风晓 收益208.24元

6

777 收益173.02元

7

哆啦漫漫喵 收益131.6元

8

Fhawking 收益106.6元

9

信创来了 收益105.97元

10

克里斯蒂亚诺诺 收益91.08元

请使用微信扫码

加入交流群

请使用微信扫一扫!