pyetl是一个纯python开发的ETL框架, 相比sqoop, datax 之类的ETL工具,pyetl可以对每个字段添加udf函数,使得数据转换过程更加灵活,相比专业ETL工具pyetl更轻量,纯python代码操作,更加符合开发人员习惯
安装
pip3 install pyetl
使用示例
数据库表之间数据同步
from pyetl import Task, DatabaseReader, DatabaseWriter reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source") writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target") Task(reader, writer).start()
数据库表到hive表同步
from pyetl import Task, DatabaseReader, HiveWriter2 reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source") writer = HiveWriter2("hive://localhost:10000/default", table_name="target") Task(reader, writer).start()
数据库表同步es
from pyetl import Task, DatabaseReader, ElasticSearchWriter reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source") writer = ElasticSearchWriter(hosts=["localhost"], index_name="tartget") Task(reader, writer).start()
原始表目标表字段名称不同,需要添加字段映射
添加
# 原始表source包含uuid,full_name字段 reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source") # 目标表target包含id,name字段 writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target") # columns配置目标表和原始表的字段映射关系 columns = {"id": "uuid", "name": "full_name"} Task(reader, writer, columns=columns).start()
字段的udf映射,对字段进行规则校验、数据标准化、数据清洗等
# functions配置字段的udf映射,如下id转字符串,name去除前后空格 functions={"id": str, "name": lambda x: x.strip()} Task(reader, writer, columns=columns, functions=functions).start()
继承Task类灵活扩展ETL任务
import json from pyetl import Task, DatabaseReader, DatabaseWriter class NewTask(Task): reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source") writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target") def get_columns(self): """通过函数的方式生成字段映射配置,使用更灵活""" # 以下示例将数据库中的字段映射配置取出后转字典类型返回 sql = "select columns from task where name='new_task'" columns = self.writer.db.read_one(sql)["columns"] return json.loads(columns) def get_functions(self): """通过函数的方式生成字段的udf映射""" # 以下示例将每个字段类型都转换为字符串 return {col: str for col in self.columns} def apply_function(self, record): """数据流中对一整条数据的udf""" record["flag"] = int(record["id"]) % 2 return record def before(self): """任务开始前要执行的操作, 如初始化任务表,创建目标表等""" sql = "create table destination_table(id int, name varchar(100))" self.writer.db.execute(sql) def after(self): """任务完成后要执行的操作,如更新任务状态等""" sql = "update task set status='done' where name='new_task'" self.writer.db.execute(sql) NewTask().start()
目前已实现Reader和Writer列表
Reader
介绍
DatabaseReader
支持所有关系型数据库的读取
FileReader
结构化文本数据读取,如csv文件
ExcelReader
Excel表文件读取
Writer
介绍
DatabaseWriter
支持所有关系型数据库的写入
ElasticSearchWriter
批量写入数据到es索引
HiveWriter
批量插入hive表
HiveWriter2
Load data方式导入hive表(推荐)
FileWriter
写入数据到文本文件
项目地址pyetl
总结
RTX 5090要首发 性能要翻倍!三星展示GDDR7显存
三星在GTC上展示了专为下一代游戏GPU设计的GDDR7内存。
首次推出的GDDR7内存模块密度为16GB,每个模块容量为2GB。其速度预设为32 Gbps(PAM3),但也可以降至28 Gbps,以提高产量和初始阶段的整体性能和成本效益。
据三星表示,GDDR7内存的能效将提高20%,同时工作电压仅为1.1V,低于标准的1.2V。通过采用更新的封装材料和优化的电路设计,使得在高速运行时的发热量降低,GDDR7的热阻比GDDR6降低了70%。
更新动态
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]