ETL.NET 助力海量数据轻松处理

2023-12-15 19:24:35

随着数字化转型的不断深入,数据在企业和个人生活中扮演着日益重要的角色。

在企业方面,数据可以帮助企业更好地了解客户需求、市场趋势和业务表现,从而更好地制定战略和决策。此外,数据还可以帮助企业优化业务流程、提高效率和降低成本。

在个人生活方面,数据也扮演着越来越重要的角色。例如,我们使用智能手机、智能手表等设备时,这些设备会收集我们的健康数据、位置数据等信息,帮助我们更好地管理自己的生活和健康。此外,社交媒体、电子商务等应用程序也会收集我们的数据,以便更好地为我们提供个性化的服务和推荐。

什么是 ETL & EtlT ?

About ETL

ETL(抽取、转换和加载)是一种数据处理方法,它对于企业和个人来说都具有重要意义。

ETL

ETL 是一种数据处理方法,它由以下三个步骤组成:

  • 抽取(Extraction):从多个数据源中提取数据。
  • 转换(Transformation):对抽取的数据进行清洗、整合、转换和验证,以满足特定的需求。
  • 加载(Loading):将经过转换的数据加载到目标系统或数据仓库中。

About EtlT

EtlT 它拆分了原有 ETLELT 的结构,并力求 实时批量 统一在一起处理以满足实时数据仓库和 AI 应用的需求。

Etlt

EtlT 由以下四个步骤组成:

  • E(xtract) 抽取:从数据源角度来看,支持传统的线下数据库、传统文件、传统软件同时,还要支持新兴云上数据库、SaaS 软件 API 以及 Serverless 数据源的抽取;从数据抽取方式来看,需要支持实时 CDC(Change Data Capture)对数据库 Binlog 日志的解析,也要支持实时计算(例如 Kafka Streaming),同时也需要支持大批量数据读取(多线程分区读取、限流读取等)。
  • t(ransform) 规范化:相对于 ETL 和 ELT,EtLT 多出了一个小 t,它的目标是数据规范化(Data Normalization)将复杂、异构的抽取出来数据源,快速地变为目标端可加载的结构化数据,同时,针对 CDC 实时加载 Binlog 进行拆分、过滤、字段格式变更,并支持批量和实时方式快速分发到最终 Load 阶段。
  • L(oad) 加载:准确的说,加载阶段已经不是简单的数据加载,而是配合 Et 阶段,将数据源的数据结构的变更、数据内容的变更以适合数据目标端(Data Target)的形式快速、准确的加载到数据目标当中,其中,对于数据结构的变化要支持同源数据结构变更(Schema Evolution),数据加载也应该支持大批量加载(Bulk Load)、SaaS 加载(Reverse ETL)、JDBC 加载等。确保既支持实时数据和数据结构的变化,还要支持大批量数据快速加载。
  • T(ransform) 转化:在云数据仓库、线下数据仓库或新数据联邦的环境下,完成业务逻辑的加工,通常使用 SQL 方式,实时或批量地将复杂业务逻辑准确、快速变为业务端或者 AI 端使用的数据。

EtLT 架构下,使用者人群也有了明确的分工:

  • EtL 阶段:以数据工程师为主,他们将复杂异构的混合数据源,变为数据仓库或者数据联邦可加载的数据,放到数据存储当中,他们无需对企业指标计算规则有深入理解,但需要对各种源数据和非结构化数据变为结构化数据转化有深入理解。他们需要确保的是数据的及时性、数据源到结构化数据的准确性。
  • T 阶段:以数据分析师、各业务部门数据 SQL 开发者、AI 工程师为主,他们深刻理解企业业务规则,可以将业务规则变为底层结构化数据上的 SQL 语句进行分析统计,最终实现企业内部的数据分析和 AI 应用的实现,他们需要确保的是数据逻辑关系、数据质量以及最终数据结果满足业务需求。

通常情况下,在大数据的处理基本倾向于前面部分的 Etl 环节,而后面的 T 环节 的数据处理倾向于 SQL 统计分析,也就是俗称的 最后一公里

谈谈 ETL 作用

