CAGED Source
Source
- class publicbr.caged._source.CagedSource(spark_session: pyspark.sql.session.SparkSession, file_dir: str)
Class used to extract Caged data.
- Parameters
spark_session (pyspark.sql.SparkSession) – Spark Session used to manipulate data
file_dir (str) – Root directory where the data will be saved
- spark
Spark session used in data manipulation
- Type
pyspark.sql.SparkSession
- raw_dir
Path to the directory used to store raw data
- Type
str
- trusted_dir
Path to the directory used to store cleaned data
- Type
str
- ftp_folders
Dict with the correct folders in the FTP server for each CAGED level
- Type
Dict[str]
- cleaner
Object used to consolidate table
- Type
- create(**kwargs)
Wrapper for method execution.
- Parameters
**kwargs –
- mode (str)
Specifies how data is written when data already exists at the designated path:
* append: Append the contents of the DataFrame to the existing data
* overwrite: Overwrite existing data
* ignore: Silently ignore the operation
* error or errorifexists (default): Raise an error
- n_partitions (int)
Number of DataFrame partitions
- partition_col (str)
Column to partition the DataFrame by on writing
- key :
Other options passed to DataFrameWriter.options
- Returns
returns an instance of the object
- Return type
self
- extract()
Extract data from public CAGED data source, using the CagedCrawler.
- Returns
returns an instance of the object
- Return type
self
- transform(**kwargs)
Transform raw data extracted from public CAGED data source, using the CagedCleaner.
- Parameters
**kwargs –
- mode (str)
Specifies how data is written when data already exists at the designated path:
* append: Append the contents of the DataFrame to the existing data
* overwrite: Overwrite existing data
* ignore: Silently ignore the operation
* error or errorifexists (default): Raise an error
- n_partitions (int)
Number of DataFrame partitions
- partition_col (str)
Column to partition the DataFrame by on writing
- key :
Other options passed to DataFrameWriter.options
- Returns
returns an instance of the object
- Return type
self
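A minimal usage sketch, based only on the constructor and `create(**kwargs)` signatures documented above. It assumes that the publicbr package and PySpark are installed. The helper name `run_caged_pipeline`, the application name, and the keyword values are illustrative and not part of the library.

```python
def run_caged_pipeline(file_dir: str, mode: str = "error"):
    """Build a CagedSource and call its documented `create` wrapper.

    Hypothetical helper: only the CagedSource constructor and the
    `create(**kwargs)` signature come from the documentation above.
    """
    # Imported lazily so that defining this helper does not require Spark.
    from pyspark.sql import SparkSession
    from publicbr.caged._source import CagedSource

    spark = SparkSession.builder.appName("caged-example").getOrCreate()
    source = CagedSource(spark_session=spark, file_dir=file_dir)

    # `mode` and `n_partitions` are documented keyword arguments;
    # the values passed here are arbitrary.
    return source.create(mode=mode, n_partitions=4)
```

Since `extract()` and `transform()` are documented as returning the instance, the two steps could presumably also be chained, for example `source.extract().transform(mode="overwrite")`.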
Crawler
- class publicbr.caged._crawler.CagedCrawler(save_dir)
Class used to extract CAGED data from the public FTP server.
- Parameters
save_dir (str) – Path to where the downloaded data should be stored. The directory is created if it does not already exist.
- host
Host of the public FTP server.
- Type
str
- save_dir
Path to where the consolidated data should be stored
- Type
str
- ftp_folders
Dict with the correct folders in the FTP server for each CAGED level
- Type
Dict[str]
- aux_list
Auxiliary list used to extract file information from the FTP server
- Type
List[str]
- ftp
Object used to access the FTP server.
- Type
ftplib.FTP
- layout_file
File containing the layout of the CAGED table columns and auxiliary tables
- Type
List[str]
- available_aux
List containing the available auxiliary tables for a given CAGED level
- Type
List[str]
- callback_save(x) → None
Callback function used to save file info from the FTP server
- Parameters
x (str) – File info
- Returns
returns an instance of the object
- Return type
self
- change_ftp_dir(folder) → None
Changes the current directory in the FTP server connection based on the folder
- Parameters
folder (str) – Folder name in the FTP server
- Returns
returns an instance of the object
- Return type
self
- create_connection() → None
Creates a connection with the remote FTP server
- Returns
returns an instance of the object
- Return type
self
- get_data(dir, files) → None
Downloads data from the FTP server
- Parameters
dir (str) – Directory where the files should be written
files (List[str]) – List of files to be downloaded from the FTP server
- Returns
returns an instance of the object
- Return type
self
- get_download_files() → List[str]
Gets all the files to be downloaded from the FTP connection
- Returns
returns a list with the file names
- Return type
List[str]
- get_file_info() → List[List[str]]
Gets information of all the files available in the current directory in the FTP connection
- Returns
returns a list with information on each file. Each element of the list is a list that contains information on a file.
- Return type
List[List[str]]
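The callback and accumulator pattern described by `callback_save`, `aux_list` and `get_file_info` can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: the class `FileInfoCollector` is a hypothetical stand-in, and the sample listing lines and their column layout are made up.

```python
from typing import List


class FileInfoCollector:
    """Hypothetical stand-in for the callback/accumulator pattern."""

    def __init__(self) -> None:
        # Raw listing lines, one entry per file reported by the server.
        self.aux_list: List[str] = []

    def callback_save(self, x: str) -> None:
        # ftplib invokes the callback once per line of the listing.
        self.aux_list.append(x)

    def get_file_info(self) -> List[List[str]]:
        # Split each raw line on whitespace into its fields.
        # Note: file names containing spaces would be split as well.
        return [line.split() for line in self.aux_list]


# Made-up listing lines; a real server may use a different layout.
sample_listing = [
    "01-15-21  10:30AM      1024 example_a.zip",
    "03-02-21  09:05PM      2048 example_b.zip",
]

collector = FileInfoCollector()
# With a live connection, the equivalent call would be:
#     ftp.retrlines("LIST", collector.callback_save)
for line in sample_listing:
    collector.callback_save(line)

file_info = collector.get_file_info()
```

Each element of `file_info` is then a list of fields for one file, with the file name in the last position under the assumed layout.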
- get_layout_file() → None
Gets the layout file for a given CAGED level and its available auxiliary tables
- Returns
returns an instance of the object
- Return type
self
- get_most_recent_file() → str
Gets the most recent file from those available in the current directory in the FTP connection
- Returns
returns the name of the most recent file
- Return type
str
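One way to pick the most recent file from a list of per-file field lists is to parse the timestamp fields and take the maximum. The sketch below is a guess at the general approach, not the library's code: the function `most_recent_file`, the field order (date, time, size, name), and the date format are all assumptions.

```python
from datetime import datetime
from typing import List


def most_recent_file(file_info: List[List[str]]) -> str:
    """Return the name of the entry with the latest timestamp.

    Assumes each entry looks like [date, time, size, name], with the
    date as MM-DD-YY and the time as a 12-hour clock value (10:30AM).
    """

    def modified_at(fields: List[str]) -> datetime:
        return datetime.strptime(f"{fields[0]} {fields[1]}", "%m-%d-%y %I:%M%p")

    return max(file_info, key=modified_at)[-1]


# Made-up entries for illustration.
rows = [
    ["01-15-21", "10:30AM", "1024", "example_a.zip"],
    ["03-02-21", "09:05PM", "2048", "example_b.zip"],
    ["02-20-21", "11:45AM", "4096", "example_c.zip"],
]
newest = most_recent_file(rows)
```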
- run(caged_level) → None
Wrapper for method execution.
- Parameters
caged_level (str) – Type of CAGED data. Possible values are:
* mov: Data of individual admissions and layoffs in the period considered
* estab: Data of admissions and layoffs by company in the period considered
- Returns
returns an instance of the object
- Return type
self
- save_aux_tables() → None
Saves the available auxiliary tables
- Returns
returns an instance of the object
- Return type
self
- unzip_files(files) → None
Unzips files downloaded from the FTP connection
- Parameters
files (List[str]) – List of files to be unzipped
- Returns
returns an instance of the object
- Return type
self
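Extracting a list of downloaded archives can be sketched as follows. This assumes the archives are plain ZIP files, which the standard `zipfile` module can read. If the server publishes another archive format, a different library would be needed. The function `unzip_all` and its `directory` argument are hypothetical and do not mirror the method's actual signature.

```python
import tempfile
import zipfile
from pathlib import Path
from typing import List


def unzip_all(directory: str, files: List[str]) -> List[str]:
    """Extract each archive in `files` into `directory`.

    Returns the names of all extracted members.
    """
    extracted: List[str] = []
    for name in files:
        with zipfile.ZipFile(Path(directory) / name) as archive:
            archive.extractall(directory)
            extracted.extend(archive.namelist())
    return extracted


# Self-contained demonstration using a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    with zipfile.ZipFile(Path(tmp) / "example.zip", "w") as zf:
        zf.writestr("example.txt", "hello")
    names = unzip_all(tmp, ["example.zip"])
    content = (Path(tmp) / "example.txt").read_text()
```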
Consolidation
- class publicbr.caged._consolidation.CagedCleaner(spark_session, file_dir, save_dir)
Class used to clean the Movimentações and Estabelecimentos CAGED tables, which contain data on admissions and layoffs at the individual and establishment level, respectively.
- Parameters
spark_session (pyspark.sql.SparkSession) – Spark Session used to manipulate data
file_dir (str) – Path to where the raw data is stored.
save_dir (str) – Path to where the consolidated data should be stored
- spark
Spark session used in data manipulation
- Type
pyspark.sql.SparkSession
- file_dir
Path to where the raw data is stored.
- Type
str
- save_dir
Path to where the consolidated data should be stored
- Type
str
- caged_levels
The strings used to identify the CAGED tables
- Type
List[str]
- file_name
The final file names of CAGED tables after consolidation
- Type
Dict[str]
- clean(mode: str = 'error', **kwargs) → None
Wrapper for method execution.
- Parameters
mode (str) – Specifies how data is written when data already exists at the designated path:
* append: Append the contents of the DataFrame to the existing data
* overwrite: Overwrite existing data
* ignore: Silently ignore the operation
* error or errorifexists (default): Raise an error
**kwargs –
- n_partitions (int)
Number of DataFrame partitions
- partition_col (str)
Column to partition the DataFrame by on writing
- key :
Other options passed to DataFrameWriter.options
- Returns
returns an instance of the object
- Return type
self
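The keyword arguments documented for `clean` fall into two groups: the named ones (`mode`, `n_partitions`, `partition_col`) and any remaining options destined for `DataFrameWriter.options`. The sketch below shows one way such arguments could be validated and separated. The helper `split_write_kwargs` and the constant `VALID_MODES` are hypothetical, and how the library actually handles these arguments is not stated in this documentation.

```python
from typing import Any, Dict, Tuple

# The write modes listed in the parameter description above.
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def split_write_kwargs(
    mode: str = "error", **kwargs: Any
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Separate the named write arguments from pass-through writer options."""
    if mode not in VALID_MODES:
        raise ValueError(f"Unknown mode {mode!r}; expected one of {sorted(VALID_MODES)}")
    named = {
        "mode": mode,
        "n_partitions": kwargs.pop("n_partitions", None),
        "partition_col": kwargs.pop("partition_col", None),
    }
    # Whatever remains is treated as options for the writer.
    return named, kwargs


named, writer_options = split_write_kwargs(
    mode="overwrite", n_partitions=8, compression="snappy"
)
```

With PySpark, values like these would typically be applied through `DataFrame.repartition`, `DataFrameWriter.mode`, `DataFrameWriter.partitionBy` and `DataFrameWriter.options(**writer_options)`.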
- define_schema() → None
Abstract method to be implemented by subclasses
- join_aux_tables(df, caged_level) → pyspark.sql.dataframe.DataFrame
Joins the base CAGED DataFrames with the auxiliary tables extracted from the Layout document.
- Parameters
df (pyspark.sql.dataframe.DataFrame) – Spark DataFrame with data to be joined
caged_level (str) – Type of CAGED data. Possible values are:
* mov: Data of individual admissions and layoffs in the period considered
* estab: Data of admissions and layoffs by company in the period considered
- Returns
returns a DataFrame joined with auxiliary tables
- Return type
pyspark.sql.dataframe.DataFrame
- transform_data(df, caged_level) → pyspark.sql.dataframe.DataFrame
Performs the necessary transformations to clean the raw data.
- Parameters
df (pyspark.sql.dataframe.DataFrame) – Spark DataFrame with data to be cleaned
caged_level (str) – Type of CAGED data. Possible values are:
* mov: Data of individual admissions and layoffs in the period considered
* estab: Data of admissions and layoffs by company in the period considered
- Returns
returns the final cleaned DataFrame
- Return type
pyspark.sql.dataframe.DataFrame