"""
This is from the LWA Software Library (LSL)
https://github.com/lwa-project/lsl
Redistributed under the GNU license.
Small collection of robust statistical estimators based on functions from
Henry Freudenriech (Hughes STX) statistics library (called ROBLIB) that have
been incorporated into the AstroIDL User's Library. Function included are:
* biweight_mean - biweighted mean estimator
* mean - robust estimator of the mean of a data set
* mode - robust estimate of the mode of a data set using the half-sample
method
* std - robust estimator of the standard deviation of a data set
* checkfit - return the standard deviation and biweights for a fit in order
to determine its quality
* linefit - outlier resistant fit of a line to data
* polyfit - outlier resistant fit of a polynomial to data
For the fitting routines, the coefficients are returned in the same order as
numpy.polyfit, i.e., with the coefficient of the highest power listed first.
For additional information about the original IDL routines, see:
http://idlastro.gsfc.nasa.gov/contents.html#C17
"""
# Python2 compatibility
from __future__ import print_function, division, absolute_import
import sys
if sys.version_info < (3,):
range = xrange
import math
import numpy
from numpy.polynomial.polynomial import polyfit as npp_polyfit, polyval as npp_polyval
__version__ = '0.5'
__all__ = ['biweight_mean', 'mean', 'mode', 'std', 'checkfit', 'linefit', 'polyfit']
__max_iter = 25
__delta = 5.0e-7
__epsilon = 1.0e-20
def __stddev(x):
return x.std()*numpy.sqrt(x.size/(x.size+1.0))
[docs]def biweight_mean(inputData, axis=None, dtype=None):
"""
Calculate the mean of a data set using bisquare weighting.
Based on the biweight_mean routine from the AstroIDL User's
Library.
.. versionchanged:: 1.0.3
Added the 'axis' and 'dtype' keywords to make this function more
compatible with numpy.mean()
"""
if axis is not None:
fnc = lambda x: biweight_mean(x, dtype=dtype)
y0 = numpy.apply_along_axis(fnc, axis, inputData)
else:
y = inputData.ravel()
if type(y).__name__ == "MaskedArray":
y = y.compressed()
if dtype is not None:
y = y.astype(dtype)
n = len(y)
closeEnough = 0.03*numpy.sqrt(0.5/(n-1))
diff = 1.0e30
nIter = 0
y0 = numpy.median(y)
deviation = y - y0
sigma = std(deviation)
if sigma < __epsilon:
diff = 0
while diff > closeEnough:
nIter = nIter + 1
if nIter > __max_iter:
break
uu = ((y-y0)/(6.0*sigma))**2.0
uu = numpy.where(uu > 1.0, 1.0, uu)
weights = (1.0-uu)**2.0
weights /= weights.sum()
y0 = (weights*y).sum()
deviation = y - y0
prevSigma = sigma
sigma = std(deviation, zero=True)
if sigma > __epsilon:
diff = numpy.abs(prevSigma - sigma) / prevSigma
else:
diff = 0.0
return y0
[docs]def mean(inputData, cut=3.0, axis=None, dtype=None):
"""
Robust estimator of the mean of a data set. Based on the
resistant_mean function from the AstroIDL User's Library.
.. versionchanged:: 1.2.1
Added a ValueError if the distriubtion is too strange
.. versionchanged:: 1.0.3
Added the 'axis' and 'dtype' keywords to make this function more
compatible with numpy.mean()
"""
if axis is not None:
fnc = lambda x: mean(x, cut=cut, dtype=dtype)
dataMean = numpy.apply_along_axis(fnc, axis, inputData)
else:
data = inputData.ravel()
if type(data).__name__ == "MaskedArray":
data = data.compressed()
if dtype is not None:
data = data.astype(dtype)
data0 = numpy.median(data)
maxAbsDev = numpy.median(numpy.abs(data-data0)) / 0.6745
if maxAbsDev < __epsilon:
maxAbsDev = (numpy.abs(data-data0)).mean() / 0.8000
cutOff = cut*maxAbsDev
good = numpy.where( numpy.abs(data-data0) <= cutOff )
good = good[0]
dataMean = data[good].mean()
dataSigma = math.sqrt( ((data[good]-dataMean)**2.0).sum() / len(good) )
if cut > 1.0:
sigmacut = cut
else:
sigmacut = 1.0
if sigmacut <= 4.5:
dataSigma = dataSigma / (-0.15405 + 0.90723*sigmacut - 0.23584*sigmacut**2.0 + 0.020142*sigmacut**3.0)
cutOff = cut*dataSigma
good = numpy.where( numpy.abs(data-data0) <= cutOff )
good = good[0]
dataMean = data[good].mean()
if len(good) > 3:
dataSigma = math.sqrt( ((data[good]-dataMean)**2.0).sum() / len(good) )
else:
raise ValueError("Distribution is too strange to compute mean")
if cut > 1.0:
sigmacut = cut
else:
sigmacut = 1.0
if sigmacut <= 4.5:
dataSigma = dataSigma / (-0.15405 + 0.90723*sigmacut - 0.23584*sigmacut**2.0 + 0.020142*sigmacut**3.0)
dataSigma = dataSigma / math.sqrt(len(good)-1)
return dataMean
[docs]def mode(inputData, axis=None, dtype=None):
"""
Robust estimator of the mode of a data set using the half-sample mode.
.. versionadded: 1.0.3
"""
if axis is not None:
fnc = lambda x: mode(x, dtype=dtype)
dataMode = numpy.apply_along_axis(fnc, axis, inputData)
else:
# Create the function that we can use for the half-sample mode
def _hsm(data):
if data.size == 1:
return data[0]
elif data.size == 2:
return data.mean()
elif data.size == 3:
i1 = data[1] - data[0]
i2 = data[2] - data[1]
if i1 < i2:
return data[:2].mean()
elif i2 > i1:
return data[1:].mean()
else:
return data[1]
else:
wMin = data[-1] - data[0]
N = data.size // 2 + data.size % 2
for i in range(0, N):
w = data[i+N-1] - data[i]
if w < wMin:
wMin = w
j = i
return _hsm(data[j:j+N])
data = inputData.ravel()
if type(data).__name__ == "MaskedArray":
data = data.compressed()
if dtype is not None:
data = data.astype(dtype)
# The data need to be sorted for this to work
data = numpy.sort(data)
# Find the mode
dataMode = _hsm(data)
return dataMode
[docs]def std(inputData, zero=False, axis=None, dtype=None):
"""
Robust estimator of the standard deviation of a data set.
Based on the robust_sigma function from the AstroIDL User's Library.
.. versionchanged:: 1.2.1
Added a ValueError if the distriubtion is too strange
.. versionchanged:: 1.0.3
Added the 'axis' and 'dtype' keywords to make this function more
compatible with numpy.std()
"""
if axis is not None:
fnc = lambda x: std(x, zero=zero, dtype=dtype)
sigma = numpy.apply_along_axis(fnc, axis, inputData)
else:
data = inputData.ravel()
if type(data).__name__ == "MaskedArray":
data = data.compressed()
if dtype is not None:
data = data.astype(dtype)
if zero:
data0 = 0.0
else:
data0 = numpy.median(data)
maxAbsDev = numpy.median(numpy.abs(data-data0)) / 0.6745
if maxAbsDev < __epsilon:
maxAbsDev = (numpy.abs(data-data0)).mean() / 0.8000
if maxAbsDev < __epsilon:
sigma = 0.0
return sigma
u = (data-data0) / 6.0 / maxAbsDev
u2 = u**2.0
good = numpy.where( u2 <= 1.0 )
good = good[0]
if len(good) < 3:
raise ValueError("Distribution is too strange to compute standard deviation")
numerator = ((data[good]-data0)**2.0 * (1.0-u2[good])**4.0).sum()
nElements = (data.ravel()).shape[0]
denominator = ((1.0-u2[good])*(1.0-5.0*u2[good])).sum()
sigma = nElements*numerator / (denominator*(denominator-1.0))
if sigma > 0:
sigma = math.sqrt(sigma)
else:
sigma = 0.0
return sigma
[docs]def checkfit(inputData, inputFit, epsilon, delta, bisquare_limit=6.0):
"""
Determine the quality of a fit and biweights. Returns a tuple
with elements:
0. Status
1. Robust standard deviation analog
2. Fractional median absolute deviation of the residuals
3. Number of input points given non-zero weight in the calculation
4. Bisquare weights of the input points
5. Residual values scaled by sigma
This function is based on the rob_checkfit routine from the AstroIDL
User's Library.
"""
status = 0
data = inputData.ravel()
fit = inputFit.ravel()
if type(data).__name__ == "MaskedArray":
data = data.compressed()
if type(fit).__name__ == "MaskedArray":
fit = fit.compressed()
deviation = data - fit
sigma = std(deviation, zero=True)
if sigma < epsilon:
return (status, sigma, 0.0, 0, 0.0, 0.0)
toUse = (numpy.where( numpy.abs(fit) > epsilon ))[0]
if len(toUse) < 3:
fracDev = 0.0
else:
fracDev = numpy.median(numpy.abs(deviation[toUse]/fit[toUse]))
if fracDev < delta:
return (status, sigma, fracDev, 0, 0.0, 0.0)
status = 1
scaledResids = numpy.abs(deviation)/(bisquare_limit*sigma)
toUse = (numpy.where(scaledResids > 1))[0]
if len(toUse) > 0:
scaledResids[toUse] = 1.0
nGood = len(data) - len(toUse)
biweights = (1.0 - scaledResids**2.0)
biweights = biweights / biweights.sum()
return (status, sigma, fracDev, nGood, biweights, scaledResids)
[docs]def linefit(inputX, inputY, max_iter=25, bisector=False, bisquare_limit=6.0, close_factor=0.03):
"""
Outlier resistance two-variable linear regression function.
Based on the robust_linefit routine in the AstroIDL User's Library.
"""
xIn = inputX.ravel()
yIn = inputY.ravel()
if type(yIn).__name__ == "MaskedArray":
xIn = xIn.compress(numpy.logical_not(yIn.mask))
yIn = yIn.compressed()
n = len(xIn)
x0 = xIn.mean()
y0 = yIn.mean()
x = xIn - x0
y = yIn - y0
cc = numpy.zeros(2)
ss = numpy.zeros(2)
sigma = 0.0
yFit = yIn
badFit = 0
nGood = n
lsq = 0.0
yp = y
if n > 5:
s = numpy.argsort(x)
u = x[s]
v = y[s]
nHalf = n//2 - 1
x1 = numpy.median(u[0:nHalf+1])
x2 = numpy.median(u[nHalf+1:])
y1 = numpy.median(v[0:nHalf+1])
y2 = numpy.median(v[nHalf+1:])
if numpy.abs(x2-x1) < __epsilon:
x1, x2 = u[0], u[-1]
y1, y2 = v[0], v[-1]
cc[1] = (y2-y1)/(x2-x1)
cc[0] = y1 - cc[1]*x1
yFit = cc[0] + cc[1]*x
status, sigma, fracDev, nGood, biweights, scaledResids = checkfit(yp, yFit, __epsilon, __delta)
if nGood < 2:
lsq = 1.0
if lsq == 1 or n < 6:
sx = x.sum()
sy = y.sum()
sxy = (x*y).sum()
sxx = (x*x).sum()
d = sxx - sx*sx
if numpy.abs(d) < __epsilon:
return (0.0, 0.0)
ySlope = (sxy - sx*sy) / d
yYInt = (sxx*sy - sx*sxy) / d
if bisector:
syy = (y*y).sum()
d = syy - sy*sy
if numpy.abs(d) < __epsilon:
return (0.0, 0.0)
tSlope = (sxy - sy*sx) / d
tYInt = (syy*sx - sy*sxy) / d
if numpy.abs(tSlope) < __epsilon:
return (0.0, 0.0)
xSlope = 1.0/tSlope
xYInt = -tYInt / tSlope
if ySlope > xSlope:
a1 = yYInt
b1 = ySlope
r1 = numpy.sqrt(1.0+ySlope**2.0)
a2 = xYInt
b2 = xSlope
r2 = numpy.sqrt(1.0+xSlope**2.0)
else:
a2 = yYInt
b2 = ySlope
r2 = numpy.sqrt(1.0+ySlope**2.0)
a1 = xYInt
b1 = xSlope
r1 = numpy.sqrt(1.0+xSlope**2.0)
yInt = (r1*a2 + r2*a1) / (r1 + r2)
slope = (r1*b2 + r2*b1) / (r1 + r2)
r = numpy.sqrt(1.0+slope**2.0)
if yInt > 0:
r = -r
u1 = slope / r
u2 = -1.0/r
u3 = yInt / r
yp = u1*x + u2*y + u3
yFit = y*0.0
ss = yp
else:
slope = ySlope
yInt = yYInt
yFit = yInt + slope*x
cc[0] = yInt
cc[1] = slope
status, sigma, fracDev, nGood, biweights, scaledResids = checkfit(yp, yFit, __epsilon, __delta)
if nGood < 2:
cc[0] = cc[0] + y0 - cc[1]*x0
return cc[::-1]
sigma1 = min([(100.0*sigma), 1e20])
closeEnough = close_factor * numpy.sqrt(0.5/(n-1))
if closeEnough < __delta:
closeEnough = __delta
diff = 1.0e20
nIter = 0
while diff > closeEnough and nIter < max_iter:
nIter = nIter + 1
sigma2 = sigma1
sigma1 = sigma
sx = (biweights*x).sum()
sy = (biweights*y).sum()
sxy = (biweights*x*y).sum()
sxx = (biweights*x*x).sum()
d = sxx - sx*sx
if numpy.abs(d) < __epsilon:
return (0.0, 0.0)
ySlope = (sxy - sx*sy) / d
yYInt = (sxx*sy - sx*sxy) / d
slope = ySlope
yInt = yYInt
if bisector:
syy = (biweights*y*y).sum()
d = syy - sy*sy
if numpy.abs(d) < __epsilon:
return (0.0, 0.0)
tSlope = (sxy - sy*sx) / d
tYInt = (syy*sx - sy*sxy) / d
if numpy.abs(tSlope) < __epsilon:
return (0.0, 0.0)
xSlope = 1.0/tSlope
xYInt = -tYInt / tSlope
if ySlope > xSlope:
a1 = yYInt
b1 = ySlope
r1 = numpy.sqrt(1.0+ySlope**2.0)
a2 = xYInt
b2 = xSlope
r2 = numpy.sqrt(1.0+xSlope**2.0)
else:
a2 = yYInt
b2 = ySlope
r2 = numpy.sqrt(1.0+ySlope**2.0)
a1 = xYInt
b1 = xSlope
r1 = numpy.sqrt(1.0+xSlope**2.0)
yInt = (r1*a2 + r2*a1) / (r1 + r2)
slope = (r1*b2 + r2*b1) / (r1 + r2)
r = numpy.sqrt(1.0+slope**2.0)
if yInt > 0:
r = -r
u1 = slope / r
u2 = -1.0/r
u3 = yInt / r
yp = u1*x + u2*y + u3
yFit = y*0.0
ss = yp
else:
yFit = yInt + slope*x
cc[0] = yInt
cc[1] = slope
status, sigma, fracDev, nGood, biweights, scaledResids = checkfit(yp, yFit, __epsilon, __delta, bisquare_limit=bisquare_limit)
if status == 0:
break
if nGood < 2:
badFit = 1
break
diff = min([numpy.abs(sigma1 - sigma)/sigma, numpy.abs(sigma2 - sigma)/sigma])
cc[0] = cc[0] + y0 - cc[1]*x0
return cc[::-1]
def __polyfit_rescale(coeffs, x0, y0):
order = len(coeffs)-1
if order == 1:
coeffs[0] = coeffs[0]-coeffs[1]*x0 + y0
elif order == 2:
coeffs[0] = coeffs[0]-coeffs[1]*x0+coeffs[2]*x0**2 + y0
coeffs[1] = coeffs[1]-2.*coeffs[2]*x0
elif order == 3:
coeffs[0] = coeffs[0]-coeffs[1]*x0+coeffs[2]*x0**2-coeffs[3]*x0**3 + y0
coeffs[1] = coeffs[1]-2.*coeffs[2]*x0+3.*coeffs[3]*x0**2
coeffs[2] = coeffs[2]-3.*coeffs[3]*x0
elif order == 4:
coeffs[0] = coeffs[0]- coeffs[1]*x0+coeffs[2]*x0**2-coeffs[3]*x0**3+coeffs[4]*x0**4+ y0
coeffs[1] = coeffs[1]-2.*coeffs[2]*x0+3.*coeffs[3]*x0**2-4.*coeffs[4]*x0**3
coeffs[2] = coeffs[2]-3.*coeffs[3]*x0+6.*coeffs[4]*x0**2
coeffs[3] = coeffs[3]-4.*coeffs[4]*x0
elif order == 5:
coeffs[0] = coeffs[0]- coeffs[1]*x0+coeffs[2]*x0**2-coeffs[3]*x0**3+coeffs[4]*x0**4-coeffs[5]*x0**5+ y0
coeffs[1] = coeffs[1]-2.*coeffs[2]*x0+ 3.*coeffs[3]*x0**2- 4.*coeffs[4]*x0**3+5.*coeffs[5]*x0**4
coeffs[2] = coeffs[2]-3.*coeffs[3]*x0+ 6.*coeffs[4]*x0**2-10.*coeffs[5]*x0**3
coeffs[3] = coeffs[3]-4.*coeffs[4]*x0+10.*coeffs[5]*x0**2
coeffs[4] = coeffs[4]-5.*coeffs[5]*x0
return coeffs[::-1]
[docs]def polyfit(inputX, inputY, order, max_iter=25):
"""
Outlier resistance two-variable polynomial function fitter.
Based on the robust_poly_fit routine in the AstroIDL User's
Library.
"""
x = inputX.ravel()
y = inputY.ravel()
if type(y).__name__ == "MaskedArray":
x = x.compress(numpy.logical_not(y.mask))
y = y.compressed()
n = len(x)
if order < 6:
x0 = x.mean()
y0 = y.mean()
u = x - x0
v = y - y0
else:
u = x
v = y
minPts = order + 1
if (n//4)*4 == n:
need2 = 1
else:
need2 = 0
n3 = 3*n//4
n1 = n//4
nSeg = order + 2
if (nSeg//2)*2 == nSeg:
nSeg = nSeg + 1
min_pts = nSeg*3
yp = y
if n < 10000:
lsqFit = 1
cc = npp_polyfit(u, v, order)
yFit = npp_polyval(u, cc)
else:
lsqfit = 0
q = numpy.argsort(u)
u = u[q]
v = v[q]
nPerSeg = numpy.zeros(nSeg, dtype=numpy.int64) + n//nSeg
nLeft = n - nPerSeg[0]*nSeg
nPerSeg[nSeg//2] = nPerSeg[nSeg//2] + nLeft
r = numpy.zeros(nSeg, dtype=numpy.float64)
s = numpy.zeros(nSeg, dtype=numpy.float64)
r[0] = numpy.median(u[0:nPerSeg[0]])
s[0] = numpy.median(v[0:nPerSeg[0]])
i2 = nPerSeg[0]-1
for i in range(1, nSeg):
i1 = i2
i2 = i1 + nPerSeg[i]
r[i] = numpy.median(u[i1:i2])
s[i] = numpy.median(v[i1:i2])
cc = npp_polyfit(r, s, order)
yFit = npp_polyval(u, cc)
status, sigma, fracDev, nGood, biweights, scaledResids = checkfit(v, yFit, __epsilon, __delta)
if nGood == 0:
return __polyfit_rescale(cc, x0, y0)
if nGood < minPts:
if lsqFit == 0:
cc = npp_polyfit(u, v, order)
yFit = npp_polyval(u, cc)
status, sigma, fracDev, nGood, biweights, scaledResids = checkfit(yp, yFit, __epsilon, __delta)
if nGood == 0:
return 0
nGood = n - nGood
if nGood < minPts:
return 0
closeEnough = 0.03*numpy.sqrt(0.5/(n-1))
if closeEnough < __delta:
closeEnough = __delta
diff = 1.0e10
sigma1 = min([100.0*sigma, 1e20])
nIter = 0
while diff > closeEnough and nIter < max_iter:
nIter = nIter + 1
sigma2 = sigma1
sigma1 = sigma
g = (numpy.where(biweights > 0))[0]
if len(g) < len(biweights):
u = u[g]
v = v[g]
biweights = biweights[g]
cc = npp_polyfit(u, v, order, w=biweights**2)
yFit = npp_polyval(u, cc)
status, sigma, fracDev, nGood, biweights, scaledResids = checkfit(v, yFit, __epsilon, __delta)
if status == 0:
break
if nGood < minPts:
break
diff = min([numpy.abs(sigma1 - sigma)/sigma, numpy.abs(sigma2 - sigma)/sigma])
return __polyfit_rescale(cc, x0, y0)