ETL 对企业的作用

  • 数据整合:企业通常有多个数据源,包括数据库、文件、应用程序等,ETL 能够将这些分散的数据整合在一起,为企业提供全面且一致的数据视图。

  • 数据清洗与质量控制ETL 可以清洗和验证数据,排除重复、不完整或不准确的数据,提高数据的质量和可靠性。

  • 决策支持:通过将多个数据源中的数据整合起来,并进行转换和分析,ETL 可以为企业提供准确的决策支持信息,帮助管理层做出更明智的决策。

  • 业务流程优化ETL 可以将数据从不同系统中抽取出来,并进行转换和加载,实现数据在不同系统之间的流动,优化业务流程,提高企业的效率和竞争力。

ETL 对个人职业发展的作用

  • 数据处理和分析能力:掌握 ETL 技术可以使个人具备处理和分析大规模数据的能力。在当今数据驱动的时代,数据处理和分析已成为许多职业领域的核心需求,如数据科学家、业务分析师、市场营销人员等。ETL的知识和技能使个人能够有效地抽取、转换和加载数据,为数据分析和洞察提供基础。

  • 数据整合和管理能力ETL 能够将分散的数据源整合为一致的数据视图。掌握ETL技术的个人可以有效管理和整合不同来源的数据,提供准确和可靠的数据资源。这对于数据管理岗位、数据架构师、数据工程师等职业来说尤为重要。

  • 决策支持和业务优化能力ETL 可以为企业提供准确的决策支持信息,并帮助优化业务流程。具备ETL技能的个人可以参与数据驱动的决策制定过程,通过数据抽取、转换和加载,为管理层提供准确的数据报告和分析结果,促进企业的业务发展和效率提升。

  • 跨行业换工作的机会ETL 技能是一种跨行业的技能,可以应用于各种行业和领域。掌握ETL技术可以使个人具备在不同行业和领域中进行数据处理和分析的能力,扩展个人的职业发展机会。无论是金融、健康、零售还是制造业等,ETL 技能都是广泛需求的,因此具备这一技能可以为个人提供更多的就业和发展选择。

ETL.NET 介绍

.neter 人员大数据处理框架终于来了。

ETL.NET 是一个完全用 .NET 编写的开源框架,可用于多平台使用,并可直接集成到任何 .NET 应用程序中。可以毫不费力地实现快速的、低内存的和易于维护的数据处理,即使是百万数据也能轻松应对。所有用于规范化、更新插入、查找或连接的工具都大大减少了任何导入和转换目的的工作量。处理跟踪的所有内容,错误跟踪都是为开发人员自动完成的。

ETL.NET

ETL.NET 功能特点

ETL.NET 是一组 .NET 库,允许将常规商业智能 ETL 功能嵌入到任何 .NET 应用程序中。

  • .NET 支持ETL.NET 完全用 .NET 编写,用于多平台使用,并可直接集成到任何 .NET 应用程序中。
  • 易于实施ETL.NET 的工作原理与 SSIS 类似,ETL 进程要像 Linq 查询一样用 .NET 编写。
  • 易于运行:简单明了的 .NETELT.NET 运行时无需安装即可执行 ETL 进程。

ETL.NET

缺少什么?没有问题!任何类型的扩展 都可以在瞬间实现,以创建新类型的数据源/目标或任何类型的运算符。ETL.NET 就是为此而设计的。

1、它包含 SSIS 的所有转换和功能

  • ETL.NET 提供了对任何数据源进行任何转换所需的每个运算符。ETL.NET 实现的运算符的灵感来自 SQL 提供的运算符:Map、Join、Sort、Distinct、Lookup、Top、Pivot、Cross Apply、Union、Group By、Aggregate 等;
  • 数据源,如 Excel,平面文件,如:csv、SQL Server、Xml、Entity Framework(实体框架) 等;
  • 通过自动过滤和保存功能跟踪整个活动;
  • Process 流程的参数化;

2、开箱即用的功能

ETL.NET 支持从多种数据类型和数据源中读取可写入数据,可以满足各种使用场景。

2.1、Read or write any file type and any data source.(读取或写入任何文件类型和任何数据源)

  • Native SQL server
  • Entity Framework
  • CSV
  • Excel
  • Bloomberg response files
  • Searchable PDF
  • XML
  • Anything .NET can read or write whatsoever(任何 .NET 可以读取或者写入的内容)

2.2、Read or write files on any source.(在任何源上读取或写入文件)

  • File system
  • FTP
  • SFTP
  • FTPS
  • Dropbox
  • eMail and MailBox
  • zip archives
  • Anything .NET can access whatsoever(任何 .NET 可以访问的内容)

