Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@
"pygsheets==2.0.*",
"pysftp==0.2.9",
"setuptools==80.*",
"paramiko>=3.0,<4.0",
"SQLAlchemy>=1.4,<2.1",
# Temporary pin to override pysftp's dependency resolution.
# TODO: Migrate away from pysftp to use paramiko directly. See https://github.com/agrc/palletjack/issues/123
"paramiko<4.0.0",
],
extras_require={
"tests": [
Expand Down
123 changes: 80 additions & 43 deletions src/palletjack/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import re
import time
import warnings
from contextlib import contextmanager
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
Expand All @@ -20,7 +21,7 @@
import arcgis
import geopandas as gpd
import pandas as pd
import pysftp
import paramiko
import requests
import sqlalchemy
import ujson
Expand Down Expand Up @@ -133,7 +134,7 @@ def __init__(self, out_dir):
self._class_logger.debug("Initializing GoogleDriveDownloader")
self._class_logger.debug("Output directory: %s", out_dir)
self.out_dir = Path(out_dir)
regex_pattern = "(\/|=)([-\w]{25,})" # pylint:disable=anomalous-backslash-in-string
regex_pattern = r"(/|=)([-\w]{25,})"
self._class_logger.debug("Regex pattern: %s", regex_pattern)
self.regex = re.compile(regex_pattern)

Expand Down Expand Up @@ -191,7 +192,7 @@ def _get_filename_from_response(response):
"""

content = response.headers["Content-Disposition"]
all_filenames = re.findall("filename\*?=([^;]+)", content, flags=re.IGNORECASE) # pylint:disable=anomalous-backslash-in-string
all_filenames = re.findall(r"filename\*?=([^;]+)", content, flags=re.IGNORECASE)
if all_filenames:
#: Remove spurious whitespace and "s
return all_filenames[0].strip().strip('"')
Expand Down Expand Up @@ -389,78 +390,114 @@ def download_attachments_from_dataframe_using_api(
class SFTPLoader:
"""Loads data from an SFTP share into a pandas DataFrame"""

def __init__(self, host, username, password, knownhosts_file, download_dir):
def __init__(self, host, username, password, download_dir):
"""
Args:
host (str): The SFTP host to connect to
username (str): SFTP username
password (str): SFTP password
knownhosts_file (str): Path to a known_hosts file for pysftp.CnOpts. Can be generated via ssh-keyscan.
download_dir (str or Path): Directory to save downloaded files
"""

self.host = host
self.username = username
self.password = password
self.knownhosts_file = knownhosts_file
self.download_dir = download_dir
self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)

def download_sftp_folder_contents(self, sftp_folder="upload"):
"""Download all files in sftp_folder to the SFTPLoader's download_dir
@contextmanager
def _sftp_connection(self):
"""Context manager for SFTP connections that ensures proper cleanup.

Yields:
paramiko.SFTPClient: An active SFTP client
"""
transport = None
sftp = None
try:
transport = paramiko.Transport((self.host, 22))
transport.connect(username=self.username, password=self.password)
sftp = paramiko.SFTPClient.from_transport(transport)
yield sftp
finally:
if sftp:
sftp.close()
if transport:
transport.close()

def download_sftp_folder_contents(self, remote_directory, overwrite_existing_files=True):
"""Download all files in remote_directory to the SFTPLoader's download_dir

Args:
sftp_folder (str, optional): Path of remote folder, relative to sftp home directory. Defaults to 'upload'.
remote_directory (str, optional): Absolute path to remote_directory on the SFTP server
overwrite_existing_files (bool, optional): If False, raise an error if a file already exists
in download_dir. Defaults to True.

Raises:
FileExistsError: If overwrite_existing_files is False and a file already exists
FileNotFoundError: If the remote directory or file is not found
ValueError: If no files were downloaded
"""

self._class_logger.info("Downloading files from `%s:%s` to `%s`", self.host, sftp_folder, self.download_dir)
starting_file_count = len(list(self.download_dir.iterdir()))
if not remote_directory.endswith("/"):
remote_directory += "/"

self._class_logger.info(
"Downloading files from `%s:%s` to `%s`", self.host, remote_directory, self.download_dir
)
self._class_logger.debug("SFTP Username: %s", self.username)
connection_opts = pysftp.CnOpts(knownhosts=self.knownhosts_file)
with pysftp.Connection(
self.host, username=self.username, password=self.password, cnopts=connection_opts
) as sftp:

with self._sftp_connection() as sftp:
try:
sftp.get_d(sftp_folder, self.download_dir, preserve_mtime=True)
file_list = sftp.listdir(remote_directory)
except FileNotFoundError as error:
raise FileNotFoundError(f"Folder `{sftp_folder}` not found on SFTP server") from error
downloaded_file_count = len(list(self.download_dir.iterdir())) - starting_file_count
if not downloaded_file_count:
raise FileNotFoundError(f"Directory `{remote_directory}` not found on SFTP server") from error

if not file_list:
raise ValueError("No files to download from remote directory")

downloaded_files = []
with self._sftp_connection() as sftp:
for file_name in file_list:
outfile_path = self.download_dir / file_name

# Check if file exists and overwrite is disabled
if not overwrite_existing_files and outfile_path.exists():
raise FileExistsError(f"File `{outfile_path}` already exists and overwrite_existing_files is False")

try:
sftp.get(f"{remote_directory}{file_name}", outfile_path.as_posix())
downloaded_files.append(file_name)
except FileNotFoundError as error:
raise FileNotFoundError(f"File `{remote_directory}{file_name}` not found on SFTP server") from error

if not downloaded_files:
raise ValueError("No files downloaded")
return downloaded_file_count
return len(downloaded_files)

def download_sftp_single_file(self, filename, sftp_folder="upload"):
"""Download filename into SFTPLoader's download_dir
def download_sftp_single_file(self, remote_file):
"""Download remote_file into SFTPLoader's download_dir

Args:
filename (str): Filename to download; used as output filename as well.
sftp_folder (str, optional): Path of remote folder, relative to sftp home directory. Defaults to 'upload'.

remote_file (str): Remote file to download; see paramiko.SFTPClient.get() for path details
Raises:
FileNotFoundError: Will warn if pysftp can't find the file or folder on the sftp server
FileNotFoundError: If paramiko can't find remote_file on the sftp server

Returns:
Path: Downloaded file's path
"""

outfile = Path(self.download_dir, filename)

self._class_logger.info("Downloading %s from `%s:%s` to `%s`", filename, self.host, sftp_folder, outfile)
self._class_logger.info("Downloading %s from `%s` to `%s`", remote_file, self.host, self.download_dir)
self._class_logger.debug("SFTP Username: %s", self.username)
connection_opts = pysftp.CnOpts(knownhosts=self.knownhosts_file)
try:
with pysftp.Connection(
self.host,
username=self.username,
password=self.password,
cnopts=connection_opts,
default_path=sftp_folder,
) as sftp:
sftp.get(filename, localpath=outfile, preserve_mtime=True)
except FileNotFoundError as error:
raise FileNotFoundError(f"File `{filename}` or folder `{sftp_folder}`` not found on SFTP server") from error

return outfile

with self._sftp_connection() as sftp:
outfile_path = self.download_dir / Path(remote_file).name
try:
sftp.get(remote_file, outfile_path.as_posix())
except FileNotFoundError as error:
raise FileNotFoundError(f"File `{remote_file}` not found on SFTP server") from error

return outfile_path

def read_csv_into_dataframe(self, filename, column_types=None):
"""Read filename into a dataframe with optional column names and types
Expand Down
14 changes: 8 additions & 6 deletions src/palletjack/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,19 @@ def _upload_gdb(self, zipped_gdb_path: Path) -> Item:
raise RuntimeError(f"Error deleting existing gdb item with title {title}") from error

try:
gdb_item = utils.retry(
self.gis.content.add,
root_folder = self.gis.content.folders.get()
add_job = utils.retry(
root_folder.add,
item_properties={
"type": item_type,
"title": title,
"snippet": "Temporary gdb upload from palletjack",
},
data=zipped_gdb_path,
file=str(zipped_gdb_path),
)
except Exception as error:
raise RuntimeError(f"Error uploading {zipped_gdb_path} to AGOL") from error
return gdb_item
return add_job.result()

def _cleanup(self, gdb_item: Item | None = None, zipped_gdb_path: Path | None = None) -> None:
"""Remove the zipped gdb from disk and the gdb item from AGOL
Expand Down Expand Up @@ -573,7 +574,7 @@ def _create_attachment_action_df(self, attachment_eval_df, attachment_path_field
)

#: Overwrite if different names, add if no existing name, do nothing if names are the same
attachment_eval_df["operation"] = np.nan
attachment_eval_df["operation"] = pd.Series([np.nan] * len(attachment_eval_df), dtype="object", index=attachment_eval_df.index)
attachment_eval_df.loc[attachment_eval_df["NAME"] != attachment_eval_df["new_filename"], "operation"] = (
"overwrite"
)
Expand Down Expand Up @@ -747,7 +748,8 @@ def build_attachments_dataframe(input_dataframe, join_column, attachment_column,
pd.DataFrame: Dataframe with join key, attachment name, and full attachment paths
"""

input_dataframe[attachment_column].replace("", np.nan, inplace=True) #: pandas doesn't see empty strings as NAs
#: pandas doesn't see empty strings as NAs
input_dataframe[attachment_column] = input_dataframe[attachment_column].replace("", np.nan)
attachments_dataframe = (
input_dataframe[[join_column, attachment_column]].copy().dropna(subset=[attachment_column])
)
Expand Down
20 changes: 20 additions & 0 deletions src/palletjack/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def check_fields(cls, live_data_properties, new_dataframe, fields, add_oid):
field_checker.check_field_length(fields)
# field_checker.check_srs_wgs84()
field_checker.check_nullable_ints_shapely()
field_checker.check_for_np_inf()

def __init__(self, live_data_properties, new_dataframe):
"""
Expand Down Expand Up @@ -797,6 +798,25 @@ def check_nullable_ints_shapely(self):
f"{', '.join(columns_with_nulls)}"
)

def check_for_np_inf(self):
"""Raise a warning if any of the new dataframe's fields contain np.inf or -np.inf values.

Raises:
UserWarning: If any of the new dataframe's fields contain np.inf or -np.inf values.
"""

columns_with_inf = []
non_spatial_columns = [col for col in self.new_dataframe.columns if self.new_dataframe[col].dtype != "geometry"]
for column in non_spatial_columns:
if np.isinf(self.new_dataframe[column]).any():
columns_with_inf.append(column)

if columns_with_inf:
warnings.warn(
"The following columns have np.inf or -np.inf values, which may cause empty feature services: "
f"{', '.join(columns_with_inf)}"
)


def get_null_geometries(feature_layer_properties):
"""Generate placeholder geometries near 0, 0 with type based on provided feature layer properties dictionary.
Expand Down
Loading