博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《Flink官方文档》Python 编程指南测试版(二)
阅读量:6980 次
发布时间:2019-06-27

本文共 4983 字,大约阅读时间需要 16 分钟。

为元组定义keys

最简单的情形是对一个数据集中的元组按照一个或多个域进行分组:

reduced = data \  .group_by(0) \  .reduce_group(
)

数据集中的元组被按照第一个域分组。对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值。

grouped = data \  .group_by(0,1) \  .reduce(/*do something*/)

在上面的例子中,数据集的分组基于第一个和第二个域形成的复合关键字,因此,reduce函数输入数据组中,每个元组两个域的值均相同。

关于嵌套元组需要注意:如果你有一个使用了嵌套元组的数据集,指定group_by(<index of tuple>)操作,系统将把整个元组作为关键字使用。

向Flink传递函数

一些特定的操作需要采用用户自定义的函数,因此它们都接受lambda表达式和rich functions作为输入参数。

data.filter(lambda x: x > 5)
class Filter(FilterFunction):    def filter(self, value):        return value > 5data.filter(Filter())

Rich functions可以将函数作为输入参数,允许使用broadcast-variables(广播变量),能够由init()函数参数化,是复杂函数的一个可考虑的实现方式。它们也是在reduce操作中,定义一个可选的combine function的唯一方式。

Lambda表达式可以让函数在一行代码上实现,非常便捷。需要注意的是,如果某个操作会返回多个数值,则其使用的lambda表达式应当返回一个迭代器。(所有函数将接收一个collector输入 参数)。

数据类型

Flink的Python API目前仅支持python中的基本数据类型(int,float,bool,string)以及byte arrays。

运行环境对数据类型的支持,包括序列化器serializer,反序列化器deserializer,以及自定义类型的类。

class MyObj(object):    def __init__(self, i):        self.value = iclass MySerializer(object):    def serialize(self, value):        return struct.pack(">i", value.value)class MyDeserializer(object):    def _deserialize(self, read):        i = struct.unpack(">i", read(4))[0]        return MyObj(i)env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

Tuples/Lists

你可以使用元组(或列表)来表示复杂类型。Python中的元组可以转换为Flink中的Tuple类型,它们包含数量固定的不同类型的域(最多25个)。每个域的元组可以是基本数据类型,也可以是其他的元组类型,从而形成嵌套元组类型。

word_counts = env.from_elements(("hello", 1), ("world",2))counts = word_counts.map(lambda x: x[1])

当进行一些要求指定关键字的操作时,例如对数据记录进行分组或配对。通过设定关键字,可以非常便捷地指定元组中各个域的位置。你可以指定多个位置,从而实现复合关键字(更多信息,查阅)。

wordCounts \    .group_by(0) \    .reduce(MyReduceFunction())

数据源

数据源创建了初始的数据集,包括来自文件,以及来自数据接口/集合两种方式。

基于文件的:

  • read_text(path) – 按行读取文件,并将每一行以String形式返回。
  • read_csv(path,type) – 解析以逗号(或其他字符)划分数据域的文件。
    返回一个包含若干元组的数据集。支持基本的java数据类型作为字段类型。

基于数据集合的:

  • from_elements(*args) – 基于一系列数据创建一个数据集,包含所有元素。
  • generate_sequence(from, to) – 按照指定的间隔,生成一系列数据。

Examples

env  = get_environment\# read text file from local files systemlocalLiens = env.read_text("file:#/path/to/my/textfile")\# read text file from a HDFS running at nnHost:nnPorthdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")\# read a CSV file with three fields, schema defined using constants defined in flink.plan.ConstantscsvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))\# create a set from some given elementsvalues = env.from_elements("Foo", "bar", "foobar", "fubar")\# generate a number sequencenumbers = env.generate_sequence(1, 10000000)

 

数据池

数据池可以接收数据集,并被用来存储或返回它们:

  • write_text() – 按行以String形式写入数据。可通过对每个数据项调用str()函数获取String。
  • write_csv(…) – 将元组写入逗号分隔数值文件。行数和数据字段均可配置。每个字段的值可通过对数据项调用str()方法得到。
  • output() – 在标准输出上打印每个数据项的str()字符串。

一个数据集可以同时作为多个操作的输入数据。程序可以在写入或打印一个数据集的同时,对其进行其他的变换操作。

Examples

标准数据池相关方法示例如下:

