For the last few months I have been working on a cloud-based cluster manager using Docker Swarm to set up new customer instances for my project, and the experience was wonderful, as it gave me a deep understanding of how cloud-based cluster solutions work compared to the old-school application deployment/setup model. Our solution mainly replaces another popular cluster management stack, DMM – Docker, Marathon & Mesos.

In DMM, Marathon-lb + Marathon + Mesos take care of connecting customer requests to their respective services running on the cluster. But this takes considerably more effort, while Docker Swarm provides these capabilities out of the box with simple commands. Some of the technologies we are using …

  • Mesos => Master/Slave configuration forms the cluster
  • Chronos / Marathon (REST API) => resource manager; helps orchestrate containerized services/apps
  • Docker container => isolates resource consumption between tenants
  • Marathon-lb / HAProxy => binds to the service port of every app and sends incoming requests to app instances
    • calls the Marathon API to retrieve all running services/apps, then generates/updates the HAProxy config and reloads the HAProxy service
  • Spark => distributed compute software; can run in local, standalone, or Mesos/YARN mode
  • HDFS => global shared file system
  • ZK (Zookeeper) => distributed/centralized configuration management tool used by many distributed systems like Kafka, Spark, etc. to achieve high availability through the leader/follower model ZK provides
  • Nginx => high-performance HTTP server and reverse proxy
  • Docker Swarm => swarm mode on the Docker engine helps us natively manage the cluster
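As a taste of the "out of the box" capabilities mentioned above, a minimal Swarm setup is only a few commands. This is an illustrative sketch; the address, network/service names, and image are hypothetical and these commands need a running Docker daemon:

```shell
# Initialize swarm mode on the first node; other nodes join using the
# token this command prints.
docker swarm init --advertise-addr 10.0.0.1

# Create an overlay network and a replicated service. Swarm's built-in
# routing mesh publishes port 8080 on every node, which is the role
# Marathon-lb/HAProxy plays in the DMM stack.
docker network create --driver overlay app-net
docker service create --name core-app --network app-net \
  --replicas 3 --publish 8080:8080 myrepo/core-app:latest
```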

To build the new architecture I had to learn various commands related to Docker / Docker Swarm, HDFS, Spark & Linux (thanks to our great Chief Architect for his vision/inputs). We built a Python-based provisioning service to create customer-specific instances, which involves setting up many swarm services …

  • Core product service
  • Backend ETL product service
  • Spark as a standalone cluster, i.e. master service + worker services
  • Other non docker swarm based configurations
    • Customer space in HDFS with default data
    • ZK configuration for all components.

Docker Swarm is a new technology, and we ran into lots of issues due to the Docker version and Linux OS/version we were using. Our journey of debugging/fixing issues …

  • Started on CentOS 7.0 + Docker 1.23 => hit a lot of Docker Swarm service connectivity and other weird issues
  • Upgraded CentOS + Docker to the latest versions => still had the issues
  • Upgraded the OS to Ubuntu 14.04 + Docker to 17.03 => still had service connectivity issues in Docker Swarm
  • Upgraded Docker to 17.05 => issues came down, but we still noticed a few connectivity issues. Posted the issue to the Docker team – later we came to know that there was a race condition, which has since been fixed.
  • Upgraded the OS to the latest Ubuntu 16.04 with the latest kernel => yet to apply the 17.06 release to see if the connectivity issues are completely gone. For now we check connectivity health using scripts.
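The connectivity health check can be as simple as a TCP probe per service. A minimal sketch using bash's built-in /dev/tcp (hostnames and ports are whatever your swarm services expose):

```shell
# Probe a host:port and report "up" or "down". Uses bash's /dev/tcp
# pseudo-device, so no external tools beyond coreutils' timeout are needed.
check_port() {
  local host="$1" port="$2"
  if timeout 2 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
    echo "up"
  else
    echo "down"
  fi
}

# Example: check_port core-app 8080
```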


I have been working on SaaS solutions for many years now, and it is interesting to see how technologies are evolving and how applications/web APIs connect to each other to get seamless, quick end-to-end integration between systems and organizations.

Webhook – a lightweight HTTP pattern providing a simple pub/sub model for wiring together web APIs and SaaS services. It's been a year since this concept was introduced in my product (thanks to our great Architect), and I see a lot of benefits with it.

  • Now our product can send notifications to subscribed customers for predefined events that happen in the product.
  • It's also used to show the changes between two major statuses in our workflow system.
  • We also thought about using it to build a reporting data warehouse, instead of building the warehouse with the typical approach of ETL/batch jobs.

There may be many such use cases where one can evaluate whether this fits. The first use case, i.e. sending a notification with data on an event, is a very natural one that many products can adopt. More examples …

  • File has changed in Dropbox
  • Code change has been committed in GitHub
  • Payment has been initiated in PayPal
  • Card has been created in Trello
  • Post P1 bug to a slack group
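From the producer side, a webhook delivery is just an HTTP POST of an event payload to the subscriber's registered endpoint. A minimal sketch (the event name, fields, and URL variable are all hypothetical, not our product's actual format):

```shell
# Build a small JSON event payload; fields here are illustrative only.
make_payload() {
  local event="$1" entity_id="$2"
  printf '{"event":"%s","entity_id":"%s","ts":"%s"}' \
    "$event" "$entity_id" "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}

# Delivery is a plain POST to the subscriber's endpoint:
# curl -s -X POST -H 'Content-Type: application/json' \
#      -d "$(make_payload order.created 42)" "$SUBSCRIBER_URL"
```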




We are using Zookeeper as a central repository for our data/configuration, and here are a few interesting options/utilities (Docker/Java versions) that one may need for browsing, copying, backing up, and migrating it.

Browse – 

docker run --name zkbrowser -d -p 4550:4550 mijalko/zkbrowser
docker stop zkbrowser
docker start zkbrowser
Copy data –
  1. docker pull ksprojects/zkcopy
  2. docker run --rm -it ksprojects/zkcopy --source <source server:port>/test --target <dest server:port>/test
  3. Java version – needs Maven
    1. brew install maven (on Mac) or install it from the Apache Maven site
    2. download the zkcopy code & build it using => mvn clean install
    3. java -jar target/zkcopy.jar --source <source server:port>/test --target <dest server:port>/test
Browse/backup – (not tried, but worth trying)
Migrate point-in-time data to new servers
  1. Copy the last snapshot/log file from the source cluster to all ZK nodes in the new cluster and restart the cluster.

There can be many UI / API / DB-level design patterns to follow, but when and what to choose needs to be carefully decided. The following 5 important principles helped with some of our recent decisions (thanks to our Chief Architect for providing these guidelines) …

  1. Keep original data
  2. Maintain SVOT – a Single Version Of Truth
  3. Follow KISS – Keep It Simple Silly – easy maintenance / low cost / more reliability
  4. Don’t rely on PUSH
  5. Plan for disaster

Here are examples of how the above principles helped us decide between PUSH vs. PULL, and between filtering the data vs. keeping the original message …

Our team recently created multiple microservices for a big org-level initiative. One of the end-to-end flows, creating a new app instance for a trial user, involves 3 microservices and follows principles #1 and #4 from above.

  1. External service – follows the event lifecycle model: it triggers the subscribed REST endpoint with the user request message when a new user request comes in. The event lifecycle can be synchronous or asynchronous, and the service retries multiple times when the REST endpoint is down. The service also expects the REST endpoint to complete the event lifecycle.
  2. REST interface service – gets the message from the external service and pushes it into ZK.
    1. Follows #1: the "original message" is "PUSHed" to ZK, which avoids unnecessary parsing/transformations on the message, keeps the operation quick, and lets future changes to the message be synced directly instead of parsing/editing the ZK message.
    2. Here #4 is not followed for the trigger itself, since the service is triggered by the external service and in turn pushes the original message to ZK; it then waits for the core service to complete provisioning, and the core service updates the status in ZK. Once it gets the provisioning status by following #4, i.e. the PULL model, it calls the external service to complete the event lifecycle.
  3. Core service (which I owned) – creates application instances in Docker Swarm for trial customers based on the data provided in Zookeeper (ZK).
    1. Zookeeper provides watch capability out of the box: we can watch a node, which fires an event when changes happen, and based on that the core service could handle provisioning – a PUSH model. But following #4, we chose the PULL model, i.e. pulling ZK data at periodic intervals to handle provisioning requests, and it really protected us from a single point of failure with ZK (i.e. a ZK connection failure, or missing some events due to exceptions).
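The PULL model above boils down to a simple polling loop. A generic sketch, where `fetch_cmd` is a stand-in for whatever reads the pending requests from ZK (e.g. a zkCli.sh get call; the command and interval here are placeholders):

```shell
# Poll a source for pending requests at a fixed interval. Because every
# cycle re-reads the full pending state, a missed event or a transient ZK
# connection failure is recovered on the next cycle.
poll_requests() {
  local fetch_cmd="$1" interval="$2" iterations="$3"
  local i pending
  for ((i = 0; i < iterations; i++)); do
    pending="$($fetch_cmd)"
    if [ -n "$pending" ]; then
      echo "provisioning: $pending"
    fi
    sleep "$interval"
  done
}

# Example: poll_requests "cat /tmp/pending_requests" 30 1000
```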




Posted: June 1, 2017 in GC, HDFS, SPARK, Uncategorized

We recently faced a lot of issues with our Spark-based app running in Docker Swarm due to heavy minor/major GC pauses (stop-the-world), and the following configuration helped minimize them. The configuration is specific to our application and can't be reused as-is, but it can be used as a basis to try different values and arrive at good numbers for your app. We tried at least 10 to 15 different combinations before arriving at the entries below. In our case we run Spark in standalone mode with 4 workers and 4 cores per worker, 16 GB driver memory, 16 GB executor memory, a max of 16 cores, and a parallelism of 16.

--conf "spark.executor.extraJavaOptions= -XX:ParallelGCThreads=12 -XX:ConcGCThreads=12 -XX:SurvivorRatio=6 -XX:MaxTenuringThreshold=7 -XX:+UseG1GC -XX:MaxGCPauseMillis=15 -XX:InitiatingHeapOccupancyPercent=85 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution"
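For context, that flag is passed at submit time. A hypothetical invocation (the master URL and script name are assumptions; the sizes match the setup described above):

```shell
# Sketch of a spark-submit call for the standalone setup described above;
# substitute the full extraJavaOptions string from the --conf line above.
spark-submit \
  --master spark://spark-master:7077 \
  --driver-memory 16g \
  --executor-memory 16g \
  --total-executor-cores 16 \
  --conf spark.default.parallelism=16 \
  --conf "spark.executor.extraJavaOptions=<GC options shown above>" \
  etl_job.py
```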

Use the Spark UI for your application (runs on default port 4040) -> Executors tab -> "Task Time (GC Time)" column, and the stdout link in the Logs column for GC usage/log entries. The configuration above includes flags that enable detailed GC-related log entries.

Btw, the above entries can also be given as part of the HDFS configuration via the HADOOP_JOBTRACKER_OPTS, SHARED_HADOOP_NAMENODE_OPTS & HADOOP_DATANODE_OPTS options, so that even HDFS starts using G1 GC instead of the default -XX:+UseConcMarkSweepGC.

HDFS level config entries that helped us are => -server -XX:ParallelGCThreads=8 -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=50

Changes can be made using Ambari (in case your cluster was set up with Ambari): HDFS -> Configs -> Advanced -> Advanced hadoop-env section -> hadoop-env template value.


Summary Notes:

  1. GC – mark reachable references in the object graph, sweep/delete, compact => minor/major GC (stop-the-world = pauses the app, sometimes for a few seconds)
  2. Performance => responsiveness/latency => how quickly the app responds; throughput => the app's output w.r.t. the amount of work it does
  3. Types => 
    1. Serial (single gc thread handles GC), 
    2. Concurrent Mark Sweep (CMS) ( ConcGCThreads=n, gc thread runs along with app side by side) => When the old generation reaches a certain occupancy rate, the CMS is kicked off
      1. 5 collection phases => 
        1. Initial mark for reachable objs(does STW – low pause time)
        2. Concurrent marking as live objs(while app runs) 
        3. Remark the updated objects by the running app (STW)
        4. Concurrent sweep (deallocate space for dead objects and not move live objects i.e. no compaction been done)
        5. Resetting (prepare for next concurrent collection) 
    3. Parallel (ParallelGCThreads=n, uses parallel cores of CPU once heap hits 90%) GC collectors
  4. When to use => CMS => when we have more memory / more CPUs and the app demands only short pauses; called the low-latency collector, used by most web/financial apps.
    1. Parallel collector => when we have less memory, fewer CPUs, and the app demands high throughput.
  5. G1 GC => predictable/tunable GC pauses, low pauses, parallelism & concurrency together, & better heap utilization
    1. Heap region => size is decided based on the heap size; the JVM plans for around 2000 regions ranging from 1 MB to 32 MB in size
      1. 10% – default reserved space for safety, to avoid promotion failures
    2. Tenuring Threshold is used by JVM to decide when an object can be promoted from young generations to Old generation (MaxTenuringThreshold=n, default 15, -XX:+PrintTenuringDistribution – prints age distribution)
      1. Live objects are evacuated (i.e., copied or moved) to one or more survivor regions. If the aging threshold is met, some of the objects are promoted to old generation region
    3. 6 collection phase for old generation => 
      1. Initial Mark survivor regions or root regions (STW)
      2. Root Region Scanning => Scan survivor regions for old gen references  – need to complete before young GC can occur
      3. Concurrent marking (find live objects across heap) – parallel to application but it can be interrupted by young GC
      4. Remark (STW) – complete the marking of live objects 
      5. Cleanup (STW) – empty regions are removed and region availability recalculated
      6. Copying (STW) – copy to new unused regions. This can be done with young generation regions which are logged as [GC pause (young)]. Or both young and old generation regions which are logged as [GC Pause (mixed)]
    4. Do not explicitly set the young generation size (-Xmn), which would impact the G1 GC collector's pause-time goal; with an explicit value, G1 GC can't auto-adjust the size as needed, e.g.
      1. Evacuation failure => when the JVM runs out of heap regions during GC for either survivor or promoted objects
    5. Tuning => 
      1. -XX:NewSize and -XX:MaxNewSize => set a lower and upper bound for the size of the young generation. The young generation can't be bigger than the old generation, since at some point all young-gen objects may need to be moved to the old gen.
      2. -XX:NewRatio => allows us to specify the factor by which the old generation should be larger than the young generation. For example, with -XX:NewRatio=3 the old generation will be three times as large as the young generation
      3. -XX:MaxGCPauseMillis=200 => Sets a target for the maximum GC pause time. This is a soft goal, and the JVM will make its best effort to achieve it. Therefore, the pause time goal will sometimes not be met. The default value is 200 milliseconds
      4. -XX:InitiatingHeapOccupancyPercent=45 => Percentage of the (entire) heap occupancy to start a concurrent GC cycle.
      5. -XX:SurvivorRatio specifies how large “Eden” should be sized relative to one of the two survivor spaces. For example, with -XX:SurvivorRatio=10 we dimension “Eden” ten times as large as “To” (and at the same time ten times as large as “From”). As a result, “Eden” occupies 10/12 of the young generation while “To” and “From” each occupy 1/12. Note that the two survivor spaces are always equal in size.
      6. -XX:InitialTenuringThreshold and -XX:MaxTenuringThreshold, -XX:+PrintTenuringDistribution, -XX:TargetSurvivorRatio =>initial and maximum value of the tenuring threshold, respectively + specify the target utilization (in percent) of “To” at the end of a young generation GC
      7. -XX:+NeverTenure and -XX:+AlwaysTenure => objects are never promoted to the old generation i.e. old generation not needed / no survivor spaces are used so that all young objects are immediately promoted to the old generation on their first GC
      8. Xms / Xmx=> minimum / maximum heap allocated to the program (like spark)
    6. Logging => -Xloggc:gc.log (use the Eclipse plug-in for visual graphs, Visual VM (jvisualvm) with the Visual GC plug-in), jhat (Java heap analysis tool), Terracotta BigMemory tool
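The logging and pause-goal options from the list above combine into a plain JVM launch like this (Java 8 flag names; the jar name and heap sizes are hypothetical). The resulting gc.log can then be opened in Visual VM or any GC log viewer:

```shell
# Launch with G1, a pause-time goal, and detailed GC logging to gc.log.
java -Xms4g -Xmx4g \
  -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
  -Xloggc:gc.log -verbose:gc \
  -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution \
  -jar myapp.jar
```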
MERGE upsert with OUTPUT –

-- The snippet below was missing the MERGE header, the WHEN clauses, and the
-- OUTPUT keyword; they are reconstructed here. The target table name was not
-- shown in the original, so RsRUO is a placeholder.
MERGE RsRUO AS rsRUO
USING @ruoItems AS updatedRUO
ON (rsRUO.FieldId = updatedRUO.FieldId)
WHEN MATCHED THEN UPDATE SET
      rsRUO.EntityType = updatedRUO.EntityType,
      rsRUO.MasterEntityTypeId = updatedRUO.MasterEntityTypeId,
      rsRUO.FieldValue = updatedRUO.FieldValue,
      rsRUO.RetailElementId = updatedRUO.RetailElementId,
      rsRUO.FieldName = updatedRUO.FieldName,
      rsRUO.ValueSeq = updatedRUO.ValueSeq,
      rsRUO.CreateDatetime = updatedRUO.CreateDatetime,
      rsRUO.ChangedDatetime = updatedRUO.ChangedDatetime,
      rsRUO.RowStatus = updatedRUO.RowStatus,
      rsRUO.ChangedById = updatedRUO.ChangedById
WHEN NOT MATCHED THEN
INSERT (EntityType, EntityTypeId, MasterEntityTypeId, RowSeq, RetailElementId, FieldValue, FieldName, ValueSeq, CreateDatetime, ChangedDatetime, RowStatus, ChangedById)
VALUES (updatedRUO.EntityType, updatedRUO.EntityTypeId, updatedRUO.MasterEntityTypeId, updatedRUO.RowSeq, updatedRUO.RetailElementId, updatedRUO.FieldValue, updatedRUO.FieldName, updatedRUO.ValueSeq, updatedRUO.CreateDatetime, updatedRUO.ChangedDatetime, updatedRUO.RowStatus, updatedRUO.ChangedById)
OUTPUT
      inserted.EntityType,
      inserted.EntityTypeId,
      inserted.RowSeq,
      inserted.RetailElementId,
      inserted.FieldValue,
      inserted.FieldName,
      inserted.ValueSeq,
      inserted.CreateDatetime,
      inserted.ChangedDatetime,
      inserted.RowStatus,
      inserted.ChangedById
INTO @tmpOutputTable (
      EntityType,
      EntityTypeId,
      RowSeq,
      RetailElementId,
      FieldValue,
      FieldName,
      ValueSeq,
      CreateDatetime,
      ChangedDatetime,
      RowStatus,
      ChangedById);
Dynamic SQL –

declare @tablename nvarchar(100)
set @tablename = 'party'
declare @query nvarchar(1000)
set @query = 'select top 10 * from ' + @tablename
exec (@query)
-- Get the dynamic query value assigned to an outer variable
DECLARE @RsDataSQL nvarchar(2000) = 'select top 1 @Rstable2 = ''RS_'' + rc.TablePrefix + ''_'' + rf.TablePrefix + ''_EventType'''
       + ' from RControl rc
      inner join Form rf on rf.FormId = rc.FormId
      where rf.partyid = 49310'
declare @lrstable nvarchar(500)
exec sp_executesql @RsDataSQL, N'@Rstable2 nvarchar(500) out', @lrstable out
select @lrstable
-- another example
declare @tablename nvarchar(100)
set @tablename = 'party'
declare @query nvarchar(1000)
set @query = 'select top 1 @partyid = partyid from ' + @tablename
declare @partyid1 int
exec sp_executesql @query, N'@partyid int out', @partyid1 out
select @partyid1