如何使用 ETL.NET ?

ETL.NET 是一系列的类库,可以方便的通过 Nuget 包安装集成到任何 .NET 应用程序即可使用。

ETL.NET 相关资源

  • Github 项目地址,https://github.com/paillave/Etl.Net
  • ETL.NET 官网,https://paillave.github.io/Etl.Net/
  • Nuget 包资源,https://www.nuget.org/packages/Paillave.EtlNet.Core

Paillave.EtlNet 系列 Nuget 包

Paillave.EtlNet

Examples 应用举例

实验目标:提取特定格式 .zip 文件中所有的 .csv 文件数据,并处理数据写入到指定数据库表中。

1、创建控制台项目

  • 执行 .net cli 命令,创建 ConsoleAppEtl 项目:
dotnet new console -o ConsoleAppEtl --no-https -f net8.0
  • 查看创建控制台(console)项目更多帮助信息:
PS C:\Users\Jeffrey.Chai> dotnet new console -h
控制台应用 (C#)
作者: Microsoft
描述: 用于创建可在 Windows、Linux 和 macOS 上 .NET 上运行的命令行应用程序的项目

用法:
  dotnet new console [options] [模板选项]

选项:
  -n, --name <name>       正在创建的输出名称。如未指定名称,则使用输出目录的名称。
  -o, --output <output>   要放置生成的输出的位置。
  --dry-run               如果运行给定命令行将导致模板创建,则显示将发生情况的摘要。
  --force                 强制生成内容 (即使它会更改现有文件)。
  --no-update-check       在实例化模板时,禁用对模板包更新的检查。
  --project <project>     应用于上下文评估的项目。
  -lang, --language <C#>  指定要实例化的模板语言。
  --type <project>        指定要实例化的模板类型。

模板选项:
  -f, --framework <net6.0|net7.0|net8.0>  项目的目标框架。
                                          类型: choice
                                            net8.0  目标 net8.0
                                            net7.0  目标 net7.0
                                            net6.0  目标 net6.0
                                          默认: net8.0
  --langVersion <langVersion>             在创建的项目文件中设置 LangVersion 属性
                                          类型: text
  --no-restore                            如果指定,则在创建时跳过项目的自动还原。
                                          类型: bool
                                          默认: false
  --use-program-main                      是否生成显式程序类和主方法,而不是顶级语句。
                                          类型: bool
                                          默认: false
  --aot                                   是否启用将项目以 native AOT 发布。
                                          类型: bool
                                          默认: false

要查看有关其他模板语言(F#, VB)的帮助,请使用 --language 选项:
   dotnet new console -h --language F#

2、添加依赖 nuget 包

该项目中使用到的 nuget 包信息如下:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Paillave.EtlNet.Core" Version="2.0.47" />
    <PackageReference Include="Paillave.EtlNet.FileSystem" Version="2.0.47" />
    <PackageReference Include="Paillave.EtlNet.SqlServer" Version="2.0.47" />
    <PackageReference Include="Paillave.EtlNet.TextFile" Version="2.0.47" />
    <PackageReference Include="Paillave.EtlNet.Zip" Version="2.0.47" />
  </ItemGroup>

  <ItemGroup>
    <Folder Include="Files\Input\" />
  </ItemGroup>

</Project>

3、应用案例代码编写

在编写 demo 代码之前,先在项目中添加一个文件夹:

  • Files\Input\,用来模拟输入的数据源;

下面是一个 ETL.NET 处理数据的场景:

  1. 首先从文件夹中读取所有的 zip 文件。
  2. 解压 zip,并读取其中的 .csv 文件。
  3. 解析 .csv 文件内容,添加的 Person 的集合中。
  4. 去除重复项(以 email 作为唯一依据),并写入到 Sql Server 数据库中。

依据上面的处理流程,在 Program.cs 中编写如下代码:

using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using Paillave.Etl.Core;

namespace ConsoleAppEtl;

internal class Program
{
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello, ETL.NET! https://paillave.github.io/");

        var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
        processRunner.DebugNodeStream += (sender, e) => {
            /* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG ex: e.NodeName == "parse file" */
            Console.WriteLine($"NodeName = {e.NodeName},Count = {e.Count},HasError = {e.HasError},ToSequenceId = {e.ToSequenceId},FromSequenceId = {e.FromSequenceId}");
        };

        string inputFilesPath = @"C:\Users\Jeffrey.Chai\Desktop\test\ConsoleAppEtl\Files\Input";
        string connStr = "Data Source=.;User Id=sa;Password=123@etl.net;Initial Catalog=EtlTest;Encrypt=True;TrustServerCertificate=True;Pooling=true;Min Pool Size=1;Max Pool Size=10;";
        using (var cnx = new SqlConnection(connStr))
        {
            cnx.Open();
            var executionOptions = new ExecutionOptions<string>
            {
                Resolver = new SimpleDependencyResolver().Register(cnx)
            };

            var res = await processRunner.ExecuteAsync(config: inputFilesPath, options: executionOptions);
            if (res.Failed && res.ErrorTraceEvent != null)
            {
                Console.WriteLine($"errors:{res.ErrorTraceEvent.Content.Type},{res.ErrorTraceEvent.Content.Level},{res.ErrorTraceEvent.Content.Message}");
            }
            Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
        }
    }

    /// <summary>
    /// 定义处理流程
    /// </summary>
    /// <param name="contextStream"></param>
    private static void DefineProcess(ISingleStream<string> contextStream)
    {
        contextStream
          .CrossApplyFolderFiles(name: "列出所有的 .zip 文件", pattern: "*.zip", recursive: true)
          .CrossApplyZipFiles(name: "从 .zip 解压出 .csv 文件", pattern: "*.csv")
          .CrossApplyTextFile(name: "解析 .csv 文件",
             args: FlatFileDefinition.Create(item => new Person
             {
                 Email = item.ToColumn("email"),
                 FirstName = item.ToColumn("first name"),
                 LastName = item.ToColumn("last name"),
                 DateOfBirth = item.ToDateColumn("date of birth", "yyyy-MM-dd"),
                 Reputation = item.ToNumberColumn<int?>("reputation", ".")
             }).IsColumnSeparated(','))
          .Distinct("email 去重", item => item.Email)
          .SqlServerSave("写入 mssql2022 数据库", o => o
             .ToTable("dbo.Person")
             .SeekOn(p => p.Email)
             .DoNotSave(p => p.Id))
             .Do("输出到控制台", item => Console.WriteLine($"fullname:{item.FirstName}-{item.LastName},email:{item.Email}"));
    }
    
    /// <summary>
    /// 数据库表实体模型
    /// </summary>
    private class Person
    {
        public int Id { get; set; }
        public string Email { get; set; }
        public string FirstName { get; set; }
        public string LastName { get; set; }
        public DateTime DateOfBirth { get; set; }
        public int? Reputation { get; set; }
    }
}

4、准备数据库表结构

4.1、创建数据库

创建数据库命名为 EtlTest,此处我就使用 docker 容器化部署一个 mssql2022,执行如下命令:

# 搜索镜像
docker search mssql
# 拉取镜像
docker pull mcr.microsoft.com/mssql/server:2022-latest

# 运行容器 mssql2022
docker run -d --name mssql2022 --hostname mssql2022 \
-p 1433:1433  \
-e "ACCEPT_EULA=Y" \
-e "MSSQL_SA_PASSWORD=123@etl.net" \
-e "TZ=Asia/Shanghai" \
-e "MSSQL_PID=Developer" \
-e "MSSQL_COLLATION=Chinese_PRC_BIN" \
mcr.microsoft.com/mssql/server:2022-latest

4.2、测试数据库连接

数据库容器运行成功后,使用数据库工具 dbeaver-ce 连接测试,看数据库是否能够正常访问,如果正常访问即显示如下信息:

mssql2022

4.3、创建数据库表

依据上面的 Person.cs 实体模型,创建数据表 Person,执行如下 sql 脚本:

-- mssql
CREATE TABLE "Person" (  
  "Id" INT identity(1,1) PRIMARY KEY NOT NULL, 
  "Email" VARCHAR(32) NOT NULL, 
  "FirstName" VARCHAR(32) NOT NULL, 
  "LastName" VARCHAR(32) NOT NULL, 
  "DateOfBirth" DateTime NOT NULL,
  "Reputation" INT NULL
);

此时该表的数据为空,改表主要用于从特定文件 .csv 提取数据,并写入表中保存。

5、准备数据源

这里我们准备的源数据是后缀为 .csv 类型的文件,并且压缩为 .zip 格式的文件夹。

Person.zip

Person.zip 文件存放入项目中 Files\Input 文件夹里面,该压缩文件中,存放了 3.csv 的文件,用于模拟分批到处数据的情况。下面 3 个文件中,分别在每个文件中放入 10 条数据。

person.csv

.csv 文件的内容格式类似如下:

"id","email","first name","last name","date of birth","reputation"
1,"123@qq.com",hu,pingan,"2023-12-12",10
2,"321@qq.com",ma,liuliu,"2023-11-12",5
3,"132@qq.com",zhan,xiaosan,"2023-12-15",2

验证测试

说明:Person.zip 文件可以存放多个结构相同的 .csv 文件,这里为了方便测试,模拟 3 个文件即可。

经过上面的环节,我们已经准备好了项目测试的基础条件。接下来我们就运行项目,启动看下,能否把 Person.zip 文件中的 3.csv 文件给解析出来,并提取到里面的数据写入到提前准备好的数据库表中。

执行完成,控制台输出信息如下:

Hello, ETL.NET! https://paillave.github.io/
fullname:项-栋,email:H6zQXwZ@gmail.com
fullname:谈-震,email:YbjDWRYLT@gmail.com
fullname:滑-超浩,email:NOtywnmL@gmail.com
fullname:慕容-伦,email:iqGQtl8Vf@gmail.com
fullname:司空-群,email:MvDxeaOs@gmail.com
fullname:壤驷-泰,email:OPcQBLZ@gmail.com
fullname:国-才,email:bMIhd6K@gmail.com
fullname:尤-建,email:OCrdarkYy@gmail.com
fullname:公西-毅,email:pYhCZZQ@gmail.com
fullname:邢-飞,email:8FDeFQ@gmail.com
fullname:昝-龙,email:Ybr1jWLv@gmail.com
fullname:麻-军,email:6hPwNdi@gmail.com
fullname:崔-朗,email:gdg2ghvZC@gmail.com
fullname:籍-清,email:y2fSVPoJ@gmail.com
fullname:缑-克,email:SOJ72Ih@gmail.com
fullname:赫连-广,email:WC4NLHXr@gmail.com
fullname:印-信,email:BsGL3fcV@gmail.com
fullname:扈-强,email:BO3P744@gmail.com
fullname:漆雕-波,email:LZA31yMtR@gmail.com
fullname:堵-山,email:kRIjZjmmA@gmail.com
fullname:左-民,email:mg02bO@gmail.com
fullname:乔-言若,email:tp4Nqq@gmail.com
fullname:郁-江,email:LwyEXTrAV@gmail.com
fullname:翟-新利,email:dgWodVMqn@gmail.com
fullname:褚-才,email:AFx57A8@gmail.com
fullname:郁-奇,email:KitTNOc5@gmail.com
fullname:寇-平,email:bwtG8Ipfr@gmail.com
fullname:须-厚,email:DLgJR4s@gmail.com
fullname:宓-奇,email:2wFB27f@gmail.com
fullname:窦-超浩,email:jbbmcenx@gmail.com
NodeName = 列出所有的 .zip 文件,Count = 1,HasError = False,ToSequenceId = 1,FromSequenceId = 1
NodeName = 从 .zip 解压出 .csv 文件,Count = 3,HasError = False,ToSequenceId = 5,FromSequenceId = 3
NodeName = 解析 .csv 文件,Count = 30,HasError = False,ToSequenceId = 123,FromSequenceId = 7
NodeName = email 去重,Count = 30,HasError = False,ToSequenceId = 124,FromSequenceId = 8
NodeName = 写入 mssql2022 数据库,Count = 30,HasError = False,ToSequenceId = 125,FromSequenceId = 9
NodeName = 输出到控制台,Count = 30,HasError = False,ToSequenceId = 126,FromSequenceId = 10
Succeeded

此时我再次查看数据库表信息,是否有把 .csv 文件的数据提取处理保存到指定的数据库表中。

SELECT Id, Email, FirstName, LastName, DateOfBirth, Reputation
FROM EtlTest.dbo.Person;

数据库表 Person 查询信息显示如下:

Person

结论: Person.zip 文件中的 3.csv 文件合计数据量 30 行,存入数据库表 EtlTest.dbo.Person 的数据符合预期。

文章来源:https://blog.csdn.net/ChaITSimpleLove/article/details/134997728
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。