Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
Change Log
===========
0.1.17
-------
- Added ``get_horizontal_levels(h, pctbound=0.05, extmethod=..., accuracy=2,
min_touches=2)`` function; detects flat support and resistance zones by
clustering pivot prices within a relative tolerance, returning
``(support_levels, resistance_levels)`` where each entry is
``(mean_price, touch_count, pivot_indices)``; accepts the same
``h`` formats as :func:`calc_support_resistance` (closes #13)

0.1.16
-------
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

setup(
name='trendln',
version="0.1.16",
version="0.1.17",
description='Support and Resistance Trend lines Calculator for Financial Analysis',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
100 changes: 100 additions & 0 deletions tests/test_trendln.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from trendln import (
calc_support_resistance,
get_extrema,
get_horizontal_levels,
get_levels,
pandas_to_ohlc,
plot_support_resistance,
Expand Down Expand Up @@ -766,3 +767,102 @@ def test_invalid_calc_result_single_side_raises(self):
single = calc_support_resistance((DATA_SIMPLE, None))
with pytest.raises(ValueError):
get_levels(single, 10, 1.5)


# ---------------------------------------------------------------------------
# get_horizontal_levels — horizontal clustering (issue #13)
# ---------------------------------------------------------------------------

import math as _math

class TestGetHorizontalLevels:
"""Tests for trendln.get_horizontal_levels()."""

# DATA_SIMPLE = [0,1,2,3,2,1,0,1,2,3,...,0] (25 pts)
# Minima are all 0.0 -> a single tight support cluster.
# Maxima are all 3.0 -> a single tight resistance cluster.

def test_returns_two_lists(self):
sup, res = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE)
assert isinstance(sup, list)
assert isinstance(res, list)

def test_entry_format(self):
sup, res = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE)
assert len(sup) > 0
mean_price, touch_count, pivot_idxs = sup[0]
assert isinstance(mean_price, float)
assert isinstance(touch_count, int)
assert isinstance(pivot_idxs, list)

def test_support_cluster_at_zero(self):
# All minima of DATA_SIMPLE are 0.0 -> should form one cluster
sup, _ = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE,
min_touches=2)
assert len(sup) == 1
mean_price, touch_count, _ = sup[0]
assert _math.isclose(mean_price, 0.0, abs_tol=1e-9)
assert touch_count >= 2

def test_resistance_cluster_at_three(self):
# All maxima of DATA_SIMPLE are 3.0 -> should form one cluster
_, res = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE,
min_touches=2)
assert len(res) == 1
mean_price, touch_count, _ = res[0]
assert _math.isclose(mean_price, 3.0, abs_tol=1e-9)
assert touch_count >= 2

def test_min_touches_one_returns_every_pivot(self):
sup, res = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE,
min_touches=1)
# At min_touches=1 every single pivot is a level
total_pivots = sum(c[1] for c in sup) + sum(c[1] for c in res)
assert total_pivots > 0

def test_high_min_touches_filters_out_clusters(self):
# DATA_SIMPLE has exactly 3 minima (all 0.0). Require 4 -> no support levels.
sup, _ = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE,
min_touches=10)
assert sup == []

def test_support_sorted_descending(self):
# With multiple clusters, support levels should be descending by price
data = [float(x) for x in [0,1,0,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2,1,2]]
sup, _ = get_horizontal_levels(data, extmethod=METHOD_NAIVE,
min_touches=1, pctbound=0.01)
prices = [s[0] for s in sup]
assert prices == sorted(prices, reverse=True)

def test_resistance_sorted_ascending(self):
_, res = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE,
min_touches=1)
prices = [r[0] for r in res]
assert prices == sorted(prices)

def test_pivot_indices_in_range(self):
sup, res = get_horizontal_levels(DATA_SIMPLE, extmethod=METHOD_NAIVE,
min_touches=1)
n = len(DATA_SIMPLE)
for _, _, idxs in sup + res:
for i in idxs:
assert 0 <= i < n

def test_tuple_input_low_high(self):
# (low, high) tuple: low->support, high->resistance
sup, res = get_horizontal_levels((DATA_SIMPLE, DATA_SIMPLE),
extmethod=METHOD_NAIVE, min_touches=1)
assert len(sup) > 0
assert len(res) > 0

def test_invalid_pctbound_raises(self):
with pytest.raises(ValueError, match='pctbound'):
get_horizontal_levels(DATA_SIMPLE, pctbound=0)

