15 changes: 13 additions & 2 deletions .github/workflows/docs.sh
@@ -89,10 +89,21 @@ if [ -f ./flink-python/dev/lint-python.sh ]; then
# Just completely ignore sudo in conda.
unset SUDO_UID SUDO_GID SUDO_USER

# build python docs
# Set base URL for cross-references to main Flink docs (used by sphinx extlinks)
export FLINK_DOCS_BASE_URL="https://nightlies.apache.org/flink/flink-docs-${BRANCH}"

# build English python docs
# disable the gateway, because otherwise it tries to find FLINK_HOME to access Java classes
PYFLINK_GATEWAY_DISABLED=1 ./flink-python/dev/lint-python.sh -i "sphinx"

# move python docs
# build Chinese python docs into _build/html/zh/ subdirectory
UV_HOME="${FLINK_UV_HOME:-./flink-python/dev/.uv}"
pushd flink-python/docs
source "$UV_HOME/bin/activate"
make zh
deactivate
popd

# move python docs (English at root, Chinese at zh/)
mv flink-python/docs/_build/html docs/target/api/python
fi
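
For context, the `FLINK_DOCS_BASE_URL` export above only takes effect if the Sphinx build reads it. Below is a minimal sketch of how a `conf.py` could consume the variable through `sphinx.ext.extlinks`; the `flink_docs` role name and the fallback URL are assumptions for illustration, not the actual Flink docs configuration.

```python
# Hypothetical conf.py excerpt -- the role name and fallback URL are
# assumptions for illustration, not Flink's actual configuration.
import os

extensions = ['sphinx.ext.extlinks']

# Fall back to the nightly master docs when the workflow variable is unset.
base_url = os.environ.get(
    'FLINK_DOCS_BASE_URL',
    'https://nightlies.apache.org/flink/flink-docs-master')

# Enables references like :flink_docs:`deployment/overview/` in the sources.
extlinks = {
    'flink_docs': (base_url + '/%s', '%s'),
}
```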
5 changes: 5 additions & 0 deletions .gitignore
@@ -44,6 +44,11 @@ flink-python/pyflink.egg-info/
flink-python/apache_flink.egg-info/
flink-python/apache-flink-libraries/apache_flink_libraries.egg-info/
flink-python/docs/_build
flink-python/docs/locales/**/*.mo
flink-python/docs/locales/**/LC_MESSAGES/*.po
flink-python/docs/locales/**/reference/
flink-python/docs/locales/**/examples/
flink-python/docs/reference/pyflink.*/api/
flink-python/.tox/
flink-python/dev/download
flink-python/dev/.conda/
186 changes: 3 additions & 183 deletions docs/content.zh/docs/deployment/repls/python_shell.md
@@ -1,7 +1,8 @@
---
title: "Python REPL"
weight: 7
type: docs
layout: redirect
redirect_to: /api/python/user_guide/python_repl.html
bookHidden: true
aliases:
- /zh/deployment/repls/python_shell.html
- /zh/apis/python_shell.html
@@ -25,184 +26,3 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Python REPL

Flink comes with an integrated interactive Python shell.
It can run both in local mode, started on a single machine, and in cluster mode, started against a cluster.
To set up a local Flink installation, see the [Local Installation]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) page.
You can also build Flink from source; see [Building Apache Flink from Source]({{< github_repo >}}#building-apache-flink-from-source).

<span class="label label-info">Note</span> The Python shell invokes the "python" command. For instructions on installing PyFlink, see the [first steps guide]({{< ref "docs/getting-started/local_installation" >}}).

You can install PyFlink via PyPI and then use the Python shell:

```bash
# install PyFlink
$ python -m pip install apache-flink
# run the shell script
$ pyflink-shell.sh local
```

For how to run the Python shell on a cluster, see the Setup section below.

## Usage

The Python shell currently supports the Table API.
The relevant table environments are loaded automatically on startup.
Use the variable "bt_env" to access the BatchTableEnvironment and "st_env" to access the StreamTableEnvironment.

### Table API

Below is a simple example run from the Python shell:
{{< tabs "7207dd60-97bf-461b-b2a5-fcc3dea507c6" >}}
{{< tab "stream" >}}
```python
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
>>> if os.path.exists(sink_path):
... if os.path.isfile(sink_path):
... os.remove(sink_path)
... else:
... shutil.rmtree(sink_path)
>>> s_env.set_parallelism(1)
>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
... .schema(Schema.new_builder()
... .column("a", DataTypes.BIGINT())
... .column("b", DataTypes.STRING())
... .column("c", DataTypes.STRING())
... .build())
...                                 .option("path", sink_path)
... .format(FormatDescriptor.for_format("csv")
... .option("field-delimiter", ",")
... .build())
... .build())
>>> t.select(col('a') + 1, col('b'), col('c'))\
... .execute_insert("stream_sink").wait()
>>> # if the job runs in local mode, you can view the results with the following code:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
... print(f.read())
```
{{< /tab >}}
{{< tab "batch" >}}
```python
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/batch.csv'
>>> if os.path.exists(sink_path):
... if os.path.isfile(sink_path):
... os.remove(sink_path)
... else:
... shutil.rmtree(sink_path)
>>> b_env.set_parallelism(1)
>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
... .schema(Schema.new_builder()
... .column("a", DataTypes.BIGINT())
... .column("b", DataTypes.STRING())
... .column("c", DataTypes.STRING())
... .build())
...                                 .option("path", sink_path)
... .format(FormatDescriptor.for_format("csv")
... .option("field-delimiter", ",")
... .build())
... .build())
>>> t.select(col('a') + 1, col('b'), col('c'))\
... .execute_insert("batch_sink").wait()
>>> # if the job runs in local mode, you can view the results with the following code:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
... print(f.read())
```
{{< /tab >}}
{{< /tabs >}}

## Setup

To see the options the Python shell provides, run:

```bash
pyflink-shell.sh --help
```

### Local

To run the Python shell in local mode, simply execute:

```bash
pyflink-shell.sh local
```


### Remote

To run the Python shell against a specified JobManager, use the keyword `remote` together with the JobManager's host and port:

```bash
pyflink-shell.sh remote <hostname> <portnumber>
```

### Yarn Python Shell cluster

The Python shell can run on top of a YARN cluster: it deploys a new Flink cluster on YARN and connects to it.
Besides the number of containers, you can also specify options such as the JobManager memory and the name of the YARN application.
For example, to run the Python shell on a YARN cluster with two TaskManagers:

```bash
pyflink-shell.sh yarn -n 2
```

For all available options, see the full reference at the bottom of this page.


### Yarn Session

If you have already deployed a Flink cluster via a Flink YARN session, you can connect to it with the following command:

```bash
pyflink-shell.sh yarn
```


## Full Reference

```bash
Flink Python Shell
Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts a Flink Python shell with a local Flink cluster
Usage:
  -h,--help                      Show the help message for all available options
Command: remote [options] <host> <port>
Starts a Flink Python shell connected to a remote cluster
  <host>
        Hostname of the JobManager
  <port>
        Port of the JobManager

Usage:
  -h,--help                      Show the help message for all available options

Command: yarn [options]
Starts a Flink Python shell connected to a YARN cluster
Usage:
  -h,--help                      Show the help message for all available options
  -jm,--jobManagerMemory <arg>   Memory for the JobManager container,
                                 with optional unit (default: MB)
  -n,--container <arg>           Number of YARN containers to allocate
                                 (= number of TaskManagers)
  -nm,--name <arg>               Set a custom name for the YARN application
  -qu,--queue <arg>              Specify the YARN queue
  -s,--slots <arg>               Number of slots per TaskManager
  -tm,--taskManagerMemory <arg>  Memory per TaskManager container,
                                 with optional unit (default: MB)
-h | --help
      Prints this usage text
```

{{< top >}}
7 changes: 4 additions & 3 deletions docs/content.zh/docs/dev/datastream/python/_index.md
@@ -1,7 +1,8 @@
---
title: Python DataStream API
bookCollapseSection: true
weight: 100
title: "Python DataStream API"
layout: redirect
redirect_to: /api/python/user_guide/datastream/
bookHidden: true
aliases:
- /zh/docs/dev/datastream/python/
---
125 changes: 3 additions & 122 deletions docs/content.zh/docs/dev/datastream/python/data_types.md
@@ -1,7 +1,8 @@
---
title: "Data Types"
weight: 25
type: docs
layout: redirect
redirect_to: /api/python/user_guide/datastream/data_types.html
bookHidden: true
aliases:
- /zh/dev/python/datastream-api-users-guide/data_types.html
---
@@ -23,123 +24,3 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Data Types

In Apache Flink's Python DataStream API, a data type describes the type of a value in the DataStream ecosystem.
It can be used to declare input and output types of operations and informs the system how to serialize elements.

## Pickle Serialization

If no type is declared, data is serialized and deserialized using Pickle.
For example, the program below specifies no data types.

```python
from pyflink.datastream import StreamExecutionEnvironment


def processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
.map(lambda record: (record[0]+1, record[1].upper())) \
.print() # note: print to stdout on the worker machine

env.execute()


if __name__ == '__main__':
processing()
```

However, types need to be specified when:

- Passing Python records to Java operations.
- Improving serialization and deserialization performance.

### Passing Python records to Java operations

Since Java operators and functions cannot interpret Python objects, type information must be provided so that Python types can be converted to Java types for processing.
For example, types must be provided if you want to output data using the FileSink, which is implemented in Java.

```python
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink


def file_sink():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
.map(lambda record: (record[0] + 1, record[1].upper()),
output_type=Types.ROW([Types.INT(), Types.STRING()])) \
        .sink_to(FileSink
                 .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                 .build())

env.execute()


if __name__ == '__main__':
    file_sink()
```

### Improve serialization and deserialization performance

Even though data can be serialized and deserialized through Pickle, performance will be better if types are provided.
Explicit types allow PyFlink to use efficient serializers when moving records through the pipeline.
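
As a concrete sketch, the pipeline from the Pickle example above can declare explicit types on both the source and the map; the tuple type used here is this example's assumption.

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


def typed_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # Declaring type_info/output_type lets PyFlink use typed serializers
    # instead of falling back to Pickle for every record.
    env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
        .map(lambda record: (record[0] + 1, record[1].upper()),
             output_type=Types.TUPLE([Types.INT(), Types.STRING()])) \
        .print()

    env.execute()


if __name__ == '__main__':
    typed_processing()
```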

## Supported Data Types

You can use `pyflink.common.typeinfo.Types` to define types in the Python DataStream API.
The table below shows the currently supported types and how to define them:

| PyFlink Type | Python Type | Java Type |
|:-----------------|:-----------------------|:-----------------------|
|`Types.BOOLEAN()` | `bool` | `java.lang.Boolean` |
|`Types.BYTE()` | `int` | `java.lang.Byte` |
|`Types.SHORT()` | `int` | `java.lang.Short` |
|`Types.INT()` | `int` | `java.lang.Integer` |
|`Types.LONG()` | `int` | `java.lang.Long` |
|`Types.FLOAT()` | `float` | `java.lang.Float` |
|`Types.DOUBLE()` | `float` | `java.lang.Double` |
|`Types.CHAR()` | `str` | `java.lang.Character` |
|`Types.STRING()` | `str` | `java.lang.String` |
|`Types.BIG_INT()` | `int` | `java.math.BigInteger` |
|`Types.BIG_DEC()` | `decimal.Decimal` | `java.math.BigDecimal` |
|`Types.INSTANT()` | `pyflink.common.time.Instant` | `java.time.Instant` |
|`Types.TUPLE()` | `tuple` | `org.apache.flink.api.java.tuple.Tuple0` ~ `org.apache.flink.api.java.tuple.Tuple25` |
|`Types.ROW()` | `pyflink.common.Row` | `org.apache.flink.types.Row` |
|`Types.ROW_NAMED()` | `pyflink.common.Row` | `org.apache.flink.types.Row` |
|`Types.MAP()` | `dict` | `java.util.Map` |
|`Types.PICKLED_BYTE_ARRAY()` | `The actual unpickled Python object` | `byte[]` |
|`Types.SQL_DATE()` | `datetime.date` | `java.sql.Date` |
|`Types.SQL_TIME()` | `datetime.time` | `java.sql.Time` |
|`Types.SQL_TIMESTAMP()` | `datetime.datetime` | `java.sql.Timestamp` |
|`Types.LIST()` | `list of Python object` | `java.util.List` |

The table below shows the supported array types:

| PyFlink Array Type | Python Type | Java Type |
|:-----------------|:-----------------------|:-----------------------|
|`Types.PRIMITIVE_ARRAY(Types.BYTE())` | `bytes` | `byte[]` |
|`Types.PRIMITIVE_ARRAY(Types.BOOLEAN())` | `list of bool` | `boolean[]` |
|`Types.PRIMITIVE_ARRAY(Types.SHORT())` | `list of int` | `short[]` |
|`Types.PRIMITIVE_ARRAY(Types.INT())` | `list of int` | `int[]` |
|`Types.PRIMITIVE_ARRAY(Types.LONG())` | `list of int` | `long[]` |
|`Types.PRIMITIVE_ARRAY(Types.FLOAT())` | `list of float` | `float[]` |
|`Types.PRIMITIVE_ARRAY(Types.DOUBLE())` | `list of float` | `double[]` |
|`Types.PRIMITIVE_ARRAY(Types.CHAR())` | `list of str` | `char[]` |
|`Types.BASIC_ARRAY(Types.BYTE())` | `list of int` | `java.lang.Byte[]` |
|`Types.BASIC_ARRAY(Types.BOOLEAN())` | `list of bool` | `java.lang.Boolean[]` |
|`Types.BASIC_ARRAY(Types.SHORT())` | `list of int` | `java.lang.Short[]` |
|`Types.BASIC_ARRAY(Types.INT())` | `list of int` | `java.lang.Integer[]` |
|`Types.BASIC_ARRAY(Types.LONG())` | `list of int` | `java.lang.Long[]` |
|`Types.BASIC_ARRAY(Types.FLOAT())` | `list of float` | `java.lang.Float[]` |
|`Types.BASIC_ARRAY(Types.DOUBLE())` | `list of float` | `java.lang.Double[]` |
|`Types.BASIC_ARRAY(Types.CHAR())` | `list of str` | `java.lang.Character[]` |
|`Types.BASIC_ARRAY(Types.STRING())` | `list of str` | `java.lang.String[]` |
|`Types.OBJECT_ARRAY()` | `list of Python object` | `Array` |
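
As a quick illustration of composing the types above (the field names and values here are made up for the example):

```python
from pyflink.common import Row
from pyflink.common.typeinfo import Types

# A named row type: field names paired with field types.
user_type = Types.ROW_NAMED(['id', 'name'], [Types.INT(), Types.STRING()])

# A map from string keys to lists of longs.
scores_type = Types.MAP(Types.STRING(), Types.LIST(Types.LONG()))

# A primitive array corresponds to a compact Java primitive array (int[]).
ids_type = Types.PRIMITIVE_ARRAY(Types.INT())

# Row values for ROW_NAMED types can be created with keyword arguments.
user = Row(id=1, name='flink')
```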