Skip to content

Commit

Permalink
Merge pull request #117 from tomaslin/parallelStream
Browse files Browse the repository at this point in the history
jenkins : use java 8 parallelStream and write out last build to redis…
  • Loading branch information
tomaslin authored Jun 17, 2016
2 parents 9bfd201 + 090177e commit 004ee19
Showing 1 changed file with 79 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class JenkinsBuildMonitor implements PollingMonitor {
@Autowired
Environment environment

Scheduler scheduler = Schedulers.newThread()
Scheduler scheduler = Schedulers.io()
Worker worker = scheduler.createWorker()

@Autowired
Expand Down Expand Up @@ -107,19 +107,18 @@ class JenkinsBuildMonitor implements PollingMonitor {
void onApplicationEvent(ContextRefreshedEvent event) {
log.info('Started')
worker.schedulePeriodically(
{
if (isInService()) {
Observable.from( buildMasters.filteredMap(BuildServiceProvider.JENKINS).keySet() )
.subscribe(
{ master ->
changedBuilds(master)
}, { log.error("Error: ${it.message}") }
)
} else {
log.info("not in service (lastPoll: ${lastPoll ?: 'n/a'})")
lastPoll = null
}
} as Action0, 0, pollInterval, TimeUnit.SECONDS
{
if (isInService()) {
log.info "- Polling cycle started -"
buildMasters.filteredMap(BuildServiceProvider.JENKINS).keySet().parallelStream().forEach(
{ master -> changedBuilds(master) }
)
log.info "- Polling cycle done -"
} else {
log.info("not in service (lastPoll: ${lastPoll ?: 'n/a'})")
lastPoll = null
}
} as Action0, 0, pollInterval, TimeUnit.SECONDS
)
}

Expand Down Expand Up @@ -148,97 +147,94 @@ class JenkinsBuildMonitor implements PollingMonitor {
def startTime = System.currentTimeMillis()
List<Project> builds = buildMasters.map[master].projects?.list

log.info( "finding new builds in ${master} : ${ builds.size() } items" )

log.info("Took ${System.currentTimeMillis() - startTime}ms to retrieve projects (master: ${master})")
log.info("finding new builds in ${master} : ${builds.size()} items")

List<String> buildNames = builds*.name

Observable.from(cachedBuilds).filter { String name ->
!(name in buildNames)
}.subscribe(
{ String jobName ->
log.info "Removing ${master}:${jobName}"
cache.remove(master, jobName)
},
{ log.error("Error: ${it.message}") }
{ String jobName ->
log.info "Removing ${master}:${jobName}"
cache.remove(master, jobName)
},
{ log.error("Error: ${it.message}") }
)

Observable.from(builds)
.subscribeOn(scheduler)
.subscribe(
{ Project project ->
try {
boolean addToCache = false

Map cachedBuild = null
log.debug "processing build : ${project?.name} : building? ${project?.lastBuild?.building}"
if (!project?.lastBuild) {
log.debug "no builds found for ${project.name}, skipping"
} else if (cachedBuilds.contains(project.name)) {
cachedBuild = cache.getLastBuild(master, project.name)
if ((project.lastBuild.building != cachedBuild.lastBuildBuilding) ||
(project.lastBuild.number != Integer.valueOf(cachedBuild.lastBuildLabel))) {

addToCache = true

log.info "Build changed: ${master}: ${project.name} : ${project.lastBuild.number} : ${project.lastBuild.building}"

if (echoService) {
int currentBuild = project.lastBuild.number
int lastBuild = Integer.valueOf(cachedBuild.lastBuildLabel)

log.info "sending build events for builds between ${lastBuild} and ${currentBuild}"

try {
buildMasters.map[master].getBuilds(project.name).list.sort {
it.number
}.each { build ->
if (build.number >= lastBuild && build.number < currentBuild) {
try {
Project oldProject = new Project(name: project.name, lastBuild: build)
if (build.number != lastBuild
|| (build.number == lastBuild && cachedBuild.lastBuildBuilding != build.building)) {
echoService.postEvent(
new BuildEvent(content: new BuildContent(project: oldProject, master: master)))
builds.parallelStream().forEach(
{ Project project ->
try {
boolean addToCache = false
Map cachedBuild = null
log.debug "processing build : ${project?.name} : building? ${project?.lastBuild?.building}"
if (!project?.lastBuild) {
log.debug "no builds found for ${project.name}, skipping"
} else if (cachedBuilds.contains(project.name)) {
cachedBuild = cache.getLastBuild(master, project.name)
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)
if ((project.lastBuild.building != cachedBuild.lastBuildBuilding) ||
(project.lastBuild.number != Integer.valueOf(cachedBuild.lastBuildLabel))) {

addToCache = true

log.info "Build changed: ${master}: ${project.name} : ${project.lastBuild.number} : ${project.lastBuild.building}"

if (echoService) {
int currentBuild = project.lastBuild.number
int lastBuild = Integer.valueOf(cachedBuild.lastBuildLabel)

log.info "sending build events for builds between ${lastBuild} and ${currentBuild}"

try {
buildMasters.map[master].getBuilds(project.name).list.sort {
it.number
}.each { build ->
if (build.number >= lastBuild && build.number < currentBuild) {
try {
Project oldProject = new Project(name: project.name, lastBuild: build)
if (build.number != lastBuild
|| (build.number == lastBuild && cachedBuild.lastBuildBuilding != build.building)) {
echoService.postEvent(
new BuildEvent(content: new BuildContent(project: oldProject, master: master)))
}
} catch (e) {
log.error("An error occurred fetching ${master}:${project.name}:${build.number}", e)
}
} catch (e) {
log.error("An error occurred fetching ${master}:${project.name}:${build.number}", e)
}
}
} catch (e) {
log.error("failed getting builds for ${master}", e)
}
} catch (e) {
log.error("failed getting builds for ${master}", e)
}
}
} else {
log.info "New Build: ${master}: ${project.name} : ${project.lastBuild.number} : " +
"${project.lastBuild.result}"
addToCache = true
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)
}
} else {
log.info "New Build: ${master}: ${project.name} : ${project.lastBuild.number} : " +
"${project.lastBuild.result}"
addToCache = true
}
if (addToCache) {
project.lastBuild.result = project?.lastBuild?.result ?: project.lastBuild.building ? BUILD_IN_PROGRESS : ""
log.debug "setting result to ${project.lastBuild.result}"
cache.setLastBuild(master, project.name, project.lastBuild.number, project.lastBuild.building)
if (echoService) {
echoService.postEvent(
new BuildEvent(content: new BuildContent(project: project, master: master))
)
if (addToCache) {
project.lastBuild.result = project?.lastBuild?.result ?: project.lastBuild.building ? BUILD_IN_PROGRESS : ""
log.debug "setting result to ${project.lastBuild.result}"
if (echoService) {
echoService.postEvent(
new BuildEvent(content: new BuildContent(project: project, master: master))
)
}
results << [previous: cachedBuild, current: project]
}
results << [previous: cachedBuild, current: project]
} catch (e) {
log.error("fail processing build : ${project?.name}", e)
}
} catch( e ){
log.error("fail processing build : ${project?.name}", e)
}
}, {
log.error("Error: ${it.message} (${master})")
}
)

log.info("Took ${System.currentTimeMillis() - startTime}ms to retrieve projects (master: ${master})")

} catch (e) {
log.error("failed to update master $master", e)
}

log.info("Last poll took ${System.currentTimeMillis() - thisPoll}ms (master: ${master})")
results
}

Expand Down

0 comments on commit 004ee19

Please sign in to comment.