Skip to content

unitorch.cli.writer

GeneralParquetWriter

Tip

The `core/writer/parquet` configuration section holds the settings for GeneralParquetWriter.

Bases: GenericWriter

Initialize GeneralParquetWriter.

Parameters:

Name Type Description Default
output_file str

The path to the output file.

required
nrows_per_sample int

The number of rows per sample. Defaults to None.

None
columns List[str]

The list of columns to include in the output file. Defaults to None.

None
schema str

The Parquet schema in string format. Defaults to None.

None
compression str

The compression algorithm to use. Defaults to "snappy".

'snappy'
Source code in src/unitorch/cli/writers/__init__.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def __init__(
    self,
    output_file: str,
    nrows_per_sample: Optional[int] = None,
    columns: Optional[List[str]] = None,
    schema: Optional[str] = None,
    compression: Optional[str] = "snappy",
):
    """
    Set up a Parquet writer that streams rows to ``output_file``.

    Args:
        output_file (str): Destination path for the Parquet file.
        nrows_per_sample (int, optional): Number of rows per sample.
            Defaults to None. NOTE(review): not stored or read anywhere
            in this block — presumably consumed elsewhere; confirm.
        columns (List[str], optional): Column names to keep in the output;
            when None, all columns are written. Defaults to None.
        schema (str, optional): Parquet schema expressed as a Python
            expression string. Defaults to None (inferred from the first
            chunk at write time).
        compression (str, optional): Compression codec to use.
            Defaults to "snappy".
    """
    self.output_file = output_file
    self.columns = columns
    self.compression = compression
    # Created lazily by process_start once the first chunk is seen.
    self.pq_writer = None
    self.skip_n_samples = 0
    # SECURITY NOTE(review): eval() executes arbitrary code from the
    # schema string — only trusted configuration must reach this point.
    self.pq_schema = eval(schema) if schema is not None else None

from_core_configure (classmethod)

from_core_configure(config, **kwargs)

Create an instance of GeneralParquetWriter from a core configuration.

Parameters:

Name Type Description Default
config

The core configuration.

required
**kwargs

Additional keyword arguments.

{}

Returns:

Name Type Description
GeneralParquetWriter

An instance of GeneralParquetWriter.

Source code in src/unitorch/cli/writers/__init__.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
@classmethod
@add_default_section_for_init("core/writer/parquet")
def from_core_configure(cls, config, **kwargs):
    """
    Create an instance of GeneralParquetWriter from a core configuration.

    Args:
        config: The core configuration.
        **kwargs: Additional keyword arguments forwarded to the constructor.

    Returns:
        GeneralParquetWriter: An instance of GeneralParquetWriter.
    """
    # NOTE(review): the body is intentionally empty — the
    # add_default_section_for_init decorator presumably reads defaults
    # from the "core/writer/parquet" config section and constructs the
    # instance itself; confirm against the decorator's implementation.
    pass

process_chunk

process_chunk(outputs: WriterOutputs)

Process a chunk of data during the writing process.

Parameters:

Name Type Description Default
outputs WriterOutputs

The writer outputs.

required
Source code in src/unitorch/cli/writers/__init__.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def process_chunk(self, outputs: WriterOutputs):
    """
    Append one chunk of writer outputs to the open Parquet file.

    Args:
        outputs (WriterOutputs): The writer outputs to append.
    """
    # process_start must have run first so the writer and schema exist.
    assert self.pq_writer is not None
    frame = outputs.to_pandas()
    if self.columns is not None:
        available = set(frame.columns)
        frame = frame[[name for name in self.columns if name in available]]
    table = pa.Table.from_pandas(frame, schema=self.pq_schema)
    self.pq_writer.write_table(table)

process_end

process_end()

Process the end of the writing process.

Source code in src/unitorch/cli/writers/__init__.py
289
290
291
def process_end(self):
    """
    Finalize the Parquet file at the end of the writing process.

    Closing the writer flushes any buffered row groups and writes the
    Parquet footer. Safe to call when no data was ever written
    (i.e. process_start never ran), in which case nothing is closed.
    """
    # Guard: pq_writer stays None until process_start creates it; an
    # unconditional close() would raise AttributeError on empty runs.
    if self.pq_writer is not None:
        self.pq_writer.close()

process_start

process_start(outputs: WriterOutputs)

Process the start of the writing process.

Parameters:

Name Type Description Default
outputs WriterOutputs

The writer outputs.

required
Source code in src/unitorch/cli/writers/__init__.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def process_start(self, outputs: WriterOutputs):
    """
    Open the Parquet writer and write the first chunk of data.

    When no schema was supplied at construction, the schema is inferred
    from this first chunk; the ParquetWriter is then created with that
    schema and the chunk is written.

    Args:
        outputs (WriterOutputs): The first batch of writer outputs.
    """
    dataframe = outputs.to_pandas()
    if self.columns is not None:
        columns = set(dataframe.columns)
        dataframe = dataframe[[h for h in self.columns if h in columns]]
    # Only build a schema-less table to infer the schema when none was
    # provided — previously a throwaway table was built on every start.
    if self.pq_schema is None:
        self.pq_schema = pa.Table.from_pandas(dataframe).schema
    pa_table = pa.Table.from_pandas(dataframe, schema=self.pq_schema)
    self.pq_writer = pq.ParquetWriter(
        self.output_file,
        self.pq_schema,
        version="1.0",
        use_dictionary=False,
        flavor="spark",
        compression=self.compression,
        use_compliant_nested_type=True,
    )
    self.pq_writer.write_table(pa_table)