import numpy as np
from scipy import stats, signal
def hanning(x,N):
"""
Filter a time series x with a Hanning window of length N. If x is 2D, the time series will be filtered along columns.
Inputs:
x - a numpy array to be filtered
N - width of window
Output: numpy array of filtered time series
"""
    wts = signal.windows.hann(N) # filter weights (signal.hann in older SciPy)
xf = _filt(x,wts)
return xf
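# A minimal usage sketch (hypothetical data, not part of the module): smooth
# an hourly series with a 24-point window; _filt pads the ends with NaN.
#   >>> x = np.random.randn(500)
#   >>> xf = hanning(x, 24)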
def lancz(x=None,dt=1,T=40,return_weights=False):
"""
Filter a time series x with cosine-Lanczos filter. If x is 2D, the time series will be filtered along columns.
    The default half-amplitude period of 40 hours corresponds to a frequency of 0.6 cpd. A half-amplitude period of 34.29 h corresponds to 0.7 cpd. The 40-hour half-amplitude period is more effective at reducing diurnal-band variability, but shifts periods of variability in the low-passed time series to >2 days.
Inputs:
x - a numpy array to be filtered
dt - sample interval (hours), default = 1
T - half-amplitude period (hours), default = 40
return_weights - Boolean indicating whether to return the filter weights instead of a filtered time series, default = False
Output:
    - numpy array of filtered time series, same size as input, with NaN values padded at the start and end (default)
- numpy array of filter weights (if `return_weights=True` is specified)
Reference: Emery and Thomson, 2004, Data Analysis Methods in Physical Oceanography. 2nd Ed., pp. 539-540. Section 5.10.7.4 - The Hanning window.
"""
cph = 1./dt # samples per hour
nwts = int(np.round(120*cph)) # number of weights
# create filter weights
    wts = signal.firwin(nwts,
                        1./T,
                        window='hann',
                        fs=cph) # 'hann' and fs= replace the 'hanning' and nyq= arguments removed from newer SciPy
xf = _filt(x,wts,return_weights)
return xf
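# A minimal usage sketch (hypothetical data): low-pass hourly data to remove
# tidal variability, or request the weights themselves.
#   >>> t = np.arange(24*30)                  # 30 days of hourly samples
#   >>> x = np.sin(2*np.pi*t/12.42)           # semidiurnal signal
#   >>> xlow = lancz(x, dt=1, T=40)           # tides removed, ends NaN
#   >>> wts = lancz(return_weights=True)      # the 120 filter weights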
def pl66(x=None,dt=1,T=33,return_weights=False):
"""
Filter a time series x with the PL66 filter. If x is 2D, the time series will be filtered along columns.
Inputs:
x - a numpy array to be filtered
dt - sample interval (hours), default = 1
T - half-amplitude period (hours), default = 33
return_weights - Boolean indicating whether to return the filter weights instead of a filtered time series, default = False
Output:
    - numpy array of filtered time series, same size as input, with NaN values padded at the start and end (default)
- numpy array of filter weights (if `return_weights=True` is specified)
Reference: Rosenfeld (1983), WHOI Technical Report 85-35
Matlab code: http://woodshole.er.usgs.gov/operations/sea-mat/bobstuff-html/pl66tn.html
"""
Tn=float(T)/dt # normalized cutoff period
fqn=1./Tn # normalized cutoff frequency
nw = int(np.round(2.*T/dt)) # number of weights on one side
# create filter weights
j = np.arange(1,nw)
tn = np.pi*j
den=fqn*fqn*tn**3
wts = (2*np.sin(2*fqn*tn)-np.sin(fqn*tn)-np.sin(3*fqn*tn))/den
# make symmetric
wts = np.hstack((wts[::-1],2*fqn,wts))
xf = _filt(x,wts,return_weights)
return xf
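# A minimal usage sketch (hypothetical data): dt handles other sampling
# intervals, e.g. 30-minute data with the default 33 h half-amplitude period.
#   >>> x = np.random.randn(1000)             # 30-minute samples
#   >>> xf = pl66(x, dt=0.5)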
def pl64(x=None,dt=1,T=33,return_weights=False):
"""
Filter a time series x with the PL64 filter. If x is 2D, the time series will be filtered along columns.
Inputs:
x - a numpy array to be filtered
dt - sample interval (hours), default = 1
T - half-amplitude period (hours), default = 33
return_weights - Boolean indicating whether to return the filter weights instead of a filtered time series, default = False
Output:
    - numpy array of filtered time series, same size as input, with NaN values padded at the start and end (default)
- numpy array of filter weights (if `return_weights=True` is specified)
Reference: CODE-2: Moored Array and Large-Scale Data Report, WHOI 85-35
"""
Tn=float(T)/dt # normalized cutoff period
fqn=1./Tn # normalized cutoff frequency
nw = int(np.round(64/dt)) # number of weights on one side
# create filter weights
j = np.arange(1,nw)
tn = np.pi*j
den=fqn*fqn*tn**3
wts = (2*np.sin(2*fqn*tn)-np.sin(fqn*tn)-np.sin(3*fqn*tn))/den
# make symmetric
wts = np.hstack((wts[::-1],2*fqn,wts))
xf = _filt(x,wts,return_weights)
return xf
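# One way to verify the half-amplitude period of these filters (a sketch
# using scipy.signal.freqz with the pl64 defaults, dt=1 and T=33):
#   >>> w = pl64(return_weights=True)
#   >>> freq, resp = signal.freqz(w, worN=8192, fs=1.0)    # fs in samples/hour
#   >>> f50 = freq[np.argmin(np.abs(np.abs(resp) - 0.5))]  # half-amplitude freq
#   >>> print(1./f50)                                      # close to 33 hours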
def _filt(x,wts,return_weights=False):
"""
    Private function to filter a time series and pad the ends of the filtered time series with NaN values. For N weights, ceil(N/2) values are padded at each end of the time series. The filter weights are normalized so that the sum of weights = 1.
Inputs:
x - the time series (may be 2d, will be filtered along columns)
wts - the filter weights
return_weights - if True, return the filter weights instead of a filtered time series (default: False)
Output:
- the filtered time series (default)
- filter weights (if `return_weights=True` is specified)
"""
# convert to 2D array if necessary (general case)
ndims = np.ndim(x)
if ndims == 1:
x = np.expand_dims(x,axis=1)
# normalize weights
    wtsn = wts/np.sum(wts) # normalize weights so sum = 1
    if return_weights == False:
        # Convolve using the 'direct' method. The method argument exists only
        # in newer versions of SciPy; it is specified because the default
        # 'auto' method may choose FFT-based convolution, which does not work
        # for time series with NaNs. Older versions of SciPy have no method
        # option and always use direct convolution.
        try:
            xf = signal.convolve(x,wtsn[:,np.newaxis],mode='same',method='direct')
        except TypeError:
            xf = signal.convolve(x,wtsn[:,np.newaxis],mode='same')
# note: np.convolve may be faster
# http://scipy.github.io/old-wiki/pages/Cookbook/ApplyFIRFilter
# pad ends of time series
nwts = len(wts) # number of filter weights
npad = int(np.ceil(0.5*nwts))
xf[:npad,:] = np.nan
xf[-npad:,:] = np.nan
# return array with same number of dimensions as input
if ndims == 1:
xf = xf.flatten()
    elif return_weights == True:
        # return normalized weights instead of filtered time series
        xf = wtsn
    else:
        raise TypeError('return_weights must be a Boolean')
return xf
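# The NaN padding, illustrated (a sketch; _filt is private to this module):
#   >>> _filt(np.ones(10), np.ones(4))
#   array([nan, nan,  1.,  1.,  1.,  1.,  1.,  1., nan, nan])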
def princax(u,v=None):
'''
Principal axes of a vector time series.
Usage:
theta,major,minor = princax(u,v) # if u and v are real-valued vector components
or
theta,major,minor = princax(w) # if w is a complex vector
Input:
u,v - 1-D arrays of vector components (e.g. u = eastward velocity, v = northward velocity)
or
w - 1-D array of complex vectors (u + 1j*v)
Output:
theta - angle of major axis (math notation, e.g. east = 0, north = 90)
major - standard deviation along major axis
minor - standard deviation along minor axis
Reference: Emery and Thomson, 2001, Data Analysis Methods in Physical Oceanography, 2nd ed., pp. 325-328.
Matlab function: http://woodshole.er.usgs.gov/operations/sea-mat/RPSstuff-html/princax.html
'''
# if one input only, decompose complex vector
if v is None:
w = np.copy(u)
u = np.real(w)
v = np.imag(w)
# Make sure inputs are numpy arrays
if type(u) is list:
u = np.array(u)
v = np.array(v)
# only use finite values for covariance matrix
ii = np.isfinite(u+v)
uf = u[ii]
vf = v[ii]
# compute covariance matrix
C = np.cov(uf,vf)
# calculate principal axis angle (ET, Equation 4.3.23b)
theta = 0.5*np.arctan2(2.*C[0,1],(C[0,0] - C[1,1])) * 180/np.pi
# calculate variance along major and minor axes (Equation 4.3.24)
term1 = C[0,0] + C[1,1]
term2 = ((C[0,0] - C[1,1])**2 + 4*(C[0,1]**2))**0.5
major = np.sqrt(0.5*(term1 + term2))
minor = np.sqrt(0.5*(term1 - term2))
return theta,major,minor
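# A worked sketch (hypothetical data): fluctuations elongated along the
# 45-degree direction should give theta near 45.
#   >>> np.random.seed(1)
#   >>> s = np.random.randn(1000)             # signal along the major axis
#   >>> n = 0.1*np.random.randn(1000)         # noise along the minor axis
#   >>> theta,major,minor = princax(s - n, s + n)
#   >>> round(theta)                          # approximately 45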
def rot(u,v,theta):
"""
Rotate a vector counter-clockwise OR rotate the coordinate system clockwise.
Usage:
ur,vr = rot(u,v,theta)
Input:
u,v - vector components (e.g. u = eastward velocity, v = northward velocity)
theta - rotation angle (degrees)
Output:
ur,vr - rotated vector components
Example:
rot(1,0,90) returns (0,1)
"""
# Make sure inputs are numpy arrays
if type(u) is list:
u = np.array(u)
v = np.array(v)
w = u + 1j*v # complex vector
ang = theta*np.pi/180 # convert angle to radians
wr = w*np.exp(1j*ang) # complex vector rotation
ur = np.real(wr) # return u and v components
vr = np.imag(wr)
return ur,vr
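# A common pattern (a sketch, with u and v as placeholder velocity arrays):
# rotate velocities into principal-axis coordinates using princax() and rot().
#   >>> theta,major,minor = princax(u, v)
#   >>> ur,vr = rot(u, v, -theta)   # ur along major axis, vr along minor axis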
def fillgapwithnan(x,date):
"""
Fill in missing data with NaN values. This is intended for a regular time series that has gaps where no data are reported.
Although this function is intended for regularly-sampled time series (with gaps), it does allow for some slight irregularity (e.g. 13:00,14:00,15:00,15:59,16:59...)
Inputs:
x - a numpy array of data (1 or 2 dimensions)
date - datetime values that correspond to x
Returns:
(newx,newdate)
newx - new array of data
    newdate - new datetime values
"""
xnd = np.ndim(x) # remember original number of dims in x
x = np.array(x,ndmin=2) # make array 2D for general use
# ensure that x is oriented correctly and has one dimension with same length as date
flipdim = False
if np.shape(x)[0] != np.shape(date)[0]:
x = x.transpose()
flipdim = True
if np.shape(x)[0] != np.shape(date)[0]:
raise Exception('one dimension of x must have same length as date')
# find most common timedelta
alldeltat = np.diff(date)
    # stats.mode returns the most common value and its number of occurrences;
    # np.atleast_1d accommodates both older SciPy (arrays) and newer SciPy (scalars)
    deltat = np.atleast_1d(stats.mode(alldeltat).mode)[0]
gapi = np.where(alldeltat > deltat*3/2)[0]
# build new arrays
newdate = date[0:gapi[0]+1]
newx = np.array(x[0:gapi[0]+1,:],ndmin=2)
cnt = 0 # counter for looping through the gaps
for ii in gapi:
tdiff = date[ii+1]-date[ii] # size of gap
# number of new values needed to fill gap
nstep = int(round((tdiff.total_seconds()/deltat.total_seconds())))-1
for step in np.arange(nstep):
t = newdate[-1]+deltat
newdate = np.append(newdate,t)
gapnans = np.nan*np.ones((nstep,np.shape(x)[1]))
newx = np.vstack((newx,gapnans))
if ii!=gapi[-1]:
i1 = ii+1
i2 = gapi[cnt+1]
newdate = np.append(newdate,date[i1:i2+1])
newx = np.vstack((newx,x[i1:i2+1,:]))
else:
newdate = np.append(newdate,date[ii+1:])
newx = np.vstack((newx,x[ii+1:,:]))
cnt=cnt+1
if flipdim:
newx = newx.transpose()
if xnd == 1:
newx = newx.flatten() # reduce back to 1D array if necessary
return (newx,newdate)
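# A minimal sketch (hypothetical values) with one missing hour:
#   >>> from datetime import datetime, timedelta
#   >>> d = np.array([datetime(2020,1,1) + timedelta(hours=h) for h in [0,1,2,4,5]])
#   >>> newx,newdate = fillgapwithnan(np.array([1.,2.,3.,5.,6.]), d)
#   >>> newx                                  # NaN inserted at the missing hour
#   array([ 1.,  2.,  3., nan,  5.,  6.])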
def depthavg(x,z,h,ssh=None,surface='mixed',bottom='zero'):
'''
Compute depth average of each row in 2D array x, with corresponding depths z.
    Designed to accommodate upward-looking ADCP data, with a moving sea surface and
blank bins with no data near surface. If no sea surface height is specified,
it is assumed to be at z=0 for all times.
x: variable to be depth-averaged, 2D array with shape N rows, M columns
z: measurement depths (z=0 is surface, z=-h is bottom), array of length M
h: bottom depth (positive value)
ssh: sea surface height (optional, set to zero if None or where value is undefined)
surface: boundary condition for surface
'mixed' (default) or 'extrap'
bottom: boundary condition for bottom
'zero' (default),'mixed' or 'extrap'
'''
# ensure that inputs are arrays of floats
x = np.array(x).astype('float')
z = np.array(z).astype('float')
h = np.array(h).astype('float')
if np.ndim(x) == 1:
x = x[np.newaxis]
ni,nj = np.shape(x)
# If SSH not specified, create an array of zeros
if ssh is None:
ssh = np.zeros(ni)
ssh = np.array(ssh)
if np.ndim(ssh) == 0:
ssh = ssh[np.newaxis]
# ssh in 2D column array
ssh2 = np.array([ssh]).T
# depths in 2D array
sorti = np.argsort(z)
zs = z[sorti]
zs2 = np.tile(zs,[ni,1])
xs2 = x[:,sorti] # sort data in same manner as depths
# water depth in 2D column array
h2 = np.tile(h,[ni,1])
# new 2D x and z arrays to work with, with bottom and surface included
zmat = np.hstack([-h2,zs2,ssh2])
nans2 = np.nan*np.ones([ni,1])
xmat = np.hstack([nans2,xs2,nans2])
# only do calculations for rows where finite data exist
fini = np.isfinite(xmat)
ii, = np.where(np.sum(fini,axis=1) > 0)
# bottom calculation
if bottom == 'zero':
xmat[ii,0] = 0.
elif bottom == 'mixed':
xmat[:,0] = xmat[:,1]
elif bottom == 'extrap':
xmat[:,0] = (xmat[:,2]-xmat[:,1])*(zmat[:,0]-zmat[:,1]) \
/(zmat[:,2]-zmat[:,1]) \
+ xmat[:,1]
else:
raise ValueError('depthavg: bottom condition not understood (should be \'mixed\', \'extrap\' or \'zero\')')
# find where depths are higher than sea surface or where there is no data,
# mask with NaN Values
xmatz = np.copy(xmat)
xmatz[ii,-1] = 0.
msk = (zmat > ssh2) | np.isnan(xmatz)
zmatnan = np.copy(zmat)
if np.any(msk):
zmatnan[msk] = np.nan
# sort each row of arrays by depth
sj = np.argsort(zmatnan)
si = np.arange(np.shape(zmat)[0])[:,np.newaxis]
zmats = zmatnan[si,sj]
xmats = xmat[si,sj]
# column index of surface in each row where data exists
jj = (np.sum(np.isfinite(zmats),axis=1)-1)[ii]
# calculate surface value
if surface == 'mixed':
xmats[ii,jj] = xmats[ii,jj-1]
elif surface == 'extrap':
xmats[ii,jj] = (xmats[ii,jj-1]-xmats[ii,jj-2])*(zmats[ii,jj]-zmats[ii,jj-2]) \
/(zmats[ii,jj-1]-zmats[ii,jj-2]) \
+ xmats[ii,jj-2]
else:
raise ValueError('depthavg: surface condition not understood (should be \'mixed\' or \'extrap\')')
# integrate vertically using trapezoidal rule
xm = 0.5*(xmats[:,:-1]+xmats[:,1:])
dz = np.diff(zmats,axis=1)
xint = np.nansum(xm*dz,axis=1)
# divide by instantaneous water depth to compute depth average
xda = np.nan*ssh
xda[ii] = xint[ii]/(ssh[ii] + h)
return xda
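# A minimal usage sketch (hypothetical values): a uniform profile in 10 m of
# water. The default bottom='zero' condition tapers the profile to zero at
# the bed, so the depth average falls slightly below the profile value.
#   >>> z = np.array([-9., -7., -5., -3., -1.])
#   >>> x = np.ones(5)
#   >>> depthavg(x, z, 10.)
#   array([0.95])
#   >>> depthavg(x, z, 10., bottom='mixed')
#   array([1.])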
def surface_transport(u,z,ssh=None,surface='mixed'):
'''
Compute surface transport Us from velocity component u, with corresponding depths z.
Integration is performed from the sea surface to the first zero crossing in the u profile.
If u has units of m/s, Us will have units of m^2/s.
    Designed to accommodate upward-looking ADCP data, with a moving sea surface and
blank bins with no data near surface. If no sea surface height is specified,
it is assumed to be at z=0 for all times.
INPUT:
    u: velocity component to be integrated, 2D array with shape N rows, M columns
z: measurement depths (z=0 is surface, negative below surface), array of length M
ssh: sea surface height (optional, set to zero if None or where value is undefined)
surface: boundary condition for surface
'mixed' (default) or 'extrap'
OUTPUT:
    Us - transport integrated from surface to first zero crossing of velocity profile
zs - depth of first zero crossing
'''
# ensure that inputs are arrays of floats
x = np.array(u).astype('float')
z = np.array(z).astype('float')
if np.ndim(x) == 1:
x = x[np.newaxis]
ni,nj = np.shape(x)
# If SSH not specified, create an array of zeros
if ssh is None:
ssh = np.zeros(ni)
ssh = np.array(ssh)
if np.ndim(ssh) == 0:
ssh = ssh[np.newaxis]
# ssh in 2D column array
ssh2 = np.array([ssh]).T
# depths in 2D array
sorti = np.argsort(z)
zsort = z[sorti]
zs2 = np.tile(zsort,[ni,1])
xs2 = x[:,sorti] # sort data in same manner as depths
# make sure that there is at least one zero crossing
# just below the deepest level
eps = np.finfo(float).eps
zend = zs2[:,0] - np.sqrt(eps)
xend = -np.sign(xs2[:,0])*np.sqrt(eps)
zend = zend[:,np.newaxis]
xend = xend[:,np.newaxis]
# new 2D x and z arrays to work with, with bottom and surface included
zmat = np.hstack([zend,zs2,ssh2])
nans2 = np.nan*np.ones([ni,1])
xmat = np.hstack([xend,xs2,nans2])
# only do calculations for rows where finite data exist
fini = np.isfinite(xmat)
ii, = np.where(np.sum(fini,axis=1) > 0)
# find where depths are higher than sea surface or where there is no data,
# mask with NaN Values
xmatz = np.copy(xmat)
xmatz[ii,-1] = 0.
msk = (zmat > ssh2) | np.isnan(xmatz)
zmatnan = np.copy(zmat)
if np.any(msk):
zmatnan[msk] = np.nan
# sort each row of arrays by depth
sj = np.argsort(zmatnan)
si = np.arange(np.shape(zmat)[0])[:,np.newaxis]
zmats = zmatnan[si,sj]
xmats = xmat[si,sj]
# column index of surface in each row where data exists
jj = (np.sum(np.isfinite(zmats),axis=1)-1)[ii]
# calculate surface value
if surface == 'mixed':
xmats[ii,jj] = xmats[ii,jj-1]
elif surface == 'extrap':
xmats[ii,jj] = (xmats[ii,jj-1]-xmats[ii,jj-2])*(zmats[ii,jj]-zmats[ii,jj-2]) \
/(zmats[ii,jj-1]-zmats[ii,jj-2]) \
+ xmats[ii,jj-2]
else:
raise ValueError('surface_transport: surface condition not understood (should be \'mixed\' or \'extrap\')')
# Flip 2D array so that surface is first column
xmatr = np.fliplr(xmats)
zmatr = np.fliplr(zmats)
# Find index just above first zero crossing
zj = np.argmax(np.sign(xmatr[:,:-1]*xmatr[:,1:]) == -1,axis=1)
# Compute depth of first zero crossing using linear interpolation
i = range(ni)
zs = zmatr[i,zj] - xmatr[i,zj]*(zmatr[i,zj]-zmatr[i,zj+1])/(xmatr[i,zj]-xmatr[i,zj+1])
# create rectangular array, where columns are integers starting from zero
jmat = np.arange(np.shape(xmatr)[1])*np.ones(np.shape(xmatr)[0])[:,np.newaxis]
# use to create mask of depths between surface and zero crossing
mask = np.less_equal(jmat,zj[:,np.newaxis])
# Replace data below first zero crossing with zeros
xmatr = mask*xmatr
# Use depth of first zero crossing for zero velocity closest to surface
zmatr[i,zj+1] = zs
# integrate vertically using trapezoidal rule
xm = 0.5*(xmatr[:,:-1]+xmatr[:,1:])
dz = -np.diff(zmatr,axis=1) # negative because depths are decreasing
Us = np.nansum(xm*dz,axis=1)
badi, = np.where(np.isnan(np.nanmax(xm,axis=1)))
Us[badi] = np.nan
zs[badi] = np.nan
return Us,zs
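# A minimal usage sketch (hypothetical values): flow reverses between -3 m
# and -2 m, so the integration stops at the interpolated zero crossing.
#   >>> u = np.array([-0.1, -0.1, 0.1, 0.1])  # m/s, deepest value first
#   >>> z = np.array([-4., -3., -2., -1.])
#   >>> Us,zs = surface_transport(u, z)
#   >>> Us, zs
#   (array([0.225]), array([-2.5]))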
def surface_flux(u,z,tr,ssh=None,surface='mixed'):
'''
Compute integrated surface-layer flux of a tracer tr, due to velocity component u with corresponding depths z.
Integration is performed from the sea surface to the first zero crossing in the u profile.
If u has units of m/s, and tr has units of C, result will have units of (C m^2)/s
    Designed to accommodate upward-looking ADCP data, with a moving sea surface and
blank bins with no data near surface. If no sea surface height is specified,
it is assumed to be at z=0 for all times.
INPUT:
u: velocity, 2D array with shape N rows, M columns
z: measurement depths (z=0 is surface, negative below surface), array of length M
tr: tracer values (N rows, M columns corresponding to depths z)
ssh: sea surface height (optional, set to zero if None or where value is undefined)
surface: boundary condition for surface
'mixed' (default) or 'extrap'
OUTPUT:
    trflux - tracer flux integrated from surface to first zero crossing of velocity profile
    trs - tracer value at first zero crossing of velocity profile
'''
# ensure that inputs are arrays of floats
x = np.array(u).astype('float')
z = np.array(z).astype('float')
tr = np.array(tr).astype('float')
if np.ndim(x) == 1:
x = x[np.newaxis]
tr = tr[np.newaxis]
ni,nj = np.shape(x)
# If SSH not specified, create an array of zeros
if ssh is None:
ssh = np.zeros(ni)
ssh = np.array(ssh)
if np.ndim(ssh) == 0:
ssh = ssh[np.newaxis]
# ssh in 2D column array
ssh2 = np.array([ssh]).T
# depths in 2D array
sorti = np.argsort(z)
zsort = z[sorti] # sort depths
zs2 = np.tile(zsort,[ni,1])
xs2 = x[:,sorti] # sort data in same manner as depths
trs2 = tr[:,sorti] # sort data in same manner as depths
# make sure that there is at least one zero crossing
# just below the deepest level
eps = np.finfo(float).eps
zend = zs2[:,0] - np.sqrt(eps)
xend = -np.sign(xs2[:,0])*np.sqrt(eps)
trend = trs2[:,0]
zend = zend[:,np.newaxis]
xend = xend[:,np.newaxis]
trend = trend[:,np.newaxis]
# new 2D x and z arrays to work with, with bottom and surface included
zmat = np.hstack([zend,zs2,ssh2])
nans2 = np.nan*np.ones([ni,1])
xmat = np.hstack([xend,xs2,nans2])
trmat = np.hstack([trend,trs2,nans2])
# only do calculations for rows where finite data exist
fini = np.isfinite(xmat)
ii, = np.where(np.sum(fini,axis=1) > 0)
# find where depths are higher than sea surface or where there is no data,
# mask with NaN Values
xmatz = np.copy(xmat)
xmatz[ii,-1] = 0.
msk = (zmat > ssh2) | np.isnan(xmatz)
zmatnan = np.copy(zmat)
if np.any(msk):
zmatnan[msk] = np.nan
# sort each row of arrays by depth
sj = np.argsort(zmatnan)
si = np.arange(np.shape(zmat)[0])[:,np.newaxis]
zmats = zmatnan[si,sj]
xmats = xmat[si,sj]
trmats = trmat[si,sj]
# column index of surface in each row where data exists
jj = (np.sum(np.isfinite(zmats),axis=1)-1)[ii]
# calculate surface value
if surface == 'mixed':
xmats[ii,jj] = xmats[ii,jj-1]
trmats[ii,jj] = trmats[ii,jj-1]
elif surface == 'extrap':
xmats[ii,jj] = (xmats[ii,jj-1]-xmats[ii,jj-2])*(zmats[ii,jj]-zmats[ii,jj-2]) \
/(zmats[ii,jj-1]-zmats[ii,jj-2]) \
+ xmats[ii,jj-2]
trmats[ii,jj] = (trmats[ii,jj-1]-trmats[ii,jj-2])*(zmats[ii,jj]-zmats[ii,jj-2]) \
/(zmats[ii,jj-1]-zmats[ii,jj-2]) \
+ trmats[ii,jj-2]
else:
        raise ValueError('surface_flux: surface condition not understood (should be \'mixed\' or \'extrap\')')
# Flip 2D array so that surface is first column
xmatr = np.fliplr(xmats)
zmatr = np.fliplr(zmats)
trmatr = np.fliplr(trmats)
# Find index just above first zero crossing
zj = np.argmax(np.sign(xmatr[:,:-1]*xmatr[:,1:]) == -1,axis=1)
# Compute depth of first zero crossing using linear interpolation
i = range(ni)
zs = zmatr[i,zj] - xmatr[i,zj]*(zmatr[i,zj]-zmatr[i,zj+1])/(xmatr[i,zj]-xmatr[i,zj+1])
# Compute tracer value at first zero crossing using linear interpolation
trs = trmatr[i,zj] - xmatr[i,zj]*(trmatr[i,zj]-trmatr[i,zj+1])/(xmatr[i,zj]-xmatr[i,zj+1])
# create rectangular array, where columns are integers starting from zero
jmat = np.arange(np.shape(xmatr)[1])*np.ones(np.shape(xmatr)[0])[:,np.newaxis]
# use to create mask of depths between surface and zero crossing
mask = np.less_equal(jmat,zj[:,np.newaxis])
# Replace data below first zero crossing with zeros
xmatr = mask*xmatr
trmatr = mask*trmatr
# Use depth of first zero crossing for zero velocity closest to surface
zmatr[i,zj+1] = zs
trmatr[i,zj+1] = trs
# integrate vertically using trapezoidal rule
xm = 0.5*(xmatr[:,:-1]+xmatr[:,1:])
trm = 0.5*(trmatr[:,:-1]+trmatr[:,1:])
dz = -np.diff(zmatr,axis=1) # negative because depths are decreasing
trflux = np.nansum(xm*trm*dz,axis=1)
badi, = np.where(np.isnan(np.nanmax(xm,axis=1)))
    trflux[badi] = np.nan
    trs[badi] = np.nan
return trflux, trs
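# A minimal sketch (hypothetical values) reusing the surface_transport
# example with a uniform tracer, so the flux is just the tracer value
# times the surface transport.
#   >>> u = np.array([-0.1, -0.1, 0.1, 0.1])
#   >>> z = np.array([-4., -3., -2., -1.])
#   >>> trflux,trs = surface_flux(u, z, 2.0*np.ones(4))
#   >>> trflux                                # 2.0 * 0.225
#   array([0.45])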
def lombscargle(t,x,ofac=4,hifac=1,t0=None,return_onesided=True,return_zero=False,window='boxcar',scaling='classical'):
'''
Compute the discrete Fourier transform and periodogram for unevenly-spaced
data using the Lomb-Scargle periodogram. Follows methods outlined in Scargle
(1989).
INPUTS
t - array of numerical time values (length N)
x - array of data values, may be complex (length N)
RETURNS
f - array of frequencies
ftx - array of complex coefficients
discrete Fourier transform of x
px - periodogram of ftx, proportional to |ftx|**2
with the default "classical" scaling used in Scargle (1989),
px = (1/N)*|ftx|**2
OPTIONAL PARAMETERS
ofac - oversampling parameter
ratio of number of frequencies used to number of samples in x
(default 4)
hifac - high frequency parameter
ratio of highest frequency to pseudo-Nyquist frequency
(default 1)
t0 - time origin
reference point for phase calculation
(default - None, first value in t array is used)
return_onesided - boolean for returning a one-sided spectrum
If True, return a one-sided spectrum for real data.
If False return a two-sided spectrum for real data.
Note that for complex data, a two-sided spectrum is always returned.
(default - True)
return_zero - boolean for evaluating zero frequency
If True, include zero frequency. If False, do not include zero frequency.
Uses expressions for the limit as frequency approaches zero, following
Scargle (1989).
(default - False)
window - String specifying desired window to use. See `scipy.signal.get_window` for
a list of windows and required parameters.
scaling - Selects between computing the classical periodogram used by Scargle
('classical') or the power spectral density ('density'). The classical
periodogram has units of x**2. The power spectral density has units of x**2/f.
The scaling determines how the periodogram px is calculated from the discrete
Fourier transform ftx:
'classical': px = (1/N)*|ftx|**2, where N is the number of samples
'density': px = (deltat/N)*|ftx|**2, where deltat is the average time step
REFERENCE
Scargle, J.D. (1989) Studies in astronomical time series analysis III: Fourier transforms,
autocorrelation functions, and cross-correlation functions of unevenly spaced data. The
Astrophysical Journal, 343, 874-887
'''
i = 1j # square root of -1
N = len(x) # number of samples
wts = signal.get_window(window,N)
wts = N*wts/np.sum(wts) # make sum of weights equal to N
x = x*wts # apply window
intm = np.mean(np.diff(t))
flo = ((intm)**-1)/(len(x)*ofac) # lowest freq
fhi = hifac*(2*intm)**-1 # highest freq
f = np.arange(flo,fhi+flo,flo)
if return_zero == True:
f = np.append(0,f)
# if complex, evaluate two-sided spectrum regardless of user choice
if np.any(np.iscomplex(x)):
return_onesided = False
# two-sided spectrum
if return_onesided == False:
if return_zero == True:
f = np.append(-f[1:][::-1],f)
else:
f = np.append(-f[::-1],f)
# time origin (reference point for phase calculation)
if t0 is None:
t0 = t[0]
# initialize DFT as array of complex numbers
ftx = np.nan*np.ones(len(f)) + i*np.nan*np.ones(len(f))
for k,fk in enumerate(f):
wrun = 2*np.pi*fk # angular frequency
if fk == 0:
# use well-defined limit as frequency approaches zero
tau = np.sum(t)/N
ftx[k] = np.sum(x)/np.sqrt(N)
else:
Fo = ((N/2)**0.5)*np.exp(-i*wrun*t0)
tau = np.arctan2(np.sum(np.sin(2*wrun*t)),np.sum(np.cos(2*wrun*t)))/(2*wrun)
tprime = t - tau
A = np.sum(np.cos(wrun*tprime)**2)**-0.5
B = np.sum(np.sin(wrun*tprime)**2)**-0.5
# Note apparent typo in Scargle (1989), which has a plus
# sign (+) instead of a minus sign below. This only makes a
# difference in the periodogram if the input values in x are complex.
ftx[k] = Fo*np.sum(A*x*np.cos(wrun*tprime) - i*B*x*np.sin(wrun*tprime))
if scaling == 'classical':
px = np.abs(ftx)**2/N
elif scaling == 'density':
px = np.abs(ftx)**2/N*intm
else:
        raise ValueError('Scaling argument not understood. Acceptable options are \'classical\' or \'density\'')
return f, ftx, px
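# A minimal usage sketch (hypothetical data): the periodogram of an unevenly
# sampled sine wave peaks near the true frequency.
#   >>> rng = np.random.RandomState(0)
#   >>> t = np.sort(100*rng.rand(200))        # irregular sample times
#   >>> x = np.sin(2*np.pi*0.2*t)             # 0.2 cycles per time unit
#   >>> f,ftx,px = lombscargle(t, x)
#   >>> f[np.argmax(px)]                      # near 0.2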
if __name__ == '__main__':
# Test princax function
u = [1,2,4,5,np.nan]
v = [1,2,3,5,6]
theta,major,minor = princax(u,v)
theta,major,minor = princax(np.array(u)+1j*np.array(v))
mat_theta = 43.0138 # From Matlab output
mat_major = 2.4763
mat_minor = 0.3432
test = np.isclose(np.array([theta,major,minor]),
np.array([mat_theta,mat_major,mat_minor]),
atol = 1e-4)
if test.all():
print('princax test: passed')
else:
raise ValueError('princax test: failed')
# Test rot function
x = [1,0,-1]
y = [0,1,0]
xr,yr = rot(x,y,90)
test1 = np.isclose(xr,[0,-1,0])
test2 = np.isclose(yr,[1,0,-1])
if test1.all() & test2.all():
print('rot test: passed')
else:
raise ValueError('rot test: failed')
# test case #1 - depth avg
u1= np.array([ np.nan, np.nan, np.nan, np.nan, np.nan,
np.nan, np.nan, np.nan, -0.0018506 , 0.00057345,
-0.00027954, 0.00304925, 0.0056888 , 0.01057738, -0.00096978,
0.00614675, 0.00302453, -0.00028928, 0.00077288, -0.00768713,
-0.01823976, 0.00612571, -0.00397687, -0.00580832, -0.00833382,
0.0017868 , -0.00530538, -0.01031236])
u2 = np.array([ np.nan, np.nan, np.nan, np.nan, np.nan,
np.nan, np.nan, np.nan, 0.00150274, 0.00745662,
0.00235997, -0.00074239, -0.00298656, 0.00302234, 0.00318832,
-0.00169822, -0.00439177, -0.00226204, -0.00400032, -0.01001337,
-0.00913997, -0.00681736, -0.01331132, -0.00100251, -0.01532928,
-0.01763108, -0.01194093, -0.01909814])
u = np.vstack([u1,u2])
z = np.array([ 1.49, 1.24, 0.99, 0.74, 0.49, 0.24, -0.01, -0.26, -0.51,
-0.76, -1.01, -1.26, -1.51, -1.76, -2.01, -2.26, -2.51, -2.76,
-3.01, -3.26, -3.51, -3.76, -4.01, -4.26, -4.51, -4.76, -5.01, -5.26])
ubar = depthavg(u,z,7)
ub = u[:,::-1]
zb = z[::-1]
ubarb = depthavg(ub,zb,7)
test = (ubar == ubarb)
if test.all():
print('depthavg test: passed')
else:
raise ValueError('depthavg test: failed')
# surface transport test case #1
u1= np.array([ np.nan, np.nan, np.nan, np.nan, np.nan,
np.nan, np.nan, np.nan, -0.0018506 , 0.00057345,
-0.00027954, 0.00304925, 0.0056888 , 0.01057738, -0.00096978,
0.00614675, 0.00302453, -0.00028928, 0.00077288, -0.00768713,
-0.01823976, 0.00612571, -0.00397687, -0.00580832, -0.00833382,
0.0017868 , -0.00530538, -0.01031236])
u2 = np.array([ np.nan, np.nan, np.nan, np.nan, np.nan,
np.nan, np.nan, np.nan, 0.00150274, 0.00745662,
0.00235997, -0.00074239, -0.00298656, 0.00302234, 0.00318832,
-0.00169822, -0.00439177, -0.00226204, -0.00400032, -0.01001337,
-0.00913997, -0.00681736, -0.01331132, -0.00100251, -0.01532928,
-0.01763108, -0.01194093, -0.01909814])
u = np.vstack([u1,u2])
z = np.array([ 1.49, 1.24, 0.99, 0.74, 0.49, 0.24, -0.01, -0.26, -0.51,
-0.76, -1.01, -1.26, -1.51, -1.76, -2.01, -2.26, -2.51, -2.76,
-3.01, -3.26, -3.51, -3.76, -4.01, -4.26, -4.51, -4.76, -5.01, -5.26])
Us,zs = surface_transport(u,z)
test = np.array([np.isfinite(Us).all(),np.isfinite(zs).all()])
if test.all():
print('surface transport/flux test #1: passed')
else:
raise ValueError('surface transport/flux test #1: failed')
# surface transport/flux test case #2
u = np.array([[-1, -1, -1, 1, 1, 1],
[ 1, 2, -1, 4, -1, -2]])
z = np.array([-6, -5, -4, -3, -2, -1])
tr = np.array([[1, 1, -1, -1, -1, -1],
[ 1, 1, 1, 1, 1, 1]])
Us2,zs2 = surface_transport(u,z)
trflux2,trs2 = surface_flux(u,z,tr)
test = np.array([np.isfinite(Us2).all(),np.isfinite(zs2).all(),
np.isfinite(trflux2).all(),np.isfinite(trs2).all(),
zs2[0]>-4,zs2[0]<-3,zs2[1]>-3,zs2[1]<-2,
trflux2[0]==-Us2[0],trflux2[1]==Us2[1]])
if test.all():
print('surface transport/flux test #2: passed')
else:
raise ValueError('surface transport/flux test #2: failed')
# surface transport/flux test case #3 (no zero crossing)
z3 = np.array([-2. , -2.5, -3. , -3.5, -4. , -4.5, -5. , -5.5, -6. , -6.5, -7. ,
-7.5, -8. , -8.5, -9. , -9.5])
u3 = np.array(
[ 4.21860615e-04, 2.31305385e-03, 1.31356857e-03,
1.94045833e-03, 2.89933335e-04, 2.86449031e-03,
1.97513672e-03, 6.18211638e-03, 3.36873352e-03,
5.39932390e-03, 5.50479300e-03, 4.40279194e-03,
5.36942569e-03, 6.60713067e-03, 5.74801833e-03,
3.50485563e-03])
tr3 = np.array(
[-0.08513565, -0.08006332, -0.0802531 , -0.07925762, -0.07745643,
-0.07925762, -0.07736153, -0.07698196, -0.07726664, -0.07608137,
-0.07608137, -0.07418528, -0.0730949 , -0.07499098, -0.07608137,
-0.07328469])
Us3,zs3 = surface_transport(u3,z3)
trflux3,trs3 = surface_flux(u3,z3,tr3)
test = np.array([np.isfinite(Us3),np.isfinite(zs3),np.isfinite(trflux3),np.isfinite(trs3)])
if test.all():
print('surface transport/flux test #3: passed')
else:
raise ValueError('surface transport/flux test #3: failed')
# test lomb-scargle function
# use example from Trauth - MATLAB Recipes for Earth Sciences (3rd Ed)
np.random.seed(0)
t = np.arange(0,1000,3)
t = t + np.random.randn(len(t))
t = np.sort(t)
x1 = (0.5*np.sin(2*np.pi*t/100) +
1.0*np.sin(2*np.pi*t/40) +
0.5*np.sin(2*np.pi*t/20))
x2 = (0.5*np.sin(2*np.pi*t/100) +
0.5*np.sin(2*np.pi*t/20))
x = x1
x[149:] = x2[149:]
x = x + 0.5*np.random.randn(len(x))
f,ftx,px_dft = lombscargle(t,x)
px_scipy = signal.lombscargle(t, x, f*2*np.pi)
test = np.isclose(px_dft,px_scipy)
if test.all():
print('Lomb-Scargle test: passed')
else:
raise ValueError('Lomb-Scargle test: failed')