3737from cachetools .keys import hashkey
3838from pydantic_core import to_json
3939
40+ from pyiceberg .avro .codecs import AVRO_CODEC_KEY , AvroCompressionCodec
4041from pyiceberg .avro .file import AvroFile , AvroOutputFile
4142from pyiceberg .conversions import to_bytes
4243from pyiceberg .exceptions import ValidationError
@@ -946,9 +947,16 @@ class ManifestWriter(ABC):
946947 _deleted_rows : int
947948 _min_sequence_number : Optional [int ]
948949 _partitions : List [Record ]
949- _reused_entry_wrapper : ManifestEntry
950+ _compression : AvroCompressionCodec
950951
951- def __init__ (self , spec : PartitionSpec , schema : Schema , output_file : OutputFile , snapshot_id : int ) -> None :
952+ def __init__ (
953+ self ,
954+ spec : PartitionSpec ,
955+ schema : Schema ,
956+ output_file : OutputFile ,
957+ snapshot_id : int ,
958+ avro_compression : AvroCompressionCodec ,
959+ ) -> None :
952960 self .closed = False
953961 self ._spec = spec
954962 self ._schema = schema
@@ -963,6 +971,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
963971 self ._deleted_rows = 0
964972 self ._min_sequence_number = None
965973 self ._partitions = []
974+ self ._compression = avro_compression
966975
967976 def __enter__ (self ) -> ManifestWriter :
968977 """Open the writer."""
@@ -998,6 +1007,7 @@ def _meta(self) -> Dict[str, str]:
9981007 "partition-spec" : to_json (self ._spec .fields ).decode ("utf-8" ),
9991008 "partition-spec-id" : str (self ._spec .spec_id ),
10001009 "format-version" : str (self .version ),
1010+ AVRO_CODEC_KEY : self ._compression ,
10011011 }
10021012
10031013 def _with_partition (self , format_version : TableVersion ) -> Schema :
@@ -1109,13 +1119,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
11091119
11101120
11111121class ManifestWriterV1 (ManifestWriter ):
1112- def __init__ (self , spec : PartitionSpec , schema : Schema , output_file : OutputFile , snapshot_id : int ):
1113- super ().__init__ (
1114- spec ,
1115- schema ,
1116- output_file ,
1117- snapshot_id ,
1118- )
1122+ def __init__ (
1123+ self ,
1124+ spec : PartitionSpec ,
1125+ schema : Schema ,
1126+ output_file : OutputFile ,
1127+ snapshot_id : int ,
1128+ avro_compression : AvroCompressionCodec ,
1129+ ):
1130+ super ().__init__ (spec , schema , output_file , snapshot_id , avro_compression )
11191131
11201132 def content (self ) -> ManifestContent :
11211133 return ManifestContent .DATA
@@ -1129,8 +1141,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
11291141
11301142
11311143class ManifestWriterV2 (ManifestWriter ):
1132- def __init__ (self , spec : PartitionSpec , schema : Schema , output_file : OutputFile , snapshot_id : int ):
1133- super ().__init__ (spec , schema , output_file , snapshot_id )
1144+ def __init__ (
1145+ self ,
1146+ spec : PartitionSpec ,
1147+ schema : Schema ,
1148+ output_file : OutputFile ,
1149+ snapshot_id : int ,
1150+ avro_compression : AvroCompressionCodec ,
1151+ ):
1152+ super ().__init__ (spec , schema , output_file , snapshot_id , avro_compression )
11341153
11351154 def content (self ) -> ManifestContent :
11361155 return ManifestContent .DATA
@@ -1156,12 +1175,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
11561175
11571176
def write_manifest(
    format_version: TableVersion,
    spec: PartitionSpec,
    schema: Schema,
    output_file: OutputFile,
    snapshot_id: int,
    avro_compression: AvroCompressionCodec,
) -> ManifestWriter:
    """Build the manifest writer that matches the table format version.

    Args:
        format_version: Table format version; 1 and 2 are supported.
        spec: Partition spec handed to the writer.
        schema: Table schema handed to the writer.
        output_file: Destination the manifest is written to.
        snapshot_id: ID of the snapshot that produces the manifest.
        avro_compression: Avro codec forwarded to the writer constructor.

    Returns:
        A ``ManifestWriterV1`` for version 1, a ``ManifestWriterV2`` for version 2.

    Raises:
        ValueError: If ``format_version`` is neither 1 nor 2.
    """
    if format_version == 1:
        return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
    elif format_version == 2:
        return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
    else:
        raise ValueError(f"Cannot write manifest for table version: {format_version}")
11671191
@@ -1211,14 +1235,21 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite
12111235
12121236
12131237class ManifestListWriterV1 (ManifestListWriter ):
1214- def __init__ (self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ]):
1238+ def __init__ (
1239+ self ,
1240+ output_file : OutputFile ,
1241+ snapshot_id : int ,
1242+ parent_snapshot_id : Optional [int ],
1243+ compression : AvroCompressionCodec ,
1244+ ):
12151245 super ().__init__ (
12161246 format_version = 1 ,
12171247 output_file = output_file ,
12181248 meta = {
12191249 "snapshot-id" : str (snapshot_id ),
12201250 "parent-snapshot-id" : str (parent_snapshot_id ) if parent_snapshot_id is not None else "null" ,
12211251 "format-version" : "1" ,
1252+ AVRO_CODEC_KEY : compression ,
12221253 },
12231254 )
12241255
@@ -1232,7 +1263,14 @@ class ManifestListWriterV2(ManifestListWriter):
12321263 _commit_snapshot_id : int
12331264 _sequence_number : int
12341265
1235- def __init__ (self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int ):
1266+ def __init__ (
1267+ self ,
1268+ output_file : OutputFile ,
1269+ snapshot_id : int ,
1270+ parent_snapshot_id : Optional [int ],
1271+ sequence_number : int ,
1272+ compression : AvroCompressionCodec ,
1273+ ):
12361274 super ().__init__ (
12371275 format_version = 2 ,
12381276 output_file = output_file ,
@@ -1241,6 +1279,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
12411279 "parent-snapshot-id" : str (parent_snapshot_id ) if parent_snapshot_id is not None else "null" ,
12421280 "sequence-number" : str (sequence_number ),
12431281 "format-version" : "2" ,
1282+ AVRO_CODEC_KEY : compression ,
12441283 },
12451284 )
12461285 self ._commit_snapshot_id = snapshot_id
@@ -1275,12 +1314,13 @@ def write_manifest_list(
12751314 snapshot_id : int ,
12761315 parent_snapshot_id : Optional [int ],
12771316 sequence_number : Optional [int ],
1317+ avro_compression : AvroCompressionCodec ,
12781318) -> ManifestListWriter :
12791319 if format_version == 1 :
1280- return ManifestListWriterV1 (output_file , snapshot_id , parent_snapshot_id )
1320+ return ManifestListWriterV1 (output_file , snapshot_id , parent_snapshot_id , avro_compression )
12811321 elif format_version == 2 :
12821322 if sequence_number is None :
12831323 raise ValueError (f"Sequence-number is required for V2 tables: { sequence_number } " )
1284- return ManifestListWriterV2 (output_file , snapshot_id , parent_snapshot_id , sequence_number )
1324+ return ManifestListWriterV2 (output_file , snapshot_id , parent_snapshot_id , sequence_number , avro_compression )
12851325 else :
12861326 raise ValueError (f"Cannot write manifest list for table version: { format_version } " )
0 commit comments