CAGED Source

Source

class publicbr.caged._source.CagedSource(spark_session: pyspark.sql.session.SparkSession, file_dir: str)

Class used to extract Caged data.

Parameters
  • spark_session (pyspark.sql.SparkSession) – Spark Session used to manipulate data

  • file_dir (str) – Root directory where the data will be saved

spark

Spark session used in data manipulation

Type

pyspark.sql.SparkSession

raw_dir

Path to the directory used to store raw data

Type

str

trusted_dir

Path to the directory used to store cleaned data

Type

str

ftp_folders

Dict with the correct folders in the FTP server for each CAGED level

Type

Dict[str]

cleaner

Object used to consolidate the tables

Type

CagedCleaner

create(**kwargs)

Wrapper for method execution.

Parameters

**kwargs

mode : str

Write mode used when data already exists at the designated path:
  * append: Append the contents of the DataFrame to the existing data
  * overwrite: Overwrite existing data
  * ignore: Silently ignore this operation
  * error or errorifexists (default): Raise an error

n_partitions : int

Number of DataFrame partitions

partition_col : str

Column to partition the DataFrame by when writing

key :

Other options passed to DataFrameWriter.options

Returns

returns an instance of the object

Return type

self
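The four write modes listed above can be summarised as a small decision function. This is a minimal sketch of the semantics only, not the library's code: `resolve_write_action` and its return labels are hypothetical, and `FileExistsError` stands in for whatever exception Spark actually raises.

```python
def resolve_write_action(mode: str, target_exists: bool) -> str:
    """Return the action a writer would take for a given mode (illustrative)."""
    valid = {"append", "overwrite", "ignore", "error", "errorifexists"}
    if mode not in valid:
        raise ValueError(f"unknown mode: {mode!r}")
    if not target_exists:
        return "write"  # nothing to conflict with, so every mode just writes
    if mode == "append":
        return "append"
    if mode == "overwrite":
        return "replace"
    if mode == "ignore":
        return "skip"
    # "error" and "errorifexists" are synonyms and refuse to touch existing data
    raise FileExistsError("data already exists at the target path")
```

The modes only differ when the target already holds data; on an empty path they all behave the same.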

extract()

Extract data from public CAGED data source, using the CagedCrawler.

Returns

returns an instance of the object

Return type

self

transform(**kwargs)

Transform raw data extracted from public CAGED data source, using the CagedCleaner.

Parameters

**kwargs

mode : str

Write mode used when data already exists at the designated path:
  * append: Append the contents of the DataFrame to the existing data
  * overwrite: Overwrite existing data
  * ignore: Silently ignore this operation
  * error or errorifexists (default): Raise an error

n_partitions : int

Number of DataFrame partitions

partition_col : str

Column to partition the DataFrame by when writing

key :

Other options passed to DataFrameWriter.options

Returns

returns an instance of the object

Return type

self
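Since create, extract and transform are each documented as returning the instance, calls can be chained. The stand-in class below is a minimal sketch of that pattern only: `FluentSourceSketch` and its `steps` attribute are hypothetical and do not reproduce the internals of CagedSource, which also requires a Spark session.

```python
class FluentSourceSketch:
    """Hypothetical stand-in mimicking the 'return self' style of CagedSource."""

    def __init__(self, file_dir: str) -> None:
        self.file_dir = file_dir
        self.steps = []  # records which steps ran, for illustration only

    def extract(self) -> "FluentSourceSketch":
        self.steps.append("extract")
        return self  # returning self is what enables chaining

    def transform(self, **kwargs) -> "FluentSourceSketch":
        self.steps.append(("transform", kwargs))
        return self


# Hypothetical directory; both steps run in a single expression.
source = FluentSourceSketch("/tmp/caged").extract().transform(mode="overwrite")
```

With the documented class, the analogous call would presumably look like `CagedSource(spark, file_dir).extract().transform(mode="overwrite")`, assuming an existing `spark` session and a writable `file_dir`.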

Crawler

class publicbr.caged._crawler.CagedCrawler(save_dir)

Class used to extract CAGED data from the public FTP server.

Parameters

save_dir (str) – Path to where the downloaded data should be stored. The directory is created if it does not already exist.

host

Host of the public FTP server.

Type

str

save_dir

Path to where the downloaded data should be stored

Type

str

ftp_folders

Dict with the correct folders in the FTP server for each CAGED level

Type

Dict[str]

aux_list

Auxiliary list used to extract file information from the FTP server

Type

List[str]

ftp

Object used to access the FTP server.

Type

ftplib.FTP

layout_file

File containing the layout of the CAGED table columns and auxiliary tables

Type

List[str]

available_aux

List containing the available auxiliary tables for a given CAGED level

Type

List[str]

callback_save(x) → None

Callback function used to save file info from the FTP server

Parameters

x (str) – File info

Returns

None

Return type

None
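A callback of this kind is typically handed to `ftplib.FTP.retrlines`, which invokes it once per line of the server's response. The following is a minimal sketch under that assumption: `ListingCollector` and `list_current_dir` are hypothetical names, and only `aux_list` and `callback_save` mirror names from this page.

```python
from ftplib import FTP
from typing import List


class ListingCollector:
    """Hypothetical helper that accumulates raw directory-listing lines."""

    def __init__(self) -> None:
        self.aux_list: List[str] = []

    def callback_save(self, x: str) -> None:
        # ftplib calls this once for every line returned by the server
        self.aux_list.append(x)


def list_current_dir(ftp: FTP) -> List[str]:
    """Run LIST on an open connection and return the collected lines."""
    collector = ListingCollector()
    ftp.retrlines("LIST", collector.callback_save)
    return collector.aux_list
```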

change_ftp_dir(folder) → None

Changes the current directory in the FTP server connection based on the folder

Parameters

folder (str) – Folder name in the FTP server

Returns

None

Return type

None

create_connection() → None

Creates a connection with the remote FTP server

Returns

None

Return type

None

get_data(dir, files) → None

Downloads data from the FTP server

Parameters
  • dir (str) – Directory where the files should be written

  • files (List[str]) – List of files to be downloaded from the FTP server

Returns

None

Return type

None
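Downloading a list of files over FTP is commonly done with `retrbinary`, which streams each file in blocks to a callback. The sketch below assumes that approach; `download_files` and `target_dir` are hypothetical names, and the `ftp` argument is any object that behaves like an open `ftplib.FTP` connection.

```python
import os
from typing import List


def download_files(ftp, target_dir: str, files: List[str]) -> List[str]:
    """Download each file with RETR into target_dir; return the local paths."""
    os.makedirs(target_dir, exist_ok=True)
    written = []
    for name in files:
        local_path = os.path.join(target_dir, name)
        with open(local_path, "wb") as handle:
            # retrbinary passes each received block to handle.write
            ftp.retrbinary(f"RETR {name}", handle.write)
        written.append(local_path)
    return written
```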

get_download_files() → List[str]

Gets all the files to be downloaded from the FTP connection

Returns

returns a list with the file names

Return type

List[str]

get_file_info() → List[List[str]]

Gets information of all the files available in the current directory in the FTP connection

Returns

returns a list with information on each file. Each element of the list is a list that contains information on a file.

Return type

List[List[str]]
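One way to obtain a list of lists from a directory listing is to split each raw line into fields. The sketch below assumes the server returns Unix-style `LIST` lines with nine whitespace-separated columns; `parse_listing` and the sample lines are hypothetical.

```python
from typing import List


def parse_listing(lines: List[str]) -> List[List[str]]:
    """Split each Unix-style LIST line into at most 9 fields."""
    # maxsplit=8 keeps file names that contain spaces intact in the last field
    return [line.split(maxsplit=8) for line in lines if line.strip()]


# Made-up listing lines, for illustration only
sample = [
    "-rw-r--r--   1 ftp ftp   1024 Jan 05 2021 example_a.zip",
    "-rw-r--r--   1 ftp ftp   2048 Feb 10 2021 example b.zip",
]
file_info = parse_listing(sample)
```

In this layout the file name is the last element of each inner list and the size is the fifth.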

get_layout_file() → None

Gets the layout file for a given CAGED level and its available auxiliary tables

Returns

None

Return type

None

get_most_recent_file() → str

Gets the most recent file from those available in the current directory in the FTP connection

Returns

returns the name of the most recent file

Return type

str
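How the most recent file is identified is not stated here. As one possible approach, the sketch below assumes that file names embed a six-digit YYYYMM period and picks the highest one; `most_recent` and the sample names are hypothetical, and comparing the listing timestamps would be an equally plausible strategy.

```python
import re
from typing import List, Optional

PERIOD = re.compile(r"(\d{6})")  # assumed YYYYMM stamp somewhere in the name


def most_recent(names: List[str]) -> Optional[str]:
    """Return the name with the highest YYYYMM stamp, or None if none match."""
    dated = []
    for name in names:
        match = PERIOD.search(name)
        if match:
            dated.append((match.group(1), name))
    if not dated:
        return None
    # Tuples compare by period first; fixed-width digits sort chronologically
    return max(dated)[1]
```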

run(caged_level) → None

Wrapper for method execution.

Parameters

caged_level (str) – Type of CAGED data. Possible values are:
  * mov: Data on individual admissions and layoffs in the period considered
  * estab: Data on admissions and layoffs by company in the period considered

Returns

None

Return type

None
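The two accepted levels pair naturally with the ftp_folders dictionary described earlier. The following sketch shows how a level might be validated and mapped to a folder; `FTP_FOLDERS`, `folder_for` and the folder paths are placeholders and not the values used on the real server.

```python
FTP_FOLDERS = {
    "mov": "path/to/mov",      # placeholder, not the real server path
    "estab": "path/to/estab",  # placeholder, not the real server path
}


def folder_for(caged_level: str) -> str:
    """Return the FTP folder for a CAGED level, rejecting unknown levels."""
    try:
        return FTP_FOLDERS[caged_level]
    except KeyError:
        valid = ", ".join(sorted(FTP_FOLDERS))
        raise ValueError(
            f"caged_level must be one of: {valid}; got {caged_level!r}"
        ) from None
```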

save_aux_tables() → None

Saves the available auxiliary tables

Returns

None

Return type

None

unzip_files(files) → None

Unzips files downloaded from the FTP connection

Parameters

files (List[str]) – List of files to be unzipped

Returns

None

Return type

None
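Extracting a batch of archives can be sketched with the standard library. This example assumes the downloaded files are ZIP archives, which this page does not confirm; `unzip_all` and `target_dir` are hypothetical names.

```python
import zipfile
from pathlib import Path
from typing import List


def unzip_all(files: List[str], target_dir: str) -> List[str]:
    """Extract every ZIP archive in files into target_dir; return member names."""
    out = Path(target_dir)
    out.mkdir(parents=True, exist_ok=True)
    extracted: List[str] = []
    for file_path in files:
        with zipfile.ZipFile(file_path) as archive:
            archive.extractall(out)
            extracted.extend(archive.namelist())
    return extracted
```

Other archive formats would need a different reader in place of `zipfile`.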

Consolidation

class publicbr.caged._consolidation.CagedCleaner(spark_session, file_dir, save_dir)

Class used to clean the Movimentações and Estabelecimentos CAGED tables, which contain individual and per-establishment data on admissions and layoffs, respectively.

Parameters
  • spark_session (pyspark.sql.SparkSession) – Spark Session used to manipulate data

  • file_dir (str) – Path to where the raw data is stored.

  • save_dir (str) – Path to where the consolidated data should be stored

spark

Spark session used in data manipulation

Type

pyspark.sql.SparkSession

file_dir

Path to where the raw data is stored.

Type

str

save_dir

Path to where the consolidated data should be stored

Type

str

caged_levels

The strings used to identify the CAGED tables

Type

List[str]

file_name

The final file names of CAGED tables after consolidation

Type

Dict[str]

clean(mode: str = 'error', **kwargs) → None

Wrapper for method execution.

Parameters
  • mode (str) – Write mode used when data already exists at the designated path:
      * append: Append the contents of the DataFrame to the existing data
      * overwrite: Overwrite existing data
      * ignore: Silently ignore this operation
      * error or errorifexists (default): Raise an error

  • **kwargs

    n_partitions : int

    Number of DataFrame partitions

    partition_col : str

    Column to partition the DataFrame by when writing

    key :

    Other options passed to DataFrameWriter.options

Returns

None

Return type

None
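The keyword arguments fall into two groups: the two named partitioning settings and everything else, which is forwarded as writer options. A minimal sketch of that split follows; `split_write_kwargs` is a hypothetical helper and not part of the package.

```python
from typing import Any, Dict, Optional, Tuple


def split_write_kwargs(
    **kwargs: Any,
) -> Tuple[Optional[int], Optional[str], Dict[str, Any]]:
    """Separate the partitioning settings from the remaining writer options."""
    options = dict(kwargs)  # copy so the caller's mapping is left untouched
    n_partitions = options.pop("n_partitions", None)
    partition_col = options.pop("partition_col", None)
    return n_partitions, partition_col, options
```

In PySpark the three results would plausibly feed `DataFrame.repartition`, `DataFrameWriter.partitionBy` and `DataFrameWriter.options` respectively, although whether CagedCleaner wires them this way is an assumption.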

define_schema() → None

Abstract method to be implemented by subclasses

join_aux_tables(df, caged_level) → pyspark.sql.dataframe.DataFrame

Joins the base CAGED DataFrames with the auxiliary tables extracted from the layout document.

Parameters
  • df (pyspark.sql.dataframe.DataFrame) – Spark DataFrame with data to be joined

  • caged_level (str) – Type of CAGED data. Possible values are:
      * mov: Data on individual admissions and layoffs in the period considered
      * estab: Data on admissions and layoffs by company in the period considered

Returns

returns a DataFrame joined with auxiliary tables

Return type

pyspark.sql.dataframe.DataFrame
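Joining a base table with an auxiliary code table amounts to a lookup of each code's description. The sketch below shows the idea in plain Python so that it runs without Spark; `join_aux` and the column names `secao` and `secao_desc` are hypothetical. In PySpark the same effect would typically come from `df.join(aux_df, on=key, how="left")`.

```python
from typing import Dict, List


def join_aux(
    rows: List[Dict[str, object]],
    aux: Dict[object, str],
    key: str,
    new_col: str,
) -> List[Dict[str, object]]:
    """Left-join: attach aux[row[key]] as new_col, or None for unknown codes."""
    return [{**row, new_col: aux.get(row[key])} for row in rows]


# Made-up base rows and auxiliary table, for illustration only
base = [{"id": 1, "secao": "A"}, {"id": 2, "secao": "Z"}]
aux_table = {"A": "Agriculture"}
joined = join_aux(base, aux_table, key="secao", new_col="secao_desc")
```

Rows whose code has no match are kept with a null description, which is the behaviour of a left join.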

transform_data(df, caged_level) → pyspark.sql.dataframe.DataFrame

Performs the necessary transformations to clean the raw data.

Parameters
  • df (pyspark.sql.dataframe.DataFrame) – Spark DataFrame with data to be cleaned

  • caged_level (str) – Type of CAGED data. Possible values are:
      * mov: Data on individual admissions and layoffs in the period considered
      * estab: Data on admissions and layoffs by company in the period considered

Returns

returns the final cleaned DataFrame

Return type

pyspark.sql.dataframe.DataFrame