Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ repos:
rev: v1.6.1
hooks:
- id: mypy
additional_dependencies:
- types-requests
1 change: 1 addition & 0 deletions netsuite/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from . import constants # noqa
from .client import * # noqa
from .config import * # noqa
from .odbc import * # noqa
from .rest_api import * # noqa
from .restlet import * # noqa
from .soap_api import * # noqa
18 changes: 11 additions & 7 deletions netsuite/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@ class TokenAuth(BaseModel):


class UsernamePasswordAuth(BaseModel):
"""
This is a very old authentication method that is not recommended for use.

However, it's the only way to access the netsuite.com (NOT netsuite2.com) data source via ODBC.
"""

username: str
password: str
role: str | int


class Config(BaseModel):
Expand All @@ -36,13 +31,17 @@ class Config(BaseModel):

log_level: t.Optional[str] = None

# TODO ODBC is not yet fully supported, but this is the first step
odbc_data_source: t.Literal["NetSuite.com", "NetSuite2.com"] = "NetSuite.com"
odbc_driver: str = "{NetSuite Drivers 64bit}"

@property
def is_token_auth(self) -> bool:
return isinstance(self.auth, TokenAuth)

@property
def is_password_auth(self) -> bool:
return isinstance(self.auth, UsernamePasswordAuth)

@property
def is_sandbox(self) -> bool:
return re.search(r"_SB[\d]+$", self.account) is not None
Expand Down Expand Up @@ -86,6 +85,8 @@ def from_env(cls):
- `NETSUITE_TOKEN_SECRET`: The token secret for OAuth.
- `NETSUITE_USERNAME`: The username for login auth (only for odbc).
- `NETSUITE_PASSWORD`: The password for login auth (only for odbc).
- `NETSUITE_ODBC_DRIVER`: ODBC Driver for SuiteConnect
- `NETSUITE_ODBC_DATA_SOURCE`: Netsuite Server Data Source Name (NetSuite2.com)
- `NETSUITE_LOG_LEVEL`: log level for NetSuite debugging

Returns a dictionary of available config options.
Expand All @@ -100,6 +101,9 @@ def from_env(cls):
"token_secret",
"username",
"password",
"role",
"odbc_driver",
"odbc_data_source",
"log_level",
]
prefix = "NETSUITE_"
Expand Down
71 changes: 71 additions & 0 deletions netsuite/odbc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import typing
from functools import cached_property

import pyodbc

from .config import Config


class NetSuiteODBC:
def __init__(self, config: Config):
if not config.is_password_auth:
raise RuntimeError(
"Only username/password auth is supported for ODBC connections."
)

self._config = config

@cached_property
def hostname(self) -> str:
return f"{self._config.account_slugified}.connect.api.netsuite.com"

@cached_property
def _connection_string(self) -> str:
return (
f"DRIVER={self._config.odbc_driver};"
f"Host={self.hostname};"
"Port=1708;"
"Encrypted=1;"
"AllowSinglePacketLogout=1;"
"Truststore=system;"
f"ServerDataSource={self._config.odbc_data_source};"
f"UID={self._config.auth.username};"
f"PWD={self._config.auth.password};"
"CustomProperties="
f"AccountID={self._config.account};"
f"RoleID={self._config.auth.role}"
)

def connect(self) -> pyodbc.Connection:
"""
Connects to the ODBC database and returns the connection object

:return: pyodbc.Connection
"""
return pyodbc.connect(self._connection_string)

def execute(self, sql: str) -> pyodbc.Cursor:
"""
Executes the SQL statement and returns the pyodbc cursor object

:param sql:
:return: pyodbc.Cursor
"""
with self.connect() as connection:
cursor = connection.cursor()
return cursor.execute(sql)

def query(self, sql: str) -> list[dict[str, typing.Any]]:
"""
Executes the SQL statement and returns the results as a list of dicts
:param sql:
:return: list[dict[str, any]]
"""
with self.connect() as connection:
cursor = connection.cursor()
response = cursor.execute(sql)
headers = [col[0] for col in response.description]
results = [
dict(zip(headers, row)) for i, row in enumerate(response.fetchall())
]
return results
13 changes: 8 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
[tool.poetry]
[project]
name = "netsuite"
version = "0.12.0"
version = "0.13.0"
description = "Make async requests to NetSuite SuiteTalk SOAP/REST Web Services and Restlets"
authors = ["Jacob Magnusson <m@jacobian.se>", "Mike Bianco <mike@mikebian.co>"]
authors = [
{ name = "Jacob Magnusson", email = "m@jacobian.se" },
{ name = "Mike Bianco", email = "mike@mikebian.co" }
]
license = "MIT"
readme = "README.md"
homepage = "https://jacobsvante.github.io/netsuite/"
Expand Down Expand Up @@ -39,9 +42,9 @@ soap_api = ["zeep"]
cli = ["ipython"]
orjson = ["orjson"]
# TODO doesn't --all-extras solve this for us?
all = ["zeep", "ipython", "orjson", "odbc"]
all = ["zeep", "ipython", "orjson", "pyodbc"]

[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
black = "~24"
flake8 = "~7"
isort = "~6"
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ def dummy_config_with_production_account(dummy_config):
@pytest.fixture
def dummy_username_password_config():
return Config(
account="123456_SB1", auth={"username": "username", "password": "password"}
account="123456_SB1",
auth={"username": "username", "password": "password", "role": 3},
)
10 changes: 10 additions & 0 deletions tests/test_odbc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# TODO need more expanded tests here
from netsuite import NetSuiteODBC


def test_tba(dummy_config):
Expand All @@ -22,4 +23,13 @@ def test_username_auth(dummy_username_password_config):
config = dummy_username_password_config

assert config.auth.username == "username"
assert type(config.auth.role) in [int, str]
assert config.odbc_data_source in ["NetSuite.com", "NetSuite2.com"]
assert config.odbc_driver == "{NetSuite Drivers 64bit}"
assert not config.is_token_auth


def test_correct_config_auth_method(dummy_username_password_config):
config = dummy_username_password_config
ns = NetSuiteODBC(config)
assert ns.hostname == "123456-sb1.connect.api.netsuite.com"