EPStatementStartMethod.java

Index Score
com.espertech.esper.core
Esper

View: Reasons, Metrics, Source Code

These are the metrics that contribute to the Enerjy Score for this file, ranked by impact. So the metrics listed at the top influence the score to a greater extent that the metrics listed at the bottom.

MetricDescription
EXEC_COMMENTSComments in executable code
LINE_COMMENTNumber of line comments
SIZESize of the file in bytes
JAVA0144JAVA0144 Line exceeds maximum M characters
EXITSProcedure exits
OPERANDSNumber of operands
PROGRAM_LENGTHHalstead program length
OPERATORSNumber of operators
UNIQUE_OPERANDSNumber of unique operands
LOGICAL_LINESNumber of statements
PROGRAM_VOCABHalstead program vocabulary
LOCLines of code
ELOCEffective lines of code
LINESNumber of lines in the source file
RETURNSNumber of return points from functions
BLOCKSNumber of blocks
INTERFACE_COMPLEXITYInterface complexity
COMPARISONSNumber of comparison operators
CYCLOMATICCyclomatic complexity
LOOPSNumber of loops
JAVA0177JAVA0177 Variable declaration missing initializer
PARAMSNumber of formal parameter declarations
JAVA0034JAVA0034 Missing braces in if statement
JAVA0160JAVA0160 Method does not throw specified exception
WHITESPACENumber of whitespace lines
NEST_DEPTHMaximum nesting depth
JAVA0123JAVA0123 Use all three components of for loop
JAVA0179JAVA0179 Local variable hides visible field
JAVA0049JAVA0049 Nested block at depth N (maximum: M)
JAVA0067JAVA0067 Array descriptor on identifier name
JAVA0130JAVA0130 Non-static method does not use instance fields
JAVA0075JAVA0075 Method parameter hides field
JAVA0117JAVA0117 Missing javadoc: method 'method'
UNIQUE_OPERATORSNumber of unique operators
PROGRAM_VOLUMEHalstead program volume
JAVA0174JAVA0174 Assigned local variable never used
JAVA0138JAVA0138 N parameters defined for method (maximum: M)
JAVA0126JAVA0126 Method declares unchecked exception in throws
JAVA0110JAVA0110 Incorrect javadoc: no @return tag
JAVA0108JAVA0108 Incorrect javadoc: no @param tag for 'parameter'
JAVA0254JAVA0254 Use enhanced for loop construct instead of Iterator
DOC_COMMENTNumber of javadoc comment lines
DECL_COMMENTSComments in declarations
JAVA0145JAVA0145 Tab character used in source file
package com.espertech.esper.core; import com.espertech.esper.client.EPStatementException; import com.espertech.esper.collection.Pair; import com.espertech.esper.collection.UniformPair; import com.espertech.esper.epl.core.*; import com.espertech.esper.epl.db.DatabasePollingViewableFactory; import com.espertech.esper.epl.expression.*; import com.espertech.esper.epl.join.*; import com.espertech.esper.epl.join.plan.FilterExprAnalyzer; import com.espertech.esper.epl.join.plan.QueryGraph; import com.espertech.esper.epl.join.table.EventTable; import com.espertech.esper.epl.join.table.PropertyIndTableCoerceAdd; import com.espertech.esper.epl.join.table.PropertyIndexedEventTable; import com.espertech.esper.epl.join.table.UnindexedEventTable; import com.espertech.esper.epl.lookup.*; import com.espertech.esper.epl.named.*; import com.espertech.esper.epl.spec.*; import com.espertech.esper.epl.variable.CreateVariableView; import com.espertech.esper.epl.variable.OnSetVariableView; import com.espertech.esper.epl.variable.VariableDeclarationException; import com.espertech.esper.epl.variable.VariableExistsException; import com.espertech.esper.epl.view.*; import com.espertech.esper.epl.agg.AggregationService; import com.espertech.esper.epl.agg.AggregationServiceFactory; import com.espertech.esper.epl.subquery.SubselectAggregatorView; import com.espertech.esper.epl.subquery.SubqueryStopCallback; import com.espertech.esper.epl.subquery.SubselectBufferObserver; import com.espertech.esper.event.EventBean; import com.espertech.esper.event.EventType; import com.espertech.esper.pattern.EvalRootNode; import com.espertech.esper.pattern.PatternContext; import com.espertech.esper.pattern.PatternMatchCallback; import com.espertech.esper.pattern.PatternStopCallback; import com.espertech.esper.util.JavaClassHelper; import com.espertech.esper.util.ManagedLock; import com.espertech.esper.util.StopCallback; import com.espertech.esper.view.*; import com.espertech.esper.view.internal.BufferView; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.*; /** * Starts and provides the stop method for EPL statements. */ public class EPStatementStartMethod { private final StatementSpecCompiled statementSpec; private final EPServicesContext services; private final StatementContext statementContext; /** * Ctor. * @param statementSpec is a container for the definition of all statement constructs that * may have been used in the statement, i.e. if defines the select clauses, insert into, outer joins etc. * @param services is the service instances for dependency injection * @param statementContext is statement-level information and statement services */ public EPStatementStartMethod(StatementSpecCompiled statementSpec, EPServicesContext services, StatementContext statementContext) { this.statementSpec = statementSpec; this.services = services; this.statementContext = statementContext; } /** * Starts the EPL statement. * @return a viewable to attach to for listening to events, and a stop method to invoke to clean up * @param isNewStatement indicator whether the statement is new or a stop-restart statement * @throws ExprValidationException when the expression validation fails * @throws ViewProcessingException when views cannot be started */ public Pair<Viewable, EPStatementStopMethod> start(boolean isNewStatement) throws ExprValidationException, ViewProcessingException { statementContext.getVariableService().setLocalVersion(); // get current version of variables if (statementSpec.getOnTriggerDesc() != null) { return startOnTrigger(); } else if (statementSpec.getCreateWindowDesc() != null) { return startCreateWindow(); } else if (statementSpec.getCreateVariableDesc() != null) { return startCreateVariable(isNewStatement); } else { return startSelect(); } } private Pair<Viewable, EPStatementStopMethod> startOnTrigger() throws ExprValidationException, ViewProcessingException { final List<StopCallback> stopCallbacks = new LinkedList<StopCallback>(); // Create streams Viewable eventStreamParentViewable; final StreamSpecCompiled streamSpec = statementSpec.getStreamSpecs().get(0); if (streamSpec instanceof FilterStreamSpecCompiled) { FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) streamSpec; // Since only for non-joins we get the existing stream's lock and try to reuse it's views Pair<EventStream, ManagedLock> streamLockPair = services.getStreamService().createStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), statementContext.getEpStatementHandle(), false); eventStreamParentViewable = streamLockPair.getFirst(); // Use the re-used stream's lock for all this statement's locking needs if (streamLockPair.getSecond() != null) { statementContext.getEpStatementHandle().setStatementLock(streamLockPair.getSecond()); } } else if (streamSpec instanceof PatternStreamSpecCompiled) { PatternStreamSpecCompiled patternStreamSpec = (PatternStreamSpecCompiled) streamSpec; final EventType eventType = services.getEventAdapterService().createAnonymousCompositeType(patternStreamSpec.getTaggedEventTypes()); final EventStream sourceEventStream = new ZeroDepthStream(eventType); eventStreamParentViewable = sourceEventStream; EvalRootNode rootNode = new EvalRootNode(); rootNode.addChildNode(patternStreamSpec.getEvalNode()); PatternMatchCallback callback = new PatternMatchCallback() { public void matchFound(Map<String, EventBean> matchEvent) { EventBean compositeEvent = statementContext.getEventAdapterService().adapterForCompositeEvent(eventType, matchEvent); sourceEventStream.insert(compositeEvent); } }; PatternContext patternContext = statementContext.getPatternContextFactory().createContext(statementContext, 0, rootNode); PatternStopCallback patternStopCallback = rootNode.start(callback, patternContext); stopCallbacks.add(patternStopCallback); } else if (streamSpec instanceof NamedWindowConsumerStreamSpec) { NamedWindowConsumerStreamSpec namedSpec = (NamedWindowConsumerStreamSpec) streamSpec; NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(namedSpec.getWindowName()); eventStreamParentViewable = processor.addConsumer(namedSpec.getFilterExpressions(), statementContext.getEpStatementHandle(), statementContext.getStatementStopService()); } else { throw new ExprValidationException("Unknown stream specification type: " + streamSpec); } // create stop method using statement stream specs EPStatementStopMethod stopMethod = new EPStatementStopMethod() { public void stop() { statementContext.getStatementStopService().fireStatementStopped(); if (streamSpec instanceof FilterStreamSpecCompiled) { FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) streamSpec; services.getStreamService().dropStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), false); } for (StopCallback stopCallback : stopCallbacks) { stopCallback.stop(); } } }; View onExprView; EventType streamEventType = eventStreamParentViewable.getEventType(); ResultSetProcessor resultSetProcessor; // For on-delete and on-select triggers if (statementSpec.getOnTriggerDesc() instanceof OnTriggerWindowDesc) { // Determine event types OnTriggerWindowDesc onTriggerDesc = (OnTriggerWindowDesc) statementSpec.getOnTriggerDesc(); NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(onTriggerDesc.getWindowName()); EventType namedWindowType = processor.getNamedWindowType(); String namedWindowAlias = onTriggerDesc.getOptionalAsName(); if (namedWindowAlias == null) { namedWindowAlias = "stream_0"; } String streamAlias = streamSpec.getOptionalStreamName(); if (streamAlias == null) { streamAlias = "stream_1"; } StreamTypeService typeService = new StreamTypeServiceImpl(new EventType[] {namedWindowType, streamEventType}, new String[] {namedWindowAlias, streamAlias}); // validate join expression ExprNode validatedJoin = validateJoinNamedWindow(statementSpec.getFilterRootNode(), namedWindowType, namedWindowAlias, streamEventType, streamAlias); // Construct a processor for results; for use in on-select to process selection results // Use a wildcard select if the select-clause is empty, such as for on-delete. // For on-select the select clause is not empty. if (statementSpec.getSelectClauseSpec().getSelectExprList().size() == 0) { statementSpec.getSelectClauseSpec().add(new SelectClauseElementWildcard()); } resultSetProcessor = ResultSetProcessorFactory.getProcessor( statementSpec, statementContext, typeService, null); InternalEventRouter routerService = (statementSpec.getInsertIntoDesc() == null)? null : services.getInternalEventRouter(); onExprView = processor.addOnExpr(onTriggerDesc, validatedJoin, streamEventType, statementContext.getStatementStopService(), routerService, resultSetProcessor, statementContext.getEpStatementHandle(), statementContext.getStatementResultService()); eventStreamParentViewable.addView(onExprView); } else { OnTriggerSetDesc desc = (OnTriggerSetDesc) statementSpec.getOnTriggerDesc(); StreamTypeService typeService = new StreamTypeServiceImpl(new EventType[] {streamEventType}, new String[] {streamSpec.getOptionalStreamName()}); for (OnTriggerSetAssignment assignment : desc.getAssignments()) { ExprNode validated = assignment.getExpression().getValidatedSubtree(typeService, statementContext.getMethodResolutionService(), null, statementContext.getSchedulingService(), statementContext.getVariableService()); assignment.setExpression(validated); } onExprView = new OnSetVariableView(desc, statementContext.getEventAdapterService(), statementContext.getVariableService(), statementContext.getStatementResultService()); eventStreamParentViewable.addView(onExprView); } // For on-delete, create an output processor that passes on as a wildcard the underlying event if ((statementSpec.getOnTriggerDesc().getOnTriggerType() == OnTriggerType.ON_DELETE) || (statementSpec.getOnTriggerDesc().getOnTriggerType() == OnTriggerType.ON_SET)) { StatementSpecCompiled defaultSelectAllSpec = new StatementSpecCompiled(); defaultSelectAllSpec.getSelectClauseSpec().add(new SelectClauseElementWildcard()); ResultSetProcessor outputResultSetProcessor = ResultSetProcessorFactory.getProcessor( defaultSelectAllSpec, statementContext, new StreamTypeServiceImpl(new EventType[] {onExprView.getEventType()}, new String[] {"trigger_stream"}), null); // Attach output view OutputProcessView outputView = OutputProcessViewFactory.makeView(outputResultSetProcessor, defaultSelectAllSpec, statementContext, services.getInternalEventRouter()); onExprView.addView(outputView); onExprView = outputView; } log.debug(".start Statement start completed"); return new Pair<Viewable, EPStatementStopMethod>(onExprView, stopMethod); } private Pair<Viewable, EPStatementStopMethod> startCreateWindow() throws ExprValidationException, ViewProcessingException { final FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) statementSpec.getStreamSpecs().get(0); String windowName = statementSpec.getCreateWindowDesc().getWindowName(); EventType windowType = filterStreamSpec.getFilterSpec().getEventType(); services.getNamedWindowService().addProcessor(windowName, windowType, statementContext.getEpStatementHandle(), statementContext.getStatementResultService()); // Create streams and views Viewable eventStreamParentViewable; ViewFactoryChain unmaterializedViewChain; // Create view factories and parent view based on a filter specification // Since only for non-joins we get the existing stream's lock and try to reuse it's views Pair<EventStream, ManagedLock> streamLockPair = services.getStreamService().createStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), statementContext.getEpStatementHandle(), false); eventStreamParentViewable = streamLockPair.getFirst(); // Use the re-used stream's lock for all this statement's locking needs if (streamLockPair.getSecond() != null) { statementContext.getEpStatementHandle().setStatementLock(streamLockPair.getSecond()); } // Create data window view factories unmaterializedViewChain = services.getViewService().createFactories(0, eventStreamParentViewable.getEventType(), filterStreamSpec.getViewSpecs(), statementContext); // The root view of the named window NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(statementSpec.getCreateWindowDesc().getWindowName()); View rootView = processor.getRootView(); eventStreamParentViewable.addView(rootView); // request remove stream capability from views ViewResourceDelegate viewResourceDelegate = new ViewResourceDelegateImpl(new ViewFactoryChain[] {unmaterializedViewChain}, statementContext); if (!viewResourceDelegate.requestCapability(0, new RemoveStreamViewCapability(), null)) { throw new ExprValidationException(NamedWindowService.ERROR_MSG_DATAWINDOWS); } // create stop method using statement stream specs EPStatementStopMethod stopMethod = new EPStatementStopMethod() { public void stop() { statementContext.getStatementStopService().fireStatementStopped(); services.getStreamService().dropStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), false); String windowName = statementSpec.getCreateWindowDesc().getWindowName(); services.getNamedWindowService().removeProcessor(windowName); } }; // Materialize views Viewable finalView = services.getViewService().createViews(rootView, unmaterializedViewChain.getViewFactoryChain(), statementContext); // Attach tail view NamedWindowTailView tailView = processor.getTailView(); finalView.addView(tailView); finalView = tailView; // Add a wildcard to the select clause as subscribers received the window contents statementSpec.getSelectClauseSpec().getSelectExprList().clear(); statementSpec.getSelectClauseSpec().add(new SelectClauseElementWildcard()); statementSpec.setSelectStreamDirEnum(SelectClauseStreamSelectorEnum.RSTREAM_ISTREAM_BOTH); ResultSetProcessor resultSetProcessor = ResultSetProcessorFactory.getProcessor( statementSpec, statementContext, new StreamTypeServiceImpl(new EventType[] {windowType}, new String[] {windowName}), null); // Attach output view OutputProcessView outputView = OutputProcessViewFactory.makeView(resultSetProcessor, statementSpec, statementContext, services.getInternalEventRouter()); finalView.addView(outputView); finalView = outputView; log.debug(".start Statement start completed"); return new Pair<Viewable, EPStatementStopMethod>(finalView, stopMethod); } private Pair<Viewable, EPStatementStopMethod> startCreateVariable(boolean isNewStatement) throws ExprValidationException, ViewProcessingException { CreateVariableDesc createDesc = statementSpec.getCreateVariableDesc(); // Determime the variable type Class type; try { type = JavaClassHelper.getClassForSimpleName(createDesc.getVariableType()); } catch (Throwable t) { throw new ExprValidationException("Cannot create variable '" + createDesc.getVariableName() + "', type '" + createDesc.getVariableType() + "' is not a recognized type"); } // Get assignment value Object value = null; if (createDesc.getAssignment() != null) { // Evaluate assignment expression StreamTypeService typeService = new StreamTypeServiceImpl(new EventType[0], new String[0]); ExprNode validated = createDesc.getAssignment().getValidatedSubtree(typeService, statementContext.getMethodResolutionService(), null, statementContext.getSchedulingService(), statementContext.getVariableService()); value = validated.evaluate(null, true); } // Create variable try { services.getVariableService().createNewVariable(createDesc.getVariableName(), type, value, statementContext.getExtensionServicesContext()); } catch (VariableExistsException ex) { // for new statement we don't allow creating the same variable if (isNewStatement) { throw new ExprValidationException("Cannot create variable: " + ex.getMessage()); } // compare the type if (services.getVariableService().getReader(createDesc.getVariableName()).getType() != type) { throw new ExprValidationException("Cannot create variable: " + ex.getMessage()); } } catch (VariableDeclarationException ex) { throw new ExprValidationException("Cannot create variable: " + ex.getMessage()); } CreateVariableView createView = new CreateVariableView(services.getEventAdapterService(), services.getVariableService(), createDesc.getVariableName(), statementContext.getStatementResultService()); services.getVariableService().registerCallback(services.getVariableService().getReader(createDesc.getVariableName()).getVariableNumber(), createView); // Create result set processor, use wildcard selection statementSpec.getSelectClauseSpec().getSelectExprList().clear(); statementSpec.getSelectClauseSpec().add(new SelectClauseElementWildcard()); statementSpec.setSelectStreamDirEnum(SelectClauseStreamSelectorEnum.RSTREAM_ISTREAM_BOTH); ResultSetProcessor resultSetProcessor = ResultSetProcessorFactory.getProcessor( statementSpec, statementContext, new StreamTypeServiceImpl(new EventType[] {createView .getEventType()}, new String[] {"create_variable"}), null); // Attach output view OutputProcessView outputView = OutputProcessViewFactory.makeView(resultSetProcessor, statementSpec, statementContext, services.getInternalEventRouter()); createView.addView(outputView); return new Pair<Viewable, EPStatementStopMethod>(outputView, new EPStatementStopMethod(){ public void stop() { } }); } private Pair<Viewable, EPStatementStopMethod> startSelect() throws ExprValidationException, ViewProcessingException { // Determine stream names for each stream - some streams may not have a name given String[] streamNames = determineStreamNames(statementSpec.getStreamSpecs()); final boolean isJoin = statementSpec.getStreamSpecs().size() > 1; // First we create streams for subselects, if there are any SubSelectStreamCollection subSelectStreamDesc = createSubSelectStreams(isJoin); int numStreams = streamNames.length; final List<StopCallback> stopCallbacks = new LinkedList<StopCallback>(); // Create streams and views Viewable[] eventStreamParentViewable = new Viewable[numStreams]; ViewFactoryChain[] unmaterializedViewChain = new ViewFactoryChain[numStreams]; boolean[] isUnidirectional = new boolean[numStreams]; boolean[] hasChildViews = new boolean[numStreams]; for (int i = 0; i < statementSpec.getStreamSpecs().size(); i++) { StreamSpecCompiled streamSpec = statementSpec.getStreamSpecs().get(i); isUnidirectional[i] = streamSpec.isUnidirectional(); hasChildViews[i] = !streamSpec.getViewSpecs().isEmpty(); // Create view factories and parent view based on a filter specification if (streamSpec instanceof FilterStreamSpecCompiled) { FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) streamSpec; // Since only for non-joins we get the existing stream's lock and try to reuse it's views Pair<EventStream, ManagedLock> streamLockPair = services.getStreamService().createStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), statementContext.getEpStatementHandle(), isJoin); eventStreamParentViewable[i] = streamLockPair.getFirst(); // Use the re-used stream's lock for all this statement's locking needs if (streamLockPair.getSecond() != null) { statementContext.getEpStatementHandle().setStatementLock(streamLockPair.getSecond()); } unmaterializedViewChain[i] = services.getViewService().createFactories(i, eventStreamParentViewable[i].getEventType(), streamSpec.getViewSpecs(), statementContext); } // Create view factories and parent view based on a pattern expression else if (streamSpec instanceof PatternStreamSpecCompiled) { PatternStreamSpecCompiled patternStreamSpec = (PatternStreamSpecCompiled) streamSpec; final EventType eventType = services.getEventAdapterService().createAnonymousCompositeType(patternStreamSpec.getTaggedEventTypes()); final EventStream sourceEventStream = new ZeroDepthStream(eventType); eventStreamParentViewable[i] = sourceEventStream; unmaterializedViewChain[i] = services.getViewService().createFactories(i, sourceEventStream.getEventType(), streamSpec.getViewSpecs(), statementContext); EvalRootNode rootNode = new EvalRootNode(); rootNode.addChildNode(patternStreamSpec.getEvalNode()); PatternMatchCallback callback = new PatternMatchCallback() { public void matchFound(Map<String, EventBean> matchEvent) { EventBean compositeEvent = statementContext.getEventAdapterService().adapterForCompositeEvent(eventType, matchEvent); sourceEventStream.insert(compositeEvent); } }; PatternContext patternContext = statementContext.getPatternContextFactory().createContext(statementContext, i, rootNode); PatternStopCallback patternStopCallback = rootNode.start(callback, patternContext); stopCallbacks.add(patternStopCallback); } // Create view factories and parent view based on a database SQL statement else if (streamSpec instanceof DBStatementStreamSpec) { if (!streamSpec.getViewSpecs().isEmpty()) { throw new ExprValidationException("Historical data joins do not allow views onto the data, view '" + streamSpec.getViewSpecs().get(0).getObjectNamespace() + ':' + streamSpec.getViewSpecs().get(0).getObjectName() + "' is not valid in this context"); } DBStatementStreamSpec sqlStreamSpec = (DBStatementStreamSpec) streamSpec; HistoricalEventViewable historicalEventViewable = DatabasePollingViewableFactory.createDBStatementView(i, sqlStreamSpec, services.getDatabaseRefService(), services.getEventAdapterService(), statementContext.getEpStatementHandle()); unmaterializedViewChain[i] = new ViewFactoryChain(historicalEventViewable.getEventType(), new LinkedList<ViewFactory>()); eventStreamParentViewable[i] = historicalEventViewable; stopCallbacks.add(historicalEventViewable); } else if (streamSpec instanceof MethodStreamSpec) { if (!streamSpec.getViewSpecs().isEmpty()) { throw new ExprValidationException("Method data joins do not allow views onto the data, view '" + streamSpec.getViewSpecs().get(0).getObjectNamespace() + ':' + streamSpec.getViewSpecs().get(0).getObjectName() + "' is not valid in this context"); } MethodStreamSpec methodStreamSpec = (MethodStreamSpec) streamSpec; HistoricalEventViewable historicalEventViewable = MethodPollingViewableFactory.createPollMethodView(i, methodStreamSpec, services.getEventAdapterService(), statementContext.getEpStatementHandle(), statementContext.getMethodResolutionService(), services.getEngineImportService(), services.getSchedulingService(), statementContext.getScheduleBucket()); unmaterializedViewChain[i] = new ViewFactoryChain(historicalEventViewable.getEventType(), new LinkedList<ViewFactory>()); eventStreamParentViewable[i] = historicalEventViewable; stopCallbacks.add(historicalEventViewable); } else if (streamSpec instanceof NamedWindowConsumerStreamSpec) { NamedWindowConsumerStreamSpec namedSpec = (NamedWindowConsumerStreamSpec) streamSpec; NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(namedSpec.getWindowName()); NamedWindowConsumerView consumerView = processor.addConsumer(namedSpec.getFilterExpressions(), statementContext.getEpStatementHandle(), statementContext.getStatementStopService()); eventStreamParentViewable[i] = consumerView; unmaterializedViewChain[i] = services.getViewService().createFactories(i, consumerView.getEventType(), namedSpec.getViewSpecs(), statementContext); // Consumers to named windows cannot declare a data window view onto the named window to avoid duplicate remove streams ViewResourceDelegate viewResourceDelegate = new ViewResourceDelegateImpl(unmaterializedViewChain, statementContext); viewResourceDelegate.requestCapability(i, new NotADataWindowViewCapability(), null); } else { throw new ExprValidationException("Unknown stream specification type: " + streamSpec); } } // Obtain event types from ViewFactoryChains EventType[] streamEventTypes = new EventType[statementSpec.getStreamSpecs().size()]; for (int i = 0; i < unmaterializedViewChain.length; i++) { streamEventTypes[i] = unmaterializedViewChain[i].getEventType(); } // Materialize sub-select views startSubSelect(subSelectStreamDesc, streamNames, streamEventTypes, stopCallbacks); // List of statement streams final List<StreamSpecCompiled> statementStreamSpecs = new ArrayList<StreamSpecCompiled>(); statementStreamSpecs.addAll(statementSpec.getStreamSpecs()); // Construct type information per stream StreamTypeService typeService = new StreamTypeServiceImpl(streamEventTypes, streamNames); ViewResourceDelegate viewResourceDelegate = new ViewResourceDelegateImpl(unmaterializedViewChain, statementContext); // create stop method using statement stream specs EPStatementStopMethod stopMethod = new EPStatementStopMethod() { public void stop() { statementContext.getStatementStopService().fireStatementStopped(); for (StreamSpecCompiled streamSpec : statementStreamSpecs) { if (streamSpec instanceof FilterStreamSpecCompiled) { FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) streamSpec; services.getStreamService().dropStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), isJoin); } } for (StopCallback stopCallback : stopCallbacks) { stopCallback.stop(); } for (ExprSubselectNode subselect : statementSpec.getSubSelectExpressions()) { StreamSpecCompiled subqueryStreamSpec = subselect.getStatementSpecCompiled().getStreamSpecs().get(0); if (subqueryStreamSpec instanceof FilterStreamSpecCompiled) { FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) subselect.getStatementSpecCompiled().getStreamSpecs().get(0); services.getStreamService().dropStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), isJoin); } } } }; // Validate views that require validation, specifically streams that don't have // sub-views such as DB SQL joins for (Viewable viewable : eventStreamParentViewable) { if (viewable instanceof ValidatedView) { ValidatedView validatedView = (ValidatedView) viewable; validatedView.validate(typeService, statementContext.getMethodResolutionService(), statementContext.getSchedulingService(), statementContext.getVariableService()); } } // Construct a processor for results posted by views and joins, which takes care of aggregation if required. // May return null if we don't need to post-process results posted by views or joins. ResultSetProcessor resultSetProcessor = ResultSetProcessorFactory.getProcessor( statementSpec, statementContext, typeService, viewResourceDelegate); // Validate where-clause filter tree and outer join clause validateNodes(typeService, statementContext.getMethodResolutionService(), viewResourceDelegate); // Materialize views Viewable[] streamViews = new Viewable[streamEventTypes.length]; for (int i = 0; i < streamViews.length; i++) { streamViews[i] = services.getViewService().createViews(eventStreamParentViewable[i], unmaterializedViewChain[i].getViewFactoryChain(), statementContext); } // For just 1 event stream without joins, handle the one-table process separatly. Viewable finalView; JoinPreloadMethod joinPreloadMethod = null; if (streamNames.length == 1) { finalView = handleSimpleSelect(streamViews[0], resultSetProcessor, statementContext); } else { Pair<Viewable, JoinPreloadMethod> pair = handleJoin(streamNames, streamEventTypes, streamViews, resultSetProcessor, statementSpec.getSelectStreamSelectorEnum(), statementContext, stopCallbacks, isUnidirectional, hasChildViews); finalView = pair.getFirst(); joinPreloadMethod = pair.getSecond(); } // Replay any named window data, for later consumers of named data windows boolean hasNamedWindow = false; for (int i = 0; i < statementSpec.getStreamSpecs().size(); i++) { StreamSpecCompiled streamSpec = statementSpec.getStreamSpecs().get(i); if (streamSpec instanceof NamedWindowConsumerStreamSpec) { hasNamedWindow = true; NamedWindowConsumerStreamSpec namedSpec = (NamedWindowConsumerStreamSpec) streamSpec; NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(namedSpec.getWindowName()); NamedWindowTailView consumerView = processor.getTailView(); NamedWindowConsumerView view = (NamedWindowConsumerView) eventStreamParentViewable[i]; // preload view for stream ArrayList<EventBean> eventsInWindow = new ArrayList<EventBean>(); for (EventBean aConsumerView : consumerView) { eventsInWindow.add(aConsumerView); } if (!eventsInWindow.isEmpty()) { EventBean[] newEvents = eventsInWindow.toArray(new EventBean[eventsInWindow.size()]); view.update(newEvents, null); } // in a join, preload indexes, if any if (joinPreloadMethod != null) { joinPreloadMethod.preloadFromBuffer(i); } } } // last, for aggregation we need to send the current join results to the result set processor if ((hasNamedWindow) && (joinPreloadMethod != null)) { joinPreloadMethod.preloadAggregation(resultSetProcessor); } log.debug(".start Statement start completed"); return new Pair<Viewable, EPStatementStopMethod>(finalView, stopMethod); } private Pair<Viewable, JoinPreloadMethod> handleJoin(String[] streamNames, EventType[] streamTypes, Viewable[] streamViews, ResultSetProcessor resultSetProcessor, SelectClauseStreamSelectorEnum selectStreamSelectorEnum, StatementContext statementContext, List<StopCallback> stopCallbacks, boolean[] isUnidirectional, boolean[] hasChildViews) throws ExprValidationException { // Handle joins final JoinSetComposer composer = statementContext.getJoinSetComposerFactory().makeComposer(statementSpec.getOuterJoinDescList(), statementSpec.getFilterRootNode(), streamTypes, streamNames, streamViews, selectStreamSelectorEnum, isUnidirectional, hasChildViews); stopCallbacks.add(new StopCallback(){ public void stop() { composer.destroy(); } }); JoinSetFilter filter = new JoinSetFilter(statementSpec.getFilterRootNode()); OutputProcessView indicatorView = OutputProcessViewFactory.makeView(resultSetProcessor, statementSpec, statementContext, services.getInternalEventRouter()); // Create strategy for join execution JoinExecutionStrategy execution = new JoinExecutionStrategyImpl(composer, filter, indicatorView); // The view needs a reference to the join execution to pull iterator values indicatorView.setJoinExecutionStrategy(execution); // Hook up dispatchable with buffer and execution strategy JoinExecStrategyDispatchable joinStatementDispatch = new JoinExecStrategyDispatchable(execution, statementSpec.getStreamSpecs().size()); statementContext.getEpStatementHandle().setOptionalDispatchable(joinStatementDispatch); JoinPreloadMethodImpl preloadMethod = new JoinPreloadMethodImpl(streamNames.length, composer); // Create buffer for each view. Point buffer to dispatchable for join. for (int i = 0; i < statementSpec.getStreamSpecs().size(); i++) { BufferView buffer = new BufferView(i); streamViews[i].addView(buffer); buffer.setObserver(joinStatementDispatch); preloadMethod.setBuffer(buffer, i); } return new Pair<Viewable, JoinPreloadMethod>(indicatorView, preloadMethod); } /** * Returns a stream name assigned for each stream, generated if none was supplied. * @param streams - stream specifications * @return array of stream names */ @SuppressWarnings({"StringContatenationInLoop"}) protected static String[] determineStreamNames(List<StreamSpecCompiled> streams) { String[] streamNames = new String[streams.size()]; for (int i = 0; i < streams.size(); i++) { // Assign a stream name for joins, if not supplied streamNames[i] = streams.get(i).getOptionalStreamName(); if (streamNames[i] == null) { streamNames[i] = "stream_" + i; } } return streamNames; } private void validateNodes(StreamTypeService typeService, MethodResolutionService methodResolutionService, ViewResourceDelegate viewResourceDelegate) { if (statementSpec.getFilterRootNode() != null) { ExprNode optionalFilterNode = statementSpec.getFilterRootNode(); // Validate where clause, initializing nodes to the stream ids used try { optionalFilterNode = optionalFilterNode.getValidatedSubtree(typeService, methodResolutionService, viewResourceDelegate, statementContext.getSchedulingService(), statementContext.getVariableService()); statementSpec.setFilterExprRootNode(optionalFilterNode); // Make sure there is no aggregation in the where clause List<ExprAggregateNode> aggregateNodes = new LinkedList<ExprAggregateNode>(); ExprAggregateNode.getAggregatesBottomUp(optionalFilterNode, aggregateNodes); if (!aggregateNodes.isEmpty()) { throw new ExprValidationException("An aggregate function may not appear in a WHERE clause (use the HAVING clause)"); } } catch (ExprValidationException ex) { log.debug(".validateNodes Validation exception for filter=" + optionalFilterNode.toExpressionString(), ex); throw new EPStatementException("Error validating expression: " + ex.getMessage(), statementContext.getExpression()); } } for (int outerJoinCount = 0; outerJoinCount < statementSpec.getOuterJoinDescList().size(); outerJoinCount++) { OuterJoinDesc outerJoinDesc = statementSpec.getOuterJoinDescList().get(outerJoinCount); UniformPair<Integer> streamIdPair = validateOuterJoinPropertyPair(outerJoinDesc.getLeftNode(), outerJoinDesc.getRightNode(), outerJoinCount, typeService, methodResolutionService, viewResourceDelegate); if (outerJoinDesc.getAdditionalLeftNodes() != null) { Set<Integer> streamSet = new HashSet<Integer>(); streamSet.add(streamIdPair.getFirst()); streamSet.add(streamIdPair.getSecond()); for (int i = 0; i < outerJoinDesc.getAdditionalLeftNodes().length; i++) { UniformPair<Integer> streamIdPairAdd = validateOuterJoinPropertyPair(outerJoinDesc.getAdditionalLeftNodes()[i], outerJoinDesc.getAdditionalRightNodes()[i], outerJoinCount, typeService, methodResolutionService, viewResourceDelegate); // make sure all additional properties point to the same two streams if ((!streamSet.contains(streamIdPairAdd.getFirst()) || (!streamSet.contains(streamIdPairAdd.getSecond())))) { String message = "Outer join ON-clause columns must refer to properties of the same joined streams" + " when using multiple columns in the on-clause"; throw new EPStatementException("Error validating expression: " + message, statementContext.getExpression()); } } } } } private UniformPair<Integer> validateOuterJoinPropertyPair(ExprIdentNode leftNode, ExprIdentNode rightNode, int outerJoinCount, StreamTypeService typeService, MethodResolutionService methodResolutionService, ViewResourceDelegate viewResourceDelegate) { // Validate the outer join clause using an artificial equals-node on top. // Thus types are checked via equals. // Sets stream ids used for validated nodes. ExprNode equalsNode = new ExprEqualsNode(false); equalsNode.addChildNode(leftNode); equalsNode.addChildNode(rightNode); try { equalsNode = equalsNode.getValidatedSubtree(typeService, methodResolutionService, viewResourceDelegate, statementContext.getSchedulingService(), statementContext.getVariableService()); } catch (ExprValidationException ex) { log.debug("Validation exception for outer join node=" + equalsNode.toExpressionString(), ex); throw new EPStatementException("Error validating expression: " + ex.getMessage(), statementContext.getExpression()); } // Make sure we have left-hand-side and right-hand-side refering to different streams int streamIdLeft = leftNode.getStreamId(); int streamIdRight = rightNode.getStreamId(); if (streamIdLeft == streamIdRight) { String message = "Outer join ON-clause cannot refer to properties of the same stream"; throw new EPStatementException("Error validating expression: " + message, statementContext.getExpression()); } // Make sure one of the properties refers to the acutual stream currently being joined int expectedStreamJoined = outerJoinCount + 1; if ((streamIdLeft != expectedStreamJoined) && (streamIdRight != expectedStreamJoined)) { String message = "Outer join ON-clause must refer to at least one property of the joined stream" + " for stream " + expectedStreamJoined; throw new EPStatementException("Error validating expression: " + message, statementContext.getExpression()); } // Make sure neither of the streams refer to a 'future' stream String badPropertyName = null; if (streamIdLeft > outerJoinCount + 1) { badPropertyName = leftNode.getResolvedPropertyName(); } if (streamIdRight > outerJoinCount + 1) { badPropertyName = rightNode.getResolvedPropertyName(); } if (badPropertyName != null) { String message = "Outer join ON-clause invalid scope for property" + " '" + badPropertyName + "', expecting the current or a prior stream scope"; throw new EPStatementException("Error validating expression: " + message, statementContext.getExpression()); } return new UniformPair<Integer>(streamIdLeft, streamIdRight); } private Viewable handleSimpleSelect(Viewable view, ResultSetProcessor resultSetProcessor, StatementContext statementContext) throws ExprValidationException { Viewable finalView = view; // Add filter view that evaluates the filter expression if (statementSpec.getFilterRootNode() != null) { FilterExprView filterView = new FilterExprView(statementSpec.getFilterRootNode()); finalView.addView(filterView); finalView = filterView; } OutputProcessView selectView = OutputProcessViewFactory.makeView(resultSetProcessor, statementSpec, statementContext, services.getInternalEventRouter()); finalView.addView(selectView); finalView = selectView; return finalView; } private SubSelectStreamCollection createSubSelectStreams(boolean isJoin) throws ExprValidationException, ViewProcessingException { SubSelectStreamCollection subSelectStreamDesc = new SubSelectStreamCollection(); int subselectStreamNumber = 1024; // Process all subselect expression nodes for (ExprSubselectNode subselect : statementSpec.getSubSelectExpressions()) { StatementSpecCompiled statementSpec = subselect.getStatementSpecCompiled(); if (statementSpec.getStreamSpecs().get(0) instanceof FilterStreamSpecCompiled) { FilterStreamSpecCompiled filterStreamSpec = (FilterStreamSpecCompiled) statementSpec.getStreamSpecs().get(0); // A child view is required to limit the stream if (filterStreamSpec.getViewSpecs().size() == 0) { throw new ExprValidationException("Subqueries require one or more views to limit the stream, consider declaring a length or time window"); } subselectStreamNumber++; // Register filter, create view factories Pair<EventStream, ManagedLock> streamLockPair = services.getStreamService().createStream(filterStreamSpec.getFilterSpec(), services.getFilterService(), statementContext.getEpStatementHandle(), isJoin); Viewable viewable = streamLockPair.getFirst(); ViewFactoryChain viewFactoryChain = services.getViewService().createFactories(subselectStreamNumber, viewable.getEventType(), filterStreamSpec.getViewSpecs(), statementContext); subselect.setRawEventType(viewFactoryChain.getEventType()); // Add lookup to list, for later starts subSelectStreamDesc.add(subselect, subselectStreamNumber, viewable, viewFactoryChain); } else { NamedWindowConsumerStreamSpec namedSpec = (NamedWindowConsumerStreamSpec) statementSpec.getStreamSpecs().get(0); NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(namedSpec.getWindowName()); NamedWindowConsumerView consumerView = processor.addConsumer(namedSpec.getFilterExpressions(), statementContext.getEpStatementHandle(), statementContext.getStatementStopService()); ViewFactoryChain viewFactoryChain = services.getViewService().createFactories(0, consumerView.getEventType(), namedSpec.getViewSpecs(), statementContext); subSelectStreamDesc.add(subselect, subselectStreamNumber, consumerView, viewFactoryChain); } } return subSelectStreamDesc; } private void startSubSelect(SubSelectStreamCollection subSelectStreamDesc, String[] outerStreamNames, EventType outerEventTypes[], List<StopCallback> stopCallbacks) throws ExprValidationException { for (ExprSubselectNode subselect : statementSpec.getSubSelectExpressions()) { StatementSpecCompiled statementSpec = subselect.getStatementSpecCompiled(); StreamSpecCompiled filterStreamSpec = statementSpec.getStreamSpecs().get(0); ViewFactoryChain viewFactoryChain = subSelectStreamDesc.getViewFactoryChain(subselect); EventType eventType = viewFactoryChain.getEventType(); // determine a stream name unless one was supplied String subexpressionStreamName = filterStreamSpec.getOptionalStreamName(); int subselectStreamNumber = subSelectStreamDesc.getStreamNumber(subselect); if (subexpressionStreamName == null) { subexpressionStreamName = "$subselect_" + subselectStreamNumber; } // Named windows don't allow data views if (filterStreamSpec instanceof NamedWindowConsumerStreamSpec) { ViewResourceDelegate viewResourceDelegate = new ViewResourceDelegateImpl(new ViewFactoryChain[] {viewFactoryChain}, statementContext); viewResourceDelegate.requestCapability(0, new NotADataWindowViewCapability(), null); } // Streams event types are the original stream types with the stream zero the subselect stream LinkedHashMap<String, EventType> namesAndTypes = new LinkedHashMap<String, EventType>(); namesAndTypes.put(subexpressionStreamName, eventType); for (int i = 0; i < outerEventTypes.length; i++) { namesAndTypes.put(outerStreamNames[i], outerEventTypes[i]); } StreamTypeService subselectTypeService = new StreamTypeServiceImpl(namesAndTypes, true, true); ViewResourceDelegate viewResourceDelegateSubselect = new ViewResourceDelegateImpl(new ViewFactoryChain[] {viewFactoryChain}, statementContext); // Validate select expression SelectClauseSpecCompiled selectClauseSpec = subselect.getStatementSpecCompiled().getSelectClauseSpec(); AggregationService aggregationService = null; if (selectClauseSpec.getSelectExprList().size() > 0) { SelectClauseElementCompiled element = selectClauseSpec.getSelectExprList().get(0); if (element instanceof SelectClauseExprCompiledSpec) { // validate SelectClauseExprCompiledSpec compiled = (SelectClauseExprCompiledSpec) element; ExprNode selectExpression = compiled.getSelectExpression(); selectExpression = selectExpression.getValidatedSubtree(subselectTypeService, statementContext.getMethodResolutionService(), viewResourceDelegateSubselect, statementContext.getSchedulingService(), statementContext.getVariableService()); subselect.setSelectClause(selectExpression); subselect.setSelectAsName(compiled.getAssignedName()); // handle aggregation List<ExprAggregateNode> aggExprNodes = new LinkedList<ExprAggregateNode>(); ExprAggregateNode.getAggregatesBottomUp(selectExpression, aggExprNodes); if (aggExprNodes.size() > 0) { List<ExprAggregateNode> havingAgg = Collections.emptyList(); List<ExprAggregateNode> orderByAgg = Collections.emptyList(); aggregationService = AggregationServiceFactory.getService(aggExprNodes, havingAgg, orderByAgg, false, null); // Other stream properties, if there is aggregation, cannot be under aggregation. for (ExprAggregateNode aggNode : aggExprNodes) { List<Pair<Integer, String>> propertiesNodesAggregated = getExpressionProperties(aggNode, true); for (Pair<Integer, String> pair : propertiesNodesAggregated) { if (pair.getFirst() != 0) { throw new ExprValidationException("Subselect aggregation function cannot aggregate across correlated properties"); } } } // This stream (stream 0) properties must either all be under aggregation, or all not be. List<Pair<Integer, String>> propertiesNotAggregated = getExpressionProperties(selectExpression, false); for (Pair<Integer, String> pair : propertiesNotAggregated) { if (pair.getFirst() == 0) { throw new ExprValidationException("Subselect properties must all be within aggregation functions"); } } } } } // no aggregation functions allowed in filter if (statementSpec.getFilterRootNode() != null) { List<ExprAggregateNode> aggExprNodesFilter = new LinkedList<ExprAggregateNode>(); ExprAggregateNode.getAggregatesBottomUp(statementSpec.getFilterRootNode(), aggExprNodesFilter); if (aggExprNodesFilter.size() > 0) { throw new ExprValidationException("Aggregation functions are not supported within subquery filters, consider using insert-into instead"); } } // Validate filter expression, if there is one ExprNode filterExpr = statementSpec.getFilterRootNode(); if (filterExpr != null) { filterExpr = filterExpr.getValidatedSubtree(subselectTypeService, statementContext.getMethodResolutionService(), viewResourceDelegateSubselect, statementContext.getSchedulingService(), statementContext.getVariableService()); if (JavaClassHelper.getBoxedType(filterExpr.getType()) != Boolean.class) { throw new ExprValidationException("Subselect filter expression must return a boolean value"); } // check the presence of a correlated filter, not allowed with aggregation ExprNodeIdentifierVisitor visitor = new ExprNodeIdentifierVisitor(true); filterExpr.accept(visitor); List<Pair<Integer, String>> propertiesNodes = visitor.getExprProperties(); for (Pair<Integer, String> pair : propertiesNodes) { if ((pair.getFirst() != 0) && (aggregationService != null)) { throw new ExprValidationException("Subselect filter expression cannot be a correlated expression when aggregating properties via aggregation function"); } } } // Finally create views Viewable viewableRoot = subSelectStreamDesc.getRootViewable(subselect); Viewable subselectView = services.getViewService().createViews(viewableRoot, viewFactoryChain.getViewFactoryChain(), statementContext); // If we do aggregation, then the view results must be added and removed from aggregation final EventTable eventIndex; // Under aggregation conditions, there is no lookup/corelated subquery strategy, and // the view-supplied events are simply aggregated, a null-event supplied to the stream for the select-clause, and not kept in index. // Note that "var1 + max(var2)" is not allowed as some properties are not under aggregation (which event to use?). if (aggregationService != null) { SubselectAggregatorView aggregatorView = new SubselectAggregatorView(aggregationService, filterExpr); subselectView.addView(aggregatorView); subselectView = aggregatorView; eventIndex = null; subselect.setStrategy(new TableLookupStrategyNullRow()); subselect.setFilterExpr(null); // filter not evaluated by subselect expression as not correlated } else { // Determine indexing of the filter expression Pair<EventTable, TableLookupStrategy> indexPair = determineSubqueryIndex(filterExpr, eventType, outerEventTypes, subselectTypeService); subselect.setStrategy(indexPair.getSecond()); subselect.setFilterExpr(filterExpr); eventIndex = indexPair.getFirst(); } // Clear out index on statement stop stopCallbacks.add(new SubqueryStopCallback(eventIndex)); // Preload if (filterStreamSpec instanceof NamedWindowConsumerStreamSpec) { NamedWindowConsumerStreamSpec namedSpec = (NamedWindowConsumerStreamSpec) filterStreamSpec ; NamedWindowProcessor processor = services.getNamedWindowService().getProcessor(namedSpec.getWindowName()); NamedWindowTailView consumerView = processor.getTailView(); // preload view for stream ArrayList<EventBean> eventsInWindow = new ArrayList<EventBean>(); for(Iterator<EventBean> it = consumerView.iterator(); it.hasNext();) { eventsInWindow.add(it.next()); } EventBean[] newEvents = eventsInWindow.toArray(new EventBean[eventsInWindow.size()]); ((View)viewableRoot).update(newEvents, null); // fill view if (eventIndex != null) { eventIndex.add(newEvents); // fill index } } else // preload from the data window that site on top { // Start up event table from the iterator Iterator<EventBean> it = subselectView.iterator(); if ((it != null) && (it.hasNext())) { ArrayList<EventBean> preloadEvents = new ArrayList<EventBean>(); for (;it.hasNext();) { preloadEvents.add(it.next()); } if (eventIndex != null) { eventIndex.add(preloadEvents.toArray(new EventBean[preloadEvents.size()])); } } } // hook up subselect viewable and event table BufferView bufferView = new BufferView(subselectStreamNumber); bufferView.setObserver(new SubselectBufferObserver(eventIndex)); subselectView.addView(bufferView); } } private Pair<EventTable, TableLookupStrategy> determineSubqueryIndex(ExprNode filterExpr, EventType viewableEventType, EventType[] outerEventTypes, StreamTypeService subselectTypeService) throws ExprValidationException { // No filter expression means full table scan if (filterExpr == null) { UnindexedEventTable table = new UnindexedEventTable(0); FullTableScanLookupStrategy strategy = new FullTableScanLookupStrategy(table); return new Pair<EventTable, TableLookupStrategy>(table, strategy); } // analyze query graph QueryGraph queryGraph = new QueryGraph(outerEventTypes.length + 1); FilterExprAnalyzer.analyze(filterExpr, queryGraph); // Build a list of streams and indexes Map<String, JoinedPropDesc> joinProps = new LinkedHashMap<String, JoinedPropDesc>(); boolean mustCoerce = false; for (int stream = 0; stream < outerEventTypes.length; stream++) { int lookupStream = stream + 1; String[] keyPropertiesJoin = queryGraph.getKeyProperties(lookupStream, 0); String[] indexPropertiesJoin = queryGraph.getIndexProperties(lookupStream, 0); if ((keyPropertiesJoin == null) || (keyPropertiesJoin.length == 0)) { continue; } if (keyPropertiesJoin.length != indexPropertiesJoin.length) { throw new IllegalStateException("Invalid query key and index property collection for stream " + stream); } for (int i = 0; i < keyPropertiesJoin.length; i++) { Class keyPropType = JavaClassHelper.getBoxedType(subselectTypeService.getEventTypes()[lookupStream].getPropertyType(keyPropertiesJoin[i])); Class indexedPropType = JavaClassHelper.getBoxedType(subselectTypeService.getEventTypes()[0].getPropertyType(indexPropertiesJoin[i])); Class coercionType = indexedPropType; if (keyPropType != indexedPropType) { coercionType = JavaClassHelper.getCompareToCoercionType(keyPropType, keyPropType); mustCoerce = true; } JoinedPropDesc desc = new JoinedPropDesc(indexPropertiesJoin[i], coercionType, keyPropertiesJoin[i], stream); joinProps.put(indexPropertiesJoin[i], desc); } } if (joinProps.size() != 0) { String indexedProps[] = joinProps.keySet().toArray(new String[0]); int[] keyStreamNums = JoinedPropDesc.getKeyStreamNums(joinProps.values()); String[] keyProps = JoinedPropDesc.getKeyProperties(joinProps.values()); if (!mustCoerce) { PropertyIndexedEventTable table = new PropertyIndexedEventTable(0, viewableEventType, indexedProps); TableLookupStrategy strategy = new IndexedTableLookupStrategy( outerEventTypes, keyStreamNums, keyProps, table); return new Pair<EventTable, TableLookupStrategy>(table, strategy); } else { Class coercionTypes[] = JoinedPropDesc.getCoercionTypes(joinProps.values()); PropertyIndTableCoerceAdd table = new PropertyIndTableCoerceAdd(0, viewableEventType, indexedProps, coercionTypes); TableLookupStrategy strategy = new IndexedTableLookupStrategyCoercing( outerEventTypes, keyStreamNums, keyProps, table, coercionTypes); return new Pair<EventTable, TableLookupStrategy>(table, strategy); } } else { UnindexedEventTable table = new UnindexedEventTable(0); return new Pair<EventTable, TableLookupStrategy>(table, new FullTableScanLookupStrategy(table)); } } // For delete actions from named windows private ExprNode validateJoinNamedWindow(ExprNode deleteJoinExpr, EventType namedWindowType, String namedWindowStreamName, EventType filteredType, String filterStreamName) throws ExprValidationException { if (deleteJoinExpr == null) { return null; } LinkedHashMap<String, EventType> namesAndTypes = new LinkedHashMap<String, EventType>(); namesAndTypes.put(namedWindowStreamName, namedWindowType); namesAndTypes.put(filterStreamName, filteredType); StreamTypeService typeService = new StreamTypeServiceImpl(namesAndTypes, false, false); return deleteJoinExpr.getValidatedSubtree(typeService, statementContext.getMethodResolutionService(), null, statementContext.getSchedulingService(), statementContext.getVariableService()); } private List<Pair<Integer, String>> getExpressionProperties(ExprNode exprNode, boolean visitAggregateNodes) { ExprNodeIdentifierVisitor visitor = new ExprNodeIdentifierVisitor(visitAggregateNodes); exprNode.accept(visitor); return visitor.getExprProperties(); } private static final Log log = LogFactory.getLog(EPStatementStartMethod.class); }

The table below shows all metrics for EPStatementStartMethod.java.

MetricValueDescription
BLOCKS139.00Number of blocks
BLOCK_COMMENT 0.00Number of block comment lines
COMMENTS125.00Comment lines
COMMENT_DENSITY 0.19Comment density
COMPARISONS86.00Number of comparison operators
CYCLOMATIC120.00Cyclomatic complexity
DECL_COMMENTS 5.00Comments in declarations
DOC_COMMENT22.00Number of javadoc comment lines
ELOC656.00Effective lines of code
EXEC_COMMENTS93.00Comments in executable code
EXITS206.00Procedure exits
FUNCTIONS23.00Number of function declarations
HALSTEAD_DIFFICULTY97.66Halstead difficulty
HALSTEAD_EFFORT 0.00Halstead effort
INTERFACE_COMPLEXITY142.00Interface complexity
JAVA0001 0.00JAVA0001 Package name does not contain only lower case letters
JAVA0002 0.00JAVA0002 Package name does not begin with a top level domain name or country code
JAVA0003 0.00JAVA0003 Minimize use of on-demand (.*) imports
JAVA0004 0.00JAVA0004 Unnecessary import from java.lang
JAVA0005 1.00JAVA0005 Imports not in specified order
JAVA0006 0.00JAVA0006 Empty finally block
JAVA0007 0.00JAVA0007 Should not declare public field
JAVA0008 0.00JAVA0008 Empty catch block
JAVA0009 0.00JAVA0009 Protected member in final class
JAVA0010 0.00JAVA0010 Non-instantiable class does not contain a non-private static member
JAVA0011 0.00JAVA0011 Abstract class does not contain an abstract method
JAVA0012 0.00JAVA0012 Non-constructor method with same name as declaring class
JAVA0013 0.00JAVA0013 Non-blank final field is not static
JAVA0014 0.00JAVA0014 Class with only static members has non-private constructor
JAVA0015 0.00JAVA0015 Package class contains public nested type
JAVA0016 0.00JAVA0016 Abstract class contains public constructor
JAVA0017 0.00JAVA0017 Class name does not have required form
JAVA0018 0.00JAVA0018 Method name does not have required form
JAVA0019 0.00JAVA0019 Interface name does not have required form
JAVA0020 0.00JAVA0020 Field name does not have required form
JAVA0021 0.00JAVA0021 Interface method name does not have required form
JAVA0022 0.00JAVA0022 Static final field name does not have required form
JAVA0023 0.00JAVA0023 Empty finalize method
JAVA0024 0.00JAVA0024 Empty class
JAVA0025 0.00JAVA0025 Method override is empty
JAVA0026 0.00JAVA0026 Finalize method with parameters
JAVA0029 0.00JAVA0029 Private method not used
JAVA0030 0.00JAVA0030 Private field not used
JAVA0031 0.00JAVA0031 Case statement not properly closed
JAVA0032 0.00JAVA0032 Switch statement missing default
JAVA0033 0.00JAVA0033 default: not last case in switch statement
JAVA0034 0.00JAVA0034 Missing braces in if statement
JAVA0035 0.00JAVA0035 Missing braces in for statement
JAVA0036 0.00JAVA0036 Missing braces in while statement
JAVA0038 0.00JAVA0038 Non-case label in switch statement
JAVA0039 0.00JAVA0039 Break statement with label
JAVA0040 0.00JAVA0040 Switch statement contains N cases (maximum: M)
JAVA0041 0.00JAVA0041 Nested synchronized block
JAVA0042 0.00JAVA0042 Empty synchronized statement
JAVA0043 0.00JAVA0043 Inner class does not use outer class
JAVA0044 0.00JAVA0044 Serializable class with no instance variables
JAVA0045 0.00JAVA0045 Serializable class with only transient fields
JAVA0046 0.00JAVA0046 Name of class not derived from Exception ends with 'Exception'
JAVA0047 0.00JAVA0047 Serializable class derives from invalid base class
JAVA0048 0.00JAVA0048 Name of class derived from Exception does not end with 'Exception'
JAVA0049 2.00JAVA0049 Nested block at depth N (maximum: M)
JAVA0050 0.00JAVA0050 Class derives from java.lang.Error
JAVA0051 0.00JAVA0051 Class derives from java.lang.RuntimeException
JAVA0052 0.00JAVA0052 Class derives from java.lang.Throwable
JAVA0053 0.00JAVA0053 Unused label
JAVA0054 0.00JAVA0054 Inheritance depth N exceeds maximum M
JAVA0055 0.00JAVA0055 Class should be interface
JAVA0056 0.00JAVA0056 Unnecessary abstract modifier for interface or annotation
JAVA0057 0.00JAVA0057 Unnecessary default constructor
JAVA0058 0.00JAVA0058 Constructor calls super()
JAVA0059 0.00JAVA0059 Method override only calls super()
JAVA0061 0.00JAVA0061 Inaccessible member in anonymous class
JAVA0062 0.00JAVA0062 Public class missing public member or protected constructor
JAVA0063 0.00JAVA0063 Identifier name should not contain '$'
JAVA0064 0.00JAVA0064 N variations of identifier name (maximum: M)
JAVA0065 0.00JAVA0065 Unnecessary final modifier for method in final class
JAVA0066 0.00JAVA0066 Unnecessary modifier for interface nested type
JAVA0067 2.00JAVA0067 Array descriptor on identifier name
JAVA0068 0.00JAVA0068 Modifiers not declared in recommended order
JAVA0071 0.00JAVA0071 Strings compared with ==
JAVA0073 0.00JAVA0073 Integer division in floating-point context
JAVA0074 0.00JAVA0074 Use of Object.notify()
JAVA0075 2.00JAVA0075 Method parameter hides field
JAVA0076 1.00JAVA0076 Use of magic number
JAVA0077 0.00JAVA0077 Private field not used in declaring class
JAVA0078 0.00JAVA0078 Floating point values compared with ==
JAVA0079 0.00JAVA0079 Use of instance to reference static member
JAVA0080 0.00JAVA0080 Import declaration not used
JAVA0081 0.00JAVA0081 Boolean literal in comparison
JAVA0082 0.00JAVA0082 Unnecessary widening cast
JAVA0083 0.00JAVA0083 Unnecessary instanceof test
JAVA0084 0.00JAVA0084 Should use compound assignment operator
JAVA0085 0.00JAVA0085 Use of sun.* class
JAVA0087 0.00JAVA0087 Use of Thread.sleep()
JAVA0089 0.00JAVA0089 Use of restricted package
JAVA0092 0.00JAVA0092 Use of restricted type
JAVA0093 0.00JAVA0093 Redundant assignment
JAVA0094 0.00JAVA0094 Field hides a superclass field
JAVA0095 0.00JAVA0095 Uninitialized private field
JAVA0096 0.00JAVA0096 Field in nested class hides outer field
JAVA0098 0.00JAVA0098 Minimize use of implicit field initializers
JAVA0100 0.00JAVA0100 Class contains N non-final fields (maximum: M)
JAVA0101 0.00JAVA0101 Unnecessary modifier for field in interface
JAVA0102 0.00JAVA0102 Last statement in finalize() not super.finalize()
JAVA0103 0.00JAVA0103 Explicit call to finalize()
JAVA0104 0.00JAVA0104 finalize() only calls super.finalize()
JAVA0105 0.00JAVA0105 Duplicate import declaration
JAVA0106 0.00JAVA0106 Unnecessary import from current package
JAVA0108 0.00JAVA0108 Incorrect javadoc: no @param tag for 'parameter'
JAVA0109 0.00JAVA0109 Incorrect javadoc: no parameter 'parameter'
JAVA0110 0.00JAVA0110 Incorrect javadoc: no @return tag
JAVA0111 0.00JAVA0111 Incorrect javadoc: @return tag for void method
JAVA0112 0.00JAVA0112 Incorrect javadoc: no exception 'exception' in throws
JAVA0113 1.00JAVA0113 Incorrect javadoc: no @author tag
JAVA0114 1.00JAVA0114 Incorrect javadoc: no @version tag
JAVA0115 0.00JAVA0115 Incorrect javadoc: no @throws or @exception tag for 'exception'
JAVA0116 0.00JAVA0116 Missing javadoc: field 'field'
JAVA0117 0.00JAVA0117 Missing javadoc: method 'method'
JAVA0118 0.00JAVA0118 Missing javadoc: type 'type'
JAVA0119 0.00JAVA0119 Control variable changed within body of for loop
JAVA0123 2.00JAVA0123 Use all three components of for loop
JAVA0125 0.00JAVA0125 Continue statement with label
JAVA0126 0.00JAVA0126 Method declares unchecked exception in throws
JAVA0128 0.00JAVA0128 Public constructor in non-public class
JAVA0130 2.00JAVA0130 Non-static method does not use instance fields
JAVA0131 0.00JAVA0131 Compatible method does not override base
JAVA0132 0.00JAVA0132 Method overload with compatible signature
JAVA0133 0.00JAVA0133 Non-synchronized method overrides synchronized method
JAVA0135 0.00JAVA0135 Only one of Object.equals and Object.hashCode defined: missing 'method'
JAVA0136 0.00JAVA0136 N methods defined in class (maximum: M)
JAVA0137 0.00JAVA0137 Non-abstract class missing constructor
JAVA0138 2.00JAVA0138 N parameters defined for method (maximum: M)
JAVA0139 0.00JAVA0139 Definition of main other than public static void main(java.lang.String[])
JAVA0141 0.00JAVA0141 Unnecessary modifier for method in interface
JAVA0143 0.00JAVA0143 Synchronized method
JAVA014470.00JAVA0144 Line exceeds maximum M characters
JAVA0145 0.00JAVA0145 Tab character used in source file
JAVA0150 0.00JAVA0150 java.lang.Error (or subclass) thrown
JAVA0153 0.00JAVA0153 Inefficient conversion of integer to string
JAVA0159 0.00JAVA0159 Inefficient conversion of string to integer
JAVA0160 3.00JAVA0160 Method does not throw specified exception
JAVA0161 0.00JAVA0161 Conditional wait() not in loop
JAVA0163 0.00JAVA0163 Empty statement
JAVA0165 0.00JAVA0165 Conflicting return statement in finally block
JAVA0166 1.00JAVA0166 Generic exception caught
JAVA0167 0.00JAVA0167 ThreadDeath not rethrown
JAVA0169 0.00JAVA0169 Unnecessary catch block: exception 'exception'
JAVA0170 1.00JAVA0170 Caught exception not derived from java.lang.Exception
JAVA0171 0.00JAVA0171 Unused local variable
JAVA0173 0.00JAVA0173 Unused method parameter
JAVA0174 1.00JAVA0174 Assigned local variable never used
JAVA0175 0.00JAVA0175 Successive assignment to variable
JAVA0176 0.00JAVA0176 Local variable name does not have required form
JAVA0177 8.00JAVA0177 Variable declaration missing initializer
JAVA0179 2.00JAVA0179 Local variable hides visible field
JAVA0233 0.00JAVA0233 Definition of serialVersionUID other than 'private static final long serialVersionUID'
JAVA0234 0.00JAVA0234 Class is Serializable but does not define serialVersionUID
JAVA0235 0.00JAVA0235 Class defines serialVersionUID but does not implement Serializable
JAVA0236 0.00JAVA0236 Attempt to clone an object which does not implement Cloneable
JAVA0237 0.00JAVA0237 Class implements Cloneable but does not have public clone method
JAVA0238 0.00JAVA0238 Clone method does not call super.clone()
JAVA0239 0.00JAVA0239 Class declares 'readObject' or 'writeObject' but does not implement Serializable
JAVA0240 0.00JAVA0240 Serializable class which declares readObject or writeObject but not both
JAVA0241 0.00JAVA0241 'readObject' or 'writeObject' should be declared private in Serializable class
JAVA0242 0.00JAVA0242 Transient field in non-Serializable class
JAVA0243 0.00JAVA0243 'readResolve' or 'writeReplace' should be declared private or protected
JAVA0244 0.00JAVA0244 Field or method name in subclass differs only by case from inherited field or method
JAVA0245 0.00JAVA0245 JUnit TestCase with non-trivial constructor
JAVA0246 0.00JAVA0246 JUnit assertXXX statement missing message parameter
JAVA0247 0.00JAVA0247 JUnit 'setUp()' and 'tearDown()' should call super method
JAVA0248 0.00JAVA0248 JUnit method 'setUp' or 'tearDown' with incorrect signature
JAVA0249 0.00JAVA0249 JUnit TestCase 'suite()' should be declared static
JAVA0250 0.00JAVA0250 JUnit TestCase declares testXXX method with incorrect signature
JAVA0251 0.00JAVA0251 Use '%n' for line breaks in printf/format for platform independence
JAVA0252 0.00JAVA0252 'enum' is a Java 1.5 reserved word
JAVA0253 0.00JAVA0253 Not all enum constants consumed in switch statement
JAVA0254 1.00JAVA0254 Use enhanced for loop construct instead of Iterator
JAVA0255 0.00JAVA0255 Result of m