write DataSet to a file on the local file systemtextData.write_text("file:///my/result/on/localFS") write DataSet to a file on a HDFS with a namenode running at nnHost:nnPorttextData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS") write DataSet to a file and overwrite the file if it existstextData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE) tuples as lines with pipe as the separator "a|b|c"values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|") this writes tuples in the text formatting "(a, b, c)", rather than as CSV linesvalues.write_text("file:///path/to/the/result/file")

 

广播变量

使用广播变量,能够在使用普通输入参数的基础上,使得一个数据集同时被多个并行的操作所使用。这对于实现辅助数据集,或者是基于数据的参数化法非常有用。这样,数据集就可以以集合的形式被访问。

  • 注册广播变量:广播数据集可通过调用with_broadcast_set(DataSet,String)函数,按照名字注册广播变量。
  • 访问广播变量:通过对调用self.context.get_broadcast_variable(String)可获取广播变量。
class MapperBcv(MapFunction):    def map(self, value):        factor = self.context.get_broadcast_variable("bcv")[0][0]        return value * factor# 1. The DataSet to be broadcastedtoBroadcast = env.from_elements(1, 2, 3)data = env.from_elements("a", "b")# 2. Broadcast the DataSetdata.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

确保在进行广播变量的注册和访问时,应当采用相同的名字(示例中的”bcv”)。

注意:由于广播变量的内容被保存在每个节点的内部存储中,不适合包含过多内容。一些简单的参数,例如标量值,可简单地通过参数化rich function来实现。

并行执行

该章节将描述如何在Flink中配置程序的并行执行。一个Flink程序可以包含多个任务(操作,数据源和数据池)。一个任务可以被划分为多个可并行运行的部分,每个部分处理输入数据的一个子集。并行运行的实例数量被称作它的并行性或并行度degree of parallelism (DOP)。

在Flink中可以为任务指定不同等级的并行度。

运行环境级

Flink程序可在一个运行环境的上下文中运行。一个运行环境为其中运行的所有操作,数据源和数据池定义了一个默认的并行度。运行环境的并行度可通过对某个操作的并行度进行配置来修改。

一个运行环境的并行度可通过调用set_parallelism()方法来指定。例如,为了将示例程序中的所有操作,数据源和数据池的并行度设置为3,可以通过如下方式设置运行环境的默认并行度。

env = get_environment()env.set_parallelism(3)text.flat_map(lambda x,c: x.lower().split()) \    .group_by(1) \    .reduce_group(Adder(), combinable=True) \    .output()env.execute()

系统级

通过设置位于./conf/flink-conf.yaml.文件的parallelism.default属性,改变系统级的默认并行度,可设置所有运行环境的默认并行度。具体细节可查阅文档。

执行方法

为了在Flink中运行计划任务,到Flink目录下,运行/bin文件夹下的pyflink.sh脚本。对于python2.7版本,运行pyflink2.sh;对于python3.4版本,运行pyflink3.sh。包含计划任务的脚本应当作为第一个输入参数,其后可添加一些另外的python包,最后,在“-”之后,输入其他附加参数。

./bin/pyflink<2/3>.sh 

转载自

你可能感兴趣的文章
使用VisualStudio2010连接CodePlex进行代码管理
查看>>
NPOI读写Excel
查看>>
rails应用ajax之二:使用rails自身支持
查看>>
设计模式(七)组合模式Composite(结构型)
查看>>
Qt之自定义搜索框
查看>>
程序员的量化交易之路(25)--Cointrader之MarketData市场数据实体(12)
查看>>
使用 CAS 在 Tomcat 中实现单点登录
查看>>
Podfile 常见语法
查看>>
【原】YUI压缩与CSS media queries下的bug
查看>>
[AWK]使用AWK进行分割字符串以及截取字符串
查看>>
SiteMesh介绍
查看>>
form实现登陆操作
查看>>
SpriteBuilder中如何平均拉伸精灵帧动画的距离
查看>>
poj1330Nearest Common Ancestors 1470 Closest Common Ancestors(LCA算法)
查看>>
dojo从asp.net中获取json数据
查看>>
Android:problem opening wizard the selected wizard could not be started
查看>>
PostgreSQL md5 auth method introduce, with random salt protect
查看>>
【spring框架】spring整合hibernate初步
查看>>
JVM调优总结
查看>>
PostgreSQL 9.3 beta2 stream replication primary standby switchover bug?
查看>>