Skip to content

utils

GMM

Source code in changedet/utils.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class GMM:
    def __init__(
        self,
        K: int,
        niter: int = 100,
        *,
        cov_type: str = "full",
        tol: float = 1e-4,
        reg_covar: float = 1e-6,
    ):
        self.n_components = K
        self.cov_type = cov_type
        self.tol = tol
        self.niter = niter
        self.reg_covar = reg_covar

    def init_cluster_params(
        self, X: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Initialse cluster parameters

        Shape notation:

            N: number of samples
            D: number of features
            K: number of mixture components

        Initialisation method:

            Initialise means to a random data point in X
            Initialse cov to a spherical covariance matrix of variance 1
            Initialse pi to uniform distribution

        Args:
            X (numpy.ndarray): Data matrix of shape (N,D)

        Returns:
            tuple:
                - means (numpy.ndarray): Means array of shape (K, D)
                - cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
                - pi (numpy.ndarray): Mixture weights of shape (K,)
        """

        n_samples, n_features = X.shape
        means = np.zeros((self.n_components, n_features))
        cov = np.zeros((self.n_components, n_features, n_features))

        # Initialise
        # Mean -> random data point
        # Cov -> spherical covariance - all clusters have same diagonal cov
        # matrix and diagonal elements are all equal
        for k in range(self.n_components):
            means[k] = X[np.random.choice(n_samples)]
            cov[k] = np.eye(n_features)

        pi = np.ones(self.n_components) / self.n_components

        return means, cov, pi

    def __repr__(self) -> str:
        return f"GMM(n_components={self.n_components})"

    @staticmethod
    def estimate_full_covariance(
        X: np.ndarray,
        resp: np.ndarray,
        nk: np.ndarray,
        means: np.ndarray,
        reg_covar: float,
    ) -> np.ndarray:
        """Estimate full covariance matrix

        Shape notation:

                N: number of samples
                D: number of features
                K: number of mixture components

        Args:
            X (numpy.ndarray): Data matrix of shape (N, D)
            resp (numpy.ndarray): Responsibility matrix of shape (N,K)
            nk (numpy.ndarray): Total responsibility per cluster of shape (K,)
            means (numpy.ndarray): Means array of shape (K, D)
            reg_covar (float): Regularisation added to diagonal of covariance matrix \
                to ensure positive definiteness

        Returns:
            cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
        """
        n_components, n_features = means.shape
        cov = np.empty((n_components, n_features, n_features))
        for k in range(n_components):
            delta = X - means[k]
            cov[k] = (
                np.dot(resp[:, k] * delta.T, delta) / nk[k]
                + np.eye(n_features) * reg_covar
            )
        return cov

    @staticmethod
    def estimate_tied_covariance(
        X: np.ndarray,
        resp: np.ndarray,
        nk: np.ndarray,
        means: np.ndarray,
        reg_covar: float,
    ) -> np.ndarray:
        """Estimate tied covariance matrix

        Shape notation:

                N: number of samples
                D: number of features
                K: number of mixture components

        Args:
            X (numpy.ndarray): Data matrix of shape (N, D)
            resp (numpy.ndarray): Responsibility matrix of shape (N,K)
            nk (numpy.ndarray): Total responsibility per cluster of shape (K,)
            means (numpy.ndarray): Means array of shape (K, D)
            reg_covar (float): Regularisation added to diagonal of covariance matrix \
                to ensure positive definiteness

        Returns:
            cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
        """
        n_components, n_features = means.shape
        avg_X2 = np.dot(X.T, X)
        avg_means2 = np.dot(nk * means.T, means)
        cov = (avg_X2 - avg_means2) / nk.sum() + np.eye(n_features) * reg_covar

        # Convert (D,D) cov to (K,D,D) cov where all K cov matrices are equal
        cov = np.repeat(cov[np.newaxis], n_components, axis=0)
        return cov

    def e_step(
        self,
        X: np.ndarray,
        resp: np.ndarray,
        means: np.ndarray,
        cov: np.ndarray,
        pi: np.ndarray,
        sample_inds: ArrayLike,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Expectation step

        Shape notation:

            N: number of samples
            D: number of features
            K: number of mixture components

        Args:
            X (numpy.ndarray): Data matrix of shape (N, D)
            resp (numpy.ndarray): Responsibility matrix of shape (N,K)
            means (numpy.ndarray): Means array of shape (K, D)
            cov (numpy.ndarray): Covariance matrix of shape (K,D,D) - full
            pi (numpy.ndarray): Mixture weights of shape (K,)
            sample_inds (ArrayLike): Samples to be considered

        Returns:
            tuple:
                - resp (numpy.ndarray): Responsibility matrix of shape (N,K)
                - wpdf (numpy.ndarray): Unnormalised responsibility matrix of shape (N,K)
        """
        for k in range(self.n_components):
            resp[sample_inds, k] = pi[k] * multivariate_normal.pdf(
                X[sample_inds], means[k], cov[k]
            )
        wpdf = resp.copy()  # For log likelihood computation
        # Safe normalisation
        a = np.sum(resp, axis=1, keepdims=True)
        idx = np.where(a == 0)[0]
        a[idx] = 1.0
        resp = resp / a
        return resp, wpdf

    def m_step(
        self, X: np.ndarray, resp: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Maximisation step

        Shape notation:

            N: number of samples
            D: number of features
            K: number of mixture components

        Args:
            X (numpy.ndarray): Data matrix of shape (N, D)
            resp (numpy.ndarray): Responsibility matrix of shape (N,K)

        Returns:
            tuple:
                - means (numpy.ndarray): Means array of shape (K, D)
                - cov (numpy.ndarray): Covariance matrix of shape (K,D,D) - full
                - pi (numpy.ndarray): Mixture weights of shape (K,)
        """
        # M step
        n_samples, _ = X.shape
        nk = resp.sum(axis=0)
        means = np.dot(resp.T, X) / nk[:, np.newaxis]
        pi = nk / n_samples

        if self.cov_type == "tied":
            cov = self.estimate_tied_covariance(X, resp, nk, means, self.reg_covar)
        else:
            cov = self.estimate_full_covariance(X, resp, nk, means, self.reg_covar)

        return means, pi, cov

    def fit(
        self,
        X: np.ndarray,
        resp: Optional[np.ndarray] = None,
        sample_inds: Optional[ArrayLike] = None,
    ) -> np.ndarray:
        """
        Fit a GMM to X with initial responsibility resp. If sample_inds are specified, only those
        indexes are considered.


        Args:
            X (numpy.ndarray): Data matrix
            resp (numpy.ndarray, optional): Initial responsibility matrix. Defaults to None.
            sample_inds (ArrayLike, optional): Sample indexes to be considered. Defaults to None.

        Returns:
            resp (numpy.ndarray): Responsibility matrix
        """

        n_samples, _ = X.shape

        if sample_inds is None:
            sample_inds = range(n_samples)

        means, cov, pi = self.init_cluster_params(X)
        lls = []

        if resp is None:
            resp = np.random.rand(n_samples, self.n_components)
            resp /= resp.sum(axis=1, keepdims=True)
        else:
            means, pi, cov = self.m_step(X, resp)

        # EM algorithm
        for i in range(self.niter):
            # resp_old = resp + 0.0

            # E step
            resp, wpdf = self.e_step(X, resp, means, cov, pi, sample_inds)
            # M step
            means, pi, cov = self.m_step(X, resp)

            # resp_flat = resp.ravel()
            # resp_old_flat = resp_old.ravel()
            # idx = np.where(resp.flat)[0]
            # ll = np.sum(resp_old_flat[idx] * np.log(resp_flat[idx]))
            ll = np.log(wpdf.sum(axis=1)).sum()
            lls.append(ll)

            # print(f"Log-likelihood:{ll}")
            if i > 1 and np.abs(lls[i] - lls[i - 1]) < self.tol:
                # print("Exiting")
                break

        return resp, means, cov, pi

e_step(X, resp, means, cov, pi, sample_inds)

Expectation step

Shape notation:

1
2
3
N: number of samples
D: number of features
K: number of mixture components

Parameters:

Name Type Description Default
X ndarray

Data matrix of shape (N, D)

required
resp ndarray

Responsibility matrix of shape (N,K)

required
means ndarray

Means array of shape (K, D)

required
cov ndarray

Covariance matrix of shape (K,D,D) - full

required
pi ndarray

Mixture weights of shape (K,)

required
sample_inds ArrayLike

Samples to be considered

required

Returns:

Name Type Description
tuple tuple[ndarray, ndarray]
  • resp (numpy.ndarray): Responsibility matrix of shape (N,K)
  • wpdf (numpy.ndarray): Unnormalised responsibility matrix of shape (N,K)
Source code in changedet/utils.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def e_step(
    self,
    X: np.ndarray,
    resp: np.ndarray,
    means: np.ndarray,
    cov: np.ndarray,
    pi: np.ndarray,
    sample_inds: ArrayLike,
) -> tuple[np.ndarray, np.ndarray]:
    """Expectation step

    Shape notation:

        N: number of samples
        D: number of features
        K: number of mixture components

    Args:
        X (numpy.ndarray): Data matrix of shape (N, D)
        resp (numpy.ndarray): Responsibility matrix of shape (N,K)
        means (numpy.ndarray): Means array of shape (K, D)
        cov (numpy.ndarray): Covariance matrix of shape (K,D,D) - full
        pi (numpy.ndarray): Mixture weights of shape (K,)
        sample_inds (ArrayLike): Samples to be considered

    Returns:
        tuple:
            - resp (numpy.ndarray): Responsibility matrix of shape (N,K)
            - wpdf (numpy.ndarray): Unnormalised responsibility matrix of shape (N,K)
    """
    for k in range(self.n_components):
        resp[sample_inds, k] = pi[k] * multivariate_normal.pdf(
            X[sample_inds], means[k], cov[k]
        )
    wpdf = resp.copy()  # For log likelihood computation
    # Safe normalisation
    a = np.sum(resp, axis=1, keepdims=True)
    idx = np.where(a == 0)[0]
    a[idx] = 1.0
    resp = resp / a
    return resp, wpdf

estimate_full_covariance(X, resp, nk, means, reg_covar) staticmethod

Estimate full covariance matrix

Shape notation:

1
2
3
    N: number of samples
    D: number of features
    K: number of mixture components

Parameters:

Name Type Description Default
X ndarray

Data matrix of shape (N, D)

required
resp ndarray

Responsibility matrix of shape (N,K)

required
nk ndarray

Total responsibility per cluster of shape (K,)

required
means ndarray

Means array of shape (K, D)

required
reg_covar float

Regularisation added to diagonal of covariance matrix to ensure positive definiteness

required

Returns:

Name Type Description
cov ndarray

Covariance matrix of shape (K,D,D)

Source code in changedet/utils.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@staticmethod
def estimate_full_covariance(
    X: np.ndarray,
    resp: np.ndarray,
    nk: np.ndarray,
    means: np.ndarray,
    reg_covar: float,
) -> np.ndarray:
    """Estimate full covariance matrix

    Shape notation:

            N: number of samples
            D: number of features
            K: number of mixture components

    Args:
        X (numpy.ndarray): Data matrix of shape (N, D)
        resp (numpy.ndarray): Responsibility matrix of shape (N,K)
        nk (numpy.ndarray): Total responsibility per cluster of shape (K,)
        means (numpy.ndarray): Means array of shape (K, D)
        reg_covar (float): Regularisation added to diagonal of covariance matrix \
            to ensure positive definiteness

    Returns:
        cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
    """
    n_components, n_features = means.shape
    cov = np.empty((n_components, n_features, n_features))
    for k in range(n_components):
        delta = X - means[k]
        cov[k] = (
            np.dot(resp[:, k] * delta.T, delta) / nk[k]
            + np.eye(n_features) * reg_covar
        )
    return cov

estimate_tied_covariance(X, resp, nk, means, reg_covar) staticmethod

Estimate tied covariance matrix

Shape notation:

1
2
3
    N: number of samples
    D: number of features
    K: number of mixture components

Parameters:

Name Type Description Default
X ndarray

Data matrix of shape (N, D)

required
resp ndarray

Responsibility matrix of shape (N,K)

required
nk ndarray

Total responsibility per cluster of shape (K,)

required
means ndarray

Means array of shape (K, D)

required
reg_covar float

Regularisation added to diagonal of covariance matrix to ensure positive definiteness

required

Returns:

Name Type Description
cov ndarray

Covariance matrix of shape (K,D,D)

Source code in changedet/utils.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
@staticmethod
def estimate_tied_covariance(
    X: np.ndarray,
    resp: np.ndarray,
    nk: np.ndarray,
    means: np.ndarray,
    reg_covar: float,
) -> np.ndarray:
    """Estimate tied covariance matrix

    Shape notation:

            N: number of samples
            D: number of features
            K: number of mixture components

    Args:
        X (numpy.ndarray): Data matrix of shape (N, D)
        resp (numpy.ndarray): Responsibility matrix of shape (N,K)
        nk (numpy.ndarray): Total responsibility per cluster of shape (K,)
        means (numpy.ndarray): Means array of shape (K, D)
        reg_covar (float): Regularisation added to diagonal of covariance matrix \
            to ensure positive definiteness

    Returns:
        cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
    """
    n_components, n_features = means.shape
    avg_X2 = np.dot(X.T, X)
    avg_means2 = np.dot(nk * means.T, means)
    cov = (avg_X2 - avg_means2) / nk.sum() + np.eye(n_features) * reg_covar

    # Convert (D,D) cov to (K,D,D) cov where all K cov matrices are equal
    cov = np.repeat(cov[np.newaxis], n_components, axis=0)
    return cov

fit(X, resp=None, sample_inds=None)

Fit a GMM to X with initial responsibility resp. If sample_inds are specified, only those indexes are considered.

Parameters:

Name Type Description Default
X ndarray

Data matrix

required
resp ndarray

Initial responsibility matrix. Defaults to None.

None
sample_inds ArrayLike

Sample indexes to be considered. Defaults to None.

None

Returns:

Name Type Description
resp ndarray

Responsibility matrix

Source code in changedet/utils.py
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def fit(
    self,
    X: np.ndarray,
    resp: Optional[np.ndarray] = None,
    sample_inds: Optional[ArrayLike] = None,
) -> np.ndarray:
    """
    Fit a GMM to X with initial responsibility resp. If sample_inds are specified, only those
    indexes are considered.


    Args:
        X (numpy.ndarray): Data matrix
        resp (numpy.ndarray, optional): Initial responsibility matrix. Defaults to None.
        sample_inds (ArrayLike, optional): Sample indexes to be considered. Defaults to None.

    Returns:
        resp (numpy.ndarray): Responsibility matrix
    """

    n_samples, _ = X.shape

    if sample_inds is None:
        sample_inds = range(n_samples)

    means, cov, pi = self.init_cluster_params(X)
    lls = []

    if resp is None:
        resp = np.random.rand(n_samples, self.n_components)
        resp /= resp.sum(axis=1, keepdims=True)
    else:
        means, pi, cov = self.m_step(X, resp)

    # EM algorithm
    for i in range(self.niter):
        # resp_old = resp + 0.0

        # E step
        resp, wpdf = self.e_step(X, resp, means, cov, pi, sample_inds)
        # M step
        means, pi, cov = self.m_step(X, resp)

        # resp_flat = resp.ravel()
        # resp_old_flat = resp_old.ravel()
        # idx = np.where(resp.flat)[0]
        # ll = np.sum(resp_old_flat[idx] * np.log(resp_flat[idx]))
        ll = np.log(wpdf.sum(axis=1)).sum()
        lls.append(ll)

        # print(f"Log-likelihood:{ll}")
        if i > 1 and np.abs(lls[i] - lls[i - 1]) < self.tol:
            # print("Exiting")
            break

    return resp, means, cov, pi

init_cluster_params(X)

Initialse cluster parameters

Shape notation:

1
2
3
N: number of samples
D: number of features
K: number of mixture components

Initialisation method:

1
2
3
Initialise means to a random data point in X
Initialse cov to a spherical covariance matrix of variance 1
Initialse pi to uniform distribution

Parameters:

Name Type Description Default
X ndarray

Data matrix of shape (N,D)

required

Returns:

Name Type Description
tuple tuple[ndarray, ndarray, ndarray]
  • means (numpy.ndarray): Means array of shape (K, D)
  • cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
  • pi (numpy.ndarray): Mixture weights of shape (K,)
Source code in changedet/utils.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def init_cluster_params(
    self, X: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Initialse cluster parameters

    Shape notation:

        N: number of samples
        D: number of features
        K: number of mixture components

    Initialisation method:

        Initialise means to a random data point in X
        Initialse cov to a spherical covariance matrix of variance 1
        Initialse pi to uniform distribution

    Args:
        X (numpy.ndarray): Data matrix of shape (N,D)

    Returns:
        tuple:
            - means (numpy.ndarray): Means array of shape (K, D)
            - cov (numpy.ndarray): Covariance matrix of shape (K,D,D)
            - pi (numpy.ndarray): Mixture weights of shape (K,)
    """

    n_samples, n_features = X.shape
    means = np.zeros((self.n_components, n_features))
    cov = np.zeros((self.n_components, n_features, n_features))

    # Initialise
    # Mean -> random data point
    # Cov -> spherical covariance - all clusters have same diagonal cov
    # matrix and diagonal elements are all equal
    for k in range(self.n_components):
        means[k] = X[np.random.choice(n_samples)]
        cov[k] = np.eye(n_features)

    pi = np.ones(self.n_components) / self.n_components

    return means, cov, pi

m_step(X, resp)

Maximisation step

Shape notation:

1
2
3
N: number of samples
D: number of features
K: number of mixture components

Parameters:

Name Type Description Default
X ndarray

Data matrix of shape (N, D)

required
resp ndarray

Responsibility matrix of shape (N,K)

required

Returns:

Name Type Description
tuple tuple[ndarray, ndarray, ndarray]
  • means (numpy.ndarray): Means array of shape (K, D)
  • cov (numpy.ndarray): Covariance matrix of shape (K,D,D) - full
  • pi (numpy.ndarray): Mixture weights of shape (K,)
Source code in changedet/utils.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
def m_step(
    self, X: np.ndarray, resp: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Maximisation step

    Shape notation:

        N: number of samples
        D: number of features
        K: number of mixture components

    Args:
        X (numpy.ndarray): Data matrix of shape (N, D)
        resp (numpy.ndarray): Responsibility matrix of shape (N,K)

    Returns:
        tuple:
            - means (numpy.ndarray): Means array of shape (K, D)
            - cov (numpy.ndarray): Covariance matrix of shape (K,D,D) - full
            - pi (numpy.ndarray): Mixture weights of shape (K,)
    """
    # M step
    n_samples, _ = X.shape
    nk = resp.sum(axis=0)
    means = np.dot(resp.T, X) / nk[:, np.newaxis]
    pi = nk / n_samples

    if self.cov_type == "tied":
        cov = self.estimate_tied_covariance(X, resp, nk, means, self.reg_covar)
    else:
        cov = self.estimate_full_covariance(X, resp, nk, means, self.reg_covar)

    return means, pi, cov

InitialChangeMask

Initial Change Mask

Create a change mask to remove strong changes and enable better radiometric normalisation.

References
  • P. R. Marpu, P. Gamba and M. J. Canty, "Improving Change Detection Results of IR-MAD by Eliminating Strong Changes," in IEEE Geoscience and Remote Sensing Letters, vol. 8 no. 4, pp. 799-803, July 2011, doi:10.1109/LGRS.2011.2109697.
Source code in changedet/utils.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class InitialChangeMask:  # pragma: no cover
    """Initial Change Mask

    Create a change mask to remove strong changes and enable better radiometric
    normalisation.


    References
    ----------
    - P. R. Marpu, P. Gamba and M. J. Canty, "Improving Change Detection Results
      of IR-MAD by Eliminating Strong Changes," in IEEE Geoscience and Remote
      Sensing Letters, vol. 8 no. 4, pp. 799-803, July 2011,
      doi:10.1109/LGRS.2011.2109697.
    """

    def __init__(self, mode: str = "hist") -> None:
        self.mode = mode
        self.gmm = GMM(3, cov_type="full")

    @staticmethod
    def plot(mean: np.ndarray, cov: np.ndarray, thresh: float) -> None:

        sigma = np.sqrt(cov)
        mix_names = ["No change", "Ambiguous", "Pure Change"]
        mix_colours = ["r", "g", "b"]
        # f1 = plt.figure()
        for m, sig, name, colour in zip(mean, sigma, mix_names, mix_colours):
            p = np.linspace(m - 3 * sig, m + 3 * sig, 100)
            plt.plot(p, norm.pdf(p, m, sig), label=name, color=colour)
        plt.legend()
        plt.tight_layout()
        plt.margins(0)
        plt.axvline(x=thresh, color="k", linestyle="--")
        plt.show()

    def prepare(self, im1: np.ndarray, im2: np.ndarray, plot: bool = True) -> np.ndarray:
        # Linear stretch
        im1 = contrast_stretch(im1, stretch_type="percentile")
        im2 = contrast_stretch(im2, stretch_type="percentile")

        ch1, r1, c1 = im1.shape

        m = r1 * c1
        N = ch1

        im1r = im1.reshape(N, m).T
        im2r = im2.reshape(N, m).T

        diff = np.abs(im1r - im2r)
        # Max difference
        diff = diff.max(axis=1)[:, np.newaxis]

        _, mean, cov, pi = self.gmm.fit(diff)

        mean = mean.flatten()
        cov = cov.flatten()
        pi = pi.flatten()

        # Sort in ascending order
        idx = np.argsort(mean)
        mean = mean[idx]
        cov = cov[idx]
        pi = pi[idx]

        # Refer https://gist.github.com/ashnair1/433ffbc1e747f80067f8a0439e346279
        # for derivation of the equation

        # TODO: Computing roots via this method results in invalid thresholds.
        # In theory, this and the current method should yield same results.

        # k = np.log((np.sqrt(cov[0]) * pi[1]) / (np.sqrt(cov[1]) * pi[0]))
        # a = cov[1] - cov[0]
        # b = -2 * (mean[0] * cov[1] - mean[1] * cov[0])
        # c = mean[0] ** 2 * cov[1] - mean[1] ** 2 * cov[0] + 2 * k * (cov[0] * cov[1])
        # roots = np.roots([a, b, c])

        roots = self.roots(mean, cov, pi)

        m1 = mean[0]
        m2 = mean[1]
        s1 = roots[0]
        s2 = roots[1]

        thresh = (
            ((m1 > m2) * (m1 > s1) * (m2 < s1) * s1)
            + ((m1 > m2) * (m1 > s2) * (m2 < s2) * s2)
            + ((m2 > m1) * (m2 > s1) * (m1 < s1) * s1)
            + ((m2 > m1) * (m2 > s2) * (m1 < s2) * s2)
        )

        # Plot distributions and threshold
        if plot:
            self.plot(mean, cov, thresh)

        if not thresh:
            return None

        icm = np.where(diff < thresh, 0, 1)
        icm = icm.reshape(r1, c1)
        return icm

    @staticmethod
    def roots(mean: np.ndarray, var: np.ndarray, pi: np.ndarray) -> tuple[float, float]:
        """Compute the threshold between the no-change and change distributions
        from the mean, variance and mixture weight (pi) of no change, change and
        ambigous distributions.

        Refer this [gist](<https://gist.github.com/ashnair1/433ffbc1e747f80067f8a0439e346279>)
        for full derivation.

        Args:
            mean (np.ndarray): means of distributions
            var (np.ndarray): variances of distributions
            pi (np.ndarray): mixture weights

        Returns:
            Tuple[float, float]: thresholds
        """
        std1 = np.sqrt(var[0])
        std2 = np.sqrt(var[1])
        k = np.log((std1 * pi[1]) / (std2 * pi[0]))
        n1 = var[1] * mean[0] - var[0] * mean[1]
        n2 = np.sqrt(
            var[0] * var[1] * (mean[0] - mean[1]) ** 2 + 0.5 * k * (var[0] - var[1])
        )
        d1 = var[1] - var[0]
        root1 = (n1 + n2) / d1
        root2 = (n1 - n2) / d1
        return root1, root2

roots(mean, var, pi) staticmethod

Compute the threshold between the no-change and change distributions from the mean, variance and mixture weight (pi) of no change, change and ambigous distributions.

Refer this gist for full derivation.

Parameters:

Name Type Description Default
mean ndarray

means of distributions

required
var ndarray

variances of distributions

required
pi ndarray

mixture weights

required

Returns:

Type Description
tuple[float, float]

Tuple[float, float]: thresholds

Source code in changedet/utils.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@staticmethod
def roots(mean: np.ndarray, var: np.ndarray, pi: np.ndarray) -> tuple[float, float]:
    """Compute the threshold between the no-change and change distributions
    from the mean, variance and mixture weight (pi) of no change, change and
    ambigous distributions.

    Refer this [gist](<https://gist.github.com/ashnair1/433ffbc1e747f80067f8a0439e346279>)
    for full derivation.

    Args:
        mean (np.ndarray): means of distributions
        var (np.ndarray): variances of distributions
        pi (np.ndarray): mixture weights

    Returns:
        Tuple[float, float]: thresholds
    """
    std1 = np.sqrt(var[0])
    std2 = np.sqrt(var[1])
    k = np.log((std1 * pi[1]) / (std2 * pi[0]))
    n1 = var[1] * mean[0] - var[0] * mean[1]
    n2 = np.sqrt(
        var[0] * var[1] * (mean[0] - mean[1]) ** 2 + 0.5 * k * (var[0] - var[1])
    )
    d1 = var[1] - var[0]
    root1 = (n1 + n2) / d1
    root2 = (n1 - n2) / d1
    return root1, root2

check_pos_semi_def(mat)

Test whether matrix is positive semi-definite

Ref:

Parameters:

Name Type Description Default
mat ndarray

Input matrix

required

Returns:

Name Type Description
bool bool

True if mat is positive semi-definite else False

Source code in changedet/utils.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def check_pos_semi_def(mat: np.ndarray) -> bool:
    """Test whether matrix is positive semi-definite

    Ref:

    - <https://scicomp.stackexchange.com/a/12984/39306>
    - <https://stackoverflow.com/a/63911811/10800115>

    Args:
        mat (np.ndarray): Input matrix

    Returns:
        bool: True if mat is positive semi-definite else False
    """

    try:
        reg_mat = mat + np.eye(mat.shape[0]) * 1e-3
        np.linalg.cholesky(reg_mat)
        return True
    except np.linalg.LinAlgError:
        return False

contrast_stretch(img, *, target_type='uint8', stretch_type='minmax', percentile=(2, 98))

Change image distribution to cover full range of target_type.

Types of contrast stretching: - minmax (Default) - percentile

Parameters:

Name Type Description Default
img ndarray

Input image

required
target_type dtype

Target type of rescaled image. Defaults to "uint8".

'uint8'
stretch_type str

Types of contrast stretching. Defaults to "minmax".

'minmax'
percentile tuple

Cut off percentiles if stretch_type = "percentile". Defaults to (2, 98).

(2, 98)

Returns:

Name Type Description
scaled ndarray

Rescaled image

Source code in changedet/utils.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
def contrast_stretch(
    img: np.ndarray,
    *,
    target_type: str = "uint8",
    stretch_type: str = "minmax",
    percentile: tuple[int, int] = (2, 98),
) -> np.ndarray:
    """Change image distribution to cover full range of target_type.

    Types of contrast stretching:
    - minmax (Default)
    - percentile

    Args:
        img (numpy.ndarray): Input image
        target_type (dtype): Target type of rescaled image. Defaults to "uint8".
        stretch_type (str): Types of contrast stretching. Defaults to "minmax".
        percentile (tuple): Cut off percentiles if stretch_type = "percentile". Defaults to (2, 98).

    Returns:
        scaled (numpy.ndarray): Rescaled image
    """

    type_info = np.iinfo(target_type)
    minout = type_info.min
    maxout = type_info.max

    if stretch_type == "percentile":
        lower, upper = np.nanpercentile(img, percentile)
    else:
        lower = np.min(img)
        upper = np.max(img)

    # Contrast Stretching
    a = (maxout - minout) / (upper - lower)
    b = minout - a * lower
    g = a * img + b
    return np.clip(g, minout, maxout)

histplot(xlist, xlabel, bins=50)

Plot multiple histograms in the same figure

Parameters:

Name Type Description Default
xlist arraylike

Sequence

required
xlabel list[str]

Sequence label

required
bins int

Histogram bins. Defaults to 50.

50

Returns:

Type Description
Figure

matplotlib.figure.figure: Figure with histograms

Source code in changedet/utils.py
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def histplot(xlist: ArrayLike, xlabel: list[str], bins: Optional[int] = 50) -> Figure:
    """Plot multiple histograms in the same figure

    Args:
        xlist (arraylike): Sequence
        xlabel (list[str]): Sequence label
        bins (int, optional): Histogram bins. Defaults to 50.

    Returns:
        matplotlib.figure.figure: Figure with histograms
    """
    f = plt.figure()
    for i, j in zip(xlist, xlabel):
        plt.hist(i[:, :, 0].flatten(), bins=bins, label=j)
    plt.legend()
    return f

init_logger(name='logger', output=None)

Initialise changedet logger

Parameters:

Name Type Description Default
name str

Name of this logger. Defaults to "logger".

'logger'
output str

Path to folder/file to write logs. If None, logs are not written

None
Source code in changedet/utils.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
def init_logger(name: str = "logger", output: Optional[str] = None) -> logging.Logger:
    """
    Initialise changedet logger

    Args:
        name (str, optional): Name of this logger. Defaults to "logger".
        output (str, optional): Path to folder/file to write logs. If None, logs are not written
    """
    logger = logging.getLogger(name=name)
    logger.setLevel(logging.DEBUG)

    # Output logs to terminal
    streamhandler = logging.StreamHandler(sys.stdout)
    streamhandler.setLevel(logging.INFO)
    streamhandler.setFormatter(_ColorFormatter(datefmt="%Y-%m-%d %H:%M:%S"))
    logger.addHandler(streamhandler)

    # Output logs to file
    if output:
        output_path = Path(output)
        logfile = (
            output_path
            if output_path.suffix in [".txt", ".log"]
            else output_path / "log.txt"
        )
        Path.mkdir(output_path.parent)

        filehandler = logging.FileHandler(logfile)
        filehandler.setLevel(logging.DEBUG)
        filehandler.setFormatter(_ColorFormatter(datefmt="%Y-%m-%d %H:%M:%S"))
        logger.addHandler(filehandler)
    return logger

np_weight_stats(x, ws=None)

Calculate weighted mean and sample covariance.

Parameters:

Name Type Description Default
x ndarray

Data matrix of shape (N,D)

required
ws ndarray

Weight vector of shape (N,). Defaults to None

None

Returns:

Name Type Description
tuple tuple[ndarray, ndarray]
  • wsigma (numpy.ndarray): Weighted covariance matrix
  • wmean (numpy.ndarray): Weighted mean
Source code in changedet/utils.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
def np_weight_stats(
    x: np.ndarray, ws: Optional[np.ndarray] = None
) -> tuple[np.ndarray, np.ndarray]:
    """Calculate weighted mean and sample covariance.

    Args:
        x (numpy.ndarray): Data matrix of shape (N,D)
        ws (numpy.ndarray, optional): Weight vector of shape (N,). Defaults to None

    Returns:
        tuple:
            - wsigma (numpy.ndarray): Weighted covariance matrix
            - wmean (numpy.ndarray): Weighted mean
    """
    # Uniform weight if ws is unspecified or array of zeros
    if ws is None or not np.any(ws):
        ws = np.ones(x.shape[0])
    mean = np.ma.average(x, axis=0, weights=ws)
    wmean = np.expand_dims(mean.data, axis=1)  # (H*W,) -> (H*W,1)
    wsigma = np.cov(x, rowvar=False, aweights=ws)
    return wsigma, wmean