def test_invalid_pctbound_type_raises(self):
with pytest.raises(ValueError, match='pctbound'):
get_horizontal_levels(DATA_SIMPLE, pctbound=5)

def test_invalid_min_touches_raises(self):
with pytest.raises(ValueError, match='min_touches'):
get_horizontal_levels(DATA_SIMPLE, min_touches=0)
82 changes: 82 additions & 0 deletions trendln/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,88 @@ def _extract(trend):

return supports, resistances, rr_ratios

def get_horizontal_levels(h, pctbound=0.05, extmethod=METHOD_NUMDIFF, accuracy=2, min_touches=2):
"""Find horizontal support and resistance levels by clustering pivot prices.

Groups detected minima and maxima whose prices fall within *pctbound* of
each other into a single level, then filters to levels touched by at least
*min_touches* pivots.

Parameters
----------
h : array-like or tuple
Price series or ``(low, high)`` 2-tuple, same format as
:func:`calc_support_resistance`. For a single series both support and
resistance pivots are drawn from it; for a ``(low, high)`` tuple the
*low* series supplies support pivots and the *high* series supplies
resistance pivots — either element may be ``None`` to skip that side.
pctbound : float, optional
Maximum relative distance between a candidate price and the running
cluster mean for the two to be merged. Default ``0.05`` (5 %).
extmethod : int or str, optional
Extrema detection method constant (default ``METHOD_NUMDIFF``).
accuracy : int, optional
Numerical differentiation accuracy passed to :func:`get_extrema`
(default 2).
min_touches : int, optional
Minimum pivot count required to keep a cluster (default 2). Use
``1`` to return every detected single-pivot level.

Returns
-------
support_levels : list of tuple
Horizontal support zones sorted descending by price. Each entry is
``(mean_price, touch_count, pivot_indices)`` where *mean_price* is the
average pivot price in the cluster, *touch_count* is the cluster size,
and *pivot_indices* is the sorted list of original series indices.
resistance_levels : list of tuple
Horizontal resistance zones in the same format, sorted ascending.
"""
if not isinstance(pctbound, float) or pctbound <= 0.0:
raise ValueError('pctbound must be a positive float')
if not isinstance(min_touches, int) or min_touches < 1:
raise ValueError('min_touches must be a positive integer')

is_tuple = (isinstance(h, tuple) and len(h) == 2
and (h[0] is None or check_num_alike(h[0]))
and (h[1] is None or check_num_alike(h[1])))

if is_tuple:
h_low, h_high = h
raw_min_idxs = get_extrema((h_low, None), extmethod, accuracy) if h_low is not None else []
raw_max_idxs = get_extrema((None, h_high), extmethod, accuracy) if h_high is not None else []
support_pairs = [(float(h_low[i]), i) for i in raw_min_idxs] if h_low is not None else []
resistance_pairs = [(float(h_high[i]), i) for i in raw_max_idxs] if h_high is not None else []
else:
raw_min_idxs, raw_max_idxs = get_extrema(h, extmethod, accuracy)
support_pairs = [(float(h[i]), i) for i in raw_min_idxs]
resistance_pairs = [(float(h[i]), i) for i in raw_max_idxs]

def _cluster(price_idx_pairs):
if not price_idx_pairs:
return []
ordered = sorted(price_idx_pairs, key=lambda p: p[0])
clusters = []
grp_prices = [ordered[0][0]]
grp_idxs = [ordered[0][1]]
for price, idx in ordered[1:]:
mean = sum(grp_prices) / len(grp_prices)
if (mean == 0.0 and price == 0.0) or (mean != 0.0 and abs(price - mean) / mean <= pctbound):
grp_prices.append(price)
grp_idxs.append(idx)
else:
clusters.append((sum(grp_prices) / len(grp_prices),
len(grp_prices), sorted(grp_idxs)))
grp_prices = [price]
grp_idxs = [idx]
clusters.append((sum(grp_prices) / len(grp_prices),
len(grp_prices), sorted(grp_idxs)))
return [c for c in clusters if c[1] >= min_touches]

support_levels = sorted(_cluster(support_pairs), key=lambda c: c[0], reverse=True)
resistance_levels = sorted(_cluster(resistance_pairs), key=lambda c: c[0])
return support_levels, resistance_levels

def plot_sup_res_date(hist, idx, numbest = 2, fromwindows = True, pctbound=0.1,
extmethod = METHOD_NUMDIFF, method=METHOD_NSQUREDLOGN, window=125,
errpct = 0.005, hough_scale=0.01, hough_prob_iter=10, sortError=False, accuracy=2,
Expand Down
Loading