Saturday, October 29, 2022

Deep Java Library memory management

This post introduces some techniques to monitor memory usage and detect leaks in machine learning applications using the Deep Java Library (DJL) [1]. This bag of tricks is far from exhaustive.



DJL is an open source framework that supports distributed inference in Java for models built with deep learning frameworks such as MXNet, TensorFlow or PyTorch.
The training of deep learning models may require a very significant amount of floating point computations, which are best supported by GPUs. However, the memory model of the JVM is incompatible with the column-based resident memory required by the GPU.

Vectorization libraries such as BLAS are implemented in C/C++ and support fast execution of linear algebra operations. The ubiquitous Python numerical library, NumPy [2], commonly used in data science, is a wrapper around these low-level math functions. The ND interface, used in DJL, provides Java developers with similar functionality.

Note: The code snippets in this post are written in Scala but can be easily reworked in Java.

The basics

Memory types

DJL supports monitoring three memory components:

  • Resident Set Size (RSS) is the portion of the memory used by a process that is held in RAM (i.e., not swapped out).
  • Heap is the section of memory used by dynamically allocated objects.
  • Non-heap is the section encompassing static memory and stack allocation.

Tensor representation

Deep learning frameworks operate on tensors. In DJL, these tensors are implemented as NDArray objects, created dynamically from arrays of values (integer, float, ...). NDManager is the memory collector/manager for the native memory allocated by the underlying C++ implementation of the various deep learning frameworks. Its purpose is to create and delete (close) NDArray instances. NDManager has a hierarchical (single-root tree) structure: a child manager can be spawned from a parent [3].
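The hierarchical ownership model can be illustrated with a language-neutral toy model. The sketch below is written in Python and is not the DJL API: the class ResourceManager and its methods are hypothetical names chosen for illustration. It only shows the idea that closing a manager releases every resource attached to it and to its children.

```python
class ResourceManager:
    """Toy stand-in for a hierarchical memory manager (hypothetical, not DJL)."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.children = []
        self.resources = []
        self.closed = False
        if parent is not None:
            parent.children.append(self)

    def new_sub_manager(self, name):
        """Spawn a child manager attached to this manager."""
        return ResourceManager(name, parent=self)

    def create(self, values):
        """Attach a new resource (a plain dict here) to this manager."""
        if self.closed:
            raise RuntimeError(f"manager {self.name} is closed")
        resource = {"values": list(values), "released": False}
        self.resources.append(resource)
        return resource

    def close(self):
        """Release the children first, then the resources owned directly."""
        for child in list(self.children):
            child.close()
        for resource in self.resources:
            resource["released"] = True
        self.resources.clear()
        self.closed = True
        if self.parent is not None and self in self.parent.children:
            self.parent.children.remove(self)

    # Context manager support, so that close() is called even on errors
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False


root = ResourceManager("root")
with root.new_sub_manager("mean") as sub:
    nd_input = sub.create([1.0, 2.0, 3.0])
    mean = sum(nd_input["values"]) / len(nd_input["values"])

# The resource created in the sub manager is released once the block exits
print(mean, nd_input["released"], root.closed)
```

The root manager stays open after the child is closed, which mirrors the finer granularity of release that child managers are meant to provide.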

Let's consider the following simple example, which computes the mean of a sequence of floating point values:

import ai.djl.ndarray.NDManager
import scala.util.Random

// Set up the root memory manager
val ndManager = NDManager.newBaseManager()

// Regular JVM array of 1000 random values
val input = Array.fill(1000)(Random.nextFloat())
// Allocate resources outside the JVM
val ndInput = ndManager.create(input)
val ndMean = ndInput.mean()
// Copy the result back into a JVM array
val mean = ndMean.toFloatArray.head

// Release ND resources
ndManager.close()

The steps implemented in the code snippet are:
  1. Instantiate the root resource manager, ndManager
  2. Create an array of 1000 random floating point values
  3. Convert the array into an ND array, ndInput
  4. Compute the mean, ndMean
  5. Convert the result back to Java data types
  6. Close the root manager

The root NDManager can be broken down into child managers to allow a finer granularity of allocation and release of resources. The following method, computeMean, instantiates a child manager, subNDManager, to compute the mean value. The child manager has to be explicitly closed (releasing associated resources) before the function returns.
The native memory associated with the local ND variables, ndInput and ndMean, is released when the child manager they are attached to is closed.

import ai.djl.ndarray.NDManager

def computeMean(input: Array[Float], ndManager: NDManager): Float =
  if(input.nonEmpty) {
    val subNDManager = ndManager.newSubManager()
    // Attach the arrays to the child manager, not to its parent
    val ndInput = subNDManager.create(input)
    val ndMean = ndInput.mean()
    val mean = ndMean.toFloatArray.head

    // Closing the child manager releases ndInput and ndMean
    subNDManager.close()
    mean
  }
  else
    0.0F


JMX to the rescue

The JVM provides developers with the ability to access operating system metrics such as CPU or heap consumption through the Java Management Extensions (JMX) interface [4].

The DJL class, MemoryTrainingListener, leverages the JMX monitoring capability. It provides developers with a simple method, collectMemoryInfo, to collect metrics.

First, we need to instruct DJL to enable the collection of memory stats through a Java system property:

System.setProperty("collect-memory", "true")

Similarly to the VisualVM heap memory snapshot, described in the next section, we can collect memory metrics (RSS, Heap and NonHeap) before and after each new NDArray object is created or released. 

import ai.djl.metric.Metrics
import ai.djl.ndarray.NDManager
import ai.djl.training.listener.MemoryTrainingListener

def computeMean(
  input: Array[Float],
  ndManager: NDManager,
  metricName: String): Float = {

  val manager = ndManager.newSubManager()
  // Initialize a new set of metrics
  val metrics = new Metrics()

  // Initialize the collection of memory related metrics
  MemoryTrainingListener.collectMemoryInfo(metrics)
  val initVal = metrics.latestMetric(metricName).getValue.longValue

  // Allocate the arrays through the sub manager
  val ndInput = manager.create(input)
  val ndMean = ndInput.mean()

  collectMetric(metrics, initVal, metricName)
  val mean = ndMean.toFloatArray.head

  // Close the output array and collect metrics
  ndMean.close()
  collectMetric(metrics, initVal, metricName)

  // Close the input array and collect metrics
  ndInput.close()
  collectMetric(metrics, initVal, metricName)

  // Close the sub manager and collect metrics
  manager.close()
  collectMetric(metrics, initVal, metricName)
  mean
}

First, we instantiate a Metrics object that is passed along to all the snapshots. Given the metrics and the current NDManager, we create a baseline for the selected memory metric (e.g., heap size), initVal. We then collect the value of the metric after each creation and release of NDArray instances (collectMetric) in our mean computation example.

Here is a simple snapshot method which computes the increase/decrease in memory from the baseline:
def collectMetric(
  metrics: Metrics, 
  initVal: Long, 
  metricName: String): Unit = {

  MemoryTrainingListener.collectMemoryInfo(metrics)
  val newVal = metrics.latestMetric(metricName).getValue.longValue
  println(s"$metricName: ${(newVal - initVal)/1024.0} KB")
}



Memory leaks detection

I have been using a combination of several investigative techniques to estimate the source of a memory leak.

MemoryTrainingListener.debugDump

This method dumps basic memory and CPU stats for a given Metrics instance into a local file:

MemoryTrainingListener.debugDump(metrics, outputFile)
  
Output
Heap.Bytes:72387328|#Host:10.5.67.192
Heap.Bytes:74484480|#Host:10.5.67.192
NonHeap.Bytes:39337256|#Host:10.5.67.192
NonHeap.Bytes:40466888|#Host:10.5.67.192
cpu.Percent:262.2|#Host:10.5.67.192
cpu.Percent:262.2|#Host:10.5.67.192
rss.Bytes:236146688|#Host:10.5.67.192
rss.Bytes:244297728|#Host:10.5.67.192
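
The dump is plain text and easy to post-process. The following is a minimal sketch in Python, assuming only the line format shown above (metric name, value, then a dimension separated by a pipe). The function names parse_dump and deltas are hypothetical helpers, not part of DJL. It computes the growth of each metric between its first and last sample.

```python
from collections import defaultdict


def parse_dump(lines):
    """Group metric values by name from 'Name.Unit:value|#Host:ip' lines."""
    series = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        metric, _, _dimensions = line.partition("|")
        name, _, value = metric.partition(":")
        series[name].append(float(value))
    return dict(series)


def deltas(series):
    """Difference between the last and the first sample of each metric."""
    return {name: values[-1] - values[0] for name, values in series.items()}


# Sample taken from the output listed in this post
sample = [
    "Heap.Bytes:72387328|#Host:10.5.67.192",
    "Heap.Bytes:74484480|#Host:10.5.67.192",
    "NonHeap.Bytes:39337256|#Host:10.5.67.192",
    "NonHeap.Bytes:40466888|#Host:10.5.67.192",
    "cpu.Percent:262.2|#Host:10.5.67.192",
    "cpu.Percent:262.2|#Host:10.5.67.192",
    "rss.Bytes:236146688|#Host:10.5.67.192",
    "rss.Bytes:244297728|#Host:10.5.67.192",
]

growth = deltas(parse_dump(sample))
for name, delta in growth.items():
    print(f"{name}: {delta:+.0f}")
```

A metric that keeps growing across successive dumps, while the workload is steady, is a candidate for a closer look with a profiler.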

NDManager.cap

It is not uncommon for NDArray objects associated with a sub manager not to be properly closed. One simple solution is to prevent the allocation of new objects in the parent manager.

// Set up the memory manager
val ndManager = NDManager.newBaseManager()
// Protect the parent/root manager from
// accidental allocation of NDArray objects
ndManager.cap()

// Results in an error
val ndInput = ndManager.create(input)
  


Profilers

For reference, DJL introduces a set of experimental profilers to support the investigation of memory consumption bottlenecks [5].

VisualVM

We select VisualVM [6] among the various JVM profiling solutions to highlight some key statistics for investigating a memory leak. VisualVM is a utility that has to be downloaded from the Oracle site. It is not bundled with the JDK.
A simple way to identify excessive memory consumption is to take regular snapshots or dumps of the objects allocated on the heap, as illustrated below.







VisualVM has an intuitive UI to drill down into the sequence or composite objects. Besides quantifying memory consumption during inference, the following details view illustrates the hierarchical nature of the ND manager.



Friday, April 8, 2022

Normalized Discounted Cumulative Gain in Scala

Target audience: Advanced
Estimated reading time: 10'






This post illustrates the Normalized Discounted Cumulative Gain (NDCG) and its implementation in Scala.
Numerous real-life applications of machine learning require predicting the most relevant ranking of items to optimize an outcome. For instance:
  • Evaluate and prioritize counter-measures to cyber-attacks
  • Rank symptoms in a clinical trial
  • Extract documents relevant to a given topic from a corpus
The Discounted Cumulative Gain (DCG) and its normalized counterpart, the Normalized Discounted Cumulative Gain (NDCG), are metrics originally applied in textual information retrieval and later extended to other domains.
This post uses Scala 2.11.8.


Discounted Cumulative Gain

Let's dive into the mathematical formalism for the Discounted Cumulative Gain. 

For indexed target values t_j, j = 1, ..., n, as illustrated in the diagram above, the discounted cumulative gain is computed as
\[DCG=\sum_{j=1}^{n}\frac{2^{t_{j}}-1}{log_{2}(j+1)}\]
The objective is to compare any given list of ranked/sorted items with a benchmark which represents the optimum ranking (the ranking with the highest DCG value, known as the ideal DCG or IDCG). The normalized value is the ratio of the two, expressed here with natural logarithms:
\[p(ranking|IDCG)=\frac{log(2)}{IDCG}\sum_{j=1}^{n}\frac{2^{t_{j}}-1}{log(j+1)}\]


Scala implementation

The implementation of the computation of NDCG in Scala is quite simple. Given a ranked list of items, the three steps are:
  • Compute the IDCG (or normalization factor) from the list
  • Compute the DCG for the list
  • Compute the NDCG = DCG/IDCG
First, let's consider a list of items of type T to rank. The function ranking, which extracts the rank of an item, is provided as an implicit function. The constructor for NDCG has a single argument, the initial sample of ranked items:

class NDCG[T](
   initialSample: Seq[T])
   (implicit ranking: T => Int) {
  import NDCG._   // companion object providing compute (not shown)

  val iDCG: Double = normalize

  def score: Double = score(initialSample)

  private def dcg(samples: Seq[T]): Double =
    samples.zipWithIndex.aggregate(0.0)(
      (s, samplej) => s + compute(samplej._2 + 1, ranking(samplej._1))
      , _ + _)


  private def normalize: Double = {
    val sorted = initialSample.zipWithIndex.sortBy{
      case (sample, n) => -ranking(sample)
    }.map( _._1)
    dcg(sorted)
  }
}

The Ideal Discounted Cumulative Gain, iDCG, is computed through the normalize method (line 6). The iDCG (normalization factor) is computed by first sorting the items of type T by their rank in decreasing order (lines 17 to 19), then scoring this re-sorted list using the dcg method (line 20).
The computation of the Discounted Cumulative Gain by the method dcg (line 10) is a direct application of the formula described in the previous section.
Note: The logarithm function uses base 2. It is computed as ln(x)/ln(2).


Let's now consider a list of items of type Item defined as follows: 

case class Item(id: String, x: Double, rank: Int)

The list of items, itemsList is implicitly ranked through the last attribute, rank.

val itemsList = Seq[Item](
  Item("1", 3.4, 4), Item("2", 1.4, 3),
  Item("3", -0.7, 5), Item("4", 5.2, 2), 
  Item("5", 1.4, 1))

implicit val ranking = (item: Item) => item.rank

It is time to compute the NDCG coefficient for the list of items, by invoking the score method.

val nDCG = new NDCG[Item](itemsList)

println(s"IDCG = ${nDCG.iDCG}")    //45.64
println(s"Score = ${nDCG.score}")  // 0.801

The ideal discounted cumulative gain, iDCG, is 45.6: it is the DCG of the optimum ranking for this list of items. The first sample scores 0.8.

Note: The score of subsequent samples can be computed using the same iDCG value from the same instance of NDCG.

def score(samples: Seq[T]): Double =
  if( samples.size != initialSample.size) 0.0 
  else dcg(samples)/iDCG
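
The values reported above (an IDCG of about 45.64 and a score of about 0.80) can be cross-checked independently of the Scala class. The following Python sketch applies the DCG formula directly to the rank attribute of the five items. The function names dcg and ndcg are chosen for this illustration and do not come from the original code.

```python
import math
from typing import Sequence


def dcg(relevances: Sequence[int]) -> float:
    """Discounted cumulative gain of relevance values listed in ranked order."""
    return sum(
        (2 ** t - 1) / math.log2(j + 1)
        for j, t in enumerate(relevances, start=1)
    )


def ndcg(relevances: Sequence[int]) -> float:
    """DCG normalized by the DCG of the ideal (decreasing) ordering."""
    ideal = dcg(sorted(relevances, reverse=True))
    return dcg(relevances) / ideal if ideal > 0 else 0.0


# Rank attribute of the five items of itemsList, in list order
ranks = [4, 3, 5, 2, 1]
idcg = dcg(sorted(ranks, reverse=True))
score = ndcg(ranks)

print(f"IDCG = {idcg:.2f}")    # IDCG = 45.64
print(f"Score = {score:.4f}")  # close to 0.80
```

A list that is already sorted in decreasing order of rank obtains a score of 1, which is the upper bound of the metric.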


Monday, January 24, 2022

Bloom Filter in Scala

Target audience: Intermediate
Estimated reading time: 20'

A brief introduction to the Bloom filter and its implementation in Scala using a cryptographic digest.




Overview

The Bloom filter became popular a couple of years ago as a probabilistic data structure for membership queries (does object x belong to set or category Y?). Its main benefit is to reduce memory requirements: unlike a HashSet or HashMap, it does not store the objects themselves. The compact representation comes with a trade-off: although the filter does not allow false negatives, it does not guarantee the absence of false positives.
In other words, a query returns either:
  • the object very likely belongs to the set
  • the object definitely does not belong to the set
A Bloom filter is quite often used as a front end to a deterministic algorithm.

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted.


Theory

Let's consider a set A = {a0, ..., an-1} of n elements for which a query to determine membership is executed. The data structure consists of a bit vector V of m bits and h completely independent hash functions, each of which maps an element to a position in the bit vector. The mapping of elements to bits by the hash functions has to follow a uniform distribution.
The diagram below illustrates the basic mechanism behind the Bloom filter. The set A is defined by the pair a1 and a2. The hash functions h1 and h2 map the elements to bit positions (bits set to 1) in the bit vector. The element b has one of its positions set to 0 and therefore does not belong to the set. The element c is reported as belonging to the set because all its associated positions have bits set to 1.

However, the algorithm does not prevent false positives. For instance, a bit may have been set to 1 during the insertion of previous elements, and the query erroneously reports that the element belongs to the set.
The insertion of an element depends on the h hash functions; therefore, the time needed to add a new element is proportional to h (the number of hash functions) and independent of the size of the bit vector: asymptotic insertion time = O(h). However, the filter sets up to h bits for each element and is less effective than a traditional bit array for small sets.
The probability of false positives decreases as the number n of inserted elements decreases and as the size m of the bit vector increases. The number of hash functions that minimizes the probability of false positives is h = (m/n).ln2.
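
The sizing rule can be turned into a small calculator. The sketch below is in Python and uses the standard approximation of the false positive probability, p = (1 - e^(-hn/m))^h, which is not stated in this post and assumes independent, uniformly distributed hash functions. The function names are hypothetical.

```python
import math


def optimal_num_hashes(num_bits: int, num_elements: int) -> int:
    """Number of hash functions h = (m/n).ln2, rounded to the nearest integer."""
    return max(1, round(num_bits / num_elements * math.log(2)))


def false_positive_rate(num_bits: int, num_elements: int, num_hashes: int) -> float:
    """Approximate probability of a false positive: (1 - e^(-hn/m))^h."""
    fill = 1.0 - math.exp(-num_hashes * num_elements / num_bits)
    return fill ** num_hashes


m, n = 1000, 100
h = optimal_num_hashes(m, n)
print(h)                                        # 7
print(f"{false_positive_rate(m, n, h):.4f}")    # 0.0082
print(f"{false_positive_rate(m, n, 1):.4f}")    # one hash function only
print(f"{false_positive_rate(m, n, 30):.4f}")   # too many hash functions
```

Both too few and too many hash functions increase the false positive rate: too few leave each element under-represented, too many saturate the bit vector.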


Implementation in Scala

The implementation relies on the MessageDigest Java library class to generate the hash values. Ancillary methods and conditions on method arguments are omitted for the sake of clarity.
The first step is to define the BloomFilter class and its attributes:
  • length Number of entries in the filter (line 2)
  • numHashs Number of hash functions (line 3)
  • algorithm Hashing algorithm with SHA1 as default (line 4)
  • set Array of bytes for entries in the Bloom filter (line 6)
  • digest Digest used to generate hash values (line 7)
class BloomFilter(
  length: Int,
  numHashs: Int, 
  algorithm: String="SHA1") {
    
  val set = new Array[Byte](length)
  val digest = Try(MessageDigest.getInstance(algorithm))

  def add(elements: Array[Any]): Int = ???
  final def contains(el: Any): Boolean = ???

  private def hash(value: Int): Int = ???
  private def getSet(el: Any): Array[Int] = ???
}

The digest is an instance of java.security.MessageDigest, the message digest class of the Java library.
The next step consists of defining the methods to add a single generic element, add(any: Any) (line 8), and an array of elements, add(elements: Array[Any]) (line 2).

// add an array of elements to the filter
def add(elements: Array[Any]): Int = digest.map(_ => {
   elements.foreach( getSet(_).foreach(set(_) = 1) )
   elements.size
 }).getOrElse(-1)
 
@inline
def add(any: Any): Boolean = this.add(Array[Any](any)) > 0
 
final def contains(any: Any): Boolean =
   digest.map( _ => !getSet(any).exists(set(_) !=1))
       .getOrElse(false)

The method contains (line 10) evaluates whether an element is contained in the filter. The method returns:
  • true if the filter very likely contains the element
  • false if the filter DOES NOT contain this element
The contains method relies on retrieving the bit positions of an element using the recursive getSet method.

def getSet(any: Any): Array[Int] = {
  val newSet = new Array[Int](numHashs)
  newSet.update(0, hash(any.hashCode))
  getSet(newSet, 1)
  newSet
}
 
@scala.annotation.tailrec
def getSet(values: Array[Int], index: Int): Unit =
  if( index < values.size) {
    values.update(index, hash(values(index-1)))
    getSet(values, index+1) // tail recursion
  }

Similarly to the add method, the getSet method has two implementations:
  • Generate a new set of bit positions from any new element (line 1)
  • A recursive call to fill the array of integer positions, each position being the hash of the previous one (line 9).
The hash method is the core of the Bloom filter: it computes the index of an entry in the bit vector.

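def hash(value: Int) : Int = digest.map(d => {
  d.reset
  d.update(value)  // relies on the implicit conversion Int => Array[Byte]
  // BigInteger(1, ...) is non-negative, so mod always yields a valid index
  new BigInteger(1, d.digest).mod(BigInteger.valueOf(set.length)).intValue
}).getOrElse(-1)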

The instance of the MessageDigest class, digest, generates a hash value using either the MD5 or SHA-1 algorithm. Tail recursion is used as an alternative to the iterative process to generate the set.

The next code snippet implements a very simple implicit conversion from Int to Array[Byte] (line 5).

object BloomFilter {
 val NUM_BYTES = 4
 val LAST_BYTE = NUM_BYTES -1
 
 implicit def int2Bytes(value: Int) : Array[Byte] =
    Array.tabulate(NUM_BYTES)(n => {
      val offset = (LAST_BYTE - n) << 3  // 8 bits per byte
      ((value >>> offset) & 0xFF).toByte
    })
}

The conversion relies on the manipulation of bits to convert a 32-bit Integer into 4 bytes (lines 6 - 8). Alternatively, you may consider a conversion from a long value to an 8-byte array.
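
The bit manipulation is easier to verify on a concrete value. The Python sketch below reproduces the same big-endian conversion with explicit shifts and compares it with the built-in int.to_bytes. The function name int_to_bytes is hypothetical, and the mask emulates the 32-bit unsigned shift (>>>) of the JVM.

```python
NUM_BYTES = 4


def int_to_bytes(value: int) -> bytes:
    """Big-endian conversion of a 32-bit integer into 4 bytes using shifts."""
    unsigned = value & 0xFFFFFFFF  # emulate a 32-bit JVM integer
    return bytes(
        (unsigned >> (8 * (NUM_BYTES - 1 - n))) & 0xFF
        for n in range(NUM_BYTES)
    )


converted = int_to_bytes(0x01020304)
print(list(converted))            # [1, 2, 3, 4]
print(list(int_to_bytes(-1)))     # [255, 255, 255, 255]
print(converted == (0x01020304).to_bytes(NUM_BYTES, "big"))  # True
```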


Usage

This simple test consists of checking whether a couple of values are indeed contained in the set. The filter is expected to reject 22, unless a false positive occurs, and to very likely accept 23. If the objective is to confirm with certainty that 23 belongs to the set, then a full-fledged hash table would have to be used.

val filter = new BloomFilter(100, 100, "SHA")
final val newValues = Array[Any](57, 97, 91, 23, 67,33)  
                                
filter.add(newValues)

println( filter.contains(22) )
println( filter.contains(23) )
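
For readers who want to experiment outside the JVM, here is a minimal Python sketch of the same scheme: a digest-based hash, and positions generated by chaining each hash into the next one. It is an illustration under assumptions, not a port of the Scala class: the names, the use of hashlib, and the sizing (1024 entries, 3 hash functions) are choices made for this example.

```python
import hashlib
from typing import Iterable, List


class BloomFilter:
    """Bloom filter with chained digest hashes (illustrative sketch)."""

    def __init__(self, num_bits: int, num_hashes: int, algorithm: str = "sha1"):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.algorithm = algorithm
        self.bits = bytearray(num_bits)  # one byte per entry, as in the post

    def _hash(self, value: int) -> int:
        """Index of an entry computed from the digest of a 32-bit integer."""
        data = (value & 0xFFFFFFFF).to_bytes(4, "big")
        digest = hashlib.new(self.algorithm, data).digest()
        return int.from_bytes(digest, "big") % self.num_bits

    def _positions(self, element) -> List[int]:
        """First position from the element hash code, the others by chaining."""
        positions = [self._hash(hash(element))]
        while len(positions) < self.num_hashes:
            positions.append(self._hash(positions[-1]))
        return positions

    def add(self, element) -> None:
        for position in self._positions(element):
            self.bits[position] = 1

    def add_all(self, elements: Iterable) -> None:
        for element in elements:
            self.add(element)

    def contains(self, element) -> bool:
        return all(self.bits[p] == 1 for p in self._positions(element))


bloom = BloomFilter(num_bits=1024, num_hashes=3)
values = [57, 97, 91, 23, 67, 33]
bloom.add_all(values)

print(all(bloom.contains(v) for v in values))  # True: no false negatives
print(bloom.contains(22))  # False unless 22 happens to be a false positive
```

Note that the built-in hash of Python strings changes from one process to the next, so this sketch is only reproducible across runs for integer elements.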


Performance evaluation
Let's look at the behavior of the Bloom filter under load. The test consists of adding 100,000,000 new random values, then testing whether the filter contains a value 10,000 times. The test is run 10 times after a warm-up of the JVM.

final val newValues = Array[Any](57, 97, 91, 23, 67,33)                                  
  // Measure average time to add a new data set
filter.add(Array.tabulate(size)(n => Random.nextInt(n + 1)))

  // Measure average time to test for a value.
filter.contains(newValues(Random.nextInt(newValues.size)))

The first performance test evaluates the average time required to insert a new element into a Bloom filter whose size ranges from 100 million to 1 billion entries.
The second test evaluates the average search/query time for Bloom filters with the same range of sizes.



As expected, the average time to load a new set of values and to check whether the filter contains a specific value is fairly constant.



Saturday, September 11, 2021

Automating the configuration of a GAN in PyTorch

Target audience: Expert
Estimated reading time: 60'



This post illustrates the automation of the creation of deep convolutional generative adversarial networks (DCGAN) by inferring the configuration of the generator from the discriminator. We will use the ubiquitous real vs. fake images detection scenario for our GAN model.



This post does not delve into the details of generative adversarial networks or convolutional networks. It focuses on automating the configuration of some of their components. It is assumed the reader has some basic understanding of convolutional neural networks and the PyTorch library.

The challenge

For those not familiar with GANs:
GANs are unsupervised learning models that discover patterns in data and use those patterns to generate new samples (data augmentation) that are almost indistinguishable from the original data. GANs are part of the generative models family along with variational auto-encoders or MLE. The approach reframes the problem as a supervised learning problem using two adversarial networks:
  • Generator model trained to generate new samples
  • Discriminator model that attempts to classify the new samples as real (from the original dataset) or fake (generated)
Please refer to the reference section to learn more about generative adversarial networks.

Designing and configuring the generator and discriminator of a generative adversarial network (GAN) or the encoder and decoder layers of a variational convolutional auto-encoder (VAE) can be a very tedious and repetitive task.
Actually, some of the steps can be fully automated, knowing that the generative network of a convolutional GAN, for example, can be configured as the mirror (or inversion) of the discriminator using a de-convolutional network. The same automation technique applies to the instantiation of the decoder of a VAE given an encoder.
Functional representation of a simple deep convolutional GAN


Neural component reusability is key to generating a de-convolutional network from a convolutional network. For this purpose, we break down a neural network into computational blocks.

Convolutional neural blocks

At the highest level, a generative adversarial network is composed of at least two neural networks: a generator and a discriminator.
These two neural networks can be broken down into neural blocks, or groups of PyTorch modules: hidden layer, batch normalization, regularization, pooling mode and activation function. Let's consider a discriminator built using a convolutional neural network followed by a fully connected (restricted Boltzmann machine) network. The PyTorch modules associated with any given layer are assembled as a neural block class.
The PyTorch modules of the convolutional neural block are:
  • Conv2d: Convolutional layer with input, output channels, kernel, stride and padding
  • Dropout: Drop-out regularization layer
  • BatchNorm2d: Batch normalization module
  • MaxPool2d: Pooling layer
  • ReLU, Sigmoid, ...: Activation functions

Representation of a convolutional neural block

The constructor for the neural block initializes all its parameters and its modules in the proper order. For the sake of simplicity, regularization elements such as drop-out (bagging of sub-networks) are omitted.

import torch.nn as nn


class ConvNeuralBlock(nn.Module):
  def __init__(self,
      in_channels: int,
      out_channels: int,
      kernel_size: int,
      stride: int,
      padding: int,
      batch_norm: bool,
      max_pooling_kernel: int,
      activation: nn.Module,
      bias: bool,
      is_spectral: bool = False):

    super(ConvNeuralBlock, self).__init__()

    # Assertions are omitted
    # 1- initialize the input and output channels
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.is_spectral = is_spectral
    modules = []

    # 2- create a 2 dimension convolution layer
    conv_module = nn.Conv2d(
        self.in_channels,
        self.out_channels,
        kernel_size=kernel_size,
        stride=stride,
        padding=padding,
        bias=bias)

    # 6- if this is a spectral norm block
    if self.is_spectral:
      conv_module = nn.utils.spectral_norm(conv_module)
    # The convolution is added whether or not it is spectral-normalized
    modules.append(conv_module)

    # 3- Batch normalization
    if batch_norm:
      modules.append(nn.BatchNorm2d(self.out_channels))

    # 4- Activation function
    if activation is not None:
      modules.append(activation)

    # 5- Pooling module
    if max_pooling_kernel > 0:
      modules.append(nn.MaxPool2d(max_pooling_kernel))

    # Note: this attribute shadows the nn.Module.modules() method
    self.modules = tuple(modules)

We consider the case of a generative model for images. The first step (1) is to initialize the number of input and output channels, then create the 2-dimension convolution (2), a batch normalization module (3), an activation function (4) and finally a max pooling module (5). The spectral norm regularization (6) is optional.
The convolutional neural network is assembled from convolutional and feed-forward neural blocks in the following build method.

# NeuralModel is the base class of the models in this post (not shown)
class ConvModel(NeuralModel):
  def __init__(self,
       model_id: str,
       # 1- Number of input and output units
       input_size: int,
       output_size: int,
       # 2- PyTorch convolutional modules
       conv_model: nn.Sequential,
       dff_model_input_size: int = -1,
       # 3- PyTorch fully connected
       dff_model: nn.Sequential = None):

    super(ConvModel, self).__init__(model_id)
    self.input_size = input_size
    self.output_size = output_size
    self.conv_model = conv_model
    self.dff_model_input_size = dff_model_input_size
    self.dff_model = dff_model

  @classmethod
  def build(cls,
      model_id: str,
      conv_neural_blocks: list,
      dff_neural_blocks: list) -> NeuralModel:

    # 4- Initialize the input and output size
    # for the convolutional layer
    input_size = conv_neural_blocks[0].in_channels
    output_size = conv_neural_blocks[-1].out_channels

    # 5- Generate the model from the sequence
    # of conv. neural blocks
    conv_modules = [conv_module for conv_block in conv_neural_blocks
          for conv_module in conv_block.modules]
    conv_model = nn.Sequential(*conv_modules)

    # 6- If a fully connected RBM is included in the model ..
    if dff_neural_blocks is not None:
      dff_modules = [dff_module for dff_block in dff_neural_blocks
         for dff_module in dff_block.modules]

      dff_model_input_size = dff_neural_blocks[0].output_size
      dff_model = nn.Sequential(*tuple(dff_modules))
    else:
      dff_model_input_size = -1
      dff_model = None

    # The arguments match the signature of the constructor above
    return cls(
       model_id,
       input_size,
       output_size,
       conv_model,
       dff_model_input_size,
       dff_model)

The default constructor (1) initializes the number of input/output channels, the PyTorch modules for the convolutional layers (2) and the fully connected layers (3).
The class method, build, instantiates the convolutional model from the convolutional neural blocks and feed-forward neural blocks. It initializes the size of the input and output layers from the first and last neural blocks (4), then generates the PyTorch convolutional modules (5) and fully connected layer modules (6) from the neural blocks.
Next, we build the de-convolutional neural network from the convolutional blocks.

Inverting a convolutional block

The process to build a GAN is as follows:
  1. Specify components (PyTorch modules) for each convolutional layer
  2. Assemble these modules into a convolutional neural block
  3. Create a generator and discriminator network by aggregating the blocks
  4. Wire the generator and discriminator to produce a fully functional GAN
The goal is to create a builder that generates the de-convolutional network implementing the GAN generator from the convolutional network defined in the previous section.
The first step is to extract the de-convolutional block from an existing convolutional block.

Conceptual conversion of a convolutional block into a de-convolutional block

The default constructor for the neural block of a de-convolutional network defines all the key parameters used in the network except the pooling module (not needed). The following code snippet illustrates the instantiation of a de-convolutional neural block using the convolution parameters such as the number of input and output channels, kernel size, stride and padding, batch normalization and activation function.

class DeConvNeuralBlock(nn.Module):
  def __init__(self,
       in_channels: int,
       out_channels: int,
       kernel_size: int,
       stride: int,
       padding: int,
       batch_norm: bool,
       activation: nn.Module,
       bias: bool) -> None:
    super(DeConvNeuralBlock, self).__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    modules = []

    # Two dimension de-convolution layer
    de_conv = nn.ConvTranspose2d(
      self.in_channels,
      self.out_channels,
      kernel_size=kernel_size,
      stride=stride,
      padding=padding,
      bias=bias)
    # Add the deconvolution block
    modules.append(de_conv)

    # Add the batch normalization, if defined
    if batch_norm:
      modules.append(nn.BatchNorm2d(self.out_channels))
    # Add activation, if defined
    if activation is not None:
      modules.append(activation)
    self.modules = modules

Note that the de-convolution block does not have any pooling capabilities.
The class method, auto_build, takes a convolutional neural block, the number of input and output channels and an optional activation function to generate a de-convolutional neural block of type DeConvNeuralBlock. The kernel size, stride, padding, batch normalization and activation of the de-convolution layer are extracted in the private method __resize.

@classmethod
def auto_build(cls,
    conv_block: ConvNeuralBlock,
    in_channels: int,
    out_channels: int = None,
    activation: nn.Module = None) -> nn.Module:

  # Extract the parameters of the source convolutional block
  kernel_size, stride, padding, batch_norm, activation = \
     DeConvNeuralBlock.__resize(conv_block, activation)

  # Override the number of input_tensor channels
  # for this block if defined
  next_block_in_channels = in_channels \
    if in_channels is not None \
    else conv_block.out_channels

  # Override the number of output-channels for
  # this block if specified
  next_block_out_channels = out_channels \
    if out_channels is not None \
    else conv_block.in_channels

  # The arguments match the DeConvNeuralBlock constructor shown earlier
  return cls(
        next_block_in_channels,
        next_block_out_channels,
        kernel_size,
        stride,
        padding,
        batch_norm,
        activation,
        False)
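
The mirroring logic can be exercised without PyTorch. The following sketch models the blocks as plain dataclasses and applies the same rules: swap the input and output channels, keep kernel size, stride and padding, drop the pooling, and reverse the order of the blocks. All the names (ConvBlockConfig, DeConvBlockConfig, mirror_block, mirror_network) and the sample channel sizes are hypothetical, and the way the overrides are applied to the first and last block is an assumption about the intent of the builder.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ConvBlockConfig:
    """Hypothetical, PyTorch-free description of a convolutional block."""
    in_channels: int
    out_channels: int
    kernel_size: int
    stride: int
    padding: int
    batch_norm: bool
    activation: str
    max_pooling_kernel: int = 0


@dataclass(frozen=True)
class DeConvBlockConfig:
    """Mirror block: same kernel, stride and padding, no pooling."""
    in_channels: int
    out_channels: int
    kernel_size: int
    stride: int
    padding: int
    batch_norm: bool
    activation: str


def mirror_block(conv: ConvBlockConfig,
                 in_channels: Optional[int] = None,
                 out_channels: Optional[int] = None,
                 activation: Optional[str] = None) -> DeConvBlockConfig:
    """Swap input and output channels unless explicit overrides are given."""
    return DeConvBlockConfig(
        in_channels=in_channels if in_channels is not None else conv.out_channels,
        out_channels=out_channels if out_channels is not None else conv.in_channels,
        kernel_size=conv.kernel_size,
        stride=conv.stride,
        padding=conv.padding,
        batch_norm=conv.batch_norm,
        activation=activation if activation is not None else conv.activation)


def mirror_network(conv_blocks: List[ConvBlockConfig],
                   in_channels: Optional[int] = None,
                   out_channels: Optional[int] = None,
                   last_activation: Optional[str] = None) -> List[DeConvBlockConfig]:
    """Reverse the blocks; overrides apply to the first and last mirror block."""
    reversed_blocks = list(reversed(conv_blocks))
    last = len(reversed_blocks) - 1
    return [
        mirror_block(
            block,
            in_channels=in_channels if index == 0 else None,
            out_channels=out_channels if index == last else None,
            activation=last_activation if index == last else None)
        for index, block in enumerate(reversed_blocks)
    ]


discriminator = [
    ConvBlockConfig(3, 16, 4, 2, 1, False, "leaky_relu"),
    ConvBlockConfig(16, 32, 4, 2, 1, True, "leaky_relu"),
    ConvBlockConfig(32, 64, 4, 2, 1, True, "leaky_relu"),
]
# 100 stands for the size of the latent input of the generator (assumption)
generator = mirror_network(discriminator, in_channels=100, last_activation="tanh")
for block in generator:
    print(block.in_channels, "->", block.out_channels, block.activation)
```

Keeping the configuration separate from the PyTorch modules makes the inversion rules easy to unit test before any tensor is allocated.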

Sizing de-convolutional layers

The next task consists of computing the size of the components of the de-convolutional block from the original convolutional block.

@staticmethod
def __resize(
  conv_block: ConvNeuralBlock,
  updated_activation: nn.Module) -> (int, int, int, bool, nn.Module):
  conv_modules = list(conv_block.modules)

  # 1- Extract the various components of the
  # convolutional neural block
  _, batch_norm, activation = DeConvNeuralBlock.__de_conv_modules(conv_modules)
  # 2- override the activation function for the
  # output layer, if necessary
  if updated_activation is not None:
    activation = updated_activation

  # 3- Compute the parameters for the de-convolutional
  # layer, from the conv. block (Conv2d stores them as tuples)
  kernel_size, _ = conv_modules[0].kernel_size
  stride, _ = conv_modules[0].stride
  padding = conv_modules[0].padding

  return kernel_size, stride, padding, batch_norm, activation

The __resize method extracts the PyTorch modules for the de-convolutional layers from the original convolutional block (1), overrides the activation function of the block if needed (2) and finally initializes the parameters of the de-convolutional layer (3).

The helper method, __de_conv_modules, extracts the PyTorch modules related to the convolutional layer, batch normalization module and activation function for the de-convolution from the PyTorch modules of the convolution.

@staticmethod
def __de_conv_modules(conv_modules: list) -> \
        (torch.nn.Module, torch.nn.Module, torch.nn.Module):
  activation_function = None
  deconv_layer = None
  batch_norm_module = None

  # 4- Extract the PyTorch de-convolutional modules 
  # from the convolutional ones
  for conv_module in conv_modules:
    if DeConvNeuralBlock.__is_conv(conv_module):
       deconv_layer = conv_module
    elif DeConvNeuralBlock.__is_batch_norm(conv_module):
       batch_norm_module = conv_module
    elif DeConvNeuralBlock.__is_activation(conv_module):
       activation_function = conv_module
  return deconv_layer, batch_norm_module, activation_function
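
The private predicates __is_conv, __is_batch_norm and __is_activation are not listed in this section. A plausible sketch, assuming they rely on isinstance checks against the PyTorch module classes, could look as follows. The function names and the set of activation types are illustrative assumptions, not the original implementation.

```python
import torch.nn as nn

# Assumed set of activation types; extend it to match the activations used in the model
ACTIVATION_TYPES = (nn.ReLU, nn.LeakyReLU, nn.Sigmoid, nn.Tanh)


def is_conv(module: nn.Module) -> bool:
    return isinstance(module, (nn.Conv1d, nn.Conv2d, nn.Conv3d))


def is_batch_norm(module: nn.Module) -> bool:
    return isinstance(module, (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d))


def is_activation(module: nn.Module) -> bool:
    return isinstance(module, ACTIVATION_TYPES)


def split_modules(conv_modules: list) -> tuple:
    # Same traversal as the helper above: keep the last module of each kind
    conv_layer, batch_norm, activation = None, None, None
    for module in conv_modules:
        if is_conv(module):
            conv_layer = module
        elif is_batch_norm(module):
            batch_norm = module
        elif is_activation(module):
            activation = module
    return conv_layer, batch_norm, activation


modules = [nn.Conv2d(3, 16, kernel_size=3), nn.BatchNorm2d(16), nn.ReLU()]
conv_layer, batch_norm, activation = split_modules(modules)
print(type(conv_layer).__name__, type(batch_norm).__name__, type(activation).__name__)
```

Modules that match none of the predicates, such as a pooling layer, are ignored and the corresponding slots stay None.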



De-convolutional layers
As expected, the formulas for the width and the height of the two-dimensional output of a de-convolutional layer mirror the formulas for the output size of a convolutional layer.
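
A minimal sketch of the standard sizing rules, assuming no dilation and no output padding, shows how the two formulas mirror each other. The function names are illustrative; the same rule applies independently to the width and to the height.

```python
def conv_output_size(in_size: int, kernel_size: int, stride: int = 1, padding: int = 0) -> int:
    # Standard formula for a convolution without dilation
    return (in_size + 2 * padding - kernel_size) // stride + 1


def de_conv_output_size(in_size: int, kernel_size: int, stride: int = 1, padding: int = 0) -> int:
    # Standard formula for a transposed convolution without dilation or output padding
    return (in_size - 1) * stride - 2 * padding + kernel_size


width = 28
reduced = conv_output_size(width, kernel_size=4, stride=2, padding=1)
restored = de_conv_output_size(reduced, kernel_size=4, stride=2, padding=1)
print(reduced, restored)  # 14 28
```

The round trip restores the original size exactly only when in_size + 2 * padding - kernel_size is a multiple of the stride; otherwise the integer division in the convolution discards the remainder.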


Assembling the de-convolutional network

Finally, the de-convolutional model, of type DeConvModel, is created from the sequence of PyTorch modules, de_conv_modules. Once again, the default constructor (1) initializes the size of the input layer (2) and of the output layer (3), and loads the PyTorch modules, de_conv_modules, for all de-convolutional layers.

class DeConvModel(NeuralModel, ConvSizeParams):
  def __init__(self,            # 1 - Default constructor
           model_id: str,
           input_size: int,     # 2 - Size first layer
           output_size: int,    # 3 - Size output layer
           de_conv_modules: torch.nn.Sequential):
    super(DeConvModel, self).__init__(model_id)
    self.input_size = input_size
    self.output_size = output_size
    self.de_conv_modules = de_conv_modules


  @classmethod
  def build(cls,
      model_id: str,
      conv_neural_blocks: list,  # 4- Input to the builder
      in_channels: int,
      out_channels: int = None,
      last_block_activation: torch.nn.Module = None) -> NeuralModel:
    
    de_conv_neural_blocks = []

    # 5- Need to reverse the order of convolutional neural blocks.
    # A reversed copy leaves the caller's list untouched
    reversed_conv_blocks = conv_neural_blocks[::-1]
    last_idx = len(reversed_conv_blocks) - 1

    # 6- Traverse the list of convolutional neural blocks
    for idx, conv_neural_block in enumerate(reversed_conv_blocks):
      new_in_channels = None
      activation = None
      last_out_channels = None

      # 7- Update num. input channels for the first
      # de-convolutional layer
      if idx == 0:
        new_in_channels = in_channels

      # 8- Define, if necessary, the activation function and
      # the number of output channels for the last layer.
      # A plain 'if' also covers a model made of a single block
      if idx == last_idx:
        if last_block_activation is not None:
          activation = last_block_activation
        if out_channels is not None:
          last_out_channels = out_channels

      # 9- Apply transposition to the convolutional block
      de_conv_neural_block = DeConvNeuralBlock.auto_build(
        conv_neural_block,
        new_in_channels,
        last_out_channels,
        activation)
      de_conv_neural_blocks.append(de_conv_neural_block)

    # 10- Instantiate the de-convolutional network
    # from its neural blocks
    return DeConvModel.assemble(model_id, de_conv_neural_blocks)

The alternate constructor, build, creates and configures the de-convolutional model from the convolutional blocks, conv_neural_blocks (4).
The de-convolutional layers mirror the convolutional ones, so the list of convolutional blocks has to be traversed in reverse order (5). For each block of the convolutional network (6), the method sets the number of input channels of the first de-convolutional layer (7).
The method then updates the activation function and, if specified, the number of output channels for the output layer (8), and transposes each convolutional block into its de-convolutional counterpart (9).
Finally, the de-convolutional neural network is assembled from these blocks (10).
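
The traversal can be illustrated without PyTorch by tracking only the channel counts. The sketch below is a simplified, hypothetical stand-in for build: each convolutional block is reduced to an (in_channels, out_channels) pair, and the function name de_conv_channel_plan is not part of the original code.

```python
from typing import List, Optional, Tuple


def de_conv_channel_plan(
        conv_channels: List[Tuple[int, int]],
        in_channels: Optional[int] = None,
        out_channels: Optional[int] = None) -> List[Tuple[int, int]]:
    plan = []
    # Walk the convolutional blocks from last to first, without mutating the input
    reversed_channels = conv_channels[::-1]
    last_idx = len(reversed_channels) - 1
    for idx, (conv_in, conv_out) in enumerate(reversed_channels):
        # Each de-convolutional block swaps the channel counts of its convolutional twin
        de_conv_in, de_conv_out = conv_out, conv_in
        # Optional override of the input channels of the first de-convolutional block
        if idx == 0 and in_channels is not None:
            de_conv_in = in_channels
        # Optional override of the output channels of the last de-convolutional block
        if idx == last_idx and out_channels is not None:
            de_conv_out = out_channels
        plan.append((de_conv_in, de_conv_out))
    return plan


print(de_conv_channel_plan([(1, 8), (8, 16), (16, 32)]))
# [(32, 16), (16, 8), (8, 1)]
```

The output channels of each de-convolutional block match the input channels of the next one, which is what allows the blocks to be chained into a single network.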

@classmethod
def assemble(cls, model_id: str, de_conv_neural_blocks: list):
    input_size = de_conv_neural_blocks[0].in_channels
    output_size = de_conv_neural_blocks[-1].out_channels
    # 11- Generate the PyTorch de-convolutional modules
    # used by the default constructor
    de_conv_modules = [de_conv_module
                       for de_conv_block in de_conv_neural_blocks
                       for de_conv_module in de_conv_block.modules
                       if de_conv_module is not None]
    de_conv_model = torch.nn.Sequential(*de_conv_modules)
    return cls(model_id, input_size, output_size, de_conv_model)

The assemble method constructs the final de-convolutional neural network from the blocks, de_conv_neural_blocks, by aggregating the PyTorch modules associated with each block (11).
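
As an end-to-end illustration, the following sketch mirrors a small convolutional stack into a torch.nn.Sequential of transposed convolutions and checks that the decoder restores the input shape. It uses plain PyTorch layers rather than the ConvNeuralBlock and DeConvNeuralBlock classes of this post, and the layer sizes are arbitrary assumptions.

```python
import torch
import torch.nn as nn

# Hypothetical convolutional stack standing in for the conv_neural_blocks of this post
conv_layers = [
    nn.Conv2d(1, 8, kernel_size=4, stride=2, padding=1),
    nn.Conv2d(8, 16, kernel_size=4, stride=2, padding=1),
]

# Mirror each layer, last to first, reusing its kernel size, stride and padding
de_conv_modules = []
for conv in reversed(conv_layers):
    de_conv_modules.append(
        nn.ConvTranspose2d(
            conv.out_channels,
            conv.in_channels,
            kernel_size=conv.kernel_size,
            stride=conv.stride,
            padding=conv.padding))
    de_conv_modules.append(nn.ReLU())

encoder = nn.Sequential(*conv_layers)
decoder = nn.Sequential(*de_conv_modules)

x = torch.rand(2, 1, 28, 28)   # batch of 2 single-channel 28 x 28 images
encoded = encoder(x)
decoded = decoder(encoded)
print(encoded.shape, decoded.shape)
```

With these settings the encoder reduces each 28 x 28 image to a 16-channel 7 x 7 feature map, and the mirrored decoder brings it back to a single-channel 28 x 28 output.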

Environment

  • Python 3.8
  • PyTorch 1.7.2

References