Skip to content

unitorch.cli.writers

GeneralParquetWriter

Tip

`core/writer/parquet` is the configuration section for GeneralParquetWriter.

Bases: GenericWriter

Write outputs to a Parquet file.

Source code in src/unitorch/cli/writers/__init__.py (lines 141–154)
def __init__(
    self,
    output_file: str,
    nrows_per_sample: Optional[int] = None,
    columns: Optional[List[str]] = None,
    schema: Optional[str] = None,
    compression: Optional[str] = "snappy",
):
    """Configure a writer that streams outputs into one Parquet file.

    Args:
        output_file: Destination path for the Parquet file.
        nrows_per_sample: Accepted for interface compatibility; not read here.
        columns: Optional list selecting (and ordering) output columns;
            ``None`` keeps every column.
        schema: Optional Python expression, evaluated with ``eval``, whose
            result is used as the Parquet schema. ``None`` means the schema
            is inferred later from the data.
        compression: Compression codec name for the Parquet writer.
    """
    # Column projection and bookkeeping.
    self.columns = columns
    self.skip_n_samples = 0
    # Destination; the underlying writer object is only created later.
    self.output_file = output_file
    self.pq_writer = None
    # NOTE(review): ``eval`` executes arbitrary code taken from the config
    # string -- only use with trusted configuration.
    if schema is None:
        self.pq_schema = None
    else:
        self.pq_schema = eval(schema)
    self.compression = compression

columns (instance attribute)

columns = columns

skip_n_samples (instance attribute)

skip_n_samples = 0

output_file (instance attribute)

output_file = output_file

pq_writer (instance attribute)

pq_writer = None

pq_schema (instance attribute)

pq_schema = None if schema is None else eval(schema)

compression (instance attribute)

compression = compression

from_config (classmethod)

from_config(config, **kwargs)
Source code in src/unitorch/cli/writers/__init__.py (lines 156–159)
@classmethod
@config_defaults_init("core/writer/parquet")
def from_config(cls, config, **kwargs):
    """Build a writer from the ``core/writer/parquet`` config section.

    The body itself does nothing and returns ``None``; presumably the
    ``config_defaults_init`` decorator maps the section's values onto the
    constructor arguments -- confirm against the decorator's implementation.
    """

process_start

process_start(outputs: WriterOutputs)
Source code in src/unitorch/cli/writers/__init__.py (lines 161–179)
def process_start(self, outputs: "WriterOutputs"):
    """Open the Parquet writer and write the first batch of outputs.

    The batch is converted to a pandas DataFrame and optionally projected
    onto ``self.columns``; configured names absent from the frame are
    skipped. The result is then converted to an Arrow table. If no schema
    was configured, the schema inferred from this first batch is stored in
    ``self.pq_schema`` so later chunks are written with the same schema.

    Args:
        outputs: Batch object exposing ``to_pandas()``.
    """
    dataframe = outputs.to_pandas()
    if self.columns is not None:
        # Keep the configured order, dropping names the frame does not have.
        present = set(dataframe.columns)
        dataframe = dataframe[[c for c in self.columns if c in present]]

    # Convert exactly once. With no configured schema, infer it from this
    # batch. Otherwise convert directly under the configured schema, so
    # inference never runs on data that only converts with explicit types.
    if self.pq_schema is None:
        pa_table = pa.Table.from_pandas(dataframe)
        self.pq_schema = pa_table.schema
    else:
        pa_table = pa.Table.from_pandas(dataframe, schema=self.pq_schema)

    self.pq_writer = pq.ParquetWriter(
        self.output_file,
        self.pq_schema,
        version="1.0",
        use_dictionary=False,
        flavor="spark",
        compression=self.compression,
        use_compliant_nested_type=True,
    )
    self.pq_writer.write_table(pa_table)

process_end

process_end()
Source code in src/unitorch/cli/writers/__init__.py (lines 181–182)
def process_end(self):
    """Close the Parquet writer, if one was opened.

    When ``process_start`` never ran (no outputs were produced),
    ``self.pq_writer`` is still ``None``. In that case there is nothing to
    close, and this method returns without raising ``AttributeError``. After
    closing, the attribute is reset to ``None`` so a repeated call is a
    no-op.
    """
    if self.pq_writer is None:
        return
    self.pq_writer.close()
    self.pq_writer = None

process_chunk

process_chunk(outputs: WriterOutputs)
Source code in src/unitorch/cli/writers/__init__.py (lines 184–191)
def process_chunk(self, outputs: "WriterOutputs"):
    """Append a subsequent batch of outputs to the open Parquet file.

    Args:
        outputs: Batch object exposing ``to_pandas()``.

    Raises:
        RuntimeError: If ``self.pq_writer`` is ``None``, i.e. ``process_start``
            has not opened the writer yet.
    """
    # Explicit check instead of ``assert`` so it still fires under
    # ``python -O``, and before any conversion work is done.
    if self.pq_writer is None:
        raise RuntimeError("process_start must be called before process_chunk")
    dataframe = outputs.to_pandas()
    if self.columns is not None:
        # Keep the configured order, dropping names the frame does not have.
        present = set(dataframe.columns)
        dataframe = dataframe[[c for c in self.columns if c in present]]
    # Convert with the stored schema so every chunk matches the file schema.
    pa_table = pa.Table.from_pandas(dataframe, schema=self.pq_schema)
    self.pq_writer.write_table(pa_table)